# Zero-MQ

## Design Concepts

- "how to connect any code to any code, anywhere."
- "possible building blocks that people could understand and use easily."

## Critique of Today's Common Centralized Protocols

HTTP is perhaps the one solution to have been simple enough to work, but it arguably makes the problem worse by encouraging developers and architects to think in terms of **big servers and thin, stupid clients.**

So today people are still connecting applications using raw UDP and TCP, proprietary protocols, HTTP, and WebSockets. It remains **painful, slow, hard to scale, and essentially centralized.**

## REP-REQ Pattern (Client-Server)

![](https://i.imgur.com/S2uTsCI.png)

### REP Server example

```clike=
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>

int main (void)
{
    void *context = zmq_ctx_new ();

    // Socket to talk to clients
    void *responder = zmq_socket (context, ZMQ_REP);
    zmq_bind (responder, "tcp://*:5555");

    while (1) {
        // Wait for next request from client
        zmq_msg_t request;
        zmq_msg_init (&request);
        zmq_msg_recv (&request, responder, 0);
        printf ("Received Hello\n");
        zmq_msg_close (&request);

        // Do some 'work'
        sleep (1);

        // Send reply back to client
        zmq_msg_t reply;
        zmq_msg_init_size (&reply, 5);
        memcpy (zmq_msg_data (&reply), "World", 5);
        zmq_msg_send (&reply, responder, 0);
        zmq_msg_close (&reply);
    }
    // We never get here but if we did, this would be how we end
    zmq_close (responder);
    zmq_ctx_destroy (context);
    return 0;
}
```

### REQ Client example (sends 10 requests)

```clike=
// Hello World client
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>

int main (void)
{
    printf ("Connecting to hello world server...\n");
    void *context = zmq_ctx_new ();
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5555");

    int request_nbr;
    for (request_nbr = 0; request_nbr != 10; request_nbr++) {
        char buffer [10];
        printf ("Sending Hello %d...\n", request_nbr);
        zmq_send (requester, "Hello", 5, 0);
        zmq_recv (requester, buffer, 10, 0);
        printf ("Received World %d\n", request_nbr);
    }
    zmq_close (requester);
    zmq_ctx_destroy (context);
    return 0;
}
```

### REP-REQ Description

Both programs create a ØMQ context to work with, and a socket. The server binds its REP (reply) socket to port 5555. It then waits for a request in a loop, and responds each time with a reply. The client sends a request and reads the reply back from the server.

## Notes on Sending Strings

**ØMQ doesn’t know anything about the data you send except its size in bytes. That means you are responsible for formatting it safely so that applications can read it back.**

### C

In C and some other languages, strings are terminated with a null byte. We could send a string like “Hello” with that extra null byte:

```clike=
zmq_msg_init_data (&request, "Hello", 6, NULL, NULL);
```

### Python

However, if you send a string from another language, it probably will not include that null byte. For example, when we send that same string in Python, we do this:

```python=
socket.send (b"Hello")
```

### Remember to Terminate Strings Received in C

When you receive string data from ØMQ in C, you simply cannot trust that it’s safely terminated.
Every single time you read a string, **you should allocate a new buffer with space for an extra byte, copy the string, and terminate it properly with a null.**

```clike=
// Receive 0MQ string from socket and convert into C string
static char *
s_recv (void *socket) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    int size = zmq_msg_recv (&message, socket, 0);
    if (size == -1) {
        zmq_msg_close (&message);
        return NULL;
    }
    // One extra byte for the terminating null
    char *string = malloc (size + 1);
    if (string) {
        memcpy (string, zmq_msg_data (&message), size);
        string [size] = 0;
    }
    zmq_msg_close (&message);
    return (string);
}
```

## Checking the ZeroMQ Version

```clike=
#include "zhelpers.h"

int main (void)
{
    int major, minor, patch;
    zmq_version (&major, &minor, &patch);
    printf ("Current 0MQ version is %d.%d.%d\n", major, minor, patch);
    return EXIT_SUCCESS;
}
```

## zhelpers.h: s_recv(void *socket) & s_send(void *socket, char *string)

```clike=
// Receive 0MQ string from socket and convert into C string
// Caller must free returned string. Returns NULL if the context
// is being terminated.
static char *
s_recv (void *socket) {
    char buffer [256];
    int size = zmq_recv (socket, buffer, 255, 0);
    if (size == -1)
        return NULL;
    // zmq_recv returns the full message size even when it truncates,
    // so clamp the index before terminating the buffer.
    buffer [size < 256 ? size : 255] = '\0';
    return strndup (buffer, sizeof (buffer) - 1);
    // Remember that the strdup family of functions uses malloc to get space
    // for the new string. It must be freed manually when you are done with it.
    // Failure to do so leaks heap memory.
}

// Convert C string to 0MQ string and send to socket
static int
s_send (void *socket, char *string) {
    int size = zmq_send (socket, string, strlen (string), 0);
    return size;
}
```

## One-way data distribution (PUB-SUB)

**A server pushes updates to a set of clients.**

![](https://i.imgur.com/INvhPLW.png)

### wuserver.c

```clike=
// wuserver.c
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
//
#include "zhelpers.h"

int main (void)
{
    // Prepare our context and publisher
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);
    int rc = zmq_bind (publisher, "tcp://*:5556");
    assert (rc == 0);

    // Initialize random number generator
    srandom ((unsigned) time (NULL));
    while (1) {
        // Get values that will fool the boss
        int zipcode, temperature, relhumidity;
        zipcode = randof (100000);
        temperature = randof (215) - 80;
        relhumidity = randof (50) + 10;

        // Send message to all subscribers
        char update [20];
        sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
        s_send (publisher, update);
    }
    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return 0;
}
```

### wuclient.c

```clike=
// Weather update client
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode
#include "zhelpers.h"

int main (int argc, char *argv [])
{
    // Socket to talk to server
    printf ("Collecting updates from weather server...\n");
    void *context = zmq_ctx_new ();
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    int rc = zmq_connect (subscriber, "tcp://localhost:5556");
    assert (rc == 0);

    // Subscribe to zipcode, default is NYC, 10001
    char *filter = (argc > 1)?
        argv [1]: "10001 ";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
                         filter, strlen (filter));
    assert (rc == 0);

    // Process 100 updates
    int update_nbr;
    long total_temp = 0;
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {
        char *string = s_recv (subscriber);

        int zipcode, temperature, relhumidity;
        sscanf (string, "%d %d %d",
                &zipcode, &temperature, &relhumidity);
        total_temp += temperature;
        free (string);
    }
    printf ("Average temperature for zipcode '%s' was %dF\n",
            filter, (int) (total_temp / update_nbr));

    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}
```

### SUB Note

**Note that when you use a SUB socket you must set a subscription using zmq_setsockopt() and ZMQ_SUBSCRIBE. If you don’t set any subscription, you won’t get any messages.**

- The subscriber can set many subscriptions, which are added together.
- A subscription is often but not necessarily a printable string.

```clike=
// Subscribe to zipcode, default is NYC, 10001
char *filter = (argc > 1)? argv [1]: "10001 ";
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
                     filter, strlen (filter));
assert (rc == 0);

// Subscribe to a second zipcode, default is 10002
char *filter2 = (argc > 2)? argv [2]: "10002 ";
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
                     filter2, strlen (filter2));
assert (rc == 0);
```

### zmq_setsockopt()

pass

### bind & connect Note

In theory, with ØMQ sockets **it does not matter which end connects and which end binds**. However, in practice there are undocumented differences that I’ll come to later. For now, **bind the PUB and connect the SUB**, unless your network design makes that impossible.

### SUB Misses the First Messages Because of Connection Delay (Slow Joiner)

You do not know precisely when a subscriber starts to get messages.
Even if you start a subscriber, wait a while, and then start the publisher, **the subscriber will always miss the first messages that the publisher sends.** This is because as the subscriber **connects to the publisher** (something that **takes a small but nonzero amount of time**), **the publisher may already be sending messages out.**

### Try It Yourself

- Subscriber connects to an endpoint and receives and counts messages.
- Publisher binds to an endpoint and immediately sends 1,000 messages.

The subscriber will most likely not receive anything. You’ll blink, check that you set a correct filter, and try again, and the subscriber will still not receive anything.

### Synchronization

Chapter 2 of the ØMQ guide explains how to **synchronize a publisher and subscribers** so that you **don’t start to publish data until the subscribers really are connected and ready.**

The alternative to synchronization is to simply assume that the published data **stream is infinite and has no start and no end.**

### publish-subscribe pattern

- A subscriber can connect to more than one publisher, using one connect call each time. Data will **then arrive and be interleaved (“fair queued”)** so that no single publisher drowns out the others.
- If a publisher has no connected subscribers, then it will simply drop all messages.
- If you’re using TCP and a subscriber is slow, messages will **queue up on the publisher.**

## Parallel processing model

![](https://i.imgur.com/myFAnTx.png)

- **A ventilator that produces tasks** that can be done in parallel
- A set of **workers that processes tasks**
- A **sink that collects results** back from the worker processes

### Parallel task ventilator (taskvent.c)

It generates 100 tasks, each one a message telling the worker to sleep for some number of milliseconds.
```clike=
// Task ventilator
// Binds PUSH socket to tcp://*:5557
// Sends batch of tasks to workers via that socket
#include "zhelpers.h"

int main (void)
{
    void *context = zmq_ctx_new ();

    // Socket to send messages on
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_bind (sender, "tcp://*:5557");

    // Socket to send start of batch message on
    void *sink = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sink, "tcp://localhost:5558");

    // To load-balance and run in parallel, wait until
    // all workers are connected before sending tasks
    printf ("Press Enter when the workers are ready: ");
    getchar ();
    printf ("Sending tasks to workers...\n");

    // The first message is "0" and signals start of batch
    s_send (sink, "0");

    // Initialize random number generator
    srandom ((unsigned) time (NULL));

    // Send 100 tasks
    int task_nbr;
    int total_msec = 0; // Total expected cost in msecs
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        int workload;
        // Random workload from 1 to 100msecs
        workload = randof (100) + 1;
        total_msec += workload;
        char string [10];
        sprintf (string, "%d", workload);
        s_send (sender, string);
    }
    printf ("Total expected cost: %d msec\n", total_msec);

    zmq_close (sink);
    zmq_close (sender);
    zmq_ctx_destroy (context);
    return 0;
}
```

### zhelpers.h: s_sleep(int msecs), s_clock(void)

```clike=
// Sleep for a number of milliseconds
static void
s_sleep (int msecs)
{
    struct timespec t;
    t.tv_sec = msecs / 1000;
    t.tv_nsec = (msecs % 1000) * 1000000;
    nanosleep (&t, NULL);
}

// Return current system clock as milliseconds
static int64_t
s_clock (void)
{
    struct timeval tv;
    gettimeofday (&tv, NULL);
    // Widen before multiplying so the result cannot overflow a 32-bit time_t
    return (int64_t) tv.tv_sec * 1000 + tv.tv_usec / 1000;
}
```

### Parallel task worker (taskwork.c)

It receives a message, sleeps for that number of milliseconds, and then signals that it’s finished.
```clike=
// Task worker
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
#include "zhelpers.h"

int main (void)
{
    // Socket to receive messages on
    void *context = zmq_ctx_new ();
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");

    // Socket to send messages to
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sender, "tcp://localhost:5558");

    // Process tasks forever
    while (1) {
        char *string = s_recv (receiver);
        printf ("%s.", string);     // Show progress
        fflush (stdout);
        s_sleep (atoi (string));    // Do the work (workload is in msecs)
        free (string);
        s_send (sender, "");        // Send results to sink
    }
    zmq_close (receiver);
    zmq_close (sender);
    zmq_ctx_destroy (context);
    return 0;
}
```

### Parallel task sink (tasksink.c)

It collects the 100 messages and then calculates how long the overall processing took, so we can confirm that the workers really were running in parallel if there is more than one of them.
```clike=
// Task sink
// Binds PULL socket to tcp://*:5558
// Collects results from workers via that socket
#include "zhelpers.h"

int main (void)
{
    // Prepare our context and socket
    void *context = zmq_ctx_new ();
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_bind (receiver, "tcp://*:5558");

    // Wait for start of batch
    char *string = s_recv (receiver);
    free (string);

    // Start our clock now
    int64_t start_time = s_clock ();

    // Process 100 confirmations
    int task_nbr;
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        char *string = s_recv (receiver);
        free (string);
        if (task_nbr % 10 == 0)
            printf (":");
        else
            printf (".");
        fflush (stdout);
    }
    // Calculate and report duration of batch
    printf ("Total elapsed time: %d msec\n",
            (int) (s_clock () - start_time));

    zmq_close (receiver);
    zmq_ctx_destroy (context);
    return 0;
}
```

### Code Details

- We say that the **ventilator and sink are stable parts** of our architecture and the **workers are dynamic parts of it**.
- We have to **synchronize the start of the batch with all workers** being up and running. The connect method takes a certain amount of time, so when a set of workers connect to the ventilator, **the first one to successfully connect will get a whole load of messages** in that short time **while the others are still connecting**. If you don’t synchronize the start of the batch somehow, the system won’t run in parallel at all.
- The ventilator’s **PUSH socket distributes tasks to workers evenly**. This is called **load balancing**.
- The sink’s **PULL socket collects results from workers evenly.** This is called **fair queuing**.

### Fair Queuing

![](https://i.imgur.com/HLJj8uv.png)

## Getting the Context Right

ØMQ applications always start by creating a context, and then using that for creating sockets. In C, it’s the zmq_ctx_new() call.
**You should create and use exactly one context in your process.** Technically, **the context is the container for all sockets in a single process**, and it acts as the transport for inproc sockets, which are the fastest way to connect threads in one process.

**Do one zmq_ctx_new() at the start of your main code, and one zmq_ctx_destroy() at the end.**

### fork()

If you’re using the fork() system call, each process needs its own context. **If you do zmq_ctx_new() in the main process before calling fork(), the child processes get their own contexts.**

## Making a Clean Exit

Memory leaks are one thing, but ØMQ is quite finicky about how you exit an application.

### zmq_ctx_destroy()

If you leave any sockets open, the zmq_ctx_destroy() function will hang forever. And even if you close all sockets, zmq_ctx_destroy() will by default wait forever if there are pending connects or sends, unless you set the LINGER to zero on those sockets before closing them.

### Exit Tips

- Always close a message the moment you are done with it, using zmq_msg_close(). See below:

```clike=
zmq_msg_t request;
zmq_msg_init (&request);
zmq_msg_recv (&request, responder, 0);
zmq_msg_close (&request);
```

- If you are opening and closing a lot of sockets, that’s probably a sign that you need to redesign your application.
- When you exit the program, close your sockets and then call zmq_ctx_destroy(). This destroys the context.

```clike=
zmq_close (receiver);
zmq_ctx_destroy (context);
```

### Multithreaded 0MQ Exit Tips

- First, do not try to use the same socket from multiple threads.
- Next, you need to shut down each socket that has ongoing requests. The proper way is to set a low LINGER value (one second), and then close the socket.
- Finally, destroy the context. This will cause any blocking receives or polls or sends in attached threads (i.e., which share the same context) to return with an error. Catch that error, then set LINGER on the sockets in that thread, close them, and exit.
#### LINGER Value

ZeroMQ is designed to deliver messages as reliably as possible by default. One way it does this is by **allowing outgoing messages to 'linger' in their queues** even when the socket that sent them has been closed. The ZeroMQ communication **thread will hang around until all its outgoing messages have been sent** even if the socket is closed.

To modify this behavior, we can set ZMQ_LINGER on the socket, **setting a maximum amount of time** in milliseconds that the thread will try to send messages after its socket has been closed.

```clike=
int linger_msec = 1000;
zmq_setsockopt (socket, ZMQ_LINGER, &linger_msec, sizeof (linger_msec));
```

## Why we need 0MQ

### Message Broker Problem (AMQP)

- It would mean adding an additional big box, and a new single point of failure.
- The broker would become another big server to maintain.

### 0MQ Transports

![](https://i.imgur.com/7SSojYS.png)

### Features

![](https://i.imgur.com/kazFdbQ.png)

- It handles I/O asynchronously, in background threads. These communicate with application threads using lock-free data structures, so concurrent ØMQ applications need no locks, semaphores, or other wait states.

![](https://i.imgur.com/igX1n0S.png)

- Components can come and go dynamically, and ØMQ will automatically reconnect.
- It queues messages automatically when needed, pushing messages as close as possible to the receiver before queuing them.
- When a queue is full, ØMQ automatically blocks senders, or throws away messages, depending on the kind of messaging you are doing (the so-called “pattern”).
- It lets your applications talk to each other over arbitrary transports: TCP, multicast, in-process, inter-process. You don’t need to change your code to use a different transport.
- It lets you route messages using a variety of patterns, such as request-reply and publish-subscribe.

![](https://i.imgur.com/I8FJFwW.png)

- It lets you create proxies to queue, forward, or capture messages with a single call.

![](https://i.imgur.com/hYIy2zo.png)

- It delivers whole messages exactly as they were sent. If you write a 10KB message, you will receive a 10KB message.
- It does not impose any format on messages. They are blobs of zero bytes to gigabytes large.

### Let developers focus on processing tasks

Superficially, it’s a socket-inspired API on which you do zmq_msg_recv() and zmq_msg_send(). But the message processing loop rapidly becomes the central loop, and your application soon breaks down into a set of message processing tasks.

And it scales: each of these tasks maps to a node, and the nodes talk to each other across arbitrary transports. Two nodes in one process (node is a thread), two nodes on one box (node is a process), or two boxes on one network (node is a box); it’s all the same, with no application code changes.

### Socket Scalability

![](https://i.imgur.com/fz5QDDV.png)

The weather server has a single socket, and yet here we have it **sending data to five clients in parallel.** So the ØMQ socket is acting like a little server, silently accepting client requests and shoving data out to them as fast as the network can handle it. **And it’s a multithreaded server**, squeezing more juice out of your CPU.

## 0MQ Socket and Connection

In the ØMQ universe, sockets are doorways to fast little background communications engines that **manage a whole set of connections automagically for you**. You can’t see, work with, open, close, or attach state to these connections. Whether you **use blocking send or receive or poll, all you can talk to is the socket**, not the connections it manages for you. The connections are private and invisible, and this is the key to ØMQ’s scalability.
## Multi-part Message

### Send multi-part message

```clike=
/* part1, part2, and part3 are initialized zmq_msg_t messages */
/* Send a multi-part message consisting of three parts to socket */
rc = zmq_msg_send (&part1, socket, ZMQ_SNDMORE);
rc = zmq_msg_send (&part2, socket, ZMQ_SNDMORE);
/* Final part; no more parts to follow */
rc = zmq_msg_send (&part3, socket, 0);
```

### Receive multi-part message

```clike=
int more;
size_t more_size = sizeof more;
do {
    /* Create an empty ØMQ message to hold the message part */
    zmq_msg_t part;
    int rc = zmq_msg_init (&part);
    assert (rc == 0);
    /* Block until a message is available to be received from socket */
    rc = zmq_msg_recv (&part, socket, 0);
    assert (rc != -1);
    /* Determine if more message parts are to follow */
    rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
    assert (rc == 0);
    zmq_msg_close (&part);
} while (more);
```