# 51. Introduction To POSIX IPC Portable Operating System Interface - Message queues: - be used to pass messages between processes. - message boundaries are preserved, so that readers and writers **communicate in units of messages** - **permit each message to be assigned a priority**, which allows high-priority messages to be queued ahead of low-priority messages - Semaphores: - a kernel-maintained **integer whose value is never permitted to go below 0**. - using two operations that **increase and decrease a semaphore’s value by one** - Shared memory: - enables multiple processes to **share the same region of memory** - POSIX shared memory **provides fast IPC**. - Once one process has updated the shared memory, the change is immediately visible to other processes sharing the same region. ### API Overview - Summary of programming interfaces for POSIX IPC objects: ![](https://i.imgur.com/B9lIW03.png) #### IPC object names - **a name consisting of an initial slash**, followed by one of more nonslash characters; **forexample, /myobject.** #### Creating or opening an IPC object - Each IPC mechanism has an associated **open call** (mq_open(), sem_open(), or shm_open()), which is **analogous to the traditional UNIX open()** system call used for files. Given an IPC object name, the IPC open call either - The handle returned by the IPC open call **is analogous to the file descriptor** returned by the traditional open() system call - message queues: return a message queue descriptor, a value of type mqd_t - semaphores: a pointer of type sem_t * - shared memory: a file descriptor. - All of the IPC open calls permit at least three arguments — **name, oflag, and mode**: - **oflag** is a bit mask that can include at least the following flags: ![](https://i.imgur.com/PDn8PDM.png) - **mode** is a bit mask specifying the permissions to be placed on a new object - The values that may be specified for mode are the **same as for files** - The ownership and group ownership of a new IPC object are taken from the effective user and group IDs of the process making the IPC open call. - Ref: https://blog.csdn.net/bestkilly/article/details/52402676 ``` fd = shm_open("/mymem", O_CREAT | O_RDWR, S_IRUSR | S_IWUSR); ``` #### Closing an IPC object - `close`: For POSIX message queues and semaphores - `munmap`: For POSIX shared memory object #### IPC object permissions - IPC objects have a permissions mask that is the same as for files (rxw) #### IPC object deletion and object persistence - As with open files, POSIX IPC objects are `reference counted` - kernel maintains a count of the number of open references to the object. - makes it easier for applications to determine when the object can be safely deleted. - Each IPC object has a corresponding `unlink` - analogous to the traditional `unlink()` system call for files. - immediately removes the object’s name, and then destroys the object once all processes cease using it - After an object is unlinked, IPC open calls specifying the same object name will refer to a new object - POSIX IPC objects have kernel persistence. **Once created, an object continues to exist until it is unlinked or the system is shut down.** ## Comparison of System V IPC and POSIX IPC Ref: 1. https://www.cnblogs.com/philip-tell-truth/p/6284475.html 2. https://stackoverflow.com/questions/4582968/system-v-ipc-vs-posix-ipc#:~:text=System%20V%20IPC%20is%20older,are%20given%20with%20integer%20number. ## Summary - POSIX IPC is the general name given to three IPC mechanisms—**message queues, semaphores, and shared memory** - POSIX IPC interface is more consistent with the traditional UNIX file model. IPC objects - identified by `names` - managed using `open, close, and unlink` # 52 POSIX MESSAGE QUEUES ## Overview - main functions in the POSIX message queue API are the following: - `mq_open()`: creates a new message queue or opens an existing queue, returning a message queue descriptor for use in later calls - `mq_send()` writes a message to a queue. - `mq_receive()` reads a message from a queue - `mq_close()` closes a message queue that the process previously opened. - `mq_unlink()` removes a message queue name and marks the queue for deletion when all processes have closed it. - `mq_getattr()` and `mq_setattr()`: set and get attributes - `mq_notify()` allows a process to register for message notification from a queue - After registering, the process is notified of the availability of a message by **delivery of a signal** or by the **invocation of a function in a separate thread**. ### Opening, Closing, and Unlinking a Message Queue #### Opening a message queue `mq_open()`: Opening a message queue ![](https://i.imgur.com/e9avNAL.png) - oflag argument: ![](https://i.imgur.com/EGaHxBD.png) - `O_NONBLOCK` : If a subsequent call to mq_receive() or mq_send() can’t be performed without blocking, the call will fail immediately with the error EAGAIN - if O_CREAT is specified in flags, two further arguments are required: `mode` and `attr`. - `attr`: `mq_attr` structure that specifies attributes for the new message queue, describe in Section 52.4. - Upon successful completion, `mq_open()` returns a `message queue descriptor`, a value of type `mqd_t` - Effect of `fork()`, `exec()`, and process termination on message queue descriptors - During a fork(), the child process receives copies of its parent’s message queue descriptors, - child doesn’t inherit any of its parent’s message notification registrations. #### Closing a message queue ` mq_close()`: closes the message queue descriptor mqdes. ![](https://i.imgur.com/KJvN9Ev.png) - **If the calling process has registered via mqdes** for message notification from the queue (Section 52.6), then the notification **registration is automatically removed**, and **another process can subsequently register** for message notification from the queue. - A message queue descriptor is automatically closed when a process terminates or calls `exec()`. - As `close()` for files, closing a message queue doesn’t delete it. For that purpose, we need `mq_unlink()`, which is the message queue analog of `unlink()`. #### Removing a message queue `mq_unlink()`: removes the message queue identified by name and marks the queue to be destroyed once all processes cease using it ## Relationship Between Descriptors and Message Queues - relationship between a message queue descriptor and an open message queue is analogous to the relationship between a file descriptor and an open file ![](https://i.imgur.com/tDWSfDM.png) - An open message queue description has an associated set of flags. eg `O_NONBLOCK` - Two processes can hold message queue descriptors that refer to the same open message queue description. - occur because a process opens a message queue and then calls fork() - Two processes can hold open message queue descriptors that refer to different message queue descriptions that refer to the same message queue ## Message Queue Attributes mq_open(), mq_getattr(), and mq_setattr() functions all permit an argument that is a pointer to an mq_attr structure: ![](https://i.imgur.com/2Ffw3kj.png) - Only some of the fields are used by each of the three functions. - The structure contains information about the open message queue description - Some of the fields contain information that is **fixed at the time the queue is created with `mq_open()`**; the others return information about the **current state of the message queue description** (mq_flags) or message queue (mq_curmsgs). ### Setting message queue attributes during queue creation - `mq_maxmsg`: defines the **limit on the number of messages** that can be placed on the queue using mq_send(). This value must be greater than 0. - `mq_msgsize`: defines the **upper limit on the size of each message** that may be placed on the queue. This value must be greater than 0. ``` c= #include <mqueue.h> #include <sys/stat.h> #include <fcntl.h> #include "tlpi_hdr.h" static void usageError(const char *progName) { fprintf(stderr, "Usage: %s [-cx] [-m maxmsg] [-s msgsize] mq-name " "[octal-perms]\n", progName); fprintf(stderr, " -c Create queue (O_CREAT)\n"); fprintf(stderr, " -m maxmsg Set maximum # of messages\n"); fprintf(stderr, " -s msgsize Set maximum message size\n"); fprintf(stderr, " -x Create exclusively (O_EXCL)\n"); exit(EXIT_FAILURE); } int main(int argc, char *argv[]) { int flags, opt; mode_t perms; mqd_t mqd; struct mq_attr attr, *attrp; attrp = NULL; attr.mq_maxmsg = 50; attr.mq_msgsize = 2048; flags = O_RDWR; /* Parse command-line options */ while ((opt = getopt(argc, argv, "cm:s:x")) != -1) { switch (opt) { case 'c': flags |= O_CREAT; break; case 'm': attr.mq_maxmsg = atoi(optarg); attrp = &attr; break; case 's': attr.mq_msgsize = atoi(optarg); attrp = &attr; break; case 'x': flags |= O_EXCL; break; default: usageError(argv[0]); } } if (optind >= argc) usageError(argv[0]); perms = (argc <= optind + 1) ? (S_IRUSR | S_IWUSR) : getInt(argv[optind + 1], GN_BASE_8, "octal-perms"); mqd = mq_open(argv[optind], flags, perms, attrp); if (mqd == (mqd_t) -1) errExit("mq_open"); exit(EXIT_SUCCESS); } ``` ### Retrieving message queue attributes `mq_getattr()`: returns an `mq_attr` structure containing **information about the message queue descriptio**n and the message queue associated with the descriptor mqdes. ![](https://i.imgur.com/JDhIVEb.png) ```c #include <mqueue.h> #include "tlpi_hdr.h" int main(int argc, char *argv[]) { mqd_t mqd; struct mq_attr attr; if (argc != 2 || strcmp(argv[1], "--help") == 0) usageErr("%s mq-name\n", argv[0]); mqd = mq_open(argv[1], O_RDONLY); if (mqd == (mqd_t) -1) errExit("mq_open"); if (mq_getattr(mqd, &attr) == -1) errExit("mq_getattr"); printf("Maximum # of messages on queue: %ld\n", attr.mq_maxmsg); printf("Maximum message size: %ld\n", attr.mq_msgsize); printf("# of messages currently on queue: %ld\n", attr.mq_curmsgs); exit(EXIT_SUCCESS); } ``` - In the following shell session, we use the program in Listing above to create a message queue with implementation-defined default attributes ![](https://i.imgur.com/Cmcy1Am.png) ### Modifying message queue attribute `mq_setattr()`: ![](https://i.imgur.com/kZrWG9F.png) - If oldattr is non-NULL, it returns an mq_attr structure containing the previous message queue description flags and message queue attributes - **The only attribute that SUSv3 specifies that can be changed** using mq_setattr() is the state of the O_NONBLOCK flag. ![](https://i.imgur.com/A8qlPMO.png) ## Exchanging Messages - functions that are used to send messages to and receive messages from a queue. ### Sending Messages `mq_send()`: adds the message in the buffer pointed to by `msg_ptr` to the message queue referred to by the descriptor `mqdes`. ![](https://i.imgur.com/iycwawP.png) - `msg_len`: specifies the length of the message pointed to by `msg_ptr` - `msg_prio`: nonnegative integer priority. Messages are ordered within the queue in descending order of priority - When a new message is added to the queue, it is placed after any other messages of the same priority. (FILO) - If an application doesn’t need to use message priorities, it is sufficient to always specify msg_prio as 0. - priorities at least in the range 0 to 31, depend on `MQ_PRIO_MAX` ```c= #include <mqueue.h> #include <fcntl.h > /*For definition of O_NONBLOCK */ #include "tlpi_hdr.h" static void usageError(const char *progName) { fprintf(stderr, "Usage: %s[-n] name msg[prio]\n", progName); fprintf(stderr, " -n Use O_NONBLOCK flag\n"); exit(EXIT_FAILURE); } int main(int argc, char *argv[]) { int flags, opt; mqd_t mqd; unsigned int prio; flags = O_WRONLY; while ((opt = getopt(argc, argv, "n")) != -1) { switch (opt) { case 'n': flags |= O_NONBLOCK; break; default: usageError(argv[0]); } } if (optind + 1 >= argc) usageError(argv[0]); mqd = mq_open(argv[optind], flags); if (mqd == (mqd_t) - 1) errExit("mq_open"); prio = (argc > optind + 2) ? atoi(argv[optind + 2]) : 0; if (mq_send(mqd, argv[optind + 1], strlen(argv[optind + 1]), prio) == -1) errExit("mq_send"); exit(EXIT_SUCCESS); } ``` ### Receiving Messages `mq_receive()`: removes the oldest message with the highest priority from the message queue referred to by `mqdes` and returns that message in the buffer pointed to by `msg_ptr`. ![](https://i.imgur.com/pkJOLPV.png) - `msg_len`: specify the number of bytes of space available in the buffer pointed to by `msg_ptr`. - `msg_len` must be greater than or equal to the `mq_msgsize` - `msg_prio`: If `msg_prio` is not NULL, then the priority of the received message is copied into the location pointed to by `msg_pri` ```clike= #include <mqueue.h> #include <fcntl.h> /* For definition of O_NONBLOCK */ #include "tlpi_hdr.h" static void usageError(const char * progName) { fprintf(stderr, "Usage: %s [-n] name\n", progName); fprintf(stderr, " -n Use O_NONBLOCK flag\n"); exit(EXIT_FAILURE); } int main(int argc, char * argv[]) { int flags, opt; mqd_t mqd; unsigned int prio; void * buffer; struct mq_attr attr; ssize_t numRead; flags = O_RDONLY; while ((opt = getopt(argc, argv, "n")) != -1) { switch (opt) { case 'n': flags |= O_NONBLOCK; break; default: usageError(argv[0]); } } if (optind >= argc) usageError(argv[0]); mqd = mq_open(argv[optind], flags); if (mqd == (mqd_t) - 1) errExit("mq_open"); if (mq_getattr(mqd, & attr) == -1) errExit("mq_getattr"); buffer = malloc(attr.mq_msgsize); if (buffer == NULL) errExit("malloc"); numRead = mq_receive(mqd, buffer, attr.mq_msgsize, & prio); if (numRead == -1) errExit("mq_receive"); printf("Read %ld bytes; priority = %u\n", (long) numRead, prio); if (write(STDOUT_FILENO, buffer, numRead) == -1) errExit("write"); write(STDOUT_FILENO, "\n", 1); exit(EXIT_SUCCESS); } ``` #### Example - Send ![](https://i.imgur.com/ZLEqRSE.png) - Receive ![](https://i.imgur.com/O16XupY.png) - Continue Receive... ![](https://i.imgur.com/rd1liDu.png) - Nonblocking receive ![](https://i.imgur.com/nq5CSuB.png) ### Sending and Receiving Messages with a Timeout ![](https://i.imgur.com/x5e1yxk.png) - If a call to `mq_timedsend()` or `mq_timedreceive()` times out without being able to complete its operation, then the call fails with the error ETIMEDOUT. ## Message Notification ![](https://hackmd.io/_uploads/SJQnNedEn.png) `mq_notify()`: registers the calling process to receive a notification when a message arrives on the empty queue referred to by the descriptor `mqdes`. - At any time, **only one process** (“the registered process”) can be registered to receive a notification from a particular message queue - The registered process is notified only when a new message arrives on a queue that was **previously empty**. - After one notification is sent to the registered process, **the registration is removed**, and any process can then register itself for notification. - The registered process is notified only **if some other process is not currently blocked in a call to `mq_receive()`** for the queue. - A process can explicitly deregister itself as the target for message notification by calling `mq_notify()` with a notification argument of **NULL**. ![](https://i.imgur.com/yVw4334.png) The sigev_notify field of this structure: - `SIGEV_NONE`: Register this process for notification, but when a message arrives on the previously empty queue, don’t actually notify the process. - `SIGEV_SIGNAL`: Notify the process by generating the signal specified in the `sigev_signo` field. - `SIGEV_THREAD`: Notify the process by calling the function specified in `sigev_notify_function` as if it were the start function in a new thread. ### Receiving Notification via a Signal 1. init a nonblocking `mqd` 2. set `mqd` attr 3. setup buff 4. Block the notification signal 5. Make an initial call to `mq_notify()` to register the process 6. Call `sigsuspend()`, which unblocks the notification signal and waits until the signal is caught 7. Call `mq_notify()` to reregister this process to receive message notification 8. Execute a while loop that drains the queue by reading as many messages as possible ![](https://i.imgur.com/xlOiPDe.png) ![](https://i.imgur.com/0TjuPBU.png) ### Receiving Notification via a Thread - When message notification occurs, the program **reenables notification before draining the queue** `2` - Nonblocking mode is employed so that, after receiving a notification, we can completely drain the queue without blocking `5` ![](https://i.imgur.com/PwX83sN.png) ![](https://i.imgur.com/FUoNPhL.png) ## Linux-Specific Features ### Displaying and deleting message queue objects via the command line POSIX IPC objects are **implemented as files** in virtual file systems, and that these files **can be listed and removed with ls and rm**. In order to do this with POSIX message queues, we must mount the message queue file system using a command of the following form: ``` mount -t mqueue source target ``` Example: ```shell= $ su Password: # mkdir /dev/mqueue # mount -t mqueue none /dev/mqueue ``` ```shell= $ cat /proc/mounts | grep mqueue none /dev/mqueue mqueue rw 0 0 $ ls -ld /dev/mqueue drwxrwxrwt 2 root root 40 Jul 26 12:09 /dev/mqueue ``` ```shell= $ ./pmsg_create -c /newq $ ls /dev/mqueue newq $ rm /dev/mqueue/newq ``` ### Obtaining information about a message queue ```shell= $ ./pmsg_create -c /mq $ ./pmsg_send /mq abcdefg $ cat /dev/mqueue/mq QSIZE:7 NOTIFY:0 SIGNO:0 NOTIFY_PID:0 ``` ## Message Queue Limits ![](https://i.imgur.com/905g0Qm.png) ![](https://i.imgur.com/sLmY0Uw.png) ## Summary - POSIX message queues allow processes to exchange data in the form of messages. - Each message has an associated integer priority, and messages are queued