基於企業整合模式EIP(enterprise integration patterns)設計的開源框架,使用component的方式接入第三方系統,便於快速簡單整合不同系統做處理及生產data
-> 直接使用URLs來串接不同的傳輸或訊息模型,像是HTTP、kafka、cxf
-> 包含有消息路由及轉換等特定領域語言(DSL)
架構組成
-
CamelContext,Camel的工作環境,一個application通常會有一個CamelContext對象,像是有:
1. SpringCamelContext與Spring框架縫合,透過createRegistrt創建applicationContextRegistry instance,再從其中的applicationContext的getBean取得instance
2. DefaultCamelContext運行所需的基本資訊,Service啟動時作為參數傳入,實現共享上下文的效果
-
Service,大多數在Camel中的元素,都是一個獨立的service,像是Component、endpoint、routes等,透過CamelContext銜接,擁有獨立的生命週期,調用start()開始,stop()終止
- CamelTemplate,對CamelContext作包裝,提供發送消息到endpoint的方法
- 生產者模板:producerTemplate、fluentProducerTemplate(流式串接的寫法)
- 同步執行
- onlyIn模式: send(),sendBody(),sendBodyAndHeader()
- inOut模式: request(),requestBody(),requestBodyAndHeader()
- 非同步執行 (返回future)
- onlyIn模式: asyncSend(),asyncSendBody()
- inOut模式: asyncRequestBody(),asyncRequestBodyAndHeader(),asyncRequestBodyAndHeaders()
- asyncCallback(),asyncCallbackSendBody(),asyncCallbackRequestBody(): 包含callback
- 消費者模板
- consumerTemplate
- 事件驅動模式 event-driven consumer: 常見的模式,像是client-server架構及網路服務,也會涉及到EIP模型的非同步receiver,監聽特定的messaging channel,像是TCP/IP port或Twitter handle, Amazon SQS queue或web socket等,等client送到message後,consumer啟動並開始處理
- 輪詢及排程輪詢模式 polling/scheduled polling consumer; 和事件驅動相反,這種模式會主動的拉取特定source的message,像是FTP server,這種polling consumer在EIP模型稱為同步receiver,因為在完成現在的message之前不會再poll新的message;但一般來說是用scheduled polling consumer,像是File, FTP, and email components都是這樣處理,non-scheduled的polling consumer只在特地呼叫時才會使用
- endpoint: 接受或發送數據的組件,指引channel要送到哪的model,支持多種協議,如jms, http, ftp, file
- camel能傳送或接收message給endpoint
- consumer (from)
- producer (to)
- or both
- 作為建立producer跟consumers的factory
- 使用URI描述目標系統的通訊 e.g. file:data/inbox?delay=5000
- schema: 指示哪個camel component處理這種type的endpoint,像是範例會使用FileComponent
- contextPath: 根據此context,指示該component建立endpoint
- Options: 其它選項,像delay=5000表示每5秒poll一次
- 實現org.apache.camel.Component interface建立
- DIRECT component:
direct:someName[?options]
,當producer發送message exchange時,提供直接、同步啟動consumer的方法,此endpoint能使連接到同樣層級context的既存route
- processor: 處理具體業務邏輯,接收來自Endpoint、Routing或別的processor的消息,需實作Process interface,用exchange型別傳遞,處理時截取出內含的message
- 串接各endpoint
- routing
- 轉換、調整、檢核、追蹤、interception、mediation、紀錄…
- exchange: 訊息本體, 貫穿整條路徑執行的控制endpoint、處理器、表達式及路由條件判斷,內包含的資訊有:
- exception
- exchangeID
- FromEndpoint
- Properties: key/value結構配置共享的參數
- Pattern(ExchangePattern)交換器工作模式: 為enum,描述消息的傳遞方式,包含InOnly, RobustInOnly, InOut, InOptionalOut, OutOnly, RobustOutOnly, OutIn, OutOptionalIn
- Message IN/OUT: 當endpoint、processor之間的message在exchange中傳遞,exchange會自動將上一個輸出作為下一個輸入使用、包含MessageID、Attachment、Header、Body
- 方法
- Message exchange.getIn(): 擷取傳入的exchange內的訊息
- Message exchange.getOut():擷取傳出的exchange內的訊息, 若無則建立一個空Message
- Message exchange.hasOut(): 檢查是否有outMessage
- Message ExchangePattern: enum, 分有單向的InOnly event message, 或是雙向的InOut Request Reply message
- message: exchange的message部分,包含有body、headers、messageId、timestamp等,messageId預設等同於exchange.getExchangeId
- route: 路由規則,指示processor之間的路由跳轉,支持多種規則如下:
- content based router: 單一邏輯切轉至不同系統
- message filter: 篩選掉不需要的message
- dynamic router: 能透過條件表達式動態決定下個系統方向,完成後返回再次重新判斷直到沒有新的路徑為止
- recipient router: 轉導message至靜態或動態清單的特定收件者
- splitter: 拆解message,發送各element去不同的system
- aggregator: 集結多個message再發送
- recequencer: 將失序的message重排回正確順序
- composed message processor: 在多元素且不同processor的情況,維持整體message的流向
- component(endpointFactory)
- 當CamelContext調用getEndpoint()時,根據Endpoint的URI前綴(schema)來尋找對應的Component再調用其方法
- 提供一致的endpoint interface
- 作為其他系統的connector
- 其它components
- simple(): 從架構層面抽出language作為Expression及predicate的工廠,免除再導入其它dependencies或語言的學習(如Groovy),simple是其中一個簡易的語言,內建許多變數像是exchange、body、header等,一般用來創建邏輯表達或一般表達式,在camel route使用少量scripting時適用,若是更為複雜的使用情境,則建議用Groovy, MVEL或OGNL
-
解決問題
- 不同系統/數據來源的整合應用,如共享或交換資訊
- 優化現有結構,使系統易於拓展或維護
- 使系統獨立不互相干擾
-
camel的message system
- message endpoint
- message channel
- message
- message route
- message translator
- pipes and filters
應用場景
- 消息匯聚: 匯聚不同server的數據,如ActiveMQ、RabbitMQ…;
- 消息分發: 將消息從生產者轉發給接收者,有squential跟parallel 2種分發方式
- 消息轉換
- 規則引擎
Spring 實作
1. 引進套件
2. 利用camelContext建立producer template去拋message
3. route extends RouteBuilder 建立consumer處理路徑
4. processor implements Processor定義各系統的執行內容(&錯誤處理)
REF
- Apache Camel的认识和应用详解
- Apache Camel user manual
- Apache Camel
- Using .when(), .end(), .endChoice() in Apache Camel with examples
- Camel end vs endChoice - not the usual query
- Package org.apache.camel