# Week 11 - Publish/Subscribe
###### tags: `WS2020-IN2259-DS`
## Introduction
### Definition
* Publish/Subscribe 是一種 **`messaging pattern`**,可以應用在「多對多 (**many-to-many**)」通訊。
* 資料來源端 (publishers) 負責出版 (publish **publications**),然後資料接收端 (subscribers) 進行訂閱 (subscribe via **subscriptions**) 並獲得通知 (**notifications**)。
* 例如對於感興趣的 publications,便會進行訂閱以獲得通知。
* 圖示:

### Pub/Sub API
* **`Publish(data, metadata)`**:
* Publication metadata **depends on the language** (cf. [Matching model](#Matching-Models))
* **`Subscribe(interest)`**:
* Subscription interest is defined **depending on the language** (cf. [Matching model](#Matching-Models))
* **`Unsubscribe(interest)`**:
* Remove a subscription
* `(Un-)Advertise(data type/domain)`:
* Publisher advertises type of content it will publish
### Pub/Sub System Design Dimensions
* **Matching-based** communication:
* 非 host-to-host communication。
* 基於 **`matching model`** 的 communication。
* Optimized for **scalability** and **performance**:
* 大量的 publishers 與 subscribers。
* 高出版率。
* **`Fast matching`**:通常為 **stateless**。
* 不會考慮序列或複合的 patterns。
* **`Simple matching`**:
* 通常只限於主題 (**topic**)。
* **Decoupling** properties (most popular):
* Time
* Space
* Synchronization
### Space Decoupling
* Clients (publishers & subscribers) 不需要保有 references 來認知彼此。
* Clients 有可能是實體分散的。
* 圖示:

### Time Decoupling
* Clients 不需要在同一時間在線。
* 圖示:

### Synchronization Decoupling
* ==Control flow is not blocked by the interaction.==
* 圖示:

### Why Decouple?
* 將出版與訂閱端點除耦合:
* 可以移除顯示相依關係。
* 減少協調作業 (coordination)。
* 減少同步作業 (synchronization)。
* 建立高度動態的系統 (highly dynamic systems)。
* 允許資料源與接收端自由來去。
* 支援 continuous operation。
* 啟用 event-based systems,增加 scalability。
* 使得對系統進行調適 (除錯) 更加困難。
* Makes debugging system more difficul
### Pub/Sub vs. Observer Design Pattern
* Observer design pattern 也稱作 Listener。
* **Observer design pattern 與 publish/subscribe 不同!**
* ODP 的 publisher (subject) 認識每一個 subscribers (observer),而且反之亦然。
* 圖示:

### Applications

### Enterprise Application Integration

### Computational Advertising: Filtering & Matching

### Pub/Sub Standards
* DCE Event Management Service (1990s)
* OMG Event Channel (1995)
* OMG Notification Service (1997)
* OMG Data Dissemination Service
* **MQ Telemetry Transport (1999)**

* Java Messaging Service (JMS) (1998, 2002)
* WS Eventing (2004)
* WS Notifications (2004)
* Active Message Queueing Protocol (AMQP)
* OGSI-Notification (6/2003) (Open Grid Services Architecture)
### Popular Systems in Industry

### Summary of Pub/Sub
* Pub/sub 在 publishers 與 subscribers 之間提供 **many-to-many** 通訊。
* 通訊至少在 **time**、**space**、**synchronization** 上做到「除耦合 (decoupled)」。
* 很多應用為 **event-based**,或是需要 **event notifications**,這些應用便會應用 pub/sub 設計。
* Pub/sub 有多種風格 (flavors),不同之處在於 **matching** 與 **routing** 的設計。
## Matching Models
### Matching & Filtering **Problem** in Pub/Sub
* **Matching problem**:
* 給定一個 publication $e$、一組 subscriptions $S$。
* 決定所有的 $s \in S$,與 $e$ 相符合。
* 圖示:

* 不同的 model:
* Topic-based filtering
* Content-based filtering
* Type-based filtering
* ...
### Matching Model
* 描述 subscriptions 與 publications 該**如何 (how) 表達 (express)**。
* 每一個 subscription 包含了 filters,以此決定是否與一個 publication 相符合。
* 當一個 publication 送出時,它將會與各個 subscription 進行比較,以檢視是否 **match**。
* 然後此 publication 將會被傳送至 matching 的 subscriber 處。
### Topic-Based Matching
* 又稱作 `subject-based filtering`、`group-based filtering` 或 `channel-based event filtering`。
* 然而,其中有一些細微的差異。
* **指令的改變**:
* `Publish(data, metadata)` → **`Publish(data, topic)`**
* `Subscribe(interest)` → **`Subscribe(topic)`**
* **Match** 建立在 **same topic** 的前提上,範例:
```c=
subscribe(“IBM”)
publish(“price:175.31”,“IBM”)
```
* Static publisher 與 subscriber 的關係:
* 於 publication 發布時,subscriber 的收件人集合 (**recipient set**) 會知道這個消息。
* 對於一個給定的 topic,除非 topics 或是 subscriptions 改變,否則每個 publish 該 topic 的訊息,都會被傳送至相同的 recipient set。
* Topics 可以階層式的組織管理 (**hierarchically organized**)。
* 如:sports/basketball/NBA
* 如:`publish` message to sports/basketball/NBA
* Topic subscriptions 可以使用「**通配符 (wildcards)**」。
* 如:`subscribe` to sports/basketball/*
### Content-Based Matching
* **`Subscription`**:
* Conjunction of **predicates**.
* 意思是由「屬性-運算子-值 (attribute-operator-value)」三個一組 (triple) 的組合。
* 例如:
```c=
(subject = news) ∧ (topic = travel) ∧ (date > 29.11.2011)
```
* **`Publication`** (a.k.a. **`event`**):
* 一組組由「屬性-值 (attribute-value)」所組成的 pairs。
* 例如:
```c=
(subject, news), (topic, travel), (date, 21.2.2011), ...
```
* **指令的改變**:
* `Publish(data, metadata)` → **`Publish(data, list(attribute-value))`**
* `Subscribe(interest)` → **`Subscribe(predicates)`**
* 當**所有的 predicates** 都滿足 (conjunction) 時才代表 **match**,例如:
```c=
subscribe(“stock = IBM”, “price > 175.0”, “date > 1-1-1970”)
// not match
publish(“stock = IBM”, “price = 175.31”, “date = 31-12-1969”)
// match
publish(“stock = IBM”, “price = 175.5”, “date = 1-1-2017”)
// not match
publish(“stock = IBM”, “price = 180.0”, “market = NYSE”)
// match
publish(“stock = IBM”, “price = 177.5”, “date = 1-1-2017”, “market = NYSE”)
```
* 關於其他內容的表格:

### Language Categories

### Content-Based Matching Algorithms
* 演算法範例:
* **Counting algorithm** [origins unknown to me; guess: 1970s]
* *Clustering algorithm [Fabret, Jacobsen et al., 2001]*
* 上述兩者都是「**兩階段 (two-phased)**」的 matching algorithms
1. **`Predicate matching phase`**:Match 所有的 predicates。
2. **`Subscriptions matching phase`**:將 subscriptions 與第一階段的結果進行 match。
### Two-Phased Matching
* **Decompose** subscriptions into predicates
$$
\begin{eqnarray*}
S_1 &=& \underbrace{(\mbox{subject}=\mbox{news})}_{P_1}\, \land \underbrace{(\mbox{topic}=\mbox{travel})}_{P_2}\, \land \underbrace{(\mbox{date}>\mbox{29.11.2011})}_{P_3} \\
S_2 &=& \underbrace{(\mbox{subject}=\mbox{news})}_{P_1}\, \land \underbrace{(\mbox{topic}=\mbox{stock})}_{P_4}\, \land \underbrace{(\mbox{date}>\mbox{30.11.2011})}_{P_5} \\
\end{eqnarray*}
$$
* **Convert** subscriptions to reference predicates
$$
\begin{eqnarray*}
S_1 &:=& P_1\, \land P_2\, \land P_3 \\
S_2 &:=& P_1\, \land P_4\, \land P_5 \\
\end{eqnarray*}
$$
* 將 subscriptions 剖析成一個個的 predicates,然後儲存並個別比較。
> **Break down** subscriptions into predicates, **store** and **match** them **individually**
### PHASE 1: PREDICATE MATCHING
#### Predicate Matching Problem
* Given a set $P$ of predicates and a publication $e$, **identify all predicates $p$ of $P$ which evaluate to true under $e$**.
* Example:

#### **Predicate Matching**: Top-level Data Structure
* **Hash table** on attribute name
* 圖示:

#### **Predicate Index**: General Purpose Data Structure
* 每個 operator 有一條 ordered linked list。
* Insert、delete、match 等指令都是 $O(n)\mbox{-operations}$ 的複雜度。
* 針對每個 $e$ 裡的屬性名稱 (attribute name) 與每一個運算子 (operator)。
* 如:「price」與「=、<、>、!=」
* 圖示:

* 替代上,可以使用 `B-tree` 或 `B+ tree`。
#### Finite Predicate Value Domain Types
* 有限的值域。
* **Countable** domain types with small cardinality (小基數)
* Integer intervals
* Collections (enums)
* Set of tags (e.g., in XML)
* Examples
* Price : [0, 1000], models variety of prices
* Color, city, state, country, size, weight
* All tags defined in a given XML schema
* Predicate domain often **context dependant**, but **limited in size**
* Prices of cars vs. prices of groceries
#### Predicate Matching For Finite Domains
* 因為值域限制於一個相對小的基數 (cardinality),因此用陣列 (表格) 就可以表示:

#### Predicate Matching Symmetries
* 利用運算的對稱性,看來有些空間可以省:

### PHASE 2: SUBSCRIPTION MATCHING
#### Subscription Matching: Counting Algorithm
* Subscriptions consist of **a set of predicates**:
$$
\begin{eqnarray*}
S_1 &=& \underbrace{(2< A<4)}_{P_1}\, \land \underbrace{(B=6)}_{P_2}\, \land \underbrace{(C >4)}_{P_3} \\
S_2 &=& \underbrace{(2< A<4)}_{P_1}\, \land \underbrace{(C=3)}_{P_4} \\
\end{eqnarray*}
$$
* 所有的 predicates 都滿足,才算是 match。
* **Idea**:
* 計算每個 subscription 滿足的 predicates 數量,再與此 subscription 總共的 predicates 數量進行比較 (一定要一樣才 match)。
#### Counting Algorithm: Subscription Insertion
* 首先於 `Indexes` 標記出符合 subscription 的 predicates。
* 再於 `Predicate vector` 處,將相對應此 predicates 的 subscriptions 插入至 `predicate-to-subscription association` 的陣列中。
* 最後於 `preds-per-sub` 處進行各個 subscription 總數量的累加。
* 圖示:

#### Counting Algorithm: Event Matching
* 首先於 `Indexes` 標記出符合 publication 的 predicates。
* 再於 `Predicate vector` 處,找到相對應此 predicates 的 subscriptions。
* 最後於 `COUNT` 處進行累加並與 `TOTAL NUMBER` 進行比對。
* 圖示:

### Matching Models Summary
* **Matching model**:
* Matching model 是為了進行 match,設計以用於表達 pubs 與 subs 的語言。
* **Matching problem**:
Given a publication, $p$, and a set of subscriptions, $S$, determine all subscriptions, $s \in S$, that match $e$.
* **Topic-based** vs. **content-based** matching
* **Tradeoff**: computation vs. communication 開銷。
* Topic-based:
* 可以使用 hash table,較少的 computation。
* 沒有辦法味 subscriber 進行屬性的 filtering,因此在 communication 需要更多開銷。
* content-based:
* matching 可以很有效率,但是需要較高的 computation。
* 可以允許 subscriber 做出 fie grained filter,因此有較佳 communication 效率。
* **Two-phase matching** 是 content-based model 中可以快速搜尋 matching 的高效技術。
* **Counting** algorithm
## Routing Model
### Pub/Sub Architecture
1. **Pub/Sub Middleware (中間層)**
* 由一個專門的 (dedicated) **`pub/submiddleware`** 組成。
* 負責接收所有的 operations。
* 負責儲存 subscriptions。
* 執行 **publication matching and dissemination**。
2. **Infrastructure-less**
* 沒有專門的 (dedicated) middleware。
* 由 clients (publishers and subscribers) 執行所有工作 (tasks)。
* 為 **peer-to-peer model**。
### Centralized vs. Distributed
* Centralized
* A single pub/sub entity
* Client-server interactions
* CORBA, JMS, ...
* **Decentralized**
* Matching and dissemination tasks **distributed**
* Either, multiple pub/sub entities (called **brokers**)
* Or, P2P model without brokers (e.g., [DHT](https://hackmd.io/kPu3zbJXQYuWVEp86Qd57A?view#P2P-System---DHT))
### Rendezvous-based Routing
* 對「出版空間 (publication space)」作 partition。
* 每一個 node 各自負責一個 partition。
* 歸屬於一個 partition 的 publications 與 subscriptions 將會被傳送至對應的 node。
* 範例:
* P2P Pub/Sub: Scribe (2001)
* Infrastructure-less、distributed。
* 建立於 P2P network 之上的 pub/sub network。
* 基於 structured P2P DHT ([Pastry](https://hackmd.io/kPu3zbJXQYuWVEp86Qd57A?view#P2P-System---Pastry-2001))。
* 在 Pastry的基礎上,支援 topic-based pub/sub API。
* 將 topics 雜湊並置入 DHT 中。
* Scribe Routing 圖示:

### Overlay-Based Routing
#### Overlay Broker Networks
* 基於覆蓋連結 (overlay links) 的訊息流 (messages flow)。
* Clients 連結至任意 edge [broker](#Centralized-vs-Distributed)。
* Matching 與 dissemination 都在 overlay 之中執行。
* 圖示:

#### 範例 1:RDV-based Overlay: Hermes (2004)
* 支援 **content-based** matching,而且可以廣告 (advertisements)。
* 其中一個 broker 為 **rendezvous point**。
* 每個 broker 都知道要怎麼抵達 rendezvous point。
* Rendezvous point 執行:**matching**、**subscriber notification**。
* 圖示 (其中 A、B、C、D 只是 forwarder):

* Possible Optimizations (可行的優化方案)
* 背景:只有一個 rendezvous point 不夠,我還要更多。
* 使用 space partitioning。
* 但是會增加 forwarding brokers 的狀態複雜度 (state complexity)。
* 方法:**Bloom filters** using Link IDs
* 可以減少 forwarding brokers 的狀態複雜度。
* 但是會引入 false positive dissemination (假陽性的分送)。
* 因此會增加 RP 的處理時間。
#### Bloom Filter
* 用以表示「一元素是否位於集合內」的高效率資料結構。
* 用一個**固定大小**的 **bit vector** 表示。
* 特性:
* 所有 set 中元素可以被插入 BF 中 (the set)。
* 可以測試一個元素是否位於 set 中。
* 但因為演算法限制,因此無法獲得位於 BF 中的所有元素清單 (只能測試!)。
* **原理**:
* 對一個 value 進行多次 hash,每 hash 一次就標記相對應的 vector 空間。
* 不同 value 進行 hash 出來的多次結果有很高的機率會不同,就算重疊也就僅僅表示兩個 values 在測試時都是陽性。
* 唯一要小心的是如果針對太多 values 進行 hash,導致 vector 幾乎全部都被標記,此時不管怎麼測都是假陽性。
* 演算法的誤差與討論:
* **假陽性 (false positives)**:有元素不在 set 中,但測試出來 BF 表示有。
* **假陰性 (false negatives)**:在 set 中的元素卻沒在 BF 測到,不可能發生。
* **Bloom Filter Operations**:

* **Example of Bloom Filter**:

#### 範例 2:Link IDs encoding in Bloom Filters (LIPSIN 2009)
* BF 內置入「前往 RP 的 edges (set)」。
* 當 forwarding node B 進行 forwarding 前,檢視連接自己的 edges 經 BF 測試後是否為「前往 RP 的 edges (set)」。
* 圖示:

### Filtering-Based Routing
* 行為:
1. 每一個 broker 都執行 filtering。
2. 基於 filtering,每個 broker 向 matching subs 分送 publications。
3. 下一個跳躍點 (hops) 重複上述行為,直到抵達 subs 為止。
* 特色:
* 支援任何 matching model。
* 如:content-based。
* Flood subs 以初始化 routing paths。
* 優化:
* **Subscription covering**
* **Advertising-based routing**
#### Subscription Flooding
* 一路傳送,建立一個樹狀的 **Publication Routing Table (PRT)**。
* 圖示:

#### Subscription Covering
* 首先 node b 進行 `Subscription Flooding`。
* 因為「$S_1$ covers $S_2$」的關係,因此於 $S_2$ flood 至 broker 2 時不需要繼續 forward 至 broker 3。
* 因為只要是從 broker 2 出發的 routing,所有符合 $S_1$ (也包含了 $S_2$) 的 publication 最終都會回到 broker 2。
* 但是要繼續往 $S_1$ 的源頭走,走到 broker 1 後操作與上述相同,不需要再走到 broker 4。
* 假設從 broker 4 來的 publication 符合 $S_1$,會傳送至 broker 1,然後再分別傳到 $S_1$ 與 $S_2$ 的源頭。
* 圖示:

#### Notification Delivery
* 一個 publisher 向 broker 7 出版了 `price = 899`,檢查到 $S_1$ 符合。
* 到 broker 2 時,檢查到 $S_2$ 也符合。
* 最後個別走至 node a 與 node b。
* 圖示:

#### Advertisement-Based Routing
* 不同於上述使用 subscriptions 來 flood 整個網路建立 Publication Routing Table (PRT)。
* 此處 flood **advertisements**,並建立 **Subscription Routing Table (SRT)**。
* 缺點:沒有 subscription covering,所以可能會有 redundant matching。
* 圖示:

### Routing Algorithms Summary
* **Rendezvous-based**
* **Partition-based**: Simple, requires master
* **DHT-based**: log(n) hops, structured network
* **Overlay-based**
* **Rendezvous-based**: Efficient matching, inefficient communication, RPs are bottlenecks
* **Filtering-based**: Computation intensive, efficient communication
* **Advertisement flooding**: Useful if the number of publishers is much smaller than subscribers