# OS HW2: ETL process
- **學號: F74076124**
- **姓名: 向景亘**
- **系級: 資訊 111**
## 開發環境
- OS: Ubuntu 20.04
- CPU: Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
- CPU cores: 4
- Memory: 16 GB
- Programming Language: C11, gcc (Ubuntu 9.3.0-10ubuntu2) 9.3.0
## 編譯與測試
本作業使用 `Makefile` 來進行編譯
可以使用下列指令來執行
```shell
$ make
```
使用 `clang-format` 設定原始碼的格式
```shell
$ make format
```
清除先前 compile 的資料以避免干擾
```shell
$ make clean
```
## 使用方式
### `spawner`: 產生測資
利用以下指令產生測資 `input.csv`
```shell
$ ./spawner
```
### `csv2json`: 將 csv 檔案轉換為 json
將 `input.csv` 中的資料以 json 的格式輸出到 `output.json` 中,可以利用 commandline arguments 指定執行緒的數量(預設為 `1`)
```shell
$ ./csv2json [thread_num]
```
## 開發記錄
### `spawner`
這次的作業一樣要寫出可以產生測資的程式,而按照規定,我們應該要產生的檔案內容大致如下
```
1|2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20
2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21
3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21|22
4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21|22|23
<...>
4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21|22|23
```
而在作業說明中也有特別提到測資檔案的大小大約要在 1 GB 左右,所以我透過 `snprintf` 來記錄總共輸出了多少字元,也就是有多少個 byte 就可以知道目前輸出檔案的大小,並從而在滿足規定容量大小的時候停止測資的產生。
為了避免使用單執行緒會導致產生時間過長,所以我在這個部分嘗試使用 `pthread` 的方式來平行輸出資料,最後再將各執行緒產生的檔案合併為最後的輸出檔 `input.csv`
因為每個用 `|` 隔開的數值範圍與上次作業的範圍一樣,所以我將上一個作業使用到的 `random_bytes` 搬過來使用。
所以組合後就是每個 thread 要執行的 subroutine
```cpp
void *spawn_worker(void *arg)
{
int worker_num = *(int *) arg;
int num[20];
uint64_t content = 0;
char filename[20];
snprintf(filename, 20, WORKING_FILE, worker_num);
FILE *fp = fopen(filename, "w");
while (1) {
randombytes((uint8_t *) &num, sizeof(num));
int i;
for (i = 0; i < 19; i++)
content += fprintf(fp, "%d|", num[i]);
content += fprintf(fp, "%d\n", num[i]);
if (content >= MAX_SPAWN / WORKER_NUM) {
fclose(fp);
return NULL;
}
}
}
```
因為有多個執行緒可以分擔產生測資的作業,所以每個執行緒要產生的就會是 `MAX_SPAWN / WORKER_NUM` 個 byte 的資料。
當各個 pthread 在 main thread 使用 `pthread_join` 回收相關資源後,使用 `open`, `write` 與 `read` 來實作將各個執行緒所產生的
檔案進行合併,最後輸出成 `input.csv`
```cpp
int main(int argc, char *argv[])
{
<...>
for (int i = 0; i < WORKER_NUM - 1; i++)
pthread_join(thr[i], NULL);
int fd = creat(FILENAME, S_IRUSR | S_IWUSR);
char filename[20], buf[4096];
for (int i = 0; i < WORKER_NUM; i++) {
snprintf(filename, 20, WORKING_DIR, i);
int tmp = open(filename, O_RDONLY), ret;
while ((ret = read(tmp, buf, 4096)))
write(fd, buf, ret);
close(tmp);
remove(filename);
}
<...>
}
```
### `csv2json`
在本次作業中,我們需要透過 `csv2json` 將給定的測資 `input.csv` 中的每一行資料
```
1|2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20
2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21
```
轉換成下列所示的 json 格式
```json
[
{
"col_1":1,
"col_2":2,
"col_3":3,
"col_4":4,
"col_5":5,
"col_6":6,
"col_7":7,
"col_8":8,
"col_9":9,
"col_10":10,
"col_11":11,
"col_12":12,
"col_13":13,
"col_14":14,
"col_15":15,
"col_16":16,
"col_17":17,
"col_18":18,
"col_19":19,
"col_20":20
},
{
<...>
}
]
```
而為了考慮平行化處理的方案,同時又要維持原本輸入檔案的順序,我利用 `main thread` 來讀入每一行的資料並透過 thread 來處理這些資料,最後在各個 thread 將資料轉換成上述的格式後就透過 main thread 來輸出,大致的流程可以參考下圖。
```graphviz
digraph workflow {
rankdir=LR
node[shape=sqare]
input[label="input\n(main)"]
output[label="output\n(main)"]
start[label="" shape=none]
end[label="" shape=none]
start -> input -> {worker_0, worker_1, worker_2, worker_3} -> output -> end
}
```
根據這樣的流程,我設計出了每個 `worker thread` 應該要具備的變數與結構
```cpp
typedef struct {
sem_t task;
sem_t completed;
bool work;
bool exit;
char input[500];
char out[500];
int size;
} convert_args;
```
從上述的結構中,我們可以看到每個 thread 都有自己獨立的 input buffer 與 output buffer 這是為了避免在運作的過程中,因為共用 buffer 的關係而導致 data corruption,所以 `csv2json` 會在輸入前確認有將該輸入排給哪個 thread 來進行,再將資料寫入到該 thread 的 buffer 中,並透過 `task` 這個 semaphore 來通知 thread 開始工作。
因為在這個作業中,我採取的手段是 thread pool,所以為了能夠在輸入前挑出 idle 的 thread 來進行下一個工作,我實作了一個簡單的排程器來執行這樣的任務
```cpp
/* find the index of idle thread */
int thd_sched(convert_args thds[])
{
int index = -1, task, completed;
bool work;
for (int i = 0; i < worker_num; i++) {
sem_getvalue(&thds[i].task, &task);
sem_getvalue(&thds[i].completed, &completed);
work = *(&thds[i].work);
if (!task && !completed && !work) {
index = i;
break;
}
}
return index;
}
```
在這個排程器中,我們會從 `convert_args` 中讀取 `task` 與 `completed` 這兩個 semaphore 的數值,以及 `work` 的布林值,透過 `task` 的判斷,我們可以得知目前這個 thread 沒有任務,是一個 idle 的 thread;而如果 `completed` 是 `0` 的話,就表示他目前並沒有任何已完成的任務;若 `work` 的數值也是 `false` 的話就表示這個 thread 可以排入新的任務,因此 `thd_sched` 就會將這個 thread 的 index 回傳,以進行之後的讀寫。
而當各個 task 被傳到 worker thread 的 buffer 上後,worker thread 接著就會依照 buffer 上的資料來進行處理
```cpp
void *convert_worker(void *arg)
{
convert_args *args = (convert_args *) arg;
<...>
while (1) {
<...>
char *tok, *last = NULL;
// printf("[worker] %s\n", args->out);
tok = strtok_r(args->input, "|", &last);
while (tok) {
// printf("[worker] tok: %s\n", tok);
len += sprintf(args->out + len, "\t\t\"col_%d\":%s", data_cnt, tok);
int flag = (data_cnt++ == 20);
strcat(args->out, &",\n"[flag]);
len += 2 - flag;
tok = strtok_r(NULL, "|", &last);
}
strcat(args->out, "\t}");
len += 2;
args->size = len;
sem_post(&args->completed);
}
}
```
在 `convert_worker` 這個 subroutine 中,為了將每個介在兩個 `|` 字元中的資料取出來,我使用的是 `<string.h>` 中的 `strtok_r` 來處理字串。
因為在 [strtok(3) - Linux manual page](https://man7.org/linux/man-pages/man3/strtok.3.html)
> The strtok() function uses a static buffer while parsing, so it's
not thread safe. Use strtok_r() if this matters to you.
所以為了避免上述情況發生,我使用 `strtok_r` 來切割字串,避免非預期的情況產生。
## 效能分析
### 拆分各階段的執行結果
因為在這個作業中涉及到不同的作業階段,所以我將 `csv2json` 在做的事情進行簡單的劃分
1. **輸入 (Scan)** : 用以讀入各行資料
2. **處理 (Process)** : 透過 thread 處理並轉換資料
3. **輸出 (Output)** : 將轉換結果輸出到 `output.json` 中
### 對照組測量結果
因為要比較使用不同數量的 thread 來平行處理的差異性,所以我們首先先針對只使用 1 個 thread 來處理資料的情況來進行測量
```
$ ./csv2json 1
[Scan] time cost: 84.145579
[Process] time cost: 84.183010
[Output] time cost: 5.814779
[Overall] time cost: 99.543937
Convertion completed
```
### 比較 thread worker 數量的差異
```
$ ./csv2json 2
[Scan] time cost: 66.950295
[Process] time cost: 67.823268
[Output] time cost: 3.692263
[Overall] time cost: 75.937240
Convertion completed
```
在執行緒只有兩個狀況下,我發現這個實作的測量狀況是最好的,時間也是最短的,在輸出時間上看得出來因為每次輸出較多的資料,所以輸出時間較只有 1 個執行緒的時間短。

在 `htop` 觀察執行緒的運作以及各個核心執行的情況可以發現,因為系統中仍有其他的程式需要核心來進行運算,所以在這個情況下雖然 CPU 的使用率很高,但是依然可以執行其他的程式。
```
$ ./csv2json 6
[Scan] time cost: 77.920023
[Process] time cost: 78.906701
[Output] time cost: 3.098866
[Overall] time cost: 83.664055
Convertion completed
```

```
$ ./csv2json 20
[Scan] time cost: 77.357744
[Process] time cost: 77.593611
[Output] time cost: 2.305013
[Overall] time cost: 80.486613
Convertion completed
```

```
$ ./csv2json 100
[Scan] time cost: 75.926779
[Process] time cost: 75.965271
[Output] time cost: 1.862836
[Overall] time cost: 78.160151
Convertion completed
```

### 比較使用 system call 來寫入檔案的差異
考慮到我們在上述的實作中在輸出的部分使用的是 `<stdio.h>` 中定義的 `fprintf`,為了比較其中的差異,我使用 `<unistd.h>` 與 `<fcntl.h>` 中的 `creat` 與 `write` 等系統呼叫來改寫輸出的實作並比較兩者的差異。
#### 對照組
為了比較各個數量的差異,我們一樣先記錄只有一個 thread 的執行情況。
```
$ ./csv2json 1
[Scan] time cost: 75.054586
[Process] time cost: 75.060525
[Output] time cost: 17.833902
[Overall] time cost: 101.473322
Convertion completed
```
#### 實驗組
在實驗組的部分,我們仿造上面的實驗,測量執行緒數量在 2, 6, 20, 100 個的時候的執行情況。
```
$ ./csv2json 2
[Scan] time cost: 66.950295
[Process] time cost: 67.823268
[Output] time cost: 3.692263
[Overall] time cost: 75.937240
Convertion completed
```
```
$ ./csv2json 6
[Scan] time cost: 75.590053
[Process] time cost: 76.442072
[Output] time cost: 15.280341
[Overall] time cost: 93.422552
Convertion completed
```
```
$ ./csv2json 20
[Scan] time cost: 83.200694
[Process] time cost: 83.520436
[Output] time cost: 14.841305
[Overall] time cost: 98.934671
Convertion completed
```
```
$ ./csv2json 100
[Scan] time cost: 75.417766
[Process] time cost: 75.453682
[Output] time cost: 14.213082
[Overall] time cost: 90.002306
Convertion completed
```
## 分析與結論
在上述的實驗結果中,我們可以看到的是,當我們只有一個 thread 在執行資料轉換的時候,因為大致的流程會與 sequential 的版本,也就是單執行緒的情況相似,所以在執行與輸出的時間上,有很大的受限。
因此在我們漸漸將 thread 的數量增加時,我們就可以發現,因為 thread 的數量增加了,也就表示在輸出的時候我們一次可以輸出更多的資料,而不需要等到下一輪的運算之後才能將其他的資料輸出,所以在 `output` 階段的表現會比只有 1 的執行緒的情況還要好。
但是在這個實作中受限於要保持輸入檔案原本的資料順序,所以我們沒辦法透過完全平行化的處理方式來進行,而是需要透過 main thread 來維護整個輸入、處理、輸出的順序性,且在切換到下一個階段的時候還要考慮到是否有 thread 還在進行剛剛指派的 task。
這個問題在執行緒數量小的時候不會造成太大的問題。但是當 thread 的數量來到 60 或甚至是 100 以上的時候,被第一個指派 task 的 thread 有可能已經將剛剛指派的資料已經轉換好了,但是他卻需要等到這一輪的所有 thread 完成他們的任務之後才能一起將這些資料輸出出去,就會造成執行緒數量增加但是整體執行時間沒有降低多少,或甚至增加的狀況產生。
除了比較各個執行緒數量間的差異之外,我們也有比較使用系統呼叫來寫入檔案的實作,主要的考量點在於 `fprintf` 的執行需要判斷字串結尾的 `'\0'` 來決定輸出是否完成,而在 `write` 的實作中是以指定寫入長度來進行,因此可能會有執行上的差異。
但是在我們上列的實驗結果中,我們可以發現其實兩者間的差異並不大,而且在其中的幾個情況裡,使用系統呼叫 `write` 的實作還比使用 `fprintf` 的情況更慢。我認為這樣的情況是因為在操作只有字元的情況下,`write` 與 `fprintf` 兩者的行為沒有太大的差異所致。
而在這些實驗的結果中,我們也可以發現使用 `fprintf`,且執行緒數量指定為 2 的執行情況最佳,我認為會造成這樣情況的原因有下列幾種
- **執行緒過多導致系統需要頻繁中斷任務**
因為在系統中除了要執行 `csv2json` 以外還要執行其他的程式,像是 GUI 或是網路瀏覽器等等,因為我們的系統中的排程器是屬於 time sharing 的實作,所以為了保持其他程式可以保持一定的運作,系統需要頻繁的切換以及中斷我們的 thread。並且因為 thread 數量過多的問題,也會導致所有 thread 會平分有限的系統資源,導致執行的成本增加。
- **硬碟架構與輸出頻率**
在我的硬碟資訊中可以看到他定義一個 section 的大小是 512 byte,考慮到我們行資料經過轉換後的大小最多不超過 500 byte,且因為使用兩個 thread 來處理資料,所以可以做到類似 prefetch 的效果,同時又可以依照硬碟架構來輸出資料,從而提升寫入的效率。
因此在上述兩個情況以及系統為了保持其他程式的執行的狀況下,使用 `fprintf` 以及執行緒指定為兩個的情況就會是這個實作中效率最好的情況了。
###### tags: `os`