# POSIX IPC
###### tags: `POSIX`,`IPC`, `message queue`, `semaphore`, `mmap`, `shared memory`

Why do we need POSIX?
> uniform interface: file descriptor (fd) [name=Yen-Kuan Wu]

[unix interprocess communication](https://www.slideshare.net/guest4c9430/unix-interprocess-communication)

# IPC Overview
[linux.conf.au 2013: IPC_Overview](http://man7.org/conf/lca2013/IPC_Overview-LCA-2013-printable.pdf)
[Video](https://www.youtube.com/watch?v=vU2HDf5ZhO4)
>P28. POSIX message queue
>P43. shared memory
>P47. mmap
>P50. Shared anonymous mapping
>P53. Shared file mapping
>P58. POSIX shared memory
>P68. POSIX semaphore
>P100. IPC IDs and handles
>P102. IPC access permissions
>P104. persistence

P79. `sem_t *sem_open(path, ...)`: pay attention to the path. How do we avoid name collisions? (see the PDF)
`O_CREAT | O_EXCL` ensures that only one caller creates the object, so it can be used to detect whether the name is already taken.
fd

# POSIX Message Queue
The POSIX standard defines a message queue mechanism based on System V IPC's message queue, extending it with several features:
* Simple file-based interface to the application
* Support for message priorities
* Support for asynchronous notification
* Timeouts for blocking operations

> [Unix & Linux Stack Exchange](http://unix.stackexchange.com/questions/6930/how-is-a-message-queue-implemented-in-the-linux-kernel)

## Message Queue
> Solaris
* [Open and Create](https://zinascii.com/2014/a-posix-queue-implementation.html#open)
* [Send](https://zinascii.com/2014/a-posix-queue-implementation.html#send)
* [Receive](https://zinascii.com/2014/a-posix-queue-implementation.html#receive)

# Key reminders
11.1.3 `mq_close()` only decrements the resource reference count; it does not delete the queue.
11.2.1 The bottom of p. 416 makes the same point again: the queue is deleted only when `mq_unlink()` is called.
~~11.2.2 mq_attr: see the tracing code below~~
11.2.3 Note that a priority can be specified when sending a message.
11.2.4 mq_notify is covered in its own subsection below.

## mq_notify
prototype: `int mq_notify(mqd_t mqdes, const struct sigevent *sevp)`
> One shot: the registration is removed once a notification has been delivered, so the process must call `mq_notify()` again to be notified of the next message.

```clike=
union sigval {
    int sival_int;
    void *sival_ptr;
};

struct sigevent {
    int sigev_notify; /* 1. SIGEV_NONE 2. SIGEV_SIGNAL 3.
                         SIGEV_THREAD */
    int sigev_signo;
    union sigval sigev_value;
    void (*sigev_notify_function)(union sigval);
    void *sigev_notify_attributes;
};
```

# sysconf
Reference: [yenWu Note](https://hackmd.io/GYYwhg7AzAnADAJgLRQEYIBxICwYwUyRgBMRikRsIBGEOfMYDAVmKA==#回傳系統資訊-in-c)

:::warning
We only trace Linux kernel version 2.6.39.4.
:::

## Data Structure
Let's trace the data structures first, starting with include/linux/mqueue.h:
* struct mq_attr
* struct mqueue_inode_info
* struct msg_queue
* struct ext_wait_queue

* [include/linux/mqueue.h: struct mq_attr](http://lxr.linux.no/linux+v2.6.39.4/include/linux/mqueue.h#L25)
```clike=
struct mq_attr {
    long mq_flags;      /* message queue flags */
*   long mq_maxmsg;     /* maximum number of messages */
*   long mq_msgsize;    /* maximum message size */
*   long mq_curmsgs;    /* number of messages currently queued */
    long __reserved[4]; /* ignored for input, zeroed for output */
};
```

* [ipc/mqueue.c: struct mqueue_inode_info](http://lxr.linux.no/linux+v2.6.39.4/ipc/mqueue.c#L58)
```clike=
struct mqueue_inode_info {
    spinlock_t lock;
*   struct inode vfs_inode;
*   wait_queue_head_t wait_q;

*   struct msg_msg **messages;
*   struct mq_attr attr;

    struct sigevent notify;
    struct pid* notify_owner;
    struct user_struct *user;   /* user who created, for accounting */
    struct sock *notify_sock;
    struct sk_buff *notify_cookie;

    /* for tasks waiting for free space and messages, respectively */
*   struct ext_wait_queue e_wait_q[2];

*   unsigned long qsize; /* size of queue in memory (sum of all msgs) */
};
```

* [include/linux/msg.h: struct msg_queue](http://lxr.linux.no/linux+v2.6.39.4/include/linux/msg.h#L88)
```clike=
/* one msq_queue structure for each present queue on the system */
struct msg_queue {
*   struct kern_ipc_perm q_perm;
    time_t q_stime;             /* last msgsnd time */
    time_t q_rtime;             /* last msgrcv time */
    time_t q_ctime;             /* last change time */
    unsigned long q_cbytes;     /* current number of bytes on queue */
    unsigned long q_qnum;       /* number of messages in queue */
    unsigned long q_qbytes;     /* max number of bytes on
                                   queue */
    pid_t q_lspid;              /* pid of last msgsnd */
    pid_t q_lrpid;              /* last receive pid */

*   struct list_head q_messages;
*   struct list_head q_receivers;
*   struct list_head q_senders;
};
```

* [ipc/mqueue.c: struct ext_wait_queue](http://lxr.linux.no/linux+v2.6.39.4/ipc/mqueue.c#L51)
```clike=
struct ext_wait_queue {         /* queue of sleeping tasks */
    struct task_struct *task;
    struct list_head list;
    struct msg_msg *msg;        /* ptr of loaded message */
    int state;                  /* one of STATE_* values */
};
```

### Operation
> http://lxr.linux.no/linux+v2.6.39.4/ipc/mqueue.c#L1220
```clike=
*static const struct inode_operations mqueue_dir_inode_operations = {
    .lookup = simple_lookup,
*   .create = mqueue_create,
*   .unlink = mqueue_unlink,
};

*static const struct file_operations mqueue_file_operations = {
    .flush = mqueue_flush_file,
*   .poll = mqueue_poll_file,
*   .read = mqueue_read_file,
    .llseek = default_llseek,
};

static const struct super_operations mqueue_super_ops = {
*   .alloc_inode = mqueue_alloc_inode,
    .destroy_inode = mqueue_destroy_inode,
    .evict_inode = mqueue_evict_inode,
    .statfs = simple_statfs,
};

*static struct file_system_type mqueue_fs_type = {
    .name = "mqueue",
    .mount = mqueue_mount,
    .kill_sb = kill_litter_super,
};
```

* [ipc/mqueue.c: init_mqueue_fs()](http://lxr.linux.no/linux+v2.6.39.4/ipc/mqueue.c#L1272)
```clike=
static int __init init_mqueue_fs(void)
{
    ...
    mqueue_inode_cachep = kmem_cache_create("mqueue_inode_cache",
                sizeof(struct mqueue_inode_info), 0,
                SLAB_HWCACHE_ALIGN, init_once);
    ...
    /* ignore failures - they are not fatal */
    mq_sysctl_table = mq_register_sysctl_table();

*   error = register_filesystem(&mqueue_fs_type);
    ...
    spin_lock_init(&mq_lock);

    init_ipc_ns.mq_mnt = kern_mount_data(&mqueue_fs_type, &init_ipc_ns);
    if (IS_ERR(init_ipc_ns.mq_mnt)) {
        error = PTR_ERR(init_ipc_ns.mq_mnt);
        goto out_filesystem;
    }
    ...
}
```

* [ipc/mqueue.c: mqueue_get_inode()](http://lxr.linux.no/linux+v2.6.39.4/ipc/mqueue.c#L110)
```clike=
static struct inode *mqueue_get_inode(struct super_block *sb,
        struct ipc_namespace *ipc_ns, int mode,
        struct mq_attr *attr)
{
    struct user_struct *u = current_user();
    struct inode *inode;

    inode = new_inode(sb);
    if (inode) {
        inode->i_ino = get_next_ino();
        ...
        if (S_ISREG(mode)) {
            struct mqueue_inode_info *info;
            struct task_struct *p = current;
            unsigned long mq_bytes, mq_msg_tblsz;

*           inode->i_fop = &mqueue_file_operations;
*           inode->i_size = FILENT_SIZE;
            /* mqueue specific info */
            info = MQUEUE_I(inode);
            spin_lock_init(&info->lock);
            init_waitqueue_head(&info->wait_q);
            INIT_LIST_HEAD(&info->e_wait_q[0].list);
            INIT_LIST_HEAD(&info->e_wait_q[1].list);
            info->notify_owner = NULL;
            info->qsize = 0;
            info->user = NULL;  /* set when all is ok */
            memset(&info->attr, 0, sizeof(info->attr));
            info->attr.mq_maxmsg = ipc_ns->mq_msg_max;
            info->attr.mq_msgsize = ipc_ns->mq_msgsize_max;
            if (attr) {
                info->attr.mq_maxmsg = attr->mq_maxmsg;
                info->attr.mq_msgsize = attr->mq_msgsize;
            }
            mq_msg_tblsz = info->attr.mq_maxmsg * sizeof(struct msg_msg *);
            info->messages = kmalloc(mq_msg_tblsz, GFP_KERNEL);
            if (!info->messages)
                goto out_inode;

            mq_bytes = (mq_msg_tblsz +
                (info->attr.mq_maxmsg * info->attr.mq_msgsize));

            spin_lock(&mq_lock);
            if (u->mq_bytes + mq_bytes < u->mq_bytes ||
                u->mq_bytes + mq_bytes >
                task_rlimit(p, RLIMIT_MSGQUEUE)) {
                spin_unlock(&mq_lock);
                /* mqueue_evict_inode() releases info->messages */
                goto out_inode;
            }
            u->mq_bytes += mq_bytes;
            spin_unlock(&mq_lock);

            /* all is ok */
            info->user = get_uid(u);
        } else if (S_ISDIR(mode)) {
            inc_nlink(inode);
            /* Some things misbehave if size == 0 on a directory */
            inode->i_size = 2 * DIRENT_SIZE;
            inode->i_op = &mqueue_dir_inode_operations;
            inode->i_fop = &simple_dir_operations;
        }
    }
    return inode;
out_inode:
    iput(inode);
    return NULL;
}
```
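The accounting step in `mqueue_get_inode()` is easy to miss: each queue is charged for its pointer table plus `mq_maxmsg * mq_msgsize` payload bytes, and creation is refused if the user's running total would wrap around or exceed `RLIMIT_MSGQUEUE`. A minimal user-space sketch of that arithmetic follows. It is not kernel code: `queue_cost()` and `charge_allowed()` are hypothetical helper names, `sizeof(void *)` stands in for `sizeof(struct msg_msg *)`, and the limit is passed in as a plain number instead of being read from the task.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Bytes charged for one queue, mirroring mq_bytes in mqueue_get_inode():
 * a table of message pointers plus the worst-case payload.
 * Hypothetical helper; sizeof(void *) stands in for sizeof(struct msg_msg *). */
static unsigned long queue_cost(unsigned long maxmsg, unsigned long msgsize)
{
    /* Assumption of this sketch: both sizes are positive. */
    assert(maxmsg > 0 && msgsize > 0);

    unsigned long tbl = maxmsg * sizeof(void *);
    return tbl + maxmsg * msgsize;
}

/* True if a user already charged `used` bytes may be charged `cost` more.
 * Same two conditions as the kernel check: no unsigned wrap-around,
 * and the new total does not exceed the limit. */
static bool charge_allowed(unsigned long used, unsigned long cost,
                           unsigned long limit)
{
    if (used + cost < used)     /* unsigned overflow */
        return false;
    return used + cost <= limit;
}
```

For instance, where pointers are 8 bytes, `queue_cost(10, 8192)` is 82000 bytes, so `charge_allowed(0, 82000, 819200)` holds, while a user already at the limit is refused even for one more byte.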
* [ipc/mqueue.c: mqueue_create()](http://lxr.linux.no/linux+v2.6.39.4/ipc/mqueue.c#L295)
```clike=
static int mqueue_create(struct inode *dir, struct dentry *dentry,
                int mode, struct nameidata *nd)
{
    struct inode *inode;
    struct mq_attr *attr = dentry->d_fsdata;
    int error;
    struct ipc_namespace *ipc_ns;

    spin_lock(&mq_lock);
    ipc_ns = __get_ns_from_inode(dir);
    if (!ipc_ns) {
        error = -EACCES;
        goto out_unlock;
    }
    if (ipc_ns->mq_queues_count >= ipc_ns->mq_queues_max &&
            !capable(CAP_SYS_RESOURCE)) {
        error = -ENOSPC;
        goto out_unlock;
    }
    ipc_ns->mq_queues_count++;
    spin_unlock(&mq_lock);

    inode = mqueue_get_inode(dir->i_sb, ipc_ns, mode, attr);
    if (!inode) {
        error = -ENOMEM;
        spin_lock(&mq_lock);
        ipc_ns->mq_queues_count--;
        goto out_unlock;
    }

    put_ipc_ns(ipc_ns);
    dir->i_size += DIRENT_SIZE;
    dir->i_ctime = dir->i_mtime = dir->i_atime = CURRENT_TIME;

    d_instantiate(dentry, inode);
    dget(dentry);
    return 0;
out_unlock:
    spin_unlock(&mq_lock);
    if (ipc_ns)
        put_ipc_ns(ipc_ns);
    return error;
}
```

* [ipc/mqueue.c: mqueue_read_file()](http://lxr.linux.no/linux+v2.6.39.4/ipc/mqueue.c#L350)
```clike=
/*
 * This is routine for system read from queue file.
 * To avoid mess with doing here some sort of mq_receive we allow
 * to read only queue size & notification info (the only values
 * that are interesting from user point of view and aren't accessible
 * through std routines)
 */
static ssize_t mqueue_read_file(struct file *filp, char __user *u_data,
                size_t count, loff_t *off)
{
    struct mqueue_inode_info *info = MQUEUE_I(filp->f_path.dentry->d_inode);
    char buffer[FILENT_SIZE];
    ssize_t ret;

    spin_lock(&info->lock);
    snprintf(buffer, sizeof(buffer),
            "QSIZE:%-10lu NOTIFY:%-5d SIGNO:%-5d NOTIFY_PID:%-6d\n",
            info->qsize,
            info->notify_owner ? info->notify.sigev_notify : 0,
            (info->notify_owner &&
             info->notify.sigev_notify == SIGEV_SIGNAL) ?
                info->notify.sigev_signo : 0,
            pid_vnr(info->notify_owner));
    spin_unlock(&info->lock);
    buffer[sizeof(buffer)-1] = '\0';

    ret = simple_read_from_buffer(u_data, count, off, buffer,
                strlen(buffer));
    if (ret <= 0)
        return ret;

    filp->f_path.dentry->d_inode->i_atime = filp->f_path.dentry->d_inode->i_ctime = CURRENT_TIME;
    return ret;
}
```

# POSIX Semaphore
[Semaphore API](http://ozark.hendrix.edu/~leonard/420-01/2-23-10.pdf)
[Futex Scaling for Multi-core Systems](https://www.slideshare.net/davidlohr/futex-scaling-for-multicore-systems)
> SUSE

[An Overview of Kernel Lock Improvements](http://events.linuxfoundation.org/sites/events/files/slides/linuxcon-2014-locking-final.pdf)
> LinuxCon NA, 2014

* http://lxr.linux.no/linux+v2.6.39.4/include/linux/semaphore.h#L15
```clike=
/* Please don't access any members of this structure directly */
struct semaphore {
    spinlock_t lock;
    unsigned int count;
    struct list_head wait_list;
};
```

* http://lxr.linux.no/linux+v2.6.39.4/include/linux/semaphore.h#L32
```clike=
static inline void sema_init(struct semaphore *sem, int val)
{
    static struct lock_class_key __key;
    *sem = (struct semaphore) __SEMAPHORE_INITIALIZER(*sem, val);
    lockdep_init_map(&sem->lock.dep_map, "semaphore->lock", &__key, 0);
}
```

* http://lxr.linux.no/linux+v2.6.39.4/kernel/semaphore.c#L42
```clike=
/**
 * down - acquire the semaphore
 * @sem: the semaphore to be acquired
 *
 * Acquires the semaphore.  If no more tasks are allowed to acquire the
 * semaphore, calling this function will put the task to sleep until the
 * semaphore is released.
 *
 * Use of this function is deprecated, please use down_interruptible() or
 * down_killable() instead.
 */
void down(struct semaphore *sem)
{
    unsigned long flags;

    spin_lock_irqsave(&sem->lock, flags);
    if (likely(sem->count > 0))
        sem->count--;
    else
        __down(sem);
    spin_unlock_irqrestore(&sem->lock, flags);
}
EXPORT_SYMBOL(down);
```

* http://lxr.linux.no/linux+v2.6.39.4/kernel/semaphore.c#L171
```clike=
/**
 * up - release the semaphore
 * @sem: the semaphore to release
 *
 * Release the semaphore.  Unlike mutexes, up() may be called from any
 * context and even by tasks which have never called down().
 */
void up(struct semaphore *sem)
{
    unsigned long flags;

    spin_lock_irqsave(&sem->lock, flags);
    if (likely(list_empty(&sem->wait_list)))
        sem->count++;
    else
        __up(sem);
    spin_unlock_irqrestore(&sem->lock, flags);
}
EXPORT_SYMBOL(up);
```

## POSIX MMAP
http://lxr.linux.no/linux+v2.6.39.4/arch/arm/mm/mmap.c

## POSIX Shared Memory
http://lxr.linux.no/linux+v2.6.39.4/include/linux/shmem_fs.h
http://lxr.linux.no/linux+v2.6.39.4/mm/shmem.c
http://lxr.linux.no/linux+v2.6.39.4/include/linux/shm.h
http://lxr.linux.no/linux+v2.6.39.4/include/linux/mm.h
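
The last two sections only list kernel sources so far. As a rough user-space illustration of the idea behind both, here is a minimal sketch of the "shared anonymous mapping" case from the IPC overview slides: a parent and its forked child see the same page. It deliberately uses `mmap()` with `MAP_SHARED | MAP_ANONYMOUS` rather than `shm_open()`, so it only works between related processes. The function name `share_int_with_child()` is hypothetical, and the sketch assumes a Linux/glibc environment where `MAP_ANONYMOUS` is available.

```c
#define _DEFAULT_SOURCE         /* assumption: glibc, exposes MAP_ANONYMOUS */
#include <assert.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: the child stores `value` in a shared page and the
 * parent reads it back. Returns the value seen by the parent, -1 on error. */
static int share_int_with_child(int value)
{
    assert(value != -1);        /* -1 is reserved for errors in this sketch */

    int *shared = mmap(NULL, sizeof(*shared), PROT_READ | PROT_WRITE,
                       MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (shared == MAP_FAILED)
        return -1;
    *shared = 0;

    pid_t pid = fork();
    if (pid < 0) {
        munmap(shared, sizeof(*shared));
        return -1;
    }
    if (pid == 0) {             /* child: write through the shared mapping */
        *shared = value;
        _exit(0);
    }

    int result = -1;
    if (waitpid(pid, NULL, 0) == pid)   /* parent: wait, then read */
        result = *shared;
    munmap(shared, sizeof(*shared));
    return result;
}
```

Because the mapping is `MAP_SHARED`, the child's store is visible to the parent after `waitpid()` returns. With `MAP_PRIVATE` the parent would still read 0, since the child would be writing to its own copy-on-write page.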