:::warning :dromedary_camel: 基於企業整合模式EIP(enterprise integration patterns)設計的開源框架,使用component的方式接入第三方系統,便於快速簡單整合不同系統做處理及生產data -> 直接使用URLs來串接不同的傳輸或訊息模型,像是HTTP、kafka、cxf -> 包含有消息路由及轉換等特定領域語言(DSL) ::: ## :japanese_castle: 架構組成 - **CamelContext**,Camel的工作環境,一個application通常會有一個CamelContext對象,像是有: 1. SpringCamelContext與Spring框架縫合,透過createRegistrt創建applicationContextRegistry instance,再從其中的applicationContext的getBean取得instance 2. DefaultCamelContext運行所需的基本資訊,Service啟動時作為參數傳入,實現共享上下文的效果 - **Service**,大多數在Camel中的元素,都是一個獨立的service,像是Component、endpoint、routes等,透過CamelContext銜接,擁有獨立的生命週期,調用start()開始,stop()終止 - CamelTemplate,對CamelContext作包裝,提供發送消息到endpoint的方法 - **生產者模板**:producerTemplate、**fluentProducerTemplate(流式串接的寫法)** - 同步執行 1. onlyIn模式: send(),sendBody(),sendBodyAndHeader() 2. inOut模式: request(),requestBody(),requestBodyAndHeader() - 非同步執行 (返回future) 1. onlyIn模式: asyncSend(),asyncSendBody() 2. inOut模式: asyncRequestBody(),asyncRequestBodyAndHeader(),asyncRequestBodyAndHeaders() 3. asyncCallback(),asyncCallbackSendBody(),asyncCallbackRequestBody(): 包含callback - **消費者模板** - consumerTemplate - 事件驅動模式 **event-driven consumer**: 常見的模式,像是client-server架構及網路服務,也會涉及到EIP模型的非同步receiver,監聽特定的messaging channel,像是TCP/IP port或Twitter handle, Amazon SQS queue或web socket等,等client送到message後,consumer啟動並開始處理 - 輪詢及排程輪詢模式 **polling/scheduled polling consumer**; 和事件驅動相反,這種模式會主動的拉取特定source的message,像是FTP server,這種polling consumer在EIP模型稱為同步receiver,因為在完成現在的message之前不會再poll新的message;但一般來說是用scheduled polling consumer,像是File, FTP, and email components都是這樣處理,non-scheduled的polling consumer只在特地呼叫時才會使用 - **endpoint**: 接受或發送數據的組件,指引channel要送到哪的model,支持多種協議,如jms, http, ftp, file - camel能傳送或接收message給endpoint - consumer (from) - producer (to) - or both - 作為建立producer跟consumers的factory - 使用URI描述目標系統的通訊 e.g. file:data/inbox?delay=5000 - schema: 指示哪個camel component處理這種type的endpoint,像是範例會使用FileComponent - contextPath: 根據此context,指示該component建立endpoint - Options: 其它選項,像delay=5000表示每5秒poll一次 - 實現org.apache.camel.Component interface建立 - DIRECT component: `direct:someName[?options]`,當producer發送message exchange時,提供直接、同步啟動consumer的方法,此endpoint能使連接到同樣層級context的既存route - **processor**: 處理具體業務邏輯,接收來自Endpoint、Routing或別的processor的消息,需實作Process interface,用==exchange==型別傳遞,處理時截取出內含的message - 串接各endpoint - routing - 轉換、調整、檢核、追蹤、interception、mediation、紀錄... ```java public class testProcessor implements Processor{ @Override public void process(Exchange exchange) throws Exception{ // 1. getIn(): 擷取傳入的exchange內的訊息 Message message = exchange.getIn(); String body = message.getBody.toString(); // === // 執行數據轉換的邏輯 // === if(exchange.getPattern() == ExchangePattern.Inout){ // a. 自建立out message並將in message帶入 // copy headers from IN to OUT to propagate them exchange.getOut().setHeaders(exchange.getIn().getHeaders()); // getOut(): 擷取傳出的exchange內的訊息, 若無則建立一個空Message Message.outMessage = excahnge.getOut(); outMessage.setBody(body + "|| other out") // copy attachements from IN to OUT to propagate them exchange.getOut().setAttachments(exchange.getIn().getAttachments()); // b. 或是直接更改in message, camel偵測無out message的話會直接拿in做成out message String body = exchange.getIn().getBody(String.class); // change the existing message to say Hello exchange.getIn().setBody("Hello " + body); } } } ``` - **exchange**: 訊息本體, 貫穿整條路徑執行的控制endpoint、處理器、表達式及路由條件判斷,內包含的資訊有: - exception - exchangeID - FromEndpoint - Properties: key/value結構配置共享的參數 - Pattern(ExchangePattern)交換器工作模式: 為enum,描述消息的傳遞方式,包含InOnly, RobustInOnly, InOut, InOptionalOut, OutOnly, RobustOutOnly, OutIn, OutOptionalIn - Message IN/OUT: 當endpoint、processor之間的message在exchange中傳遞,exchange會自動將上一個輸出作為下一個輸入使用、包含MessageID、Attachment、Header、Body - 方法 - Message exchange.getIn(): 擷取傳入的exchange內的訊息 - Message exchange.getOut():擷取傳出的exchange內的訊息, 若無則建立一個空Message - Message exchange.hasOut(): 檢查是否有outMessage - Message ExchangePattern: enum, 分有單向的InOnly event message, 或是雙向的InOut Request Reply message - message: exchange的message部分,包含有body、headers、messageId、timestamp等,messageId預設等同於exchange.getExchangeId - **route**: 路由規則,指示processor之間的路由跳轉,支持多種規則如下: - content based router: 單一邏輯切轉至不同系統 - message filter: 篩選掉不需要的message - dynamic router: 能透過條件表達式動態決定下個系統方向,完成後返回再次重新判斷直到沒有新的路徑為止 - recipient router: 轉導message至靜態或動態清單的特定收件者 - splitter: 拆解message,發送各element去不同的system - aggregator: 集結多個message再發送 - recequencer: 將失序的message重排回正確順序 - composed message processor: 在多元素且不同processor的情況,維持整體message的流向 --- - routing code (主體設計為轉導,支援判斷及處理) - from(): route起始點,始於作為input source的endpoint - to(): 送message到靜態路由 - toD(): 送message到動態路由 - try..catch..finally: java姊妹款, 使用時則error handler不生效,如onException() - doTry() - doCatch() - doFinally() - end() ```java from("direct:test").routeId("myroute") .doTry(). doTry(). throwException(new IllegalArgumentException("Forced by me")) .doCatch(Exception.class) .log("docatch 1") .throwException(new IllegalArgumentException("Second forced by me")) // 注意巢狀結構要確實endDoTry(), 否則內部exception不會被拋出 .endDoTry() // end this doCatch block .endDoTry() // end the inner doTry .doCatch(Exception.class) .log("docatch 2") .end(); ``` - setProperty(): 設定Exchange的property,以key/value的Map形式存放 ```java from("direct:a") // 可以拿現有的exchange(bar) 設定到另一個exchange(foo) .setProperty("foo", exchangeProperty("bar")) .to("direct:b"); ``` - setHeader(): 設定message的header,可以使用固定(constant)或變動header name(搭配Simple()語法使用),跟body不同,可以有多個欄位 ```java from("direct:a") .setHeader("myHeader", constant("test")) .to("direct:b") .setHeader("randomNumber", simple("${random(1,100)}")) .to("direct:c"); ``` - setBody(): EIP patterns的message translator,轉換傳遞的內容(message),從路徑或從excahange都能設定 ```java from("direct:cheese") .setBody(simple("Hello ${body}")) .to("log:hello"); ``` ```java from("file:messages/foo").to("jms:queue:foo"); from("direct:start").toD("${header.foo}"); // 抓message的header資訊動態導路 from("direct:start").toD("language:xpath:/order/@uri"); // toD使用不同語言 ``` - choice(): 開啟分支路徑,可以巢狀結構,但不建議做的太複雜(it does not do it well) - when() - otherwise() ```java // 有3種情況: // 1. 只有一個主要的.when()/.otherwise()區塊,可以不顯式關閉,最後用.end()關閉choice() // 若有巢狀結構,需顯式關閉(離開stack),否則不能繼續往下跑, // 2. when()裡面包含多個choice,巢狀結構: 內層需顯式.endChoice() // 3. when()裡面包含複雜巢狀區塊,像是loadBalance(),內層.endChoice() // stack概念,所以要關閉when()/otherwise()區塊才能接著處理 from("direct:start") .choice() .when(body().contains("Camel")) .loadBalance().roundRobin() .to("mock:foo") .to("mock:bar") .choice() .when(body().contains("people")) .to("mock:test") .endchoice() .endChoice() .otherwise() .to("mock:result"); .end() .to("direct:b") ``` - filter(expression 條件式篩選值) ```java from("direct:a") .filter(simple("${header.foo} == 'bar'")) .to("direct:bar") .end() .to("direct:b") ``` - onException(): 特定exception類型要執行的動作, - component(endpointFactory) - 當CamelContext調用getEndpoint()時,根據Endpoint的URI前綴(schema)來尋找對應的Component再調用其方法 - 提供一致的endpoint interface - 作為其他系統的connector - 其它components - simple(): 從架構層面抽出language作為Expression及predicate的工廠,免除再導入其它dependencies或語言的學習(如Groovy),simple是其中一個簡易的語言,內建許多變數像是exchange、body、header等,一般用來創建邏輯表達或一般表達式,在camel route使用少量scripting時適用,若是更為複雜的使用情境,則建議用Groovy, MVEL或OGNL ```java simple("${header.title} contains 'Camel' && ${header.type'} == 'gold'") // 若header存在foo這個key, 則true, message會被轉導到下個路徑; 否則message is dropped from("seda:orders") .filter().simple("${header.foo}") .to("seda:fooOrders"); ``` - 解決問題 - 不同系統/數據來源的整合應用,如共享或交換資訊 - 優化現有結構,使系統易於拓展或維護 - 使系統獨立不互相干擾 - camel的message system - message endpoint - message channel - message - message route - message translator - pipes and filters --- ### :closed_book: 應用場景 1. 消息匯聚: 匯聚不同server的數據,如ActiveMQ、RabbitMQ...; 2. 消息分發: 將消息從生產者轉發給接收者,有squential跟parallel 2種分發方式 ```java from("amqp:queue:order") .to("uri:validateBean", "uri:handleBean", "uri:emailBean"); from("amqp:queue:order") .multicast() .to("uri:validateBean", "uri:handleBean", "uri:emailBean"); ``` 3. 消息轉換 ```java from("amqp:queue:order") .process(new XmlToJsonProcessor()) .to("bean:orderHandler"); ``` 4. 規則引擎 ```xml <route> <from uri="amqp:queue:order"/> <multicast> <to uri="uri:validateBean"/> <to uri="uri:handleBean"/> <to uri="uri:emailBean"/> </multicast> </route> ``` ```java from("amqp:queue:order") .filter(header("foo") .isEqualTo("bar")) .choice() .when(xpath("/person/city = &#39;London&#39;")) .to("file:target/messages/uk") .otherwise() .to("file:target/messages/others"); ``` --- :mag: Spring 實作 --- ```mermaid graph TD; 創建CamelContext-->設定endpoints跟components; 設定endpoints跟components-->加上route規則; 加上route規則-->start啟動消息發送接收及處理; start啟動消息發送接收及處理 --> stop關閉所有的endpoint跟執行緒 ``` ### 1. 引進套件 ```java <!-- Dependency for camel --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-http4</artifactId> </dependency> ``` ### 2. 利用camelContext建立producer template去拋message ```java @Service public class ProcessServiceImpl implements ProcessService { @Autowired private CamelContext camelContext; FluentProducerTemplate producer = camelContext.createFluentProducerTemplate(); try { // 拋出message之後, 取得回覆內容 Exchange exchange = producer.to("direct:callEaiGW").withBody(request).send(); response = exchange.getOut().getBody(CubResponse.class); } catch (Exception e){ logger.info("[Log] Exception message -> {}\n", e.getMessage(), e); } finally { try { // 完成後, 顯式將此service停用 producer.stop(); } catch(Exception){ logger.info("Close producer fail!", e); } } } ``` ### 3. route extends RouteBuilder 建立consumer處理路徑 ```java // 此為Java DSL方法,也可以用xml或yml定義路徑 // 這邊選用direct component public class EAIRoute extends RouteBuilder { @Value("${eai.gw.url}") private String gwUrl; private String urlForGW; @Override public void configure() throws Exception { //Error Handle onException(Exception.class).process("ErrorHandleProcessor").handled(true); StringBuilder sb = new StringBuilder(); sb.append(gwUrl).append("?bridgeEndpoint=true&clientConnectionManager=#connectionManager"); urlForGW = sb.toString(); // 由from接取producer傳來的message, 依序to到別的地方 from("direct:callEaiGW").to("SetBodyProcessor").to(urlForGW).to("GetResultProcessor"); } } ``` ### 4. **processor implements Processor定義各系統的執行內容(&錯誤處理)** ```java @Component("SetBodyProcessor") public class SetBodyProcessor implements Processor { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private Utils utils; @Override public void process(Exchange exchange) throws Exception { try { Message message = exchange.getIn(); CubRequest<?> body = message.getBody(CubRequest.class); MwHeader mw = body.getMwHeader(); exchange.setProperty("Header", message.getHeaders()); exchange.setProperty("Request", body); String json = utils.transferObjToJson(body); logger.info("[Log] Send Request -> {}, {}, {}", mw.getMsgid(), mw.getTxnseq(), json); message.setHeader(Exchange.CONTENT_TYPE, "application/json;charset=UTF-8"); message.setBody(json); } catch (Exception e) { logger.info(e.getMessage(), e); } } } ``` **REF** 1. [Apache Camel的认识和应用详解](https://blog.51cto.com/u_15459458/5158242) 2. [Apache Camel user manual](https://camel.apache.org/components/3.20.x/eips/choice-eip.html) 3. [Apache Camel](https://camel.apache.org/) 4. [Using .when(), .end(), .endChoice() in Apache Camel with examples](https://bvn13.medium.com/using-when-end-endchoice-in-apache-camel-with-examples-5e6e4f4f7f16) 5. [Camel end vs endChoice - not the usual query](https://stackoverflow.com/questions/32382322/camel-end-vs-endchoice-not-the-usual-query) 6. [Package org.apache.camel](https://www.javadoc.io/doc/org.apache.camel/camel-api/latest/org/apache/camel/package-summary.html)