titanoboa

JAVA源码 2025-08-19

概要

titanoboa使复杂的工作流程变得容易。

它是一个完全分布式的,高度可扩展的低代码工作流编排平台。
它在JVM上运行,并支持Java,Kotlin&Clojure。 (Python也来了!)
您可以在任何地方运行它:本地或云中;在裸金属,VM,Docker或K8上。
titanoboa支持聚类,您可以轻松地扩展到数百个节点。

由于其通用,分布式且易于扩展的设计,您可以将titanoboa用于各种各样的目的。
您可以使用它:

  • 用于大数据处理和数据管道自动化
  • 用于编排核心微服务- 作为复合微服务的替代
  • 作为功能齐全的IPAA /集成平台
  • 用于数据转换 / ETL
  • 因为它自动化
  • 作为服务巴士(ESB)
  • 用于批处理处理

另请参见titanoboa .io和我们的Wiki。预定义的工作流程步骤在这里。

动机

titanoboa创建旨在创建一个工作流平台,该平台将支持复杂工作流引擎和大多数企业集成模式的所有常规功能,包括:

  • 顺序和/或并行步骤执行
  • 可配置的步骤在错误和高级可自定义错误处理时重试
  • 高级分支和条件流
  • 潜在的循环工作流程图
  • 分离器聚合器(又名MAP/降低)模式,该模式允许处理较大的数据集
  • 完全交易的性质确保即使发生故障转移,所有步骤也是执行的
  • 在运行时迅速开发和部署新工作流程的完全可扩展性和能力

除了这项titanoboa之外,还努力尊重不可分割的性能和功能编程原则。这使得它可以完全分发并在没有中央主服务器或数据库的情况下高度使用。这也有助于提高性能限制,因此titanoboa不仅适合批处理处理,而且适用于绩效关键工作流程。

titanoboa的浏览器GUI不仅可用于监视工作流程作业和服务器配置,而且还可以提供具有工作流可视化,高级数据和属性编辑器,AutComplete功能和一个重置(即使是Java!)的内置构建IDE,以便用户可以快速直接在那里进行测试开发新的工作。


titanoboa是专为Java&Clojure开发人员设计的,我们正在努力使其即使对于没有任何Clojure知识的Java开发人员也可以使用

要了解titanoboa背后的一些概念及其如何使用,请随时观看此RE:Clojure Conference Talk:

预定义步骤

尽管很容易直接从GUI(如我们的演示中显示)快速开发新的工作流程步骤,但是此存储库中有许多现成的步骤和任务。包括的一些步骤是:

  • AWS(EC2,S3,SNS,SQS,SES)
  • JDBC客户端
  • HTTP客户端
  • SMTP客户端
  • sftp
  • PDF世代

还有更多。

如果您创建了自己的自定义步骤(以Java或Clojure为单位),并且希望在其他工作流程中重复使用它们,那么将自己的自定义现成步骤添加到GUI中非常容易 - 请参阅我们的Wiki。您甚至可以添加自定义图标!

先决条件

Java 8或更高。

Docker

Docker Image可以在titanoboa / titanoboa中获得,您可以按照以下方式运行它:

titanoboa / titanoboa :0.9.0">
 docker run -d -p 3000:3000 --name titanoboa titanoboa / titanoboa :0.9.0

安装

顺便说一句,您无需安装任何内容,因为您只需单击即可在浏览器中启动一个免费的托管实例!只需前往https://cloud.**tit*anoboa.io

但是,如果愿意,请安装!我们建议可以尝试titanoboa的GUI,因为这是最好的起点!从https://g*i*thub.co*m/mikub/titanoboa/releases/download/0.9.1/ titanoboa -0.9.9.1_gui.zip下载titanoboa的发行版。

 curl --remote-name https://g*i*thub.co*m/mikub/titanoboa/releases/download/0.9.1/ titanoboa -0.9.1_gui.zip

注意GUI现在也可以免费用于商业用途。耶!

注意如果您打算在Java 8 JRE上运行titanoboa服务器,请下载JRE的发行版:

 curl --remote-name https://g*i*thub.co*m/mikub/titanoboa/releases/download/0.9.1/ titanoboa -0.9.1_jre_gui.zip

解压缩文件:

 unzip titanoboa -0.9.1_gui.zip

然后执行开始脚本:

 ./start

在您的控制台中,您应该看到一堆日志消息,最终您会看到

 INFO [main] - Started @2338ms

这意味着服务器成功启动。默认情况下,服务器和GUI都将在端口3000上启动,您可以在浏览器中打开http:// localhost:3000。

恭喜!您刚刚启动了titanoboa服务器!

您可以继续尝试创建示例工作流程或观看我们的演示。

没有GUI的服务器

从https://g*i*thub.co*m/mikub/titanoboa/releases/download/0.9.1/titanoboa titanoboa下载最新版本。

 curl --remote-name https://g*i*thub.co*m/mikub/titanoboa/releases/download/0.9.1/ titanoboa -0.9.1.zip

注意如果您打算在Java 8 JRE上运行titanoboa服务器,请下载JRE的发行版:

 curl --remote-name https://g*i*thub.co*m/mikub/titanoboa/releases/download/0.9.1/ titanoboa -0.9.1_jre.zip

然后按照上述说明进行操作。默认情况下,服务器将在端口3000上启动。

从存储库建造

这些步骤在我们的Wiki上描述。

服务器配置

服务器配置和外部依赖项文件可以由System属性boa.server.config.pathboa.server.dependencies.path指定:

titanoboa .server">
 java -Dboa.server.config.path=boa_server_config_local.clj -Dboa.server.dependencies.path=ext-dependencies.clj -cp "./build/ titanoboa .jar:./lib/*" titanoboa .server

有关更多详细信息,请参见服务器配置Wiki。

入门

在开始之前,熟悉titanoboa的概念和工作流设计原理可能是一个好主意。

还可以随时观看Re:Clojure 2020 Conference的Miro的演讲,该会议很好地概述了titanoboa可以做什么。

使用titanoboa GUI开发和测试工作流程

titanoboa GUI是开始开发和测试工作流程的好地方:

在Wiki中查看如何创建示例工作流程的示例。

在Java中开发自定义工作流程步骤

titanoboa也应被Java开发人员使用(除了像EDN这样的概念之外)不需要熟悉Clojure。如果您不想使用Clojure Java Interop实例化对象和/或调用您的方法,那么您也有其他选择:

要创建自定义工作流程步骤,只需在项目中添加(Maven)依赖性即可。并创建一个将实现IO的类。 titanoboa .java.iworkloadfn接口:

 public interface IWorkloadFn {
   public Object invoke ( Map properties );
}

如果然后将项目(或相应的Maven伪像)添加到titanoboa的外部依赖项中,则可以在Workflow-FN字段中使用类名。该类将自动实例化为单例bean(因此必须具有一个没有Argumet的构造函数),并且从任何Workflow-FN中对其进行所有后续引用都将调用其调用方法:

:workload-fn io. titanoboa .java.SampleWorkloadImpl

或者

:workload-fn 'io. titanoboa .java.SampleWorkloadImpl

或在GUI中:

Java Lambda支持

为了快速开发和测试新步骤,您还可以在GUI中键入Lambda功能,并在运行时对titanoboa进行评估。在此处阅读有关它的更多信息,或者在我们的演示中查看它是如何完成的。

在您的clojure repl中本地开发和测试工作流程

如果您不能使用GUI并且不想使用REST API,则可以在本地启动并在那里使用titanoboa播放。要么是从仓库中构建titanoboa ,要么将其作为LeiningenMaven依赖性构建:

titanoboa "0.9.0"]">
[io. titanoboa / titanoboa " 0.9.0 " ]
titanoboa titanoboa 0.9.0 ">
< dependency >
  < groupId >io. titanoboa groupId >
  < artifactId > titanoboa artifactId >
  < version >0.9.0version >
dependency >

然后启动一个替补开始:

定义样本“ Hello World!”工作流:

titanoboa.system] [ titanoboa .repo] [ titanoboa .system.local] [ titanoboa .processor])) (defn hello-fn [p] {:message (str "Hello " (or (:name p) "human") "!") :return-code (nil? (:name p))}) (defn greet-fn [p] {:message (str (:message p) " Nice to meet you!")}) (defn fill-in-blanks [p] {:message (str (:message p) " What is your name?")}) (def job-def {:first-step "step1" :name "test" :properties {:name nil} :steps [{:id "step1" :type :custom :supertype :tasklet :next [[false "step2"] [true "step3"]] :workload-fn 'local-system-test/hello-fn :properties {}} {:id "step2" :type :custom :supertype :tasklet :workload-fn 'local-system-test/greet-fn :next [] :properties {}} {:id "step3" :type :custom :supertype :tasklet :workload-fn 'local-system-test/fill-in-blanks :next [] :properties {}}]})">
( ns local-system-test
  ( :require [ titanoboa .system]
            [ titanoboa .repo]
            [ titanoboa .system.local]
            [ titanoboa .processor]))

( defn hello-fn [p]
  { :message ( str " Hello " ( or ( :name p) " human " ) " ! " )
   :return-code ( nil? ( :name p))})

( defn greet-fn [p]
  { :message ( str ( :message p) " Nice to meet you! " )})

( defn fill-in-blanks [p]
  { :message ( str ( :message p) " What is your name? " )})
  
( def job-def { :first-step " step1 "
              :name       " test "
              :properties { :name nil }
              :steps      [{ :id          " step1 "
                            :type :custom
                            :supertype :tasklet
                            :next [[ false " step2 " ] [ true " step3 " ]]
                            :workload-fn 'local-system-test/hello-fn
                            :properties  {}}
                           { :id          " step2 " :type :custom :supertype :tasklet
                            :workload-fn 'local-system-test/greet-fn
                            :next        []
                            :properties  {}}
                           { :id          " step3 " :type :custom :supertype :tasklet
                            :workload-fn 'local-system-test/fill-in-blanks
                            :next        []
                            :properties  {}}]})

从可以处理工作的工人开始简单的本地系统:

titanoboa.system/start-system! :core-local {:core-local {:system-def #' titanoboa .system.local/local-core-system :worker-def #' titanoboa .system.local/local-worker-system :worker-count 2}} {:new-jobs-chan new-jobs-chan :jobs-chan jobs-chan :finished-jobs-chan finished-jobs-chan :node-id "localhost" :eviction-interval (* 1000 60 5) :eviction-age (* 1000 60 10) :jobs-repo-path "repo-test/" :job-folder-path "job-folders/"}) INFO [nREPL-worker-0] - Starting system :core-local ... INFO [nREPL-worker-0] - Starting CacheEvictionComponent... INFO [CacheEvictionComponent thread 0] - Starting CacheEvictionComponent thread [ CacheEvictionComponent thread 0 ]. INFO [nREPL-worker-0] - Starting action processor pool... INFO [nREPL-worker-0] - Starting to watch repo folder for changes: dev-resources/repo-test/ INFO [nREPL-worker-0] - System :core-local started => true ( titanoboa .system/start-workers! :core-local {:core-local {:system-def #' titanoboa .system.local/local-core-system :worker-def #' titanoboa .system.local/local-worker-system :worker-count 2}}) INFO [nREPL-worker-1] - Starting 2 workers for system :core-local : INFO [nREPL-worker-1] - Starting a worker for system :core-local ... INFO [nREPL-worker-1] - Starting job worker.... INFO [nREPL-worker-1] - Starting a worker for system :core-local ... INFO [nREPL-worker-1] - Starting job worker.... => nil ">
( def new-jobs-chan ( clojure.core.async/chan ( clojure.core.async/dropping-buffer 1024 )))
( def jobs-chan ( clojure.core.async/chan ( clojure.core.async/dropping-buffer 1024 )))
( def finished-jobs-chan ( clojure.core.async/chan ( clojure.core.async/dropping-buffer 1024 )))

( titanoboa .system/start-system! :core-local
                                  { :core-local { :system-def   #' titanoboa .system.local/local-core-system
                                          :worker-def   #' titanoboa .system.local/local-worker-system
                                          :worker-count 2 }}
                                  { :new-jobs-chan      new-jobs-chan
                                   :jobs-chan          jobs-chan
                                   :finished-jobs-chan finished-jobs-chan
                                   :node-id            " localhost "
                                   :eviction-interval  ( * 1000 60 5 )
                                   :eviction-age       ( * 1000 60 10 )
                                   :jobs-repo-path     " repo-test/ "
                                   :job-folder-path    " job-folders/ " })
                                   
 INFO [nREPL-worker-0] - Starting system :core-local ...
 INFO [nREPL-worker-0] - Starting CacheEvictionComponent...
 INFO [CacheEvictionComponent thread 0 ] - Starting CacheEvictionComponent thread [ CacheEvictionComponent thread 0 ].
 INFO [nREPL-worker-0] - Starting action processor pool...
 INFO [nREPL-worker-0] - Starting to watch repo folder for changes:  dev-resources/repo-test/
 INFO [nREPL-worker-0] - System :core-local started
=> true
 
( titanoboa .system/start-workers! :core-local
                                   { :core-local { :system-def   #' titanoboa .system.local/local-core-system
                                           :worker-def   #' titanoboa .system.local/local-worker-system
                                           :worker-count 2 }})
                                           
 INFO [nREPL-worker-1] - Starting 2 workers for system :core-local :
 INFO [nREPL-worker-1] - Starting a worker for system :core-local ...
 INFO [nREPL-worker-1] - Starting job worker....
 INFO [nREPL-worker-1] - Starting a worker for system :core-local ...
 INFO [nREPL-worker-1] - Starting job worker....
=> nil                                           

开始工作:

titanoboa.processor/run-job! :core-local {:jobdef job-def :properties {:name "World"}} true) INFO [nREPL-worker-2] - Submitting new job [] into new jobs channel... INFO [async-thread-macro-2] - Initializing a new job; First step will be: [ step1 ] INFO [async-thread-macro-2] - Retrieved job [ d673c759-4fc6-4af1-bdad-d1dfd0f50f22 ] from jobs channel; Starting step [ step1 ] INFO [async-thread-macro-2] - Next step is step2 ; Submitting into jobs channel for next step's processing... INFO [async-thread-macro-1] - Initializing a next step; next step [ step2 ] was found among steps as step [ step2 ] INFO [async-thread-macro-2] - Acking main message for step step1 with thread stack nil INFO [async-thread-macro-1] - Retrieved job [ d673c759-4fc6-4af1-bdad-d1dfd0f50f22 ] from jobs channel; Starting step [ step2 ] INFO [async-thread-macro-1] - Looping through finalize-job! fn with thread-stack: [] INFO [async-thread-macro-1] - Acking main message for step step2 with thread stack nil INFO [async-thread-macro-1] - Job d673c759-4fc6-4af1-bdad-d1dfd0f50f22 has finshed. => {:properties {:name "World", :message "Hello World! Nice to meet you!"}, :step-start #inst"2018-11-23T07:35:34.218-00:00", :step-retries {}, :tracking-id nil, :step-state :completed, :start #inst"2018-11-23T07:35:34.203-00:00", :history [{:step-state :completed, :start #inst"2018-11-23T07:35:34.210-00:00", :duration 6, :result false, :node-id "localhost", :id "step1", :next-step "step2", :exception nil, :end #inst"2018-11-23T07:35:34.216-00:00", :retry-count nil, :thread-stack nil, :message "Step [step1] finshed with result [false]n"} {:id "step2", :step-state :running, :start #inst"2018-11-23T07:35:34.218-00:00", :node-id "localhost", :retry-count 0} {:step-state :completed, :start #inst"2018-11-23T07:35:34.218-00:00", :duration 1, :result nil, :node-id "localhost", :id "step2", :next-step nil, :exception nil, :end #inst"2018-11-23T07:35:34.219-00:00", :retry-count nil, :thread-stack nil, :message "Step [step2] finshed with result []n"}], :duration 16, :state :finished, :jobid "d673c759-4fc6-4af1-bdad-d1dfd0f50f22", :create-folder? true, :node-id "localhost", :next-step nil, :end #inst"2018-11-23T07:35:34.219-00:00"}">
( titanoboa .processor/run-job! :core-local
                              { :jobdef job-def
                               :properties { :name " World " }}
                              true )

 INFO [nREPL-worker-2] - Submitting new job [] into new jobs channel...
 INFO [async-thread-macro-2] - Initializing a new job ; First step will be: [ step1 ]
 INFO [async-thread-macro-2] - Retrieved job [ d673c759-4fc6-4af1-bdad-d1dfd0f50f22 ] from jobs channel ; Starting step [ step1 ]
 INFO [async-thread-macro-2] - Next step is  step2 ; Submitting into jobs channel for next step's processing...
 INFO [async-thread-macro-1] - Initializing a next step ; next step [ step2 ] was found among steps as step [ step2 ]
 INFO [async-thread-macro-2] - Acking main message for step  step1  with thread stack  nil
 INFO [async-thread-macro-1] - Retrieved job [ d673c759-4fc6-4af1-bdad-d1dfd0f50f22 ] from jobs channel ; Starting step [ step2 ]
 INFO [async-thread-macro-1] - Looping through finalize-job! fn with thread-stack:  []
 INFO [async-thread-macro-1] - Acking main message for step  step2  with thread stack  nil
 INFO [async-thread-macro-1] - Job  d673c759-4fc6-4af1-bdad-d1dfd0f50f22  has finshed.
=>
{ :properties { :name " World " , :message " Hello World! Nice to meet you! " },
 :step-start #inst " 2018-11-23T07:35:34.218-00:00 " ,
 :step-retries {},
 :tracking-id nil,
  :step-state :completed ,
 :start #inst " 2018-11-23T07:35:34.203-00:00 " ,
 :history [{ :step-state :completed ,
            :start #inst " 2018-11-23T07:35:34.210-00:00 " ,
            :duration 6 ,
            :result false ,
            :node-id " localhost " ,
            :id " step1 " ,
            :next-step " step2 " ,
            :exception nil,
            :end #inst " 2018-11-23T07:35:34.216-00:00 " ,
            :retry-count nil,
            :thread-stack nil,
            :message " Step [step1] finshed with result [false] n " }
           { :id " step2 " ,
            :step-state :running ,
            :start #inst " 2018-11-23T07:35:34.218-00:00 " ,
            :node-id " localhost " ,
            :retry-count 0 }
           { :step-state :completed ,
            :start #inst " 2018-11-23T07:35:34.218-00:00 " ,
            :duration 1 ,
            :result nil,
            :node-id " localhost " ,
            :id " step2 " ,
            :next-step nil,
            :exception nil,
            :end #inst " 2018-11-23T07:35:34.219-00:00 " ,
            :retry-count nil,
            :thread-stack nil,
            :message " Step [step2] finshed with result [] n " }],
 :duration 16 ,
 :state :finished ,
 :jobid " d673c759-4fc6-4af1-bdad-d1dfd0f50f22 " ,
 :create-folder? true ,
 :node-id " localhost " ,
 :next-step nil,
 :end #inst " 2018-11-23T07:35:34.219-00:00 " }

完成测试后,您可能需要停止系统:

titanoboa.system/stop-all-systems!) INFO [nREPL-worker-3] - Stopping all workers for system :core-local INFO [nREPL-worker-3] - Stopping job worker gracefully; sending a stop signal to the worker via service bus.... INFO [nREPL-worker-3] - Stopping job worker gracefully; sending a stop signal to the worker via service bus.... INFO [nREPL-worker-3] - Stopping system :core-local ... INFO [nREPL-worker-3] - Stopping action processor pool... INFO [nREPL-worker-3] - Stopping CacheEvictionComponent thread [ CacheEvictionComponent thread 0 ]...">
( titanoboa .system/stop-all-systems! )

 INFO [nREPL-worker-3] - Stopping all workers for system :core-local
 INFO [nREPL-worker-3] - Stopping job worker gracefully ; sending a stop signal to the worker via service bus....
 INFO [nREPL-worker-3] - Stopping job worker gracefully ; sending a stop signal to the worker via service bus....
 INFO [nREPL-worker-3] - Stopping system :core-local ...
 INFO [nREPL-worker-3] - Stopping action processor pool...
 INFO [nREPL-worker-3] - Stopping CacheEvictionComponent thread [ CacheEvictionComponent thread 0 ]...

执照

版权所有©Commsor Inc.

titanoboa已获得AGPL许可证的许可。

下载源码

通过命令行克隆项目:

git clone https://github.com/commsor/titanoboa.git