:dromedary_camel: 基於企業整合模式EIP(enterprise integration patterns)設計的開源框架,使用component的方式接入第三方系統,便於快速簡單整合不同系統做處理及生產data
-> 直接使用URLs來串接不同的傳輸或訊息模型,像是HTTP、kafka、cxf
-> 包含有消息路由及轉換等特定領域語言(DSL)

:japanese_castle: 架構組成

  • CamelContext,Camel的工作環境,一個application通常會有一個CamelContext對象,像是有:
    1. SpringCamelContext與Spring框架縫合,透過createRegistrt創建applicationContextRegistry instance,再從其中的applicationContext的getBean取得instance
    2. DefaultCamelContext運行所需的基本資訊,Service啟動時作為參數傳入,實現共享上下文的效果

  • Service,大多數在Camel中的元素,都是一個獨立的service,像是Component、endpoint、routes等,透過CamelContext銜接,擁有獨立的生命週期,調用start()開始,stop()終止

    • CamelTemplate,對CamelContext作包裝,提供發送消息到endpoint的方法
      • 生產者模板:producerTemplate、fluentProducerTemplate(流式串接的寫法)
        • 同步執行
          1. onlyIn模式: send(),sendBody(),sendBodyAndHeader()
          2. inOut模式: request(),requestBody(),requestBodyAndHeader()
        • 非同步執行 (返回future)
          1. onlyIn模式: asyncSend(),asyncSendBody()
          2. inOut模式: asyncRequestBody(),asyncRequestBodyAndHeader(),asyncRequestBodyAndHeaders()
          3. asyncCallback(),asyncCallbackSendBody(),asyncCallbackRequestBody(): 包含callback
      • 消費者模板
        • consumerTemplate
          • 事件驅動模式 event-driven consumer: 常見的模式,像是client-server架構及網路服務,也會涉及到EIP模型的非同步receiver,監聽特定的messaging channel,像是TCP/IP port或Twitter handle, Amazon SQS queue或web socket等,等client送到message後,consumer啟動並開始處理
          • 輪詢及排程輪詢模式 polling/scheduled polling consumer; 和事件驅動相反,這種模式會主動的拉取特定source的message,像是FTP server,這種polling consumer在EIP模型稱為同步receiver,因為在完成現在的message之前不會再poll新的message;但一般來說是用scheduled polling consumer,像是File, FTP, and email components都是這樣處理,non-scheduled的polling consumer只在特地呼叫時才會使用
    • endpoint: 接受或發送數據的組件,指引channel要送到哪的model,支持多種協議,如jms, http, ftp, file
      • camel能傳送或接收message給endpoint
        • consumer (from)
        • producer (to)
        • or both
      • 作為建立producer跟consumers的factory
      • 使用URI描述目標系統的通訊 e.g. file:data/inbox?delay=5000
        • schema: 指示哪個camel component處理這種type的endpoint,像是範例會使用FileComponent
        • contextPath: 根據此context,指示該component建立endpoint
        • Options: 其它選項,像delay=5000表示每5秒poll一次
      • 實現org.apache.camel.Component interface建立
      • DIRECT component: direct:someName[?options],當producer發送message exchange時,提供直接、同步啟動consumer的方法,此endpoint能使連接到同樣層級context的既存route
    • processor: 處理具體業務邏輯,接收來自Endpoint、Routing或別的processor的消息,需實作Process interface,用exchange型別傳遞,處理時截取出內含的message
      • 串接各endpoint
      • routing
      • 轉換、調整、檢核、追蹤、interception、mediation、紀錄
      ​​​​​​​​public class testProcessor implements Processor{
      ​​​​​​​​    @Override
      ​​​​​​​​        public void process(Exchange exchange) throws Exception{
      ​​​​​​​​            // 1. getIn(): 擷取傳入的exchange內的訊息
      ​​​​​​​​            Message message = exchange.getIn(); 
      ​​​​​​​​            String body = message.getBody.toString();
      ​​​​​​​​            // ===
      ​​​​​​​​            // 執行數據轉換的邏輯
      ​​​​​​​​            // ===
      ​​​​​​​​            if(exchange.getPattern() == ExchangePattern.Inout){
      ​​​​​​​​                // a. 自建立out message並將in message帶入
      ​​​​​​​​                // copy headers from IN to OUT to propagate them
      ​​​​​​​​                exchange.getOut().setHeaders(exchange.getIn().getHeaders());
      ​​​​​​​​                // getOut(): 擷取傳出的exchange內的訊息, 若無則建立一個空Message
      ​​​​​​​​                Message.outMessage = excahnge.getOut();
      ​​​​​​​​                outMessage.setBody(body + "|| other out")
      ​​​​​​​​                // copy attachements from IN to OUT to propagate them
      ​​​​​​​​               exchange.getOut().setAttachments(exchange.getIn().getAttachments());
      ​​​​​​​​                
      ​​​​​​​​                // b. 或是直接更改in message, camel偵測無out message的話會直接拿in做成out message
      ​​​​​​​​                String body = exchange.getIn().getBody(String.class);
      ​​​​​​​​                // change the existing message to say Hello
      ​​​​​​​​                exchange.getIn().setBody("Hello " + body);
      ​​​​​​​​            }
      ​​​​​​​​        }
      ​​​​​​​​}
      
    • exchange: 訊息本體, 貫穿整條路徑執行的控制endpoint、處理器、表達式及路由條件判斷,內包含的資訊有:
      • exception
      • exchangeID
      • FromEndpoint
      • Properties: key/value結構配置共享的參數
      • Pattern(ExchangePattern)交換器工作模式: 為enum,描述消息的傳遞方式,包含InOnly, RobustInOnly, InOut, InOptionalOut, OutOnly, RobustOutOnly, OutIn, OutOptionalIn
      • Message IN/OUT: 當endpoint、processor之間的message在exchange中傳遞,exchange會自動將上一個輸出作為下一個輸入使用、包含MessageID、Attachment、Header、Body
      • 方法
        • Message exchange.getIn(): 擷取傳入的exchange內的訊息
        • Message exchange.getOut():擷取傳出的exchange內的訊息, 若無則建立一個空Message
        • Message exchange.hasOut(): 檢查是否有outMessage
        • Message ExchangePattern: enum, 分有單向的InOnly event message, 或是雙向的InOut Request Reply message
      • message: exchange的message部分,包含有body、headers、messageId、timestamp等,messageId預設等同於exchange.getExchangeId
    • route: 路由規則,指示processor之間的路由跳轉,支持多種規則如下:
      • content based router: 單一邏輯切轉至不同系統
      • message filter: 篩選掉不需要的message
      • dynamic router: 能透過條件表達式動態決定下個系統方向,完成後返回再次重新判斷直到沒有新的路徑為止
      • recipient router: 轉導message至靜態或動態清單的特定收件者
      • splitter: 拆解message,發送各element去不同的system
      • aggregator: 集結多個message再發送
      • recequencer: 將失序的message重排回正確順序
      • composed message processor: 在多元素且不同processor的情況,維持整體message的流向

      • routing code (主體設計為轉導,支援判斷及處理)

        • from(): route起始點,始於作為input source的endpoint
        • to(): 送message到靜態路由
        • toD(): 送message到動態路由
        • try..catch..finally: java姊妹款, 使用時則error handler不生效,如onException()
          • doTry()
          • doCatch()
          • doFinally()
          • end()
        ​​​​​​​​​​​​from("direct:test").routeId("myroute")
        ​​​​​​​​​​​​    .doTry().
        ​​​​​​​​​​​​        doTry().
        ​​​​​​​​​​​​            throwException(new IllegalArgumentException("Forced by me"))
        ​​​​​​​​​​​​        .doCatch(Exception.class)
        ​​​​​​​​​​​​            .log("docatch 1")
        ​​​​​​​​​​​​            .throwException(new IllegalArgumentException("Second forced by me"))
        ​​​​​​​​​​​​    // 注意巢狀結構要確實endDoTry(), 否則內部exception不會被拋出
        ​​​​​​​​​​​​         .endDoTry() // end this doCatch block
        ​​​​​​​​​​​​     .endDoTry() // end the inner doTry
        ​​​​​​​​​​​​    .doCatch(Exception.class)
        ​​​​​​​​​​​​        .log("docatch 2")
        ​​​​​​​​​​​​    .end();
        
        • setProperty(): 設定Exchange的property,以key/value的Map形式存放
        ​​​​​​​​​​​​from("direct:a")
        ​​​​​​​​​​​​    // 可以拿現有的exchange(bar) 設定到另一個exchange(foo)
        ​​​​​​​​​​​​    .setProperty("foo", exchangeProperty("bar"))
        ​​​​​​​​​​​​    .to("direct:b");
        
        • setHeader(): 設定message的header,可以使用固定(constant)或變動header name(搭配Simple()語法使用),跟body不同,可以有多個欄位
        ​​​​​​​​​​​​from("direct:a")
        ​​​​​​​​​​​​    .setHeader("myHeader", constant("test"))
        ​​​​​​​​​​​​    .to("direct:b")
        ​​​​​​​​​​​​    .setHeader("randomNumber", simple("${random(1,100)}"))
        ​​​​​​​​​​​​    .to("direct:c");
        
        • setBody(): EIP patterns的message translator,轉換傳遞的內容(message),從路徑或從excahange都能設定
        ​​​​​​​​​​​​from("direct:cheese")
        ​​​​​​​​​​​​    .setBody(simple("Hello ${body}"))
        ​​​​​​​​​​​​    .to("log:hello");
        
        ​​​​​​​​​​​​from("file:messages/foo").to("jms:queue:foo");
        ​​​​​​​​​​​​from("direct:start").toD("${header.foo}"); // 抓message的header資訊動態導路
        ​​​​​​​​​​​​from("direct:start").toD("language:xpath:/order/@uri"); // toD使用不同語言
        
        • choice(): 開啟分支路徑,可以巢狀結構,但不建議做的太複雜(it does not do it well)
          • when()
          • otherwise()
        ​​​​​​​​​​​​// 有3種情況: 
        ​​​​​​​​​​​​// 1. 只有一個主要的.when()/.otherwise()區塊,可以不顯式關閉,最後用.end()關閉choice()
        ​​​​​​​​​​​​// 若有巢狀結構,需顯式關閉(離開stack),否則不能繼續往下跑,
        ​​​​​​​​​​​​// 2. when()裡面包含多個choice,巢狀結構: 內層需顯式.endChoice()
        ​​​​​​​​​​​​// 3. when()裡面包含複雜巢狀區塊,像是loadBalance(),內層.endChoice()
        ​​​​​​​​​​​​// stack概念,所以要關閉when()/otherwise()區塊才能接著處理
        ​​​​​​​​​​​​from("direct:start")
        ​​​​​​​​​​​​    .choice()
        ​​​​​​​​​​​​        .when(body().contains("Camel"))
        ​​​​​​​​​​​​            .loadBalance().roundRobin()
        ​​​​​​​​​​​​                .to("mock:foo")
        ​​​​​​​​​​​​                .to("mock:bar")
        ​​​​​​​​​​​​                .choice()
        ​​​​​​​​​​​​                    .when(body().contains("people"))
        ​​​​​​​​​​​​                        .to("mock:test")
        ​​​​​​​​​​​​                .endchoice()
        ​​​​​​​​​​​​            .endChoice()
        ​​​​​​​​​​​​        .otherwise()
        ​​​​​​​​​​​​            .to("mock:result");
        ​​​​​​​​​​​​    .end()
        ​​​​​​​​​​​​    .to("direct:b")
        
        • filter(expression 條件式篩選值)
        ​​​​​​​​​​​​from("direct:a")
        ​​​​​​​​​​​​    .filter(simple("${header.foo} == 'bar'"))
        ​​​​​​​​​​​​        .to("direct:bar")
        ​​​​​​​​​​​​    .end()
        ​​​​​​​​​​​​    .to("direct:b")
        
        • onException(): 特定exception類型要執行的動作,
    • component(endpointFactory)
      • 當CamelContext調用getEndpoint()時,根據Endpoint的URI前綴(schema)來尋找對應的Component再調用其方法
      • 提供一致的endpoint interface
      • 作為其他系統的connector
    • 其它components
      • simple(): 從架構層面抽出language作為Expression及predicate的工廠,免除再導入其它dependencies或語言的學習(如Groovy),simple是其中一個簡易的語言,內建許多變數像是exchange、body、header等,一般用來創建邏輯表達或一般表達式,在camel route使用少量scripting時適用,若是更為複雜的使用情境,則建議用Groovy, MVEL或OGNL
      ​​​​​​​​simple("${header.title} contains 'Camel' && ${header.type'} == 'gold'")
      ​​​​​​​​    
      ​​​​​​​​// 若header存在foo這個key, 則true, message會被轉導到下個路徑; 否則message is dropped
      ​​​​​​​​from("seda:orders")
      ​​​​​​​​    .filter().simple("${header.foo}")
      ​​​​​​​​        .to("seda:fooOrders");
      
  • 解決問題

    • 不同系統/數據來源的整合應用,如共享或交換資訊
    • 優化現有結構,使系統易於拓展或維護
    • 使系統獨立不互相干擾
  • camel的message system

    • message endpoint
    • message channel
    • message
    • message route
    • message translator
    • pipes and filters

:closed_book: 應用場景

  1. 消息匯聚: 匯聚不同server的數據,如ActiveMQ、RabbitMQ
  2. 消息分發: 將消息從生產者轉發給接收者,有squential跟parallel 2種分發方式
from("amqp:queue:order")
    .to("uri:validateBean", "uri:handleBean", "uri:emailBean");

from("amqp:queue:order")
    .multicast()
        .to("uri:validateBean", "uri:handleBean", "uri:emailBean");
  1. 消息轉換
from("amqp:queue:order")
    .process(new XmlToJsonProcessor())
        .to("bean:orderHandler");
  1. 規則引擎
<route>
    <from uri="amqp:queue:order"/>
    <multicast>
        <to uri="uri:validateBean"/>
        <to uri="uri:handleBean"/>
        <to uri="uri:emailBean"/>
    </multicast>
</route>

from("amqp:queue:order")
    .filter(header("foo")
    .isEqualTo("bar"))
    .choice()
    .when(xpath("/person/city = &#39;London&#39;"))
      .to("file:target/messages/uk")
    .otherwise()
      .to("file:target/messages/others");

:mag: Spring 實作

創建CamelContext

設定endpoints跟components

加上route規則

start啟動消息發送接收及處理

stop關閉所有的endpoint跟執行緒

1. 引進套件

    <!-- Dependency for camel -->
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-http4</artifactId>
    </dependency>

2. 利用camelContext建立producer template去拋message

@Service
public class ProcessServiceImpl implements ProcessService {
    @Autowired
    private CamelContext camelContext;

    FluentProducerTemplate producer = camelContext.createFluentProducerTemplate();
    
    try {
        // 拋出message之後, 取得回覆內容
        Exchange exchange = producer.to("direct:callEaiGW").withBody(request).send();
        response = exchange.getOut().getBody(CubResponse.class);
    } catch (Exception e){
        logger.info("[Log] Exception message -> {}\n", e.getMessage(), e);
    } finally {
        try {
            // 完成後, 顯式將此service停用
            producer.stop();
        } catch(Exception){
            logger.info("Close producer fail!", e);
        }
    }
}  

3. route extends RouteBuilder 建立consumer處理路徑

// 此為Java DSL方法,也可以用xml或yml定義路徑
// 這邊選用direct component
public class EAIRoute extends RouteBuilder {
    @Value("${eai.gw.url}")
    private String gwUrl;

    private String urlForGW;

    @Override
    public void configure() throws Exception {
        //Error Handle
        onException(Exception.class).process("ErrorHandleProcessor").handled(true);

        StringBuilder sb = new StringBuilder();
        sb.append(gwUrl).append("?bridgeEndpoint=true&clientConnectionManager=#connectionManager");
        urlForGW = sb.toString();

        // 由from接取producer傳來的message, 依序to到別的地方
       from("direct:callEaiGW").to("SetBodyProcessor").to(urlForGW).to("GetResultProcessor");
    }
}

4. processor implements Processor定義各系統的執行內容(&錯誤處理)

@Component("SetBodyProcessor")
public class SetBodyProcessor implements Processor {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private Utils utils;

    @Override
    public void process(Exchange exchange) throws Exception {
        try {
            Message message = exchange.getIn();
            CubRequest<?> body = message.getBody(CubRequest.class);
            MwHeader mw = body.getMwHeader();

            exchange.setProperty("Header", message.getHeaders());
            exchange.setProperty("Request", body);

            String json = utils.transferObjToJson(body);
            logger.info("[Log] Send Request -> {}, {}, {}", mw.getMsgid(), mw.getTxnseq(), json);

            message.setHeader(Exchange.CONTENT_TYPE, "application/json;charset=UTF-8");
            message.setBody(json);
        } catch (Exception e) {
            logger.info(e.getMessage(), e);
        }
    }
}

REF

  1. Apache Camel的认识和应用详解
  2. Apache Camel user manual
  3. Apache Camel
  4. Using .when(), .end(), .endChoice() in Apache Camel with examples
  5. Camel end vs endChoice - not the usual query
  6. Package org.apache.camel