# Reliable Publish-Subscribe Service - SDLE Project 2021/2022

This project was developed by T4-G17 from the MEIC course at FEUP:

- Carlos Lousada (up201806302@up.pt)
- José Maçães (up201806622@up.pt)
- Mariana Ramos (up201806869@up.pt)
- Tomás Mendes (up201806522@up.pt)

## Overview and project goals

The main goal of this report is to describe the design and implementation of our Reliable Publish-Subscribe Service. All the required operations are supported by our service:

- [X] **put()** - a Publisher publishes a message on a topic - ```java -jar TestApp.jar <pubId> put <topic> <message>```
- [X] **get()** - a Subscriber consumes a message from a topic - ```java -jar TestApp.jar <subId> get <topic>```
- [X] **subscribe()** - a Subscriber subscribes to a topic - ```java -jar TestApp.jar <subId> subscribe <topic>```
- [X] **unsubscribe()** - a Subscriber unsubscribes from a topic - ```java -jar TestApp.jar <subId> unsubscribe <topic>```

We also implemented operations that run the different parts of our service, as well as a new operation that shows all the topics a subscriber has subscribed to:

- [X] Starting a **Subscriber** - ```java -jar Client.jar sub <subId>```
- [X] Starting a **Publisher** - ```java -jar Client.jar pub <pubId>```
- [X] Starting the **Proxy** - ```java -jar Proxy.jar```
- [X] List of **Topics** - ```java -jar Client.jar <subId> topics```

Furthermore, **"exactly-once"** delivery is guaranteed in the presence of communication failures or process crashes, using the Request-Reply pattern to retransmit data. We handle the multiple sockets with **zmq_poll()** and register which messages each subscriber has already received, avoiding duplicated data. Lastly, we implemented persistent **storage** that allows the program to work correctly even if the intermediary server (proxy) fails. All of these features are explained in detail in the following sections.

## ZeroMQ

#### Pub-Sub Network with a Proxy - XPUB/XSUB

One of the requirements of this project was to use the **ZeroMQ** library, which already implements a quality solution for the Publish-Subscribe problem, connecting a set of publishers to a set of subscribers and facilitating data distribution. In order to allow multiple publishers, we used a pattern that includes an intermediary: a static point in the network to which all other nodes connect. That being said, this was our base architecture:

![](https://i.imgur.com/FZOUWdJ.png)

However, this alone does not guarantee **exactly-once** delivery.
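For reference, the plain XPUB/XSUB forwarding pattern that this architecture builds on can be expressed in a few lines of JeroMQ. This is only a minimal sketch: the class name and port numbers (5559/5560) are illustrative assumptions, and our actual Proxy adds storage and acknowledgement handling on top of this basic idea.

```java
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

// Minimal XPUB/XSUB forwarder. Ports are illustrative assumptions.
public class MinimalXPubXSubProxy {
    public static void main(String[] args) {
        try (ZContext context = new ZContext()) {
            // Publishers connect their PUB sockets to this endpoint
            ZMQ.Socket frontend = context.createSocket(SocketType.XSUB);
            frontend.bind("tcp://*:5559");

            // Subscribers connect their SUB sockets to this endpoint
            ZMQ.Socket backend = context.createSocket(SocketType.XPUB);
            backend.bind("tcp://*:5560");

            // Forwards messages downstream and (un)subscriptions upstream
            ZMQ.proxy(frontend, backend, null);
        }
    }
}
```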
#### Request-Reply pattern - REQ/REP

In the presence of communication failures or process crashes, there was no guarantee that messages had already been delivered or would eventually be delivered. By implementing the **Request-Reply** pattern, we opened new connections that allow acknowledgement messages to be exchanged between the server and its clients.

#### ZMQ_Poll

In order to read from multiple sockets all at once and deal with the blocking aspect of these patterns, we used **zmq_poll()**. This way, we poll for activity on every socket. We also use a **periodic handler** for message synchronization (especially in REQ/REP communication), implementing timeouts (default 5000 ms) and a bounded number of tries (default 3 tries).

## Architecture design and implementation aspects

The image below summarises the implemented architecture.

![](https://i.imgur.com/1Mrz70S.png)

#### Proxy and Connections

The Proxy is an intermediary that allows several Publishers and Subscribers to communicate. It stores all its information in a nested class, **Storage**, which contains two ConcurrentHashMaps: "topics", holding all the topics and their corresponding messages, and "mapping", holding the subscribers' ids and an index indicating their read status.

As described in the previous section, the Proxy implements two main sockets: XSUB, to receive messages from the Publishers (put operation), and XPUB, to forward those messages to the Subscribers (get operation) according to their subscriptions. The server (proxy) starts by binding every socket it uses to its respective address. Afterwards, the clients (publishers and subscribers) can connect to the already-bound sockets. Each service then creates a ZMQ poller, allowing it to read data from multiple sockets all at once in an efficient manner.

Additionally, we implemented three Request/Reply sockets in the Proxy, used mainly for confirmation messages (crash detection) and data retransmission (subscribed topics):

- The socket on port 5501 is used by the Subscribers to confirm to the Proxy that they correctly received the message requested by a get operation.
- The socket on port 5502 allows the Proxy to confirm to the Publishers that it processed their messages correctly after a put operation and will deliver them to the clients subscribed to that topic.
- The socket on port 5503 was created when the need to restore previously subscribed topics arose: when a subscriber crashes or shuts down and is later restored, it sends a request asking the server for the topics it had previously subscribed to.

These messages are explained in detail in the Communication Protocols section.

#### Publisher and Subscriber

The Publisher contains one *PUB* socket, used to communicate with the Proxy, more precisely to send the **PUT** message and publish new messages. Its *REQ* socket, as already described, is used for dealing with acknowledgement messages.

The Subscriber keeps two lists: one with all the subscribed topics and one with the identifiers of the messages already received. Similarly to the Publisher, it uses a *SUB* socket, as part of the basic PUB/SUB implementation, to **SUBSCRIBE** and **UNSUBSCRIBE** to topics as well as to **GET** messages from subscribed topics. Its *REP* socket is used to deal with acknowledgement messages and, finally, its *REQ* socket is used to request from the Proxy the previously subscribed topics in the eventuality of a crash.

## Communication Protocols

The libzmq core library already implements two APIs to deal with message exchanges. We used the ZFrame class, which provides methods such as ZFrame.recvFrame() and frame.getData() to process the messages sent by *SUB*.subscribe and *SUB*.unsubscribe. A 'frame' corresponds to one underlying zmq_msg_t in the libzmq code. All the data is processed as Strings that are converted to byte arrays.

#### Communication between Publisher and Proxy

The communication between the Publisher and the Proxy allows publishers to publish - **put()** - messages on topics. First, the publisher sends the message frame "<topic>//<message>" to the Proxy using the PUB socket. Afterwards, it sends another message, "<pubId>", through the REQ socket, asking for confirmation that the message was added correctly. Using zmq_poll(), it waits for the Proxy's reply; if everything works as expected, the Proxy re-sends "<pubId>" through the REP socket.
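A minimal sketch of this exchange from the publisher's side is shown below, using the 5000 ms timeout and 3 tries mentioned earlier. The PUB endpoint (port 5500) and the identifiers are illustrative assumptions; port 5502 is the confirmation socket described in the Architecture section.

```java
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

// Publisher-side sketch of put() with acknowledgement and retries.
// Port 5500 (PUB endpoint) is an illustrative assumption; 5502 is the
// Proxy's confirmation socket described in the Architecture section.
public class PutWithAck {
    private static final int TIMEOUT_MS = 5000; // default timeout
    private static final int MAX_TRIES = 3;     // default number of tries

    public static void main(String[] args) {
        String pubId = "pub1", topic = "weather", message = "sunny";
        try (ZContext context = new ZContext()) {
            ZMQ.Socket pub = context.createSocket(SocketType.PUB);
            pub.connect("tcp://localhost:5500");

            // 1. Publish the "<topic>//<message>" frame
            pub.send(topic + "//" + message);

            // 2. Ask for confirmation, retrying on timeout
            boolean confirmed = false;
            for (int attempt = 1; attempt <= MAX_TRIES && !confirmed; attempt++) {
                ZMQ.Socket req = context.createSocket(SocketType.REQ);
                req.connect("tcp://localhost:5502");
                req.send(pubId);

                ZMQ.Poller poller = context.createPoller(1);
                poller.register(req, ZMQ.Poller.POLLIN);
                if (poller.poll(TIMEOUT_MS) > 0 && poller.pollin(0)) {
                    confirmed = pubId.equals(req.recvStr());
                }
                poller.unregister(req);
                context.destroySocket(req); // REQ must be recreated after a timeout
            }
            System.out.println(confirmed ? "put confirmed" : "proxy unreachable");
        }
    }
}
```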
#### Communication between Subscriber and Proxy

The Subscriber and the Proxy communicate so that subscribers can **subscribe** and **unsubscribe** to topics and consume - using **get()** - messages from subscribed topics.

- **Subscribe** - The subscriber starts by sending a subscribe ZFrame through the SUB socket with the following content: "<subId> // <topic>". This operation uses the subscribe() method. After this, the Proxy sends the message "subId // topic // 1" through the XPUB socket, confirming that the subscription was successful.
- **Unsubscribe** - Similarly to the subscribe operation, the subscriber sends an unsubscribe ZFrame through the SUB socket with the format "<subId> // <topic>".
- **Get** - This operation runs a little differently. Since we wanted to take maximum advantage of the PUB/SUB pattern provided by ZeroMQ, we decided to use the *SUB* socket, but with a 'get' flag: the subscriber sends a subscribe ZFrame similar to the one used in the Subscribe operation, but with "// get" at the end. The Proxy then sends "subId // topic // message" through the XPUB socket. After that comes the confirmation: the Proxy, using its REQ socket, sends "subId", and the Subscriber replies with "subId" using its REP socket.

## Further Reliability

#### Storage

We took into consideration the possibility of the **proxy failing**. In order to save all the topics, messages, and subscribers on disk, we implemented a **Storage class**. This class contains two data structures with the information mentioned above and implements methods responsible for saving them in the file "storage/data.ser". At first, we thought of calling these methods every five seconds, but we ended up deciding to write to the file after every subscriber operation, guaranteeing proper fault tolerance. When the proxy restarts, it reads its previous state from the file and no information is lost. A sketch of this mechanism is shown at the end of this section.

#### Restore topics

Finally, we also took into account the possibility of a **subscriber failing** for an indefinite period. Every time a subscriber is initialized, its REQ socket connects to the REP socket already bound by the proxy (port 5503). Through this socket, the subscriber sends the proxy a request message, containing only its id, asking for the topics it had already subscribed to. The proxy then replies, using the REP socket, with a list of all the topics the subscriber had subscribed to; this list can be empty. The subscriber reads the reply and performs a **subscribe** operation for every element of the list, as sketched below.
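To make the persistence mechanism concrete, the following is a minimal sketch of how the Storage class could serialize its two maps to "storage/data.ser". The field and method names are illustrative assumptions, not our exact code.

```java
import java.io.*;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the persistence idea; field and method names
// are assumptions. Assumes the storage/ directory already exists.
public class Storage implements Serializable {
    private static final long serialVersionUID = 1L;

    // topic -> published messages
    private final ConcurrentHashMap<String, List<String>> topics = new ConcurrentHashMap<>();
    // subscriber id -> index indicating read status
    private final ConcurrentHashMap<String, Integer> mapping = new ConcurrentHashMap<>();

    // Called after every subscriber operation
    public synchronized void save() {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new FileOutputStream("storage/data.ser"))) {
            out.writeObject(this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Called once when the proxy restarts
    public static Storage load() {
        File file = new File("storage/data.ser");
        if (!file.exists()) {
            return new Storage(); // first run: start empty
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new FileInputStream(file))) {
            return (Storage) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            return new Storage(); // unreadable file: start fresh
        }
    }
}
```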
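And here is the subscriber's side of the topic-restore exchange on port 5503. The report does not fix a wire format for the reply, so the comma-separated list used here is an assumption for illustration.

```java
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

// Subscriber-side sketch of the restore request; the comma-separated
// reply encoding is an illustrative assumption.
public class RestoreTopicsExample {
    public static void main(String[] args) {
        String subId = "sub1";
        try (ZContext context = new ZContext()) {
            // Connect to the proxy's restore socket (port 5503)
            ZMQ.Socket req = context.createSocket(SocketType.REQ);
            req.connect("tcp://localhost:5503");

            // The request carries only the subscriber's id
            req.send(subId);

            // The reply lists the previously subscribed topics (possibly empty)
            String reply = req.recvStr();
            if (reply != null && !reply.isEmpty()) {
                for (String topic : reply.split(",")) {
                    System.out.println("re-subscribing to " + topic);
                    // ... perform the normal subscribe operation here ...
                }
            }
        }
    }
}
```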
## Garbage Collector

At runtime, the Proxy deletes a message from its storage once every subscriber of the corresponding topic has received it, so that stored data does not grow indefinitely.

## Conclusion

In conclusion, we were able to fulfill all the specified requirements and devise new solutions for the problem at hand. Concurrent invocations of put(), subscribe(), and unsubscribe() on the same topic made it hard to determine all the subscribers of that topic for the first guarantee. However, in a single-server implementation of the service, we were able to make all the operations work as expected, even in the presence of faults in the server or clients.

Furthermore, we used several ZeroMQ patterns and APIs: not only XPUB/XSUB but also REQ/REP, to ensure the exactly-once guarantees. When publisher-proxy or subscriber-proxy communication fails, the absence of an acknowledgement is detected and the data is retransmitted, never violating exactly-once delivery. Additionally, we obtained durable subscriptions by saving and restoring the state of the proxy.

Finally, additional features, such as deleting messages at runtime once all subscribers have received them, helped us achieve a quality solution.