# OS HW2: ETL process - **學號: F74076124** - **姓名: 向景亘** - **系級: 資訊 111** ## 開發環境 - OS: Ubuntu 20.04 - CPU: Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz - CPU cores: 4 - Memory: 16 GB - Programming Language: C11, gcc (Ubuntu 9.3.0-10ubuntu2) 9.3.0 ## 編譯與測試 本作業使用 `Makefile` 來進行編譯 可以使用下列指令來執行 ```shell $ make ``` 使用 `clang-format` 設定原始碼的格式 ```shell $ make format ``` 清除先前 compile 的資料以避免干擾 ```shell $ make clean ``` ## 使用方式 ### `spawner`: 產生測資 利用以下指令產生測資 `input.csv` ```shell $ ./spawner ``` ### `csv2json`: 將 csv 檔案轉換為 json 將 `input.csv` 中的資料以 json 的格式輸出到 `output.json` 中,可以利用 commandline arguments 指定執行緒的數量(預設為 `1`) ```shell $ ./csv2json [thread_num] ``` ## 開發記錄 ### `spawner` 這次的作業一樣要寫出可以產生測資的程式,而按照規定,我們應該要產生的檔案內容大致如下 ``` 1|2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20 2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21 3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21|22 4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21|22|23 <...> 4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21|22|23 ``` 而在作業說明中也有特別提到測資檔案的大小大約要在 1 GB 左右,所以我透過 `snprintf` 來記錄總共輸出了多少字元,也就是有多少個 byte 就可以知道目前輸出檔案的大小,並從而在滿足規定容量大小的時候停止測資的產生。 為了避免使用單執行緒會導致產生時間過長,所以我在這個部分嘗試使用 `pthread` 的方式來平行輸出資料,最後再將各執行緒產生的檔案合併為最後的輸出檔 `input.csv` 因為每個用 `|` 隔開的數值範圍與上次作業的範圍一樣,所以我將上一個作業使用到的 `random_bytes` 搬過來使用。 所以組合後就是每個 thread 要執行的 subroutine ```cpp void *spawn_worker(void *arg) { int worker_num = *(int *) arg; int num[20]; uint64_t content = 0; char filename[20]; snprintf(filename, 20, WORKING_FILE, worker_num); FILE *fp = fopen(filename, "w"); while (1) { randombytes((uint8_t *) &num, sizeof(num)); int i; for (i = 0; i < 19; i++) content += fprintf(fp, "%d|", num[i]); content += fprintf(fp, "%d\n", num[i]); if (content >= MAX_SPAWN / WORKER_NUM) { fclose(fp); return NULL; } } } ``` 因為有多個執行緒可以分擔產生測資的作業,所以每個執行緒要產生的就會是 `MAX_SPAWN / WORKER_NUM` 個 byte 的資料。 當各個 pthread 在 main thread 使用 `pthread_join` 回收相關資源後,使用 `open`, `write` 與 `read` 來實作將各個執行緒所產生的 檔案進行合併,最後輸出成 `input.csv` ```cpp int main(int argc, char *argv[]) { <...> for (int i = 0; i < WORKER_NUM - 1; i++) pthread_join(thr[i], NULL); int fd = creat(FILENAME, S_IRUSR | S_IWUSR); char filename[20], buf[4096]; for (int i = 0; i < WORKER_NUM; i++) { snprintf(filename, 20, WORKING_DIR, i); int tmp = open(filename, O_RDONLY), ret; while ((ret = read(tmp, buf, 4096))) write(fd, buf, ret); close(tmp); remove(filename); } <...> } ``` ### `csv2json` 在本次作業中,我們需要透過 `csv2json` 將給定的測資 `input.csv` 中的每一行資料 ``` 1|2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20 2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21 ``` 轉換成下列所示的 json 格式 ```json [ { "col_1":1, "col_2":2, "col_3":3, "col_4":4, "col_5":5, "col_6":6, "col_7":7, "col_8":8, "col_9":9, "col_10":10, "col_11":11, "col_12":12, "col_13":13, "col_14":14, "col_15":15, "col_16":16, "col_17":17, "col_18":18, "col_19":19, "col_20":20 }, { <...> } ] ``` 而為了考慮平行化處理的方案,同時又要維持原本輸入檔案的順序,我利用 `main thread` 來讀入每一行的資料並透過 thread 來處理這些資料,最後在各個 thread 將資料轉換成上述的格式後就透過 main thread 來輸出,大致的流程可以參考下圖。 ```graphviz digraph workflow { rankdir=LR node[shape=sqare] input[label="input\n(main)"] output[label="output\n(main)"] start[label="" shape=none] end[label="" shape=none] start -> input -> {worker_0, worker_1, worker_2, worker_3} -> output -> end } ``` 根據這樣的流程,我設計出了每個 `worker thread` 應該要具備的變數與結構 ```cpp typedef struct { sem_t task; sem_t completed; bool work; bool exit; char input[500]; char out[500]; int size; } convert_args; ``` 從上述的結構中,我們可以看到每個 thread 都有自己獨立的 input buffer 與 output buffer 這是為了避免在運作的過程中,因為共用 buffer 的關係而導致 data corruption,所以 `csv2json` 會在輸入前確認有將該輸入排給哪個 thread 來進行,再將資料寫入到該 thread 的 buffer 中,並透過 `task` 這個 semaphore 來通知 thread 開始工作。 因為在這個作業中,我採取的手段是 thread pool,所以為了能夠在輸入前挑出 idle 的 thread 來進行下一個工作,我實作了一個簡單的排程器來執行這樣的任務 ```cpp /* find the index of idle thread */ int thd_sched(convert_args thds[]) { int index = -1, task, completed; bool work; for (int i = 0; i < worker_num; i++) { sem_getvalue(&thds[i].task, &task); sem_getvalue(&thds[i].completed, &completed); work = *(&thds[i].work); if (!task && !completed && !work) { index = i; break; } } return index; } ``` 在這個排程器中,我們會從 `convert_args` 中讀取 `task` 與 `completed` 這兩個 semaphore 的數值,以及 `work` 的布林值,透過 `task` 的判斷,我們可以得知目前這個 thread 沒有任務,是一個 idle 的 thread;而如果 `completed` 是 `0` 的話,就表示他目前並沒有任何已完成的任務;若 `work` 的數值也是 `false` 的話就表示這個 thread 可以排入新的任務,因此 `thd_sched` 就會將這個 thread 的 index 回傳,以進行之後的讀寫。 而當各個 task 被傳到 worker thread 的 buffer 上後,worker thread 接著就會依照 buffer 上的資料來進行處理 ```cpp void *convert_worker(void *arg) { convert_args *args = (convert_args *) arg; <...> while (1) { <...> char *tok, *last = NULL; // printf("[worker] %s\n", args->out); tok = strtok_r(args->input, "|", &last); while (tok) { // printf("[worker] tok: %s\n", tok); len += sprintf(args->out + len, "\t\t\"col_%d\":%s", data_cnt, tok); int flag = (data_cnt++ == 20); strcat(args->out, &",\n"[flag]); len += 2 - flag; tok = strtok_r(NULL, "|", &last); } strcat(args->out, "\t}"); len += 2; args->size = len; sem_post(&args->completed); } } ``` 在 `convert_worker` 這個 subroutine 中,為了將每個介在兩個 `|` 字元中的資料取出來,我使用的是 `<string.h>` 中的 `strtok_r` 來處理字串。 因為在 [strtok(3) - Linux manual page](https://man7.org/linux/man-pages/man3/strtok.3.html) > The strtok() function uses a static buffer while parsing, so it's not thread safe. Use strtok_r() if this matters to you. 所以為了避免上述情況發生,我使用 `strtok_r` 來切割字串,避免非預期的情況產生。 ## 效能分析 ### 拆分各階段的執行結果 因為在這個作業中涉及到不同的作業階段,所以我將 `csv2json` 在做的事情進行簡單的劃分 1. **輸入 (Scan)** : 用以讀入各行資料 2. **處理 (Process)** : 透過 thread 處理並轉換資料 3. **輸出 (Output)** : 將轉換結果輸出到 `output.json` 中 ### 對照組測量結果 因為要比較使用不同數量的 thread 來平行處理的差異性,所以我們首先先針對只使用 1 個 thread 來處理資料的情況來進行測量 ``` $ ./csv2json 1 [Scan] time cost: 84.145579 [Process] time cost: 84.183010 [Output] time cost: 5.814779 [Overall] time cost: 99.543937 Convertion completed ``` ### 比較 thread worker 數量的差異 ``` $ ./csv2json 2 [Scan] time cost: 66.950295 [Process] time cost: 67.823268 [Output] time cost: 3.692263 [Overall] time cost: 75.937240 Convertion completed ``` 在執行緒只有兩個狀況下,我發現這個實作的測量狀況是最好的,時間也是最短的,在輸出時間上看得出來因為每次輸出較多的資料,所以輸出時間較只有 1 個執行緒的時間短。 ![](https://i.imgur.com/W1ir43H.png) 在 `htop` 觀察執行緒的運作以及各個核心執行的情況可以發現,因為系統中仍有其他的程式需要核心來進行運算,所以在這個情況下雖然 CPU 的使用率很高,但是依然可以執行其他的程式。 ``` $ ./csv2json 6 [Scan] time cost: 77.920023 [Process] time cost: 78.906701 [Output] time cost: 3.098866 [Overall] time cost: 83.664055 Convertion completed ``` ![](https://i.imgur.com/kABQQlw.png) ``` $ ./csv2json 20 [Scan] time cost: 77.357744 [Process] time cost: 77.593611 [Output] time cost: 2.305013 [Overall] time cost: 80.486613 Convertion completed ``` ![](https://i.imgur.com/RoFv2jb.png) ``` $ ./csv2json 100 [Scan] time cost: 75.926779 [Process] time cost: 75.965271 [Output] time cost: 1.862836 [Overall] time cost: 78.160151 Convertion completed ``` ![](https://i.imgur.com/leMfGa3.png) ### 比較使用 system call 來寫入檔案的差異 考慮到我們在上述的實作中在輸出的部分使用的是 `<stdio.h>` 中定義的 `fprintf`,為了比較其中的差異,我使用 `<unistd.h>` 與 `<fcntl.h>` 中的 `creat` 與 `write` 等系統呼叫來改寫輸出的實作並比較兩者的差異。 #### 對照組 為了比較各個數量的差異,我們一樣先記錄只有一個 thread 的執行情況。 ``` $ ./csv2json 1 [Scan] time cost: 75.054586 [Process] time cost: 75.060525 [Output] time cost: 17.833902 [Overall] time cost: 101.473322 Convertion completed ``` #### 實驗組 在實驗組的部分,我們仿造上面的實驗,測量執行緒數量在 2, 6, 20, 100 個的時候的執行情況。 ``` $ ./csv2json 2 [Scan] time cost: 66.950295 [Process] time cost: 67.823268 [Output] time cost: 3.692263 [Overall] time cost: 75.937240 Convertion completed ``` ``` $ ./csv2json 6 [Scan] time cost: 75.590053 [Process] time cost: 76.442072 [Output] time cost: 15.280341 [Overall] time cost: 93.422552 Convertion completed ``` ``` $ ./csv2json 20 [Scan] time cost: 83.200694 [Process] time cost: 83.520436 [Output] time cost: 14.841305 [Overall] time cost: 98.934671 Convertion completed ``` ``` $ ./csv2json 100 [Scan] time cost: 75.417766 [Process] time cost: 75.453682 [Output] time cost: 14.213082 [Overall] time cost: 90.002306 Convertion completed ``` ## 分析與結論 在上述的實驗結果中,我們可以看到的是,當我們只有一個 thread 在執行資料轉換的時候,因為大致的流程會與 sequential 的版本,也就是單執行緒的情況相似,所以在執行與輸出的時間上,有很大的受限。 因此在我們漸漸將 thread 的數量增加時,我們就可以發現,因為 thread 的數量增加了,也就表示在輸出的時候我們一次可以輸出更多的資料,而不需要等到下一輪的運算之後才能將其他的資料輸出,所以在 `output` 階段的表現會比只有 1 的執行緒的情況還要好。 但是在這個實作中受限於要保持輸入檔案原本的資料順序,所以我們沒辦法透過完全平行化的處理方式來進行,而是需要透過 main thread 來維護整個輸入、處理、輸出的順序性,且在切換到下一個階段的時候還要考慮到是否有 thread 還在進行剛剛指派的 task。 這個問題在執行緒數量小的時候不會造成太大的問題。但是當 thread 的數量來到 60 或甚至是 100 以上的時候,被第一個指派 task 的 thread 有可能已經將剛剛指派的資料已經轉換好了,但是他卻需要等到這一輪的所有 thread 完成他們的任務之後才能一起將這些資料輸出出去,就會造成執行緒數量增加但是整體執行時間沒有降低多少,或甚至增加的狀況產生。 除了比較各個執行緒數量間的差異之外,我們也有比較使用系統呼叫來寫入檔案的實作,主要的考量點在於 `fprintf` 的執行需要判斷字串結尾的 `'\0'` 來決定輸出是否完成,而在 `write` 的實作中是以指定寫入長度來進行,因此可能會有執行上的差異。 但是在我們上列的實驗結果中,我們可以發現其實兩者間的差異並不大,而且在其中的幾個情況裡,使用系統呼叫 `write` 的實作還比使用 `fprintf` 的情況更慢。我認為這樣的情況是因為在操作只有字元的情況下,`write` 與 `fprintf` 兩者的行為沒有太大的差異所致。 而在這些實驗的結果中,我們也可以發現使用 `fprintf`,且執行緒數量指定為 2 的執行情況最佳,我認為會造成這樣情況的原因有下列幾種 - **執行緒過多導致系統需要頻繁中斷任務** 因為在系統中除了要執行 `csv2json` 以外還要執行其他的程式,像是 GUI 或是網路瀏覽器等等,因為我們的系統中的排程器是屬於 time sharing 的實作,所以為了保持其他程式可以保持一定的運作,系統需要頻繁的切換以及中斷我們的 thread。並且因為 thread 數量過多的問題,也會導致所有 thread 會平分有限的系統資源,導致執行的成本增加。 - **硬碟架構與輸出頻率** 在我的硬碟資訊中可以看到他定義一個 section 的大小是 512 byte,考慮到我們行資料經過轉換後的大小最多不超過 500 byte,且因為使用兩個 thread 來處理資料,所以可以做到類似 prefetch 的效果,同時又可以依照硬碟架構來輸出資料,從而提升寫入的效率。 因此在上述兩個情況以及系統為了保持其他程式的執行的狀況下,使用 `fprintf` 以及執行緒指定為兩個的情況就會是這個實作中效率最好的情況了。 ###### tags: `os`