--- tags: linux2022, linux --- # Lockless Multithreaded Logger 實作 contributed by < `AmyLin0210` > 探討 [threaded-logger](https://github.com/sysprog21/threaded-logger) 的實作,分析其中 reader-writer 議題,包含 atomics 和 futex 的使用,思索後續的改進 ---- ## 執行方法 編譯 ```shell $ make ``` 執行測試程式 ```shell $ scripts/test.sh ``` `logger` 命令與參數定義為以下範例,前七個皆為必填: ``` ./build/logger <threads> <min q lines> <max q lines> <total lines> <print max/thd> <us wait> <wait chances> [blocking (0)] [printlost (0)] [delay sec] ``` 預設的測試命令與參數為: ``` threads min q lines max q lines total lines print max/thd us wait wait chances blocking printlost delay sec --------------------------------------------------------------------------------------------------------------------------------- build/logger 5 50 200 100000 1000 100 10 0 0 5 ``` 在這個專案裡,所有印出的 `log` 預設都是丟到 `stderr` ,但為了方便呈現,故在測試程式內有將 `stderr` 給 redirect 至 `build/out.log` 這份檔案中,執行後可以在終端機及檔案中看到執行結果。 ## 程式碼分析 首先看到專案內的檔案架構 ```bash tree . . ├── colors.h ├── LICENSE ├── logger.c ├── logger-colors.c ├── logger.h ├── main.c ├── Makefile ├── README.md └── scripts └── test.sh ``` ### main.c ```c typedef struct { unsigned long print_max; int thread_max; int lines_min, lines_max; int lines_total; int uwait; int chances; int opts; } thread_params_t; ``` 首先來看到參數設定的部份,`main` 函式中第 22 行會把 opts 給預設為 `LOGGER_OPT_PREALLOC` ,可以在 `logger.h` 中找到他的值為 `4` - 在第 24 ~ 27 行的地方,設定 `NONBLOCK` 以及 `PRINTLOST` ,預設皆為 `0` - 在第 48 ~ 52 行的地方,使用了 [clock_gettime](https://linux.die.net/man/3/clock_gettime) 函式去紀錄 `fprintf` 開始前與結束後的時間,以計算一個 `fprintf` 需要多久 - 在第 55 行的地方,呼叫了 `logger_init` ,放入的參數分別是 - queues_max: thp.thread_max * 1.5 - lines_max: 50 - opts: 0 (代表 logger opt 選項) 在實際去看過 `logger_init` 的程式碼後,會發現這個函式裡處理的是 reader 的 thread :::warning 如果將 `logger_init` 改成 `logger_reader_init` 是不是比較符合程式碼所想要做的事情? ::: - 在第 63 ~ 71 行的地方,初始化 wirter 的 thread - 第 113 行的地方,會發現 `queue_size` 是一個介於 `lines_min` 與 `lines_max` 之間的隨機數,但還沒了解為什麼要這樣設計 - 在這邊發現了我們最開始 argv 所輸入的參數,目標就是設定 writer 的部份 :::warning 從程式碼內可以看到 `logger_pthread_create` 的目標是處理 writer thread 若改成 `logger_writer_create` 之類帶有 writer 語意的名稱可能會比較好 ::: - 在第 74 ~ 93 行的地方,使用了 non-blocking 的 [pthread_tryjoin_np](https://man7.org/linux/man-pages/man3/pthread_tryjoin_np.3.html),目標為若有一條 writer 執行緒已經結束了,但是還有一些 log 還沒被讀到,會再重新建立一條 writer 執行緒 #### main ```c= int main(int argc, char **argv) { int start_wait = 0; if (argc < 7) { printf( "%s <threads> <min q lines> <max q lines> <total lines> <print " "max/thd> <us wait> <wait chances> [blocking (0)] [printlost (0)] " "[delay sec]\n", argv[0]); return 1; } thread_params_t thp = { .thread_max = atoi(argv[1]), .lines_min = atoi(argv[2]), .lines_max = atoi(argv[3]), .lines_total = atoi(argv[4]), .print_max = atoi(argv[5]), .uwait = atoi(argv[6]), .chances = atoi(argv[7]), .opts = LOGGER_OPT_PREALLOC, }; if (argc > 8 && atoi(argv[8])) thp.opts |= LOGGER_OPT_NONBLOCK; if (argc > 9 && atoi(argv[9]) && (thp.opts & LOGGER_OPT_NONBLOCK)) thp.opts |= LOGGER_OPT_PRINTLOST; if (argc > 10) start_wait = atoi(argv[10]); srand(time(NULL)); fprintf(stderr, "cmdline: "); for (int i = 0; i < argc; i++) fprintf(stderr, "%s ", argv[i]); fprintf(stderr, "\nthreads[%d] q_min[%d] q_max[%d] lines_total[%d] " "max_lines/thr[%lu] (1/%d chances to wait %d us) %s%s\n", thp.thread_max, thp.lines_min, thp.lines_max, thp.lines_total, thp.print_max, thp.chances, thp.uwait, (thp.opts & LOGGER_OPT_NONBLOCK) ? "non-blocking" : "", (thp.opts & LOGGER_OPT_PRINTLOST) ? "+printlost" : ""); fprintf( stderr, "Waiting for %d seconds after the logger-reader thread is started\n\n", start_wait); struct timespec before, after; clock_gettime(CLOCK_MONOTONIC, &before); fprintf(stderr, "For reference, the call to fprintf(stderr,...) to print this line " "took: "); clock_gettime(CLOCK_MONOTONIC, &after); fprintf(stderr, "%lu ns\n\n", elapsed_ns(before, after)); logger_init(thp.thread_max * 1.5, 50, 0); sleep(start_wait); /* Writer threads */ pthread_t tid[thp.thread_max]; char tnm[thp.thread_max][LOGGER_MAX_THREAD_NAME_SZ]; unsigned long printed_lines = 0; for (int i = 0; i < thp.thread_max; i++) { int queue_size = thp.lines_min + rand() % (thp.lines_max - thp.lines_min + 1); snprintf(tnm[i], LOGGER_MAX_THREAD_NAME_SZ, "writer-thd-%04d", i); logger_pthread_create(tnm[i], queue_size, thp.opts, &tid[i], NULL, (void *) writer, (void *) &thp); printed_lines += thp.print_max; } while (printed_lines < thp.lines_total) { for (int i = 0; i < thp.thread_max; i++) { if (tid[i] && pthread_tryjoin_np(tid[i], NULL)) continue; if (printed_lines < thp.lines_total) { /* Not the right amount... Restart the exited thread */ int queue_size = thp.lines_min + rand() % (thp.lines_max - thp.lines_min + 1); logger_pthread_create(tnm[i], queue_size, LOGGER_OPT_NONE, &tid[i], NULL, (void *) writer, (void *) &thp); printed_lines += thp.print_max; fprintf(stderr, "Restarting thread %02d ...\n", i); continue; } tid[i] = 0; } usleep(100); } for (int i = 0; i < thp.thread_max; i++) { if (tid[i]) /* If not yet terminated, waiting for it */ pthread_join(tid[i], NULL); } logger_deinit(); fprintf(stderr, "%lu total printed lines ...\n", printed_lines); return 0; } ``` #### writer 這個函式所作的事情就是將訊息寫入 queue 內,並且計算該動作需要多少時間 - 在第 27 行到第 31 行,`thp->chances` 內會是一個整數,在預設的命令中為 10 ,表示有 1 / 10 的機率會 sleep - 第 35 行到第 41 行的地方,去計算 `LOG_LEVEL` 會需要多久的時間執行,並且將資訊丟入 queue 中 ```c /* Tester thread */ static void *writer(const thread_params_t *thp) { char th[LOGGER_MAX_THREAD_NAME_SZ]; pthread_getname_np(pthread_self(), th, sizeof(th)); for (int seq = 0; seq < thp->print_max; seq++) { if (!(rand() % thp->chances)) { fprintf(stderr, "<%s> Bad luck, waiting for %d us\n", th, thp->uwait); usleep(thp->uwait); } struct timespec before, after; int level = rand() % LOGGER_LEVEL_COUNT; clock_gettime(CLOCK_MONOTONIC, &before); if (LOG_LEVEL(level, "<%s> %d", th, seq) < 0) { clock_gettime(CLOCK_MONOTONIC, &after); fprintf(stderr, "<%s> %d **LOST** (%m)\n", th, seq); } else { clock_gettime(CLOCK_MONOTONIC, &after); } fprintf(stderr, "<%s> %lu logger_printf took %lu ns\n", th, timespec_to_ns(after), elapsed_ns(before, after)); } return NULL; } ``` ### logger.c 在這邊可以看到每個 write thread 的 queue 宣告 ```c static _Thread_local logger_write_queue_t *_own_wrq = NULL; ``` 在 C11 中有定義出了 [_Thread_local](https://en.cppreference.com/w/c/language/storage_duration) ,表示 **thread storage duration** ,以此宣告的變數,它會存在於該 thread 中,在該 thread 消失時,會跟著一起被消失。 > thread storage duration. The storage duration is the entire execution of the thread in which it was created, and the value stored in the object is initialized when the thread is started. Each thread has its own, distinct, object. If the thread that executes the expression that accesses this object is not the thread that executed its initialization, the behavior is implementation-defined. All objects declared _Thread_local have this storage duration. #### _futex 在這裡把 [futex(2)](https://man7.org/linux/man-pages/man2/futex.2.html) 包成了巨集來使用,分別有 `futex_wait`, `futex_timed_wait`, `futex_wake` ,裡面的參數 `FUTEX_xxx_PRIVATE` 根據 [futex(2)](https://man7.org/linux/man-pages/man2/futex.2.html) 內的說明,是為了讓 kernel 了解只會在這個 process 內使用該 mutex ,故有利於最佳化。 > It tells the kernel that the futex is process-private and not shared with another process (i.e., it is being used for synchronization only between threads of the same process). This allows the kernel to make some additional performance optimizations. 在 `futex_op` 的內容設為 `FUTEX_WAKE` 時,`val` 所代表的就是最高一次喚醒的 thread 數量,而回傳值則是有多少的 waiter 被喚醒。 > This operation wakes at most val of the waiters that are waiting (e.g., inside FUTEX_WAIT) on the futex word at the address uaddr. 而當 `futex_op` 的內容設為 `FUTEX_WAIT` 時,會去比較說內含的數值是否和 val 相同,要是相同會進入 wait 的狀態,要是不同會回傳出 `EAGAIN` ```c= #define futex_wait(addr, val) _futex((addr), FUTEX_WAIT_PRIVATE, (val), NULL) #define futex_timed_wait(addr, val, ts) \ _futex((addr), FUTEX_WAIT_PRIVATE, (val), (ts)) #define futex_wake(addr, val) _futex((addr), FUTEX_WAKE_PRIVATE, (val), NULL) static inline int _futex(atomic_int *uaddr, int futex_op, int val, struct timespec *tv) { /* Note: not using the last 2 parameters uaddr2 & val2 (see futex(2)) */ return syscall(SYS_futex, uaddr, futex_op, val, tv); } ``` #### logger_init 在這邊對 logger 進行初始參數、 reader_thread 等等動作,由於在第 3 行的時候,有將 logger 內的記憶體位置皆初始化為 0 ,故沒有特別設定的都將會是 0。 ```c= int logger_init(int queues_max, int lines_max, logger_opts_t opts) { memset(&logger, 0, sizeof(logger_t)); pthread_mutex_init(&logger.queues_mx, NULL); logger.queues = calloc(queues_max, sizeof(logger_write_queue_t *)); logger.queues_max = queues_max; logger.opts = opts; logger.default_lines_nr = lines_max; logger.running = true; _own_wrq = NULL; /* Reader thread */ pthread_create(&logger.reader_thread, NULL, (void *) logger_thread_func, NULL); pthread_setname_np(logger.reader_thread, "logger-reader"); return 0; } ``` #### logger_thread_func 本函式是 read thread 的主要函式 - 在第 9 行的地方, `fuse_nr` 被設成了 `logger.queues_nr` ,根據在 `logger.h` 內的註解,代表有多少已經 allocated 的 queue ( Number of queues allocated ) - 第 11 到第 25 行,當目前的 queues 數量為 0 時會被執行。首先會先使用 `futex_wait` 來進入等待的狀態,等到有人呼叫 `futex_wake` 後,會將 `logger.reload` 設為 0,接著執行 continue。 - 如果目前的 queue 數量不為 0 ,會進入到第 35 行的迴圈 - 在第 38 行確定目前有沒有 alloc 新的 write queue,如果有的話跳出迴圈 ```c= static void *logger_thread_func(void) { bool running = logger.running; fprintf(stderr, "<logger-thd-read> Starting...\n"); while (running) { int empty_nr = 0; int really_empty = 0; int fuse_nr = logger.queues_nr; if (!fuse_nr) { fprintf(stderr, "<logger-thd-read> Wake me up when there is something... " "Zzz\n"); atomic_store(&logger.waiting, 1); if (futex_wait(&logger.waiting, 1) < 0 && errno != EAGAIN) { fprintf(stderr, "<logger-thd-read> ERROR: %m !\n"); break; } if (!logger.running) break; atomic_store(&logger.reload, 0); continue; } fuse_entry_t fuse_queue[fuse_nr]; fprintf(stderr, "<logger-thd-read> (Re)Loading... fuse_entry_t = %d x " "%lu bytes (%lu bytes total)\n", fuse_nr, sizeof(fuse_entry_t), sizeof(fuse_queue)); empty_nr = init_lines_queue(fuse_queue, fuse_nr); while (1) { empty_nr = enqueue_next_lines(fuse_queue, fuse_nr, empty_nr); if (atomic_compare_exchange_strong(&logger.reload, &(atomic_int){1}, 0)) { break; } if (fuse_queue[0].ts == ~0) { logger.empty = true; if (!logger.running) { /* We want to terminate when all the queues are empty ! */ running = false; break; } if (really_empty < 5) { int wait = 1 << really_empty++; fprintf(stderr, "<logger-thd-read> Print queue empty. Double check " "in %d us ...\n", wait); usleep(wait); /* Double-check multiple times if the queue is really empty. * This is avoid the writers to wakeup too frequently the * reader in case of burst. Waking it up through the futex * also takes time and the goal is to lower the time spent * in logger_printf() as much as possible. */ continue; } really_empty = 0; fprintf(stderr, "<logger-thd-read> Print queue REALLY empty ... Zzz\n"); atomic_store(&logger.waiting, 1); if (futex_wait(&logger.waiting, 1) < 0 && errno != EAGAIN) { fprintf(stderr, "<logger-thd-read> ERROR: %m !\n"); running = false; break; } continue; } logger.empty = false; really_empty = 0; logger_write_queue_t *wrq = fuse_queue[0].wrq; int rv = write_line(wrq, &wrq->lines[wrq->rd_idx]); if (rv < 0) { fprintf(stderr, "<logger-thd-read> logger_write_line(): %m\n"); /* In this case we loose the line but we continue to empty the * queues ... otherwise all the queues gets full and all the * threads are stuck on it (this can happen if disk is full.) */ } } } fprintf(stderr, "<logger-thd-read> Exit\n"); return NULL; } ``` #### init_lines_queue 初始化 `fuse_queue` ,在這裡把每個 `fuse` 的 `wrq` 一一指向 `logger` 的 `wrq` 回傳值是目前初始化了多少的 `fuse_queue`,也就是有多少的 `logger.wrq` 內有東西 :::spoiler 程式碼 ```c typedef struct { unsigned long ts; /* Key to sort on (ts of current line) */ logger_write_queue_t *wrq; /* Related write queue */ } fuse_entry_t; static inline int init_lines_queue(fuse_entry_t *fuse, int fuse_nr) { memset(fuse, 0, fuse_nr * sizeof(fuse_entry_t)); for (int i = 0; i < fuse_nr; i++) { fuse[i].ts = ~0; /* Init all the queues as if they were empty */ fuse[i].wrq = logger.queues[i]; } return fuse_nr; /* Number of empty queues (all) */ } ``` ::: #### enqueue_next_lines 在這邊會去計算還有多少的 empty queue ,並且把 ts 小的往 array 中 index 小的地方放 :::spoiler 程式碼 ```c static int enqueue_next_lines(fuse_entry_t *fuse, int fuse_nr, int empty_nr) { logger_write_queue_t *wrq; /* This one should have been processed. Freeing it */ if (fuse[0].ts != ~0) { wrq = fuse[0].wrq; /* Free this line for the writer thread */ wrq->lines[wrq->rd_idx].ready = false; wrq->rd_idx = ++wrq->rd_seq % wrq->lines_nr; /* Enqueue the next line */ empty_nr += set_queue_entry(wrq, &fuse[0]); bubble_fuse_up(fuse, fuse_nr); /* Find its place */ } /* Let see if there is something new in the empty queues. */ int rv = 0; for (int i = 0, last = fuse_nr - 1; i < empty_nr; i++) { rv += set_queue_entry(fuse[last].wrq, &fuse[last]); bubble_fuse_down(fuse, fuse_nr); } /* return the number of remaining empty queues */ return rv; } ``` ::: #### set_queue_entry 確認該 queue 是否為空 - 若是空的,回傳 1,並把 `ts` 設為 `~0` - 不是空的回傳 0,並把 `ts` 設為 `wrq->lines[index].ts`。 :::spoiler 程式碼 ```c static inline int set_queue_entry(const logger_write_queue_t *wrq, fuse_entry_t *fuse) { unsigned int index = wrq->rd_idx; if (!wrq->lines[index].ready) { fuse->ts = ~0; /* it is empty */ return 1; } struct timespec ts = wrq->lines[index].ts; fuse->ts = timespec_to_ns(ts); return 0; } ``` ::: #### logger_pthread_create 初始化 writer thread 並設定相對應的參數 :::spoiler 程式碼 ```c int logger_pthread_create(const char *thread_name, unsigned int max_lines, logger_opts_t opts, pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg) { thread_params_t *params = malloc(sizeof(thread_params_t)); if (!params) return -1; strncpy(params->thread_name, thread_name, sizeof(params->thread_name) - 1); params->thread_name[sizeof(params->thread_name) - 1] = 0; params->max_lines = max_lines; params->opts = opts; params->start_routine = start_routine; params->arg = arg; return pthread_create(thread, attr, (void *) thread_wrapper, (void *) params); } ``` ::: #### write_line ```c= extern const logger_line_colors_t logger_colors_default; static int write_line(const logger_write_queue_t *wrq, const logger_line_t *l) { char linestr[LOGGER_LINE_SZ + LOGGER_MAX_PREFIX_SZ]; const logger_line_colors_t *c = &logger_colors_default; /* File/Function/Line */ char src_str[128], *start_of_src_str = src_str; int len = snprintf(src_str, sizeof(src_str), "%24s %20s %4d", l->file, l->func, l->line); if (len > LOGGER_MAX_SOURCE_LEN) start_of_src_str += len - LOGGER_MAX_SOURCE_LEN; /* Time stamp calculations */ unsigned long usec = NTOU(l->ts.tv_nsec) % 1000; unsigned long msec = NTOM(l->ts.tv_nsec) % 1000; int sec = l->ts.tv_sec % 60; /* Format all together */ static int biggest_thread_name = 0; if (wrq->thread_name_len > biggest_thread_name) biggest_thread_name = wrq->thread_name_len; len = sprintf(linestr, "%s%s:%02d.%03lu,%03lu [%s%s%s] %*s <%s%*s%s> %s\n", get_date(l->ts.tv_sec, c), get_time(l->ts.tv_sec, c), sec, msec, usec, c->level[l->level], _logger_level_label[l->level], c->reset, LOGGER_MAX_SOURCE_LEN, start_of_src_str, c->thread_name, biggest_thread_name, wrq->thread_name, c->reset, l->str); return write(1, linestr, len); } ``` #### thread_wrapper 在這邊使用了 `pthread_cleanup_push` 與 `pthread_cleanup_pop` 來做 thread 的 clean-up hanlder。 根據 [linux man page](https://man7.org/linux/man-pages/man3/pthread_cleanup_push.3.html) 內的敘述,在執行了 `pthread_cleanup_push` 後,會將指定的 routine 給 push 置 clean-up handler stack 上;`pthread_cleanup_pop` 有一個參數名為 execute,若該參數不為零,會在 pop 該 routine 時去執行它。 並會在以下三種情境將該 routine 從 stack 內 pop 出去並決定是否要執行: 1. 當 thread 被 canceled 時,所有在 clean-up handlers 內的內容都會被 pop 且執行 2. 當使用 `pthread_exit()` 中止掉一條 thread 時,所有在 clean-up handlers 內的 routine 都會被執行 3. 當呼叫了 `pthread_cleanup_pop` 後,會將 stack 最上方的 routine 給 pop 出,並根據參數決定是否被執行 在程式碼的第 26 行,看到 [pthread_exit](https://man7.org/linux/man-pages/man3/pthread_exit.3.html),在這個地方執行主要的 thread function :::warning 困惑點:在 pthread_exit 執行完後,理論上就應該會把 cleanup routine 內的東西給 pop 並執行,那為什麼還會需要下面的兩個 pop ? ::: ```c= static void thread_wrapper(thread_params_t *params) { pthread_cleanup_push((void *) free, (void *) params); pthread_setname_np(pthread_self(), params->thread_name); /* The name of the thread is fixed at allocation time so, the * pthread_setname_np() call must occur before the assignation bellow. * If there is no name specified, thread_id is used instead. */ if (logger_assign_write_queue(params->max_lines, params->opts) < 0) { /* If it happens, it means the limit of queues to alloc is too low. */ pthread_exit(NULL); } /* Must be called when the thread don't need it anymore. Otherwise it * will stay allocated for an unexistant thread forever ! This is also * true for the local threads forked by the domains themself. * * Note also that if this thread never print something, you should NOT * use this wrapper but better use the native pthread_create(3) instead * as all this is useless and reserve a log queue for nothing ... */ pthread_cleanup_push((void *) logger_free_write_queue, NULL); /* Let run the main thread function */ pthread_exit(params->start_routine(params->arg)); /* a push expects a pop */ pthread_cleanup_pop(true); pthread_cleanup_pop(true); } ``` #### logger_assign_write_queue 在這裡的話,是去判斷是否有適當的 write queue,如果有的話,就回傳該 queue,若沒有的話,就生成一個。 如果目前沒有 write queue,可以直接看到下方程式碼的第 40 行,會去 alloc 一個新的 write queue。 ```c= int logger_assign_write_queue(unsigned int lines_max, logger_opts_t opts) { /* If this is already set, nothing else to do */ if (_own_wrq) return 0; /* Caller don't want a specific size */ if (!lines_max) lines_max = logger.default_lines_nr; logger_write_queue_t **queue = logger.queues; logger_write_queue_t *fwrq; int last_lines_nr; retry: /* Searching first for a free queue previously allocated */ last_lines_nr = INT_MAX; fwrq = NULL; for (int i = 0; i < logger.queues_nr; i++) { if (!atomic_load(&queue[i]->free)) continue; /* Find the best free queue ... */ int lines_nr = queue[i]->lines_nr; if (lines_max <= lines_nr && lines_nr < last_lines_nr) { last_lines_nr = lines_nr; fwrq = queue[i]; } } if (fwrq) { if (!atomic_compare_exchange_strong(&fwrq->free, &(atomic_int){1}, 0)) { /* Race condition, another thread took it right before us. * Trying another one */ goto retry; } set_thread_name(fwrq); fwrq->opts = opts ?: logger.opts; } else { /* No free queue that fits our needs... Adding a new one. */ fwrq = alloc_write_queue(lines_max, opts ?: logger.opts); if (!fwrq) return -1; } _own_wrq = fwrq; return 0; } ``` #### alloc_write_queue 在這個函式中,會給予 write queue 一個空間並初始化。 - 在第 13 ~ 24 行,如果有給予 `PREALLOC` 的參數,給與新增出的空間一些數值,目標是繞過 Linux 的最佳化,確保 Linux 有真的給予該參數空間。 - 第 33 行的地方,[atomic_compare_exchange_strong](https://en.cppreference.com/w/c/atomic/atomic_compare_exchange) 會去比較 `&logger.reload` 是否為 0,如果為 0 ,那會將 `&logger.reload` 的值改變為 1。 ```c= static logger_write_queue_t *alloc_write_queue(int lines_max, logger_opts_t opts) { if (logger.queues_nr == logger.queues_max) return errno = ENOBUFS, NULL; logger_write_queue_t *wrq = calloc(1, sizeof(logger_write_queue_t)); wrq->lines = calloc(lines_max, sizeof(logger_line_t)); wrq->lines_nr = lines_max; wrq->opts = opts; set_thread_name(wrq); if (opts & LOGGER_OPT_PREALLOC) { /* Pre-fill the queue with something so that Linux really allocate * the pages. This is due to the 'copy-on-write' logic where the * page is really allocated (or copied) when the process try to * write something. */ for (int i = 0; i < lines_max; i++) { wrq->lines[i].ts.tv_nsec = ~0; for (int j = 0, k = 0; j < sizeof(wrq->lines[i].str) / 64; j++) wrq->lines[i].str[j] = k++; } } /* Ensure this is done atomically between writers. Reader is safe. */ pthread_mutex_lock(&logger.queues_mx); wrq->queue_idx = logger.queues_nr; logger.queues[logger.queues_nr++] = wrq; pthread_mutex_unlock(&logger.queues_mx); /* Let the logger thread take this change into account when possible */ atomic_compare_exchange_strong(&logger.reload, &(atomic_int){0}, 1); return wrq; } ``` #### logger_printf 此函式的目標為將指定的內容格式化後放入 queue 中,並呼叫 `wakeup_reader_if_need` 去判斷是否需要喚醒 reader - 程式碼第 14 行的地方,有使用到 C11 中的 [va_list](https://en.cppreference.com/w/c/variadic/va_list) - 在第 50 行的地方利用了 [vsnprintf](https://www.cplusplus.com/reference/cstdio/vsnprintf/) ,目標是可以印出指定的字串 :::warning 第 45 ~ 53 行程式碼的地方,有沒有可能因為編譯器最佳化,造成 `l->ready` 先被改變,而後才執行 `vsnprintf()` 而造成結果與預期不一致? ::: ```c= int logger_printf(logger_line_level_t level, const char *src, const char *func, unsigned int line, const char *format, ...) { if (!logger.running) return errno = ENOTCONN, -1; if (!_own_wrq && logger_assign_write_queue(0, LOGGER_OPT_NONE) < 0) return -1; va_list ap; int index; logger_line_t *l; reindex: index = _own_wrq->wr_seq % _own_wrq->lines_nr; l = &_own_wrq->lines[index]; while (l->ready) { int ret = wakeup_reader_if_needed(); if (ret < 0) return -1; if (ret > 0) { /* Last chance to empty at least a cell before giving up */ usleep(1); continue; } if (_own_wrq->opts & LOGGER_OPT_NONBLOCK) { _own_wrq->lost++; return errno = EAGAIN, -1; } usleep(50); } if (_own_wrq->lost && _own_wrq->opts & LOGGER_OPT_PRINTLOST) { int lost = _own_wrq->lost; _own_wrq->lost_total += lost; _own_wrq->lost = 0; goto reindex; } va_start(ap, format); clock_gettime(CLOCK_REALTIME, &l->ts); l->level = level; l->file = src; l->func = func; l->line = line; vsnprintf(l->str, sizeof(l->str), format, ap); l->ready = true; _own_wrq->wr_seq++; return wakeup_reader_if_needed() < 0 ? -1 : 0; } ``` #### wakeup_reader_if_needed 在這邊會去判斷 logger 是不是處於 waitting 的狀態,如果是的話,就將它喚醒 ```c static inline int wakeup_reader_if_needed(void) { if (atomic_compare_exchange_strong(&logger.waiting, &(atomic_int){1}, 0)) { /* (the only) 1 waiter to wakeup */ return futex_wake(&logger.waiting, 1) < 0 ? -1 : 1; } return 0; } ``` ## 目前實作流程 目前的實做是 multiple-writer / single-reader 的形式 - 當有一條新的 write 執行緒產生時,它會先去尋找是否有空的 queue ,若是有的話,將自己的 `_own_wrq` 指向該 queue;若沒有的話,會呼叫 `alloc_write_queue` 函式,產生出一條新的 queue。 在 queue 內會有 n 條 line,功能為儲存需要被 log 的資訊 (在這裡假設每條 queue 內有 10 條 line。) ```graphviz digraph list_add_tail { nodesep=0.3 rankdir="LR" node[shape=record] _own_wrq [ label = "_own_wrq" shape="plaintext" ] queue [ label = " {L[0] | L[1] | L[2] | L[3] | ... | L[9]} " style="filled" fillcolor="lightblue" ] _own_wrq -> queue } ``` - 目前使用一條新的 queue 作為範例,在最開始的時候 `wr_seq` 與 `rd_seq` 皆為零,目前沒有任何的 log 被礎存在該 queue 中 - `wr_seq`: 儲存要被寫入的位置 - `rd_seq`: 儲存要被讀取的位置 ```graphviz digraph list_add_tail { nodesep=0.3 rankdir="LR" node[shape=record] wr_seq [ label = "wr_seq" shape="plaintext" ] rd_seq [ label = "rd_seq" shape="plaintext" ] queue [ label = " {<l0>L[0] | <l1>L[1] | L[2] | L[3] | ... | L[9]} " style="filled" fillcolor="lightblue" ] wr_seq -> queue:l0 rd_seq -> queue:l0 } ``` - 當 write 執行緒寫入了一個新的 log 時,會將 `wr_seq` 的位置往後挪一個 ```graphviz digraph list_add_tail { nodesep=0.3 rankdir="LR" node[shape=record] wr_seq [ label = "wr_seq" shape="plaintext" ] rd_seq [ label = "rd_seq" shape="plaintext" ] queue [ label = " {<l0> L[0] | <l1>L[1] | L[2] | L[3] | ... | L[9]} " style="filled" fillcolor="lightblue" ] wr_seq -> queue:l1 rd_seq -> queue:l0 } ``` - 在 write 執行緒將 log 放入 line 的函式中,也會嘗試將 read 執行緒給喚醒 (wake) ,read 執行緒將會去檢查所有的 queue 中是否東西,如果有的話,變更 `rd_seq` 的位置並將其印出 ```graphviz digraph list_add_tail { nodesep=0.3 rankdir="LR" node[shape=record] wr_seq [ label = "wr_seq" shape="plaintext" ] rd_seq [ label = "rd_seq" shape="plaintext" ] queue [ label = " {<l0> L[0] | <l1>L[1] | L[2] | L[3] | ... | L[9]} " style="filled" fillcolor="lightblue" ] wr_seq -> queue:l1 rd_seq -> queue:l1 } ``` ## 修改程式碼 ### 將函式依照功能命名 原始程式碼內的命名會比較通用,但是套到這份專案上面時,會發現比較不易閱讀。 舉 `logger_pthread_create` 為例子,在本專案中,它處理的事情為設定參數並創建出一個 pthread。但由於該專案的設計邏輯,single-reader / multi-writer ,已經有個專門創建 reader 執行緒的函式,故只會在創立 writer 執行緒的時候被使用到。 故想要將該專案內的函式針對專案內的功能,進行重新命名。 #### reader - **logger_init** -> **logger_reader_create** 在程式碼內原本對於該函式的註解說明為 `Initialize the logger manager` 但該函式在設定參數後,同時也創立了 reader thread,為了與 writer 內的相同功能函式命名邏輯一致,變更為 `logger_reader_create` - **logger_thread_func** -> **logger_reader_func** 函式內定義了 `reader_thread` 所該要執行的內容 #### writer - **logger_pthread_create** -> **logger_writer_create** 該函式設定參數並創立 writer thread :::info 在按照以上的邏輯命名發完 [pull request](https://github.com/sysprog21/threaded-logger/pull/1) 後,老師針對 logger_init 的變更回覆為: > This change violates the idea to encapsulate the essential operations. The terms "reader" and "writer" are meant to be distinguished internally. 針對整份 pull request 的回覆為 > For the sake of API naming convention, it is not necessary to address readers and writers in public functions. 看完回覆後發現針對 `logger_init` ,的確是要維持原本的命名方式會比較好,會比較符合他的原始行為 而 `logger_thread_func` 是一個 private 的函式,我想若使用 `logger_reader_func` 也是可以的。而且由於在這邊的執行緒分成兩種類型 (reader / writer),將 reader 放在函式的名稱中,會比較好被理解 `logger_pthread_create` 對我來說最大的問題點是,在 `logger_init` 內也有一條 pthread 被創立,怕會造成混淆。但是經過思考後,這個函式的名稱與他所作的事情的確是相符合,因此也決定不做任何更動。 ::: ### 處理目前程式碼內的 warning - 由於 116 行中的變數 `r` 在程式碼內沒有被使用,因此移除。 ```shell logger.c: In function ‘logger_deinit’: logger.c:116:9: warning: unused variable ‘r’ [-Wunused-variable] 116 | int r = futex_wake(&logger.waiting, 1); ``` - 由於 int 型別的數字範圍為 0 ~ 2147483647,若把一個整數變成字串的話,大小為 0 byte ~ 10 byte ,在此有機會會造成 overflow。觀察程式碼的語意,在這邊由於印出的數字為 thread 的最大數量,而一般的 multi-thread 程式中也不會設定產生超過 10000 的數字,因此在程式碼運行的部份理論上不會造成影響。但是在資訊安全領域的方面,由於 thread 的最大數量是由使用者所決定的,如此可能會造成不正程的程式行為。 在此有兩種改善方案 1. 增大 `tnm` 的大小,以避免 overflow 2. 在程式碼中限制 `thread` 的最大數量,需要介於 1 ~ 10000 之間 ```shell main.c:116:65: warning: ‘%04d’ directive output may be truncated writing between 4 and 10 bytes into a region of size 5 [-Wformat-truncation=] 116 | snprintf(tnm[i], LOGGER_MAX_THREAD_NAME_SZ, "writer-thd-%04d", i); | ^~~~ main.c:116:53: note: directive argument in the range [0, 2147483647] 116 | snprintf(tnm[i], LOGGER_MAX_THREAD_NAME_SZ, "writer-thd-%04d", i); ``` - 在 `write_line` 函式中,可以發現有可能的 buffer overflow 問題。 ```shell logger.c:389:78: warning: ‘sprintf’ may write a terminating nul past the end of the destination [-Wformat-overflow=] 389 | len = sprintf(linestr, "%s%s:%02d.%03lu,%03lu [%s%s%s] %*s <%s%*s%s> %s\n", ``` 下面為 `write_line` 的原始程式碼,會發現 linestr 的大小為 `LOGGER_LINE_SZ` + `LOGGER_MAX_PREFIX_SZ`,也就是應該要被印出的 log 與額外的像是 date/time 的資訊。 ```c len = sprintf(linestr, "%s%s:%02d.%03lu,%03lu [%s%s%s] %*s <%s%*s%s> %s\n", get_date(l->ts.tv_sec, c), get_time(l->ts.tv_sec, c), sec, msec, usec, c->level[l->level], _logger_level_label[l->level], c->reset, LOGGER_MAX_SOURCE_LEN, start_of_src_str, c->thread_name, biggest_thread_name, wrq->thread_name, c->reset, l->str); ``` 回頭看一下 `l->str` 這個參數是如何被賦值的,看到 `logger_printf` 這個函式,可以找到下方的程式碼,會發現 str 的長度是有被固定住的 ```c vsnprintf(l->str, sizeof(l->str), format, ap); ``` 由此判斷這個 warning 在程式碼的邏輯中,是有被妥善處理,不用修正的。 ### 程式碼改進 在 `logger_printf` 的地方有以下的程式碼,當中的 `l->str` 與 `l->ready` 並沒有相依性,因此可能由於最佳化,而造成順序對調。 研究原始程式碼後,發現有可能因為 `l->ready` 先被執行,而造成結果錯誤。 以下擷取自 `logger_printf` 函式 (writer) ```c= l->level = level; l->file = src; l->func = func; l->line = line; vsnprintf(l->str, sizeof(l->str), format, ap); l->ready = true; _own_wrq->wr_seq++; ``` 以下擷取自 `set_queue_entry` 函式 (reader) ```c= unsigned int index = wrq->rd_idx; if (!wrq->lines[index].ready) { fuse->ts = ~0; /* it is empty */ return 1; } struct timespec ts = wrq->lines[index].ts; fuse->ts = timespec_to_ns(ts); return 0; ``` 假設最佳化會造成 ready 先被指派的結果,若 writer 執行緒在執行到以下情形時, `reader` 就先行依照 `ready` 參數的設定結果,判斷是否有需要被印出的字串 (`ready` 已經被設立,但是 `str` 還沒有被改變),就有機會造成錯誤發生 ```graphviz digraph list_add_tail { nodesep=0.3 rankdir="LR" node[shape=record] queue [ label = " { <l0> ready : 1 \n str: \"test\" | <l1> ready : 1 \n str: NULL } " style="filled" fillcolor="lightblue" ] } ``` 為了要避免上面提到最佳化時造成的可能問題,我決定要將 ready 改成 atomic 的變數,並適度的設定 memory barrier,然後實驗看看與未改成 atomic 變數時,會造成多大的效能影響。 ## 參考資料 - [並行程式設計: Atomics 操作](https://hackmd.io/@sysprog/concurrency-atomics) - [簡介 C++11 atomic 和 memory order](https://medium.com/fcamels-notes/%E7%B0%A1%E4%BB%8B-c-11-memory-model-b3f4ed81fea6) - [Acquire and Release Semantics](https://preshing.com/20120913/acquire-and-release-semantics/) - [Linux 效能分析工具: Perf](http://wiki.csie.ncku.edu.tw/embedded/perf-tutorial) <!-- ### Write Thread (只是先把流程記著,之後再畫圖) - 在 main 裡面呼叫 `logger_pthread_create` ,創建出一個 thread - `logger_pthread_creat` 內會做的事情是設定該 thread 的相關參數,之後呼叫 `thread_wrapper` - 在 `thread_wrapper` 內,會去處理 cleanup stack 的事情,並且利用 `logger_assign_write_queue` 指定執行緒該用哪個 write queue - 在 `logger_assign_write_queue` 內會先去看是不是已經找好自己這條 thread 內要用的 queue (`_own_wrq`) ,如果沒有的話,會先去看看現成的,已經沒有被使用的 queue 可以被使用 (logger.queues),如果沒有的話,就呼叫 `alloc_write_queue` 去建立新的 write queue - 尋找最佳 free queue 的條件為,該 queue 的 lines_nr 應為所有 free queue 中最小的。 - 決定好要使用哪個 write queue 後,回到 `thread_wrapper` 函式,接著執行主要的 `writer` 函式 - 在 `writer` 函式中,會透過 `LOG_LEVEL` 將指定的字串塞到 write queue 中 (從 `logger.h` 裡面可以看到一系列由巨集 (macro) 包裝起來的 `logger_printf` 函式) ### Queue 的操作流程 - 在這邊使用的 queue 空間是在 `logger_init` 內被創造的,有 `thread_max * 1.5` 條 queue 可以被使用,然後在這邊設定是有 50 個 lines - logger 在最開始的時候參數的值 - running: true - queues_max: queues_max - default_lines_nr: lines_max - queues_nr: 0 ### atomic 相關參數用途 - `logger.reload` - 會在 read thread 被喚醒時設為 0 - 會在 `alloc_write_queue` 內被設為 1 -->