# Word-count
contributed by `ilkclord`
> [Problem description](https://hackmd.io/@sysprog/linux2021-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FrkbyTdGUd#undefined)
## Introduction
Nowadays, keywords are an important source of information: they can be used to analyze people's interests and behavior. A word-count program computes the frequency of each word, and the result can then be used to identify keywords. Since the counting process can be complicated and time-consuming, the MapReduce model is a suitable solution.
## Program
The word-count program follows the MapReduce model, which we describe below.
### Master & Node
The main function plays the role of the master, and each thread acts as a worker node. The program creates thread-local storage for each pthread, which holds the words read from the file and the reading state; this thread-local storage corresponds to a worker node's private memory.
### Functions
* **Combine**
There is no separate combine function in the program; the combine step is folded into the map function.
* **Reduce**
The reduce function of this program is **wc_merge_results**.
* **Map**
The map function of this program is **mr_map**, which calls **wc_merge_results** at the end.
### Execution
1. `main` first initializes the file state, the pthread barrier, and each thread's cache.
2. `run_threads` maps the task out by calling `pthread_create` once per thread.
3. Each thread executes `mr_map`, which starts the counting process.
4. A pthread barrier waits until all counting has finished; then each thread starts the reduce phase by calling `wc_merge_results`.
5. `wc_print` shows the results.
### Flow Chart
```graphviz
digraph G {
node[shape = box]
main -> initialization ->thread_1->"all finished"->reduce_1->result
initialization ->thread_2->"all finished"->reduce_2->result
initialization ->thread_3->"all finished"->reduce_3->result
initialization ->thread_4->"all finished"->reduce_4->result
initialization ->thread_n->"all finished"->reduce_n->result
rankdir = LR
}
```
## Code Overview
### Data Structure
* **list_entry**
```c
struct list_entry {
    struct list_entry *next, *prev;
};
```
Holds the `next` and `prev` pointers of a doubly linked list.
* **ht_node**
```c
struct ht_node {
    hash_t hash;
    struct list_entry list;
};
```
`hash`: the hash value of the key
`list`: links the node into its bucket's collision list
* **htable**
```c
struct htable {
    hashfunc_t *hashfunc;
    cmp_t *cmp;
    uint32_t n_buckets;
    struct list_entry *buckets;
};
```
`hashfunc`: the hash function of this hash table
`cmp`: the comparison function of this hash table
`n_buckets`: the number of buckets in the hash table
`buckets`: the array of bucket heads; each head can be considered the root of its collision list
* **wc_cache**
```c
struct wc_cache {
    struct htable htable;
};
```
Wraps an `htable`; there is one `wc_cache` per thread plus one main cache.
* **wc_word**
```c
struct wc_word {
    char word[MAX_WORD_SIZE], *full_word;
    uint32_t counter;
    struct ht_node node;
    struct ht_node node_main;
};
```
`word`: stores the word (up to `MAX_WORD_SIZE`)
`full_word`: if the word exceeds the maximum size, `full_word` stores it instead
`node`: the hash-table node used in a thread's local cache
`node_main`: the hash-table node used in the main (merged) cache
* **thread_info**
```c
struct thread_info {
    pthread_t thread_id; /* ID returned by pthread_create() */
    int thread_num;      /* Application-defined thread # */
};
```
* **Thread Local Storage**
```c
static __thread off_t worker_slice;
static __thread off_t worker_current;
```
These two variables are used by `buff_init`, which trims each thread's slice of the file to word boundaries.
`worker_slice`: the size of the file slice this thread handles
`worker_current`: the current position within the file
```c
static __thread uint32_t count = 0;
static __thread uint32_t wsize = 0;
static __thread char *word = NULL;
static __thread char *worker_buffer;
```
`count`: the current insert position in `word`, i.e. the word's actual length
`wsize`: the allocated size of `word`, grown in steps of `MAX_WORD_SIZE`
`word`: the buffer holding the word currently being read
`worker_buffer`: the buffer for the file slice handled by this thread
### Basic
* **container_of**
```c
#define container_of(list_ptr, container_type, member_name) \
({ \
const typeof(((container_type *) 0)->member_name) *__member_ptr = \
(list_ptr); \
(container_type *) ((char *) __member_ptr - \
offsetof(container_type, member_name)); \
})
```
Returns the address of the enclosing struct, given a pointer to one of its members.
### List
* **list_element**
```c
#define list_element(list_ptr, type, member) \
container_of(list_ptr, type, member)
```
An alias of `container_of` in list terminology: returns the address of the struct containing the list entry.
* **list_first**
```c
#define list_first(root_ptr, type, member) \
list_element((root_ptr)->next, type, member)
```
Returns the first element of the list, i.e. the struct containing `(root_ptr)->next`.
* **list_next**
```c
static inline struct list_entry *list_next(struct list_entry *root,
struct list_entry *current)
{
if ((root == root->next) || (current->next == root)) return NULL;
return current->next;
}
```
Returns the next node; returns `NULL` if the list is empty or `current` is the last node.
* **list_for_each**
```c
#define list_for_each_forward(root_ptr, iter) \
struct list_entry *__ptr; \
for (iter = (root_ptr)->next, __ptr = (struct list_entry *) (iter)->next; \
iter != (root_ptr); iter = (typeof((iter))) __ptr, \
__ptr = (struct list_entry *) iter->next)
#define list_for_each(root_ptr, iter) list_for_each_forward(root_ptr, iter)
```
`list_for_each` is defined as `list_for_each_forward`; it traverses the linked list. The extra `__ptr` caches the next pointer ahead of the body, so the current node can be removed safely during traversal.
* **list_root_init**
```c
static inline void list_root_init(struct list_entry *root)
{
root->next = root->prev = root;
}
```
Makes the root point to itself in both directions, i.e. an empty list.
* **list_add_prev**
```c
static inline void list_add(struct list_entry *root, struct list_entry *entry)
{
struct list_entry *prev_entry = root;
struct list_entry *next_entry = root->next;
entry->next = next_entry, entry->prev = prev_entry;
prev_entry->next = entry, next_entry->prev = entry;
}
#define list_add_prev(root, entry) list_add((root)->prev, (entry))
```
`list_add_prev` appends a new node at the tail of the list (just before `root`) by calling `list_add` on `root->prev`.
* **list_empty**
```c
#define list_empty(root) (root == (root)->next)
```
Check if the linked list is empty
### Hash Table
* **ht_init**
```c
static inline int ht_init(struct htable *h,
hashfunc_t *hashfunc,
cmp_t *cmp,
uint32_t n_buckets)
{
h->hashfunc = hashfunc, h->cmp = cmp;
h->n_buckets = n_buckets;
h->buckets = malloc(sizeof(struct list_entry) * n_buckets);
for (size_t i = 0; i < h->n_buckets; i++) list_root_init(&h->buckets[i]);
return 0;
}
```
Initializes the `htable` with an array of `n_buckets` bucket heads. Returns `0` on success.
* **ht_destroy**
```c
static inline int ht_destroy(struct htable *h)
{
free(h->buckets);
return 0;
}
```
Releases the bucket array of the `htable`.
* **ht_find**
```c
static inline struct ht_node *ht_find(struct htable *h, void *key)
{
hash_t hval;
uint32_t bkt;
h->hashfunc(key, &hval, &bkt);
struct list_entry *head = &h->buckets[bkt], *iter;
list_for_each (head, iter) {
struct ht_node *n = list_element(iter, struct ht_node, list);
if (n->hash == hval) {
int res = h->cmp(n, key);
if (!res) return n;
if (res > 0) return NULL;
}
}
return NULL;
}
```
Finds `key` in the hash table: the key is converted to a hash value and a bucket index, then the bucket's collision list is traversed. Because the list is kept sorted, the search can stop early (`res > 0`). Returns the node if it exists, `NULL` otherwise.
* **ht_insert**
```c
static inline int ht_insert(struct htable *h, struct ht_node *n, void *key)
{
hash_t hval;
uint32_t bkt;
h->hashfunc(key, &hval, &bkt);
n->hash = hval;
struct list_entry *head = &h->buckets[bkt], *iter;
list_for_each (head, iter) {
struct ht_node *tmp = list_element(iter, struct ht_node, list);
if (tmp->hash >= hval) {
int cmp = h->cmp(tmp, key);
if (!cmp) /* already exist */
return -1;
if (cmp > 0) {
                list_add_prev(iter, &n->list); /* insert before the larger entry */
return 0;
}
}
}
list_add_prev(head, &n->list);
return 0;
}
```
Inserts a new `ht_node` into the hash table, keeping each collision list sorted. Returns `-1` if the key already exists, `0` on success.
* **ht_get_first**
```c
static inline struct ht_node *ht_get_first(struct htable *h, uint32_t bucket)
{
struct list_entry *head = &h->buckets[bucket];
if (list_empty(head)) return NULL;
    return list_first(head, struct ht_node, list);
}
```
Returns the first node in the collision list of `h->buckets[bucket]`. Since the bucket head itself stores no word, `list_first` is used to fetch the first real node.
* **ht_get_next**
```c
static inline struct ht_node *ht_get_next(struct htable *h,
uint32_t bucket,
struct ht_node *n)
{
struct list_entry *ln = list_next(&h->buckets[bucket], &n->list);
if (!ln) return NULL;
return list_element(ln, struct ht_node, list);
}
```
Returns the next node in the bucket's list: `list_next` advances the `list_entry`, and `list_element` recovers the enclosing `ht_node`.
### Word
* **GET_WORD**
```c
#define GET_WORD(w) (w->full_word ? w->full_word : w->word)
```
The struct `wc_word` stores the word in either `full_word` or `word`; the macro returns `full_word` if it is set, and `word` otherwise.
* **MIN_MAX**
```c
#define MIN_MAX(a, b, op) \
({ \
__typeof__(a) _a = (a); \
__typeof__(b) _b = (b); \
_a op _b ? _a : _b; \
})
#define MAX(a, b) MIN_MAX(a, b, >)
#define MIN(a, b) MIN_MAX(a, b, <)
```
Compares `a` and `b` with the operator `op`; `MAX` and `MIN` are its two instantiations. The temporaries `_a` and `_b` ensure each argument is evaluated only once.
* **__wc_cmp**
```c
static inline int __wc_cmp(struct ht_node *n, void *key, char m)
{
struct wc_word *w = m ? container_of(n, struct wc_word, node_main)
: container_of(n, struct wc_word, node);
return strcasecmp(GET_WORD(w), (char *) key);
}
```
Returns the result of `strcasecmp`, a case-insensitive comparison: `0` if equal, `>0` if `GET_WORD(w)` is greater than `key`, `<0` if less. `m` selects which node of `wc_word` (`node_main` or `node`) the given `ht_node` belongs to.
* **wc_cmp / wc_cmp_main**
```c
static int wc_cmp(struct ht_node *n, void *key)
{
return __wc_cmp(n, key, 0);
}
static int wc_cmp_main(struct ht_node *n, void *key)
{
return __wc_cmp(n, key, 1);
}
```
Two wrappers that pass a different `m` to `__wc_cmp`.
* **get_code**
```c
static uint32_t get_code(char *word)
{
uint32_t code = 0;
/* The hash value is a concatenation of the letters */
    char shift = 5; /* 26 letters fit in 5 bits */
for (int i = ((sizeof(code) * 8) / shift) - 1; i >= 0 && *word; i--) {
char c = tolower(*(word)) - FIRST_LOWER_LETTER;
code |= ((uint32_t) c) << (i * shift);
word++;
}
return code;
}
```
The hash function: converts a string into a `uint32_t` code by packing its leading letters into fixed-width bit fields, most significant first, so codes preserve alphabetical order.
* **scale_range_init**
```c
static inline int scale_range_init()
{
code_min = get_code("a"), code_max = get_code("zzzzzzzzzz");
code_range = (code_max - code_min);
return 0;
}
```
Initializes the `code_min`, `code_max`, and `code_range` used to scale hash codes into bucket indices.
* **scale_range**
```c
static inline uint32_t scale_range(uint32_t code)
{
return (uint32_t)((((double) code - code_min) * n_buckets) / code_range);
}
```
Maps `code` linearly to a bucket index in `[0, n_buckets)`.
* **hash_bucket**
```c
static int hash_bucket(void *key, hash_t *hash, uint32_t *bkt)
{
uint32_t code = get_code((char *) key);
*hash = (hash_t) code, *bkt = scale_range(code);
return 0;
}
```
Computes the hash value into `*hash` and the bucket index into `*bkt`.
* **wc_init**
```c
int wc_init(uint32_t n_threads, uint32_t n_words)
{
/* FIXME: a resizable hash table would be better */
n_buckets = MAX(MIN(n_words, MAX_N_WORDS), MIN_N_BUCKETS);
thread_caches = malloc(sizeof(struct wc_cache) * n_threads);
scale_range_init();
for (size_t i = 0; i < n_threads; i++) {
if (ht_init(&thread_caches[i].htable, hash_bucket, wc_cmp, n_buckets))
return -1;
}
if (ht_init(&main_cache.htable, hash_bucket, wc_cmp_main, n_buckets))
return -1;
return 0;
}
```
Initializes the `wc_cache` of each thread and the main cache, including the `htable` inside each.
* **wc_strncpy**
```c
static char *wc_strncpy(char *dest, char *src, size_t n)
{
/* case insensitive */
for (size_t i = 0; i < n && src[i] != '\0'; i++)
dest[i] = (char) tolower((int) (src[i]));
return dest;
}
```
Copies up to `n` characters from `src` to `dest`, lowercasing each one.
* **wc_add_word**
```c
int wc_add_word(uint32_t tid, char *word, uint32_t count)
{
struct wc_word *w;
struct wc_cache *cache = &thread_caches[tid];
struct ht_node *n;
if (!(n = ht_find(&cache->htable, word))) {
/* word was absent. Allocate a new wc_word */
if (!(w = calloc(1, sizeof(struct wc_word)))) return -1;
if (count > (MAX_WORD_SIZE - 1)) w->full_word = calloc(1, count + 1);
wc_strncpy(GET_WORD(w), word, count);
        /* Add the new word to the table */
ht_insert(&cache->htable, &w->node, word);
} else
w = container_of(n, struct wc_word, node);
w->counter++;
return 0;
}
```
If `word` is not yet in the hash table of the cache selected by `tid`, allocate a new `wc_word` and insert it; otherwise find the existing entry. Either way, `counter` is incremented. If `calloc` fails, return `-1`, which makes the thread terminate.
* **__merge_results**
```c
static int __merge_results(uint32_t tid, uint32_t j, struct wc_cache *wcc)
{
struct wc_cache *cache = &main_cache;
struct ht_node *iter = ht_get_first(&wcc->htable, j);
for (; iter; iter = ht_get_next(&wcc->htable, j, iter)) {
struct wc_word *iw = container_of(iter, struct wc_word, node);
/* check if word already exists in main_cache */
char *wd = GET_WORD(iw);
struct ht_node *n;
if ((n = ht_find(&cache->htable, wd))) {
/* word already exists. Get the word and increment it */
struct wc_word *w = container_of(n, struct wc_word, node_main);
w->counter += iw->counter;
} else /* if word does not exist, then insert the new word */
            ht_insert(&cache->htable, &iw->node_main, wd);
}
return 0;
}
```
Traverses bucket `j` of one thread cache: if a word already exists in the main cache, its `counter` is added to the existing one; otherwise the word's `node_main` is inserted into the main cache.
* **wc_merge_results**
```c
int wc_merge_results(uint32_t tid, uint32_t n_threads)
{
uint32_t n_workers;
/* Keep the number of workers <= nbthread */
if (n_threads > n_buckets) {
if (tid > n_buckets - 1) return 0;
n_workers = n_buckets;
} else
n_workers = n_threads;
/* Each thread will treat at least wk_buckets */
uint32_t wk_buckets = n_buckets / n_workers;
/* The range that this thread will treat */
uint32_t wk_bstart = wk_buckets * tid, wk_bend = wk_bstart + wk_buckets;
/* last thread must also do last buckets */
    if ((tid == (n_workers - 1))) wk_bend += n_buckets % n_workers;
for (size_t i = 0; i < n_threads; i++) {
struct wc_cache *cache = &thread_caches[i];
for (size_t j = wk_bstart; j < wk_bend; j++) {
/* Traverse the buckets of all threads from wk_bstart to wk_bend.
* Aggregate the nodes of theses buckets in the main_cache.
*/
__merge_results(tid, j, cache);
}
}
return 0;
}
```
The **Reduce** function of this program: it assigns thread `tid` a specific range of buckets to merge. During reduction it walks that bucket range in every entry of `thread_caches` and merges the results into `main_cache`.
* **wc_print**
```c
int wc_print(int id)
{
uint32_t total = 0, count_total = 0, bkt_total = 0, empty_bkt = 0;
int valid = (id == -1);
struct wc_cache *cache = valid ? &main_cache : &thread_caches[id];
for (size_t j = 0; j < n_buckets; j++) {
struct ht_node *iter = ht_get_first(&cache->htable, j);
for (; iter; iter = ht_get_next(&cache->htable, j, iter)) {
struct wc_word *w =
valid ? container_of(iter, struct wc_word, node_main)
: container_of(iter, struct wc_word, node);
printf("%s : %d\n", GET_WORD(w), w->counter);
bkt_total++, total++;
count_total += w->counter;
}
if (!bkt_total) empty_bkt++;
bkt_total = 0;
}
printf("Words: %d, word counts: %d, full buckets: %d (ideal %d)\n", total,
count_total, n_buckets - empty_bkt, MIN(total, n_buckets));
return 0;
}
```
Prints the counting result (stored in `main_cache` when `id == -1`) by traversing the collision list of each bucket, followed by summary statistics.
* **__wc_destroy**
```c
static int __wc_destroy(struct wc_cache *wcc, int id)
{
    int valid = (id == -1);
    for (uint32_t j = 0; j < n_buckets; j++) {
        struct ht_node *iter = ht_get_first(&wcc->htable, j);
        while (iter) {
            struct wc_word *w =
                valid ? container_of(iter, struct wc_word, node_main)
                      : container_of(iter, struct wc_word, node);
            /* advance before freeing to avoid a use-after-free */
            struct ht_node *next = ht_get_next(&wcc->htable, j, iter);
            free(w->full_word);
            free(w);
            iter = next;
        }
    }
    return 0;
}
```
Releases the memory used by each `wc_word`; the address of the `wc_word` is recovered with `container_of` via `node` or `node_main`.
* **wc_destroy**
```c
int wc_destroy(uint32_t n_threads)
{
for (size_t i = 0; i < n_threads; i++) {
if (__wc_destroy(&thread_caches[i], i)) return -1;
if (ht_destroy(&thread_caches[i].htable)) return -1;
}
free(thread_caches);
if (ht_destroy(&main_cache.htable)) return -1;
return 0;
}
```
Releases the memory used by `thread_caches`: first the `wc_word`s, then each thread's `htable`, and finally the main cache's `htable`.
### File Handling
* **Macro**
```c
#if defined(__linux__)
#define MMAP_FLAGS (MAP_POPULATE | MAP_PRIVATE)
#else
#define MMAP_FLAGS (MAP_PRIVATE)
#endif
```
Defines `MMAP_FLAGS` according to the platform; `MAP_POPULATE` (Linux-only) pre-faults the mapping to reduce page faults later.
* **fa_init**
```c
static inline int fa_init(char *file, uint32_t n_threads, off_t *fsz)
{
/* Opening file */
if ((fd = open(file, O_RDONLY)) < 0) {
perror("open");
return -1;
}
/* Get file size */
if ((file_size = lseek(fd, 0, SEEK_END)) < 0) {
perror("lseek");
return -1;
}
file_content = mmap(NULL, file_size, PROT_READ, MMAP_FLAGS, fd, 0);
if (file_content == MAP_FAILED) file_content = NULL;
*fsz = file_size;
return 0;
}
```
Opens the file, gets its size via `lseek`, then maps it into `file_content` using `mmap`, checking for failure at each step. If `mmap` fails, `file_content` is left `NULL` and reads fall back to `pread`.
* **fa_read_init**
```c
static inline int fa_read_init()
{
if (!file_content && !(worker_buffer = malloc(BUFFER_SIZE))) return -1;
    return 0;
}
```
Checks whether `file_content` is available and, if it is not, allocates `worker_buffer`; returns `0` on success.
* **fa_read_destroy**
```c
static inline int fa_read_destroy()
{
free(worker_buffer);
return 0;
}
```
Release the memory used by `worker_buffer`
* **mmap**
Creates a contiguous virtual-memory mapping of the file, which lets us access the file as ordinary memory. The kernel implements this with demand paging: it sets up the page tables first, then swaps pages in when a page fault occurs. If many page faults occur, this can become quite expensive, which is why `MAP_POPULATE` is used on Linux to pre-fault the pages.
* **fa_read**
```c
static inline off_t fa_read(uint32_t id, char **buff, off_t size, off_t pos)
{
off_t size_read;
if (file_content) {
if (pos >= file_size) /* EOF */
return 0;
*buff = (char *) file_content + pos;
off_t end = pos + size;
        size_read = (end > file_size) ? (file_size - pos) : size;
return size_read;
}
off_t size_to_read = BUFFER_SIZE < size ? BUFFER_SIZE : size;
if ((size_read = pread(fd, worker_buffer, size_to_read, pos)) == -1) {
perror("pread");
return -1;
}
*buff = worker_buffer;
return size_read;
}
```
If `fa_init` mapped the file successfully, `fa_read` simply points `*buff` into `file_content`; otherwise it falls back to the traditional but slower `pread`, which copies the data into `worker_buffer`.
### Buffer
* **add_letter**
```c
static int add_letter(char c)
{
if ((count > wsize - 1) || !wsize) {
wsize += MAX_WORD_SIZE;
char *orig = word;
if (!(word = realloc(word, wsize))) {
free(orig);
return -1;
}
}
word[count++] = c;
return 0;
}
```
Adds a letter to `word`; when `count` reaches the allocated size, the buffer is grown by `MAX_WORD_SIZE` with `realloc` (the original buffer is freed if `realloc` fails).
* **add_sep**
```c
static inline int add_sep(uint32_t tid)
{
if (count) {
word[count] = '\0'; /* Add current word */
if (wc_add_word(tid, word, count)) return -1;
count = 0;
}
return 0;
}
```
When a non-letter character appears, the current `word` is complete: append `'\0'` and call `wc_add_word` to insert it into the hash table. `count` is reset to `0`, so the buffer is overwritten on the next word instead of being cleared.
* **buff_proceed**
```c
static int buff_proceed(uint32_t tid, char *buff, size_t size, char last)
{
for (; size; size--, buff++) {
if (!IS_LETTER(*buff)) {
if (add_sep(tid)) /* Not a letter */
return -1;
continue;
}
if (add_letter(*buff)) /* Is a letter */
return -1;
}
if (last) /* If this is the last buffer, end the word (if any) */
add_sep(tid);
return 0;
}
```
Walks `size` bytes of the buffer, using `IS_LETTER` to decide the action: `add_sep` for a non-letter, `add_letter` for a letter. If this is the last buffer (`last` is set), the trailing word is flushed with `add_sep`. Returns `0` on success and `-1` if any step fails.
* **buff_init**
```c
static int buff_init(uint32_t tid)
{
if (fa_read_init()) return -1;
worker_slice = file_size / n_threads;
worker_current = worker_slice * tid;
/* Last thread handle remaining bytes */
if (tid == (n_threads - 1)) worker_slice += file_size % n_threads;
off_t worker_end = worker_current + worker_slice;
/* Balance worker_slice to include words at the ends.
* skip first letters if we are not the first thread.
*/
char *buff;
do {
if (tid == 0) break;
if (fa_read(tid, &buff, 1, worker_current) != 1) return -1;
if (!IS_LETTER(*buff)) break;
worker_current++;
worker_slice--;
} while (*buff);
/* add letters of the last word if we are not the last thread */
do {
if (tid == (n_threads - 1)) break;
if (fa_read(tid, &buff, 1, worker_end) != 1) return -1;
if (!IS_LETTER(*buff)) break;
worker_end++, worker_slice++;
} while (*buff);
return 0;
}
```
The trimming function: it adjusts the file slice handled by each thread to whole words, preventing any word from being cut at a slice boundary. (The most impressive part of the code ><)
* **buff_destroy**
```c
static int buff_destroy()
{
free(word);
if (fa_read_destroy()) return -1;
return 0;
}
```
Releases the memory used by `word` and by `worker_buffer`.
* **buff_read**
```c
static int buff_read(uint32_t tid, char **buff, off_t *size, char *last)
{
if (!worker_slice) return 0;
off_t size_read = fa_read(tid, buff, worker_slice, worker_current);
if (size_read == -1) return -1;
*size = size_read;
worker_current += size_read, worker_slice -= size_read;
if (!worker_slice) *last = 1;
return 0;
}
```
Calls `fa_read` to read data into `*buff`; once the whole slice has been read, `*last` is set to `1`.
### Map_Reduce
* **Macro**
```c
#define BETWEEN(_wd, _min, _max) ((_wd >= _min) && (_wd <= _max))
#define IS_LETTER(c) (BETWEEN((c), 'A', 'Z') || BETWEEN((c), 'a', 'z'))
```
Checks whether `c` is a letter by comparing ASCII codes.
* **mr_init**
```c
int mr_init(void)
{
if (pthread_barrier_init(&barrier, NULL, n_threads)) {
perror("barrier init");
return -1;
}
if (fa_init(file_name, n_threads, &file_size)) return -1;
if (wc_init(n_threads, file_size / MAX_WORD_SIZE)) return -1;
return 0;
}
```
Initializes the pthread barrier, the file access layer (`fa_init`), and the word-count caches (`wc_init`).
* **mr_destroy**
```c
int mr_destroy(void)
{
if (pthread_barrier_destroy(&barrier)) {
perror("barrier init");
return -1;
}
if (fa_init(file_name, n_threads, &file_size)) return -1;
if (wc_destroy(n_threads)) return -1;
return 0;
}
```
Release the memory used by the program .
* **mr_reduce**
```c
int mr_reduce(void)
{
/* The merge is done by worker threads */
return 0;
}
```
It is empty, because `wc_merge_results` already performs the merging inside each worker thread.
* **mr_print**
```c
int mr_print(void)
{
return wc_print(-1);
}
```
Calls `wc_print` to show the results.
* **mr_map**
```c
void *mr_map(void *id)
{
uint32_t tid = ((struct thread_info *) id)->thread_num;
int ret = buff_init(tid);
if (ret) goto bail;
char *buff;
off_t size = 0;
char last = 0;
while (!(ret = buff_read(tid, &buff, &size, &last))) {
if (!size) break;
if (buff_proceed(tid, buff, size, last)) goto bail;
if (last) /* If this was the last buffer */
break;
}
if (buff_destroy()) goto bail;
/* wait for other worker before merging */
if (pthread_barrier_wait(&barrier) > 0) {
perror("barrier wait");
goto bail;
}
/* merge results (done in parrallel) */
ret = wc_merge_results(tid, n_threads);
bail:
return ((void *) (long) ret);
}
```
The **Map** function of this program, executed by multiple threads in parallel. It first determines the slice it must handle by calling `buff_init`, then loops over `buff_read` and `buff_proceed` to read the data and do the counting. In the expected (mmap) case the loop runs only once: `buff_read` returns the whole slice and `buff_proceed` finishes without failure.
After the counting, it calls `buff_destroy` to release the memory, then waits at the barrier until every thread has finished **Mapping** before starting to **Reduce**.
### Main && Pthread
* **throw_err**
```c
#define throw_err(msg) \
do { \
perror(msg); \
exit(EXIT_FAILURE); \
} while (0)
```
Prints the error message and terminates the process.
* **parse_args**
```c
static int parse_args(int argc, char **argv)
{
if (argc < 3) return -1;
file_name = argv[1];
if (!file_name) return -1;
n_threads = atoi(argv[2]);
if (!n_threads) return -1;
return 0;
}
```
Checks that the user invoked the program with the correct arguments: a file name and a thread count.
* **run_threads**
```c
static void run_threads(void)
{
if (!(tinfo = calloc(n_threads, sizeof(struct thread_info))))
throw_err("calloc");
for (size_t i = 0; i < n_threads; i++) {
tinfo[i].thread_num = i;
if (pthread_create(&tinfo[i].thread_id, NULL, mr_map, &tinfo[i]))
throw_err("thread create");
}
}
```
Creates `n_threads` pthreads and starts the **Mapping** procedure in parallel.
* **wait_threads**
```c
static void wait_threads()
{
for (size_t i = 0; i < n_threads; i++)
if (pthread_join(tinfo[i].thread_id, NULL)) throw_err("thread join");
free(tinfo);
}
```
The final part of the program: waits for every thread with `pthread_join`, then frees `tinfo`.
* **now**
```c
static double now()
{
struct timeval tp;
if (gettimeofday(&tp, (struct timezone *) NULL) == -1)
perror("gettimeofday");
return ((double) (tp.tv_sec) * 1000.0) + (((double) tp.tv_usec / 1000.0));
}
```
Gets the current time in milliseconds, for measuring execution time.
## TODO
:::danger
Note `__thread`: this is [thread-local storage](https://en.wikipedia.org/wiki/Thread-local_storage) and deserves discussion.
:notes: jserv
:::
:::warning
Think about it: does the `wc_cache` here actually deliver any "cache" benefit? //Code Overview -> Data Structure
:notes: jserv
:::
:::warning
TODO:
1. Consider how to handle input that is not a single file but, like the Linux kernel source tree, a large number of files of varying sizes. Can MapReduce still pay off there? Preprocessing is allowed, and even the [tar file format](https://en.wikipedia.org/wiki/Tar_(computing)) is accepted as input.
2. [Occurrences of words in the Linux kernel source code over time](https://www.vidarholen.net/contents/wordcount/) is a good demonstration. Although it only outputs a few words, our framework can be extended to list the 50 most frequent words in the comments and documentation of the Linux kernel source code.
:notes: jserv
:::
## Reference
* [Linus Torvalds: mmap/mlock performance versus read](http://lkml.iu.edu/hypermail/linux/kernel/0004.0/0728.html)
* [Which is fastest: read, fread, ifstream or mmap?](https://lemire.me/blog/2012/06/26/which-is-fastest-read-fread-ifstream-or-mmap/)