contributed by < yeh-sudo
>
$ uname -a
Linux andyyeh-ubuntu 6.5.0-21-generic #21~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Fri Feb 9 13:32:52 UTC 2 x86_64 x86_64 x86_64 GNU/Linux
$ gcc --version
gcc (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0
Copyright (C) 2021 Free Software Foundation, Inc.
This is free software; see the source for copying conditions. There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Address sizes: 44 bits physical, 48 bits virtual
Byte Order: Little Endian
CPU(s): 16
On-line CPU(s) list: 0-15
Vendor ID: AuthenticAMD
Model name: AMD Ryzen 7 4800HS with Radeon Graphics
CPU family: 23
Model: 96
Thread(s) per core: 2
Core(s) per socket: 8
Socket(s): 1
Stepping: 1
Frequency boost: enabled
CPU max MHz: 2900.0000
CPU min MHz: 1400.0000
BogoMIPS: 5788.79
Virtualization features:
Virtualization: AMD-V
Caches (sum of all):
L1d: 256 KiB (8 instances)
L1i: 256 KiB (8 instances)
L2: 4 MiB (8 instances)
L3: 8 MiB (2 instances)
開發 mt-Redis 前,首先要了解整個資料庫如何儲存與處理資料,因為所有的命令與操作都建立在這些資料結構之上,會探討的資料結構有 SDS (simple dynamic string, sds
) 、 鏈結串列 (list
) 、 Userspace RCU 、 字典 (q_dict
) 與儲存命令的 redisCommand
,接著了解伺服器的事件與多執行緒如何處理任務,包含 q_eventloop
、 q_thread
、 q_worker
與 q_master
,最後再從 server.h
中的 redisServer
和 client
還有 redisDb
了解 mt-Redis 整體的架構與如何運作。
與 SDS 相關的程式碼都在 src/sds.h
與 src/sds.c
。
__attribute__((__packed__))
在每一個 SDS 的 header 前面都加上了 __attribute__((__packed__))
,根據 GCC 的描述,作用是讓結構中的資料可以對齊,以最大程度減少記憶體開銷。
The packed attribute specifies that a variable or structure field should have the smallest possible alignment—one byte for a variable, and one bit for a field, unless you specify a larger value with the aligned attribute.
以 SDS 的 sdshdr32
為例。
struct __attribute__ ((__packed__)) sdshdr32 {
uint32_t len; /* used */
uint32_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
根據 C 語言規格書的 6.7.2.1 的第 16 點,在結構中的最後一個元素是未完成的陣列時,叫做 flexible array member ,在大多數情況下,使用 sizeof
時會忽略這個元素,所以使用 sizeof
計算 sdshdr32
的大小時,會忽略其中的 buf
,計算出來的結果,使用 packed
時,整個結構的大小為 9 ,不使用時大小為 12 ,根據你所不知道的 C 語言:記憶體管理、對齊及硬體特性,為了讓資料位置為 4 的倍數,編譯器在分配記憶體時,會按照宣告的型態去做對齊,所以在 flags
後面填充了 3 個 bytes ,讓結構體的大小變成 12 。
As a special case, the last element of a structure with more than one named member may have an incomplete array type; this is called a flexible array member. In most situations, the flexible array member is ignored. In particular, the size of the structure is as if the flexible array member were omitted except that it may have more trailing padding than the omission would imply.
➜ andyyeh@andyyeh-ubuntu ~/linux2024/sds_test ./main
size of packed sdshdr32: 9
size of unpacked sdshdr32: 12
所以,在記憶體中, sdshdr32
每個元素都緊密排列在一起,可以畫出下圖。
|-------------- sdshdr32 ---------------|----------- buf[] ------------|
| <len> | <alloc> | <flags> | <String> |
| 4 Bytes | 4 Bytes | 1 Byte | n Bytes + 1 Byte |
sdshdr
SDS 分為五種,分別為 sdshdr5
、 sdshdr8
、 sdshdr16
、 sdshdr32
、 sdshdr64
,其中 sdshdr5
不會被使用到,而每個 sdshdr
後面的數字代表能儲存最常字串的長度, sdshdr8
儲存長度介於 sdshdr16
儲存長度 sdshdr8
,若長度比 sdshdr16
。
len
len
為 SDS 中字串的長度,定義了 len
,就可以在 strlen
或是使用迴圈計算,在 sds.h
中也定義了 sdslen
這個函式,當作是取得 SDS 長度的 API 。
alloc
alloc
為建立新的 SDS 時,所配置 buf
的大小,但不包含 \0
,在 sds.h
中也定義了 sdsalloc
這個函式,當作是取得 SDS 配置時的大小的 API ,另外, sdsavail
這個函式的作用,是作為取得當前剩餘可用空間的 API ,本質上就是 alloc
扣掉 len
,當增加字串長度,會先使用 sdsavail
檢查剩餘可用空間,若空間不足,就會使用 sdsMakeRoomFor
增加字串的空間。
flags
使用 1 個 bytes 的大小儲存 sdshdr
的型態,只使用到這個 bytes 中最小的 3 個 bits ,檢查 SDS 是哪一種型態時,只需要使用定義好的 SDS_TYPE_MASK
與 flags
進行 &
操作,就可以得到 sdshdr
的型態。
/* Note: sdshdr5 is never used, we just access the flags byte directly.
* However is here to document the layout of type 5 SDS strings. */
struct __attribute__ ((__packed__)) sdshdr5 {
unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; /* used */
uint8_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
了解完 sdshdr
的定義之後,可以知道 buf
就是儲存字串資料的地方,但是再看到 sds
的型態,只是一個指標,如下所示,而不是直接使用 sdshdr
,會這樣做,是因為使用者可以將 sds
看作是一般的字串來使用,可以直接使用 <string.h>
中的函式,也可以使用 mt-Redis 提供的 API 進行快速獲取字串長度等操作。
typedef char *sds;
建立 SDS 使用的函式為 sdsnewlen(const void *init, size_t initlen)
, init
是要放入 buf
中的字串轉形成 void *
的型態, initlen
則是字串長度。首先,透過 sdsReqType
檢查必須要使用哪一種長度的 SDS ,若長度小於 sdshdr5
時,轉換成 sdshdr8
,接著依照使用的 type
取得 sdshdr
的長度,進行到這邊,完成前置作業,並且宣告之後要用到的變數,像是 sh
與 fp
等。
void *sh;
sds s;
char type = sdsReqType(initlen);
/* Empty strings are usually created in order to append. Use type 8
* since type 5 is not good at this. */
if (type == SDS_TYPE_5 && initlen == 0) type = SDS_TYPE_8;
int hdrlen = sdsHdrSize(type);
unsigned char *fp; /* flags pointer. */
配置記憶體, sh
為整個 sdshdr
,包含 len
、 alloc
、 flags
與 buf
,因為 initlen
沒有計算到 \0
,所以要額外加 1 ,最關鍵的地方, s = (char*)sh+hdrlen
,因為 sh
是 void *
型態,所以透過顯示轉型轉為 char *
,根據 C 語言規格書 6.2.5 第 28 點提到, void *
與 char *
在記憶體中的表示和對齊是相同的,所以可以相互轉換,轉換成 char *
之後,加上 hdrlen
,也就是使用 sizeof
取得結構的大小,但並不包含 buf
,在解釋 __attribute__((__packed__))
也有提過,以 sdshdr32
為例,得出 9 ,因為記憶體對齊的特性, flags
後面接續的就是 buf
,所以 (char *)sh
加上 hdrlen
後,會指向 buf
的開頭,而 sdsnewlen
最後回傳的值就是這個 s
,這就是為什麼可以將 mt-Redis 的 sds
當作一般的字串操作,而不用另外實作其他 API ,並且,可以透過記憶體對齊的特性找出 sdshdr
中的其他元素, fp
就是透過這種方式找到 sdshdr
中的 flags
。
A pointer to void shall have the same representation and alignment requirements as a pointer to a character type.
sh = s_malloc(hdrlen+initlen+1);
if (!init)
memset(sh, 0, hdrlen+initlen+1);
if (sh == NULL) return NULL;
s = (char*)sh+hdrlen;
fp = ((unsigned char*)s)-1;
最後就是將 init
字串的內容複製到 s
裡面並且補上 \0
。
if (initlen && init)
memcpy(s, init, initlen);
s[initlen] = '\0';
return s;
list
)與鏈結串列相關的程式碼都在 src/adlish.h
與 src/adlist.c
。
listNode
listNode
是鏈結串列中的節點, prev
指向前一個節點, next
指向下一個節點, value
則是節點保存的值,使用 void *
型態,可以使用顯示轉型轉成我們要的型態。
typedef struct listNode {
struct listNode *prev;
struct listNode *next;
void *value;
} listNode;
list
head
指向鏈結串列的開頭, tail
指向鏈結串列的結尾,另外, list
結構提供了三個函式指標 dup
、 free
與 match
,可以讓使用者傳入自己定義的函式, dup
為複製鏈結串列, free
為釋放鏈結串列,而在進行搜尋的時候,若有定義 match
函式,則搜尋時會使用 match
函式進行配對,確認要找的 key
與 listNode
中的 value
相同。
typedef struct list {
listNode *head;
listNode *tail;
void *(*dup)(void *ptr);
void (*free)(void *ptr);
int (*match)(void *ptr, void *key);
unsigned long len;
} list;
listIter
mt-Redis 的鏈結串列還提供了迭代器, next
指向現在的節點, direction
代表走訪鏈結串列的方向,在 adlist.h
中定義了兩個巨集, AL_START_HEAD
代表從頭開始走訪, AL_START_TAIL
代表從鏈結串列的尾端以倒序的方式走訪。
typedef struct listIter {
listNode *next;
int direction;
} listIter;
/* Directions for iterators */
#define AL_START_HEAD 0
#define AL_START_TAIL 1
使用 listIter
要搭配 listNext
函式,在 listNext
函式有說明如何搭配迭代器,範例如下。
iter = listGetIterator(list, <direction>);
while ((node = listNext(iter)) != NULL) {
doSomethingWith(listNodeValue(node));
}
使用 listCreate
建立雙向鏈結串列,並透過 listAddNodeHead
將陣列中的元素加入鏈結串列的開頭,最後使用迭代器搭配 listNext
走訪鏈結串列並輸出 value
中的值。
#include <stdio.h>
#include "adlist.h"
int main()
{
char a[5] = {'a', 'b', 'c', 'd', 'e'};
list *list = listCreate();
for (int i = 0; i < 5; i++){
char *ptr = &a[i];
listAddNodeHead(list, (void *)ptr);
}
listIter *iter = listGetIterator(list, AL_START_HEAD);
listNode *node;
while ((node = listNext(iter)) != NULL) {
printf("%c\n", *(char *)node->value);
}
return 0;
}
將陣列的內容都加入鏈結串列後的狀態如下圖。
針對 mt-redis 中使用 urcu 的 lock-free RCU hash table 進行討論。
介紹幾個比較重要且會在 mt-Redis 的 q_dict
中使用的 API ,其他的函式或是巨集可以參考 The URCU hash table API 。
caa_container_of
這個巨集的作用跟 Linux 核心的 container_of
一模一樣,利用 caa_container_of
,藉由 User-space RCU 提供的公開介面,反向去存取到自行定義的結構體開頭地址。
/*
* caa_container_of - Get the address of an object containing a field.
*
* @ptr: pointer to the field.
* @type: type of the object.
* @member: name of the field within the object.
*/
#define caa_container_of(ptr, type, member) \
__extension__ \
({ \
const __typeof__(((type *) NULL)->member) * __ptr = (ptr); \
(type *)((char *)__ptr - offsetof(type, member)); \
})
對比 Linux 核心原始程式碼巨集: container_of
就可以發現兩者在實作上的手法是相同的。
cds_lfht_node
與 Linux 核心的鏈結串列類似, lock-free RCU hash table 提供了 cds_lfht_node
作為將 hash table 連結起來的媒介,可以使用 caa_container_of
這個巨集取得整個結構,類似 Linux 核心中 container_of
,但沒有提供 key 和 value 相關的操作,使用者必須自行定義 hash function 與 key-value 的形式。
struct cds_lfht_node {
struct cds_lfht_node *next; /* ptr | REMOVAL_OWNER_FLAG | BUCKET_FLAG | REMOVED_FLAG */
unsigned long reverse_hash;
} __attribute__((aligned(8)));
cds_lfht_iter
與 mt-Redis 的鏈結串列一樣, lock-free RCU hash table 提供了迭代器, 搭配 cds_lfht_next
函式使用,可以走訪整個鏈結串列。
/* cds_lfht_iter: Used to track state while traversing a hash chain. */
struct cds_lfht_iter {
struct cds_lfht_node *node, *next;
};
cds_lfht_new
cds_lfht_new
用於建立新的 hash table , init_size
代表 hash table 中初始的 bucket 數量, min_nr_alloc_buckets
表示最少的 bucket 數量, max_nr_buckets
代表最多的 bucket 數量,當這個值為 0 時,代表最大 bucket 的數量沒有限制,以上這三個參數都必須要是 2 的冪次, flags
代表這個 hash table 的特性,代入 CDS_LFHT_AUTO_RESIZE
代表 hash table 會自動調整大小,代入 CDS_LFHT_ACCOUNTING
代表會紀錄加入與移除的資料數量,這個特性也允許 hash table 減少自身的大小,當 flags
設置為 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING
時,允許 hash table 大小變大與縮小。
static inline
struct cds_lfht *cds_lfht_new(unsigned long init_size,
unsigned long min_nr_alloc_buckets,
unsigned long max_nr_buckets,
int flags,
pthread_attr_t *attr)
{
return _cds_lfht_new(init_size, min_nr_alloc_buckets, max_nr_buckets,
flags, NULL, &rcu_flavor, attr);
}
cds_lfht_lookup
這個函式是透過 match
這個使用者定義的函式,以特定的 key
尋找相對應的值,必須要在持有 rcu_read_lock
時才能呼叫,且呼叫這個函式的執行緒必須要是 RCU read-side 的執行緒。若沒有找到相對應的值, iter
會設為 NULL
。
extern
void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash,
cds_lfht_match_fct match, const void *key,
struct cds_lfht_iter *iter);
cds_lfht_add_replace
用指定的 key
替換節點,如果不存在這樣的節點,則添加一個節點。如果存在這樣的節點,則返回被替換的節點,否則返回 NULL
。呼叫這個函式時必須處於 RCU read-side 臨界區,並且必須已經註冊為 RCU 讀者 (使用 rcu_register_thread
) 。
extern
struct cds_lfht_node *cds_lfht_add_replace(struct cds_lfht *ht,
unsigned long hash,
cds_lfht_match_fct match,
const void *key,
struct cds_lfht_node *node);
cds_lfht_iter_get_node
回傳迭代器指向的節點。
static inline
struct cds_lfht_node *cds_lfht_iter_get_node(struct cds_lfht_iter *iter)
{
return iter->node;
}
q_dict
)與字典相關的程式碼在 src/q_dict.h
與 src/q_sict.c
。
q_dict
建立 q_dict
每有對應的函式,在 server.c
中是直接進行記憶體配置,並用上述提到的 cds_lfht_new
建立 lock-free RCU hast table ,並將字典的大小設為 0 。
typedef struct q_dict {
unsigned int size;
struct cds_lfht *table;
void *privdata;
} q_dict;
q_dictEntry
建立 q_dictEntry
使用的是 q_createDictEntry
函式,參數為一個 SDS 以及一個 robj
, robj
在 server.h
中有定義,完整的名字為 redisObject
, SDS 會做為 q_dictEntry
的 key ,而 redisObject
則是 q_dictEntry
的值,所以在結構中的 v->val
會指向 redisObject
,而 s64
則是這個物件過期的時間, node
就是用來放入 lock-free RCU hash table 中的節點, rcu_head
比較特別,在 User-space RCU 中的 rcu-api.md 文件有說明,要搭配 call_rcu
一起使用,為這個 rcu_head
註冊一個函式,且這個函式會在寬限期結束時執行。
typedef struct q_dictEntry {
unsigned type:4; // four data structure types: string, list, set, zset and hash
void *key;
union {
void *val;
int64_t s64;
} v;
struct cds_lfht_node node;
struct rcu_head rcu_head;
} q_dictEntry;
q_dictEntry
中的 val
儲存的值就是 robj
,也就是 redisObject
這個結構,這個結構是儲存 q_dictEntry
中字典的值的真正地方,並且用 type
表示這個值是字串、鏈結串列或是集合。 redisObject
可以透過 createObject
這個函式建立,使用時必須將值的型態以及指向值得指標代入參數中。
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* lru time (relative to server.lruclock) */
int refcount;
void *ptr;
} robj;
建立一個 q_dictEntry
使用的 q_createDictEntry
函式。
q_dictEntry *q_createDictEntry(sds key, robj* val) {
q_dictEntry *de = NULL;
de = zmalloc(sizeof(*de));
cds_lfht_node_init(&de->node);
de->key = key;
de->v.val = val;
return de;
}
q_dictIterator
q_dictIterator
是用來走訪字典的迭代器,包含了 cds_lfht_iter
,也就是 lock-free RCU hast table 的迭代器,以及指向當前字典的指標。
typedef struct q_dictIterator {
q_dict *d;
struct cds_lfht_iter iter;
} q_dictIterator;
使用 q_dictGetIterator
可以建立一個新的迭代器。
q_dictIterator *q_dictGetIterator(q_dict *ht) {
q_dictIterator *iter = zmalloc(sizeof(*iter));
iter->d = ht;
cds_lfht_first(ht->table, &iter->iter);
return iter;
}
q_dictSdsKeyCaseMatch
這個函式是要 mt-Redis 傳入 cds_lfht_lookup
與 cds_lfht_add_replace
這兩個函式的參數,目的是要比較節點中的 key
與我們要找的 key
是否一致。
int q_dictSdsKeyCaseMatch(struct cds_lfht_node *ht_node, const void *key) {
struct q_dictEntry *de = caa_container_of(ht_node, struct q_dictEntry, node);
return strcasecmp(de->key, key) == 0;
}
q_dictAdd
加入新節點時使用的函式,使用 cds_lfht_add_replace
將節點加入 hash table ,根據上面對這個函式的描述,必須處在 read-side 的臨界區,所以要加上 rcu_read_lock
與 rcu_read_unlock
,且如果 hash table 中有相同的 key
,會以新的值取代並回傳舊的物件,這個時候會使用 call_rcu
註冊釋放資源的函式,在寬限期結束時釋放記憶體空間以及資源,若 key
不存在 hash table 中,則加入新節點,並增加 hash table 的大小。
int q_dictAdd(q_dict *d, sds key, robj* val) {
struct cds_lfht_node *ht_node;
unsigned long hash;
struct q_dictEntry *de;
rcu_read_lock();
de = q_createDictEntry(key, val);
hash = dictSdsHash(key);
ht_node = cds_lfht_add_replace(d->table, hash, q_dictSdsKeyCaseMatch, key,
&de->node);
if (ht_node) {
struct q_dictEntry *ode = caa_container_of(ht_node, struct q_dictEntry, node);
call_rcu(&ode->rcu_head, q_freeRcuDictEntry);
rcu_read_unlock();
return DICT_REPLACED;
} else {
++d->size;
rcu_read_unlock();
return DICT_OK;
}
return DICT_OK;
}
q_freeRcuDictEntry
這個函式與 q_freeRcuDictExpirationEntry
一樣,都是 call_rcu
註冊時會使用的函式,目的都是在寬限期結束時將節點的占用的資源釋放,透過 caa_container_of
取得節點所在的物件,就會使用 q_freeDictEntry
釋放資源。
void q_freeRcuDictEntry(struct rcu_head *head) {
struct q_dictEntry *de = caa_container_of(head, struct q_dictEntry, rcu_head);
q_freeDictEntry(de);
}
q_thread
)q_thread
相關程式碼在 src/q_thread.h
與 src/q_thread.c
中。
q_thread
結構在 q_thread
中使用的是 POSIX thread ,閱讀 pthreads(7) ,每一個執行緒都有唯一的 thread ID ,會儲存在 pthread_t
這個型態的變數中,也就是 q_thread
結構中的 thread_id
, q_thread_func_t
是執行緒所要執行的函式, data
則是執行 fun_run
函式所需要的資料。
typedef void *(*q_thread_func_t)(void *data);
typedef struct q_thread {
int id;
pthread_t thread_id;
q_thread_func_t fun_run;
void *data;
} q_thread;
q_thread_start
閱讀 pthread_create(3) , q_thread_start
是用 pthread_create
建立新的執行緒並執行 q_thread_run
函式,而 thread
則是被當作 q_thread_run
的參數。
int q_thread_start(q_thread *thread) {
pthread_attr_t attr;
pthread_attr_init(&attr);
if (thread == NULL || thread->fun_run == NULL) {
return C_ERR;
}
pthread_create(&thread->thread_id,
&attr, q_thread_run, thread);
return C_OK;
}
使用 pthread_create
建立新的執行緒之後,會執行 q_thread_run
函式, data
就是在 pthread_create
被當作參數傳遞進來的 q_thread
,首先會設定亂數種子,接著執行 q_thread
中儲存的 fun_run
函式。
static void *q_thread_run(void *data) {
q_thread *thread = data;
srand(ustime() ^ (int) pthread_self());
return thread->fun_run(thread->data);
}
與事件循環相關的程式碼在 src/q_eventloop.h
、 src/q_eventloop.c
、 src/ae.h
、 src/ae.c
與 src/ae_epoll.c
中。
事件循環由四個基本組件所組成,分別是 file event 、 時間事件 (time event) 、 fired event 與 epoll 模組。
File event 指的是非同步處理 I/O 操作的系統,在事件循環上運作,這個系統會監聽 file descriptors 上發生的各種事件,一個 file descriptors 就代表一個連接到用戶的 socket ,對每個操作都有一個對應的事件處理器,而 file event 的類型一共有三種。
AE_READABLE
代表監聽的 file descriptors 上有數據可以讀取,通常是用戶對伺服器發出請求與命令,也就是將資料寫入 socket ,或是想要進行連線時,就會產生 AE_READABLE
事件。
#define AE_READABLE 1 /* Fire when descriptor is readable. */
AE_WRITABLE
代表 file descriptors 可以接受伺服器的資訊,也就是伺服器將資料寫入 socket ,通常在用戶想要讀取或查詢資料庫的資料時,產生 AE_WRITABLE
事件。
#define AE_WRITABLE 2 /* Fire when descriptor is writable. */
AE_BARRIER
代表先執行 AE_WRITABLE
事件後再執行 AE_READABLE
的事件。
#define AE_BARRIER 4 /* With WRITABLE, never fire the event if the
READABLE event already fired in the same event
loop iteration. Useful when you want to persist
things to disk before sending replies, and want
to do that in a group fashion. */
file event 的結構是由 mask
、 rfileProc
、 wfileProc
與 clientData
所組成。 mask
為決定這個事件是 AE_READABLE
或是 AE_WRITABLE
的變數,也會儲存這個事件是否為 AE_BARRIER
, rfileProc
與 wfileProc
分別為 AE_READABLE
事件與 AE_WRITABLE
事件的處理函式,而 clientData
則是要處理的資料。
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
建立一個新的 file event 使用的函式為 aeCreateEventLoop
,參數為指向事件循環的指標 eventLoop
,還有 file descriptor ,也就是 socket 本身,代表事件類型的 mask
,最後是對應的處理函式 proc
與要處理的資料,回傳的值代表有沒有成功建立事件,函式中使用的 aeApiAddEvent
會在講解 epoll 時介紹。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
時間事件是用來處理定時任務的機制,可以是一次性的,也就是定時事件,也可以是週期性的,也就是週期事件,在事件循環中,會定期檢查和觸發時間事件以執行特定的操作。在事件循環中,會優先處理 file event ,
在指定的時間點執行一次,就叫做定時事件。
每隔一段固定的時間就執行一次,叫做週期事件。分辨一個時間事件是定時事件還是週期事件,取決於建立事件時,處理函式的回傳值,若處理函式回傳 AE_NOMORE
,則這個事件是一個定時事件,若回傳的值不是 AE_NOMORE
,則根據處理函式的回傳值設定下次事件觸發的時間。
#define AE_NOMORE -1
時間事件的結構是由 7 個變數所組成, when_sec
代表幾秒之後要觸發事件, when_ms
代表多少毫秒之後要觸發事件, timeProc
則是時間事件對應的處理函式, finalizerProc
則是在事件結束之後會呼叫的函式, clientData
為要處理的資料, next
則指向下一個時間事件。
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;
建立時間事件使用的函式為 aeCreateTimeEvent
,原則上與建立 file event 的函式大同小異,要注意的是決定多久要觸發的時間參數 milliseconds
單位是毫秒,在函式中會透過 aeAddMillisecondsToNow
算出 when_sec
與 when_ms
,而加入到時間事件時到事件循環結構中的 timeEventHead
時,永遠都是加到鏈結串列的開頭。
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->next = eventLoop->timeEventHead;
eventLoop->timeEventHead = te;
return id;
}
以建立三個時間事件為例,建立之後鏈結串列如下所示。
儲存待處理的 file 事件。
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
檔案事件
已提交的事件、即將發生的事件
閱讀 epoll(7) , epoll API 可以監控多個 file descriptor ,使用方式可以是 edge-triggered 或是 edge-triggered 而且對於大量的 file descripotr 有很好的擴展性。 epoll 是 Linux 核心中的一種資料結構,以 user-space 的觀點來看, epoll 是兩個列表的容器,分別為 interest list 與 ready list 。
簡單介紹 epoll 之後,來介紹幾個 epoll 的系統呼叫。首先介紹 epoll_create
,這個函式建立一個 epoll 的實例,在 Linux 核心 2.6.8 之後的版本,會忽略參數 size
,但這個值必須大於 0 ,回傳一個代表 epoll 實例的 file descriptor ,接下來所有呼叫 epoll 界面的操作都要透過這個 file descriptor 。
int epoll_create(int size);
epoll_ctl
,這個系統呼叫是用來新增、修改或移除 epfd
所代表的 epoll 實例 interest list 中的元素, op
則是要對目標 file descriptor ,也就是 fd
所要做的操作,而 event
則是 fd
所代表的 epoll_event
物件, op
有以下三種。
EPOLL_CTL_ADD
在 epfd
所代表的 epoll 實例的 interest list 中加入一個元素,這個元素包括 fd
與 event
。
EPOLL_CTL_MOD
將 interest list 中 fd
的設定改成新的 event
的設定。
EPOLL_CTL_DEL
將 fd
從 interest list 中移除, event
會被忽略且可以為 NULL
。
epoll_event
這個結構中有一個變數 events
,代表這個事件的類型,其中,會被 mt-Redis 使用到的有以下幾種。
EPOLLIN
相對應的 file 已經準備好進行 read
操作。
EPOLLOUT
相對應的 file 已經準備好進行 write
操作。
EPOLLERR
相關的 file decriptor 上發生錯誤,在呼叫 epoll_ctl
時不一定要加入這個參數,因為 epoll_wait
遇到錯誤都會回傳 EPOLLERR
。
EPOLLHUP
對應的 file decriptor 上發生掛斷 (hang up) 的狀況 , epoll_wait
會等待這個事件,所以在 epoll_ctl
中不一定要設置。
int epoll_ctl(int epfd, int op, int fd,
struct epoll_event *_Nullable event);
epoll_wait
會等待 epfd
上的事件, events
是用來回傳 ready list 的資訊,也就是 interest list 中準備好進行 I/O 操作的事件, maxevents
代表最多可以回傳的 ready list 大小, timeout
代表 epoll_wait
會阻塞多少毫秒,若 timeout
設為 -1 ,則 epoll_wait
會無期限阻塞,若設為 0 會讓 epoll_wait
立即回傳,而 epoll_wait
會阻塞直到以下三種條件滿足其中一種,一個 file descriptor 準備好進行 I/O 操作,被信號打斷,或是超過 timeout
的毫秒數, epoll_wait
的回傳值代表有多少 file descriptor 準備好了,也就是 ready list 的大小。
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
mt-Redis 使用 epoll API 監控多個連線的 file descriptor ,使用 aeApiState
這個結構使用 epfd
變數儲存 epoll 實例與使用 events
變數儲存 ready list 的事件。
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
透過 aeApiCreate
對 epoll 模組進行初始化,設定 aeApiState
結構中 events
的大小,以及使用 epoll_create
建立 epoll 實例,最後將初始化好的 epoll 模組儲存在事件循環結構 eventLoop
中的 apidata
。
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}
新增或修改監聽的事件,需要傳入的參數有 file descriptor 與判斷讀寫操作的 mask
,若儲存事件的陣列中已經有同樣的事件了,就使用 EPOLL_CTL_MOD
修改事件,若沒有就使用 EPOLL_CTL_ADD
新增事件,再透過 epoll_ctl
對事件進行新增或修改的動作。
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
aeApiPoll
透過 epoll_wait
取得已經準備好進行 I/O 操作的事件,並判斷這個事件的類型,最後儲存到事件循環 eventLoop
中的 aeFiredEvent
,代表執行緒可以處理這些事件了。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
伺服器的事件循環是在 initServer
函式中初始化,先使用 q_eventloop_init
初始化事件循環,接著初始化 socket ,使用 listenToPort
開啟 TCP socket ,並且取得所有的 socket file descriptor ,也就是 server.ipfd
,最後建立 server_event_process
file 事件與 serverCron
時間事件,用來統計伺服器的數據與資料。
void initServer(void)
{
...
q_eventloop_init(&server.qel, server.maxclients + CONFIG_FDSET_INCR);
server.el = server.qel.el;
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, server.socketpairs) < 0 ) {
serverLog(LL_WARNING, "unable to create server socketpairs");
exit(1);
}
if (anetNonBlock(NULL, server.socketpairs[0]) < 0 ) {
serverLog(LL_WARNING, "Failed to set server.socketpairs[0] to non-blocking.");
exit(1);
}
if (anetNonBlock(NULL, server.socketpairs[1]) < 0 ) {
serverLog(LL_WARNING, "Failed to set server.socketpair[1] to non-blocking.");
exit(1);
}
/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
...
/* Create the serverCron() time event, that's our main way to process
* background operations. */
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create the serverCron time event.");
exit(1);
}
...
if (aeCreateFileEvent(server.el, server.socketpairs[0], AE_READABLE,
server_event_process, NULL)== AE_ERR) {
serverPanic("Can't create server event process") ;
exit(1);
}
...
}
Master 事件循環的初始化是透過 q_master_init
函式進行,一樣是透過 q_eventloop_init
初始化事件循環,然後將執行緒要執行的函式設為 master_thread_run
,最後再進行 setup_master
。
int q_master_init(void) {
int filelimit;
filelimit = server.threads_num * 2 + 128;
q_eventloop_init(&master.qel, filelimit);
master.qel.thread.fun_run = master_thread_run;
setup_master();
return C_OK;
}
在 setup_master
中,會對每一個伺服器的 file descriptor 綁定一個 file 事件,也就是在初始化伺服器事件循環時,使用 listenToPort
取得的 server.ipfd
,事件對應的處理函式為 acceptTcpHandler
。
static int
setup_master(void) {
int j;
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(master.qel.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
if (server.sofd > 0 && aeCreateFileEvent(master.qel.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR)
serverPanic("Unrecoverable error creating server.sofd file event.");
return C_OK;
}
在 server.c
中的 initServer
函式,會使用 q_workers_init
初始化 worker 執行緒。
void initServer(void)
{
...
if(q_workers_init(server.threads_num) != C_OK) {
serverLog(LL_WARNING, "Init worker threads failed.");
exit(1);
}
...
}
接著看 q_worker.c
中的 q_workers_init
,初始化 workers
陣列,配置 worker_count
的大小,接著對每一個執行緒使用 q_worker_init
函式再做初始化,初始化之後,再用 setup_worker
建立處理事件的 file 事件與 worker_cron
時間事件。
int
q_workers_init(uint32_t worker_count) {
int status;
uint32_t idx;
q_worker *worker;
darray_init(&workers, worker_count, sizeof(q_worker));
for (idx = 0; idx < worker_count; idx++) {
worker = darray_push(&workers);
q_worker_init(worker);
worker->id = idx;
worker->qel.id = idx;
status = setup_worker(worker);
if (status != C_OK) {
exit(1);
}
}
num_worker_threads = (int) darray_n(&workers);
return C_OK;
}
在每一個執行緒的初始化函式 q_worker_init
中,會對 socket 與執行緒的事件循環進行初始化,初始化事件循環時使用的是 q_eventloop_init
函式,這個函式會使用 q_thread_init
對執行緒進行初始化,最重要的是,使用 aeCreateEventLoop
建立事件循環,其中就有使用到上述的 epoll API ,也就是 aeApiCreate
函式,對 file descriptor 進行初始化,設置完事件循環後,就回到 q_worker_init
函式,設定執行緒所要執行的 worker_thread_run
函式與資料。 這邊的 socket 是 worker 與伺服器溝通的 socket ,並不是與使用者的溝通管道。
int
q_worker_init(q_worker *worker) {
...
adjustOpenFilesLimit();
q_eventloop_init(&worker->qel, server.maxclients);
worker->qel.thread.fun_run = worker_thread_run;
worker->qel.thread.data = worker;
...
return C_OK;
}
初始化執行緒之後,接著使用 setup_worker
建立執行緒處理事件的 file 事件,以及並建立一個時間事件用來處理背景任務。 file 事件對應的函式為 worker_thread_event_process
,用來處理與 worker 與伺服器之間的溝通,監聽的 socket 為 socketpairs[1]
,而 socketpairs[0]
是給伺服器與 master 執行緒使用的。時間事件對應的函式為 worker_cron
,用來統計與 worker 相關的數據。
static int
setup_worker(q_worker *worker) {
int status;
status = aeCreateFileEvent(worker->qel.el, worker->socketpairs[1], AE_READABLE,
worker_thread_event_process, worker);
if (status == AE_ERR) {
serverLog(LL_WARNING, "Unrecoverable error creating worker ipfd file event.");
return C_ERR;
}
aeSetBeforeSleepProc(worker->qel.el, worker_before_sleep, worker);
/* Create the worker_cron() time event, that's our main way to process
* background operations. */
if (aeCreateTimeEvent(worker->qel.el, 1, worker_cron, worker, NULL) == AE_ERR) {
serverPanic("Can't create the worker_cron time event.");
return C_ERR;
}
return C_OK;
}
所有執行緒都執行完 q_worker_init
與 setup_worker
之後,就完成 worker 事件循環系統的初始化,程式的執行流程如下。
Master 事件循環在初始化的時候,監控的 file descriptor 與 file 事件都是與連線有關的,而 master 事件循環最主要就是負責處理使用者的連線,使用 GDB 追蹤 master 事件循環的運作,將斷點設置在 src/q_master.c
中的 acceptTcpHandler
函式,在另外一個終端機進行連線, GDB 的結果如下。首先, mt-Redis 會切換到 master 事件循環所在的執行緒,因為監控的 file descriptor 準備好進行 I/O 操作,也就是使用者的連線,所以會進入到這個 file 事件已經註冊好的函式 acceptTcpHandler
,結束之後,又會回到 aeProcessEvents
函式,也就是事件循環處理 ready list 的函式,最後處理時間事件,完成後,回到 aeMain
的迴圈繼續下一個事件循環。
[New Thread 0x7ffff0e74600 (LWP 57896)]
[Switching to Thread 0x7ffff0e74600 (LWP 57896)]
Thread 13 "redis-server" hit Breakpoint 1, acceptTcpHandler (el=0x7ffff784b3c8, fd=7, privdata=0x0, mask=1) at q_master.c:110
110 int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
(gdb) n
116 while(max--) {
(gdb) n
117 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
(gdb) n
118 if (cfd == ANET_ERR) {
(gdb) n
124 serverLog(LL_DEBUG,"Accepted %s:%d", cip, cport);
(gdb) n
125 acceptCommonHandler(cfd,0,cip);
(gdb) n
116 while(max--) {
(gdb) n
117 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
(gdb) n
118 if (cfd == ANET_ERR) {
(gdb) n
119 if (errno != EWOULDBLOCK)
(gdb) n
122 return;
(gdb) n
127 }
(gdb) n
aeProcessEvents (eventLoop=0x7ffff784b3c8, flags=3) at ae.c:432
432 fired++;
(gdb) n
436 if (fe->mask & mask & AE_WRITABLE) {
(gdb) n
445 if (invert && fe->mask & mask & AE_READABLE) {
(gdb) n
452 processed++;
(gdb) n
405 for (j = 0; j < numevents; j++) {
(gdb) n
456 if (flags & AE_TIME_EVENTS)
(gdb) n
457 processed += processTimeEvents(eventLoop);
(gdb) n
459 return processed; /* return the number of processed file/time events */
(gdb) n
460 }
(gdb) n
aeMain (eventLoop=0x7ffff784b3c8) at ae.c:486
486 while (!eventLoop->stop) {
(gdb)
在 initServer
函式中,為伺服器事件循環建立了 file 事件,對應的處理函式為 server_event_process
,使用 GDB 追蹤伺服器事件循環的執行過程,將斷點設置在 src/server.c
的 server_event_process
函式中,接著透過 redis-cli
執行命令,並觀察 GDB 的執行 mt-Redis 的過程。
redis-cli
在使用者端執行命令 SET mykey "Hello World"
,建立一個鍵值對,鍵為 mykey
而值為 "Hello World"
字串。
➜ andyyeh@andyyeh-ubuntu ~/mt-redis/src git:(main) ✗ ./redis-cli
127.0.0.1:6379> SET mykey "Hello World"
OK
(115.66s)
127.0.0.1:6379>
mt-Redis Server
當使用者輸入命令時,伺服器的事件循環就會監控到伺服器的 server.socket[0]
準備進行 I/O 操作,接著就透過 aeProcessEvents
執行已經註冊好的 file 事件函式 server_event_process
,從 command_requests_head
取出使用者的命令,透過 server_processCommand
處理之後,再將結果回傳給使用者。
Thread 1 "redis-server" hit Breakpoint 1, server_event_process (eventLoop=0x7ffff784b068, fd=4, clientData=0x0, mask=1) at server.c:1289
1289 serverAssert(fd == server.socketpairs[0]);
(gdb) n
1293 qnode = __cds_wfcq_dequeue_blocking(&server.command_requests_head, &server.command_requests_tail);
(gdb) n
1300 while(read(fd, buf, 1) == 1 || qnode != NULL ){
(gdb) n
1302 if (qnode == NULL) break;
(gdb) p buf
$1 = "r"
(gdb) n
1303 r = caa_container_of(qnode, struct q_command_request, q_node);
(gdb) n
1305 c = r->c;
(gdb) p r
$2 = (q_command_request *) 0x7ffff5f85cc8
(gdb) p r->c
$3 = (struct client *) 0x7fffefe00008
(gdb) n
1306 from = c->curidx;
(gdb) n
1307 c->qel = &server.qel;
(gdb) n
1308 c->curidx = -1;
(gdb) p c->qel
$4 = (q_eventloop *) 0x555555b11c40 <server+2752>
(gdb) n
1309 q_freeCommandRequest(r);
(gdb) n
1311 server_processCommand(c);
(gdb) n
[New Thread 0x7fffefdff600 (LWP 59176)]
1312 if (c->flags & CLIENT_SLAVE || c->cmd->flags & CMD_ADMIN) {
(gdb) n
1315 dispatch_to_worker(c, from);
(gdb) n
1317 qnode = __cds_wfcq_dequeue_blocking(&server.command_requests_head, &server.command_requests_tail);
(gdb) n
1300 while(read(fd, buf, 1) == 1 || qnode != NULL ){
(gdb) n
1319 }
(gdb) n
aeProcessEvents (eventLoop=0x7ffff784b068, flags=3) at ae.c:432
432 fired++;
(gdb) n
436 if (fe->mask & mask & AE_WRITABLE) {
(gdb) n
445 if (invert && fe->mask & mask & AE_READABLE) {
(gdb) n
452 processed++;
(gdb) n
405 for (j = 0; j < numevents; j++) {
(gdb) n
456 if (flags & AE_TIME_EVENTS)
(gdb) n
457 processed += processTimeEvents(eventLoop);
(gdb) n
459 return processed; /* return the number of processed file/time events */
(gdb) n
460 }
(gdb) n
aeMain (eventLoop=0x7ffff784b068) at ae.c:486
486 while (!eventLoop->stop) {
(gdb) n
487 if (eventLoop->beforesleep != NULL)
(gdb) n
488 eventLoop->beforesleep(eventLoop, eventLoop->bsdata);
(gdb) n
489 aeProcessEvents(eventLoop, AE_ALL_EVENTS);
(gdb) n
486 while (!eventLoop->stop) {
在 master 事件循環處理連線時,執行 acceptTcpHandler
時,會呼叫 acceptCommonHandler
函式,其中最後會透過 dispatch_conn_new
將任務分配給 worker 執行緒, buf
儲存要執行的操作,接著 write
將資料寫入選定的 worker
的 socket 中,在 worker 事件循環中,因為 file descriptor 寫入完成,可以進行讀取,所以會觸發初始化時設定好的 worker_thread_event_process
函式,因為 buf
中寫入的 'c'
,所以會執行 createClient
函式以進行建立 client
的操作。
void
dispatch_conn_new(int sd) {
struct connswapunit *su = csui_new();
char buf[1];
q_worker *worker;
if (su == NULL) {
close(sd);
/* given that malloc failed this may also fail, but let's try */
serverLog(LL_WARNING, "Failed to allocate memory for connection swap object\n");
return;
}
int tid = (last_worker_thread + 1) % server.threads_num;
worker = darray_get(&workers, (uint32_t) tid);
last_worker_thread = tid;
su->num = sd;
su->data = NULL;
csul_push(worker, su);
buf[0] = 'c';
// to be handled by worker's worker_thread_event_process loop
if (write(worker->socketpairs[0], buf, 1) != 1) {
serverLog(LL_WARNING, "Notice the worker failed.");
}
}
執行 createClient
時,會讓 worker 事件循環監聽與使用者連線的 file descriptor fd
,並建立 file 事件,處理函式為 worker_readQueryFromClient
,從使用者端讀取命令或是請求,當使用者寫入命令時, worker 事件循環就會觸發 worker_readQueryFromClient
讀取訊息。
client *createClient(q_eventloop *qel, int fd) {
client *c = zmalloc(sizeof(client));
/* passing -1 as fd it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
if (fd != -1) {
aeFileProc *proc = readQueryFromClient;
if (qel != &server.qel) {
// createClient inside worker thread
proc = worker_readQueryFromClient;
}
c->curidx = qel->id;
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
if (aeCreateFileEvent(qel->el,fd,AE_READABLE,
proc, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
...
if (fd != -1) listAddNodeTail(qel->clients,c);
initClientMultiState(c);
return c;
}
讀取使用者的傳送的資料之後,會執行到 worker_processInputBuffer
,再呼叫 worker_processCommand
,經過層層處理,最後會將命令請求放入伺服器的 command_requests_head
,接著使用 write
將 buf
寫入,此時就會讓伺服器的事件循環觸發 server_event_process
處理使用者的請求,最後透過 dispatch_to_worker
將結果回傳給使用者。
int worker_processCommand(client *c) {
char buf[1];
struct q_command_request *r = NULL;
...
// unconditional scheduled to server thread
// later after adding RCU for RedisDB, we can handle read requests in
// worker thread and scheduled write requests to server thread
unlinkClientFromEventloop(c);
r = q_createCommandRequest(c);
/*
* pthread_mutex_lock(&server.command_request_lock);
* listAddNodeTail(server.command_requests, r);
* pthread_mutex_unlock(&server.command_request_lock);
*/
cds_wfcq_enqueue(&server.command_requests_head, &server.command_requests_tail, &r->q_node);
c->flags |= CLIENT_JUMP;
buf[0] = 'r'; /* r stands for request from worker thread */
if (write(server.socketpairs[1], buf, 1)!=1) {
//ToDo: need to proper set the socketpairs[1]'s buf size, for now, wirte to socketpairs[1] could fail when redis-benchmark instances increases.
//or number of connections increased.
//serverPanic("write to server thread's socketpair[1] from worker thread failed.");
//As server_event_process() has dealed with unmatched socketpair buff with num of request. we choose not to panic here.
serverLog(LL_VERBOSE, "write to server thread's socketpair[1] from worker thread failed.");
}
return C_SCHED;
}
執行 dispatch_to_worker
時,會透過 write
將 buf
寫入 worker 執行緒的 socket ,在 worker 事件循環中就會觸發 worker_thread_event_process
,但這次是伺服器端寫入的,寫入的 buf
是 'b'
,所以會建立一個 file 事件,處理函式為 sendReplyToClient
將結果回傳給使用者。
int dispatch_to_worker(client *c, int workerId)
{
q_worker *worker;
char buf[1];
worker = darray_get(&workers, (uint32_t)workerId);
...
unlinkClientFromServerEventloop(c);
// clear the JUMP flag so that worker event process thread can resetClient()
c->flags &= ~CLIENT_JUMP;
su->num = workerId;
rcu_assign_pointer(su->data, c);
csul_server_push(worker, su);
buf[0] = 'b';
if (write(worker->socketpairs[0], buf, 1) != 1) {
serverPanic("dispatch_to_worker: write back to worker socketpairs[1] failed.\r\n");
return C_ERR;
}
return C_OK;
}
worker_thread_event_process
處理與 master 執行緒和伺服器執行緒溝通的函式,如果 buf
值為 'c'
,則執行 createClient
函式,若值為 'b'
,則建立 file 事件,處理函式設置為 sendReplyToClient
,當使用者的 file descriptor 準備好寫入時就會觸發,將使用者請求的結果和回應回傳給使用者。
// worker threads to master/server thread communication handler function
static void
worker_thread_event_process(aeEventLoop *el, int fd, void *privdata, int mask) {
int status;
int sd;
int res = C_OK;
q_worker *worker = privdata;
char buf[1];
struct connswapunit *csu;
client *c;
q_eventloop *qel=NULL;
UNUSED(mask);
serverAssert(el == worker->qel.el);
serverAssert(fd == worker->socketpairs[1]);
if (read(fd, buf, 1) != 1) {
serverLog(LL_WARNING, "Can't read for worker(id:%d) socketpairs[1](%d)",
worker->qel.thread.id, fd);
buf[0] = 'c';
}
switch (buf[0]) {
case 'c':
...
c = createClient(&worker->qel, sd);
if (c == NULL) {
serverLog(LL_WARNING, "Create client failed");
close(sd);
return;
}
c->curidx = worker->id;
...
break;
case 'b':
/* receive back the client from server thread. */
...
if (c->flags & CLIENT_PENDING_WRITE) {
listAddNodeTail(qel->clients_pending_write, c);
if (aeCreateFileEvent(qel->el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR) {
freeClient(c);
return;
}
} else if(clientHasPendingReplies(c)){
if(aeCreateFileEvent(qel->el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR) {
freeClient(c);
return;
}
}
...
break;
default:
serverLog(LL_WARNING, "read error char '%c' for worker(id:%d) socketpairs[1](%d)",
buf[0], worker->qel.thread.id, worker->socketpairs[1]);
break;
}
}
首先,查看 src/server.h
, struct redisServer
就在這裡面,只討論重要的部份。伺服器在進行初始化時,會先執行 initServerConfig
將伺服器狀態設置為預設狀態,再使用 loadServerConfig
載入使用者設定檔,最後執行 initServer
完成整個伺服器初始化。
struct redisServer {
/* General */
...
redisDb *db;
dict *commands; /* Command table */
dict *orig_commands; /* Command table before command renaming. */
aeEventLoop *el;
...
/* Networking */
int port; /* TCP listening port */
char *bindaddr[CONFIG_BINDADDR_MAX]; /* Addresses we should bind to */
int bindaddr_count; /* Number of addresses in server.bindaddr[] */
...
list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */
list *slaves, *monitors; /* List of slaves and MONITORs */
client *current_client; /* Current client, only used on crash report */
int clients_paused; /* True if clients are currently paused */
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
...
uint64_t next_client_id; /* Next client unique ID. Incremental. */
...
/* Fast pointers to often looked up command */
struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
*rpopCommand, *sremCommand, *execCommand, *expireCommand,
*pexpireCommand;
...
//multi-thread redis support
pthread_mutex_t command_request_lock;
int threads_num; /* num of worker threads.*/
list *command_requests;
struct cds_wfcq_head command_requests_head;
struct cds_wfcq_tail command_requests_tail;
int socketpairs[2];
q_eventloop qel;
};
redisServer.db
server.db
為資料庫本體,型態為 redisDb
,同樣定義在 server.h
中,在 server.c
裡面的 initServer
函式進行初始化,根據設定檔設定的數量建立資料庫,而 dict
就是主要儲存資料的地方。
typedef struct redisDb {
struct q_dict *dict; /* The keyspace for this DB */
struct q_dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
struct evictionPoolEntry *eviction_pool; /* Eviction pool of keys */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
} redisDb;
redisServer.commands
, redisServer.orig_commands
, redisCommand
接著來看 mt-Redis 是如何儲存用戶要執行的命令。 mt-Redis 使用 redisCommand
這個結構儲存命令, name
是用來儲存命令的名稱, proc
指標指向用來處理這個命令的函式, arity
代表這個命令的參數數量,如果是負值,例如 -N
,代表參數數量大於 N
, sflags
與 flags
都是儲存命令的 flag ,但 sflags
是以字串儲存, flags
是以 bit mask 的方式儲存, firstkey
代表第一個參數是 key
, lastkey
代表最後一個參數是 key
, keystep
則是參數與參數之間的距離,以 MSET
指令來說, keystep
為 2 ,帶表參數的形式為 key, value, key value… , getkeys_proc
則是在以上三個變數沒辦法決定哪個參數是 key
時使用以找出 key
在參數中的位置, microseconds
紀錄這個命令的執行時間, calls
紀錄這個命令被呼叫多少次。
typedef void redisCommandProc(client *c);
typedef int *redisGetKeysProc(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
struct redisCommand {
char *name;
redisCommandProc *proc;
int arity;
char *sflags; /* Flags as string representation, one char per flag. */
int flags; /* The actual flags, obtained from the 'sflags' field. */
/* Use a function to determine keys arguments in a command line.
* Used for Redis Cluster redirect. */
redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey; /* The last argument that's a key */
int keystep; /* The step between first and last key */
long long microseconds, calls;
};
以下是命令的參數列表。
w: write command (may modify the key space).
r: read command (will never modify the key space).
m: may increase memory usage once called. Don't allow if out of memory.
a: admin command, like SAVE or SHUTDOWN.
p: Pub/Sub related command.
f: force replication of this command, regardless of server.dirty.
s: command not allowed in scripts.
R: random command. Command is not deterministic, that is, the same command with the same arguments, with the same key space, may have different results. For instance SPOP and RANDOMKEY are two random commands.
S: Sort command output array if called from script, so that the output is deterministic.
l: Allow command while loading the database.
t: Allow command while a slave has stale data but is not allowed to server this data. Normally no command is accepted in this condition but just a few.
M: Do not automatically propagate the command on MONITOR.
k: Perform an implicit ASKING for this command, so the command will be accepted in cluster mode if the slot is marked as 'importing'.
F: Fast command: O(1) or O(log(N)) command that should never delay its execution as long as the kernel scheduler is giving us time. Note that commands that may trigger a DEL as a side effect (like SET) are not fast commands.
完整的命令表格 redisCommandTable
可以參考 src/server.c
。
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
...
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
};
redisServer
中使用了兩個字典建立表格,一個是要用來修改的,因為使用者可以在設定檔中修改命令的名字,在建立資料庫時,會執行 loadServerConfig
函式載入設定檔,所以要額外建立一個不會被設定檔影響的指令表格,也就是原本資料庫內建的指令。建立表格是在 initServerConfig
中使用 populateCommandTable
這個函式,將每個命令從 redisCommandTable
取出,並將 sflags
,也就是這個命令的 flags 用字串表示,轉換成 flags
,以整數的形式表示,最後加入 commands
與 orig_commands
兩個字典中,而字典的 key
使用的是 sds (simple dynamic string) ,而不是普通的字串。
/* Populates the Redis Command Table starting from the hard coded list
* we have on top of redis.c file. */
void populateCommandTable(void) {
int j;
int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
for (j = 0; j < numcommands; j++) {
struct redisCommand *c = redisCommandTable+j;
char *f = c->sflags;
int retval1, retval2;
while(*f != '\0') {
switch(*f) {
case 'w': c->flags |= CMD_WRITE; break;
case 'r': c->flags |= CMD_READONLY; break;
case 'm': c->flags |= CMD_DENYOOM; break;
case 'a': c->flags |= CMD_ADMIN; break;
case 'p': c->flags |= CMD_PUBSUB; break;
case 's': c->flags |= CMD_NOSCRIPT; break;
case 'R': c->flags |= CMD_RANDOM; break;
case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break;
case 'l': c->flags |= CMD_LOADING; break;
case 't': c->flags |= CMD_STALE; break;
case 'M': c->flags |= CMD_SKIP_MONITOR; break;
case 'k': c->flags |= CMD_ASKING; break;
case 'F': c->flags |= CMD_FAST; break;
case 'd': c->flags |= CMD_SERVER_THREAD; break;
default: serverPanic("Unsupported command flag"); break;
}
f++;
}
retval1 = dictAdd(server.commands, sdsnew(c->name), c);
/* Populate an additional dictionary that will be unaffected
* by rename-command statements in redis.conf. */
retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
//serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
}
}
建立完表格之後,對於那些常用的指令,像是 del
、 lpush
等,使用 lookupCommandByCString
建立能快速連結到指令的指標,以減少查表的時間,以 dictFetchValue
函式在 commands
字典中找到相對應的 key
並回傳 value
,而 value
就是我們要的指令。
struct redisCommand *lookupCommandByCString(char *s) {
struct redisCommand *cmd;
sds name = sdsnew(s);
cmd = dictFetchValue(server.commands, name);
sdsfree(name);
return cmd;
}
redisServer.el
在原本的 Redis 中,使用的 aeEventLoop
,使用 aeCreateEventLoop
進行初始化,但在 mt-Redis 中支援多執行緒,所以使用 q_eventloop
把 aeEventLoop
包裝起來,用 q_eventloop_init
初始化。
q_eventloop_init(&server.qel, server.maxclients + CONFIG_FDSET_INCR);
server.el = server.qel.el;
redisServer.port
port
定義了資料庫連線的連接埠,用戶使用這個連接埠進行連線,如果沒有使用 redis.conf
或是 --port
特別指定要使用哪一個連接埠,預設會是 CONFIG_DEFAULT_SERVER_PORT
,也就是 6379 。
~/mt-redis/src main* 5s ❯ ./redis-server
26516:C 16 May 15:40:08.672 # Warning: no config file specified, using the default config. In order to specify a config file use ./redis-server /path/to/redis.conf
26516:M 16 May 15:40:08.673 * Increased maximum number of open files to 10032 (it was originally set to 1024).
_._
_.-``__ ''-._
_.-`` `. `_. ''-._ Redis 3.2.13 (74300e3c/1) 64 bit
.-`` .-```. ```\/ _.,_ ''-._
( ' , .-` | `, ) Running in standalone mode
|`-._`-...-` __...-.``-._|'` _.-'| Port: 6379
| `-._ `._ / _.-' | PID: 26516
`-._ `-._ `-./ _.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' | http://redis.io
`-._ `-._`-.__.-'_.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' |
`-._ `-._`-.__.-'_.-' _.-'
`-._ `-.__.-' _.-'
`-._ _.-'
`-.__.-'
redisServer.bindaddr
, redisServer.bindaddr_count
如果沒有在 redis.conf
指定 bind
配置指令, mt-Redis 預設會監聽伺服器上所有可用的網絡接口。可以使用 bind
配置指令,後面指定一個或多個 IP 地址,僅監聽一個或多個選定的接口。 bindaddr_count
就是指 bind
的數量。
先使用 ip addr
指令查看我電腦的網路裝置,有兩個網路裝置,其 IP 分別為 127.0.0.1
與 192.168.50.17
。
➜ andyyeh@andyyeh-ubuntu ~/mt-redis/src git:(main) ✗ ip addr
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
inet6 ::1/128 scope host
valid_lft forever preferred_lft forever
2: wlp2s0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default qlen 1000
link/ether 9c:fc:e8:f4:f8:30 brd ff:ff:ff:ff:ff:ff
inet 192.168.50.17/24 brd 192.168.50.255 scope global dynamic noprefixroute wlp2s0
valid_lft 77095sec preferred_lft 77095sec
inet6 fe80::4bfe:d1ff:dd01:73c6/64 scope link noprefixroute
valid_lft forever preferred_lft forever
當我在 redis.conf
將指定 bind
到 0.0.0.0
,使用 127.0.0.1
與 192.168.50.17
都可以連接到伺服器。
➜ andyyeh@andyyeh-ubuntu ~/mt-redis/src git:(main) ✗ curl http://192.168.50.17:6379
curl: (52) Empty reply from server
➜ andyyeh@andyyeh-ubuntu ~/mt-redis/src git:(main) ✗ curl http://127.0.0.1:6379
curl: (52) Empty reply from server
當我在 redis.conf
將指定 bind
到 192.168.50.17
,就只有 192.168.50.17
連接的到伺服器。
➜ andyyeh@andyyeh-ubuntu ~/mt-redis/src git:(main) ✗ curl http://192.168.50.17:6379
curl: (52) Empty reply from server
➜ andyyeh@andyyeh-ubuntu ~/mt-redis/src git:(main) ✗ curl http://127.0.0.1:6379
curl: (7) Failed to connect to 127.0.0.1 port 6379 after 0 ms: Connection refused
也就是說,當 bind
多個 IP 位置時,執行伺服器的電腦上必須有相對應的網路裝置,並且可以指定這些網路裝置的 IP 接收用戶的請求,若將 bind
設為 0.0.0.0
,則可以接受從所有網路裝置的 IP 進行連接。
在 redisServer
中,有紀錄用戶的鏈結串列,與其他與用戶相關的資料,我們放到探討 client
這個結構時一起討論。
typedef struct client {
q_eventloop *qel; /* Eventloop of the worker's thread which handles this client.*/
int curidx; /* The worker idx that this client current belogn to.*/
uint64_t id; /* Client incremental unique ID. */
int fd; /* Client socket. */
redisDb *db; /* Pointer to currently SELECTed DB. */
int dictid; /* ID of the currently SELECTed DB. */
...
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
...
list *reply; /* List of reply objects to send to the client. */
...
} client;
在介紹 worker 事件循環時,就有看過 client
結構是怎麼建立的, qel
代表處理這個 client
的 worker 事件循環, curidx
代表 worker 的 id
, id
為 client
自己的 id , fd
則是在 acceptTcpHandler
取得的使用者 file descriptor , reply
鏈結串列是要 worker 要回覆給使用者的訊息。