# Linux 核心設計: RCU 同步機制 以概念來說,RCU 要使用在有大量讀取要求,但只有少量的寫入要求,且不會太在意讀取到舊資料的情況。 最簡單的想法就是當有資料要進行寫入時,先複製一份並修改,在其修改後所有讀取必須讀到新的資料,但如果在寫入前就在讀取資料的話,則那些正在讀取的資料還是會讀到舊資料,等到沒有用戶在讀取舊資料了,再把新資料拿來替代就資料並刪除就資料。 也因此, RCU 非常重要的應用就是在 linked-list。 ## RCU 在核心模組的應用: 在 [rculist](https://github.com/torvalds/linux/blob/master/include/linux/rculist.h) 中,就有提供了大量的 rcu 函式,以 replace 舉例來說: ```c static inline void list_replace_rcu(struct list_head *old, struct list_head *new) { new->next = old->next; new->prev = old->prev; rcu_assign_pointer(list_next_rcu(new->prev), new); new->next->prev = new; old->prev = LIST_POISON2; } ``` 先將 new 的內容複製為 old 的內容後,透過 rcu_assign_pointer() 函式進行修改,這邊值得一提的是其使用 list_next_rcu,因為實際上要修改的指標是 old 的前一個的 next 指標,而非 old 指標,以圖例來說明的話: ```graphviz digraph _graph_name_ { rankdir=LR; l1[label = "list1"] l2[label = "list2"] l3[label = "l3"] new[label = "new"] old[penwidth=0]; l1->l2[color = red] l2->l3 new->l3 old->l2 } ``` 實際上我們要修改的是紅色那個指標指向的位址,而不是 old 指標。 ```graphviz digraph _graph_name_ { rankdir=LR; l1[label = "list1"] l2[label = "list2"] l3[label = "l3"] new[label = "new"] old[penwidth=0]; l1->new[color = red] l2->l3 new->l3 old->l2 } ``` 參考 [rcu_example](https://github.com/jinb-park/rcu_example) ,一個簡單的 driver 演示了 rcu 函式的使用: 先 clone 下來後,因為專案內已經有寫好 Makefile,直接 make 就會產生 .ko 檔,透過 ismod 加入即可。 ```c static int list_rcu_example_init(void) { spin_lock_init(&books_lock); test_example(0); test_example(1); return 0; } ``` 因為程式已經把範例內容寫在 init 內, 載入後檢查 dmesg 即可看到執行狀況: ```c [68873.600792] book1 borrow : 0 [68873.600793] book2 borrow : 0 [68873.600795] borrow success 0, preempt_count : 0 [68873.600797] borrow success 1, preempt_count : 0 [68873.600799] id : 0, name : book1, author : jb, borrow : 1, addr : ffff95660c06c600 [68873.600801] id : 1, name : book2, author : jb, borrow : 1, addr : ffff95660c06d980 ``` 解釋其行為: 以 add_book 為例,先產生一個 book ,再透過 list_add_rcu 來加入。 ```c b = kzalloc(sizeof(struct book), GFP_KERNEL); if(!b) return; b->id = id; strncpy(b->name, name, sizeof(b->name)); strncpy(b->author, author, sizeof(b->author)); b->borrow = 0; /** * list_add_rcu * * add_node(writer - add) use spin_lock() */ spin_lock(&books_lock); list_add_rcu(&b->node, &books); spin_unlock(&books_lock); ``` list_add_rcu: ```c static inline void __list_add_rcu(struct list_head *new, struct list_head *prev, struct list_head *next) { if (!__list_add_valid(new, prev, next)) return; new->next = next; new->prev = prev; rcu_assign_pointer(list_next_rcu(prev), new); next->prev = new; } ``` 傳入的 new 就是新建立的節點,prev 是前一項,next 是後一項,而這個函式傳入的 prev 是 head, next 是 head->next,因此實際上就是把 new 透過 rcu_assign_pointer 插入到最前面。 如果要借書,就必須修改當前 borrow 為 1 ```c static int borrow_book(int id, int async) { struct book *b = NULL; struct book *new_b = NULL; struct book *old_b = NULL; /** * updater * * (updater) require that alloc new node & copy, update new node & reclaim old node * list_replace_rcu() is used to do that. */ rcu_read_lock(); list_for_each_entry(b, &books, node) { if(b->id == id) { if(b->borrow) { rcu_read_unlock(); return -1; } old_b = b; break; } } if(!old_b) { rcu_read_unlock(); return -1; } new_b = kzalloc(sizeof(struct book), GFP_ATOMIC); if(!new_b) { rcu_read_unlock(); return -1; } memcpy(new_b, old_b, sizeof(struct book)); new_b->borrow = 1; spin_lock(&books_lock); list_replace_rcu(&old_b->node, &new_b->node); spin_unlock(&books_lock); rcu_read_unlock(); if(async) { call_rcu(&old_b->rcu, book_reclaim_callback); }else { synchronize_rcu(); kfree(old_b); } pr_info("borrow success %d, preempt_count : %d\n", id, preempt_count()); return 0; } ``` 這段程式碼中,首先因為要先讀取整個 list,要先尋找要借的書是否存在,並檢查是否已經被借走,因此使用 rcu_read_lock 和 rcu_read_unlock 來設定進入時間,假如找到了,複製一份該 book 並將 borrow 修改為 1 ,並使用 sychronize_rcu 來等之前使用舊資料的用戶都不會再使用後將 old_book 刪除。 :::info 按照 rcu 的規則,這樣似乎會造成兩個人同時借到當時還沒有被界走的書? ::: 其餘的還書、刪書都是類似操作。