# 2021q1 Homework6 (quiz6) contributed by < `YOYOPIG` > ###### tags: `linux2021` > [2021q1 第 6 週測驗題](https://hackmd.io/@sysprog/linux2021-quiz6) ## 測驗 1 ### 程式碼的理解與運作原理 這裡使用二進位的方式來處理大數的運算。定義需要的 macro 及一個大數: ``` cpp /* how large the underlying array size should be */ #define UNIT_SIZE 4 /* These are dedicated to UNIT_SIZE == 4 */ #define UTYPE uint32_t #define UTYPE_TMP uint64_t #define FORMAT_STRING "%.08x" #define MAX_VAL ((UTYPE_TMP) 0xFFFFFFFF) #define BN_ARRAY_SIZE (128 / UNIT_SIZE) /* size of big-numbers in bytes */ struct bn { UTYPE array[BN_ARRAY_SIZE]; }; ``` 初始化大數及轉換: ``` cpp static inline void bn_init(struct bn *n) { for (int i = 0; i < BN_ARRAY_SIZE; ++i) n->array[i] = 0; } static inline void bn_from_int(struct bn *n, UTYPE_TMP i) { bn_init(n); /* FIXME: what if machine is not little-endian? */ n->array[0] = i; /* bit-shift with U64 operands to force 64-bit results */ UTYPE_TMP tmp = i >> 32; n->array[1] = tmp; } ``` 這裡需要注意到,由於大數的 array 每一個元素為 `uint32_t`,因此需要兩格來儲存 `uint64_t` 類型的資料。首先將 lower 存到 `array[0]`,再透過一次的 right shift 將 upper 存到 `array[1]`。如此一來,array 會呈現 little endian 式的排列。在註解中提到,若是使用的機器不為 little endian,是否會造成問題?這裡考慮兩個部份: * big endian 機器在做 shift operation 時,是否會有不同的結果 * array 用的是 little-endian,在 big endian 機器上是否會不方便 首先,根據 [stackoverflow 討論](https://stackoverflow.com/questions/7184789/does-bit-shift-depend-on-endianness),shift 的結果和記憶體存放的 endianess 沒有關係,因此不會出現問題。而第 2 點只需要在後續程式處理的時候多加注意即可。 接著,實作把自定義的 big num 轉成 string 的函式 ``` cpp static void bn_to_str(struct bn *n, char *str, int nbytes) { /* index into array - reading "MSB" first -> big-endian */ int j = BN_ARRAY_SIZE - 1; int i = 0; /* index into string representation */ /* reading last array-element "MSB" first -> big endian */ while ((j >= 0) && (nbytes > (i + 1))) { sprintf(&str[i], FORMAT_STRING, n->array[j]); i += (2 * UNIT_SIZE); /* step UNIT_SIZE hex-byte(s) forward in the string */ j -= 1; /* step one element back in the array */ } /* Count leading zeros: */ for (j = 0; str[j] == '0'; j++) ; /* Move string j places ahead, effectively skipping leading zeros */ for (i = 0; i < (nbytes - j); ++i) str[i] = str[i + j]; str[i] = 0; } ``` 由於人類閱讀的習慣接近 big-endian,由左至右為 MSB -> LSB 的型式,因此這裡在讀取的時候對 big num 做轉換。 加減法的實作 ``` cpp static void bn_dec(struct bn *n) { for (int i = 0; i < BN_ARRAY_SIZE; ++i) { UTYPE tmp = n->array[i]; UTYPE res = tmp - 1; n->array[i] = res; COND; } } static void bn_add(struct bn *a, struct bn *b, struct bn *c) { int carry = 0; for (int i = 0; i < BN_ARRAY_SIZE; ++i) { UTYPE_TMP tmp = (UTYPE_TMP) a->array[i] + b->array[i] + carry; carry = (tmp > MAX_VAL); c->array[i] = (tmp & MAX_VAL); } } ``` 這裡實作大數的加減法,透過一個區域一個區域由 LSB 開始計算。在進位處理上,add 使用一個變數 carry 儲存,減法則在每次計算完過程後,檢查 res 以及 tmp 的大小關係。若 `tmp > res` 則代表沒有發生 underflow,可直接結束計算,否則需借位並繼續計算。`COND` 選 `(c) if (!(res > tmp)) break` 乘法實作 ``` cpp static void bn_mul(struct bn *a, struct bn *b, struct bn *c) { struct bn row, tmp; bn_init(c); for (int i = 0; i < BN_ARRAY_SIZE; ++i) { bn_init(&row); for (int j = 0; j < BN_ARRAY_SIZE; ++j) { if (i + j < BN_ARRAY_SIZE) { bn_init(&tmp); III; bn_from_int(&tmp, intermediate); lshift_unit(&tmp, i + j); bn_add(&tmp, &row, &row); } } bn_add(c, &row, c); } } ``` 這裡使用 nested for loop 逐位計算。由於兩個 `uint32_t` 相乘,有可能會發生 overflow ,因此這裡需要透過一個較大的變數 `uint64_t` 來暫存結果。因此,III 選擇 `(c) UTYPE_TMP intermediate = a->array[i] * (UTYPE_TMP) b->array[j]`。 ### TODO > Karatsuba algorithm 和 Schönhage–Strassen algorithm 這類的快速乘法演算法相繼提出,請引入到上述程式碼 在[ sysprog21/bignum ](https://github.com/sysprog21/bignum)中,已引入 Karatsuba algorithm 處理大數乘法加速。在實作乘法的 [mul.c](https://github.com/sysprog21/bignum/blob/master/mul.c) 註解中,可以看到日後打算改寫成更快的 [Schönhage–Strassen algorithm](https://en.wikipedia.org/wiki/Sch%C3%B6nhage%E2%80%93Strassen_algorithm) 開始研究該演算法~~及其背後滿滿的數學~~ --- ## 測驗 2 ### 程式碼的理解與運作原理 這題取自 Leetcode 第 1 題 [Two Sum](https://leetcode.com/problems/two-sum/),原題敘述如下: > Given an array of integers nums and an integer target, return indices of the two numbers such that they add up to target. > You may assume that each input would have exactly one solution, and you may not use the same element twice. > You can return the answer in any order. 當初使用 c++ 刷題時,沒有思考太多就直接用了內建的 `std::unordered_map` 解題。這裡因為 c 語言並沒有類似的工具,因此需自行實作。首先考慮 hash table 的定義方式: ``` cpp struct hlist_node { struct hlist_node *next, **pprev; }; struct hlist_head { struct hlist_node *first; }; typedef struct { int bits; struct hlist_head *ht; } map_t; ``` 這裡透過 `hlist_node` 以及 `hlist_head` 來定義 hash list,將所有具有相同 hash value 的元素加入同一個 list 中。而整個 map 結構則定義為 hash list array,另外用 `bits` 紀錄整個 array 的大小。初始化方法如下: ``` cpp map_t *map_init(int bits) { map_t *map = malloc(sizeof(map_t)); if (!map) return NULL; map->bits = bits; map->ht = malloc(sizeof(struct hlist_head) * MAP_HASH_SIZE(map->bits)); if (map->ht) { for (int i = 0; i < MAP_HASH_SIZE(map->bits); i++) (map->ht)[i].first = NULL; } else { free(map); map = NULL; } return map; } ``` 因為使用 list 儲存,這裡在定義 key-value pair 的時候,需要再加上先前定義的 `hlist_node`: ``` cpp struct hash_key { int key; void *data; struct hlist_node node; }; ``` 接下來就可以開始進行查找與插入了。 * 查找 ``` cpp static struct hash_key *find_key(map_t *map, int key) { struct hlist_head *head = &(map->ht)[hash(key, map->bits)]; for (struct hlist_node *p = head->first; p; p = p->next) { struct hash_key *kn = container_of(p, struct hash_key, node); if (kn->key == key) return kn; } return NULL; } void *map_get(map_t *map, int key) { struct hash_key *kn = find_key(map, key); return kn ? kn->data : NULL; } ``` * 插入 ``` cpp void map_add(map_t *map, int key, void *data) { struct hash_key *kn = find_key(map, key); if (kn) return; kn = malloc(sizeof(struct hash_key)); kn->key = key, kn->data = data; struct hlist_head *h = &map->ht[hash(key, map->bits)]; struct hlist_node *n = &kn->node, *first = h->first; NNN; if (first) first->pprev = &n->next; h->first = n; PPP; } ``` 大致看一下上面的程式碼可發現,在插入的時候,會將新元素插入至 list 的最前端,再檢查原先 list 中是否有元素,若有的話將其接在新元素的後方。因此,這裡 NNN 以及 PPP 應分別選 `(c) n->next = first`, `(a) n->pprev = &h->first` ### Hash function implementation 在實作 hash function 時,會希望產生的 hash 值越均勻越好,以避免產生過多的 collision。若 hash function 的選擇不良,很可能最後大量的元素都被分配至同一個 list 中,失去了原先向低搜尋時間的效果。可以看到這裡 hash function 的選擇,應為 multiplication hashing 的一種。將數值乘上一個選定的常數後,透過 shift 取其較高的 `bits` 位做為 hash value。 ``` cpp #define GOLDEN_RATIO_32 0x61C88647 static inline unsigned int hash(unsigned int val, unsigned int bits) { /* High bits are more random, so use them. */ return (val * GOLDEN_RATIO_32) >> (32 - bits); } ``` #### Fibonacci Hashing > [參考文章](https://web.archive.org/web/20161121124236/http://brpreiss.com/books/opus4/html/page214.html) 注意到上面的程式碼中,選定的常數為 `0x61C88647`。使用這一特定常數的方法,又被稱為 [Fibonacci Hashing](https://en.wikipedia.org/wiki/Hash_function#Fibonacci_hashing)。首先,golden ratio 的定義為 $$ φ \equiv \frac{x}{y}, \ where\ \frac{x}{y} = \frac{x+y}{x} $$ 將以上式子展開: $$ \frac{x}{y} = \frac{x+y}{x} \implies 0 = x^2-xy-y^2 $$ 將 φ 代入式子中: $$ \implies 0 = φ^2-φ-1 $$ $$ \implies φ = \frac{1+\sqrt[]{5}}{2} $$ :::warning 用 LaTeX 語法改寫數學式 :notes: jserv ::: > 已更正,謝謝老師提醒 接著,在常數的選擇上,根據我們的 bits 數值 W,選定與 W 互質且最接近 W/φ 的正整數 a 作為最後的常數。根據 bits 的大小,常見的常數為: | W | a | | -------- | -------- | | 2^16 | 40503 | | 2^32 | 2654435769 | | 2^64 | 11400714819323198485 | 若是使用了這樣的神奇數字作為最後的常數,會發現在 hashing 時,連續的 key 值會被盡可能的分散,進而達成避免過多 collision 的結果: ![](https://i.imgur.com/m99L28x.png) 然而,若仔細看會發現,這裡程式碼中定義的 `GOLDEN_RATIO_32` 為 0x61C88647 = 1640531527,和上方表格中的 2654435769 = 0x9E3779B9 並不相同。閱讀 [tools/include/linux/hash.h](https://github.com/torvalds/linux/blob/master/tools/include/linux/hash.h) 中的程式碼會發現,在註解中已經給出了解釋。為了增加計算效率,使用 (1-φ) 取代 φ,且最後 hash 出來結果的 distribution 有著和原版一樣的特性,仍能維持低 collision。 > Although a random odd number will do, it turns out that the golden ratio phi = (sqrt(5)-1)/2, or its negative, has particularly nice properties. (See Knuth vol 3, section 6.4, exercise 9.) These are the negative, (1 - phi) = phi**2 = (3 - sqrt(5))/2, which is very slightly easier to multiply by and makes no difference to the hash distribution. 這裡設計一個簡單的實驗,分別將 `GOLDEN_RATIO_32` 設為原版的 0x9E3779B9 以及 linux 實作版本的 0x61C88647,對 1~10 做 hashing 並以 3 bits 儲存結果: ``` cpp int main() { clock_t begin = clock(); for(int i=1;i<11;++i) { printf("%d : ", i); printf("%u\n", hash(i, 3)); } clock_t end = clock(); double time_spent = (double)(end - begin) / CLOCKS_PER_SEC; printf("%f\n", time_spent); return 0; } ``` 結果分別為: | orig | 0x9E3779B9 hash | 0x61C88647 hash | | -------- | -------- | -------- | | 1 | 4 | 3 | | 2 | 1 | 6 | | 3 | 6 | 1 | | 4 | 3 | 4 | | 5 | 0 | 7 | | 6 | 5 | 2 | | 7 | 2 | 5 | | 8 | 7 | 0 | | 9 | 4 | 3 | | 10 | 1 | 6 | 可以看到,不論是哪個版本,1~8 都被均勻的 hash 到了 0~7 當中,也就是 3 bits 所能表示的所有數值。從 9 開始 hash value 又從頭開始再輪一次。也就是說,之後的每一個數值 n,會和 n % 8 有著相同的 hash value。為了比較兩種方法的快慢,考慮對 1~100000 做 hashing,執行時間分別為: * 0x61C88647: 0.000437 (sec) * 0x9E3779B9: 0.000513 (sec) linux 實作的改進版本,確實比原本快上一些。 --- ## 測驗 3 ### 程式碼的理解與運作原理 這份程式碼實作了 [fiber](https://en.wikipedia.org/wiki/Fiber_(computer_science))。 > In computer science, a fiber is a particularly lightweight thread of execution. > Like threads, fibers share address space. However, fibers use cooperative multitasking while threads use preemptive multitasking. Threads often depend on the kernel's thread scheduler to preempt a busy thread and resume another thread; fibers yield themselves to run another fiber while executing. 可以看到,和一般的 preemptive multitasking 不同,fiber 不仰賴系統的 preemption,而是透過執行單元自行讓出運算資源(當然,需自行在適當時機呼叫 `yield()` 讓出控制權),以 cooperative multitasking 的方式確保程式的並行運算。 #### Fiber 的定義 ``` cpp typedef struct { pid_t pid; /* The pid of the child thread as returned by clone */ void *stack; /* The stack pointer */ } fiber_t; /* The fiber "queue" */ static fiber_t fiber_list[MAX_FIBERS]; /* The pid of the parent process */ static pid_t parent; /* The number of active fibers */ static int num_fibers = 0; ``` 定義 fiber_t 由兩個變數組成,分別負責記錄 child thread 的 pid 以及其 stack pointer。同時,宣告 `fiber_list` 以儲存日後會產生的 fiber。 #### Fiber 的基本功能實作 ``` cpp void fiber_init() { for (int i = 0; i < MAX_FIBERS; ++i) fiber_list[i].pid = 0, fiber_list[i].stack = 0; parent = getpid(); } /* Yield control to another execution context */ void fiber_yield() { /* move the current process to the end of the process queue. */ sched_yield(); } struct fiber_args { void (*func)(void); }; static int fiber_start(void *arg) { struct fiber_args *args = (struct fiber_args *) arg; void (*func)() = args->func; free(args); func(); return 0; } ``` 實作一些 fiber 的基本功能,包括初始化 fiber、透過 function pointer 給定 fiber 執行的 function等,同時將 `sched_yield()` 函數 encapsulate 成 `fiber_yield()` 方便後面呼叫。 #### 生成新 fiber 並執行 ``` cpp int fiber_spawn(void (*func)(void)) { if (num_fibers == MAX_FIBERS) return FIBER_MAXFIBERS; if ((fiber_list[num_fibers].stack = malloc(FIBER_STACK)) == 0) return FIBER_MALLOC_ERROR; struct fiber_args *args; if ((args = malloc(sizeof(*args))) == 0) { free(fiber_list[num_fibers].stack); return FIBER_MALLOC_ERROR; } args->func = func; fiber_list[num_fibers].pid = clone( fiber_start, KKK, SIGCHLD | CLONE_FS | CLONE_FILES | CLONE_SIGHAND | CLONE_VM, args); if (fiber_list[num_fibers].pid == -1) { free(fiber_list[num_fibers].stack); free(args); return FIBER_CLONE_ERROR; } num_fibers++; return FIBER_NOERROR; } ``` 透過 [clone() 系統呼叫](https://man7.org/linux/man-pages/man2/clone.2.html)生成新 fiber 並開始執行。若有任何錯誤發生,如 fiber 數量超出給定上限,或在 malloc 途中發生問題等,則將新分配的資源 free 掉並回傳對應的 error message。根據 linux man page,KKK 的位置應該要給 stack pointer,這裡選 `(b) (char *) fiber_list[num_fibers].stack + FIBER_STACK`。 #### 等待所有 fiber 執行完成 ``` cpp int fiber_wait_all() { /* Check to see if we are in a fiber, since we do not get signals in the * child threads */ pid_t pid = getpid(); if (pid != parent) return FIBER_INFIBER; /* Wait for the fibers to quit, then free the stacks */ while (num_fibers > 0) { if ((pid = wait(0)) == -1) exit(1); /* Find the fiber, free the stack, and swap it with the last one */ for (int i = 0; i < num_fibers; ++i) { if (CCC) { free(fiber_list[i].stack); if (i != --num_fibers) fiber_list[i] = fiber_list[num_fibers]; break; } } } return FIBER_NOERROR; } ``` 由於只有 parent 需要做等待的動作,因此這裡先做了對應的檢查。接著,使用 [wait()系統呼叫](https://man7.org/linux/man-pages/man2/wait.2.html) 等待任一 fiber 執行完畢。接著,在 `fiber_list` 中找到執行完畢的 fiber,將它的 stack free 掉。最後,為了保持 `fiber_list` 的整齊,將最後一個 fiber 和當前的空 fiber 對調,避免空間的零碎化。這裡 CCC 的位置應是在尋找執行完成的 fiber,應選 `(b) fiber_list[i].pid == pid`。 ### Fix interleaving 反覆執行上述程式碼,有時輸出可見 fiber 中的 `printf()` 結果無法以一整行表示。錯誤結果範例如下: ``` 1 * 1 = 1 Fib(0) = 0 Fib(1) = 1 Fib(2) = 1 2 * 2 = 4 3 * 3 = 9 4 * 4 = 16 5 * 5 = 25 6 * 6 = 36 7 * 7 = 49 3) = 2 8 * 8 = 64 3) = 2 9 * 9 = 81 Fib(4) = 3 Fib(5) = 5 Fib(6) = 8 Fib(7) = 13 Fib(8) = 21 Fib(9) = 34 Fib(10) = 55 Fib(11) = 89 Fib(12) = 144 Fib(13) = 233 Fib(14) = 377 ``` #### 改善方法一: 由於 `yield()` 的呼叫是在 `printf()` 之後,因此推測應該不是兩個 thread 同時呼叫 `printf()`,比較可能是第一個 thread 呼叫完後,第二個 thread 才呼叫 `printf()`,但 I/O buffer 中還有東西未輸出至螢幕上導致。因此,這裡引入 mutex lock 保護每一個 `printf()`。 ``` cpp static pthread_mutex_t print_mutex; safePrint(char *str) { pthread_mutex_lock(&print_mutex); printf(str); pthread_mutex_unlock(&print_mutex); } static void fibonacci() { int fib[2] = {0, 1}; printf("Fib(0) = 0\nFib(1) = 1\n"); for (int i = 2; i < 15; ++i) { int nextFib = fib[0] + fib[1]; char str[50]; sprintf(str, "Fib(%d) = %d\n", i, nextFib); safePrint(str); fib[0] = fib[1]; fib[1] = nextFib; fiber_yield(); } } static void squares() { for (int i = 1; i < 10; ++i) { char str[50]; sprintf(str, "%d * %d = %d\n", i, i, i * i); safePrint(str); fiber_yield(); } } int main() { pthread_mutex_init(&print_mutex, NULL); fiber_init(); fiber_spawn(&fibonacci); fiber_spawn(&squares); /* Since fibers are non-preemptive, we must allow them to run */ fiber_wait_all(); return 0; } ``` 改善後結果如下: ``` Fib(0) = 0 Fib(1) = 1 Fib(2) = 1 1 * 1 = 1 Fib(3) = 2 Fib(4) = 3 Fib(5) = 5 Fib(6) = 8 2 * 2 = 4 3 * 3 = 9 4 * 4 = 16 5 * 5 = 25 6 * 6 = 36 7 * 7 = 49 8 * 8 = 64 9 * 9 = 81 Fib(7) = 13 Fib(8) = 21 Fib(9) = 34 Fib(10) = 55 Fib(11) = 89 Fib(12) = 144 Fib(13) = 233 Fib(14) = 377 ``` #### 改善方法二: 在閱讀[這篇 stack overflow 討論串](https://stackoverflow.com/questions/23586682/how-to-use-printf-in-multiple-threads)時,發現還有另一種巧思可以解決這個問題。既然不同 thread 呼叫 `printf()` 時會發生問題,那就改由一個 thread 負責輸出就好,其餘 thread 只負責將欲輸出的結果加入某個被保護住的 thread-safe buffer。為了保持程式的簡單,這裡統一由 main thread 在程式結束前輸出至螢幕。 ``` cpp static pthread_mutex_t print_mutex; /* Lock protected buffer for the printing thread. For further application, change this to linked list implementation, which is a more flexible approach. */ char printBuffer[30][30]; int buf_count = 0; void printBufferAdd(char *str) { pthread_mutex_lock(&print_mutex); strcpy(printBuffer[buf_count], str); buf_count++; pthread_mutex_unlock(&print_mutex); } static void fibonacci() { int fib[2] = {0, 1}; printf("Fib(0) = 0\nFib(1) = 1\n"); for (int i = 2; i < 15; ++i) { int nextFib = fib[0] + fib[1]; char str[50]; sprintf(str, "Fib(%d) = %d\n", i, nextFib); printBufferAdd(str); fib[0] = fib[1]; fib[1] = nextFib; fiber_yield(); } } static void squares() { for (int i = 1; i < 10; ++i) { char str[50]; sprintf(str, "%d * %d = %d\n", i, i, i * i); printBufferAdd(str); fiber_yield(); } } int main() { pthread_mutex_init(&print_mutex, NULL); fiber_init(); fiber_spawn(&fibonacci); fiber_spawn(&squares); /* Since fibers are non-preemptive, we must allow them to run */ fiber_wait_all(); /* Print all results once before the program ends. */ for(int i=0;i<buf_count;++i) { printf("%s", printBuffer[i]); } return 0; } ``` :::warning TODO: 複習生產者-消費者問題及 ring buffer,思考減少 mutex lock 的機制。 :notes: jserv :::