Try   HackMD

2021q1 Homework6 (quiz6)

contributed by < YOYOPIG >

tags: linux2021

2021q1 第 6 週測驗題

測驗 1

程式碼的理解與運作原理

這裡使用二進位的方式來處理大數的運算。定義需要的 macro 及一個大數:

/* how large the underlying array size should be */
#define UNIT_SIZE 4

/* These are dedicated to UNIT_SIZE == 4 */
#define UTYPE uint32_t
#define UTYPE_TMP uint64_t
#define FORMAT_STRING "%.08x"
#define MAX_VAL ((UTYPE_TMP) 0xFFFFFFFF)
#define BN_ARRAY_SIZE (128 / UNIT_SIZE) /* size of big-numbers in bytes */
struct bn { UTYPE array[BN_ARRAY_SIZE]; };

初始化大數及轉換:

static inline void bn_init(struct bn *n) {
    for (int i = 0; i < BN_ARRAY_SIZE; ++i)
        n->array[i] = 0;
}

static inline void bn_from_int(struct bn *n, UTYPE_TMP i) {
    bn_init(n);

    /* FIXME: what if machine is not little-endian? */
    n->array[0] = i;
    /* bit-shift with U64 operands to force 64-bit results */
    UTYPE_TMP tmp = i >> 32;
    n->array[1] = tmp;
}

這裡需要注意到,由於大數的 array 每一個元素為 uint32_t,因此需要兩格來儲存 uint64_t 類型的資料。首先將 lower 存到 array[0],再透過一次的 right shift 將 upper 存到 array[1]。如此一來,array 會呈現 little endian 式的排列。在註解中提到,若是使用的機器不為 little endian,是否會造成問題?這裡考慮兩個部份:

  • big endian 機器在做 shift operation 時,是否會有不同的結果
  • array 用的是 little-endian,在 big endian 機器上是否會不方便

首先,根據 stackoverflow 討論,shift 的結果和記憶體存放的 endianess 沒有關係,因此不會出現問題。而第 2 點只需要在後續程式處理的時候多加注意即可。

接著,實作把自定義的 big num 轉成 string 的函式

static void bn_to_str(struct bn *n, char *str, int nbytes) {
    /* index into array - reading "MSB" first -> big-endian */
    int j = BN_ARRAY_SIZE - 1;
    int i = 0; /* index into string representation */

    /* reading last array-element "MSB" first -> big endian */
    while ((j >= 0) && (nbytes > (i + 1))) {
        sprintf(&str[i], FORMAT_STRING, n->array[j]);
        i += (2 *
              UNIT_SIZE); /* step UNIT_SIZE hex-byte(s) forward in the string */
        j -= 1;           /* step one element back in the array */
    }

    /* Count leading zeros: */
    for (j = 0; str[j] == '0'; j++)
        ;

    /* Move string j places ahead, effectively skipping leading zeros */
    for (i = 0; i < (nbytes - j); ++i)
        str[i] = str[i + j];

    str[i] = 0;
}

由於人類閱讀的習慣接近 big-endian,由左至右為 MSB -> LSB 的型式,因此這裡在讀取的時候對 big num 做轉換。

加減法的實作

static void bn_dec(struct bn *n) {
    for (int i = 0; i < BN_ARRAY_SIZE; ++i) {
        UTYPE tmp = n->array[i];
        UTYPE res = tmp - 1;
        n->array[i] = res;

        COND;
    }
}

static void bn_add(struct bn *a, struct bn *b, struct bn *c) {
    int carry = 0;
    for (int i = 0; i < BN_ARRAY_SIZE; ++i) {
        UTYPE_TMP tmp = (UTYPE_TMP) a->array[i] + b->array[i] + carry;
        carry = (tmp > MAX_VAL);
        c->array[i] = (tmp & MAX_VAL);
    }
}

這裡實作大數的加減法,透過一個區域一個區域由 LSB 開始計算。在進位處理上,add 使用一個變數 carry 儲存,減法則在每次計算完過程後,檢查 res 以及 tmp 的大小關係。若 tmp > res 則代表沒有發生 underflow,可直接結束計算,否則需借位並繼續計算。COND(c) if (!(res > tmp)) break

乘法實作

static void bn_mul(struct bn *a, struct bn *b, struct bn *c) {
    struct bn row, tmp;
    bn_init(c);

    for (int i = 0; i < BN_ARRAY_SIZE; ++i) {
        bn_init(&row);

        for (int j = 0; j < BN_ARRAY_SIZE; ++j) {
            if (i + j < BN_ARRAY_SIZE) {
                bn_init(&tmp);
                III;
                bn_from_int(&tmp, intermediate);
                lshift_unit(&tmp, i + j);
                bn_add(&tmp, &row, &row);
            }
        }
        bn_add(c, &row, c);
    }
}

這裡使用 nested for loop 逐位計算。由於兩個 uint32_t 相乘,有可能會發生 overflow ,因此這裡需要透過一個較大的變數 uint64_t 來暫存結果。因此,III 選擇 (c) UTYPE_TMP intermediate = a->array[i] * (UTYPE_TMP) b->array[j]

TODO

Karatsuba algorithm 和 Schönhage–Strassen algorithm 這類的快速乘法演算法相繼提出,請引入到上述程式碼

sysprog21/bignum 中,已引入 Karatsuba algorithm 處理大數乘法加速。在實作乘法的 mul.c 註解中,可以看到日後打算改寫成更快的 Schönhage–Strassen algorithm
開始研究該演算法及其背後滿滿的數學


測驗 2

程式碼的理解與運作原理

這題取自 Leetcode 第 1 題 Two Sum,原題敘述如下:

Given an array of integers nums and an integer target, return indices of the two numbers such that they add up to target.
You may assume that each input would have exactly one solution, and you may not use the same element twice.
You can return the answer in any order.

當初使用 c++ 刷題時,沒有思考太多就直接用了內建的 std::unordered_map 解題。這裡因為 c 語言並沒有類似的工具,因此需自行實作。首先考慮 hash table 的定義方式:

struct hlist_node { struct hlist_node *next, **pprev; };
struct hlist_head { struct hlist_node *first; };
typedef struct { int bits; struct hlist_head *ht; } map_t;

這裡透過 hlist_node 以及 hlist_head 來定義 hash list,將所有具有相同 hash value 的元素加入同一個 list 中。而整個 map 結構則定義為 hash list array,另外用 bits 紀錄整個 array 的大小。初始化方法如下:

map_t *map_init(int bits) {
    map_t *map = malloc(sizeof(map_t));
    if (!map)
        return NULL;

    map->bits = bits;
    map->ht = malloc(sizeof(struct hlist_head) * MAP_HASH_SIZE(map->bits));
    if (map->ht) {
        for (int i = 0; i < MAP_HASH_SIZE(map->bits); i++)
            (map->ht)[i].first = NULL;
    } else {
        free(map);
        map = NULL;
    }
    return map;
}

因為使用 list 儲存,這裡在定義 key-value pair 的時候,需要再加上先前定義的 hlist_node

struct hash_key {
    int key;
    void *data;
    struct hlist_node node;
};

接下來就可以開始進行查找與插入了。

  • 查找
static struct hash_key *find_key(map_t *map, int key) {
    struct hlist_head *head = &(map->ht)[hash(key, map->bits)];
    for (struct hlist_node *p = head->first; p; p = p->next) {
        struct hash_key *kn = container_of(p, struct hash_key, node);
        if (kn->key == key)
            return kn;
    }
    return NULL;
}

void *map_get(map_t *map, int key)
{
    struct hash_key *kn = find_key(map, key);
    return kn ? kn->data : NULL;
}
  • 插入
void map_add(map_t *map, int key, void *data)
{
    struct hash_key *kn = find_key(map, key);
    if (kn)
        return;

    kn = malloc(sizeof(struct hash_key));
    kn->key = key, kn->data = data;

    struct hlist_head *h = &map->ht[hash(key, map->bits)];
    struct hlist_node *n = &kn->node, *first = h->first;
    NNN;
    if (first)
        first->pprev = &n->next;
    h->first = n;
    PPP;
}

大致看一下上面的程式碼可發現,在插入的時候,會將新元素插入至 list 的最前端,再檢查原先 list 中是否有元素,若有的話將其接在新元素的後方。因此,這裡 NNN 以及 PPP 應分別選 (c) n->next = first, (a) n->pprev = &h->first

Hash function implementation

在實作 hash function 時,會希望產生的 hash 值越均勻越好,以避免產生過多的 collision。若 hash function 的選擇不良,很可能最後大量的元素都被分配至同一個 list 中,失去了原先向低搜尋時間的效果。可以看到這裡 hash function 的選擇,應為 multiplication hashing 的一種。將數值乘上一個選定的常數後,透過 shift 取其較高的 bits 位做為 hash value。

#define GOLDEN_RATIO_32 0x61C88647
static inline unsigned int hash(unsigned int val, unsigned int bits) {
    /* High bits are more random, so use them. */
    return (val * GOLDEN_RATIO_32) >> (32 - bits);
}

Fibonacci Hashing

參考文章

注意到上面的程式碼中,選定的常數為 0x61C88647。使用這一特定常數的方法,又被稱為 Fibonacci Hashing。首先,golden ratio 的定義為

φxy, where xy=x+yx
將以上式子展開:
xy=x+yx0=x2xyy2

將 φ 代入式子中:
0=φ2φ1

φ=1+52

用 LaTeX 語法改寫數學式

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

已更正,謝謝老師提醒

接著,在常數的選擇上,根據我們的 bits 數值 W,選定與 W 互質且最接近 W/φ 的正整數 a 作為最後的常數。根據 bits 的大小,常見的常數為:

W a
2^16 40503
2^32 2654435769
2^64 11400714819323198485

若是使用了這樣的神奇數字作為最後的常數,會發現在 hashing 時,連續的 key 值會被盡可能的分散,進而達成避免過多 collision 的結果:

然而,若仔細看會發現,這裡程式碼中定義的 GOLDEN_RATIO_32 為 0x61C88647 = 1640531527,和上方表格中的 2654435769 = 0x9E3779B9 並不相同。閱讀 tools/include/linux/hash.h 中的程式碼會發現,在註解中已經給出了解釋。為了增加計算效率,使用 (1-φ) 取代 φ,且最後 hash 出來結果的 distribution 有著和原版一樣的特性,仍能維持低 collision。

Although a random odd number will do, it turns out that the golden ratio phi = (sqrt(5)-1)/2, or its negative, has particularly nice properties. (See Knuth vol 3, section 6.4, exercise 9.)
These are the negative, (1 - phi) = phi**2 = (3 - sqrt(5))/2, which is very slightly easier to multiply by and makes no difference to the hash distribution.

這裡設計一個簡單的實驗,分別將 GOLDEN_RATIO_32 設為原版的 0x9E3779B9 以及 linux 實作版本的 0x61C88647,對 1~10 做 hashing 並以 3 bits 儲存結果:

int main()
{
    clock_t begin = clock();
    for(int i=1;i<11;++i)
    {
        printf("%d : ", i);
        printf("%u\n", hash(i, 3));
    }
    clock_t end = clock();
    double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
    printf("%f\n", time_spent);
    return 0;
}

結果分別為:

orig 0x9E3779B9 hash 0x61C88647 hash
1 4 3
2 1 6
3 6 1
4 3 4
5 0 7
6 5 2
7 2 5
8 7 0
9 4 3
10 1 6

可以看到,不論是哪個版本,1~8 都被均勻的 hash 到了 0~7 當中,也就是 3 bits 所能表示的所有數值。從 9 開始 hash value 又從頭開始再輪一次。也就是說,之後的每一個數值 n,會和 n % 8 有著相同的 hash value。為了比較兩種方法的快慢,考慮對 1~100000 做 hashing,執行時間分別為:

  • 0x61C88647: 0.000437 (sec)
  • 0x9E3779B9: 0.000513 (sec)

linux 實作的改進版本,確實比原本快上一些。


測驗 3

程式碼的理解與運作原理

這份程式碼實作了 fiber

In computer science, a fiber is a particularly lightweight thread of execution.
Like threads, fibers share address space. However, fibers use cooperative multitasking while threads use preemptive multitasking. Threads often depend on the kernel's thread scheduler to preempt a busy thread and resume another thread; fibers yield themselves to run another fiber while executing.

可以看到,和一般的 preemptive multitasking 不同,fiber 不仰賴系統的 preemption,而是透過執行單元自行讓出運算資源(當然,需自行在適當時機呼叫 yield() 讓出控制權),以 cooperative multitasking 的方式確保程式的並行運算。

Fiber 的定義

typedef struct {
    pid_t pid;   /* The pid of the child thread as returned by clone */
    void *stack; /* The stack pointer */
} fiber_t;

/* The fiber "queue" */
static fiber_t fiber_list[MAX_FIBERS];

/* The pid of the parent process */
static pid_t parent;

/* The number of active fibers */
static int num_fibers = 0;

定義 fiber_t 由兩個變數組成,分別負責記錄 child thread 的 pid 以及其 stack pointer。同時,宣告 fiber_list 以儲存日後會產生的 fiber。

Fiber 的基本功能實作

void fiber_init()
{
    for (int i = 0; i < MAX_FIBERS; ++i)
        fiber_list[i].pid = 0, fiber_list[i].stack = 0;
    parent = getpid();
}

/* Yield control to another execution context */
void fiber_yield()
{
    /* move the current process to the end of the process queue. */
    sched_yield();
}

struct fiber_args {
    void (*func)(void);
};

static int fiber_start(void *arg)
{
    struct fiber_args *args = (struct fiber_args *) arg;
    void (*func)() = args->func;
    free(args);

    func();
    return 0;
}

實作一些 fiber 的基本功能,包括初始化 fiber、透過 function pointer 給定 fiber 執行的 function等,同時將 sched_yield() 函數 encapsulate 成 fiber_yield() 方便後面呼叫。

生成新 fiber 並執行

int fiber_spawn(void (*func)(void))
{
    if (num_fibers == MAX_FIBERS)
        return FIBER_MAXFIBERS;

    if ((fiber_list[num_fibers].stack = malloc(FIBER_STACK)) == 0)
        return FIBER_MALLOC_ERROR;

    struct fiber_args *args;
    if ((args = malloc(sizeof(*args))) == 0) {
        free(fiber_list[num_fibers].stack);
        return FIBER_MALLOC_ERROR;
    }
    args->func = func;

    fiber_list[num_fibers].pid = clone(
        fiber_start, KKK,
        SIGCHLD | CLONE_FS | CLONE_FILES | CLONE_SIGHAND | CLONE_VM, args);
    if (fiber_list[num_fibers].pid == -1) {
        free(fiber_list[num_fibers].stack);
        free(args);
        return FIBER_CLONE_ERROR;
    }

    num_fibers++;
    return FIBER_NOERROR;
}

透過 clone() 系統呼叫生成新 fiber 並開始執行。若有任何錯誤發生,如 fiber 數量超出給定上限,或在 malloc 途中發生問題等,則將新分配的資源 free 掉並回傳對應的 error message。根據 linux man page,KKK 的位置應該要給 stack pointer,這裡選 (b) (char *) fiber_list[num_fibers].stack + FIBER_STACK

等待所有 fiber 執行完成

int fiber_wait_all()
{
    /* Check to see if we are in a fiber, since we do not get signals in the
     * child threads
     */
    pid_t pid = getpid();
    if (pid != parent)
        return FIBER_INFIBER;

    /* Wait for the fibers to quit, then free the stacks */
    while (num_fibers > 0) {
        if ((pid = wait(0)) == -1)
            exit(1);

        /* Find the fiber, free the stack, and swap it with the last one */
        for (int i = 0; i < num_fibers; ++i) {
            if (CCC) {
                free(fiber_list[i].stack);
                if (i != --num_fibers)
                    fiber_list[i] = fiber_list[num_fibers];
                break;
            }
        }
    }

    return FIBER_NOERROR;
}

由於只有 parent 需要做等待的動作,因此這裡先做了對應的檢查。接著,使用 wait()系統呼叫 等待任一 fiber 執行完畢。接著,在 fiber_list 中找到執行完畢的 fiber,將它的 stack free 掉。最後,為了保持 fiber_list 的整齊,將最後一個 fiber 和當前的空 fiber 對調,避免空間的零碎化。這裡 CCC 的位置應是在尋找執行完成的 fiber,應選 (b) fiber_list[i].pid == pid

Fix interleaving

反覆執行上述程式碼,有時輸出可見 fiber 中的 printf() 結果無法以一整行表示。錯誤結果範例如下:

1 * 1 = 1           
Fib(0) = 0
Fib(1) = 1
Fib(2) = 1
2 * 2 = 4 
3 * 3 = 9   
4 * 4 = 16  
5 * 5 = 25  
6 * 6 = 36     
7 * 7 = 49       
3) = 2   
8 * 8 = 64  
3) = 2           
9 * 9 = 81   
Fib(4) = 3         
Fib(5) = 5         
Fib(6) = 8          
Fib(7) = 13           
Fib(8) = 21           
Fib(9) = 34            
Fib(10) = 55       
Fib(11) = 89           
Fib(12) = 144         
Fib(13) = 233            
Fib(14) = 377

改善方法一:

由於 yield() 的呼叫是在 printf() 之後,因此推測應該不是兩個 thread 同時呼叫 printf(),比較可能是第一個 thread 呼叫完後,第二個 thread 才呼叫 printf(),但 I/O buffer 中還有東西未輸出至螢幕上導致。因此,這裡引入 mutex lock 保護每一個 printf()

static pthread_mutex_t print_mutex;

safePrint(char *str)
{
    pthread_mutex_lock(&print_mutex);
    printf(str);
    pthread_mutex_unlock(&print_mutex);
}

static void fibonacci() {
    int fib[2] = {0, 1};
    printf("Fib(0) = 0\nFib(1) = 1\n");
    for (int i = 2; i < 15; ++i) {
        int nextFib = fib[0] + fib[1];
        char str[50];
        sprintf(str, "Fib(%d) = %d\n", i, nextFib);
        safePrint(str);
        fib[0] = fib[1];
        fib[1] = nextFib;
        fiber_yield();
    }
}

static void squares() {
    for (int i = 1; i < 10; ++i) {
        char str[50];
        sprintf(str, "%d * %d = %d\n", i, i, i * i);
        safePrint(str);
        fiber_yield();
    }
}

int main() {
    pthread_mutex_init(&print_mutex, NULL);
    fiber_init();

    fiber_spawn(&fibonacci);
    fiber_spawn(&squares);

    /* Since fibers are non-preemptive, we must allow them to run */
    fiber_wait_all();

    return 0;
}

改善後結果如下:

Fib(0) = 0
Fib(1) = 1    
Fib(2) = 1             
1 * 1 = 1               
Fib(3) = 2             
Fib(4) = 3              
Fib(5) = 5              
Fib(6) = 8               
2 * 2 = 4                
3 * 3 = 9               
4 * 4 = 16                
5 * 5 = 25                 
6 * 6 = 36            
7 * 7 = 49              
8 * 8 = 64              
9 * 9 = 81              
Fib(7) = 13              
Fib(8) = 21              
Fib(9) = 34              
Fib(10) = 55           
Fib(11) = 89           
Fib(12) = 144           
Fib(13) = 233           
Fib(14) = 377

改善方法二:

在閱讀這篇 stack overflow 討論串時,發現還有另一種巧思可以解決這個問題。既然不同 thread 呼叫 printf() 時會發生問題,那就改由一個 thread 負責輸出就好,其餘 thread 只負責將欲輸出的結果加入某個被保護住的 thread-safe buffer。為了保持程式的簡單,這裡統一由 main thread 在程式結束前輸出至螢幕。

static pthread_mutex_t print_mutex;

/* Lock protected buffer for the printing thread. 
   For further application, change this to linked 
   list implementation, which is a more flexible 
   approach.
*/
char printBuffer[30][30];
int buf_count = 0;

void printBufferAdd(char *str)
{
    pthread_mutex_lock(&print_mutex);
    strcpy(printBuffer[buf_count], str);
    buf_count++;
    pthread_mutex_unlock(&print_mutex);
}

static void fibonacci() {
    int fib[2] = {0, 1};
    printf("Fib(0) = 0\nFib(1) = 1\n");
    for (int i = 2; i < 15; ++i) {
        int nextFib = fib[0] + fib[1];
        char str[50];
        sprintf(str, "Fib(%d) = %d\n", i, nextFib);
        printBufferAdd(str);
        fib[0] = fib[1];
        fib[1] = nextFib;
        fiber_yield();
    }
}

static void squares() {
    for (int i = 1; i < 10; ++i) {
        char str[50];
        sprintf(str, "%d * %d = %d\n", i, i, i * i);
        printBufferAdd(str);
        fiber_yield();
    }
}

int main() {
    pthread_mutex_init(&print_mutex, NULL);
    fiber_init();

    fiber_spawn(&fibonacci);
    fiber_spawn(&squares);

    /* Since fibers are non-preemptive, we must allow them to run */
    fiber_wait_all();
    
    /* Print all results once before the program ends. */
    for(int i=0;i<buf_count;++i)
    {
        printf("%s", printBuffer[i]);
    }

    return 0;
}

TODO: 複習生產者-消費者問題及 ring buffer,思考減少 mutex lock 的機制。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv