# 2021q1 Homework6 (quiz6)
contributed by < `YOYOPIG` >
###### tags: `linux2021`
> [2021q1 第 6 週測驗題](https://hackmd.io/@sysprog/linux2021-quiz6)
## 測驗 1
### 程式碼的理解與運作原理
這裡使用二進位的方式來處理大數的運算。定義需要的 macro 及一個大數:
``` cpp
/* how large the underlying array size should be */
#define UNIT_SIZE 4
/* These are dedicated to UNIT_SIZE == 4 */
#define UTYPE uint32_t
#define UTYPE_TMP uint64_t
#define FORMAT_STRING "%.08x"
#define MAX_VAL ((UTYPE_TMP) 0xFFFFFFFF)
#define BN_ARRAY_SIZE (128 / UNIT_SIZE) /* size of big-numbers in bytes */
struct bn { UTYPE array[BN_ARRAY_SIZE]; };
```
初始化大數及轉換:
``` cpp
static inline void bn_init(struct bn *n) {
for (int i = 0; i < BN_ARRAY_SIZE; ++i)
n->array[i] = 0;
}
static inline void bn_from_int(struct bn *n, UTYPE_TMP i) {
bn_init(n);
/* FIXME: what if machine is not little-endian? */
n->array[0] = i;
/* bit-shift with U64 operands to force 64-bit results */
UTYPE_TMP tmp = i >> 32;
n->array[1] = tmp;
}
```
這裡需要注意到,由於大數的 array 每一個元素為 `uint32_t`,因此需要兩格來儲存 `uint64_t` 類型的資料。首先將 lower 存到 `array[0]`,再透過一次的 right shift 將 upper 存到 `array[1]`。如此一來,array 會呈現 little endian 式的排列。在註解中提到,若是使用的機器不為 little endian,是否會造成問題?這裡考慮兩個部份:
* big endian 機器在做 shift operation 時,是否會有不同的結果
* array 用的是 little-endian,在 big endian 機器上是否會不方便
首先,根據 [stackoverflow 討論](https://stackoverflow.com/questions/7184789/does-bit-shift-depend-on-endianness),shift 的結果和記憶體存放的 endianess 沒有關係,因此不會出現問題。而第 2 點只需要在後續程式處理的時候多加注意即可。
接著,實作把自定義的 big num 轉成 string 的函式
``` cpp
static void bn_to_str(struct bn *n, char *str, int nbytes) {
/* index into array - reading "MSB" first -> big-endian */
int j = BN_ARRAY_SIZE - 1;
int i = 0; /* index into string representation */
/* reading last array-element "MSB" first -> big endian */
while ((j >= 0) && (nbytes > (i + 1))) {
sprintf(&str[i], FORMAT_STRING, n->array[j]);
i += (2 *
UNIT_SIZE); /* step UNIT_SIZE hex-byte(s) forward in the string */
j -= 1; /* step one element back in the array */
}
/* Count leading zeros: */
for (j = 0; str[j] == '0'; j++)
;
/* Move string j places ahead, effectively skipping leading zeros */
for (i = 0; i < (nbytes - j); ++i)
str[i] = str[i + j];
str[i] = 0;
}
```
由於人類閱讀的習慣接近 big-endian,由左至右為 MSB -> LSB 的型式,因此這裡在讀取的時候對 big num 做轉換。
加減法的實作
``` cpp
static void bn_dec(struct bn *n) {
for (int i = 0; i < BN_ARRAY_SIZE; ++i) {
UTYPE tmp = n->array[i];
UTYPE res = tmp - 1;
n->array[i] = res;
COND;
}
}
static void bn_add(struct bn *a, struct bn *b, struct bn *c) {
int carry = 0;
for (int i = 0; i < BN_ARRAY_SIZE; ++i) {
UTYPE_TMP tmp = (UTYPE_TMP) a->array[i] + b->array[i] + carry;
carry = (tmp > MAX_VAL);
c->array[i] = (tmp & MAX_VAL);
}
}
```
這裡實作大數的加減法,透過一個區域一個區域由 LSB 開始計算。在進位處理上,add 使用一個變數 carry 儲存,減法則在每次計算完過程後,檢查 res 以及 tmp 的大小關係。若 `tmp > res` 則代表沒有發生 underflow,可直接結束計算,否則需借位並繼續計算。`COND` 選 `(c) if (!(res > tmp)) break`
乘法實作
``` cpp
static void bn_mul(struct bn *a, struct bn *b, struct bn *c) {
struct bn row, tmp;
bn_init(c);
for (int i = 0; i < BN_ARRAY_SIZE; ++i) {
bn_init(&row);
for (int j = 0; j < BN_ARRAY_SIZE; ++j) {
if (i + j < BN_ARRAY_SIZE) {
bn_init(&tmp);
III;
bn_from_int(&tmp, intermediate);
lshift_unit(&tmp, i + j);
bn_add(&tmp, &row, &row);
}
}
bn_add(c, &row, c);
}
}
```
這裡使用 nested for loop 逐位計算。由於兩個 `uint32_t` 相乘,有可能會發生 overflow ,因此這裡需要透過一個較大的變數 `uint64_t` 來暫存結果。因此,III 選擇 `(c) UTYPE_TMP intermediate = a->array[i] * (UTYPE_TMP) b->array[j]`。
### TODO
> Karatsuba algorithm 和 Schönhage–Strassen algorithm 這類的快速乘法演算法相繼提出,請引入到上述程式碼
在[ sysprog21/bignum ](https://github.com/sysprog21/bignum)中,已引入 Karatsuba algorithm 處理大數乘法加速。在實作乘法的 [mul.c](https://github.com/sysprog21/bignum/blob/master/mul.c) 註解中,可以看到日後打算改寫成更快的 [Schönhage–Strassen algorithm](https://en.wikipedia.org/wiki/Sch%C3%B6nhage%E2%80%93Strassen_algorithm)
開始研究該演算法~~及其背後滿滿的數學~~
---
## 測驗 2
### 程式碼的理解與運作原理
這題取自 Leetcode 第 1 題 [Two Sum](https://leetcode.com/problems/two-sum/),原題敘述如下:
> Given an array of integers nums and an integer target, return indices of the two numbers such that they add up to target.
> You may assume that each input would have exactly one solution, and you may not use the same element twice.
> You can return the answer in any order.
當初使用 c++ 刷題時,沒有思考太多就直接用了內建的 `std::unordered_map` 解題。這裡因為 c 語言並沒有類似的工具,因此需自行實作。首先考慮 hash table 的定義方式:
``` cpp
struct hlist_node { struct hlist_node *next, **pprev; };
struct hlist_head { struct hlist_node *first; };
typedef struct { int bits; struct hlist_head *ht; } map_t;
```
這裡透過 `hlist_node` 以及 `hlist_head` 來定義 hash list,將所有具有相同 hash value 的元素加入同一個 list 中。而整個 map 結構則定義為 hash list array,另外用 `bits` 紀錄整個 array 的大小。初始化方法如下:
``` cpp
map_t *map_init(int bits) {
map_t *map = malloc(sizeof(map_t));
if (!map)
return NULL;
map->bits = bits;
map->ht = malloc(sizeof(struct hlist_head) * MAP_HASH_SIZE(map->bits));
if (map->ht) {
for (int i = 0; i < MAP_HASH_SIZE(map->bits); i++)
(map->ht)[i].first = NULL;
} else {
free(map);
map = NULL;
}
return map;
}
```
因為使用 list 儲存,這裡在定義 key-value pair 的時候,需要再加上先前定義的 `hlist_node`:
``` cpp
struct hash_key {
int key;
void *data;
struct hlist_node node;
};
```
接下來就可以開始進行查找與插入了。
* 查找
``` cpp
static struct hash_key *find_key(map_t *map, int key) {
struct hlist_head *head = &(map->ht)[hash(key, map->bits)];
for (struct hlist_node *p = head->first; p; p = p->next) {
struct hash_key *kn = container_of(p, struct hash_key, node);
if (kn->key == key)
return kn;
}
return NULL;
}
void *map_get(map_t *map, int key)
{
struct hash_key *kn = find_key(map, key);
return kn ? kn->data : NULL;
}
```
* 插入
``` cpp
void map_add(map_t *map, int key, void *data)
{
struct hash_key *kn = find_key(map, key);
if (kn)
return;
kn = malloc(sizeof(struct hash_key));
kn->key = key, kn->data = data;
struct hlist_head *h = &map->ht[hash(key, map->bits)];
struct hlist_node *n = &kn->node, *first = h->first;
NNN;
if (first)
first->pprev = &n->next;
h->first = n;
PPP;
}
```
大致看一下上面的程式碼可發現,在插入的時候,會將新元素插入至 list 的最前端,再檢查原先 list 中是否有元素,若有的話將其接在新元素的後方。因此,這裡 NNN 以及 PPP 應分別選 `(c) n->next = first`, `(a) n->pprev = &h->first`
### Hash function implementation
在實作 hash function 時,會希望產生的 hash 值越均勻越好,以避免產生過多的 collision。若 hash function 的選擇不良,很可能最後大量的元素都被分配至同一個 list 中,失去了原先向低搜尋時間的效果。可以看到這裡 hash function 的選擇,應為 multiplication hashing 的一種。將數值乘上一個選定的常數後,透過 shift 取其較高的 `bits` 位做為 hash value。
``` cpp
#define GOLDEN_RATIO_32 0x61C88647
static inline unsigned int hash(unsigned int val, unsigned int bits) {
/* High bits are more random, so use them. */
return (val * GOLDEN_RATIO_32) >> (32 - bits);
}
```
#### Fibonacci Hashing
> [參考文章](https://web.archive.org/web/20161121124236/http://brpreiss.com/books/opus4/html/page214.html)
注意到上面的程式碼中,選定的常數為 `0x61C88647`。使用這一特定常數的方法,又被稱為 [Fibonacci Hashing](https://en.wikipedia.org/wiki/Hash_function#Fibonacci_hashing)。首先,golden ratio 的定義為
$$
φ \equiv \frac{x}{y}, \ where\ \frac{x}{y} = \frac{x+y}{x}
$$
將以上式子展開:
$$
\frac{x}{y} = \frac{x+y}{x} \implies 0 = x^2-xy-y^2
$$
將 φ 代入式子中:
$$
\implies 0 = φ^2-φ-1
$$
$$
\implies φ = \frac{1+\sqrt[]{5}}{2}
$$
:::warning
用 LaTeX 語法改寫數學式
:notes: jserv
:::
> 已更正,謝謝老師提醒
接著,在常數的選擇上,根據我們的 bits 數值 W,選定與 W 互質且最接近 W/φ 的正整數 a 作為最後的常數。根據 bits 的大小,常見的常數為:
| W | a |
| -------- | -------- |
| 2^16 | 40503 |
| 2^32 | 2654435769 |
| 2^64 | 11400714819323198485 |
若是使用了這樣的神奇數字作為最後的常數,會發現在 hashing 時,連續的 key 值會被盡可能的分散,進而達成避免過多 collision 的結果:

然而,若仔細看會發現,這裡程式碼中定義的 `GOLDEN_RATIO_32` 為 0x61C88647 = 1640531527,和上方表格中的 2654435769 = 0x9E3779B9 並不相同。閱讀 [tools/include/linux/hash.h](https://github.com/torvalds/linux/blob/master/tools/include/linux/hash.h) 中的程式碼會發現,在註解中已經給出了解釋。為了增加計算效率,使用 (1-φ) 取代 φ,且最後 hash 出來結果的 distribution 有著和原版一樣的特性,仍能維持低 collision。
> Although a random odd number will do, it turns out that the golden ratio phi = (sqrt(5)-1)/2, or its negative, has particularly nice properties. (See Knuth vol 3, section 6.4, exercise 9.)
These are the negative, (1 - phi) = phi**2 = (3 - sqrt(5))/2, which is very slightly easier to multiply by and makes no difference to the hash distribution.
這裡設計一個簡單的實驗,分別將 `GOLDEN_RATIO_32` 設為原版的 0x9E3779B9 以及 linux 實作版本的 0x61C88647,對 1~10 做 hashing 並以 3 bits 儲存結果:
``` cpp
int main()
{
clock_t begin = clock();
for(int i=1;i<11;++i)
{
printf("%d : ", i);
printf("%u\n", hash(i, 3));
}
clock_t end = clock();
double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
printf("%f\n", time_spent);
return 0;
}
```
結果分別為:
| orig | 0x9E3779B9 hash | 0x61C88647 hash |
| -------- | -------- | -------- |
| 1 | 4 | 3 |
| 2 | 1 | 6 |
| 3 | 6 | 1 |
| 4 | 3 | 4 |
| 5 | 0 | 7 |
| 6 | 5 | 2 |
| 7 | 2 | 5 |
| 8 | 7 | 0 |
| 9 | 4 | 3 |
| 10 | 1 | 6 |
可以看到,不論是哪個版本,1~8 都被均勻的 hash 到了 0~7 當中,也就是 3 bits 所能表示的所有數值。從 9 開始 hash value 又從頭開始再輪一次。也就是說,之後的每一個數值 n,會和 n % 8 有著相同的 hash value。為了比較兩種方法的快慢,考慮對 1~100000 做 hashing,執行時間分別為:
* 0x61C88647: 0.000437 (sec)
* 0x9E3779B9: 0.000513 (sec)
linux 實作的改進版本,確實比原本快上一些。
---
## 測驗 3
### 程式碼的理解與運作原理
這份程式碼實作了 [fiber](https://en.wikipedia.org/wiki/Fiber_(computer_science))。
> In computer science, a fiber is a particularly lightweight thread of execution.
> Like threads, fibers share address space. However, fibers use cooperative multitasking while threads use preemptive multitasking. Threads often depend on the kernel's thread scheduler to preempt a busy thread and resume another thread; fibers yield themselves to run another fiber while executing.
可以看到,和一般的 preemptive multitasking 不同,fiber 不仰賴系統的 preemption,而是透過執行單元自行讓出運算資源(當然,需自行在適當時機呼叫 `yield()` 讓出控制權),以 cooperative multitasking 的方式確保程式的並行運算。
#### Fiber 的定義
``` cpp
typedef struct {
pid_t pid; /* The pid of the child thread as returned by clone */
void *stack; /* The stack pointer */
} fiber_t;
/* The fiber "queue" */
static fiber_t fiber_list[MAX_FIBERS];
/* The pid of the parent process */
static pid_t parent;
/* The number of active fibers */
static int num_fibers = 0;
```
定義 fiber_t 由兩個變數組成,分別負責記錄 child thread 的 pid 以及其 stack pointer。同時,宣告 `fiber_list` 以儲存日後會產生的 fiber。
#### Fiber 的基本功能實作
``` cpp
void fiber_init()
{
for (int i = 0; i < MAX_FIBERS; ++i)
fiber_list[i].pid = 0, fiber_list[i].stack = 0;
parent = getpid();
}
/* Yield control to another execution context */
void fiber_yield()
{
/* move the current process to the end of the process queue. */
sched_yield();
}
struct fiber_args {
void (*func)(void);
};
static int fiber_start(void *arg)
{
struct fiber_args *args = (struct fiber_args *) arg;
void (*func)() = args->func;
free(args);
func();
return 0;
}
```
實作一些 fiber 的基本功能,包括初始化 fiber、透過 function pointer 給定 fiber 執行的 function等,同時將 `sched_yield()` 函數 encapsulate 成 `fiber_yield()` 方便後面呼叫。
#### 生成新 fiber 並執行
``` cpp
int fiber_spawn(void (*func)(void))
{
if (num_fibers == MAX_FIBERS)
return FIBER_MAXFIBERS;
if ((fiber_list[num_fibers].stack = malloc(FIBER_STACK)) == 0)
return FIBER_MALLOC_ERROR;
struct fiber_args *args;
if ((args = malloc(sizeof(*args))) == 0) {
free(fiber_list[num_fibers].stack);
return FIBER_MALLOC_ERROR;
}
args->func = func;
fiber_list[num_fibers].pid = clone(
fiber_start, KKK,
SIGCHLD | CLONE_FS | CLONE_FILES | CLONE_SIGHAND | CLONE_VM, args);
if (fiber_list[num_fibers].pid == -1) {
free(fiber_list[num_fibers].stack);
free(args);
return FIBER_CLONE_ERROR;
}
num_fibers++;
return FIBER_NOERROR;
}
```
透過 [clone() 系統呼叫](https://man7.org/linux/man-pages/man2/clone.2.html)生成新 fiber 並開始執行。若有任何錯誤發生,如 fiber 數量超出給定上限,或在 malloc 途中發生問題等,則將新分配的資源 free 掉並回傳對應的 error message。根據 linux man page,KKK 的位置應該要給 stack pointer,這裡選 `(b) (char *) fiber_list[num_fibers].stack + FIBER_STACK`。
#### 等待所有 fiber 執行完成
``` cpp
int fiber_wait_all()
{
/* Check to see if we are in a fiber, since we do not get signals in the
* child threads
*/
pid_t pid = getpid();
if (pid != parent)
return FIBER_INFIBER;
/* Wait for the fibers to quit, then free the stacks */
while (num_fibers > 0) {
if ((pid = wait(0)) == -1)
exit(1);
/* Find the fiber, free the stack, and swap it with the last one */
for (int i = 0; i < num_fibers; ++i) {
if (CCC) {
free(fiber_list[i].stack);
if (i != --num_fibers)
fiber_list[i] = fiber_list[num_fibers];
break;
}
}
}
return FIBER_NOERROR;
}
```
由於只有 parent 需要做等待的動作,因此這裡先做了對應的檢查。接著,使用 [wait()系統呼叫](https://man7.org/linux/man-pages/man2/wait.2.html) 等待任一 fiber 執行完畢。接著,在 `fiber_list` 中找到執行完畢的 fiber,將它的 stack free 掉。最後,為了保持 `fiber_list` 的整齊,將最後一個 fiber 和當前的空 fiber 對調,避免空間的零碎化。這裡 CCC 的位置應是在尋找執行完成的 fiber,應選 `(b) fiber_list[i].pid == pid`。
### Fix interleaving
反覆執行上述程式碼,有時輸出可見 fiber 中的 `printf()` 結果無法以一整行表示。錯誤結果範例如下:
```
1 * 1 = 1
Fib(0) = 0
Fib(1) = 1
Fib(2) = 1
2 * 2 = 4
3 * 3 = 9
4 * 4 = 16
5 * 5 = 25
6 * 6 = 36
7 * 7 = 49
3) = 2
8 * 8 = 64
3) = 2
9 * 9 = 81
Fib(4) = 3
Fib(5) = 5
Fib(6) = 8
Fib(7) = 13
Fib(8) = 21
Fib(9) = 34
Fib(10) = 55
Fib(11) = 89
Fib(12) = 144
Fib(13) = 233
Fib(14) = 377
```
#### 改善方法一:
由於 `yield()` 的呼叫是在 `printf()` 之後,因此推測應該不是兩個 thread 同時呼叫 `printf()`,比較可能是第一個 thread 呼叫完後,第二個 thread 才呼叫 `printf()`,但 I/O buffer 中還有東西未輸出至螢幕上導致。因此,這裡引入 mutex lock 保護每一個 `printf()`。
``` cpp
static pthread_mutex_t print_mutex;
safePrint(char *str)
{
pthread_mutex_lock(&print_mutex);
printf(str);
pthread_mutex_unlock(&print_mutex);
}
static void fibonacci() {
int fib[2] = {0, 1};
printf("Fib(0) = 0\nFib(1) = 1\n");
for (int i = 2; i < 15; ++i) {
int nextFib = fib[0] + fib[1];
char str[50];
sprintf(str, "Fib(%d) = %d\n", i, nextFib);
safePrint(str);
fib[0] = fib[1];
fib[1] = nextFib;
fiber_yield();
}
}
static void squares() {
for (int i = 1; i < 10; ++i) {
char str[50];
sprintf(str, "%d * %d = %d\n", i, i, i * i);
safePrint(str);
fiber_yield();
}
}
int main() {
pthread_mutex_init(&print_mutex, NULL);
fiber_init();
fiber_spawn(&fibonacci);
fiber_spawn(&squares);
/* Since fibers are non-preemptive, we must allow them to run */
fiber_wait_all();
return 0;
}
```
改善後結果如下:
```
Fib(0) = 0
Fib(1) = 1
Fib(2) = 1
1 * 1 = 1
Fib(3) = 2
Fib(4) = 3
Fib(5) = 5
Fib(6) = 8
2 * 2 = 4
3 * 3 = 9
4 * 4 = 16
5 * 5 = 25
6 * 6 = 36
7 * 7 = 49
8 * 8 = 64
9 * 9 = 81
Fib(7) = 13
Fib(8) = 21
Fib(9) = 34
Fib(10) = 55
Fib(11) = 89
Fib(12) = 144
Fib(13) = 233
Fib(14) = 377
```
#### 改善方法二:
在閱讀[這篇 stack overflow 討論串](https://stackoverflow.com/questions/23586682/how-to-use-printf-in-multiple-threads)時,發現還有另一種巧思可以解決這個問題。既然不同 thread 呼叫 `printf()` 時會發生問題,那就改由一個 thread 負責輸出就好,其餘 thread 只負責將欲輸出的結果加入某個被保護住的 thread-safe buffer。為了保持程式的簡單,這裡統一由 main thread 在程式結束前輸出至螢幕。
``` cpp
static pthread_mutex_t print_mutex;
/* Lock protected buffer for the printing thread.
For further application, change this to linked
list implementation, which is a more flexible
approach.
*/
char printBuffer[30][30];
int buf_count = 0;
void printBufferAdd(char *str)
{
pthread_mutex_lock(&print_mutex);
strcpy(printBuffer[buf_count], str);
buf_count++;
pthread_mutex_unlock(&print_mutex);
}
static void fibonacci() {
int fib[2] = {0, 1};
printf("Fib(0) = 0\nFib(1) = 1\n");
for (int i = 2; i < 15; ++i) {
int nextFib = fib[0] + fib[1];
char str[50];
sprintf(str, "Fib(%d) = %d\n", i, nextFib);
printBufferAdd(str);
fib[0] = fib[1];
fib[1] = nextFib;
fiber_yield();
}
}
static void squares() {
for (int i = 1; i < 10; ++i) {
char str[50];
sprintf(str, "%d * %d = %d\n", i, i, i * i);
printBufferAdd(str);
fiber_yield();
}
}
int main() {
pthread_mutex_init(&print_mutex, NULL);
fiber_init();
fiber_spawn(&fibonacci);
fiber_spawn(&squares);
/* Since fibers are non-preemptive, we must allow them to run */
fiber_wait_all();
/* Print all results once before the program ends. */
for(int i=0;i<buf_count;++i)
{
printf("%s", printBuffer[i]);
}
return 0;
}
```
:::warning
TODO: 複習生產者-消費者問題及 ring buffer,思考減少 mutex lock 的機制。
:notes: jserv
:::