# Broker - Message Queue - RabbitMQ
###### tags: `Broker` `Message Queue` `RabbitMQ`
---
# Links
* [MQTT v5.0 support #2554](https://github.com/rabbitmq/rabbitmq-server/issues/2554)
* [Configuration](https://www.rabbitmq.com/configure.html)
* [MQTT Plugin](https://www.rabbitmq.com/mqtt.html)
* [Clients Libraries and Developer Tools](https://www.rabbitmq.com/devtools.html)
* https://github.com/rabbitmq/rabbitmq-server/blob/master/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java
* https://github.com/andypiper/rmq-mqtt-demo/blob/master/MqttTest.java
* [RabbitMQ MQTT Broker](https://tech.scargill.net/rabbitmq-mqtt-broker/)
## Client
* [rabbitmq / rabbitmq-tutorials](https://github.com/rabbitmq/rabbitmq-tutorials/)
* https://github.com/rabbitmq/rabbitmq-mqtt
* https://github.com/rabbitmq/rabbitmq-server/tree/master/deps/rabbitmq_mqtt
* [pika 1.1.0](https://pypi.org/project/pika/)
Pika is a RabbitMQ (AMQP 0-9-1) client library for Python.
### Java
* https://www.rabbitmq.com/tutorials/tutorial-one-java.html
* https://rabbitmq.docs.pivotal.io/37/rabbit-web-docs/tutorials/tutorial-three-java.html
* [RabbitMQ Java Client Library](https://www.rabbitmq.com/java-client.html)
* [Java Client API Guide](https://www.rabbitmq.com/api-guide.html)
---
# [AMQP 0-9-1 Model Explained](https://www.rabbitmq.com/tutorials/amqp-concepts.html)
## What is AMQP 0-9-1?
AMQP 0-9-1 (Advanced Message Queuing Protocol) is a messaging protocol
that enables conforming client applications to communicate
with conforming messaging middleware brokers.
## Brokers and Their Role
Messaging brokers receive messages from publishers
(__applications that publish them__, also known as __producers__)
and __route__ them to consumers (__applications that process them__).
Since it is a network protocol, the publishers, consumers
and the broker can all reside on different machines.
## AMQP 0-9-1 Model in Brief
The AMQP 0-9-1 Model has the following view of the world:
messages are published to exchanges,
which are often compared to post offices or mailboxes.
Exchanges then distribute message copies to queues
using rules called __bindings__.
Then the broker either delivers messages to consumers subscribed to queues,
or consumers fetch/pull messages from queues on demand.
## AMQP 0-9-1 is a Programmable Protocol
AMQP 0-9-1 is a programmable protocol in the sense that AMQP 0-9-1 entities and routing schemes are primarily defined by applications themselves, not a broker administrator. Accordingly, provision is made for protocol operations that declare queues and exchanges, define bindings between them, subscribe to queues and so on.
This gives application developers a lot of freedom but also requires them to be aware of potential definition conflicts. In practice, definition conflicts are rare and often indicate a misconfiguration.
Applications declare the AMQP 0-9-1 entities that they need, define necessary routing schemes and may choose to delete AMQP 0-9-1 entities when they are no longer used.
## Exchanges and Exchange Types
Exchanges are AMQP 0-9-1 entities where messages are sent.
Exchanges take a message and route it into zero or more queues.
The routing algorithm used depends on the exchange type
and rules called bindings. AMQP 0-9-1 brokers provide four __exchange types__:
Exchange type | Default pre-declared names
--- | ---
Direct exchange | (Empty string) and amq.direct
Fanout exchange | amq.fanout
Topic exchange | amq.topic
Headers exchange | amq.match (and amq.headers in RabbitMQ)
Besides the exchange type,
exchanges are declared with a number of __attributes__,
the most important of which are:
* Name
* Durability (exchanges survive broker restart)
* Auto-delete (exchange is deleted when last queue is unbound from it)
* Arguments (optional, used by plugins and broker-specific features)
Exchanges can be durable or transient.
__Durable exchanges survive broker restart__ whereas
__transient exchanges do not__
(they have to be redeclared when the broker comes back online).
Not all scenarios and use cases require exchanges to be durable.
### Default Exchange
The __default exchange__ is a __direct exchange__
with no name (empty string) pre-declared by the broker.
It has one __special property__ that makes it very useful
for simple applications: every queue that is created
is automatically bound to it with a routing key
which is the same as the queue name.
For example, when you declare a queue with the name of ``search-indexing-online``,
the AMQP 0-9-1 broker will bind it to the default exchange
using ``search-indexing-online`` as the routing key
(in this context sometimes referred to as the binding key).
Therefore, a message published to the default exchange
with the routing key ``search-indexing-online``
will be routed to the queue ``search-indexing-online``.
In other words, the default exchange makes it seem like
it is possible to deliver messages directly to queues,
even though that is not technically what is happening.
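The auto-binding behaviour described above can be sketched with a small in-memory model. This is not a RabbitMQ client: `ToyBroker`, `declare_queue` and `publish` are hypothetical names used only for illustration, and only the default exchange is modelled.

```python
from collections import defaultdict, deque


class ToyBroker:
    """In-memory stand-in for a broker; models only the default exchange."""

    def __init__(self):
        self.queues = {}                   # queue name -> deque of messages
        self.bindings = defaultdict(set)   # (exchange, routing key) -> queue names

    def declare_queue(self, name):
        # Declaring a queue also binds it to the default exchange ("")
        # with a routing key equal to the queue name.
        self.queues.setdefault(name, deque())
        self.bindings[("", name)].add(name)

    def publish(self, exchange, routing_key, body):
        # Deliver a copy to every queue bound with exactly this key.
        targets = self.bindings.get((exchange, routing_key), set())
        for queue_name in targets:
            self.queues[queue_name].append(body)
        return len(targets)


broker = ToyBroker()
broker.declare_queue("search-indexing-online")
routed = broker.publish("", "search-indexing-online", "reindex page 7")
print(routed, list(broker.queues["search-indexing-online"]))
```

Publishing with the queue name as the routing key looks like a direct delivery to the queue, but the message still passes through the binding on the default exchange.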
### Direct Exchange
A direct exchange __delivers messages to queues
based on the message routing key__.
A direct exchange is ideal for the __unicast__ routing of messages
(although they can be used for multicast routing as well).
Here is how it works:
* A queue binds to the exchange with a routing key ``K``
* When a new message with routing key ``R``
arrives at the direct exchange,
the exchange routes it to the queue if ``K = R``
Direct exchanges are often used to distribute tasks between multiple workers
(instances of the same application) in a round robin manner.
When doing so, it is important to understand that,
in AMQP 0-9-1, messages are load balanced between consumers and not between queues.
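A minimal sketch of the two ideas above, assuming a toy in-memory model rather than a real broker: the exchange picks queues by comparing `K` with `R`, and the round robin happens between the consumers of one queue. The queue names, routing keys and worker names are made up for illustration.

```python
from itertools import cycle


def route_direct(bindings, routing_key):
    """Return the queues whose binding key K equals the routing key R."""
    return [queue for queue, key in bindings if key == routing_key]


# (queue name, binding key) pairs for one hypothetical direct exchange.
bindings = [("resize", "images.resize"), ("crop", "images.crop")]

# Two workers consume from the same "resize" queue; deliveries alternate
# between the consumers of that queue, not between queues.
workers = cycle(["worker-1", "worker-2"])
deliveries = []
for task in ["a.png", "b.png", "c.png"]:
    for queue in route_direct(bindings, "images.resize"):
        deliveries.append((queue, next(workers), task))

print(deliveries)
```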
### Fanout Exchange
A fanout exchange __routes messages to all of the queues
that are bound to it and the routing key is ignored__.
If ``N`` queues are bound to a fanout exchange,
when a new message is published to that
exchange a copy of the message is delivered to all ``N`` queues.
Fanout exchanges are ideal for the __broadcast__ routing of messages.
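As a rough illustration (a toy model, not broker code), fanout routing reduces to "copy to every bound queue, whatever the routing key is". The queue names below are hypothetical.

```python
def route_fanout(bound_queues, routing_key):
    """A fanout exchange ignores the routing key and selects every bound queue."""
    return list(bound_queues)


# Three hypothetical queues bound to one fanout exchange.
queues = {"mobile": [], "web": [], "archive": []}

for routing_key, body in [("scores.football", "1:0"), ("", "2:0")]:
    for name in route_fanout(queues, routing_key):
        queues[name].append(body)   # each queue gets its own copy

print(queues)
```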
Because a fanout exchange delivers a copy of a message
to every queue bound to it, its use cases are quite similar:
* Massively multi-player online (MMO) games can use it for leaderboard updates or other global events
* Sport news sites can use fanout exchanges for distributing score updates to mobile clients in near real-time
* Distributed systems can broadcast various state and configuration updates
* Group chats can distribute messages between participants using a fanout exchange (although AMQP does not have a built-in concept of presence, so XMPP may be a better choice)
### Topic Exchange
Topic exchanges route messages to one or many queues
based on matching between a message __routing key__ and
the __pattern that was used to bind a queue to an exchange__.
The topic exchange type is often used to
implement various __publish/subscribe pattern__ variations.
Topic exchanges are commonly used for the __multicast routing of messages__.
Topic exchanges have a very broad set of use cases.
Whenever a problem involves multiple consumers/applications
that selectively choose which type of messages they want to receive,
the use of topic exchanges should be considered.
Example uses:
* Distributing data relevant to specific geographic location, for example, points of sale
* Background task processing done by multiple workers, each capable of handling specific set of tasks
* Stocks price updates (and updates on other kinds of financial data)
* News updates that involve categorization or tagging (for example, only for a particular sport or team)
* Orchestration of services of different kinds in the cloud
* Distributed architecture/OS-specific software builds or packaging where each builder can handle only one architecture or OS
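The pattern matching can be sketched in a few lines. The notes above do not spell out the wildcard rules, so this assumes the usual topic exchange convention: routing keys are dot-separated words, `*` stands for exactly one word and `#` for zero or more words. `topic_matches` is a hypothetical helper, not part of any client library.

```python
def topic_matches(pattern, routing_key):
    """Match a dot-separated routing key against a binding pattern.

    "*" stands for exactly one word, "#" for zero or more words.
    """
    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # "#" may absorb any number of words, including none.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))


# Hypothetical bindings: queue name -> binding pattern.
bindings = {"eu-sales": "pos.eu.*", "all-pos": "pos.#", "berlin": "pos.eu.berlin"}
selected = sorted(q for q, p in bindings.items() if topic_matches(p, "pos.eu.berlin"))
print(selected)
```

A message with routing key `pos.eu.berlin` is copied to all three queues, which is the selective multicast behaviour described above.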
### Headers Exchange
A headers exchange is designed for routing on multiple attributes that are more easily expressed as message headers than a routing key. Headers exchanges ignore the routing key attribute. Instead, the attributes used for routing are taken from the headers attribute. A message is considered matching if the value of the header equals the value specified upon binding.
It is possible to bind a queue to a headers exchange using more than one header for matching. In this case, the broker needs one more piece of information from the application developer, namely, should it consider messages with any of the headers matching, or all of them? This is what the "x-match" binding argument is for. When the "x-match" argument is set to "any", just one matching header value is sufficient. Alternatively, setting "x-match" to "all" mandates that all the values must match.
Headers exchanges can be looked upon as "direct exchanges on steroids". Because they route based on header values, they can be used as direct exchanges where the routing key does not have to be a string; it could be an integer or a hash (dictionary) for example.
Note that headers beginning with the string ``x-`` will not be used to evaluate matches.
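The "any" versus "all" decision can be sketched as follows. This is a simplified model under stated assumptions: `headers_match` is a hypothetical helper, binding arguments and message headers are plain dictionaries, and a missing "x-match" is treated as "all".

```python
def headers_match(binding_args, message_headers):
    """Apply the "x-match" rule of a headers exchange binding (toy model)."""
    mode = binding_args.get("x-match", "all")   # assumption: default is "all"
    # Keys starting with "x-" are not used to evaluate matches.
    wanted = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [k in message_headers and message_headers[k] == v
            for k, v in wanted.items()]
    return any(hits) if mode == "any" else all(hits)


message = {"format": "pdf", "type": "log", "shard": 7}
print(headers_match({"x-match": "any", "format": "pdf", "type": "report"}, message))
print(headers_match({"x-match": "all", "format": "pdf", "type": "report"}, message))
print(headers_match({"x-match": "all", "shard": 7}, message))
```

The last call shows the "routing key does not have to be a string" point: the match is made on an integer header value.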
## Queues
[Queues](https://www.rabbitmq.com/queues.html) in the AMQP 0-9-1 model are very similar to queues
in other message- and task-queueing systems:
__they store messages that are consumed by applications__.
Queues share some properties with exchanges,
but also have some additional properties:
* Name
* Durable (the queue will survive a broker restart)
* Exclusive (used by only one connection and the queue will be deleted when that connection closes)
* Auto-delete (queue that has had at least one consumer is deleted when last consumer unsubscribes)
* Arguments (optional; used by plugins and broker-specific features such as message TTL, queue length limit, etc)
Before a queue can be used it has to be declared. Declaring a queue will cause it to be created if it does not already exist. The declaration will have no effect if the queue does already exist and its attributes are the same as those in the declaration. When the existing queue attributes are not the same as those in the declaration a channel-level exception with code 406 (PRECONDITION_FAILED) will be raised.
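The declare semantics above (create if missing, no-op if identical, error if different) can be modelled in a few lines. `PreconditionFailed` and `declare_queue` are hypothetical names for this sketch; a real client would surface the 406 as a channel-level exception of its own type.

```python
class PreconditionFailed(Exception):
    """Stands in for channel-level exception 406 (PRECONDITION_FAILED)."""
    code = 406


def declare_queue(queues, name, **attributes):
    """Create the queue if missing; verify attributes if it already exists."""
    existing = queues.get(name)
    if existing is None:
        queues[name] = attributes          # first declaration creates the queue
    elif existing != attributes:
        raise PreconditionFailed(f"queue {name!r} already exists as {existing}")
    return name                            # identical redeclaration is a no-op


queues = {}
declare_queue(queues, "tasks", durable=True, exclusive=False)
declare_queue(queues, "tasks", durable=True, exclusive=False)   # no effect
try:
    declare_queue(queues, "tasks", durable=False, exclusive=False)
    conflict = None
except PreconditionFailed as exc:
    conflict = exc.code
print(conflict)
```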
### Queue Names
__Applications may pick queue names or
ask the broker to generate a name for them__.
Queue names may be up to 255 bytes of UTF-8 characters.
An AMQP 0-9-1 broker can __generate a unique queue name on behalf of an app__.
To use this feature, pass an empty string as the queue name argument.
The generated name will be returned to the client with queue declaration response.
Queue names __starting with ``amq.`` are reserved
for internal use by the broker__.
Attempts to declare a queue with a name that
violates this rule will result in
a __channel-level exception__ with reply code ``403`` (``ACCESS_REFUSED``).
In short, the ``amq.`` prefix is reserved.
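A sketch of the naming rules above as a single validation function. `resolve_queue_name` and `AccessRefused` are hypothetical names, and the `amq.gen-` prefix plus random suffix only imitates the shape of a broker-generated name.

```python
import uuid


class AccessRefused(Exception):
    """Stands in for channel-level exception 403 (ACCESS_REFUSED)."""
    code = 403


def resolve_queue_name(requested):
    """Return the name a toy broker would use for a declared queue."""
    if requested == "":
        # Empty name: the broker generates a unique name for the client.
        return "amq.gen-" + uuid.uuid4().hex
    if requested.startswith("amq."):
        raise AccessRefused(f"{requested!r} uses the reserved 'amq.' prefix")
    if len(requested.encode("utf-8")) > 255:
        raise ValueError("queue names are limited to 255 bytes of UTF-8")
    return requested


generated = resolve_queue_name("")
print(resolve_queue_name("orders"), generated.startswith("amq.gen-"))
```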
### Queue Durability
In AMQP 0-9-1, queues can be declared as durable or transient.
Metadata of a __durable queue is stored on disk__,
while metadata of a __transient queue is stored in memory__ when possible.
The same distinction is made for messages at publishing time.
In environments and use cases where durability is important,
applications must use durable queues and make sure that
publishers mark published messages as persistent.
This topic is covered in more detail in the Queues guide.
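The point that both conditions are needed can be shown with a toy model of a broker restart. This is an illustration only: `surviving_state` is a hypothetical function, and the dictionaries stand in for queue metadata and message properties.

```python
def surviving_state(queues):
    """Return what would survive a restart in this toy model."""
    survivors = {}
    for name, queue in queues.items():
        if not queue["durable"]:
            continue   # a transient queue is gone, together with its messages
        survivors[name] = {
            "durable": True,
            # In a durable queue only persistent messages are kept.
            "messages": [m for m in queue["messages"] if m["persistent"]],
        }
    return survivors


queues = {
    "orders": {"durable": True, "messages": [
        {"body": "o-1", "persistent": True},
        {"body": "o-2", "persistent": False},
    ]},
    "scratch": {"durable": False, "messages": [
        {"body": "s-1", "persistent": True},
    ]},
}
after = surviving_state(queues)
print(after)
```

Only `o-1` survives: `o-2` was not persistent, and `s-1` sat in a transient queue even though the message itself was marked persistent.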
### Bindings
Bindings are __rules__ that __exchanges use__ (among other things)
__to route messages to queues__.
__To instruct an exchange ``E`` to route messages to a queue ``Q``,
``Q`` has to be bound to ``E``__.
Bindings may have an optional __routing key__ attribute
used by some exchange types. The purpose of the routing key
is to select certain messages published to an exchange
to be routed to the bound queue. In other words, the routing key acts like a filter.
A queue and an exchange are connected through a binding.
A routing key can be specified to select which messages published to a particular exchange are routed to the queue.
Put simply, it is a filter.
To draw an analogy:
* __Queue__ is like your __destination__ in New York city
* __Exchange__ is like JFK __airport__
* __Bindings__ are __routes__ from JFK to your destination.
There can be zero or many ways to reach it
Having this layer of indirection enables routing scenarios
that are impossible or very hard to implement
by publishing directly to queues and also eliminates a certain amount of
duplicated work application developers have to do.
If a message cannot be routed to any queue
(for example, because there are no bindings
for the exchange it was published to)
it is either [dropped or returned to the publisher](https://www.rabbitmq.com/publishers.html#unroutable),
depending on message attributes the publisher has set.
Messages that cannot be routed may be dropped or returned to the publisher.
...
## Consumers
Storing messages in queues is useless unless applications can consume them. In the AMQP 0-9-1 Model, there are two ways for applications to do this:
* Subscribe to have messages delivered to them ("push API"): this is the recommended option
* Polling ("pull API"): this way is highly inefficient and should be avoided in most cases
With the "push API", applications have to indicate interest in consuming messages from a particular queue. When they do so, we say that they register a consumer or, simply put, subscribe to a queue. It is possible to have more than one consumer per queue or to register an exclusive consumer (excludes all other consumers from the queue while it is consuming).
Each consumer (subscription) has an identifier called a consumer tag. It can be used to unsubscribe from messages. Consumer tags are just strings.
## Message Acknowledgements
Consumer applications – that is, applications that receive and process messages – may occasionally fail to process individual messages or will sometimes just crash. There is also the possibility of network issues causing problems. This raises a question: when should the broker remove messages from queues? The AMQP 0-9-1 specification gives consumers control over this. There are two acknowledgement modes:
* After broker sends a message to an application (using either basic.deliver or basic.get-ok method).
* After the application sends back an acknowledgement (using the basic.ack method).
The former choice is called the automatic acknowledgement model, while the latter is called the explicit acknowledgement model. With the explicit model the application chooses when it is time to send an acknowledgement. It can be right after receiving a message, or after persisting it to a data store before processing, or after fully processing the message (for example, successfully fetching a Web page, processing and storing it into some persistent data store).
If a consumer dies without sending an acknowledgement, the broker will redeliver the message to another consumer or, if none are available at the time, the broker will wait until at least one consumer is registered for the same queue before attempting redelivery.
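The explicit acknowledgement model and the redelivery rule can be sketched with a toy queue. `AckQueue` and its methods are hypothetical; in particular, putting the message back at the head of the queue is an assumption of this sketch, not a statement about broker internals.

```python
from collections import deque


class AckQueue:
    """Toy queue using the explicit acknowledgement model."""

    def __init__(self, messages):
        self.ready = deque(messages)   # messages waiting for delivery
        self.unacked = {}              # delivery tag -> (consumer, message)
        self._next_tag = 1

    def deliver(self, consumer):
        """Hand the next ready message to a consumer and track it until acked."""
        message = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = (consumer, message)
        return tag, message

    def ack(self, tag):
        """The consumer confirmed processing, so the message can be removed."""
        del self.unacked[tag]

    def consumer_died(self, consumer):
        """Requeue everything the dead consumer had not acknowledged."""
        lost = sorted(t for t, (c, _) in self.unacked.items() if c == consumer)
        for tag in reversed(lost):
            _, message = self.unacked.pop(tag)
            self.ready.appendleft(message)


queue = AckQueue(["m1", "m2"])
tag, message = queue.deliver("consumer-a")
queue.consumer_died("consumer-a")            # no ack was sent for "m1"
tag, message = queue.deliver("consumer-b")   # "m1" is delivered again
queue.ack(tag)
print(message, list(queue.ready), queue.unacked)
```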
...
## Connections
AMQP 0-9-1 connections are typically __long-lived__.
AMQP 0-9-1 is an application level protocol that uses TCP for reliable delivery.
Connections use __authentication__ and can be __protected using TLS__.
When an application no longer needs to be connected to the server,
it should gracefully close its AMQP 0-9-1 connection
instead of abruptly closing the underlying TCP connection.
Connections are long-lived.
## Channels
Some applications need multiple connections to the broker.
However, it is undesirable to keep many TCP connections open at the same time
because doing so consumes system resources and
makes it more difficult to configure firewalls.
AMQP 0-9-1 __connections are multiplexed with channels that
can be thought of as "lightweight connections that share a single TCP connection"__.
Every protocol operation performed by a client happens on a channel.
Communication on a particular channel is completely separate
from communication on another channel,
therefore every protocol method also carries a channel ID
(a.k.a. channel number), an integer that
both the broker and clients use to figure out which channel the method is for.
A channel only exists in the context of a connection and never on its own.
When a connection is closed, so are all channels on it.
For applications that __use multiple threads/processes for processing,
it is very common to open a new channel per thread/process
and not share channels between them__.
A TCP connection is multiplexed with channels and is thereby shared.
Each thread or process uses its own channel.
Each channel has an identifier.
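Multiplexing by channel ID can be pictured as follows. This is a deliberately simplified sketch: real frames are binary, while here a frame is just a hypothetical `(channel_id, method_name)` tuple read from one shared connection.

```python
from collections import defaultdict


def demultiplex(frames):
    """Group frames arriving on one connection by their channel ID."""
    per_channel = defaultdict(list)
    for channel_id, method in frames:
        per_channel[channel_id].append(method)
    return dict(per_channel)


# Interleaved traffic from two threads, each using its own channel.
frames = [
    (1, "basic.publish"),
    (2, "queue.declare"),
    (1, "basic.publish"),
    (2, "basic.consume"),
]
print(demultiplex(frames))
```

Because every frame carries its channel ID, the two conversations stay separate even though they share a single TCP connection.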
## Virtual Hosts
To make it possible for a single broker to host multiple isolated "environments"
(groups of users, exchanges, queues and so on),
AMQP 0-9-1 includes the concept of virtual hosts (vhosts).
They are similar to virtual hosts used by many popular Web servers and
provide completely isolated environments in which AMQP entities live.
Protocol clients specify what vhosts they want to use during connection negotiation.
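Many clients accept the vhost as the path of an `amqp://` connection URI. The notes above do not cover that convention, so treat the details here as assumptions: the path segment after the host names the vhost, `%2f` encodes the default vhost `/`, and a URI without a path falls back to the default. `vhost_from_uri` is a hypothetical helper built on the standard library.

```python
from urllib.parse import unquote, urlparse


def vhost_from_uri(uri, default="/"):
    """Extract the virtual host from an amqp:// style URI."""
    path = urlparse(uri).path
    if path == "":
        return default           # no path at all: fall back to the default vhost
    return unquote(path[1:])     # drop the leading "/" and decode %2f etc.


for uri in (
    "amqp://guest:guest@localhost:5672/staging",
    "amqp://localhost/%2f",
    "amqp://localhost",
):
    print(uri, "->", repr(vhost_from_uri(uri)))
```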
---
# [RabbitMQ Cluster using Docker Compose](https://medium.com/@kailashyogeshwar/rabbitmq-cluster-using-docker-compose-7397ea378d73)
__RabbitMQ__ is one of the most widely used open-source message brokers.
It was originally developed by Rabbit Technologies and later maintained by __Pivotal Software__.
RabbitMQ is developed using __Erlang__ and
can be easily integrated into your __microservices__ architecture.
RabbitMQ supports the following protocols:
* AMQP
* HTTP
* STOMP
* MQTT
## How RabbitMQ Works?
RabbitMQ has an __Exchange__ component which
is __responsible for routing messages to queues__.
Each __consumer__ reads from a __Queue__, and the exchange decides which queues
receive a message based on the routing __logic__ that you choose.
There are 4 types of logic that you can use in an Exchange:
* __Direct Exchange__:
Messages are __routed to a queue based on the message routing key__
* __Fanout Exchange__:
Messages are __published to all queues bound to the exchange; the routing key is ignored__
* __Topic Exchange__:
Messages are __published to all queues whose binding pattern
matches the message routing key__.
* __Headers Exchange__: Messages are routed on message header values instead of the routing key. The headers are comparable to HTTP headers, such as the content type sent along with an image.
Topic Exchange - publish - Routing key - Binding
## RabbitMQ Components
RabbitMQ has four basic components :
* __Producer__:
Responsible for publishing messages to an exchange.
* __Exchange__:
Responsible for routing messages to their destination queues.
* __Queue__:
Message storage which is bound to an Exchange using a binding key.
* __Consumer__: Linked to one of the queues to consume messages.
## Why use RabbitMQ?
A message broker such as RabbitMQ helps overcome disadvantages of a monolithic architecture, such as a single point of failure and limited scalability. A microservices architecture decouples the core components of your application into small, manageable applications. This system design also supports the Single Responsibility Principle.
Because the application is decoupled, it is easier to evolve in later phases. It also gives developers the flexibility to connect applications written in different languages.
Advantages of RabbitMQ:
* HA
* Multi-protocol support
* Many Clients
* Clustering
* Management UI
* Huge Community
* Plugin system
## RabbitMQ vs Apache Kafka
## Setup RabbitMQ with Docker Compose
I have created a docker image that you can use to deploy a RabbitMQ cluster on your machine.
Github: https://github.com/kailashyogeshwar85/docker-rabbitmq-cluster
DockerHub: https://hub.docker.com/r/lucifer8591/rabbitmq-server
Let’s discuss the components that were used in creating __RabbitMQ Cluster__
and __build using docker-compose__.
Architecture:
So we will have three containers, each running a RabbitMQ instance;
the slave brokers are connected to the master node.
---
# [Capture IOT sensor’s data — MQTT Protocol + RabbitMQ as MQTT Broker](https://medium.com/@ritresh.girdhar/capture-iot-sensors-data-mqtt-protocol-rabbitmq-as-mqtt-broker-30bd89ac94c3)
### Queues tab --> Add a new queue
* Type: Classic
* Name: ``weather-events``
* Durability: Durable
* Auto delete: No
* Arguments: Empty = Empty
### Exchanges tab --> Add a new exchange
* Name: ``weather.exchange``
* Type: topic
* Durability: Durable
* Auto delete: No
* Internal: No
* Arguments: Empty = Empty
### Exchanges tab --> Click ``weather.exchange`` --> Bindings --> Add binding from this exchange
* To queue: ``weather-events``
* Routing key: ``weather.*``
* Arguments: Empty = Empty
```
{
  "temperature": {
    "min": 25,
    "max": 33,
    "unit": "celsius"
  },
  "time": 1611279751
}
```
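A sensor reading like the one above has to be serialised, and its MQTT topic has to line up with the `weather.*` binding. The sketch below rests on two assumptions not stated in these notes: that the MQTT plugin maps topic separators `/` to `.` and the single-level wildcard `+` to `*`, and that the sensor publishes on a made-up topic `weather/temperature`. `mqtt_topic_to_routing_key` is a hypothetical helper, not a plugin API.

```python
import json


def mqtt_topic_to_routing_key(topic):
    """Translate MQTT separators and wildcards to AMQP topic syntax (assumed mapping)."""
    return topic.replace("/", ".").replace("+", "*")


# Payload with the same shape as the sample message above.
payload = json.dumps({
    "temperature": {"min": 25, "max": 33, "unit": "celsius"},
    "time": 1611279751,
})

routing_key = mqtt_topic_to_routing_key("weather/temperature")  # hypothetical topic
print(routing_key, payload)
```

Under these assumptions the routing key becomes `weather.temperature`, which has exactly one word after `weather.` and therefore falls under the `weather.*` binding configured above.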
---
[eclipse / paho.mqtt.python](https://github.com/eclipse/paho.mqtt.python)
[How to use celery in mqtt #370](https://github.com/eclipse/paho.mqtt.python/issues/370)
---
## Overview
The client API exposes key entities in the AMQP 0-9-1 protocol model, with additional abstractions for ease of use.
RabbitMQ Java client uses ``com.rabbitmq.client`` as its top-level package. The key classes and interfaces are:
* ``Channel``: represents an AMQP 0-9-1 channel, and provides most of the operations (protocol methods).
* ``Connection``: represents an AMQP 0-9-1 connection
* ``ConnectionFactory``: constructs ``Connection`` instances
* ``Consumer``: represents a message consumer
* ``DefaultConsumer``: commonly used base class for consumers
* ``BasicProperties``: message properties (metadata)
* ``BasicProperties.Builder``: builder for ``BasicProperties``
Protocol operations are available through the Channel interface. Connection is used to open channels, register connection lifecycle event handlers, and close connections that are no longer needed. Connections are instantiated through ConnectionFactory, which is how you configure various connection settings, such as the vhost or username.
## Connections and Channels
The core API classes are ``Connection`` and ``Channel``, representing an AMQP 0-9-1 connection and channel, respectively. They are typically imported before use (``import com.rabbitmq.client.Connection;`` and ``import com.rabbitmq.client.Channel;``).
When publishing a message, publishers may specify various message attributes (message meta-data). Some of this meta-data may be used by the broker, however, the rest of it is completely opaque to the broker and is only used by applications that receive the message.
Networks are unreliable and applications may fail to process messages therefore the AMQP 0-9-1 model has a notion of message acknowledgements: when a message is delivered to a consumer the consumer notifies the broker, either automatically or as soon as the application developer chooses to do so. When message acknowledgements are in use, a broker will only completely remove a message from a queue when it receives a notification for that message (or group of messages).
In certain situations, for example, when a message cannot be routed, messages may be returned to publishers, dropped, or, if the broker implements an extension, placed into a so-called "dead letter queue". Publishers choose how to handle situations like this by publishing messages using certain parameters.
Queues, exchanges and bindings are collectively referred to as AMQP entities.
## Connection and Channel Lifespan
Client connections are meant to be long-lived. The underlying protocol is designed and optimized for long running connections. That means that opening a new connection per operation, e.g. a message published, is unnecessary and strongly discouraged as it will introduce a lot of network roundtrips and overhead.
Channels are also meant to be long-lived but since many recoverable protocol errors will result in channel closure, channel lifespan could be shorter than that of its connection. Closing and opening new channels per operation is usually unnecessary but can be appropriate. When in doubt, consider reusing channels first.
Channel-level exceptions such as attempts to consume from a queue that does not exist will result in channel closure. A closed channel can no longer be used and will not receive any more events from the server (such as message deliveries). Channel-level exceptions will be logged by RabbitMQ and will initiate a shutdown sequence for the channel (see below).
## Client-Provided Connection Name
RabbitMQ nodes have a limited amount of information about their clients:
* their TCP endpoint (source IP address and port)
* the credentials used
This information alone can make identifying applications and instances problematic, in particular when credentials can be shared and clients connect over a load balancer but Proxy protocol cannot be enabled.
To make it easier to identify clients in server logs and management UI, AMQP 0-9-1 client connections, including the RabbitMQ Java client, can provide a custom identifier. If set, the identifier will be mentioned in log entries and management UI. The identifier is known as the client-provided connection name. The name can be used to identify an application or a specific component within an application. The name is optional; however, developers are strongly encouraged to provide one as it would significantly simplify certain operational tasks.
Overloads of the RabbitMQ Java client's ``ConnectionFactory#newConnection`` method accept a client-provided connection name as a string argument, for example ``newConnection("app:audit component:event-consumer")``.
---