# Word-count contributed by `ilkclord` > [題目描述](https://hackmd.io/@sysprog/linux2021-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FrkbyTdGUd#undefined) ## Introduction In nowadays , " key words " is an important information .It can be used in analizing the tag of people and their behavior . Word-count program finds out the frequency of each word ,and the result can be used for finding key words .Since the finding process can be very complicated and wasting time , MapReduce method may be a proper solution . ## Program Word count program execute in MapReduce method .We show the MapReduce model of this program below . ### Master & Node Main funtion represents the master in this program , and each thread acts as a node machine in the program .The program creats thread local storage for each pthread , which stores the word it reads by file and information of reading .The thread local storage is correspond to the memory of node machine . ### Funtions * **Combine** There isn't any specific combine funtion in the program , because the combine process has been included in the map function . * **Reduce** The reduce funtion of this program is **wc_merge_results** * **Map** The map funtion of this program is **mr_map** which call **wc_merge_results** inside ### Execution 1 . `main` first initialize the state of file , pthrea_barrier and the cache of each pthread 2 . `run_threads` maps the task out by calling `pthread_create` for number of threads times . 3 . Each pthreads excute the `mr_map` which starts the counting process 4 . Using the pthread_barrier to wait untill all the counting process finished , starts the reduce process by calling `wc_merge_results` 5 . Call `wc_print` to show the results . ### Flow Chart ```graphviz Digraph G{ node[shape = box] main -> initialization ->thread_1->"all finished"->reduce_1->result initialization ->thread_2->"all finished"->reduce_2->result initialization ->thread_3->"all finished"->reduce_3->result initialization ->thread_4->"all finished"->reduce_4->result initialization ->thread_n->"all finished"->reduce_n->result rankdir = LR } ``` ## Code Overview ### Data Structure * **list_entry** ```c struct list_entry { struct list_entry *next, *prev; } ``` Indicates the `next` and `prev` in the double linked list * **ht_node** ```c struct ht_node { hash_t hash; struct list_entry list; } ``` `hash` : stores the hash value `list` : stores the collision list * **htable** ```c struct htable { hashfunc_t *hashfunc; cmp_t *cmp; uint32_t n_buckets; struct list_entry *buckets; } ``` `hashfunc` : the hash function of this hash table `cmp` : the cmp function of this hash table `n_buckets` : the size of the hash table `buckets` : the node of hash table , also can be consider as the root of the collision list * **wc_cache** ```c struct wc_cache { struct htable htable; } ``` Stores the htable inside the struct * **wc_word** ```c struct wc_word { char word[MAX_WORD_SIZE], *full_word; uint32_t counter; struct ht_node node ; struct ht_node node_main; } ``` `word` : stores the word `full_word` : if the word is bigger than the max size , use `full_word` to store it `node` : the current node of this word `node_main` the * **thread_info** ```c struct thread_info { pthread_t thread_id; /* ID returned by pthread_create() */ int thread_num; /* Application-defined thread # */ } ``` * **Thread Local Storage** ```c static __thread off_t worker_slice ; static __thread off_t worker_current ; ``` These two variable are used in the `buff_init` which trims the partial file to be suitable `worker_slice` : the size of partial file that the thread handled `worker_current` : indicates the current position of the file ```c static __thread uint32_t count = 0 ; static __thread uint32_t wsize = 0 ; static __thread char *word = NULL; static __thread char *worker_buffer; ``` `count` : indicate the current insert-position of `word` also means the words real size `wsize` : the size of `word` , the value is assingned by `MAX_WORD_SIZE` `word` : the storage of the word current reading in `worker_buffer` : the partial file handled by the thread ### Basic * **container_of** ```c #define container_of(list_ptr, container_type, member_name) \ ({ \ const typeof(((container_type *) 0)->member_name) *__member_ptr = \ (list_ptr); \ (container_type *) ((char *) __member_ptr - \ offsetof(container_type, member_name)); \ }) ``` Returns the address of the struct ### List * **list_element** ```c #define list_element(list_ptr, type, member) \ container_of(list_ptr, type, member) ``` Returns the address of the struct * **list_first** ```c #define list_first(root_ptr, type, member) \ list_element((root_ptr)->next, type, member) ``` Retrun the next node of `root_ptr` . * **list_next** ```c static inline struct list_entry *list_next(struct list_entry *root, struct list_entry *current) { if ((root == root->next) || (current->next == root)) return NULL; return current->next; } ``` Return the next node , if the list is single , return `0` * **list_for_each** ```c #define list_for_each_forward(root_ptr, iter) \ struct list_entry *__ptr; \ for (iter = (root_ptr)->next, __ptr = (struct list_entry *) (iter)->next; \ iter != (root_ptr); iter = (typeof((iter))) __ptr, \ __ptr = (struct list_entry *) iter->next) #define list_for_each(root_ptr, iter) list_for_each_forward(root_ptr, iter) ``` `list_for_each` is same as `list_for_each` by the definition , it traverse the linked list * **list_root_init** ```c static inline void list_root_init(struct list_entry *root) { root->next = root->prev = root; } ``` Make the root points to right position * **list_add_prev** ```c static inline void list_add(struct list_entry *root, struct list_entry *entry) { struct list_entry *prev_entry = root; struct list_entry *next_entry = root->next; entry->next = next_entry, entry->prev = prev_entry; prev_entry->next = entry, next_entry->prev = entry; } #define list_add_prev(root, entry) list_add((root)->prev, (entry)) ``` `list_add_prev` adds a new node to the front of the list by calling list_add * **list_empty** ```c #define list_empty(root) (root == (root)->next) ``` Check if the linked list is empty ### Hash Table * **ht_init** ```c static inline int ht_init(struct htable *h, hashfunc_t *hashfunc, cmp_t *cmp, uint32_t n_buckets) { h->hashfunc = hashfunc, h->cmp = cmp; h->n_buckets = n_buckets; h->buckets = malloc(sizeof(struct list_entry) * n_buckets); for (size_t i = 0; i < h->n_buckets; i++) list_root_init(&h->buckets[i]); return 0; } ``` It initialize the `htable` , with a`n_bucket` array . Return 0 if sucess . * **ht_destroy** ```c static inline int ht_destroy(struct htable *h) { free(h->buckets); return 0; } ``` Release the memory used by the `htable` * **ht_find** ```c static inline struct ht_node *ht_find(struct htable *h, void *key) { hash_t hval; uint32_t bkt; h->hashfunc(key, &hval, &bkt); struct list_entry *head = &h->buckets[bkt], *iter; list_for_each (head, iter) { struct ht_node *n = list_element(iter, struct ht_node, list); if (n->hash == hval) { int res = h->cmp(n, key); if (!res) return n; if (res > 0) return NULL; } } return NULL; } ``` Find the specific `key` in the hash table by converting the key to hash value , and traverse the collision linked list of the hash value . Return `NULL` if it isn't in the hash table , and return the node if it exists . * **ht_insert** ```c static inline int ht_insert(struct htable *h, struct ht_node *n, void *key) { hash_t hval; uint32_t bkt; h->hashfunc(key, &hval, &bkt); n->hash = hval; struct list_entry *head = &h->buckets[bkt], *iter; list_for_each (head, iter) { struct ht_node *tmp = list_element(iter, struct ht_node, list); if (tmp->hash >= hval) { int cmp = h->cmp(tmp, key); if (!cmp) /* already exist */ return -1; if (cmp > 0) { MMM; return 0; } } } list_add_prev(head, &n->list); return 0; } ``` Insert a new ht_node into the hash table , if it already exists return `-1` ,if success return `0` . * **ht_get_first** ```c static inline struct ht_node *ht_get_first(struct htable *h, uint32_t bucket) { struct list_entry *head = &h->buckets[bucket]; if (list_empty(head)) return NULL; return NNN(head, struct ht_node, list); } ``` Return the first node in the collision list of `h->buckets[bucket]`, because the first node `head` doesn't store the `word` , we use `list_first` to accomplished * **ht_get_next** ```c static inline struct ht_node *ht_get_next(struct htable *h, uint32_t bucket, struct ht_node *n) { struct list_entry *ln = list_next(&h->buckets[bucket], &n->list); if (!ln) return NULL; return list_element(ln, struct ht_node, list); } ``` Return the next node of the list by first calling `list_next` and then use `list_element` to get the address of the `ht_node` ### Word * **GET_WORD** ```c #define GET_WORD(w) (w->full_word ? w->full_word : w->word) ``` The struct `wc_word` stores the word in either `full_word` or `word` , it returns `*full_word` if it exists and `word` if doesn't. * **MIN_MAX** ```c #define MIN_MAX(a, b, op) \ ({ \ __typeof__(a) _a = (a); \ __typeof__(b) _b = (b); \ _a op _b ? _a : _b; \ }) #define MAX(a, b) MIN_MAX(a, b, >) #define MIN(a, b) MIN_MAX(a, b, <) ``` Compare the value of `a` and `b` in `op` way .`MAX` and `MIN` are two apllication * **__wc_cmp** ```c static inline int __wc_cmp(struct ht_node *n, void *key, char m) { struct wc_word *w = m ? container_of(n, struct wc_word, node_main) : container_of(n, struct wc_word, node); return strcasecmp(GET_WORD(w), (char *) key); } ``` Returns the result of `strcasecmp` , which is a compare way regardless of capital letter , return `0` if is the same , `>0` if `GET_WORD(w)` is greater than `key` and `<0` if less .`m` is used to determine which node is in `wc_word` is used to compare . * **wc_cmp / wc_cmp_main** ```c static int wc_cmp(struct ht_node *n, void *key) { return __wc_cmp(n, key, 0); } static int wc_cmp_main(struct ht_node *n, void *key) { return __wc_cmp(n, key, 1); } ``` Two application which apllies diffrent `m` in `__wc_cmp` * **get_code** ```c static uint32_t get_code(char *word) { uint32_t code = 0; /* The hash value is a concatenation of the letters */ char shift = RRR; for (int i = ((sizeof(code) * 8) / shift) - 1; i >= 0 && *word; i--) { char c = tolower(*(word)) - FIRST_LOWER_LETTER; code |= ((uint32_t) c) << (i * shift); word++; } return code; } ``` The hash function that converts a string to uint32_t `code` * **scale_range_init** ```c static inline int scale_range_init() { code_min = get_code("a"), code_max = get_code("zzzzzzzzzz"); code_range = (code_max - code_min); return 0; } ``` Initialize the `code_max` , `code_min` and `code_range` of the hash function * **scale_range** ```c static inline uint32_t scale_range(uint32_t code) { return (uint32_t)((((double) code - code_min) * n_buckets) / code_range); } ``` Finds out the position of `code` in the hash table * **hash_bucket** ```c static int hash_bucket(void *key, hash_t *hash, uint32_t *bkt) { uint32_t code = get_code((char *) key); *hash = (hash_t) code, *bkt = scale_range(code); return 0; } ``` Assign the position to `*bkt` and the hash value to `*hash` * **wc_init** ```c int wc_init(uint32_t n_threads, uint32_t n_words) { /* FIXME: a resizable hash table would be better */ n_buckets = MAX(MIN(n_words, MAX_N_WORDS), MIN_N_BUCKETS); thread_caches = malloc(sizeof(struct wc_cache) * n_threads); scale_range_init(); for (size_t i = 0; i < n_threads; i++) { if (ht_init(&thread_caches[i].htable, hash_bucket, wc_cmp, n_buckets)) return -1; } if (ht_init(&main_cache.htable, hash_bucket, wc_cmp_main, n_buckets)) return -1; return 0; } ``` Initialize the `wc_cahe` of each thread and also initialize the `htable` inside * **wc_strncpy** ```c static char *wc_strncpy(char *dest, char *src, size_t n) { /* case insensitive */ for (size_t i = 0; i < n && src[i] != '\0'; i++) dest[i] = (char) tolower((int) (src[i])); return dest; } ``` Copy the word from `*src` to `dest` with small letters * **wc_add_word** ```c int wc_add_word(uint32_t tid, char *word, uint32_t count) { struct wc_word *w; struct wc_cache *cache = &thread_caches[tid]; struct ht_node *n; if (!(n = ht_find(&cache->htable, word))) { /* word was absent. Allocate a new wc_word */ if (!(w = calloc(1, sizeof(struct wc_word)))) return -1; if (count > (MAX_WORD_SIZE - 1)) w->full_word = calloc(1, count + 1); wc_strncpy(GET_WORD(w), word, count); /* Add the new world to the table */ ht_insert(&cache->htable, &w->node, word); } else w = container_of(n, struct wc_word, node); w->counter++; return 0; } ``` If the `*word` isn't in the hash table , add the `*word` to the hash table in the cache of the thread that indicates by `tid` .If it is , plus `1` to the `counter` .Also if the `calloc` failed , return `-1` , which cause the thread to terminate . * **__merge_results** ```c static int __merge_results(uint32_t tid, uint32_t j, struct wc_cache *wcc) { struct wc_cache *cache = &main_cache; struct ht_node *iter = ht_get_first(&wcc->htable, j); for (; iter; iter = ht_get_next(&wcc->htable, j, iter)) { struct wc_word *iw = container_of(iter, struct wc_word, node); /* check if word already exists in main_cache */ char *wd = GET_WORD(iw); struct ht_node *n; if ((n = ht_find(&cache->htable, wd))) { /* word already exists. Get the word and increment it */ struct wc_word *w = container_of(n, struct wc_word, node_main); w->counter += iw->counter; } else /* if word does not exist, then insert the new word */ ht_insert(&cache->htable, PPP, wd); } return 0; } ``` Traverse the collision list , add the new word into `main` cache if it doesn't exist. If exists , add the `counter` on the exist `counter` . * **wc_merge_results** ```c int wc_merge_results(uint32_t tid, uint32_t n_threads) { uint32_t n_workers; /* Keep the number of workers <= nbthread */ if (n_threads > n_buckets) { if (tid > n_buckets - 1) return 0; n_workers = n_buckets; } else n_workers = n_threads; /* Each thread will treat at least wk_buckets */ uint32_t wk_buckets = n_buckets / n_workers; /* The range that this thread will treat */ uint32_t wk_bstart = wk_buckets * tid, wk_bend = wk_bstart + wk_buckets; /* last thread must also do last buckets */ if ((tid == (n_workers - 1))) wk_bend += QQQ; for (size_t i = 0; i < n_threads; i++) { struct wc_cache *cache = &thread_caches[i]; for (size_t j = wk_bstart; j < wk_bend; j++) { /* Traverse the buckets of all threads from wk_bstart to wk_bend. * Aggregate the nodes of theses buckets in the main_cache. */ __merge_results(tid, j, cache); } } return 0; } ``` The **Reduce** funtion in this program , which assign the specific part of the hash table that the `tid` threads needs to merge .When reducing , it goes through all cache in `thread_cache` and merge the result into `main_cache` . * **wc_print** ```c int wc_print(int id) { uint32_t total = 0, count_total = 0, bkt_total = 0, empty_bkt = 0; int valid = (id == -1); struct wc_cache *cache = valid ? &main_cache : &thread_caches[id]; for (size_t j = 0; j < n_buckets; j++) { struct ht_node *iter = ht_get_first(&cache->htable, j); for (; iter; iter = ht_get_next(&cache->htable, j, iter)) { struct wc_word *w = valid ? container_of(iter, struct wc_word, node_main) : container_of(iter, struct wc_word, node); printf("%s : %d\n", GET_WORD(w), w->counter); bkt_total++, total++; count_total += w->counter; } if (!bkt_total) empty_bkt++; bkt_total = 0; } printf("Words: %d, word counts: %d, full buckets: %d (ideal %d)\n", total, count_total, n_buckets - empty_bkt, MIN(total, n_buckets)); return 0; } ``` Print out the counting result stored in `main_cache` , by traversing the collision list in each `buckets` . * **__wc_destroy** ```c static int __wc_destroy(struct wc_cache *wcc, int id) { int valid = (id == -1); for (uint32_t j = 0; j < n_buckets; j++) { struct ht_node *iter = ht_get_first(&wcc->htable, j); for (; iter; iter = ht_get_next(&wcc->htable, j, iter)) { struct wc_word *w = valid ? container_of(iter, struct wc_word, node_main) : container_of(iter, struct wc_word, node); free(w->full_word); free(w); } } return 0; } ``` Release the memory used by `wc_word` , the address of `wc_word` can be found by calling `container_of` with `node` or `node_main` . * **wc_destroy** ```c int wc_destroy(uint32_t n_threads) { for (size_t i = 0; i < n_threads; i++) { if (__wc_destroy(&thread_caches[i], i)) return -1; if (ht_destroy(&thread_caches[i].htable)) return -1; } free(thread_caches); if (ht_destroy(&main_cache.htable)) return -1; return 0; } ``` Release the memeory used by `thread_cache` by releasing the `wc_word` first and then the `htable` . ### File Handling * **Macro** ```c #if defined(__linux__) #define MMAP_FLAGS (MAP_POPULATE | MAP_PRIVATE) #else #define MMAP_FLAGS (MAP_PRIVATE) #endif ``` Define the `MMAP_FLAGS` according to the system * **fa_init** ```c static inline int fa_init(char *file, uint32_t n_threads, off_t *fsz) { /* Opening file */ if ((fd = open(file, O_RDONLY)) < 0) { perror("open"); return -1; } /* Get file size */ if ((file_size = lseek(fd, 0, SEEK_END)) < 0) { perror("lseek"); return -1; } file_content = mmap(NULL, file_size, PROT_READ, MMAP_FLAGS, fd, 0); if (file_content == MAP_FAILED) file_content = NULL; *fsz = file_size; return 0; } ``` Open the file and get the size of the file , then map it into `file_content` using `mmap` .It has a gate to detect failure in each procedure . * **fa_read_init** ```c static inline int fa_read_init() { if (!file_content && !(worker_buffer = malloc(BUFFER_SIZE))) return -1; return 0;https://hackmd.io/rsfF9q5qSsaSOGQbDFylgw?both } ``` Initialize the `worker_buffer` and check if the file_content is available , if no error , return `0` . * **fa_read_destroy** ```c static inline int fa_read_destroy() { free(worker_buffer); return 0; } ``` Release the memory used by `worker_buffer` * **mmap** Maps a mapping of the file on vitual memeory which the address is continious . It is useful because we can access the meemory in a faster way .The system can accomplished it by using demand paging .It will load the page table first ,then swap the new page in when a page fault occur .On this procedure , if lots of page fault occured, , it may be quite expansive to do it . * **fa_read** ```c static inline off_t fa_read(uint32_t id, char **buff, off_t size, off_t pos) { off_t size_read; if (file_content) { if (pos >= file_size) /* EOF */ return 0; *buff = (char *) file_content + pos; off_t end = pos + size; size_read = (end > file_size) ? (end - file_size) : size; return size_read; } off_t size_to_read = BUFFER_SIZE < size ? BUFFER_SIZE : size; if ((size_read = pread(fd, worker_buffer, size_to_read, pos)) == -1) { perror("pread"); return -1; } *buff = worker_buffer; return size_read; } ``` If `fa_init` have mapped successfully , use `file_content` to write data to `worker_buffer` , if failed , it use the traditional but also slower way , `pread` , to load the data into `worker_buffer` ### Buffer * **add_letter** ```c static int add_letter(char c) { if ((count > wsize - 1) || !wsize) { wsize += MAX_WORD_SIZE; char *orig = word; if (!(word = realloc(word, wsize))) { free(orig); return -1; } } word[count++] = c; return 0; } ``` Add a letter to the `word` , if the size (`count`) is bigger than `MAX_WORD_SIZE` , resize the word and release the oringnal memeory * **add_sep** ```c static inline int add_sep(uint32_t tid) { if (count) { word[count] = '\0'; /* Add current word */ if (wc_add_word(tid, word, count)) return -1; count = 0; } return 0; } ``` When occur a not-letter char ,it means the the `word` is a word now . We add `\0` and call `wc_add_word` to insert into hash table .We will overwrite the data instead cleaning the `word` . * **buff_proceed** ```c static int buff_proceed(uint32_t tid, char *buff, size_t size, char last) { for (; size; size--, buff++) { if (!IS_LETTER(*buff)) { if (add_sep(tid)) /* Not a letter */ return -1; continue; } if (add_letter(*buff)) /* Is a letter */ return -1; } if (last) /* If this is the last buffer, end the word (if any) */ add_sep(tid); return 0; } ``` Operating the specific size by calling the `IS_LETTER` to determine the operation .Then calls `add_sep` for non-letter , and `add_letter` for letter .When reaching the last position ,`last` equals to `1` , return `0` ,and return `-1`, otherwise . * **buff_init** ```c static int buff_init(uint32_t tid) { if (fa_read_init()) return -1; worker_slice = file_size / n_threads; worker_current = worker_slice * tid; /* Last thread handle remaining bytes */ if (tid == (n_threads - 1)) worker_slice += file_size % n_threads; off_t worker_end = worker_current + worker_slice; /* Balance worker_slice to include words at the ends. * skip first letters if we are not the first thread. */ char *buff; do { if (tid == 0) break; if (fa_read(tid, &buff, 1, worker_current) != 1) return -1; if (!IS_LETTER(*buff)) break; worker_current++; worker_slice--; } while (*buff); /* add letters of the last word if we are not the last thread */ do { if (tid == (n_threads - 1)) break; if (fa_read(tid, &buff, 1, worker_end) != 1) return -1; if (!IS_LETTER(*buff)) break; worker_end++, worker_slice++; } while (*buff); return 0; } ``` The trimming function to make the partial file handled by threads be completed , prevent the cut-string situation . ( The most impressive code >< ) * **buff_destroy** ```c static int buff_destroy() { free(word); if (fa_read_destroy()) return -1; return 0; } ``` Release the memory used by `word` , and the `worker_buffer` . * **buff_read** ```c static int buff_read(uint32_t tid, char **buff, off_t *size, char *last) { if (!worker_slice) return 0; off_t size_read = fa_read(tid, buff, worker_slice, worker_current); if (size_read == -1) return -1; *size = size_read; worker_current += size_read, worker_slice -= size_read; if (!worker_slice) *last = 1; return 0; } ``` Calling the `fa_read` to read the data into `worker_buffer` , if it reads all the data it need , assinged `*last` be `1`. ### Map_Reduce * **Macro** ```c #define BETWEEN(_wd, _min, _max) ((_wd >= _min) && (_wd <= _max)) #define IS_LETTER(c) (BETWEEN(*buff, 'A', 'Z') || BETWEEN(*buff, 'a', 'z')) ``` Check if `_wd` is a letter by comparing the ascii code . * **mr_init** ```c int mr_init(void) { if (pthread_barrier_init(&barrier, NULL, n_threads)) { perror("barrier init"); return -1; } if (fa_init(file_name, n_threads, &file_size)) return -1; if (wc_init(n_threads, file_size / MAX_WORD_SIZE)) return -1; return 0; } ``` Initialize the `pthread_barrier` , `worker_buffer` and `thread_cache` . * **mr_destroy** ```c int mr_destroy(void) { if (pthread_barrier_destroy(&barrier)) { perror("barrier init"); return -1; } if (fa_init(file_name, n_threads, &file_size)) return -1; if (wc_destroy(n_threads)) return -1; return 0; } ``` Release the memory used by the program . * **mr_reduce** ```c int mr_reduce(void) { /* The merge is done by worker threads */ return 0; } ``` It is empty , because the `wc_merge_result` has taken its job away * **mr_print** ```c int mr_print(void) { return wc_print(-1); } ``` Call the `wc_print` to show the results * **mr_map** ```c void *mr_map(void *id) { uint32_t tid = ((struct thread_info *) id)->thread_num; int ret = buff_init(tid); if (ret) goto bail; char *buff; off_t size = 0; char last = 0; while (!(ret = buff_read(tid, &buff, &size, &last))) { if (!size) break; if (buff_proceed(tid, buff, size, last)) goto bail; if (last) /* If this was the last buffer */ break; } if (buff_destroy()) goto bail; /* wait for other worker before merging */ if (pthread_barrier_wait(&barrier) > 0) { perror("barrier wait"); goto bail; } /* merge results (done in parrallel) */ ret = wc_merge_results(tid, n_threads); bail: return ((void *) (long) ret); } ``` The **Mapping** function in this program , execute by mutiple threads in parallel , it first completed the size it needs to handle by calling `buff_init` ,then run the while loop to read the data and doing the counting work using `buff_read` and `buff_proceed` .The expected situation runs the loop once , which means `buff_read` reads the data successfully , and `buff_proceed` terminated without failure . After finishing the main caculating , it calls `buff_destroy` to release the memeory , and then starts to **Reduce** till all threads finish **Mapping**. ### Main && Pthread * **throw_err** ```c #define throw_err(msg) \ do { \ perror(msg); \ exit(EXIT_FAILURE); \ } while (0) ``` Prints out the error message. * **parse_args** ```c static int parse_args(int argc, char **argv) { if (argc < 3) return -1; file_name = argv[1]; if (!file_name) return -1; n_threads = atoi(argv[2]); if (!n_threads) return -1; return 0; } ``` Check if the users has called the program in correct form * **run_threads** ```c static void run_threads(void) { if (!(tinfo = calloc(n_threads, sizeof(struct thread_info)))) throw_err("calloc"); for (size_t i = 0; i < n_threads; i++) { tinfo[i].thread_num = i; if (pthread_create(&tinfo[i].thread_id, NULL, mr_map, &tinfo[i])) throw_err("thread create"); } } ``` Create `n_threads` pthread and strarts the **Mapping** procedure in parallel . * **wait_threads** ```c static void wait_threads() { for (size_t i = 0; i < n_threads; i++) if (pthread_join(tinfo[i].thread_id, NULL)) throw_err("thread join"); free(tinfo); } ``` The final part of program , terminate the pthread . * **now** ```c static double now() { struct timeval tp; if (gettimeofday(&tp, (struct timezone *) NULL) == -1) perror("gettimeofday"); return ((double) (tp.tv_sec) * 1000.0) + (((double) tp.tv_usec / 1000.0)); } ``` Get the current time , for caculating the executing time . ## TODO :::danger 注意 `__thread`,此為 [thread-local storage](https://en.wikipedia.org/wiki/Thread-local_storage),應予以探討。 :notes: jserv ::: :::warning 思考: 此處的 `wc_cache` 是否實際有 "cache" 的效益呢?//Code Overview -> Data Structure :notes: jserv ::: :::warning TODO: 1. 思考倘若輸入不是單一檔案,而是像 Linux 核心原始程式碼這樣包含大量的大小不一的檔案,該如何處理呢?又,能否發揮 MapReduce 的效益呢? preprocessing 是允許的,甚至接受 [tar 檔案格式](https://en.wikipedia.org/wiki/Tar_(computing))作為輸入。 2. [Occurrences of words in the Linux kernel source code over time](https://www.vidarholen.net/contents/wordcount/) 這網站做了很好的示範,儘管只輸出幾個單字,不過我們的程式框架可延展為「列出 Linux 核心原始程式碼的註解和文件中,出現最頻繁的前 50 個單字」的功能 :notes: jserv ::: ## Reference * [Linus Torvalds: mmap/mlock performance versus read](http://lkml.iu.edu/hypermail/linux/kernel/0004.0/0728.html) * [Which is fastest: read, fread, ifstream or mmap?](https://lemire.me/blog/2012/06/26/which-is-fastest-read-fread-ifstream-or-mmap/)