Try   HackMD

mt-redis 運作原理

contributed by < yeh-sudo >

開發環境

$ uname -a
Linux andyyeh-ubuntu 6.5.0-21-generic #21~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Fri Feb  9 13:32:52 UTC 2 x86_64 x86_64 x86_64 GNU/Linux

$ gcc --version
gcc (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0
Copyright (C) 2021 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

$ lscpu
Architecture:            x86_64
  CPU op-mode(s):        32-bit, 64-bit
  Address sizes:         44 bits physical, 48 bits virtual
  Byte Order:            Little Endian
CPU(s):                  16
  On-line CPU(s) list:   0-15
Vendor ID:               AuthenticAMD
  Model name:            AMD Ryzen 7 4800HS with Radeon Graphics
    CPU family:          23
    Model:               96
    Thread(s) per core:  2
    Core(s) per socket:  8
    Socket(s):           1
    Stepping:            1
    Frequency boost:     enabled
    CPU max MHz:         2900.0000
    CPU min MHz:         1400.0000
    BogoMIPS:            5788.79
Virtualization features: 
  Virtualization:        AMD-V
Caches (sum of all):     
  L1d:                   256 KiB (8 instances)
  L1i:                   256 KiB (8 instances)
  L2:                    4 MiB (8 instances)
  L3:                    8 MiB (2 instances)

mt-Redis 運作原理

開發 mt-Redis 前,首先要了解整個資料庫如何儲存與處理資料,因為所有的命令與操作都建立在這些資料結構之上,會探討的資料結構有 SDS (simple dynamic string, sds) 、 鏈結串列 (list) 、 Userspace RCU 、 字典 (q_dict) 與儲存命令的 redisCommand ,接著了解伺服器的事件與多執行緒如何處理任務,包含 q_eventloopq_threadq_workerq_master ,最後再從 server.h 中的 redisServerclient 還有 redisDb 了解 mt-Redis 整體的架構與如何運作。

SDS (Simple Dynamic String)

與 SDS 相關的程式碼都在 src/sds.hsrc/sds.c

__attribute__((__packed__))

在每一個 SDS 的 header 前面都加上了 __attribute__((__packed__)) ,根據 GCC 的描述,作用是讓結構中的資料可以對齊,以最大程度減少記憶體開銷。

The packed attribute specifies that a variable or structure field should have the smallest possible alignment—one byte for a variable, and one bit for a field, unless you specify a larger value with the aligned attribute.

以 SDS 的 sdshdr32 為例。

struct __attribute__ ((__packed__)) sdshdr32 {
    uint32_t len; /* used */
    uint32_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};

根據 C 語言規格書的 6.7.2.1 的第 16 點,在結構中的最後一個元素是未完成的陣列時,叫做 flexible array member ,在大多數情況下,使用 sizeof 時會忽略這個元素,所以使用 sizeof 計算 sdshdr32 的大小時,會忽略其中的 buf ,計算出來的結果,使用 packed 時,整個結構的大小為 9 ,不使用時大小為 12 ,根據你所不知道的 C 語言:記憶體管理、對齊及硬體特性,為了讓資料位置為 4 的倍數,編譯器在分配記憶體時,會按照宣告的型態去做對齊,所以在 flags 後面填充了 3 個 bytes ,讓結構體的大小變成 12 。

As a special case, the last element of a structure with more than one named member may have an incomplete array type; this is called a flexible array member. In most situations, the flexible array member is ignored. In particular, the size of the structure is as if the flexible array member were omitted except that it may have more trailing padding than the omission would imply.

➜ andyyeh@andyyeh-ubuntu  ~/linux2024/sds_test  ./main
size of packed sdshdr32: 9
size of unpacked sdshdr32: 12

所以,在記憶體中, sdshdr32 每個元素都緊密排列在一起,可以畫出下圖。

|-------------- sdshdr32 ---------------|----------- buf[] ------------|
|    <len>    |    <alloc>    | <flags> |          <String>            |
|   4 Bytes   |    4 Bytes    | 1 Byte  |       n Bytes + 1 Byte       |

sdshdr

SDS 分為五種,分別為 sdshdr5sdshdr8sdshdr16sdshdr32sdshdr64 ,其中 sdshdr5 不會被使用到,而每個 sdshdr 後面的數字代表能儲存最常字串的長度, sdshdr8 儲存長度介於 [0,28) 之間的字串, sdshdr16 儲存長度 [28,216) 之間的字串,其他以此類推,所以,如果字串長度小於 28 ,就會使用 sdshdr8 ,若長度比 28 還長但不超過 216 時,會使用 sdshdr16

len

len 為 SDS 中字串的長度,定義了 len ,就可以在 O(1) 的時間複雜度下取得字串長度,不需要透過 strlen 或是使用迴圈計算,在 sds.h 中也定義了 sdslen 這個函式,當作是取得 SDS 長度的 API 。

alloc

alloc 為建立新的 SDS 時,所配置 buf 的大小,但不包含 \0 ,在 sds.h 中也定義了 sdsalloc 這個函式,當作是取得 SDS 配置時的大小的 API ,另外, sdsavail 這個函式的作用,是作為取得當前剩餘可用空間的 API ,本質上就是 alloc 扣掉 len ,當增加字串長度,會先使用 sdsavail 檢查剩餘可用空間,若空間不足,就會使用 sdsMakeRoomFor 增加字串的空間。

flags

使用 1 個 bytes 的大小儲存 sdshdr 的型態,只使用到這個 bytes 中最小的 3 個 bits ,檢查 SDS 是哪一種型態時,只需要使用定義好的 SDS_TYPE_MASKflags 進行 & 操作,就可以得到 sdshdr 的型態。

/* Note: sdshdr5 is never used, we just access the flags byte directly.
 * However is here to document the layout of type 5 SDS strings. */
struct __attribute__ ((__packed__)) sdshdr5 {
    unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len; /* used */
    uint8_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};

定義 SDS

了解完 sdshdr 的定義之後,可以知道 buf 就是儲存字串資料的地方,但是再看到 sds 的型態,只是一個指標,如下所示,而不是直接使用 sdshdr ,會這樣做,是因為使用者可以將 sds 看作是一般的字串來使用,可以直接使用 <string.h> 中的函式,也可以使用 mt-Redis 提供的 API 進行快速獲取字串長度等操作。

typedef char *sds;

建立 SDS

建立 SDS 使用的函式為 sdsnewlen(const void *init, size_t initlen)init 是要放入 buf 中的字串轉形成 void * 的型態, initlen 則是字串長度。首先,透過 sdsReqType 檢查必須要使用哪一種長度的 SDS ,若長度小於 25 必須使用 sdshdr5 時,轉換成 sdshdr8 ,接著依照使用的 type 取得 sdshdr 的長度,進行到這邊,完成前置作業,並且宣告之後要用到的變數,像是 shfp 等。

void *sh;
sds s;
char type = sdsReqType(initlen);
/* Empty strings are usually created in order to append. Use type 8
 * since type 5 is not good at this. */
if (type == SDS_TYPE_5 && initlen == 0) type = SDS_TYPE_8;
int hdrlen = sdsHdrSize(type);
unsigned char *fp; /* flags pointer. */

配置記憶體, sh 為整個 sdshdr ,包含 lenallocflagsbuf ,因為 initlen 沒有計算到 \0 ,所以要額外加 1 ,最關鍵的地方, s = (char*)sh+hdrlen ,因為 shvoid * 型態,所以透過顯示轉型轉為 char * ,根據 C 語言規格書 6.2.5 第 28 點提到, void *char * 在記憶體中的表示和對齊是相同的,所以可以相互轉換,轉換成 char * 之後,加上 hdrlen ,也就是使用 sizeof 取得結構的大小,但並不包含 buf ,在解釋 __attribute__((__packed__)) 也有提過,以 sdshdr32 為例,得出 9 ,因為記憶體對齊的特性, flags 後面接續的就是 buf ,所以 (char *)sh 加上 hdrlen 後,會指向 buf 的開頭,而 sdsnewlen 最後回傳的值就是這個 s ,這就是為什麼可以將 mt-Redis 的 sds 當作一般的字串操作,而不用另外實作其他 API ,並且,可以透過記憶體對齊的特性找出 sdshdr 中的其他元素, fp 就是透過這種方式找到 sdshdr 中的 flags

A pointer to void shall have the same representation and alignment requirements as a pointer to a character type.

sh = s_malloc(hdrlen+initlen+1);
if (!init)
    memset(sh, 0, hdrlen+initlen+1);
if (sh == NULL) return NULL;
s = (char*)sh+hdrlen;
fp = ((unsigned char*)s)-1;

最後就是將 init 字串的內容複製到 s 裡面並且補上 \0

if (initlen && init)
    memcpy(s, init, initlen);
s[initlen] = '\0';
return s;

雙向鏈結串列 (list)

與鏈結串列相關的程式碼都在 src/adlish.hsrc/adlist.c

listNode

listNode 是鏈結串列中的節點, prev 指向前一個節點, next 指向下一個節點, value 則是節點保存的值,使用 void * 型態,可以使用顯示轉型轉成我們要的型態。

typedef struct listNode {
    struct listNode *prev;
    struct listNode *next;
    void *value;
} listNode;

list

head 指向鏈結串列的開頭, tail 指向鏈結串列的結尾,另外, list 結構提供了三個函式指標 dupfreematch ,可以讓使用者傳入自己定義的函式, dup 為複製鏈結串列, free 為釋放鏈結串列,而在進行搜尋的時候,若有定義 match 函式,則搜尋時會使用 match 函式進行配對,確認要找的 keylistNode 中的 value 相同。

typedef struct list {
    listNode *head;
    listNode *tail;
    void *(*dup)(void *ptr);
    void (*free)(void *ptr);
    int (*match)(void *ptr, void *key);
    unsigned long len;
} list;

listIter

mt-Redis 的鏈結串列還提供了迭代器, next 指向現在的節點, direction 代表走訪鏈結串列的方向,在 adlist.h 中定義了兩個巨集, AL_START_HEAD 代表從頭開始走訪, AL_START_TAIL 代表從鏈結串列的尾端以倒序的方式走訪。

typedef struct listIter {
    listNode *next;
    int direction;
} listIter;

/* Directions for iterators */
#define AL_START_HEAD 0
#define AL_START_TAIL 1

使用 listIter 要搭配 listNext 函式,在 listNext 函式有說明如何搭配迭代器,範例如下。

iter = listGetIterator(list, <direction>);
while ((node = listNext(iter)) != NULL) {
    doSomethingWith(listNodeValue(node));
}

使用範例

使用 listCreate 建立雙向鏈結串列,並透過 listAddNodeHead 將陣列中的元素加入鏈結串列的開頭,最後使用迭代器搭配 listNext 走訪鏈結串列並輸出 value 中的值。

#include <stdio.h>
#include "adlist.h"

int main()
{
    char a[5] = {'a', 'b', 'c', 'd', 'e'};
    list *list = listCreate();

    for (int i = 0; i < 5; i++){
        char *ptr = &a[i];
        listAddNodeHead(list, (void *)ptr);
    }

    listIter *iter = listGetIterator(list, AL_START_HEAD);
    listNode *node;
    while ((node = listNext(iter)) != NULL) {
        printf("%c\n", *(char *)node->value);
    }

    return 0;
}

將陣列的內容都加入鏈結串列後的狀態如下圖。







G



node0

prev

next

'e'



node1

prev

next

'd'



node0:next->node1





NULL1
NULL



node0:prev->NULL1





node1:prev->node0





node2

prev

next

'c'



node1:next->node2





node2:prev->node1





node3

prev

next

'b'



node2:next->node3





node3:prev->node2





node4

prev

next

'a'



node3:next->node4





node4:prev->node3





NULL2
NULL



node4:next->NULL2





list

head

tail

len: 5



list:head->node0





list:tail->node4





User-space RCU 在 mt-redis 中的應用

針對 mt-redis 中使用 urcu 的 lock-free RCU hash table 進行討論。

Lock-Free RCU Hash Table

參考資料

Lock-Free RCU Hash Table 的 API

介紹幾個比較重要且會在 mt-Redis 的 q_dict 中使用的 API ,其他的函式或是巨集可以參考 The URCU hash table API

caa_container_of

這個巨集的作用跟 Linux 核心的 container_of 一模一樣,利用 caa_container_of ,藉由 User-space RCU 提供的公開介面,反向去存取到自行定義的結構體開頭地址。

/*
 * caa_container_of - Get the address of an object containing a field.
 *
 * @ptr: pointer to the field.
 * @type: type of the object.
 * @member: name of the field within the object.
 */
#define caa_container_of(ptr, type, member)				\
	__extension__							\
	({								\
		const __typeof__(((type *) NULL)->member) * __ptr = (ptr); \
		(type *)((char *)__ptr - offsetof(type, member));	\
	})

對比 Linux 核心原始程式碼巨集: container_of 就可以發現兩者在實作上的手法是相同的。

cds_lfht_node

與 Linux 核心的鏈結串列類似, lock-free RCU hash table 提供了 cds_lfht_node 作為將 hash table 連結起來的媒介,可以使用 caa_container_of 這個巨集取得整個結構,類似 Linux 核心中 container_of ,但沒有提供 key 和 value 相關的操作,使用者必須自行定義 hash function 與 key-value 的形式。

struct cds_lfht_node {
	struct cds_lfht_node *next;	/* ptr | REMOVAL_OWNER_FLAG | BUCKET_FLAG | REMOVED_FLAG */
	unsigned long reverse_hash;
} __attribute__((aligned(8)));
cds_lfht_iter

與 mt-Redis 的鏈結串列一樣, lock-free RCU hash table 提供了迭代器, 搭配 cds_lfht_next 函式使用,可以走訪整個鏈結串列。

/* cds_lfht_iter: Used to track state while traversing a hash chain. */
struct cds_lfht_iter {
	struct cds_lfht_node *node, *next;
};
cds_lfht_new

cds_lfht_new 用於建立新的 hash table , init_size 代表 hash table 中初始的 bucket 數量, min_nr_alloc_buckets 表示最少的 bucket 數量, max_nr_buckets 代表最多的 bucket 數量,當這個值為 0 時,代表最大 bucket 的數量沒有限制,以上這三個參數都必須要是 2 的冪次, flags 代表這個 hash table 的特性,代入 CDS_LFHT_AUTO_RESIZE 代表 hash table 會自動調整大小,代入 CDS_LFHT_ACCOUNTING 代表會紀錄加入與移除的資料數量,這個特性也允許 hash table 減少自身的大小,當 flags 設置為 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING 時,允許 hash table 大小變大與縮小。

static inline
struct cds_lfht *cds_lfht_new(unsigned long init_size,
			unsigned long min_nr_alloc_buckets,
			unsigned long max_nr_buckets,
			int flags,
			pthread_attr_t *attr)
{
	return _cds_lfht_new(init_size, min_nr_alloc_buckets, max_nr_buckets,
			flags, NULL, &rcu_flavor, attr);
}
cds_lfht_lookup

這個函式是透過 match 這個使用者定義的函式,以特定的 key 尋找相對應的值,必須要在持有 rcu_read_lock 時才能呼叫,且呼叫這個函式的執行緒必須要是 RCU read-side 的執行緒。若沒有找到相對應的值, iter 會設為 NULL

extern
void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash,
		cds_lfht_match_fct match, const void *key,
		struct cds_lfht_iter *iter);
cds_lfht_add_replace

用指定的 key 替換節點,如果不存在這樣的節點,則添加一個節點。如果存在這樣的節點,則返回被替換的節點,否則返回 NULL。呼叫這個函式時必須處於 RCU read-side 臨界區,並且必須已經註冊為 RCU 讀者 (使用 rcu_register_thread) 。

extern
struct cds_lfht_node *cds_lfht_add_replace(struct cds_lfht *ht,
		unsigned long hash,
		cds_lfht_match_fct match,
		const void *key,
		struct cds_lfht_node *node);
cds_lfht_iter_get_node

回傳迭代器指向的節點。

static inline
struct cds_lfht_node *cds_lfht_iter_get_node(struct cds_lfht_iter *iter)
{
	return iter->node;
}

字典 (q_dict)

與字典相關的程式碼在 src/q_dict.hsrc/q_sict.c

q_dict

建立 q_dict 每有對應的函式,在 server.c 中是直接進行記憶體配置,並用上述提到的 cds_lfht_new 建立 lock-free RCU hast table ,並將字典的大小設為 0 。

typedef struct q_dict {
    unsigned int size;
    struct cds_lfht *table;
    void *privdata;
} q_dict;

q_dictEntry

建立 q_dictEntry 使用的是 q_createDictEntry 函式,參數為一個 SDS 以及一個 robjrobjserver.h 中有定義,完整的名字為 redisObject , SDS 會做為 q_dictEntry 的 key ,而 redisObject 則是 q_dictEntry 的值,所以在結構中的 v->val 會指向 redisObject ,而 s64 則是這個物件過期的時間, node 就是用來放入 lock-free RCU hash table 中的節點, rcu_head 比較特別,在 User-space RCU 中的 rcu-api.md 文件有說明,要搭配 call_rcu 一起使用,為這個 rcu_head 註冊一個函式,且這個函式會在寬限期結束時執行。

typedef struct q_dictEntry {
    unsigned type:4;   // four data structure types: string, list, set, zset and hash
    void *key;
    union {
        void *val;
        int64_t s64;
    } v;
    struct cds_lfht_node node;
    struct rcu_head rcu_head;
} q_dictEntry;

q_dictEntry 中的 val 儲存的值就是 robj ,也就是 redisObject 這個結構,這個結構是儲存 q_dictEntry 中字典的值的真正地方,並且用 type 表示這個值是字串、鏈結串列或是集合。 redisObject 可以透過 createObject 這個函式建立,使用時必須將值的型態以及指向值得指標代入參數中。

typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; /* lru time (relative to server.lruclock) */
    int refcount;
    void *ptr;
} robj;

建立一個 q_dictEntry 使用的 q_createDictEntry 函式。

q_dictEntry *q_createDictEntry(sds key, robj* val) {
    q_dictEntry *de = NULL;
    de = zmalloc(sizeof(*de));
    cds_lfht_node_init(&de->node);
    de->key = key;
    de->v.val = val;
    return de;
}

q_dictIterator

q_dictIterator 是用來走訪字典的迭代器,包含了 cds_lfht_iter ,也就是 lock-free RCU hast table 的迭代器,以及指向當前字典的指標。

typedef struct q_dictIterator {
    q_dict *d;
    struct cds_lfht_iter iter;
} q_dictIterator;

使用 q_dictGetIterator 可以建立一個新的迭代器。

q_dictIterator *q_dictGetIterator(q_dict *ht) {
    q_dictIterator *iter = zmalloc(sizeof(*iter));
    iter->d = ht;
    cds_lfht_first(ht->table, &iter->iter);
    return iter;
}

q_dictSdsKeyCaseMatch

這個函式是要 mt-Redis 傳入 cds_lfht_lookupcds_lfht_add_replace 這兩個函式的參數,目的是要比較節點中的 key 與我們要找的 key 是否一致。

int q_dictSdsKeyCaseMatch(struct cds_lfht_node *ht_node, const void *key) {
    struct q_dictEntry *de = caa_container_of(ht_node, struct q_dictEntry, node);
    return strcasecmp(de->key, key) == 0;
}

q_dictAdd

加入新節點時使用的函式,使用 cds_lfht_add_replace 將節點加入 hash table ,根據上面對這個函式的描述,必須處在 read-side 的臨界區,所以要加上 rcu_read_lockrcu_read_unlock ,且如果 hash table 中有相同的 key ,會以新的值取代並回傳舊的物件,這個時候會使用 call_rcu 註冊釋放資源的函式,在寬限期結束時釋放記憶體空間以及資源,若 key 不存在 hash table 中,則加入新節點,並增加 hash table 的大小。

int q_dictAdd(q_dict *d, sds key, robj* val) {
    struct cds_lfht_node *ht_node;
    unsigned long hash;
    struct q_dictEntry *de;

    rcu_read_lock();
    de = q_createDictEntry(key, val);
    hash = dictSdsHash(key);
    ht_node = cds_lfht_add_replace(d->table, hash, q_dictSdsKeyCaseMatch, key, 
            &de->node);

    if (ht_node) {
        struct q_dictEntry *ode = caa_container_of(ht_node, struct q_dictEntry, node);
        call_rcu(&ode->rcu_head, q_freeRcuDictEntry);
        rcu_read_unlock();
        return DICT_REPLACED;
    } else {
        ++d->size;
        rcu_read_unlock();
        return DICT_OK;
    }

    return DICT_OK;
}

q_freeRcuDictEntry

這個函式與 q_freeRcuDictExpirationEntry 一樣,都是 call_rcu 註冊時會使用的函式,目的都是在寬限期結束時將節點的占用的資源釋放,透過 caa_container_of 取得節點所在的物件,就會使用 q_freeDictEntry 釋放資源。

void q_freeRcuDictEntry(struct rcu_head *head) {
    struct q_dictEntry *de = caa_container_of(head, struct q_dictEntry, rcu_head);
    q_freeDictEntry(de);
}

執行緒 (q_thread)

q_thread 相關程式碼在 src/q_thread.hsrc/q_thread.c 中。

q_thread 結構

q_thread 中使用的是 POSIX thread ,閱讀 pthreads(7) ,每一個執行緒都有唯一的 thread ID ,會儲存在 pthread_t 這個型態的變數中,也就是 q_thread 結構中的 thread_idq_thread_func_t 是執行緒所要執行的函式, data 則是執行 fun_run 函式所需要的資料。

typedef void *(*q_thread_func_t)(void *data);

typedef struct q_thread {
    int id;
    pthread_t thread_id;
    q_thread_func_t fun_run;
    void *data;
} q_thread;

q_thread_start

閱讀 pthread_create(3)q_thread_start 是用 pthread_create 建立新的執行緒並執行 q_thread_run 函式,而 thread 則是被當作 q_thread_run 的參數。

int q_thread_start(q_thread *thread) {
    pthread_attr_t attr;
    pthread_attr_init(&attr);

    if (thread == NULL || thread->fun_run == NULL) {
        return C_ERR;
    }

    pthread_create(&thread->thread_id,
                   &attr, q_thread_run, thread);

    return C_OK;
}

使用 pthread_create 建立新的執行緒之後,會執行 q_thread_run 函式, data 就是在 pthread_create 被當作參數傳遞進來的 q_thread ,首先會設定亂數種子,接著執行 q_thread 中儲存的 fun_run 函式。

static void *q_thread_run(void *data) {
    q_thread *thread = data;
    srand(ustime() ^ (int) pthread_self());

    return thread->fun_run(thread->data);
}

事件循環

與事件循環相關的程式碼在 src/q_eventloop.hsrc/q_eventloop.csrc/ae.hsrc/ae.csrc/ae_epoll.c 中。

基本組件

事件循環由四個基本組件所組成,分別是 file event 、 時間事件 (time event) 、 fired event 與 epoll 模組。

File Event

File event 指的是非同步處理 I/O 操作的系統,在事件循環上運作,這個系統會監聽 file descriptors 上發生的各種事件,一個 file descriptors 就代表一個連接到用戶的 socket ,對每個操作都有一個對應的事件處理器,而 file event 的類型一共有三種。

  • AE_READABLE

代表監聽的 file descriptors 上有數據可以讀取,通常是用戶對伺服器發出請求與命令,也就是將資料寫入 socket ,或是想要進行連線時,就會產生 AE_READABLE 事件。

#define AE_READABLE 1   /* Fire when descriptor is readable. */
  • AE_WRITABLE

代表 file descriptors 可以接受伺服器的資訊,也就是伺服器將資料寫入 socket ,通常在用戶想要讀取或查詢資料庫的資料時,產生 AE_WRITABLE 事件。

#define AE_WRITABLE 2   /* Fire when descriptor is writable. */
  • AE_BARRIER

代表先執行 AE_WRITABLE 事件後再執行 AE_READABLE 的事件。

#define AE_BARRIER 4    /* With WRITABLE, never fire the event if the
                           READABLE event already fired in the same event
                           loop iteration. Useful when you want to persist
                           things to disk before sending replies, and want
                           to do that in a group fashion. */

file event 的結構是由 maskrfileProcwfileProcclientData 所組成。 mask 為決定這個事件是 AE_READABLE 或是 AE_WRITABLE 的變數,也會儲存這個事件是否為 AE_BARRIERrfileProcwfileProc 分別為 AE_READABLE 事件與 AE_WRITABLE 事件的處理函式,而 clientData 則是要處理的資料。

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

建立一個新的 file event 使用的函式為 aeCreateEventLoop ,參數為指向事件循環的指標 eventLoop ,還有 file descriptor ,也就是 socket 本身,代表事件類型的 mask ,最後是對應的處理函式 proc 與要處理的資料,回傳的值代表有沒有成功建立事件,函式中使用的 aeApiAddEvent 會在講解 epoll 時介紹。

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
時間事件 (Time Event)

時間事件是用來處理定時任務的機制,可以是一次性的,也就是定時事件,也可以是週期性的,也就是週期事件,在事件循環中,會定期檢查和觸發時間事件以執行特定的操作。在事件循環中,會優先處理 file event ,

  • 定時事件

在指定的時間點執行一次,就叫做定時事件。

  • 週期事件

每隔一段固定的時間就執行一次,叫做週期事件。分辨一個時間事件是定時事件還是週期事件,取決於建立事件時,處理函式的回傳值,若處理函式回傳 AE_NOMORE ,則這個事件是一個定時事件,若回傳的值不是 AE_NOMORE ,則根據處理函式的回傳值設定下次事件觸發的時間。

#define AE_NOMORE -1

時間事件的結構是由 7 個變數所組成, when_sec 代表幾秒之後要觸發事件, when_ms 代表多少毫秒之後要觸發事件, timeProc 則是時間事件對應的處理函式, finalizerProc 則是在事件結束之後會呼叫的函式, clientData 為要處理的資料, next 則指向下一個時間事件。

typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

建立時間事件使用的函式為 aeCreateTimeEvent ,原則上與建立 file event 的函式大同小異,要注意的是決定多久要觸發的時間參數 milliseconds 單位是毫秒,在函式中會透過 aeAddMillisecondsToNow 算出 when_secwhen_ms ,而加入到時間事件時到事件循環結構中的 timeEventHead 時,永遠都是加到鏈結串列的開頭。

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->next = eventLoop->timeEventHead;
    eventLoop->timeEventHead = te;
    return id;
}

以建立三個時間事件為例,建立之後鏈結串列如下所示。







G



eventLoop

...

timeEventHead

...



te3

id: 3

when_sec

when_ms

timeProc

finalizerProc

clientData

next



eventLoop:head->te3





te2

id: 2

when_sec

when_ms

timeProc

finalizerProc

clientData

next



te3:next3->te2





te1

id: 1

when_sec

when_ms

timeProc

finalizerProc

clientData

next



te2:next2->te1





NULL

NULL



te1:next1->NULL





Fired Event

儲存待處理的 file 事件。

typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;
  1. File event 要翻譯成什麼比較好? 檔案事件?

檔案事件

  1. Fired event 要翻譯成什麼比較好? 觸發事件?

已提交的事件、即將發生的事件

epoll 模組

閱讀 epoll(7) , epoll API 可以監控多個 file descriptor ,使用方式可以是 edge-triggered 或是 edge-triggered 而且對於大量的 file descripotr 有很好的擴展性。 epoll 是 Linux 核心中的一種資料結構,以 user-space 的觀點來看, epoll 是兩個列表的容器,分別為 interest list 與 ready list 。

  • Interest List
    已經註冊且正在被監控的 file descriptor 。
  • Ready List
    準備好進行 I/O 操作的 file descriptor ,為 interest list 的子集合。

簡單介紹 epoll 之後,來介紹幾個 epoll 的系統呼叫。首先介紹 epoll_create ,這個函式建立一個 epoll 的實例,在 Linux 核心 2.6.8 之後的版本,會忽略參數 size ,但這個值必須大於 0 ,回傳一個代表 epoll 實例的 file descriptor ,接下來所有呼叫 epoll 界面的操作都要透過這個 file descriptor 。

int epoll_create(int size);

epoll_ctl ,這個系統呼叫是用來新增、修改或移除 epfd 所代表的 epoll 實例 interest list 中的元素, op 則是要對目標 file descriptor ,也就是 fd 所要做的操作,而 event 則是 fd 所代表的 epoll_event 物件, op 有以下三種。

  • EPOLL_CTL_ADD

epfd 所代表的 epoll 實例的 interest list 中加入一個元素,這個元素包括 fdevent

  • EPOLL_CTL_MOD

將 interest list 中 fd 的設定改成新的 event 的設定。

  • EPOLL_CTL_DEL

fd 從 interest list 中移除, event 會被忽略且可以為 NULL

epoll_event 這個結構中有一個變數 events ,代表這個事件的類型,其中,會被 mt-Redis 使用到的有以下幾種。

  • EPOLLIN

相對應的 file 已經準備好進行 read 操作。

  • EPOLLOUT

相對應的 file 已經準備好進行 write 操作。

  • EPOLLERR

相關的 file decriptor 上發生錯誤,在呼叫 epoll_ctl 時不一定要加入這個參數,因為 epoll_wait 遇到錯誤都會回傳 EPOLLERR

  • EPOLLHUP

對應的 file decriptor 上發生掛斷 (hang up) 的狀況 , epoll_wait 會等待這個事件,所以在 epoll_ctl 中不一定要設置。

int epoll_ctl(int epfd, int op, int fd,
                     struct epoll_event *_Nullable event);

epoll_wait 會等待 epfd 上的事件, events 是用來回傳 ready list 的資訊,也就是 interest list 中準備好進行 I/O 操作的事件, maxevents 代表最多可以回傳的 ready list 大小, timeout 代表 epoll_wait 會阻塞多少毫秒,若 timeout 設為 -1 ,則 epoll_wait 會無期限阻塞,若設為 0 會讓 epoll_wait 立即回傳,而 epoll_wait 會阻塞直到以下三種條件滿足其中一種,一個 file descriptor 準備好進行 I/O 操作,被信號打斷,或是超過 timeout 的毫秒數, epoll_wait 的回傳值代表有多少 file descriptor 準備好了,也就是 ready list 的大小。

int epoll_wait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout);

mt-Redis 使用 epoll API 監控多個連線的 file descriptor ,使用 aeApiState 這個結構使用 epfd 變數儲存 epoll 實例與使用 events 變數儲存 ready list 的事件。

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

透過 aeApiCreate 對 epoll 模組進行初始化,設定 aeApiState 結構中 events 的大小,以及使用 epoll_create 建立 epoll 實例,最後將初始化好的 epoll 模組儲存在事件循環結構 eventLoop 中的 apidata

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}

新增或修改監聽的事件,需要傳入的參數有 file descriptor 與判斷讀寫操作的 mask ,若儲存事件的陣列中已經有同樣的事件了,就使用 EPOLL_CTL_MOD 修改事件,若沒有就使用 EPOLL_CTL_ADD 新增事件,再透過 epoll_ctl 對事件進行新增或修改的動作。

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

aeApiPoll 透過 epoll_wait 取得已經準備好進行 I/O 操作的事件,並判斷這個事件的類型,最後儲存到事件循環 eventLoop 中的 aeFiredEvent ,代表執行緒可以處理這些事件了。

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

事件循環如何初始化?

Server 事件循環初始化

伺服器的事件循環是在 initServer 函式中初始化,先使用 q_eventloop_init 初始化事件循環,接著初始化 socket ,使用 listenToPort 開啟 TCP socket ,並且取得所有的 socket file descriptor ,也就是 server.ipfd ,最後建立 server_event_process file 事件與 serverCron 時間事件,用來統計伺服器的數據與資料。

void initServer(void) 
{
    ...
    q_eventloop_init(&server.qel, server.maxclients + CONFIG_FDSET_INCR);
    server.el = server.qel.el;

    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, server.socketpairs) < 0 ) {
        serverLog(LL_WARNING, "unable to create server socketpairs");
        exit(1);
    }
    if (anetNonBlock(NULL, server.socketpairs[0]) < 0 ) {
        serverLog(LL_WARNING, "Failed to set server.socketpairs[0] to non-blocking.");
        exit(1);
    }

    if (anetNonBlock(NULL, server.socketpairs[1]) < 0 ) {
        serverLog(LL_WARNING, "Failed to set server.socketpair[1] to non-blocking.");
        exit(1);
    }
    /* Open the TCP listening socket for the user commands. */
    if (server.port != 0 &&
            listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);
    ...
    /* Create the serverCron() time event, that's our main way to process
     * background operations. */
    if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create the serverCron time event.");
        exit(1);
    }
    ...
    if (aeCreateFileEvent(server.el, server.socketpairs[0], AE_READABLE, 
                server_event_process, NULL)== AE_ERR) {
        serverPanic("Can't create server event process") ;
        exit(1);
    }
    ...
}
Master 事件循環初始化

Master 事件循環的初始化是透過 q_master_init 函式進行,一樣是透過 q_eventloop_init 初始化事件循環,然後將執行緒要執行的函式設為 master_thread_run ,最後再進行 setup_master

int q_master_init(void) {
    int filelimit;
    
    filelimit = server.threads_num * 2  + 128;
    q_eventloop_init(&master.qel, filelimit);
    master.qel.thread.fun_run = master_thread_run;
    setup_master();
    return C_OK;
}

setup_master 中,會對每一個伺服器的 file descriptor 綁定一個 file 事件,也就是在初始化伺服器事件循環時,使用 listenToPort 取得的 server.ipfd ,事件對應的處理函式為 acceptTcpHandler

static int 
setup_master(void) {
    int j;
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(master.qel.el, server.ipfd[j], AE_READABLE,
                    acceptTcpHandler,NULL) == AE_ERR)
        {
            serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
        }
    }
    if (server.sofd > 0 && aeCreateFileEvent(master.qel.el,server.sofd,AE_READABLE,
                acceptUnixHandler,NULL) == AE_ERR) 
        serverPanic("Unrecoverable error creating server.sofd file event.");
    return C_OK;
}
Worker 事件循環初始化

server.c 中的 initServer 函式,會使用 q_workers_init 初始化 worker 執行緒。

void initServer(void)
{
    ...
    if(q_workers_init(server.threads_num) != C_OK) {
        serverLog(LL_WARNING, "Init worker threads failed.");
        exit(1);
    }
    ...
}

接著看 q_worker.c 中的 q_workers_init ,初始化 workers 陣列,配置 worker_count 的大小,接著對每一個執行緒使用 q_worker_init 函式再做初始化,初始化之後,再用 setup_worker 建立處理事件的 file 事件與 worker_cron 時間事件。

int
q_workers_init(uint32_t worker_count) {
    int status;
    uint32_t idx;
    q_worker *worker;

    darray_init(&workers, worker_count, sizeof(q_worker));

    for (idx = 0; idx < worker_count; idx++) {
        worker = darray_push(&workers);
        q_worker_init(worker);
        worker->id = idx;
        worker->qel.id = idx;
        status = setup_worker(worker);
        if (status != C_OK) {
            exit(1);
        }
    }

    num_worker_threads = (int) darray_n(&workers);

    return C_OK;
}

在每一個執行緒的初始化函式 q_worker_init 中,會對 socket 與執行緒的事件循環進行初始化,初始化事件循環時使用的是 q_eventloop_init 函式,這個函式會使用 q_thread_init 對執行緒進行初始化,最重要的是,使用 aeCreateEventLoop 建立事件循環,其中就有使用到上述的 epoll API ,也就是 aeApiCreate 函式,對 file descriptor 進行初始化,設置完事件循環後,就回到 q_worker_init 函式,設定執行緒所要執行的 worker_thread_run 函式與資料。 這邊的 socket 是 worker 與伺服器溝通的 socket ,並不是與使用者的溝通管道。

int
q_worker_init(q_worker *worker) {
    ...
    adjustOpenFilesLimit();
    q_eventloop_init(&worker->qel, server.maxclients);
    worker->qel.thread.fun_run = worker_thread_run;
    worker->qel.thread.data = worker;
    ...
    return C_OK;
}

初始化執行緒之後,接著使用 setup_worker 建立執行緒處理事件的 file 事件,以及並建立一個時間事件用來處理背景任務。 file 事件對應的函式為 worker_thread_event_process ,用來處理與 worker 與伺服器之間的溝通,監聽的 socket 為 socketpairs[1] ,而 socketpairs[0] 是給伺服器與 master 執行緒使用的。時間事件對應的函式為 worker_cron ,用來統計與 worker 相關的數據。

static int
setup_worker(q_worker *worker) {
    int status;

    status = aeCreateFileEvent(worker->qel.el, worker->socketpairs[1], AE_READABLE,
            worker_thread_event_process, worker);
    if (status == AE_ERR) {
        serverLog(LL_WARNING, "Unrecoverable error creating worker ipfd file event.");
        return C_ERR;
    }

    aeSetBeforeSleepProc(worker->qel.el, worker_before_sleep, worker);

    /* Create the worker_cron() time event, that's our main way to process
     * background operations. */
    if (aeCreateTimeEvent(worker->qel.el, 1, worker_cron, worker, NULL) == AE_ERR) {
        serverPanic("Can't create the worker_cron time event.");
        return C_ERR;
    }

    return C_OK;
}

所有執行緒都執行完 q_worker_initsetup_worker 之後,就完成 worker 事件循環系統的初始化,程式的執行流程如下。

flowchart

事件循環如何運作?

Master 事件循環

Master 事件循環在初始化的時候,監控的 file descriptor 與 file 事件都是與連線有關的,而 master 事件循環最主要就是負責處理使用者的連線,使用 GDB 追蹤 master 事件循環的運作,將斷點設置在 src/q_master.c 中的 acceptTcpHandler 函式,在另外一個終端機進行連線, GDB 的結果如下。首先, mt-Redis 會切換到 master 事件循環所在的執行緒,因為監控的 file descriptor 準備好進行 I/O 操作,也就是使用者的連線,所以會進入到這個 file 事件已經註冊好的函式 acceptTcpHandler ,結束之後,又會回到 aeProcessEvents 函式,也就是事件循環處理 ready list 的函式,最後處理時間事件,完成後,回到 aeMain 的迴圈繼續下一個事件循環。

[New Thread 0x7ffff0e74600 (LWP 57896)]
[Switching to Thread 0x7ffff0e74600 (LWP 57896)]

Thread 13 "redis-server" hit Breakpoint 1, acceptTcpHandler (el=0x7ffff784b3c8, fd=7, privdata=0x0, mask=1) at q_master.c:110
110	    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
(gdb) n
116	    while(max--) {
(gdb) n
117	        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
(gdb) n
118	        if (cfd == ANET_ERR) {
(gdb) n
124	        serverLog(LL_DEBUG,"Accepted %s:%d", cip, cport);
(gdb) n
125	        acceptCommonHandler(cfd,0,cip);
(gdb) n
116	    while(max--) {
(gdb) n
117	        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
(gdb) n
118	        if (cfd == ANET_ERR) {
(gdb) n
119	            if (errno != EWOULDBLOCK)
(gdb) n
122	            return;
(gdb) n
127	}
(gdb) n
aeProcessEvents (eventLoop=0x7ffff784b3c8, flags=3) at ae.c:432
432	                fired++;
(gdb) n
436	            if (fe->mask & mask & AE_WRITABLE) {
(gdb) n
445	            if (invert && fe->mask & mask & AE_READABLE) {
(gdb) n
452	            processed++;
(gdb) n
405	        for (j = 0; j < numevents; j++) {
(gdb) n
456	    if (flags & AE_TIME_EVENTS)
(gdb) n
457	        processed += processTimeEvents(eventLoop);
(gdb) n
459	    return processed; /* return the number of processed file/time events */
(gdb) n
460	}
(gdb) n
aeMain (eventLoop=0x7ffff784b3c8) at ae.c:486
486	    while (!eventLoop->stop) {
(gdb) 
Server 事件循環

initServer 函式中,為伺服器事件循環建立了 file 事件,對應的處理函式為 server_event_process ,使用 GDB 追蹤伺服器事件循環的執行過程,將斷點設置在 src/server.cserver_event_process 函式中,接著透過 redis-cli 執行命令,並觀察 GDB 的執行 mt-Redis 的過程。

  • redis-cli

在使用者端執行命令 SET mykey "Hello World" ,建立一個鍵值對,鍵為 mykey 而值為 "Hello World" 字串。

➜ andyyeh@andyyeh-ubuntu  ~/mt-redis/src git:(main) ✗ ./redis-cli
127.0.0.1:6379> SET mykey "Hello World"
OK
(115.66s)
127.0.0.1:6379>
  • mt-Redis Server

當使用者輸入命令時,伺服器的事件循環就會監控到伺服器的 server.socket[0] 準備進行 I/O 操作,接著就透過 aeProcessEvents 執行已經註冊好的 file 事件函式 server_event_process ,從 command_requests_head 取出使用者的命令,透過 server_processCommand 處理之後,再將結果回傳給使用者。

Thread 1 "redis-server" hit Breakpoint 1, server_event_process (eventLoop=0x7ffff784b068, fd=4, clientData=0x0, mask=1) at server.c:1289
1289	    serverAssert(fd == server.socketpairs[0]);
(gdb) n
1293	    qnode = __cds_wfcq_dequeue_blocking(&server.command_requests_head, &server.command_requests_tail);
(gdb) n
1300	    while(read(fd, buf, 1) == 1 || qnode != NULL ){
(gdb) n
1302	        if (qnode == NULL) break;
(gdb) p buf
$1 = "r"
(gdb) n
1303	        r = caa_container_of(qnode, struct q_command_request, q_node);
(gdb) n
1305	        c = r->c;
(gdb) p r
$2 = (q_command_request *) 0x7ffff5f85cc8
(gdb) p r->c
$3 = (struct client *) 0x7fffefe00008
(gdb) n
1306	        from = c->curidx;
(gdb) n
1307	        c->qel = &server.qel;
(gdb) n
1308	        c->curidx = -1;
(gdb) p c->qel
$4 = (q_eventloop *) 0x555555b11c40 <server+2752>
(gdb) n
1309	        q_freeCommandRequest(r);
(gdb) n
1311	        server_processCommand(c);
(gdb) n
[New Thread 0x7fffefdff600 (LWP 59176)]
1312	        if (c->flags & CLIENT_SLAVE || c->cmd->flags & CMD_ADMIN) {
(gdb) n
1315	            dispatch_to_worker(c, from);
(gdb) n
1317	        qnode = __cds_wfcq_dequeue_blocking(&server.command_requests_head, &server.command_requests_tail);
(gdb) n
1300	    while(read(fd, buf, 1) == 1 || qnode != NULL ){
(gdb) n
1319	}
(gdb) n
aeProcessEvents (eventLoop=0x7ffff784b068, flags=3) at ae.c:432
432	                fired++;
(gdb) n
436	            if (fe->mask & mask & AE_WRITABLE) {
(gdb) n
445	            if (invert && fe->mask & mask & AE_READABLE) {
(gdb) n
452	            processed++;
(gdb) n
405	        for (j = 0; j < numevents; j++) {
(gdb) n
456	    if (flags & AE_TIME_EVENTS)
(gdb) n
457	        processed += processTimeEvents(eventLoop);
(gdb) n
459	    return processed; /* return the number of processed file/time events */
(gdb) n
460	}
(gdb) n
aeMain (eventLoop=0x7ffff784b068) at ae.c:486
486	    while (!eventLoop->stop) {
(gdb) n
487	        if (eventLoop->beforesleep != NULL)
(gdb) n
488	            eventLoop->beforesleep(eventLoop, eventLoop->bsdata);
(gdb) n
489	        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
(gdb) n
486	    while (!eventLoop->stop) {
Worker 事件循環

在 master 事件循環處理連線時,執行 acceptTcpHandler 時,會呼叫 acceptCommonHandler 函式,其中最後會透過 dispatch_conn_new 將任務分配給 worker 執行緒, buf 儲存要執行的操作,接著 write 將資料寫入選定的 worker 的 socket 中,在 worker 事件循環中,因為 file descriptor 寫入完成,可以進行讀取,所以會觸發初始化時設定好的 worker_thread_event_process 函式,因為 buf 中寫入的 'c' ,所以會執行 createClient 函式以進行建立 client 的操作。

void
dispatch_conn_new(int sd) {
    struct connswapunit *su = csui_new();
    char buf[1];
    q_worker *worker;

    if (su == NULL) {
        close(sd);
        /* given that malloc failed this may also fail, but let's try */
        serverLog(LL_WARNING, "Failed to allocate memory for connection swap object\n");
        return;
    }

    int tid = (last_worker_thread + 1) % server.threads_num;
    worker = darray_get(&workers, (uint32_t) tid);
    last_worker_thread = tid;

    su->num = sd;
    su->data = NULL;
    csul_push(worker, su);

    buf[0] = 'c';
    // to be handled by worker's worker_thread_event_process loop
    if (write(worker->socketpairs[0], buf, 1) != 1) {
        serverLog(LL_WARNING, "Notice the worker failed.");
    }
}

執行 createClient 時,會讓 worker 事件循環監聽與使用者連線的 file descriptor fd ,並建立 file 事件,處理函式為 worker_readQueryFromClient ,從使用者端讀取命令或是請求,當使用者寫入命令時, worker 事件循環就會觸發 worker_readQueryFromClient 讀取訊息。

client *createClient(q_eventloop *qel, int fd) {
    client *c = zmalloc(sizeof(client));

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (fd != -1) {
        aeFileProc *proc = readQueryFromClient;
        if (qel != &server.qel)  {
            // createClient inside worker thread
            proc = worker_readQueryFromClient;
        }
        c->curidx = qel->id;
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        if (aeCreateFileEvent(qel->el,fd,AE_READABLE,
                    proc, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    ...
    if (fd != -1) listAddNodeTail(qel->clients,c);
    initClientMultiState(c);
    return c;
}

讀取使用者的傳送的資料之後,會執行到 worker_processInputBuffer ,再呼叫 worker_processCommand ,經過層層處理,最後會將命令請求放入伺服器的 command_requests_head ,接著使用 writebuf 寫入,此時就會讓伺服器的事件循環觸發 server_event_process 處理使用者的請求,最後透過 dispatch_to_worker 將結果回傳給使用者。

int worker_processCommand(client *c) {

    char buf[1];
    struct q_command_request *r = NULL;
    ...
    // unconditional scheduled to server thread 
    // later after adding RCU for RedisDB, we can handle read requests in 
    // worker thread and scheduled write requests to server thread
    unlinkClientFromEventloop(c);
    r = q_createCommandRequest(c);

    /*
     * pthread_mutex_lock(&server.command_request_lock);
     * listAddNodeTail(server.command_requests, r);
     * pthread_mutex_unlock(&server.command_request_lock);
     */
    cds_wfcq_enqueue(&server.command_requests_head, &server.command_requests_tail, &r->q_node);
    c->flags |= CLIENT_JUMP;

    buf[0] = 'r'; /* r stands for request from worker thread */
    if (write(server.socketpairs[1], buf, 1)!=1) {
        //ToDo: need to proper set the socketpairs[1]'s buf size, for now, wirte to socketpairs[1] could fail when redis-benchmark instances increases.  
        //or number of connections increased.
        //serverPanic("write to server thread's socketpair[1] from worker thread failed.");
        //As server_event_process() has dealed with unmatched socketpair buff with num of request. we choose not to panic here.
        serverLog(LL_VERBOSE, "write to server thread's socketpair[1] from worker thread failed.");
    }

    return C_SCHED;
}

執行 dispatch_to_worker 時,會透過 writebuf 寫入 worker 執行緒的 socket ,在 worker 事件循環中就會觸發 worker_thread_event_process ,但這次是伺服器端寫入的,寫入的 buf'b' ,所以會建立一個 file 事件,處理函式為 sendReplyToClient 將結果回傳給使用者。

int dispatch_to_worker(client *c, int workerId)
{
    q_worker *worker;
    char buf[1];

    worker = darray_get(&workers, (uint32_t)workerId);
    ...
    unlinkClientFromServerEventloop(c);
    // clear the JUMP flag so that worker event process thread can resetClient()
    c->flags &= ~CLIENT_JUMP;

    su->num = workerId;
    rcu_assign_pointer(su->data, c);
    csul_server_push(worker, su);

    buf[0] = 'b';
    if (write(worker->socketpairs[0], buf, 1) != 1) {
        serverPanic("dispatch_to_worker: write back to worker socketpairs[1] failed.\r\n");
        return C_ERR;
    }
    return C_OK;
}
  • worker_thread_event_process

處理與 master 執行緒和伺服器執行緒溝通的函式,如果 buf 值為 'c' ,則執行 createClient 函式,若值為 'b' ,則建立 file 事件,處理函式設置為 sendReplyToClient ,當使用者的 file descriptor 準備好寫入時就會觸發,將使用者請求的結果和回應回傳給使用者。

// worker threads to master/server thread communication handler function
static void
worker_thread_event_process(aeEventLoop *el, int fd, void *privdata, int mask) {
    int status;
    int sd;
    int res = C_OK;
    q_worker *worker = privdata;
    char buf[1];
    struct connswapunit *csu;
    client *c;
    q_eventloop *qel=NULL;

    UNUSED(mask);

    serverAssert(el == worker->qel.el);
    serverAssert(fd == worker->socketpairs[1]);

    if (read(fd, buf, 1) != 1) {
        serverLog(LL_WARNING, "Can't read for worker(id:%d) socketpairs[1](%d)",
                 worker->qel.thread.id, fd);
        buf[0] = 'c';
    }

    switch (buf[0]) {
         case 'c':
             ...
             c = createClient(&worker->qel, sd);
             if (c == NULL) {
                 serverLog(LL_WARNING, "Create client failed");
                 close(sd);
                 return;
             }
             c->curidx = worker->id;
             ...
             break;
        case 'b':
            /* receive back the client from server thread. */
           ...
           if (c->flags & CLIENT_PENDING_WRITE) {
               listAddNodeTail(qel->clients_pending_write, c);
               if (aeCreateFileEvent(qel->el, c->fd, AE_WRITABLE,
                           sendReplyToClient, c) == AE_ERR) {
                   freeClient(c);
                   return;
               }
           } else if(clientHasPendingReplies(c)){
               if(aeCreateFileEvent(qel->el, c->fd, AE_WRITABLE, 
                           sendReplyToClient, c) == AE_ERR) {
                   freeClient(c);
                   return;
               }
           }
            ...
           break;
        default:
           serverLog(LL_WARNING, "read error char '%c' for worker(id:%d) socketpairs[1](%d)",
                   buf[0], worker->qel.thread.id, worker->socketpairs[1]);
           break;
    }
}

mt-Redis Server

首先,查看 src/server.hstruct redisServer 就在這裡面,只討論重要的部份。伺服器在進行初始化時,會先執行 initServerConfig 將伺服器狀態設置為預設狀態,再使用 loadServerConfig 載入使用者設定檔,最後執行 initServer 完成整個伺服器初始化。

struct redisServer {
    /* General */
    ...
    redisDb *db;
    dict *commands;             /* Command table */
    dict *orig_commands;        /* Command table before command renaming. */
    aeEventLoop *el;
    
    ...
        
    /* Networking */
    int port;                   /* TCP listening port */
    char *bindaddr[CONFIG_BINDADDR_MAX]; /* Addresses we should bind to */
    int bindaddr_count;         /* Number of addresses in server.bindaddr[] */
    
    ...
        
    list *clients;              /* List of active clients */
    list *clients_to_close;     /* Clients to close asynchronously */
    list *slaves, *monitors;    /* List of slaves and MONITORs */
    client *current_client; /* Current client, only used on crash report */
    int clients_paused;         /* True if clients are currently paused */
    mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
    
    ...
        
    uint64_t next_client_id;    /* Next client unique ID. Incremental. */
    
    ...

    /* Fast pointers to often looked up command */
    struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
                        *rpopCommand, *sremCommand, *execCommand, *expireCommand,
                        *pexpireCommand;
    
    ...

    //multi-thread redis support
    pthread_mutex_t command_request_lock;
    int threads_num;    /* num of worker threads.*/
    list *command_requests;
    struct cds_wfcq_head command_requests_head;
    struct cds_wfcq_tail command_requests_tail;
    int socketpairs[2];
    q_eventloop qel;
};

General

redisServer.db

server.db 為資料庫本體,型態為 redisDb ,同樣定義在 server.h 中,在 server.c 裡面的 initServer 函式進行初始化,根據設定檔設定的數量建立資料庫,而 dict 就是主要儲存資料的地方。

typedef struct redisDb {
    struct q_dict *dict;        /* The keyspace for this DB */
    struct q_dict *expires;     /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP) */
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    struct evictionPoolEntry *eviction_pool;    /* Eviction pool of keys */
    int id;                     /* Database ID */
    long long avg_ttl;          /* Average TTL, just for stats */
} redisDb;
redisServer.commands, redisServer.orig_commands, redisCommand

接著來看 mt-Redis 是如何儲存用戶要執行的命令。 mt-Redis 使用 redisCommand 這個結構儲存命令, name 是用來儲存命令的名稱, proc 指標指向用來處理這個命令的函式, arity 代表這個命令的參數數量,如果是負值,例如 -N ,代表參數數量大於 Nsflagsflags 都是儲存命令的 flag ,但 sflags 是以字串儲存, flags 是以 bit mask 的方式儲存, firstkey 代表第一個參數是 keylastkey 代表最後一個參數是 keykeystep 則是參數與參數之間的距離,以 MSET 指令來說, keystep 為 2 ,帶表參數的形式為 key, value, key valuegetkeys_proc 則是在以上三個變數沒辦法決定哪個參數是 key 時使用以找出 key 在參數中的位置, microseconds 紀錄這個命令的執行時間, calls 紀錄這個命令被呼叫多少次。

typedef void redisCommandProc(client *c);
typedef int *redisGetKeysProc(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    char *sflags; /* Flags as string representation, one char per flag. */
    int flags;    /* The actual flags, obtained from the 'sflags' field. */
    /* Use a function to determine keys arguments in a command line.
     * Used for Redis Cluster redirect. */
    redisGetKeysProc *getkeys_proc;
    /* What keys should be loaded in background when calling this command? */
    int firstkey; /* The first argument that's a key (0 = no keys) */
    int lastkey;  /* The last argument that's a key */
    int keystep;  /* The step between first and last key */
    long long microseconds, calls;
};

以下是命令的參數列表。

w: write command (may modify the key space).
r: read command  (will never modify the key space).
m: may increase memory usage once called. Don't allow if out of memory.
a: admin command, like SAVE or SHUTDOWN.
p: Pub/Sub related command.
f: force replication of this command, regardless of server.dirty.
s: command not allowed in scripts.
R: random command. Command is not deterministic, that is, the same command with the same arguments, with the same key space, may have different results. For instance SPOP and RANDOMKEY are two random commands.
S: Sort command output array if called from script, so that the output is deterministic.
l: Allow command while loading the database.
t: Allow command while a slave has stale data but is not allowed to server this data. Normally no command is accepted in this condition but just a few.
M: Do not automatically propagate the command on MONITOR.
k: Perform an implicit ASKING for this command, so the command will be accepted in cluster mode if the slot is marked as 'importing'.
F: Fast command: O(1) or O(log(N)) command that should never delay its execution as long as the kernel scheduler is giving us time. Note that commands that may trigger a DEL as a side effect (like SET) are not fast commands.

完整的命令表格 redisCommandTable 可以參考 src/server.c

struct redisCommand redisCommandTable[] = {
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    ...
    {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
    {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
    {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
};

redisServer 中使用了兩個字典建立表格,一個是要用來修改的,因為使用者可以在設定檔中修改命令的名字,在建立資料庫時,會執行 loadServerConfig 函式載入設定檔,所以要額外建立一個不會被設定檔影響的指令表格,也就是原本資料庫內建的指令。建立表格是在 initServerConfig 中使用 populateCommandTable 這個函式,將每個命令從 redisCommandTable 取出,並將 sflags ,也就是這個命令的 flags 用字串表示,轉換成 flags ,以整數的形式表示,最後加入 commandsorig_commands 兩個字典中,而字典的 key 使用的是 sds (simple dynamic string) ,而不是普通的字串。

/* Populates the Redis Command Table starting from the hard coded list
 * we have on top of redis.c file. */
void populateCommandTable(void) {
    int j;
    int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);

    for (j = 0; j < numcommands; j++) {
        struct redisCommand *c = redisCommandTable+j;
        char *f = c->sflags;
        int retval1, retval2;

        while(*f != '\0') {
            switch(*f) {
                case 'w': c->flags |= CMD_WRITE; break;
                case 'r': c->flags |= CMD_READONLY; break;
                case 'm': c->flags |= CMD_DENYOOM; break;
                case 'a': c->flags |= CMD_ADMIN; break;
                case 'p': c->flags |= CMD_PUBSUB; break;
                case 's': c->flags |= CMD_NOSCRIPT; break;
                case 'R': c->flags |= CMD_RANDOM; break;
                case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break;
                case 'l': c->flags |= CMD_LOADING; break;
                case 't': c->flags |= CMD_STALE; break;
                case 'M': c->flags |= CMD_SKIP_MONITOR; break;
                case 'k': c->flags |= CMD_ASKING; break;
                case 'F': c->flags |= CMD_FAST; break;
                case 'd': c->flags |= CMD_SERVER_THREAD; break;
                default: serverPanic("Unsupported command flag"); break;
            }
            f++;
        }

        retval1 = dictAdd(server.commands, sdsnew(c->name), c);
        /* Populate an additional dictionary that will be unaffected
         * by rename-command statements in redis.conf. */
        retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
        //serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
    }
}

建立完表格之後,對於那些常用的指令,像是 dellpush 等,使用 lookupCommandByCString 建立能快速連結到指令的指標,以減少查表的時間,以 dictFetchValue 函式在 commands 字典中找到相對應的 key 並回傳 value ,而 value 就是我們要的指令。

struct redisCommand *lookupCommandByCString(char *s) {
    struct redisCommand *cmd;
    sds name = sdsnew(s);

    cmd = dictFetchValue(server.commands, name);
    sdsfree(name);
    return cmd;
}
redisServer.el

在原本的 Redis 中,使用的 aeEventLoop ,使用 aeCreateEventLoop 進行初始化,但在 mt-Redis 中支援多執行緒,所以使用 q_eventloopaeEventLoop 包裝起來,用 q_eventloop_init 初始化。

q_eventloop_init(&server.qel, server.maxclients + CONFIG_FDSET_INCR);
server.el = server.qel.el;

Networking

  • redisServer.port

port 定義了資料庫連線的連接埠,用戶使用這個連接埠進行連線,如果沒有使用 redis.conf 或是 --port 特別指定要使用哪一個連接埠,預設會是 CONFIG_DEFAULT_SERVER_PORT ,也就是 6379 。

~/mt-redis/src main* 5s ❯ ./redis-server
26516:C 16 May 15:40:08.672 # Warning: no config file specified, using the default config. In order to specify a config file use ./redis-server /path/to/redis.conf
26516:M 16 May 15:40:08.673 * Increased maximum number of open files to 10032 (it was originally set to 1024).
                _._
           _.-``__ ''-._
      _.-``    `.  `_.  ''-._           Redis 3.2.13 (74300e3c/1) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 26516
  `-._    `-._  `-./  _.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |           http://redis.io
  `-._    `-._`-.__.-'_.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |
  `-._    `-._`-.__.-'_.-'    _.-'
      `-._    `-.__.-'    _.-'
          `-._        _.-'
              `-.__.-'

redisServer.bindaddr, redisServer.bindaddr_count

如果沒有在 redis.conf 指定 bind 配置指令, mt-Redis 預設會監聽伺服器上所有可用的網絡接口。可以使用 bind 配置指令,後面指定一個或多個 IP 地址,僅監聽一個或多個選定的接口。 bindaddr_count 就是指 bind 的數量。

先使用 ip addr 指令查看我電腦的網路裝置,有兩個網路裝置,其 IP 分別為 127.0.0.1192.168.50.17

➜ andyyeh@andyyeh-ubuntu  ~/mt-redis/src git:(main) ✗ ip addr                                                 
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
    inet6 ::1/128 scope host 
       valid_lft forever preferred_lft forever
2: wlp2s0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default qlen 1000
    link/ether 9c:fc:e8:f4:f8:30 brd ff:ff:ff:ff:ff:ff
    inet 192.168.50.17/24 brd 192.168.50.255 scope global dynamic noprefixroute wlp2s0
       valid_lft 77095sec preferred_lft 77095sec
    inet6 fe80::4bfe:d1ff:dd01:73c6/64 scope link noprefixroute 
       valid_lft forever preferred_lft forever

當我在 redis.conf 將指定 bind0.0.0.0 ,使用 127.0.0.1192.168.50.17 都可以連接到伺服器。

➜ andyyeh@andyyeh-ubuntu  ~/mt-redis/src git:(main) ✗ curl http://192.168.50.17:6379
curl: (52) Empty reply from server
➜ andyyeh@andyyeh-ubuntu  ~/mt-redis/src git:(main) ✗ curl http://127.0.0.1:6379
curl: (52) Empty reply from server

當我在 redis.conf 將指定 bind192.168.50.17 ,就只有 192.168.50.17 連接的到伺服器。

➜ andyyeh@andyyeh-ubuntu  ~/mt-redis/src git:(main) ✗ curl http://192.168.50.17:6379                          
curl: (52) Empty reply from server
➜ andyyeh@andyyeh-ubuntu  ~/mt-redis/src git:(main) ✗ curl http://127.0.0.1:6379
curl: (7) Failed to connect to 127.0.0.1 port 6379 after 0 ms: Connection refused

也就是說,當 bind 多個 IP 位置時,執行伺服器的電腦上必須有相對應的網路裝置,並且可以指定這些網路裝置的 IP 接收用戶的請求,若將 bind 設為 0.0.0.0 ,則可以接受從所有網路裝置的 IP 進行連接。

mt-Redis Clients

redisServer 中,有紀錄用戶的鏈結串列,與其他與用戶相關的資料,我們放到探討 client 這個結構時一起討論。

typedef struct client {
    q_eventloop *qel;       /* Eventloop of the worker's thread which handles this client.*/
    int curidx;             /* The worker idx that this client current belogn to.*/
    uint64_t id;            /* Client incremental unique ID. */
    int fd;                 /* Client socket. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    int dictid;             /* ID of the currently SELECTed DB. */
    ...
    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
    ...
    list *reply;            /* List of reply objects to send to the client. */
    ...
} client;

在介紹 worker 事件循環時,就有看過 client 結構是怎麼建立的, qel 代表處理這個 client 的 worker 事件循環, curidx 代表 worker 的 ididclient 自己的 id , fd 則是在 acceptTcpHandler 取得的使用者 file descriptor , reply 鏈結串列是要 worker 要回覆給使用者的訊息。