# 2021q3 Homework2 (hp)
contributed by < [`yian02`](https://github.com/yian02) >
###### tags: `linux2021`
# 1. 解釋程式碼運作原理
## 研讀 [A Pragmatic Implementation of Non Blocking Linked List](https://www.cl.cam.ac.uk/research/srg/netos/papers/2001-caslists.pdf)
### `search` operation
`insert` 和 `delete` 都會需要先使用 `search` 找到插入位置或要刪除的 node 後,再進行相對應的操作。
在論文中, search 結束後會得到兩個資訊: left node 和 right node,論文中有說明確保其符合以下條件:
1. left node 的 key 比要尋找的 key 小
2. right node 的 key 大於等於要尋找的 key
3. left node 和 right node 都要是 unmarked 的
4. left node 和 right node 要是緊鄰的 (也就是 search 的操作會去刪除在 left node 和 right node 中間被 marked 的 node)
在論文中,`search` 函式被分為三個部分:
1. 遍歷 list,找出第一個 node 中的 key 大於等於要搜尋的 key 的 node,此為 right node (left node 在此時是 right node 的前一個未被標記為刪除的 node)
2. 檢查 left node 是否緊鄰 right node,若是即回傳,若否則繼續到步驟 3.
3. 將 left node 和 right node 中間被標記為刪除的 node 刪除
:::info
:bulb: 在此論文中,是使用 `node->next` 來標註此 node 是否邏輯上被刪除。
:::
## `insert` operation
共需三個步驟:
1. 使用 search 找到要插入的位置
2. 將要插入的 node 的 `node->next` 設為 `right_node` (此步驟不會影響到原本的 list,因此用一般的賦值方式即可)
3. 使用 CAS 將 `left_node.next` 內儲存的記憶體位置從原本的 `right_node` 換成要插入的 node。(此步驟會影響到原本的 list,需要使用 CAS)
## `delete` operation
`delete` 需要兩個 (或以上,如果第一次嘗試刪除失敗的話) 的 CAS 操作來完成:
1. 使用 `search` 找到要刪除的 node (若找到會儲存在 `right_node` 中)
2. 先使用 `right_node_next` 變數來儲存 `right_node->next` 的值,並確認其不是被標註為刪除的
3. 使用 CAS 標記 `right_node->next`,代表 `right_node` 在邏輯上已被刪除 (若此 CAS 操作失敗,則回到步驟 1. 重新開始)
4. 用 CAS 嘗試將 `left_node->next` 修改為之前儲存的 `right_node_next`,此操作成功的條件為 `left_node` 和 `right_node` 依舊是相鄰的。操作成功即回傳
5. 若步驟 4. 的 CAS 失敗,則會呼叫 `search` ,透過 `search` 會刪除找到的 `right_node` 和 `left_node` 間被標註的 node 的特性,嘗試將剛剛標註的 node 刪除。
## 測驗題程式碼運作原理
### Hazard Pointer
Hazard pointer 負責管理與決定記憶體釋放的時機,在某個 thread 要使用一個特定的記憶體位置時,會將該記憶體位置加入自己的 hazard pointer list 中,一旦使用完畢則會將其清除。若某個特定的 thread 要釋放特定的記憶體空間,就要先去檢查是否有其他 thread 在使用該記憶體位置的資料,確定安全後才能真正的釋放空間。
先從 hazard pointer 中所使用到的資料結構來看:
#### Retire List
```c
typedef struct {
int size;
uintptr_t *list;
} retirelist_t;
```
`retirelist_t` 會儲存已經被 delete 但空間還未被釋放的 pointer,一旦程式確定 retire list 中儲存的 pointer 已經沒有人在使用之後,就會釋放記憶體空間。在 `retire_list_t` 中,使用一個陣列 `unitptr_t *list` 來儲存準備要被釋放的 pointer 之外,也會儲存這個 retire list 目前的大小。
#### Hazard Pointer List
```c
#define CLPAD (128 / sizeof(uintptr_t))
struct list_hp {
int max_hps;
alignas(128) atomic_uintptr_t *hp[HP_MAX_THREADS];
alignas(128) retirelist_t *rl[HP_MAX_THREADS * CLPAD];
list_hp_deletefunc_t *deletefunc;
};
```
`list_hp` 結構中,使用型態為 `atomic_unitptr_t` 的二維陣列來儲存 hazard pointer,並使用型態為 `retirelist_t` 的二維陣列來儲存 retire list。這個結構中也儲存了最大容許的 hazard pointer 數量,和在從記憶體中釋放 hazard pointer 所要呼叫的函式 `deletefunc` 的 function pointer。
此程式中的每一個 thread 都可以透過呼叫 `static inline tid()` 還取得自己的編號:
```c
static thread_local int tid_v = TID_UNKNOWN;
static atomic_int_fast32_t tid_v_base = ATOMIC_VAR_INIT(0);
static inline int tid(void)
{
if (tid_v == TID_UNKNOWN) {
tid_v = atomic_fetch_add(&tid_v_base, 1);
assert(tid_v < HP_MAX_THREADS);
}
return tid_v;
}
```
這個函式的運作原理,是透過累加一個所有 thread 的共享 atomic 變數 `tid_v_base`,來將每個 thread 編號。每個 thread 在初始化時,其 thread local 的變數 `tid_v` 會被初始化為 `TID_UNKNOWN`,在呼叫 `static inline int tid(void)` 時,會使用 atomic 操作將 `tid_v_base` 加一,並用加完後的回傳值當作自己的 thread ID。
而將記憶體位置加入 hazard pointer 中的函式為以下兩者:
```c
uintptr_t list_hp_protect_ptr(list_hp_t *hp, int ihp, uintptr_t ptr)
uintptr_t list_hp_protect_release(list_hp_t *hp, int ihp, uintptr_t ptr)
```
兩個函式都使用 `atomic_store_explicit` 並根據所在的 thread 將 pointer 存入 hazard pointer 中。兩個函式的差異只是一個是使用預設最嚴格的 memeory order,另一個是使用指定的 `memory_order_release`。
負責將準備釋放空間的 pointer 放入 retire list 的,為以下函式:
```c
void list_hp_retire(list_hp_t *hp, uintptr_t ptr)
```
這個函式會將傳入的 `ptr` 根據所在的 thread,放入相對應的 retire list 中,並更新 retire list 中儲存的 pointer 個數,若該數目大於 `HP_THRESHOLD_R` ,則會試圖將該 thread 的 retire list 中的 pointer 釋放 (預設的 `HP_THRESHOLD_R` 為 0,代表每次只要將 pointer retire,就會試圖去釋放空間)。
上述函式是透過遍歷所有 thread 所擁有的 hazard pointer 陣列,確定自己準備釋放的記憶體位置沒有在任何 thread 的 hazard pointer 陣列中,即可執行預先給定的 call back 函式 `deletefunc` 來釋放記憶體空間。在確定要釋放記憶體空間時,會將在 retire list 的下一個 pointer 寫入目前的位置,就是將放在後面的人往前放的意思,可以加速之後的搜尋速度。
## Linked list
### 結構
#### list node
```c
typedef uintptr_t list_key_t;
typedef struct list_node {
alignas(128) uint32_t magic;
alignas(128) atomic_uintptr_t next;
list_key_t key;
} list_node_t;
/* Per list variables */
typedef struct list {
atomic_uintptr_t head, tail;
list_hp_t *hp;
} list_t;
```
- `list_node_t` 中的 `next` 因為會在被(邏輯)上刪除時被標記,標記的動作需要使用到 atomic 操作,所以需使用 `atomic_uintpre_t`
- `list_t` 中會儲存兩個 dummy heads `head` and `tail`
### 操作
#### mark
```c
#define is_marked(p) (bool) ((uintptr_t)(p) &0x01)
#define get_marked(p) ((uintptr_t)(p) | (0x01))
#define get_unmarked(p) ((uintptr_t)(p) & (~0x01))
```
- 透過作業系統定址的特性,將永遠都是 0 的 LSB 拿來當作標記使用
#### insert
`list_insert` 的函式會告如下:
```c=1
bool list_insert(list_t *list, list_key_t key)
{
list_node_t *curr = NULL, *next = NULL;
atomic_uintptr_t *prev = NULL;
list_node_t *node = list_node_new(key);
while (true) {
/* If the key is already stored in the list, destroy the new node, clear
* the hazard pointer, then return false.
* (The insert operation fails if the key is already in the list)
*/
if (__list_find(list, &key, &prev, &curr, &next)) {
list_node_destroy(node);
list_hp_clear(list->hp);
return false;
}
atomic_store_explicit(&node->next, (uintptr_t) curr,
memory_order_relaxed);
uintptr_t tmp = get_unmarked(curr);
if (atomic_compare_exchange_strong(prev, &tmp, (uintptr_t) node)) {
list_hp_clear(list->hp);
return true;
}
}
}
```
透過 `__list_find` 函式,先搜尋要插入的 `key` 是否已經存在在 list 中,若存在即插入失敗。若不存在,則開始將新的 node 插入 list。
插入的步驟如下:
1. 先將新的 node 的 `next` 透過 atomic 操作變為 `__list_find` 所找到的 `curr` (此步驟不會變更原本的 list,因此可以使用 `relaxed`) (若 `__list_find` 尋找成功的話,會自動將找到的 pointer 加入 hp 中)
2. 檢查前一個 node 的指標是否還是指著 `tmp`,若是的話則將前一個 node 的指標指向的記憶體位置內的資料換成 node 的位置。
3. 若 2. 成功,則清除 hp 並回傳。
4. 若 2. 失敗,則重複直到成功。
#### delete
```c=1
bool list_delete(list_t *list, list_key_t key)
{
list_node_t *curr, *next;
atomic_uintptr_t *prev;
while (true) {
if (!__list_find(list, &key, &prev, &curr, &next)) {
list_hp_clear(list->hp);
return false;
}
uintptr_t tmp = get_unmarked(next);
if (!atomic_compare_exchange_strong(&curr->next, &tmp,
get_marked(next)))
continue;
tmp = get_unmarked(curr);
if (atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) {
list_hp_clear(list->hp);
list_hp_retire(list->hp, get_unmarked(curr));
} else {
list_hp_clear(list->hp);
}
return true;
}
}
```
delete 先使用 `__list_find` 找出要刪除的 `key` 是否有在 list 中,若找到,則:
1. 確認 `curr->next` 和 `next` 是否還相同,若相同,則將 `curr->next` 內儲存的指標換成被標記的,代表 `curr` 在「邏輯上」已被刪除
2. 若 1. 失敗,則重複至成功為止
3. 確認 `curr` 的前一個 node 是否還指著 `curr` ,是的話就把前一個 node 指著的位置換成 `next`。
4. 若 3. 成功,則清除 hp ,並將刪除的 node 加入 retire list 中,回傳 true
5. 若 3. 失敗,則清除 hp,回傳 true (將 node 加入 retire list 的工作就交給之後的 `__list_find`)
#### `__list_find`
這個程式中最重要的函式非 `__list_find` 莫屬了。
```c=1
static bool __list_find(list_t *list,
list_key_t *key,
atomic_uintptr_t **par_prev,
list_node_t **par_curr,
list_node_t **par_next)
{
atomic_uintptr_t *prev = NULL;
list_node_t *curr = NULL, *next = NULL;
try_again:
// search from the head of the list
prev = &list->head;
curr = (list_node_t *) atomic_load(prev);
// add the current to the list of hazard pointers
(void) list_hp_protect_ptr(list->hp, HP_CURR, (uintptr_t) curr);
if (atomic_load(prev) != get_unmarked(curr)) // the list has been modified
goto try_again;
while (true) {
if (!get_unmarked_node(curr))
return false;
next = (list_node_t *) atomic_load(&get_unmarked_node(curr)->next);
// add the next node to the list of hazard pointers
(void) list_hp_protect_ptr(list->hp, HP_NEXT, get_unmarked(next));
if (atomic_load(&get_unmarked_node(curr)->next) != (uintptr_t) next)
break;
if (get_unmarked(next) == atomic_load((atomic_uintptr_t *) &list->tail))
break;
if (atomic_load(prev) != get_unmarked(curr))
goto try_again;
// check if the node is marked or not
if (get_unmarked_node(next) == next) {
// the node isn't marked
if (!(get_unmarked_node(curr)->key < *key)) {
*par_curr = curr;
*par_prev = prev;
*par_next = next;
return *key == get_unmarked_node(curr)->key;
}
prev = &get_unmarked_node(curr)->next;
(void) list_hp_protect_release(list->hp, HP_PREV,
get_unmarked(curr));
} else {
// the node is marked
uintptr_t tmp = get_unmarked(curr);
// using CAS to remove the marked node
if (!atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next)))
goto try_again;
// add the removed node the retire list
list_hp_retire(list->hp, get_unmarked(curr));
}
curr = next;
(void) list_hp_protect_release(list->hp, HP_CURR, get_unmarked(next));
}
*par_curr = curr;
*par_prev = prev;
*par_next = next;
return false;
}
```
這個函式中有三個重要的指標:
- `curr`:目前的 node 的記憶體位置
- `next`:下一個 node 的記憶體位置
- `prev`:前一個 node 中指向目前的 node 的指標(也就是前一個 node 中的 `next`)
在第 18 行的 while loop 開始時,即會判斷目前這個 node 的前後是否有被更動過,若被更動過的話,即會需要重新來過一遍。
確認沒有被更動過後,會先判斷目前拿到的 node 是否是被標記刪除的 (也就是 `curr->next` 是否是被標記的),若是被標記的,就會呼叫 `list_hp_retire` 來將其放入 retire list 中,若條件許可也會將其空間釋放。
若 node 不是被標記刪除的,就會判對目前 node 中所儲存的 `key` 是否為目標,因為程式中的 list 是遞增排序,因此只有在現在找到的 node 中的 `key` 比目標的 `key` 還要小的時候,才會需要繼續往下一個 node 找。
若找到的 key 不小於目標的 `key` 時,就可以不用再往下找,只要判斷目前的 `key` 是否等於目標的 `key`,並回傳相對應的布林值。
---
## 2. 指出改進空間並實作
TODO
---
## 3. 對比 [rcu_list](https://github.com/sysprog21/concurrent-programs/tree/master/rcu_list),解釋同為 lock-free 演算法,跟上述 [Hazard pointer](https://en.wikipedia.org/wiki/Hazard_pointer) 手法有何異同?能否指出 [rcu_list](https://github.com/sysprog21/concurrent-programs/tree/master/rcu_list) 實作缺陷並改進?
TODO
---