# Distributed Systems -Lab RabbitMQ Alexander Sergeev and Pia Döpper The repository can be found [here](https://github.com/pseusys/current/tree/master/sem10%2FDS-lab3) ## RabbitMQ Basics RabbitMQ is based on the Advanced Message Queuing Protocols (AMQP). The messaging broker picks up messages, allowing the producer to start a new task. The message is queued until the consumer picks it up. It is an asynchronous process and the sender and receiver do not have to act simultaneously. The sender can request an acknoledgement. The receiver can send it or not or refuse to accept messages. There are four stations/stops in Rabbit MQ - Producer: generates messages - Exchange: forwards messages - Queue: stores messages - Consumer: processes the message Routing keys are used to address the correct recipient. There is a binding between exchange and queue, so that each queue is connected to the exchange. There are four types of exchange: - Direct Exchange: direct connection between sender and receiver. - Topic Exchange: send to specific but multiple queues. - Fanout Exchange: Broadcast - Header Exchange: similar to Topic Exchanges, but uses the header to determine the queues instead of the routing key. ## The DS building system The algorithm we created is close enough to be an implementation of the task #6. Our ultimate goal was to make an algorithm described using notation similar to what we had on lecture executable. The algorithm is implemented with Python3, using `aio-pika` RabbitMQ support package. **NB!** The provided notation is VERY limited, in particular, it supports only message-driven systems with a few event types, but it can be extended. ### Configuration The algorithm takes as an input a configuration file and instantiate the distributed system it describes. The configuration file format is YAML (chosen for its readability). The files are obliged the have the following structure: ```yaml templates: template_1: one_shot: bool variables: var_1: val_1 ... in_pipes: - ... out_pipes: - ... init: ACTION_1 ... rules: RULE_1 ... ... matrix: node_1: template: template_name pipes: pipe_name: queue_name ... ... ``` #### `templates` Under this key the node templates are stored. In general, (referring to lecture) every working node in a distributed system should run the same code. The node template is this code, a program description that will be run by the node. However, some nodes eventually should differ: for example, an initialization or result collection node should run some other code. The different programs are represented by different node templates. The keys of the dictionary are template names, the values are template configurations. The template can have the following keys: - `one_shot`: a boolean value, if set to `True` the `rules` key of the template will be ignored. Default value `False`. - `variables`: a dictionary of variables, set for this template. Keys of the dictionary are variable names (should be in snake case), values are default values or [magic variables](#magic-variables). Default: `[empty dictionary]`. - `in_pipes`: List of input [pipe](#pipes) names of the template. The list elements should be unique (the behavior is undefined otherwise). The pipes are backed up by RabbitMQ queues, but this list elements **won't be used for queue names**. The elements are local identifiers inside of this template only. Default: `[empty list]`. - `out_pipes`: Same as `in_pipes`, but for output [pipe](#pipes) names. Default: `[empty list]`. - `init`: Dictionary containing [actions](#actions) to be executed on template [start](#node-lifecycle). The dictionary keys are action types, the values are action configurations. Default: `[empty dictionary]`. - `rules`: Dictionary containing [rules](#rules) to be triggered by messages, accepted by this node. The dictionary keys are action types, the values are action configurations. Default: `[empty dictionary]`. #### `matrix` Under this key the node instances are stored. Each node instance _must_ implement one of the templates. The implementation defines values local to each exact node. The keys of the dictionary are node names, the values are node configurations. The node implementation can have the following keys: - `template`: The name of the template this node is based on. **Required!** - `pipes`: Dictionary, defining mapping between global message channels (RabbitMQ queues) and local node pipes. **WARNING!** The keys of the dictionary should match elements of `in_pipes` and `out_pipes` of the template. If any of the local template pipes is not mapped to global message channel, an exception will be thrown. Default: `[empty dictionary]` (only if `in_pipes` and `out_pipes` are empty, otherwise **Required!**). #### Magic variables The magic variables are the strings that will be replaced with something in run time. Currently only two types of magic variables are supported: - `self`: Strings with pattern `self.SOME_NAME` will be replaced by value of current node variable named "SOME_NAME". - `message`: Strings with pattern `message.SOME_NAME` will be replaced by value of current message variable named "SOME_NAME" (in [rule](#rules) context). If no replacement is found for a magic variable, the string value is returned unmodified. There are also some special variables that are automatically defined for a node on creation. The currently supported special variables: - `NODE_ID`: unique integer node identifier (not defined before node creation). #### Pipes Pipes are local node interfaces of global message channels (RabbitMQ queues). Node can listen to some pipes (by declaring rules, consuming messages from it) and send messages to some pipes (by executing `send` action). #### Actions Actions are the changes that the node can emit upon initialization and in runtime. The changes can be both external (to the other nodes in the system) and internal (to the node variables). These are the currently supported actions: #### `send` Sends a message to one of the node output pipes. Has the following structure: ```yaml send: pipe: pipe_name message: var_1: val_1 ... ``` Here `pipe_name` is the name of the node output pipe the message will be sent to. **Required!** The `var_1: val_1`, `...` are variables (or [magic variables](#magic-variables)) that will be present in the message body. **Required!** ##### `set` Sets a variable of the current node. Has the following structure: ```yaml set: var_1: val_1 ``` The `var_1: val_1`, `...` are variables (or [magic variables](#magic-variables)) that will be set for the current node. #### Rules Rules are the external events that happen when the node receives a message. The rules are processed for not `one_shot` nodes only and are becoming active on node [start](#node-lifecycle). The rules have the following structure: ```yaml rule_name: pipe: input if: 'some valid python executable string' actions: ACTION_1 ... ``` Here `pipe` is the name of the node input pipe the rule is listening to. The rule can be activated by messages, coming from this pipe only. **Required!** The rule is only activated if `if` condition is met. This condition should be a valid python expression, returning a value that can be cast to `bool`. No globals can be used in the expression, but instead `self` and `message` locals are available (they work just like the [magic variables](#magic-variables)). By default, if no condition is specified, the rule is always executed. The `actions` are the actions that will be executed if the message is received and passes the condition. By default, if no actions is specified, nothing will happen. **NB!** If a message triggers two or more rules, an exception is risen. Otherwise, the message is consumed from the pipe. #### Node lifecycle The node lifecycle consists of three stages: 1. **Creation** Upon creation, node object is created, corresponding data read from YAML is parsed and processed. The internal variables of the node are set, all the rules, actions, etc. are created. 2. **Launch** Upon launch, the node connects to RabbitMQ and creates all the queues required to back up its pipes. 3. **Start** Upon start, node executes its `init` actions (if any) and closes connection to RabbitMQ if it is `one_shot`. Otherwise, the node starts consuming messages from its input pipes. ### System structure System contains a few python files: - `choreographer.py`: The main script, accepts console args, builds and launches system, catches exceptions. - `logger.py`: Prints logs, terminates system on critical (declaration) errors. - `node.py`: Creates, launches, starts and controls execution of a node. - `utils.py`: Utility functions and classes. Task configurations are stored in YAML files: - `pingpong.yml`: Ping-pong task. - `elections.yml`: Node elections in circle. - `diffusion.yml`: Implements diffusion algorithm on a node tree. ### Build and run system Before system run please make sure the RabbitMQ is running on your device. If it's not, you can run it with: ```shell make rabbit ``` System can be run with the following command: ```shell make run CONF=CONF_FILE ``` where 'CONF_FILE' is a name of configuration file. Following optional parameters are provided: - `LOGS`: Name of logging file, _default_: generated. - `VERB`: System verbosity level, should be one of (DEBUG, INFO, WARNING, ERROR, CRITICAL), _default_: INFO. ## Exercise 1 - Ping Pong ![](https://i.imgur.com/zshFwZR.png) The first task was to implement the Ping Pong algorithm from the lecture. The distributed algorithm consists of 3 rules: The first rule is the start of Ping Pong. If a node receives the request START and is not yet started itself, it sends a pong and in the implementation is the ponger node. The second rule is that if a node receives a PING and its ID is less than the message Id it sends a PONG. The third rule is that if a node receives a PONG and its own Id is greater than the Message Id it sends a PING. There is the firestarter which sends a start signal to the nodes. There are also two channels. One is assigned to the output_pipe of Pinger and the input_pipe of Ponger. The second channel is vice versa. The choreographer creates a distributed systems based on the configfile pingpong.yml: ```yaml templates: node: one_shot: False variables: id: 0 started: False in_channels: - input - starter out_channels: - output rules: start: channel: starter if: 'message.text == "START" and not self.started' send: channel: output message: text: PING id: self.id set: started: True receive_ping: channel: input if: 'message.text == "PING" and self.id < message.id' send: channel: output message: text: PONG id: self.id set: started: True receive_pong: channel: input if: 'message.text == "PONG" and self.id > message.id' send: channel: output message: text: PING id: self.id firestarter: one_shot: True out_channels: - torch init: send: channel: torch message: text: START id: -1 matrix: pinger: template: node channels: input: first output: second starter: outer ponger: template: node channels: input: second output: first starter: outer launcher: template: firestarter channels: torch: outer ``` Here we can now see the received and sent messages our nodes. In blue are information, in yellow are warnings and in green are actions or successful events. The ponger node receives a start signal and sends a ping. The pinger node receives a ping signal and sends a pong. We also see that when the ponger receives a ping, it will log an warning, that it is the wrong pipe and the rule cant be applied and vice versa for the pinger. ![](https://i.imgur.com/WjJPntp.png) ![](https://i.imgur.com/MUuUGTo.png) ## Exercise 2 - Election on a Ring ![](https://i.imgur.com/Abt1Kap.png) The second task is to program an algorithm that selects a node in a ring topology. We determine here that the node with the smallest ID is selected. There are three rules how the nodes choose their candidate. The node that starts sends its own ID as election. When a node now receives a message, it sends it on to the next one if the id is smaller than its own. If its own is smaller, it sends its Id further. If a node now receives as a message the Id that is identical to its own Id, it is elected and sets itself as the leader. All nodes starts at the same time electing themselves in the first step. The leader is set to -1 at the beginning. The choreographer creates a distributed systems based on the configfile elections.yml. Here you can see the nodes, channels, queues and rules: ```yaml templates: node: one_shot: False variables: id: NODE_ID leader: -1 in_channels: - input out_channels: - output rules: send_candidate: channel: input if: 'message.text == "ELECTION" and message.id < self.id' send: channel: output message: text: ELECTION id: message.id set: leader: message.id send_myself: channel: input if: 'message.text == "ELECTION" and message.id > self.id' send: channel: output message: text: ELECTION id: self.id set: leader: self.id cheer_leader: channel: input if: 'message.text == "ELECTION" and message.id == self.id' set: leader: self.id init: send: channel: output message: text: ELECTION id: self.id matrix: ring_node_1: template: node channels: input: ring_edge_6 output: ring_edge_1 ring_node_2: template: node channels: input: ring_edge_1 output: ring_edge_2 ring_node_3: template: node channels: input: ring_edge_2 output: ring_edge_3 ring_node_4: template: node channels: input: ring_edge_3 output: ring_edge_4 ring_node_5: template: node channels: input: ring_edge_4 output: ring_edge_5 ring_node_6: template: node channels: input: ring_edge_5 output: ring_edge_6 ``` The distributed system is launched. ![](https://i.imgur.com/mIspNVf.png) The start message is send: ![](https://i.imgur.com/8unOIuC.png) The election is done: ![](https://i.imgur.com/Ze7ClTw.png) ## Exercise 3 - Tree with RabbitMQ We solved this task indirectly by launching and configuring our distributed system with the configfile. It gives as input a matrix in tree format which describes the topology of the distributed system. See the config-files task 1 and 2.