# Week 11 - Publish/Subscribe ###### tags: `WS2020-IN2259-DS` ## Introduction ### Definition * Publish/Subscribe 是一種 **`messaging pattern`**,可以應用在「多對多 (**many-to-many**)」通訊。 * 資料來源端 (publishers) 負責出版 (publish **publications**),然後資料接收端 (subscribers) 進行訂閱 (subscribe via **subscriptions**) 並獲得通知 (**notifications**)。 * 例如對於感興趣的 publications,便會進行訂閱以獲得通知。 * 圖示: ![](https://i.imgur.com/HtjOGR6.png) ### Pub/Sub API * **`Publish(data, metadata)`**: * Publication metadata **depends on the language** (cf. [Matching model](#Matching-Models)) * **`Subscribe(interest)`**: * Subscription interest is defined **depending on the language** (cf. [Matching model](#Matching-Models)) * **`Unsubscribe(interest)`**: * Remove a subscription * `(Un-)Advertise(data type/domain)`: * Publisher advertises type of content it will publish ### Pub/Sub System Design Dimensions * **Matching-based** communication: * 非 host-to-host communication。 * 基於 **`matching model`** 的 communication。 * Optimized for **scalability** and **performance**: * 大量的 publishers 與 subscribers。 * 高出版率。 * **`Fast matching`**:通常為 **stateless**。 * 不會考慮序列或複合的 patterns。 * **`Simple matching`**: * 通常只限於主題 (**topic**)。 * **Decoupling** properties (most popular): * Time * Space * Synchronization ### Space Decoupling * Clients (publishers & subscribers) 不需要保有 references 來認知彼此。 * Clients 有可能是實體分散的。 * 圖示: ![](https://i.imgur.com/pIz4XvH.png =500x) ### Time Decoupling * Clients 不需要在同一時間在線。 * 圖示: ![](https://i.imgur.com/xwBFEcq.png =500x) ### Synchronization Decoupling * ==Control flow is not blocked by the interaction.== * 圖示: ![](https://i.imgur.com/1hVSZRb.png =500x) ### Why Decouple? * 將出版與訂閱端點除耦合: * 可以移除顯示相依關係。 * 減少協調作業 (coordination)。 * 減少同步作業 (synchronization)。 * 建立高度動態的系統 (highly dynamic systems)。 * 允許資料源與接收端自由來去。 * 支援 continuous operation。 * 啟用 event-based systems,增加 scalability。 * 使得對系統進行調適 (除錯) 更加困難。 * Makes debugging system more difficul ### Pub/Sub vs. Observer Design Pattern * Observer design pattern 也稱作 Listener。 * **Observer design pattern 與 publish/subscribe 不同!** * ODP 的 publisher (subject) 認識每一個 subscribers (observer),而且反之亦然。 * 圖示: ![](https://i.imgur.com/kWUu3FQ.png =600x) ### Applications ![](https://i.imgur.com/q1seIkh.png =600x) ### Enterprise Application Integration ![](https://i.imgur.com/w9uasBM.png =600x) ### Computational Advertising: Filtering & Matching ![](https://i.imgur.com/b6cADoZ.png =500x) ### Pub/Sub Standards * DCE Event Management Service (1990s) * OMG Event Channel (1995) * OMG Notification Service (1997) * OMG Data Dissemination Service * **MQ Telemetry Transport (1999)** ![](https://i.imgur.com/Yl00lSO.png) * Java Messaging Service (JMS) (1998, 2002) * WS Eventing (2004) * WS Notifications (2004) * Active Message Queueing Protocol (AMQP) * OGSI-Notification (6/2003) (Open Grid Services Architecture) ### Popular Systems in Industry ![](https://i.imgur.com/McWGRqN.png) ### Summary of Pub/Sub * Pub/sub 在 publishers 與 subscribers 之間提供 **many-to-many** 通訊。 * 通訊至少在 **time**、**space**、**synchronization** 上做到「除耦合 (decoupled)」。 * 很多應用為 **event-based**,或是需要 **event notifications**,這些應用便會應用 pub/sub 設計。 * Pub/sub 有多種風格 (flavors),不同之處在於 **matching** 與 **routing** 的設計。 ## Matching Models ### Matching & Filtering **Problem** in Pub/Sub * **Matching problem**: * 給定一個 publication $e$、一組 subscriptions $S$。 * 決定所有的 $s \in S$,與 $e$ 相符合。 * 圖示: ![](https://i.imgur.com/AUneJlL.png =150x) * 不同的 model: * Topic-based filtering * Content-based filtering * Type-based filtering * ... ### Matching Model * 描述 subscriptions 與 publications 該**如何 (how) 表達 (express)**。 * 每一個 subscription 包含了 filters,以此決定是否與一個 publication 相符合。 * 當一個 publication 送出時,它將會與各個 subscription 進行比較,以檢視是否 **match**。 * 然後此 publication 將會被傳送至 matching 的 subscriber 處。 ### Topic-Based Matching * 又稱作 `subject-based filtering`、`group-based filtering` 或 `channel-based event filtering`。 * 然而,其中有一些細微的差異。 * **指令的改變**: * `Publish(data, metadata)` → **`Publish(data, topic)`** * `Subscribe(interest)` → **`Subscribe(topic)`** * **Match** 建立在 **same topic** 的前提上,範例: ```c= subscribe(“IBM”) publish(“price:175.31”,“IBM”) ``` * Static publisher 與 subscriber 的關係: * 於 publication 發布時,subscriber 的收件人集合 (**recipient set**) 會知道這個消息。 * 對於一個給定的 topic,除非 topics 或是 subscriptions 改變,否則每個 publish 該 topic 的訊息,都會被傳送至相同的 recipient set。 * Topics 可以階層式的組織管理 (**hierarchically organized**)。 * 如:sports/basketball/NBA * 如:`publish` message to sports/basketball/NBA * Topic subscriptions 可以使用「**通配符 (wildcards)**」。 * 如:`subscribe` to sports/basketball/* ### Content-Based Matching * **`Subscription`**: * Conjunction of **predicates**. * 意思是由「屬性-運算子-值 (attribute-operator-value)」三個一組 (triple) 的組合。 * 例如: ```c= (subject = news) ∧ (topic = travel) ∧ (date > 29.11.2011) ``` * **`Publication`** (a.k.a. **`event`**): * 一組組由「屬性-值 (attribute-value)」所組成的 pairs。 * 例如: ```c= (subject, news), (topic, travel), (date, 21.2.2011), ... ``` * **指令的改變**: * `Publish(data, metadata)` → **`Publish(data, list(attribute-value))`** * `Subscribe(interest)` → **`Subscribe(predicates)`** * 當**所有的 predicates** 都滿足 (conjunction) 時才代表 **match**,例如: ```c= subscribe(“stock = IBM”, “price > 175.0”, “date > 1-1-1970”) // not match publish(“stock = IBM”, “price = 175.31”, “date = 31-12-1969”) // match publish(“stock = IBM”, “price = 175.5”, “date = 1-1-2017”) // not match publish(“stock = IBM”, “price = 180.0”, “market = NYSE”) // match publish(“stock = IBM”, “price = 177.5”, “date = 1-1-2017”, “market = NYSE”) ``` * 關於其他內容的表格: ![](https://i.imgur.com/fKqLVJC.png) ### Language Categories ![](https://i.imgur.com/MNIao5m.png =600x) ### Content-Based Matching Algorithms * 演算法範例: * **Counting algorithm** [origins unknown to me; guess: 1970s] * *Clustering algorithm [Fabret, Jacobsen et al., 2001]* * 上述兩者都是「**兩階段 (two-phased)**」的 matching algorithms 1. **`Predicate matching phase`**:Match 所有的 predicates。 2. **`Subscriptions matching phase`**:將 subscriptions 與第一階段的結果進行 match。 ### Two-Phased Matching * **Decompose** subscriptions into predicates $$ \begin{eqnarray*} S_1 &=& \underbrace{(\mbox{subject}=\mbox{news})}_{P_1}\, \land \underbrace{(\mbox{topic}=\mbox{travel})}_{P_2}\, \land \underbrace{(\mbox{date}>\mbox{29.11.2011})}_{P_3} \\ S_2 &=& \underbrace{(\mbox{subject}=\mbox{news})}_{P_1}\, \land \underbrace{(\mbox{topic}=\mbox{stock})}_{P_4}\, \land \underbrace{(\mbox{date}>\mbox{30.11.2011})}_{P_5} \\ \end{eqnarray*} $$ * **Convert** subscriptions to reference predicates $$ \begin{eqnarray*} S_1 &:=& P_1\, \land P_2\, \land P_3 \\ S_2 &:=& P_1\, \land P_4\, \land P_5 \\ \end{eqnarray*} $$ * 將 subscriptions 剖析成一個個的 predicates,然後儲存並個別比較。 > **Break down** subscriptions into predicates, **store** and **match** them **individually** ### PHASE 1: PREDICATE MATCHING #### Predicate Matching Problem * Given a set $P$ of predicates and a publication $e$, **identify all predicates $p$ of $P$ which evaluate to true under $e$**. * Example: ![](https://i.imgur.com/eg8GuhC.png =500x) #### **Predicate Matching**: Top-level Data Structure * **Hash table** on attribute name * 圖示: ![](https://i.imgur.com/IVaveBi.png) #### **Predicate Index**: General Purpose Data Structure * 每個 operator 有一條 ordered linked list。 * Insert、delete、match 等指令都是 $O(n)\mbox{-operations}$ 的複雜度。 * 針對每個 $e$ 裡的屬性名稱 (attribute name) 與每一個運算子 (operator)。 * 如:「price」與「=、<、>、!=」 * 圖示: ![](https://i.imgur.com/QUOOeVE.png) * 替代上,可以使用 `B-tree` 或 `B+ tree`。 #### Finite Predicate Value Domain Types * 有限的值域。 * **Countable** domain types with small cardinality (小基數) * Integer intervals * Collections (enums) * Set of tags (e.g., in XML) * Examples * Price : [0, 1000], models variety of prices * Color, city, state, country, size, weight * All tags defined in a given XML schema * Predicate domain often **context dependant**, but **limited in size** * Prices of cars vs. prices of groceries #### Predicate Matching For Finite Domains * 因為值域限制於一個相對小的基數 (cardinality),因此用陣列 (表格) 就可以表示: ![](https://i.imgur.com/ubCKN2Q.png) #### Predicate Matching Symmetries * 利用運算的對稱性,看來有些空間可以省: ![](https://i.imgur.com/oKpr5Ok.png) ### PHASE 2: SUBSCRIPTION MATCHING #### Subscription Matching: Counting Algorithm * Subscriptions consist of **a set of predicates**: $$ \begin{eqnarray*} S_1 &=& \underbrace{(2< A<4)}_{P_1}\, \land \underbrace{(B=6)}_{P_2}\, \land \underbrace{(C >4)}_{P_3} \\ S_2 &=& \underbrace{(2< A<4)}_{P_1}\, \land \underbrace{(C=3)}_{P_4} \\ \end{eqnarray*} $$ * 所有的 predicates 都滿足,才算是 match。 * **Idea**: * 計算每個 subscription 滿足的 predicates 數量,再與此 subscription 總共的 predicates 數量進行比較 (一定要一樣才 match)。 #### Counting Algorithm: Subscription Insertion * 首先於 `Indexes` 標記出符合 subscription 的 predicates。 * 再於 `Predicate vector` 處,將相對應此 predicates 的 subscriptions 插入至 `predicate-to-subscription association` 的陣列中。 * 最後於 `preds-per-sub` 處進行各個 subscription 總數量的累加。 * 圖示: ![](https://i.imgur.com/8sD4dvf.png) #### Counting Algorithm: Event Matching * 首先於 `Indexes` 標記出符合 publication 的 predicates。 * 再於 `Predicate vector` 處,找到相對應此 predicates 的 subscriptions。 * 最後於 `COUNT` 處進行累加並與 `TOTAL NUMBER` 進行比對。 * 圖示: ![](https://i.imgur.com/CGCd2rx.png) ### Matching Models Summary * **Matching model**: * Matching model 是為了進行 match,設計以用於表達 pubs 與 subs 的語言。 * **Matching problem**: Given a publication, $p$, and a set of subscriptions, $S$, determine all subscriptions, $s \in S$, that match $e$. * **Topic-based** vs. **content-based** matching * **Tradeoff**: computation vs. communication 開銷。 * Topic-based: * 可以使用 hash table,較少的 computation。 * 沒有辦法味 subscriber 進行屬性的 filtering,因此在 communication 需要更多開銷。 * content-based: * matching 可以很有效率,但是需要較高的 computation。 * 可以允許 subscriber 做出 fie grained filter,因此有較佳 communication 效率。 * **Two-phase matching** 是 content-based model 中可以快速搜尋 matching 的高效技術。 * **Counting** algorithm ## Routing Model ### Pub/Sub Architecture 1. **Pub/Sub Middleware (中間層)** * 由一個專門的 (dedicated) **`pub/submiddleware`** 組成。 * 負責接收所有的 operations。 * 負責儲存 subscriptions。 * 執行 **publication matching and dissemination**。 2. **Infrastructure-less** * 沒有專門的 (dedicated) middleware。 * 由 clients (publishers and subscribers) 執行所有工作 (tasks)。 * 為 **peer-to-peer model**。 ### Centralized vs. Distributed * Centralized * A single pub/sub entity * Client-server interactions * CORBA, JMS, ... * **Decentralized** * Matching and dissemination tasks **distributed** * Either, multiple pub/sub entities (called **brokers**) * Or, P2P model without brokers (e.g., [DHT](https://hackmd.io/kPu3zbJXQYuWVEp86Qd57A?view#P2P-System---DHT)) ### Rendezvous-based Routing * 對「出版空間 (publication space)」作 partition。 * 每一個 node 各自負責一個 partition。 * 歸屬於一個 partition 的 publications 與 subscriptions 將會被傳送至對應的 node。 * 範例: * P2P Pub/Sub: Scribe (2001) * Infrastructure-less、distributed。 * 建立於 P2P network 之上的 pub/sub network。 * 基於 structured P2P DHT ([Pastry](https://hackmd.io/kPu3zbJXQYuWVEp86Qd57A?view#P2P-System---Pastry-2001))。 * 在 Pastry的基礎上,支援 topic-based pub/sub API。 * 將 topics 雜湊並置入 DHT 中。 * Scribe Routing 圖示: ![](https://i.imgur.com/DUHlVqI.png) ### Overlay-Based Routing #### Overlay Broker Networks * 基於覆蓋連結 (overlay links) 的訊息流 (messages flow)。 * Clients 連結至任意 edge [broker](#Centralized-vs-Distributed)。 * Matching 與 dissemination 都在 overlay 之中執行。 * 圖示: ![](https://i.imgur.com/MQnEv3m.png) #### 範例 1:RDV-based Overlay: Hermes (2004) * 支援 **content-based** matching,而且可以廣告 (advertisements)。 * 其中一個 broker 為 **rendezvous point**。 * 每個 broker 都知道要怎麼抵達 rendezvous point。 * Rendezvous point 執行:**matching**、**subscriber notification**。 * 圖示 (其中 A、B、C、D 只是 forwarder): ![](https://i.imgur.com/CnGK1w1.png =500x) * Possible Optimizations (可行的優化方案) * 背景:只有一個 rendezvous point 不夠,我還要更多。 * 使用 space partitioning。 * 但是會增加 forwarding brokers 的狀態複雜度 (state complexity)。 * 方法:**Bloom filters** using Link IDs * 可以減少 forwarding brokers 的狀態複雜度。 * 但是會引入 false positive dissemination (假陽性的分送)。 * 因此會增加 RP 的處理時間。 #### Bloom Filter * 用以表示「一元素是否位於集合內」的高效率資料結構。 * 用一個**固定大小**的 **bit vector** 表示。 * 特性: * 所有 set 中元素可以被插入 BF 中 (the set)。 * 可以測試一個元素是否位於 set 中。 * 但因為演算法限制,因此無法獲得位於 BF 中的所有元素清單 (只能測試!)。 * **原理**: * 對一個 value 進行多次 hash,每 hash 一次就標記相對應的 vector 空間。 * 不同 value 進行 hash 出來的多次結果有很高的機率會不同,就算重疊也就僅僅表示兩個 values 在測試時都是陽性。 * 唯一要小心的是如果針對太多 values 進行 hash,導致 vector 幾乎全部都被標記,此時不管怎麼測都是假陽性。 * 演算法的誤差與討論: * **假陽性 (false positives)**:有元素不在 set 中,但測試出來 BF 表示有。 * **假陰性 (false negatives)**:在 set 中的元素卻沒在 BF 測到,不可能發生。 * **Bloom Filter Operations**: ![](https://i.imgur.com/AoGblDY.png =500x) * **Example of Bloom Filter**: ![](https://i.imgur.com/67Xo3YO.png =500x) #### 範例 2:Link IDs encoding in Bloom Filters (LIPSIN 2009) * BF 內置入「前往 RP 的 edges (set)」。 * 當 forwarding node B 進行 forwarding 前,檢視連接自己的 edges 經 BF 測試後是否為「前往 RP 的 edges (set)」。 * 圖示: ![](https://i.imgur.com/bMvBbeh.png =500x) ### Filtering-Based Routing * 行為: 1. 每一個 broker 都執行 filtering。 2. 基於 filtering,每個 broker 向 matching subs 分送 publications。 3. 下一個跳躍點 (hops) 重複上述行為,直到抵達 subs 為止。 * 特色: * 支援任何 matching model。 * 如:content-based。 * Flood subs 以初始化 routing paths。 * 優化: * **Subscription covering** * **Advertising-based routing** #### Subscription Flooding * 一路傳送,建立一個樹狀的 **Publication Routing Table (PRT)**。 * 圖示: ![](https://i.imgur.com/Qb4KSdA.png =500x) #### Subscription Covering * 首先 node b 進行 `Subscription Flooding`。 * 因為「$S_1$ covers $S_2$」的關係,因此於 $S_2$ flood 至 broker 2 時不需要繼續 forward 至 broker 3。 * 因為只要是從 broker 2 出發的 routing,所有符合 $S_1$ (也包含了 $S_2$) 的 publication 最終都會回到 broker 2。 * 但是要繼續往 $S_1$ 的源頭走,走到 broker 1 後操作與上述相同,不需要再走到 broker 4。 * 假設從 broker 4 來的 publication 符合 $S_1$,會傳送至 broker 1,然後再分別傳到 $S_1$ 與 $S_2$ 的源頭。 * 圖示: ![](https://i.imgur.com/VCpdpQx.png =500x) #### Notification Delivery * 一個 publisher 向 broker 7 出版了 `price = 899`,檢查到 $S_1$ 符合。 * 到 broker 2 時,檢查到 $S_2$ 也符合。 * 最後個別走至 node a 與 node b。 * 圖示: ![](https://i.imgur.com/C2HgDvR.png =500x) #### Advertisement-Based Routing * 不同於上述使用 subscriptions 來 flood 整個網路建立 Publication Routing Table (PRT)。 * 此處 flood **advertisements**,並建立 **Subscription Routing Table (SRT)**。 * 缺點:沒有 subscription covering,所以可能會有 redundant matching。 * 圖示: ![](https://i.imgur.com/RPYsd2t.png =500x) ### Routing Algorithms Summary * **Rendezvous-based** * **Partition-based**: Simple, requires master * **DHT-based**: log(n) hops, structured network * **Overlay-based** * **Rendezvous-based**: Efficient matching, inefficient communication, RPs are bottlenecks * **Filtering-based**: Computation intensive, efficient communication * **Advertisement flooding**: Useful if the number of publishers is much smaller than subscribers