
2021q1 Week 8 Quiz: Problem 2

tags: linux2021

Quiz overview

This problem tests students' understanding of bitwise operations, linked lists, hash tables, and the MapReduce concept.

The lecture "Concurrent Programming: Case Study on Map-Reduce" covers three operations commonly found in functional programming (FP):

  • map(): applies a given operation and returns a new array or linked list
  • filter(): searches for and selects elements
  • reduce(): combines the material into a single final result

The map function applies a given operation to each element of a conceptual list of independent elements.

  • For example, suppose someone discovers that every grade in a grade list has been overestimated by one point; the teacher can define a "minus one" mapping function to correct the error.

In fact, each element is operated on independently, and the original list is never modified: a new list is created to hold the results. This means a Map operation is highly parallelizable, which is very useful for performance-critical applications and for parallel computing in general.

The reduce function, in turn, suitably combines the elements of a list.

  • For example, to find the class average, one can define a reduce function that halves the list by adding each odd- (or even-) indexed element to its neighboring element, recursing until only a single element remains; dividing that element by the number of students gives the average.

How I explained MapReduce to my Wife

Flow diagram:

```graphviz
digraph g {
    rankdir=LR;
    Input -> Split;
    Split -> 1; Split -> 2; Split -> 3; Split -> 4; Split -> "..."; Split -> N;
    1 -> Merge; 2 -> Merge; 3 -> Merge; 4 -> Merge; "..." -> Merge; N -> Merge;
    Merge -> Output;
}
```

The following example, taken from Google's paper MapReduce: Simplified Data Processing on Large Clusters, shows how word count (abbreviated wc) can be implemented with MapReduce.

Consider counting the number of occurrences of each word across a large collection of files (wc):

The map function emits the value 1 for a word each time it appears; the reduce function then adds up all the counts emitted for the same word.

map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));

:warning: Yes, you read that right: 786 lines of C code. After the past few weeks of training, you should be able to read source code much faster now, right?

Below is a MapReduce-style wc program implemented with POSIX Threads: (word-count.c)

```c
/* Word cache configs */
#define MAX_WORD_SIZE 32
#define MAX_N_WORDS 8192

#include <stddef.h>

#define container_of(list_ptr, container_type, member_name)                \
    ({                                                                     \
        const typeof(((container_type *) 0)->member_name) *__member_ptr =  \
            (list_ptr);                                                    \
        (container_type *) ((char *) __member_ptr -                        \
                            offsetof(container_type, member_name));        \
    })

struct list_entry {
    struct list_entry *next, *prev;
};

#define list_element(list_ptr, type, member) \
    container_of(list_ptr, type, member)

#define list_first(root_ptr, type, member) \
    list_element((root_ptr)->next, type, member)

static inline struct list_entry *list_next(struct list_entry *root,
                                           struct list_entry *current)
{
    if ((root == root->next) || (current->next == root))
        return NULL;
    return current->next;
}

/* FIXME: this forbids having 2 list_for_each in the same function, because the
 * variable __ptr will be defined twice, which results in a compilation error.
 * The __ptr is necessary because some functions delete iter while traversing
 * the list.
 */
#define list_for_each_forward(root_ptr, iter)                                 \
    struct list_entry *__ptr;                                                 \
    for (iter = (root_ptr)->next, __ptr = (struct list_entry *) (iter)->next; \
         iter != (root_ptr); iter = (typeof((iter))) __ptr,                   \
        __ptr = (struct list_entry *) iter->next)

#define list_for_each(root_ptr, iter) list_for_each_forward(root_ptr, iter)

static inline void list_root_init(struct list_entry *root)
{
    root->next = root->prev = root;
}

static inline void list_add(struct list_entry *root, struct list_entry *entry)
{
    struct list_entry *prev_entry = root;
    struct list_entry *next_entry = root->next;
    entry->next = next_entry, entry->prev = prev_entry;
    prev_entry->next = entry, next_entry->prev = entry;
}

#define list_add_prev(root, entry) list_add((root)->prev, (entry))
#define list_empty(root) (root == (root)->next)

#include <stdint.h>
#include <stdlib.h>

typedef uint32_t hash_t;

/* A node of the table */
struct ht_node {
    hash_t hash;
    struct list_entry list;
};

/* user-defined functions */
typedef int hashfunc_t(void *key, hash_t *hash, uint32_t *bkt);
typedef int cmp_t(struct ht_node *n, void *key);

/* hash table */
struct htable {
    hashfunc_t *hashfunc;
    cmp_t *cmp;
    uint32_t n_buckets;
    struct list_entry *buckets;
};

/* Initializes a hash table */
static inline int ht_init(struct htable *h,
                          hashfunc_t *hashfunc,
                          cmp_t *cmp,
                          uint32_t n_buckets)
{
    h->hashfunc = hashfunc, h->cmp = cmp;
    h->n_buckets = n_buckets;
    h->buckets = malloc(sizeof(struct list_entry) * n_buckets);
    for (size_t i = 0; i < h->n_buckets; i++)
        list_root_init(&h->buckets[i]);
    return 0;
}

/* destroys resources allocated by ht_init */
static inline int ht_destroy(struct htable *h)
{
    free(h->buckets);
    return 0;
}

/* Find an element with the key in the hash table.
 * Return the element if success.
 */
static inline struct ht_node *ht_find(struct htable *h, void *key)
{
    hash_t hval;
    uint32_t bkt;
    h->hashfunc(key, &hval, &bkt);
    struct list_entry *head = &h->buckets[bkt], *iter;
    list_for_each (head, iter) {
        struct ht_node *n = list_element(iter, struct ht_node, list);
        if (n->hash == hval) {
            int res = h->cmp(n, key);
            if (!res)
                return n;
            if (res > 0)
                return NULL;
        }
    }
    return NULL;
}

/* Insert a new element with the key 'key' in the htable.
 * Return 0 if success.
 */
static inline int ht_insert(struct htable *h, struct ht_node *n, void *key)
{
    hash_t hval;
    uint32_t bkt;
    h->hashfunc(key, &hval, &bkt);
    n->hash = hval;
    struct list_entry *head = &h->buckets[bkt], *iter;
    list_for_each (head, iter) {
        struct ht_node *tmp = list_element(iter, struct ht_node, list);
        if (tmp->hash >= hval) {
            int cmp = h->cmp(tmp, key);
            if (!cmp) /* already exist */
                return -1;
            if (cmp > 0) {
                MMM;
                return 0;
            }
        }
    }
    list_add_prev(head, &n->list);
    return 0;
}

static inline struct ht_node *ht_get_first(struct htable *h, uint32_t bucket)
{
    struct list_entry *head = &h->buckets[bucket];
    if (list_empty(head))
        return NULL;
    return NNN(head, struct ht_node, list);
}

static inline struct ht_node *ht_get_next(struct htable *h,
                                          uint32_t bucket,
                                          struct ht_node *n)
{
    struct list_entry *ln = list_next(&h->buckets[bucket], &n->list);
    if (!ln)
        return NULL;
    return list_element(ln, struct ht_node, list);
}

/* cache of words. Count the number of words using a modified hash table */
struct wc_cache {
    struct htable htable;
};

struct wc_word {
    char word[MAX_WORD_SIZE], *full_word;
    uint32_t counter;
    struct ht_node node, node_main;
};

#include <ctype.h>
#include <limits.h>
#include <stdio.h>
#include <string.h>

/* TODO: handle '-' character (hyphen) */
/* TODO: add number support */
/* FIXME: remove the assumptions on ASCII encoding */

static uint32_t n_buckets;
static struct wc_cache main_cache, *thread_caches;

#define FIRST_LOWER_LETTER 'a'
#define N_LETTERS (('z' - 'a') + 1)
#define MIN_N_BUCKETS N_LETTERS

#define GET_WORD(w) (w->full_word ? w->full_word : w->word)

#define MIN_MAX(a, b, op)       \
    ({                          \
        __typeof__(a) _a = (a); \
        __typeof__(b) _b = (b); \
        _a op _b ? _a : _b;     \
    })
#define MAX(a, b) MIN_MAX(a, b, >)
#define MIN(a, b) MIN_MAX(a, b, <)

/* Called to compare words if their hash value is similar */
static inline int __wc_cmp(struct ht_node *n, void *key, char m)
{
    struct wc_word *w = m ? container_of(n, struct wc_word, node_main)
                          : container_of(n, struct wc_word, node);
    return strcasecmp(GET_WORD(w), (char *) key);
}

static int wc_cmp(struct ht_node *n, void *key)
{
    return __wc_cmp(n, key, 0);
}

static int wc_cmp_main(struct ht_node *n, void *key)
{
    return __wc_cmp(n, key, 1);
}

static uint32_t code_min, code_max, code_range;

static uint32_t get_code(char *word)
{
    uint32_t code = 0;
    /* The hash value is a concatenation of the letters */
    char shift = RRR;
    for (int i = ((sizeof(code) * 8) / shift) - 1; i >= 0 && *word; i--) {
        char c = tolower(*(word)) - FIRST_LOWER_LETTER;
        code |= ((uint32_t) c) << (i * shift);
        word++;
    }
    return code;
}

static inline int scale_range_init()
{
    code_min = get_code("a"), code_max = get_code("zzzzzzzzzz");
    code_range = (code_max - code_min);
    return 0;
}

static inline uint32_t scale_range(uint32_t code)
{
    return (uint32_t)((((double) code - code_min) * n_buckets) / code_range);
}

/* Must keep an alphabetic order when assigning buckets. */
static int hash_bucket(void *key, hash_t *hash, uint32_t *bkt)
{
    uint32_t code = get_code((char *) key);
    *hash = (hash_t) code, *bkt = scale_range(code);
    return 0;
}

/* Initialize each (worker+main) cache */
int wc_init(uint32_t n_threads, uint32_t n_words)
{
    /* FIXME: a resizable hash table would be better */
    n_buckets = MAX(MIN(n_words, MAX_N_WORDS), MIN_N_BUCKETS);
    thread_caches = malloc(sizeof(struct wc_cache) * n_threads);
    scale_range_init();
    for (size_t i = 0; i < n_threads; i++) {
        if (ht_init(&thread_caches[i].htable, hash_bucket, wc_cmp, n_buckets))
            return -1;
    }
    if (ht_init(&main_cache.htable, hash_bucket, wc_cmp_main, n_buckets))
        return -1;
    return 0;
}

/* Copy while setting to lower case */
static char *wc_strncpy(char *dest, char *src, size_t n)
{
    /* case insensitive */
    for (size_t i = 0; i < n && src[i] != '\0'; i++)
        dest[i] = (char) tolower((int) (src[i]));
    return dest;
}

/* Add a word to the cache of the thread tid. If the word already exists,
 * increment its counter. Otherwise, add a new word.
 */
int wc_add_word(uint32_t tid, char *word, uint32_t count)
{
    struct wc_word *w;
    struct wc_cache *cache = &thread_caches[tid];
    struct ht_node *n;
    if (!(n = ht_find(&cache->htable, word))) {
        /* word was absent. Allocate a new wc_word */
        if (!(w = calloc(1, sizeof(struct wc_word))))
            return -1;
        if (count > (MAX_WORD_SIZE - 1))
            w->full_word = calloc(1, count + 1);
        wc_strncpy(GET_WORD(w), word, count);
        /* Add the new word to the table */
        ht_insert(&cache->htable, &w->node, word);
    } else
        w = container_of(n, struct wc_word, node);
    w->counter++;
    return 0;
}

static int __merge_results(uint32_t tid, uint32_t j, struct wc_cache *wcc)
{
    struct wc_cache *cache = &main_cache;
    struct ht_node *iter = ht_get_first(&wcc->htable, j);
    for (; iter; iter = ht_get_next(&wcc->htable, j, iter)) {
        struct wc_word *iw = container_of(iter, struct wc_word, node);
        /* check if word already exists in main_cache */
        char *wd = GET_WORD(iw);
        struct ht_node *n;
        if ((n = ht_find(&cache->htable, wd))) {
            /* word already exists. Get the word and increment it */
            struct wc_word *w = container_of(n, struct wc_word, node_main);
            w->counter += iw->counter;
        } else /* if word does not exist, then insert the new word */
            ht_insert(&cache->htable, PPP, wd);
    }
    return 0;
}

/* Merge the results of all threads to the main cache.
 * This merge is done in parallel by all threads.
 * Each thread handles at least n_buckets/n_threads buckets.
 */
int wc_merge_results(uint32_t tid, uint32_t n_threads)
{
    uint32_t n_workers;
    /* Keep the number of workers <= nbthread */
    if (n_threads > n_buckets) {
        if (tid > n_buckets - 1)
            return 0;
        n_workers = n_buckets;
    } else
        n_workers = n_threads;

    /* Each thread will treat at least wk_buckets */
    uint32_t wk_buckets = n_buckets / n_workers;
    /* The range that this thread will treat */
    uint32_t wk_bstart = wk_buckets * tid, wk_bend = wk_bstart + wk_buckets;
    /* last thread must also do last buckets */
    if ((tid == (n_workers - 1)))
        wk_bend += QQQ;
    for (size_t i = 0; i < n_threads; i++) {
        struct wc_cache *cache = &thread_caches[i];
        for (size_t j = wk_bstart; j < wk_bend; j++) {
            /* Traverse the buckets of all threads from wk_bstart to wk_bend.
             * Aggregate the nodes of these buckets in the main_cache.
             */
            __merge_results(tid, j, cache);
        }
    }
    return 0;
}

/* Print the merged results */
int wc_print(int id)
{
    uint32_t total = 0, count_total = 0, bkt_total = 0, empty_bkt = 0;
    int valid = (id == -1);
    struct wc_cache *cache = valid ? &main_cache : &thread_caches[id];
    for (size_t j = 0; j < n_buckets; j++) {
        struct ht_node *iter = ht_get_first(&cache->htable, j);
        for (; iter; iter = ht_get_next(&cache->htable, j, iter)) {
            struct wc_word *w =
                valid ? container_of(iter, struct wc_word, node_main)
                      : container_of(iter, struct wc_word, node);
            printf("%s : %d\n", GET_WORD(w), w->counter);
            bkt_total++, total++;
            count_total += w->counter;
        }
        if (!bkt_total)
            empty_bkt++;
        bkt_total = 0;
    }
    printf("Words: %d, word counts: %d, full buckets: %d (ideal %d)\n", total,
           count_total, n_buckets - empty_bkt, MIN(total, n_buckets));
    return 0;
}

static int __wc_destroy(struct wc_cache *wcc, int id)
{
    int valid = (id == -1);
    for (uint32_t j = 0; j < n_buckets; j++) {
        struct ht_node *iter = ht_get_first(&wcc->htable, j);
        for (; iter; iter = ht_get_next(&wcc->htable, j, iter)) {
            struct wc_word *w =
                valid ? container_of(iter, struct wc_word, node_main)
                      : container_of(iter, struct wc_word, node);
            free(w->full_word);
            free(w);
        }
    }
    return 0;
}

/* Destroy resources allocated by wc_init */
/* Free nodes and htable */
int wc_destroy(uint32_t n_threads)
{
    for (size_t i = 0; i < n_threads; i++) {
        if (__wc_destroy(&thread_caches[i], i))
            return -1;
        if (ht_destroy(&thread_caches[i].htable))
            return -1;
    }
    free(thread_caches);
    if (ht_destroy(&main_cache.htable))
        return -1;
    return 0;
}

#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

/* I/O operation configs */
#ifndef BUFFER_SIZE
#define BUFFER_SIZE 4096
#endif

static int fd;
static off_t file_size;
static void *file_content;
static __thread char *worker_buffer;

#if defined(__linux__)
#define MMAP_FLAGS (MAP_POPULATE | MAP_PRIVATE)
#else
#define MMAP_FLAGS (MAP_PRIVATE)
#endif

/* Initialize file access for worker threads.
 * Should be called by main thread. Return 0 on success.
 */
static inline int fa_init(char *file, uint32_t n_threads, off_t *fsz)
{
    /* Opening file */
    if ((fd = open(file, O_RDONLY)) < 0) {
        perror("open");
        return -1;
    }
    /* Get file size */
    if ((file_size = lseek(fd, 0, SEEK_END)) < 0) {
        perror("lseek");
        return -1;
    }
    file_content = mmap(NULL, file_size, PROT_READ, MMAP_FLAGS, fd, 0);
    if (file_content == MAP_FAILED)
        file_content = NULL;
    *fsz = file_size;
    return 0;
}

/* Initialize file read access.
 * Should be called by worker threads. Return 0 on success.
 */
static inline int fa_read_init()
{
    if (!file_content && !(worker_buffer = malloc(BUFFER_SIZE)))
        return -1;
    return 0;
}

/* destroy resources allocated by fa_read_init */
static inline int fa_read_destroy()
{
    free(worker_buffer);
    return 0;
}

/* Read worker part of the file. Should be called by worker threads.
 */
static inline off_t fa_read(uint32_t id, char **buff, off_t size, off_t pos)
{
    off_t size_read;
    if (file_content) {
        if (pos >= file_size) /* EOF */
            return 0;
        *buff = (char *) file_content + pos;
        off_t end = pos + size;
        size_read = (end > file_size) ? (end - file_size) : size;
        return size_read;
    }
    off_t size_to_read = BUFFER_SIZE < size ? BUFFER_SIZE : size;
    if ((size_read = pread(fd, worker_buffer, size_to_read, pos)) == -1) {
        perror("pread");
        return -1;
    }
    *buff = worker_buffer;
    return size_read;
}

#include <pthread.h>

char *file_name;
uint32_t n_threads;

struct thread_info {
    pthread_t thread_id; /* ID returned by pthread_create() */
    int thread_num;      /* Application-defined thread # */
};

#define BETWEEN(_wd, _min, _max) ((_wd >= _min) && (_wd <= _max))
#define IS_LETTER(c) (BETWEEN(*buff, 'A', 'Z') || BETWEEN(*buff, 'a', 'z'))

static off_t file_size;
static pthread_barrier_t barrier;

int mr_init(void)
{
    if (pthread_barrier_init(&barrier, NULL, n_threads)) {
        perror("barrier init");
        return -1;
    }
    if (fa_init(file_name, n_threads, &file_size))
        return -1;
    if (wc_init(n_threads, file_size / MAX_WORD_SIZE))
        return -1;
    return 0;
}

int mr_destroy(void)
{
    if (pthread_barrier_destroy(&barrier)) {
        perror("barrier init");
        return -1;
    }
    if (fa_init(file_name, n_threads, &file_size))
        return -1;
    if (wc_destroy(n_threads))
        return -1;
    return 0;
}

int mr_reduce(void)
{
    /* The merge is done by worker threads */
    return 0;
}

int mr_print(void)
{
    return wc_print(-1);
}

static __thread off_t worker_slice, worker_current;
static __thread uint32_t count = 0, wsize = 0;
static __thread char *word = NULL;

/* The next three functions handle a buffer of the file.
 * Note that a buffer may end in the middle of a word.
 * The word will be completed on the next call to the func.
 */
static int add_letter(char c)
{
    if ((count > wsize - 1) || !wsize) {
        wsize += MAX_WORD_SIZE;
        char *orig = word;
        if (!(word = realloc(word, wsize))) {
            free(orig);
            return -1;
        }
    }
    word[count++] = c;
    return 0;
}

static inline int add_sep(uint32_t tid)
{
    if (count) {
        word[count] = '\0';
        /* Add current word */
        if (wc_add_word(tid, word, count))
            return -1;
        count = 0;
    }
    return 0;
}

static int buff_proceed(uint32_t tid, char *buff, size_t size, char last)
{
    for (; size; size--, buff++) {
        if (!IS_LETTER(*buff)) {
            if (add_sep(tid)) /* Not a letter */
                return -1;
            continue;
        }
        if (add_letter(*buff)) /* Is a letter */
            return -1;
    }
    if (last) /* If this is the last buffer, end the word (if any) */
        add_sep(tid);
    return 0;
}

/* Configure the buffer slices of each worker */
static int buff_init(uint32_t tid)
{
    if (fa_read_init())
        return -1;
    worker_slice = file_size / n_threads;
    worker_current = worker_slice * tid;
    /* Last thread handles remaining bytes */
    if (tid == (n_threads - 1))
        worker_slice += file_size % n_threads;
    off_t worker_end = worker_current + worker_slice;

    /* Balance worker_slice to include words at the ends.
     * skip first letters if we are not the first thread.
     */
    char *buff;
    do {
        if (tid == 0)
            break;
        if (fa_read(tid, &buff, 1, worker_current) != 1)
            return -1;
        if (!IS_LETTER(*buff))
            break;
        worker_current++;
        worker_slice--;
    } while (*buff);

    /* add letters of the last word if we are not the last thread */
    do {
        if (tid == (n_threads - 1))
            break;
        if (fa_read(tid, &buff, 1, worker_end) != 1)
            return -1;
        if (!IS_LETTER(*buff))
            break;
        worker_end++, worker_slice++;
    } while (*buff);
    return 0;
}

static int buff_destroy()
{
    free(word);
    if (fa_read_destroy())
        return -1;
    return 0;
}

/* Read a buffer from the file */
static int buff_read(uint32_t tid, char **buff, off_t *size, char *last)
{
    if (!worker_slice)
        return 0;
    off_t size_read = fa_read(tid, buff, worker_slice, worker_current);
    if (size_read == -1)
        return -1;
    *size = size_read;
    worker_current += size_read, worker_slice -= size_read;
    if (!worker_slice)
        *last = 1;
    return 0;
}

void *mr_map(void *id)
{
    uint32_t tid = ((struct thread_info *) id)->thread_num;
    int ret = buff_init(tid);
    if (ret)
        goto bail;
    char *buff;
    off_t size = 0;
    char last = 0;
    while (!(ret = buff_read(tid, &buff, &size, &last))) {
        if (!size)
            break;
        if (buff_proceed(tid, buff, size, last))
            goto bail;
        if (last) /* If this was the last buffer */
            break;
    }
    if (buff_destroy())
        goto bail;
    /* wait for other workers before merging */
    if (pthread_barrier_wait(&barrier) > 0) {
        perror("barrier wait");
        goto bail;
    }
    /* merge results (done in parallel) */
    ret = wc_merge_results(tid, n_threads);
bail:
    return ((void *) (long) ret);
}

#include <sys/time.h>

static struct thread_info *tinfo;

#define throw_err(msg)      \
    do {                    \
        perror(msg);        \
        exit(EXIT_FAILURE); \
    } while (0)

static int parse_args(int argc, char **argv)
{
    if (argc < 3)
        return -1;
    file_name = argv[1];
    if (!file_name)
        return -1;
    n_threads = atoi(argv[2]);
    if (!n_threads)
        return -1;
    return 0;
}

static void run_threads(void)
{
    if (!(tinfo = calloc(n_threads, sizeof(struct thread_info))))
        throw_err("calloc");
    for (size_t i = 0; i < n_threads; i++) {
        tinfo[i].thread_num = i;
        if (pthread_create(&tinfo[i].thread_id, NULL, mr_map, &tinfo[i]))
            throw_err("thread create");
    }
}

static void wait_threads()
{
    for (size_t i = 0; i < n_threads; i++)
        if (pthread_join(tinfo[i].thread_id, NULL))
            throw_err("thread join");
    free(tinfo);
}

static double now()
{
    struct timeval tp;
    if (gettimeofday(&tp, (struct timezone *) NULL) == -1)
        perror("gettimeofday");
    return ((double) (tp.tv_sec) * 1000.0) + (((double) tp.tv_usec / 1000.0));
}

int main(int argc, char **argv)
{
    if (-1 == parse_args(argc, argv)) {
        printf("ERROR: Wrong arguments\n");
        printf("usage: %s FILE_NAME THREAD_NUMBER\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    double start = now();
    if (mr_init())
        exit(EXIT_FAILURE);
    run_threads();
    wait_threads();
    if (mr_reduce())
        exit(EXIT_FAILURE);
    /* Done here, to avoid counting the printing */
    double end = now();
    if (mr_print())
        exit(EXIT_FAILURE);
    if (mr_destroy())
        exit(EXIT_FAILURE);
    printf("Done in %g msec\n", end - start);
    exit(EXIT_SUCCESS);
}
```

To compile:

$ gcc -Wall -o word-count word-count.c -lpthread

Because this program uses pthread_barrier_init and pthread_barrier_wait, it cannot be compiled on macOS; GNU/Linux should be the primary development and test environment.

Download The Adventures of Tom Sawyer from Project Gutenberg:

$ wget https://www.gutenberg.org/files/74/74-0.txt

Source: :information_source: Project Gutenberg (PG for short) is named after the 15th-century German printer who introduced movable-type printing to the Western world. Its mission is to digitize public-domain works and make them freely available on the Internet. Its goals include:

  1. To produce and distribute e-texts for the general public;
  2. To be accessible to everyone, without restrictions of price or of access to hardware and software; the e-texts carry no price tag;
  3. E-texts that are immediately usable without further processing, readable by both human eyes and computer programs, and even more popular than paper books;
  4. To double the number of e-texts every year;

Test by launching word-count with 1, 2, and 4 threads:

$ ./word-count 74-0.txt 1
$ ./word-count 74-0.txt 2
$ ./word-count 74-0.txt 4

Sample output:

a : 1955
abandoned : 4
abash : 1
...
zenith : 1
zephyr : 1
zip : 1
Words: 7616, word counts: 77523, full buckets: 1046 (ideal 7616)
Done in 9.22021 msec

Does this remind you of a passage from the Week 6 quiz?
"Just as the first word in every English vocabulary book is Abandon, many people who lack the perseverance to keep going only ever memorize that one word; as a result, vocabulary books usually show signs of wear only on the first few pages, while the rest remain brand new."

Based on the problem description and the comments in the code, complete the program so that it works as expected.

Answer area

MMM = ?

NNN = ?

PPP = ?

QQQ = ?

RRR = ?

Follow-up questions:

  1. Explain how the code above works (you may also use the code to explain the role of the mmap system call), comparing it with Google's paper MapReduce: Simplified Data Processing on Large Clusters
  2. Point out shortcomings of the implementation above and possible improvements, then start developing them
  3. Taking processor affinity into account, design a dedicated job queue (you may model it on the Linux kernel's Concurrency Managed Workqueue interface) and introduce a lock-free design, rewriting the code above into a more efficient MapReduce implementation