:::warning
:dromedary_camel: 基於企業整合模式EIP(enterprise integration patterns)設計的開源框架,使用component的方式接入第三方系統,便於快速簡單整合不同系統做處理及生產data
-> 直接使用URLs來串接不同的傳輸或訊息模型,像是HTTP、kafka、cxf
-> 包含有消息路由及轉換等特定領域語言(DSL)
:::
## :japanese_castle: 架構組成
- **CamelContext**,Camel的工作環境,一個application通常會有一個CamelContext對象,像是有:
1. SpringCamelContext與Spring框架縫合,透過createRegistrt創建applicationContextRegistry instance,再從其中的applicationContext的getBean取得instance
2. DefaultCamelContext運行所需的基本資訊,Service啟動時作為參數傳入,實現共享上下文的效果
- **Service**,大多數在Camel中的元素,都是一個獨立的service,像是Component、endpoint、routes等,透過CamelContext銜接,擁有獨立的生命週期,調用start()開始,stop()終止
- CamelTemplate,對CamelContext作包裝,提供發送消息到endpoint的方法
- **生產者模板**:producerTemplate、**fluentProducerTemplate(流式串接的寫法)**
- 同步執行
1. onlyIn模式: send(),sendBody(),sendBodyAndHeader()
2. inOut模式: request(),requestBody(),requestBodyAndHeader()
- 非同步執行 (返回future)
1. onlyIn模式: asyncSend(),asyncSendBody()
2. inOut模式: asyncRequestBody(),asyncRequestBodyAndHeader(),asyncRequestBodyAndHeaders()
3. asyncCallback(),asyncCallbackSendBody(),asyncCallbackRequestBody(): 包含callback
- **消費者模板**
- consumerTemplate
- 事件驅動模式 **event-driven consumer**: 常見的模式,像是client-server架構及網路服務,也會涉及到EIP模型的非同步receiver,監聽特定的messaging channel,像是TCP/IP port或Twitter handle, Amazon SQS queue或web socket等,等client送到message後,consumer啟動並開始處理
- 輪詢及排程輪詢模式 **polling/scheduled polling consumer**; 和事件驅動相反,這種模式會主動的拉取特定source的message,像是FTP server,這種polling consumer在EIP模型稱為同步receiver,因為在完成現在的message之前不會再poll新的message;但一般來說是用scheduled polling consumer,像是File, FTP, and email components都是這樣處理,non-scheduled的polling consumer只在特地呼叫時才會使用
- **endpoint**: 接受或發送數據的組件,指引channel要送到哪的model,支持多種協議,如jms, http, ftp, file
- camel能傳送或接收message給endpoint
- consumer (from)
- producer (to)
- or both
- 作為建立producer跟consumers的factory
- 使用URI描述目標系統的通訊 e.g. file:data/inbox?delay=5000
- schema: 指示哪個camel component處理這種type的endpoint,像是範例會使用FileComponent
- contextPath: 根據此context,指示該component建立endpoint
- Options: 其它選項,像delay=5000表示每5秒poll一次
- 實現org.apache.camel.Component interface建立
- DIRECT component: `direct:someName[?options]`,當producer發送message exchange時,提供直接、同步啟動consumer的方法,此endpoint能使連接到同樣層級context的既存route
- **processor**: 處理具體業務邏輯,接收來自Endpoint、Routing或別的processor的消息,需實作Process interface,用==exchange==型別傳遞,處理時截取出內含的message
- 串接各endpoint
- routing
- 轉換、調整、檢核、追蹤、interception、mediation、紀錄...
```java
public class testProcessor implements Processor{
@Override
public void process(Exchange exchange) throws Exception{
// 1. getIn(): 擷取傳入的exchange內的訊息
Message message = exchange.getIn();
String body = message.getBody.toString();
// ===
// 執行數據轉換的邏輯
// ===
if(exchange.getPattern() == ExchangePattern.Inout){
// a. 自建立out message並將in message帶入
// copy headers from IN to OUT to propagate them
exchange.getOut().setHeaders(exchange.getIn().getHeaders());
// getOut(): 擷取傳出的exchange內的訊息, 若無則建立一個空Message
Message.outMessage = excahnge.getOut();
outMessage.setBody(body + "|| other out")
// copy attachements from IN to OUT to propagate them
exchange.getOut().setAttachments(exchange.getIn().getAttachments());
// b. 或是直接更改in message, camel偵測無out message的話會直接拿in做成out message
String body = exchange.getIn().getBody(String.class);
// change the existing message to say Hello
exchange.getIn().setBody("Hello " + body);
}
}
}
```
- **exchange**: 訊息本體, 貫穿整條路徑執行的控制endpoint、處理器、表達式及路由條件判斷,內包含的資訊有:
- exception
- exchangeID
- FromEndpoint
- Properties: key/value結構配置共享的參數
- Pattern(ExchangePattern)交換器工作模式: 為enum,描述消息的傳遞方式,包含InOnly, RobustInOnly, InOut, InOptionalOut, OutOnly, RobustOutOnly, OutIn, OutOptionalIn
- Message IN/OUT: 當endpoint、processor之間的message在exchange中傳遞,exchange會自動將上一個輸出作為下一個輸入使用、包含MessageID、Attachment、Header、Body
- 方法
- Message exchange.getIn(): 擷取傳入的exchange內的訊息
- Message exchange.getOut():擷取傳出的exchange內的訊息, 若無則建立一個空Message
- Message exchange.hasOut(): 檢查是否有outMessage
- Message ExchangePattern: enum, 分有單向的InOnly event message, 或是雙向的InOut Request Reply message
- message: exchange的message部分,包含有body、headers、messageId、timestamp等,messageId預設等同於exchange.getExchangeId
- **route**: 路由規則,指示processor之間的路由跳轉,支持多種規則如下:
- content based router: 單一邏輯切轉至不同系統
- message filter: 篩選掉不需要的message
- dynamic router: 能透過條件表達式動態決定下個系統方向,完成後返回再次重新判斷直到沒有新的路徑為止
- recipient router: 轉導message至靜態或動態清單的特定收件者
- splitter: 拆解message,發送各element去不同的system
- aggregator: 集結多個message再發送
- recequencer: 將失序的message重排回正確順序
- composed message processor: 在多元素且不同processor的情況,維持整體message的流向
---
- routing code (主體設計為轉導,支援判斷及處理)
- from(): route起始點,始於作為input source的endpoint
- to(): 送message到靜態路由
- toD(): 送message到動態路由
- try..catch..finally: java姊妹款, 使用時則error handler不生效,如onException()
- doTry()
- doCatch()
- doFinally()
- end()
```java
from("direct:test").routeId("myroute")
.doTry().
doTry().
throwException(new IllegalArgumentException("Forced by me"))
.doCatch(Exception.class)
.log("docatch 1")
.throwException(new IllegalArgumentException("Second forced by me"))
// 注意巢狀結構要確實endDoTry(), 否則內部exception不會被拋出
.endDoTry() // end this doCatch block
.endDoTry() // end the inner doTry
.doCatch(Exception.class)
.log("docatch 2")
.end();
```
- setProperty(): 設定Exchange的property,以key/value的Map形式存放
```java
from("direct:a")
// 可以拿現有的exchange(bar) 設定到另一個exchange(foo)
.setProperty("foo", exchangeProperty("bar"))
.to("direct:b");
```
- setHeader(): 設定message的header,可以使用固定(constant)或變動header name(搭配Simple()語法使用),跟body不同,可以有多個欄位
```java
from("direct:a")
.setHeader("myHeader", constant("test"))
.to("direct:b")
.setHeader("randomNumber", simple("${random(1,100)}"))
.to("direct:c");
```
- setBody(): EIP patterns的message translator,轉換傳遞的內容(message),從路徑或從excahange都能設定
```java
from("direct:cheese")
.setBody(simple("Hello ${body}"))
.to("log:hello");
```
```java
from("file:messages/foo").to("jms:queue:foo");
from("direct:start").toD("${header.foo}"); // 抓message的header資訊動態導路
from("direct:start").toD("language:xpath:/order/@uri"); // toD使用不同語言
```
- choice(): 開啟分支路徑,可以巢狀結構,但不建議做的太複雜(it does not do it well)
- when()
- otherwise()
```java
// 有3種情況:
// 1. 只有一個主要的.when()/.otherwise()區塊,可以不顯式關閉,最後用.end()關閉choice()
// 若有巢狀結構,需顯式關閉(離開stack),否則不能繼續往下跑,
// 2. when()裡面包含多個choice,巢狀結構: 內層需顯式.endChoice()
// 3. when()裡面包含複雜巢狀區塊,像是loadBalance(),內層.endChoice()
// stack概念,所以要關閉when()/otherwise()區塊才能接著處理
from("direct:start")
.choice()
.when(body().contains("Camel"))
.loadBalance().roundRobin()
.to("mock:foo")
.to("mock:bar")
.choice()
.when(body().contains("people"))
.to("mock:test")
.endchoice()
.endChoice()
.otherwise()
.to("mock:result");
.end()
.to("direct:b")
```
- filter(expression 條件式篩選值)
```java
from("direct:a")
.filter(simple("${header.foo} == 'bar'"))
.to("direct:bar")
.end()
.to("direct:b")
```
- onException(): 特定exception類型要執行的動作,
- component(endpointFactory)
- 當CamelContext調用getEndpoint()時,根據Endpoint的URI前綴(schema)來尋找對應的Component再調用其方法
- 提供一致的endpoint interface
- 作為其他系統的connector
- 其它components
- simple(): 從架構層面抽出language作為Expression及predicate的工廠,免除再導入其它dependencies或語言的學習(如Groovy),simple是其中一個簡易的語言,內建許多變數像是exchange、body、header等,一般用來創建邏輯表達或一般表達式,在camel route使用少量scripting時適用,若是更為複雜的使用情境,則建議用Groovy, MVEL或OGNL
```java
simple("${header.title} contains 'Camel' && ${header.type'} == 'gold'")
// 若header存在foo這個key, 則true, message會被轉導到下個路徑; 否則message is dropped
from("seda:orders")
.filter().simple("${header.foo}")
.to("seda:fooOrders");
```
- 解決問題
- 不同系統/數據來源的整合應用,如共享或交換資訊
- 優化現有結構,使系統易於拓展或維護
- 使系統獨立不互相干擾
- camel的message system
- message endpoint
- message channel
- message
- message route
- message translator
- pipes and filters
---
### :closed_book: 應用場景
1. 消息匯聚: 匯聚不同server的數據,如ActiveMQ、RabbitMQ...;
2. 消息分發: 將消息從生產者轉發給接收者,有squential跟parallel 2種分發方式
```java
from("amqp:queue:order")
.to("uri:validateBean", "uri:handleBean", "uri:emailBean");
from("amqp:queue:order")
.multicast()
.to("uri:validateBean", "uri:handleBean", "uri:emailBean");
```
3. 消息轉換
```java
from("amqp:queue:order")
.process(new XmlToJsonProcessor())
.to("bean:orderHandler");
```
4. 規則引擎
```xml
<route>
<from uri="amqp:queue:order"/>
<multicast>
<to uri="uri:validateBean"/>
<to uri="uri:handleBean"/>
<to uri="uri:emailBean"/>
</multicast>
</route>
```
```java
from("amqp:queue:order")
.filter(header("foo")
.isEqualTo("bar"))
.choice()
.when(xpath("/person/city = 'London'"))
.to("file:target/messages/uk")
.otherwise()
.to("file:target/messages/others");
```
---
:mag: Spring 實作
---
```mermaid
graph TD;
創建CamelContext-->設定endpoints跟components;
設定endpoints跟components-->加上route規則;
加上route規則-->start啟動消息發送接收及處理;
start啟動消息發送接收及處理 --> stop關閉所有的endpoint跟執行緒
```
### 1. 引進套件
```java
<!-- Dependency for camel -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-http4</artifactId>
</dependency>
```
### 2. 利用camelContext建立producer template去拋message
```java
@Service
public class ProcessServiceImpl implements ProcessService {
@Autowired
private CamelContext camelContext;
FluentProducerTemplate producer = camelContext.createFluentProducerTemplate();
try {
// 拋出message之後, 取得回覆內容
Exchange exchange = producer.to("direct:callEaiGW").withBody(request).send();
response = exchange.getOut().getBody(CubResponse.class);
} catch (Exception e){
logger.info("[Log] Exception message -> {}\n", e.getMessage(), e);
} finally {
try {
// 完成後, 顯式將此service停用
producer.stop();
} catch(Exception){
logger.info("Close producer fail!", e);
}
}
}
```
### 3. route extends RouteBuilder 建立consumer處理路徑
```java
// 此為Java DSL方法,也可以用xml或yml定義路徑
// 這邊選用direct component
public class EAIRoute extends RouteBuilder {
@Value("${eai.gw.url}")
private String gwUrl;
private String urlForGW;
@Override
public void configure() throws Exception {
//Error Handle
onException(Exception.class).process("ErrorHandleProcessor").handled(true);
StringBuilder sb = new StringBuilder();
sb.append(gwUrl).append("?bridgeEndpoint=true&clientConnectionManager=#connectionManager");
urlForGW = sb.toString();
// 由from接取producer傳來的message, 依序to到別的地方
from("direct:callEaiGW").to("SetBodyProcessor").to(urlForGW).to("GetResultProcessor");
}
}
```
### 4. **processor implements Processor定義各系統的執行內容(&錯誤處理)**
```java
@Component("SetBodyProcessor")
public class SetBodyProcessor implements Processor {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private Utils utils;
@Override
public void process(Exchange exchange) throws Exception {
try {
Message message = exchange.getIn();
CubRequest<?> body = message.getBody(CubRequest.class);
MwHeader mw = body.getMwHeader();
exchange.setProperty("Header", message.getHeaders());
exchange.setProperty("Request", body);
String json = utils.transferObjToJson(body);
logger.info("[Log] Send Request -> {}, {}, {}", mw.getMsgid(), mw.getTxnseq(), json);
message.setHeader(Exchange.CONTENT_TYPE, "application/json;charset=UTF-8");
message.setBody(json);
} catch (Exception e) {
logger.info(e.getMessage(), e);
}
}
}
```
**REF**
1. [Apache Camel的认识和应用详解](https://blog.51cto.com/u_15459458/5158242)
2. [Apache Camel user manual](https://camel.apache.org/components/3.20.x/eips/choice-eip.html)
3. [Apache Camel](https://camel.apache.org/)
4. [Using .when(), .end(), .endChoice() in Apache Camel with examples](https://bvn13.medium.com/using-when-end-endchoice-in-apache-camel-with-examples-5e6e4f4f7f16)
5. [Camel end vs endChoice - not the usual query](https://stackoverflow.com/questions/32382322/camel-end-vs-endchoice-not-the-usual-query)
6. [Package org.apache.camel](https://www.javadoc.io/doc/org.apache.camel/camel-api/latest/org/apache/camel/package-summary.html)