# Linux 核心專題: 改進 kHTTPd
> 執行人: [Jerejere0808](https://github.com/Jerejere0808/khttpd)
> [專題解說錄影](https://youtu.be/I9Wc8XYSvls)
## 任務簡述
依據 [ktcp](https://hackmd.io/@sysprog/linux2023-ktcp) 的指示,持續改進 [sysprog21/khttpd](https://github.com/sysprog21/khttpd) 的程式碼,打造出高效且穩定的網頁伺服器。預計達成的目標:
* 導入 lock-free 資源管理
* 支援 HTTP 壓縮,運用 Linux 核心 API
* 實作 content cache
## TODO: 改進 [sysprog21/khttpd](https://github.com/sysprog21/khttpd) 的效率
依據 [ktcp](https://hackmd.io/@sysprog/linux2023-ktcp) 的指示,在 [sysprog21/khttpd](https://github.com/sysprog21/khttpd) 的基礎之上,利用 CMWQ 一類的機制,打造出高效且穩定的網頁伺服器,需要處理 lock-free 的資源管理議題 (如 RCU)。
搭配閱讀: 〈[RCU Usage In the Linux Kernel: One Decade Later](https://pdos.csail.mit.edu/6.828/2020/readings/rcu-decade-later.pdf)〉
> An application can change the IP options on a per-socket basis by calling sys_setsockopt,
which eventually causes the kernel to call setsockopt.
setsockopt sets the new IP options, then uses call_
rcu to asynchronously free the memory storing the old IP
options. Using `call_rcu` ensures all threads that might
be manipulating the old options will have released their
reference by exiting the RCU critical section.
## TODO: 支援 HTTP 壓縮
參見 [Compression in HTTP](https://developer.mozilla.org/en-US/docs/Web/HTTP/Compression),應善用 [Kernel Crypto API](https://docs.kernel.org/crypto/intro.html),至少涵蓋 gzip 壓縮。
## TODO: 實作 content cache
預計涵蓋:
* Directory listing
* File information (size, last modified date, MIME type, etc)
* Compressed file contents
## TODO: 檢視 I/O 模型並尋求效能改進空間
以 ftrace 檢視 kHTTPd 運作的細節,檢視 I/O 模型並尋求效能改進空間
## 實作 Directory listing
原本的伺服器只能傳送一串固定的字串給客戶端,故做一些修改來實作一個有 Directory listing 功能的伺服器。其可以根據使用者傳送的 URL 回傳相對應路徑目錄的檔案資訊,包括檔名,上次修改時間和檔案大小,若是檔案則可以傳送檔案內容。
所以在原本函式 `http_server_response` 裡面做修改,新增一個函式 `directory_handler` 來統一處理不同的 request url。
```diff
static int http_server_response(struct http_request *request, int keep_alive)
{
- pr_info("requested_url = %s\n", request->request_url);
- if (request->method != HTTP_GET)
- response = keep_alive ? HTTP_RESPONSE_501_KEEPALIVE : HTTP_RESPONSE_501;
- else
- response = keep_alive ? HTTP_RESPONSE_200_KEEPALIVE_DUMMY
- : HTTP_RESPONSE_200_DUMMY;
- http_server_send(request->socket, response, strlen(response));
+ directory_handler(request, keep_alive);
return 0;
```
```c=
static bool directory_handler(struct http_request *request, int keep_alive){
struct file *fp;
char *path_buf;
path_buf = kzalloc(SEND_BUFFER_SIZE, GFP_KERNEL);
if (!path_buf) {
pr_err("can't allocate memory!\n");
return false;
}
...
catstr(path_buf, daemon_list.path, request->request_url);
fp = filp_open(path_buf, O_RDONLY, 0);
...
if (S_ISDIR(fp->f_inode->i_mode)) {
char send_buf[SEND_BUFFER_SIZE] = {0};
snprintf(send_buf, SEND_BUFFER_SIZE, "%s%s%s", "HTTP/1.1 200 OK\r\n",
"Content-Type: text/html\r\n",
"Transfer-Encoding: chunked\r\n\r\n");
http_server_send(request->socket, send_buf, strlen(send_buf));
snprintf(send_buf, SEND_BUFFER_SIZE, "7B\r\n%s%s%s%s",
"<html><head><style>\r\n",
"body{font-family: monospace; font-size: 15px;}\r\n",
"td {padding: 1.5px 6px;}\r\n",
"</style></head><body><table>\r\n");
http_server_send(request->socket, send_buf, strlen(send_buf));
iterate_dir(fp, &request->dir_context);
snprintf(send_buf, SEND_BUFFER_SIZE,
"16\r\n</table></body></html>\r\n");
http_server_send(request->socket, send_buf, strlen(send_buf));
snprintf(send_buf, SEND_BUFFER_SIZE, "0\r\n\r\n");
http_server_send(request->socket, send_buf, strlen(send_buf));
} else if (S_ISREG(fp->f_inode->i_mode)) {
char *read_data = kmalloc(fp->f_inode->i_size, GFP_KERNEL);
int ret = read_file(fp, read_data);
send_http_header(request->socket, 200, "OK", "text/plain", ret,
connection);
http_server_send(request->socket, read_data, ret);
kfree(read_data);
}
...
}
```
`directory_handler` 對於 request 的處理流程為:
1. 合併伺服器指定的路徑和 `request->request_url` 所指定的路徑成絕對路徑。
2. 以 `filp_open` 開啟相對應的檔案。
3. 根據檔案的 inode 判斷是目錄還是檔案。
4. 若為檔案就以 `read_file` 讀入檔案,若為目錄就使用作業提示的 iterate_dir 和 tracedir 迭代目錄下的檔案並依序取得檔名。
5. 傳送的部分檔案的情況較為簡單,因為已經知道其大小,就能在 http header 使用 Conent-Length 指定內容大小並傳送,而目錄的情況相對複雜,因為不會事先知道目錄有多少檔案且還要傳送每個檔案大小和上次修改時間的資訊,所以若要用 Conent-Length 指定大小就必須把所有內容存進 buffer,再取其大小,這會需要額外空間,所以這裡的做法是用 Transfer-Encoding 分多次傳送目錄檔案資訊,這部分以下會更詳細說明。
### 使用 Transfer-Encoding 傳送目錄檔案資訊
當使用普通模式,即非 KeepAlive 模式時,每個請求/回應客戶和伺服器都要新建一個連接,完成 之後立即斷開連接(HTTP 協定爲無狀態 (stateless) 的協定);當使用 Keep-Alive 模式(又稱持久連接、連接重用)時,Keep-Alive 功能使客戶端到伺服器端的連接持續有效,當出現對伺服器的後繼請求時, Keep-Alive 功能避免了建立或者重新建立連接。
Keep-Alive 模式發送資料完畢時,HTTP 伺服器不會自動斷開連接,客戶端用兩種方法判斷請求所得到的響應資料已經接收完成:
Conent-Length
顧名思義, Conent-Length 表示實體內容長度,客戶端(伺服器)可以根據這個值來判斷資料是否接收完成。
Chunked-transfer-encoding
chunk 編碼將資料分成一塊一塊的發生。 Chunked 編碼將使用若干個 Chunk 串連而成,由一個標明長度爲 0 的 chunk 標示結束。每個 Chunk 分爲頭部和本文兩部分,頭部內容指定本文的字元總數(十六進位的數值)和數量單位(一般不寫),本文部分就是指定長度的實際內容,兩部分之間用 CRLF 隔開。在最後一個長度爲 0 的 Chunk 中的內容是稱爲 footer的內容,是一些附加的 Header 資訊(通常可以直接忽略)
傳送目錄內的檔案資訊因為事先不會知道實體內容長度為多少(不知道有幾個檔案),所以這邊就以 Chunked-transfer-encoding 來傳送。
:::warning
注意用語:
* folder: 檔案夾
* directory: 目錄
在 POSIX 相容作業系統 (如 Linux),應該使用「目錄」
:::
所以在每次的 `tracedir` 函式都必須要以 Chunked-transfer-encoding 的方式傳送其所迭代到的檔案或目錄名稱。
以下為函式 `tracedir` 的實作部分:
```c=
static int tracedir(struct dir_context *dir_context,
const char *name,
int namelen,
loff_t offset,
u64 ino,
unsigned int d_type)
{
if (strcmp(name, ".") && strcmp(name, "..")) {
struct http_request *request =
container_of(dir_context, struct http_request, dir_context);
char buf[SEND_BUFFER_SIZE] = {0};
char absolute_path[256];
int path_pos = 0;
memset(absolute_path, 0, 256);
memcpy(absolute_path, daemon_list.path, strlen(daemon_list.path));
path_pos += strlen(daemon_list.path);
memcpy(absolute_path + path_pos, request->request_url,
strlen(request->request_url));
path_pos += strlen(request->request_url);
if (strcmp(absolute_path, "/")) {
memcpy(absolute_path + path_pos, "/", 1);
path_pos += 1;
}
memcpy(absolute_path + path_pos, name, strlen(name));
struct file *fp = filp_open(absolute_path, O_RDONLY, 0);
if (IS_ERR(fp)) {
pr_info("Open file failed");
return 0;
}
struct tm result;
time64_to_tm(fp->f_inode->i_mtime.tv_sec, 0, &result);
char size_str[100];
memset(size_str, 0, 100);
snprintf(size_str, 100, "%lld", fp->f_inode->i_size);
char *url =
!strcmp(request->request_url, "/") ? "" : request->request_url;
snprintf(buf, SEND_BUFFER_SIZE,
"%lx\r\n<tr><td><a href=\"%s/%s\">%s</a> %04ld/%02d/%02d "
"%s Bytes</td></tr>\r\n",
34 + strlen(url) + (namelen << 1) + 14 + strlen(size_str) + 7,
url, name, name, result.tm_year + 1900, result.tm_mon + 1,
result.tm_mday, size_str);
http_server_send(request->socket, buf, strlen(buf));
filp_close(fp, NULL);
}
return 0;
}
```
可以看到在第 45 行的部分傳送的資料被放在 buf 中,一開始就用十六進位的格式標註本文的字元總數,其包括 html 格式、檔名、日期和檔案大小等等資訊。並在第 51 行送給客戶端,使其可以依序接收。
注意到上面有包含檔案訊息包括上次修改日期和檔案大小資訊的傳送,進一步解說如下。
因為函式 `tracedir` 的參數是固定的,但又需要結構 `http_request` 裡面的某些成員變數,其中包括 `socket` 用來傳送資料,另外也會需要 `request_url`,因為會需要傳送檔案的相關資訊,包括大小和修改日期,所以會需要其絕對路徑來存取到檔案,這時將會用到 `request_url`。因為無法自訂要傳入的參數,但會傳入結構 `dir_context`,所以就可以把結構 `dir_context` 放進結構 `http_request` 裡,如此一來,透過巨集`container_of` 就可以存取到結構 `http_request` 和其他的成員變數。
新增 `struct dir_context` 在 `struct http_request`
```diff=
struct http_request {
struct socket *socket;
enum http_method method;
char request_url[128];
int complete;
+ struct dir_context dir_context;
};
```
整個 `tracedir` 的流程如下所示
1. 透過 `container_of` 存取到結構 `http_request` (第 9 行)
2. 組合絕對路徑並開啟檔案 (第 13 ~ 34 行)
3. 在檔案的 inode 有紀錄上次存取的時間 `i_mtime`,其型態為 `struct timespec`,不是年月日。 其中有一個成員變數 sec 型態為 time_t,實際是個長整數型態。其值表示為從 UTC (coordinated universal time) 時間 1970 年 1 月 1 日 00 時 00 分 00 秒(也稱為 UNIX 系統的 [Epoch 時間](https://en.wikipedia.org/wiki/Unix_time))到目前時刻的秒數。
```c
struct timespec {
time_t tv_sec; /*second*/
long tv_nsec;/*nanosecond*/
}
```
這裡用 `time64_to_tm` 把 tv_sec 轉置成 `struct tm` 的型態如下:
```c
struct tm {
int tm_sec; /* 秒 – 取值區間為[0,59] */
int tm_min; /* 分 - 取值區間為[0,59] */
int tm_hour; /* 時 - 取值區間為[0,23] */
int tm_mday; /* 一個月中的日期 - 取值區間為[1,31] */
int tm_mon; /* 月份(從一月開始,0代表一月) - 取值區間為[0,11] */
int tm_year; /* 年份,其值等於實際年份減去1900 */
int tm_wday; /* 星期 – 取值區間為[0,6],其中0代表星期天,1代表星期一,以此類推 */
int tm_yday; /* 從每年的1月1日開始的天數 – 取值區間為[0,365],其中0代表1月1日,1代表1月2日,以此類推 */
int tm_isdst; /* 日光節約時間標示,實行日光節約時間的時候,tm_isdst 為正。不實行日光節約時間時,tm_isdst 為 0;在未知狀況,tm_isdst() 為負。*/
};
```
如此就能得到年月日的形式(第 36 ~ 41 行)
4. 再來就是檔案大小的部分,也就是 inode 裡面的 `i_size`,型態為 `long long` , 先把其轉成字串後可放進 buf。
5. 最後就是傳送的部分,用先前提到的 Chunked-transfer-encoding 傳送。
6. 關閉檔案
以上為初步的 directory list 實作,成果如下:
![](https://hackmd.io/_uploads/HJte3JPU2.png)
用 `htstress` 測量其吞吐量,伺服器需要處理 20000 個連線並傳送其所在電腦的根目錄內容,可以看到吞吐量為 `requests/sec: 3346.166`。
```
./htstress localhost:8081 -n 20000
0 requests
2000 requests
4000 requests
6000 requests
8000 requests
10000 requests
12000 requests
14000 requests
16000 requests
18000 requests
requests: 20000
good requests: 20000 [100%]
bad requests: 0 [0%]
socket errors: 0 [0%]
seconds: 5.977
requests/sec: 3346.166
```
進一步用 ftrace 去觀察其效能瓶頸。可以發現第 29 行 `iterate_dir()` 花了 619.143 us,若可以改善就能省下不少時間。
```=
http_server_worker_CMWQ [khttpd]() {
7) | kernel_sigaction() {
7) 0.660 us | _raw_spin_lock_irq();
7) 0.182 us | _raw_spin_unlock_irq();
7) 1.453 us | }
7) | kernel_sigaction() {
7) 0.172 us | _raw_spin_lock_irq();
7) 0.176 us | _raw_spin_unlock_irq();
7) 0.848 us | }
7) | kmem_cache_alloc_trace() {
7) 0.167 us | __cond_resched();
7) 0.168 us | should_failslab();
7) 1.133 us | }
7) 0.173 us | http_parser_init [khttpd]();
7) | kernel_recvmsg() {
7) | sock_recvmsg() {
...
7) 0.169 us | http_should_keep_alive [khttpd]();
7) | _printk() {
7) 7.532 us | vprintk();
7) 7.964 us | }
7) | directory_handler.isra.0 [khttpd]() {
7) 0.466 us | kmem_cache_alloc_trace();
7) + 11.456 us | filp_open();
7) + 41.726 us | http_server_send.isra.0 [khttpd]();
7) + 28.557 us | http_server_send.isra.0 [khttpd]();
7) ! 619.143 us | iterate_dir();
7) + 22.697 us | http_server_send.isra.0 [khttpd]();
7) + 19.107 us | http_server_send.isra.0 [khttpd]();
7) 2.910 us | filp_close();
7) 0.292 us | kfree();
7) ! 749.799 us | }
7) ! 758.680 us | }
7) ! 768.669 us | }
```
參考: [Risheng1128](https://github.com/Risheng1128/khttpd)
## 以 content cache 改進伺服器處理效率
為了避免在每一次客戶端要求某個目錄或檔案內容時都去執行 `iterate_dir` 或 `read_file` 導致吞吐量下降,這裡實作一個 cache 來把之前讀過的目錄或檔案儲存在記憶體,一段時間內,若有相同的 request_url 就可以直接傳送已經準備好的內容,若超過這段時間,記憶體就會被釋放掉來避免 cache 占用過多記憶體的問題。
### 以 hash table 實作 content cache
這裡選擇使用 hash table 來實作 content cache,此外因為 cache 通常可以符合讀的次數大於修改其內容的次數,所以會使用 RCU (Read-Copy Update) 同步機制,來使多個執行緒可以在有資料在更新時,還能做讀取的動作,以此避免頻繁的 lock 導致效能下降。
以下是 hash table 的資料結構,`struct hash_table` 為 hash table 主體,其中 buckets 為一個陣列,存放型態為 `struct list_head` 的鏈結串列節點,這些節點經由雜湊函數計算後,應放在同一個 bucket 的資料串聯起來,也就是同一個 bucket 但是不同 key 的資料會以鏈結串列的形式串聯並存放。另外, bucket_size 為陣列長度,lock 為多個執行緒要修改 hash table 時必須搶奪的 spinlock 。
```c
struct hash_table {
unsigned int bucket_size;
struct list_head *buckets;
spinlock_t lock;
};
```
`struct hash_element` 為存放資料的節點,`void *data` 為目錄或檔案的內容,`unsigned int size` 為檔案大小, `char *key` 為其對應的絕對路徑,這些節點會被 `struct list_head node` 串聯起來,另外,還有一個 `struct timer_node timer_item` 這是有關 timer 釋放資料時會用到的結構,稍後會做說明,最後因為資料的讀取和釋放會用到 RCU , 所以還有一個 `struct rcu_head rcu` 成員。
```c
struct hash_element {
struct list_head node;
char *key;
void *data;
unsigned int size;
struct timer_node timer_item;
struct rcu_head rcu;
};
```
以下是插入資料會用到的函式 `hash_table_add`,流程如下:
1. 把要插入的檔案或目錄內容的絕對路徑當成 key , 並將其字串長度除以 bucket 數量來計算出應該把資料放在哪個 bucket (第 6 行)。
2. 建立 element (第 9 ~ 20 行)。
3. 尋找是否已經有相同資料存在,若為是,就放棄插入。這裡用 `list_for_each_entry_rcu` 配合 `rcu_read_lock` 和 `rcu_read_unlock` 來走訪鏈結串列,這樣就不需要使用到 lock (第 22 ~ 33 行)。
4. 最後要用到 `list_add_rcu` 做插入,這裡會需要 lock , 因為可能會有多個執行緒要同時插入,若是只有單一執行緒在進行寫入、修改和刪除,其他的則只有讀的話,這個 lock 是不需要的 (第 35 ~ 38 行)。
```c=
struct hash_element *hash_table_add(struct hash_table *ht,
char *key,
void *data,
unsigned int size)
{
struct hash_element *loop = NULL, *elem = NULL;
struct list_head *bucket = &ht->buckets[strlen(key) % ht->bucket_size];
elem = (struct hash_element *) kmalloc(sizeof(struct hash_element),
GFP_KERNEL);
if (!elem)
return NULL;
elem->key = (char *) kmalloc(strlen(key) + 1, GFP_KERNEL);
memcpy(elem->key, key, strlen(key));
elem->data = (void *) kmalloc(size + 1, GFP_KERNEL);
memcpy(elem->data, data, size);
elem->size = size;
rcu_read_lock();
list_for_each_entry_rcu(loop, bucket, node)
{
if (strcmp(loop->key, key) == 0) {
kfree(elem->key);
kfree(elem->data);
kfree(elem);
rcu_read_unlock();
return NULL;
}
}
rcu_read_unlock();
spin_lock(&ht->lock);
list_add_rcu(&elem->node, bucket);
spin_unlock(&ht->lock);
return elem;
}
```
以下是查詢節點會用到的函式 `hash_table_find`。透過傳入`unsigned int *cache_size` 來獲取資料大小,也是用 `rcu_read_lock` 來避免 lock,最後傳回複製的資料來避免原本的資料在 `rcu_read_unlock` 之後可能被刪除的情況。
```c
char *hash_table_find(struct hash_table *ht, char *key, unsigned int *cache_size)
{
struct hash_element *loop = NULL;
struct list_head *bucket = &ht->buckets[strlen(key) % ht->bucket_size];
char *cache_data = NULL;
rcu_read_lock();
list_for_each_entry_rcu(loop, bucket, node)
{
if (strcmp(loop->key, key) == 0) {
*cache_size = loop->size;
cache_data = kmalloc(loop->size + 1, GFP_KERNEL);
memcpy(cache_data, loop->data, loop->size);
break;
}
}
rcu_read_unlock();
return cache_data;
}
```
有以上插入和搜尋資料的功能後,就可以快取讀過的資料。不過,也需要找時間將某些資料刪除避免佔用過多記憶體。
### 根據 timer 釋放 cache 資料
所以這裡根據作業提示的方法,用 heap 實作 timer priority queue,讓伺服器定期把超過時間限制的 cache element 釋放。
這時就需要用到剛剛提到的 `struct timer_node timer_item`,
其結構如下,`time_limit` 是 timer 過期的時間,以 millisecond 表示:
```c
struct timer_node {
size_t time_limit;
};
```
透過把結構 `hash_element` 裡的 `timer_node` 指標放入一個過期時間由近到遠排列的 timer priority queue,就可以以 `timer_node` 存取到結構 `hash_element` 來做 cache 的管理。
timer priority queue 結構如下,第 5 行為 priority queue 本體,可以看到裡面存放的是 `struct timer_node` 的指標
```c=
struct timer_heap {
atomic_t size;
atomic_t capacity;
timer_heap_compare compare;
struct timer_node **heap;
};
```
以下是把 `timer_node` 插入 priority queue 的部分,流程為:
更新並取得現在的時間,加上時間限制得到 cache 過期的時間,並設定 `ele` 裡面的 `struct timer_node timer_item` 。 (第 5、7 行)
將 `timer_node` 的指標插入 prority queue。(第 9 行)
```c=
bool cache_add_timer(struct hash_element *elem,
size_t timeout,
struct timer_heap *cache_timer_heap)
{
timer_update_current_msec();
(elem->timer_item).time_limit = atomic_read(¤t_msec) + timeout;
timer_heap_insert(cache_timer_heap, &elem->timer_item);
return 1;
}
```
所以每次在 cache 新增 `struct hash_element` 資料時要順便把其 `timer_node` 的指標插入 prority queue。
再來就是要將伺服器改成 non-blocking,並定期去執行檢查是否有`struct hash_element` 的 timer 過期,並且刪除。以下為實作,其實就是檢查放置最久的 `timer_node`,若其過期時間已經小於目前時間就透過 `cache_heap_delete_min` 刪除其對應的 element , 也就是快取的資料。若其到期時間大於目前時間,因為是 priority queue , 所以後面的 `timer_node` 過期一定是更久之後,就代表目前的都還沒到期,可以跳出迴圈。
```c=
void cache_handle_expired_timers(struct timer_heap *cache_timer_heap,
struct hash_table *ht)
{
while (!timer_heap_is_empty(cache_timer_heap)) {
struct timer_node *node = timer_heap_get_min(cache_timer_heap);
timer_update_current_msec();
if (node->time_limit > atomic_read(¤t_msec))
return;
cache_heap_delete_min(cache_timer_heap, ht);
}
return;
}
```
`cache_heap_delete_min` 實作如下,可以看到在第 25 行,透過 container_of 存取到 `struct hash_element` , 之後就可以在第 27 行用`hash_table_remove_by_elem_pointer` 去釋放這個 `struct hash_element`。
```c=
static bool cache_heap_delete_min(struct timer_heap *cache_timer_heap,
struct hash_table *ht)
{
size_t size;
struct timer_node *victim;
if (timer_heap_is_empty(cache_timer_heap))
return 0;
while (1) {
if (timer_heap_is_empty(cache_timer_heap))
return true;
size = atomic_read(&cache_timer_heap->size);
timer_heap_swap(cache_timer_heap, 1, size);
if (size == atomic_read(&cache_timer_heap->size)) {
victim = cache_timer_heap->heap[size];
break;
}
timer_heap_swap(cache_timer_heap, 1, size);
}
struct hash_element *elem =
container_of(victim, struct hash_element, timer_item);
hash_table_remove_by_elem_pointer(ht, elem);
atomic_set(&cache_timer_heap->size, --size);
timer_heap_sink(cache_timer_heap, 1);
return 1;
}
```
所以除了剛剛對 cache 的 hash table 做插入和搜尋的動作之外還有新增一個刪除的函式 `hash_table_remove_by_elem_pointer` ,如下:
```c=
void *hash_table_remove_by_elem_pointer(struct hash_table *ht,
struct hash_element *elem)
{
void *data = NULL;
spin_lock(&ht->lock);
data = elem->data;
list_del_rcu(&elem->node);
call_rcu(&elem->rcu, free_hash_element_rcu);
spin_unlock(&ht->lock);
return data;
}
static void free_hash_element_rcu(struct rcu_head *rcu)
{
struct hash_element *elem = container_of(rcu, struct hash_element, rcu);
kfree(elem->key);
kfree(elem->data);
kfree(elem);
return;
}
```
用 `list_del_rcu` 把 `struct hash_element` 從鏈結串列中移除,並用 `call_rcu` 延後記憶體的釋放,讓正在讀資料的執行緒可先讀完資料。跟插入時一樣,這裡會用到 `spin_lock` 是因為還是得避免多執行緒的 race condition 問題。
:::warning
〈[Linux 核心專題: 改進高效網頁伺服器](https://hackmd.io/@sysprog/HJgX4_MH3)〉提到 [tickless hierarchical timing wheel](https://www.25thandclement.com/~william/projects/timeout.c.html),當連線數量夠大時,和以 binary heap 實作的 priority queue 之間效率落差就會顯著。
:notes: jserv
:::
有了以上的 cache 之後就可以把每次的資料存進去,只要 timer 還沒到期,就可以一直使用。
這裡用 `htstress` 觀察可省下多少時間。
```shell
$ ./htstress localhost:8081 -n 20000
0 requests
2000 requests
4000 requests
6000 requests
8000 requests
10000 requests
12000 requests
14000 requests
16000 requests
18000 requests
requests: 20000
good requests: 20000 [100%]
bad requests: 0 [0%]
socket errors: 0 [0%]
seconds: 1.223
requests/sec: 16351.585
```
因為這 20000 個 request 其實要求的內容都是網站的根目錄,url 和內容都相同,所以只有在第一次 request 要執行 `iterate_dir` ,其他的都是直接從 cache 讀資料。可以看到吞吐量可以來到 16351.585 requests/sec。
用 `ftrace` 來觀察差異。第一部分為有執行 `iterate_dir` , 第二部分為從 cache 抓取資料。
```=
6) | http_parser_callback_message_complete [khttpd]() {
6) 0.180 us | http_should_keep_alive [khttpd]();
6) | directory_handler.isra.0 [khttpd]() {
6) 0.678 us | kmem_cache_alloc_trace();
6) 7.196 us | filp_open();
6) 0.412 us | hash_table_find [khttpd]();
6) + 32.114 us | http_server_send.isra.0 [khttpd]();
6) + 27.226 us | http_server_send.isra.0 [khttpd]();
6) 5.344 us | kmalloc_order_trace();
6) ! 625.341 us | iterate_dir();
6) + 14.122 us | http_server_send.isra.0 [khttpd]();
6) + 19.295 us | http_server_send.isra.0 [khttpd]();
6) 3.294 us | hash_table_add [khttpd]();
6) 0.618 us | cache_add_timer [khttpd]();
6) 1.063 us | kfree();
6) 3.155 us | filp_close();
6) 0.297 us | kfree();
6) ! 744.181 us | }
```
```=
0) | http_parser_callback_message_complete [khttpd]() {
0) 0.165 us | http_should_keep_alive [khttpd]();
0) | directory_handler.isra.0 [khttpd]() {
0) 0.540 us | kmem_cache_alloc_trace();
0) 6.603 us | filp_open();
0) 0.428 us | hash_table_find [khttpd]();
0) + 37.594 us | send_http_header [khttpd]();
0) + 29.727 us | http_server_send.isra.0 [khttpd]();
0) 1.281 us | filp_close();
0) 0.310 us | kfree();
0) + 79.456 us | }
0) + 80.105 us | }
```
可以看到第一部分除了 `iterate_dir` 之外也要執行 `hash_table_add` 和 `cache_add_timer`。這樣第二部分就只需要 `hash_table_find` 就能抓取到資料。
## 以 http 資料壓縮減少快取記憶體用量及資料傳送量
接下來可以更進一步用 Linux Kernel 提供的 API 來壓縮資料,這些 API 會經由 crypto subsystem 把 request 轉發給硬體算法引擎 (hardware crypto engine),所以實際上壓縮是有專門的硬體執行的。
這裡用`crypto_alloc_comp(const char *alg_name, u32 type, u32 mask)` 和 `crypto_alloc_acomp(const char *alg_name, u32 type, u32 mask)` 兩個函式來做資料壓縮,在這裡,`const char *alg_name` 可以是特定的驅動程式名稱或是演算法名稱。
可以使用 `cat /proc/crypto` 命令找到已註冊的加密演算法:
```shell!
jeremy@jeremy-VivoBook-15-ASUS-Laptop-X560UD:~/linux2023/khttpd$ cat /proc/crypto
name : ccm(aes)
driver : ccm_base(ctr(aes-aesni),cbcmac(aes-aesni))
module : ccm
priority : 300
refcnt : 3
selftest : passed
internal : no
type : aead
async : no
blocksize : 1
ivsize : 16
maxauthsize : 16
geniv : <none>
name : ctr(aes)
driver : ctr(aes-aesni)
module : kernel
priority : 300
refcnt : 3
selftest : passed
internal : no
type : skcipher
async : no
blocksize : 1
min keysize : 16
max keysize : 32
ivsize : 16
chunksize : 16
walksize : 16
name : cbcmac(aes)
driver : cbcmac(aes-aesni)
module : ccm
priority : 300
refcnt : 5
selftest : passed
internal : no
type : shash
blocksize : 1
digestsize : 16
...
```
利用 API 實作 deflate 壓縮函式如下:
```c=
void deflate_compress(const char *input,
unsigned int input_len,
char *output,
unsigned int *output_len)
{
struct crypto_comp *comp;
struct comp_request *req;
comp = crypto_alloc_comp("deflate", 0, 0);
if (IS_ERR(comp)) {
pr_err("Failed to allocate compression object\n");
return;
}
int ret = crypto_comp_compress(comp, input, input_len, output, output_len);
if (ret) {
pr_err("Compression failed with error code: %d\n", ret);
crypto_free_comp(comp);
return;
}
crypto_free_comp(comp);
return;
}
```
其中 `crypto_alloc_comp` 建立一物件,其包含此次壓縮的資訊,包含用的演算法本身和其他的 context 資訊。而主要執行壓縮的函式為 `crypto_comp_compress`,這裡選擇的演算法為`Deflate`。壓縮過的資料大小(有幾個 Bytes),可以透過傳入一個整數指標獲得。值得注意的是不可以透過 `strlen` 直接取得壓縮後的資料長度,因為 `strlen` 用於計算以 null 字元 ('\0') 結尾的字串長度,經過 `Deflate` 壓縮的字串可能包含任意的二進位數據,其中可能包含 null 字元。
壓縮後的資料就可以儲存進 cache 或 透過 http 傳送,如果要透過 http 傳送,記得要在 header 加上 `Content-Encoding: 演算法名字`。
用 `Postman Agent` 測試一下,傳送`http://192.168.0.108:8081/home/jeremy/linux2023/khttpd/htstress.c` 的 request,上面為沒有壓縮的 `Content-Length` 為 16436,下面有壓縮的則是 5597。
![](https://hackmd.io/_uploads/rJqIGKDLh.png)
![](https://hackmd.io/_uploads/HkwrWKD83.png)
以下為 demo 影片:
{%youtube wGaFudLghUw%}
參考 [How to use Linux kernel crypto compression](https://wangzhou.github.io/How-to-use-Linux-kernel-crypto-compression/)
參考 [Crypto Subsystem of Linux Kernel - Overview](https://yushuanhsieh.github.io/post/2022-02-15-linux-kernel-crypto/)