# 看 valkey 的資料結構 ## serverObj 在Valkey 資料庫中的每個值都由一個 `robj`(原 redisObject,現在改名成 serverObject)表示,其中包含類型、編碼和引用計數……等資訊。 ```c typedef struct serverObject robj; struct serverObject { unsigned type : 4; unsigned encoding : 4; unsigned lru : LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ // ... unsigned refcount : OBJ_REFCOUNT_BITS; void *ptr; }; ``` - type: 標識資料類型 (string, list, set, hash, sorted set, stream, or module,以下將介紹) - encoding: 指定優化的內部表示格式 - refcount: 記憶體管理的引用計數 - ptr: 指向實際資料的位置 ### robj 用途 進行指令的處理。 以處理 SETRANGE 命令時的部分程式碼為例。 ```c // t_string.c o = createObject(OBJ_STRING, sdsnewlen(NULL, offset + sdslen(value))); dbAdd(c->db, c->argv[1], &o); // serverDb *db, robj *key, robj **valref ``` - 使用 createObject 將 SDS 字串包裝成 robj。具體而言是先使用 `sdsbewlen` 創建指定長度的 SDS 字串,之後 assign 給 robj 中的 ptr,另外,robj 中的 type 設為 OBJ_STRING、refcount 設 1。 - dbAdd 會將此 robj 加入到資料庫中。此函式行為會做相關初始化、 根據 key 而決定的雜湊表 (`kvstore`)與其索引,將加到 key-value 適當位置,並做 `notifyKeyspaceEvent()`。 ## 兩版通用雜湊表 舊版的通用雜湊實作是在 dict.c,而在 valkey 8.1 引入新的 hashtable.c。 目前 8.1 中,尚未將所有通用雜湊都改用 hashtable,但如以上提及的 kvstore,就改使用 hashtabl 了。 ## dict.c 在 valkey 原始檔案中,有 dict.c 還有 t_hash.c 與 t_set.c 三者檔案中,第一個檔案是提供通用的 key-value 的操作,後兩者在特定情況下會使用 dict.c 作為底層儲存,但也會根據資料大小和特性選擇其他更適合的編碼方式(如 listpack、intset) 來改進記憶體使用與效能。 ### dict.c 提供通用的 key-value 具有幾大特色 - 由陣列存值,並用 single linked list 做 chaining 以解決 collision 所組成 - 基本雜湊表操作:新增、刪除、查找 - 動態調整大小:自動擴展和收縮 - 增量式重新雜湊(Incremental rehashing) ### struct 需先了解兩個資料結構: `struct dictEntry` 還有 `struct dict` #### 1. dictEntry 這是 hash table 中的每個 entry,也就是 key-value 對: `dictEntry` 是一個共用抽象名詞,實際上背後可能是兩種不同的 struct - `dictEntryNormal` 用在傳統情況下(key 是獨立指標)。 - `dictEntryEmbedded` 將 key (以及有時是 value) 直接嵌入到 dictEntry 結構體內部,而不是透過一個單獨的指標指向另一個記憶體位置,藉此省記憶體與減少指標跳轉 以下用 dictEntryNormal 理解 entry,會存有 key, value 還有指向下一項的指標。 ```c typedef struct { void *key; union { void *val; uint64_t u64; int64_t s64; double d; } v; struct dictEntry *next; /* Next entry in the same hash bucket. */ } dictEntryNormal; ``` #### 2. dict 存 hashTable 內容的就是 `ht_table[2]`,其餘 struct 成員皆是處理雜湊時所需。 ```c struct dict { dictType *type; // Operation type of this dict, providing callbacks such as hash, comparison, free, etc. dictEntry **ht_table[2]; // Two hash tables, supporting Incremental rehash ([0]: old, [1]: new) unsigned long ht_used[2]; // Number of entries actually used in each table long rehashidx; /* rehashing not in progress if rehashidx == -1 */ int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */ signed char ht_size_exp[2]; /* Exponent of hash table size (actual size = 2^exp) */ int16_t pauseAutoResize; /* If >0 automatic resizing is disallowed (<0 indicates coding error) */ void *metadata[]; // Used to remember its position in the rehashing list }; ``` 所有資料可能會分散於 `ht_table[0]` 或 `ht_table[1]` 之中,具體在 rehash 部分再說明。 dictType 為 **callback 函式集合**,讓 `dict` 支援多種 key/value 的處理邏輯,例如: - `hashFunction`:計算 hash 值 - `keyCompare`:比對 key 是否相等 - `keyDestructor` / `valDestructor`:釋放記憶體 - `no_value`:設定是否為 set 模式(只有 key,沒有 value) dict 結構的示意圖 > It's a chained hash table, so if multiple keys are hashed to the same slot in the table, their key-value entries form a linked list. That's what the "next" pointer in the dictEntry is for. ![image](https://hackmd.io/_uploads/BktAq4E4ge.png) 圖片來源: [A new hash table](https://valkey.io/blog/new-hash-table/) ### 雜湊表中判斷該 expand 或 shrink 當雜湊表的元素數量達到一定的比率時,會根據需要自動進行擴容或收縮操作。這些操作是由全局設置和具體雜湊表類型的控制函式來決定的。 `DICT_RESIZE_ENABLE` 與 `DICT_RESIZE_FORBID` 是個全域變數,去控制全局 dict 是否可以 resize,而使用 `dictTypeResizeAllowed()`,則是更細緻的控制,根據具體的 dictType 的 `resizeAllowed()` 再次判斷此雜湊表是否可以擴容 **擴容(Expand)** 雜湊表會在兩個條件觸發擴容 - 當元素數量與儲存桶的比率(元素數量 / 桶數量),且雜湊表的元素數量 (`d->ht_used[0]`) 大於等於當前桶數的大小 (`DICTHT_SIZE(d->ht_size_exp[0]`),則觸發擴容。 - 即使 DICT_RESIZE_FORBID(禁止擴容),但元素數量超過 `dict_force_resize_ratio` (預設是 4) 倍的桶數時,仍會觸發擴容。 程式邏輯如下: ```c // int dictExpandIfNeeded(dict* d) if ((dict_can_resize == DICT_RESIZE_ENABLE && d->ht_used[0] >= DICTHT_SIZE(d->ht_size_exp[0])) || (dict_can_resize != DICT_RESIZE_FORBID && d->ht_used[0] >= dict_force_resize_ratio * DICTHT_SIZE(d->ht_size_exp[0]))) { if (dictTypeResizeAllowed(d, d->ht_used[0] + 1)) dictExpand(d, d->ht_used[0] + 1); return DICT_OK; } ``` 補充相關: - `DICTHT_SIZE(exp)` 巨集計算桶的大小,根據 exp 的值,返回 2 的 exp 次方作為桶的數量。若 exp 為 -1,則返回 0 - `dict_can_resize` 預設為 `DICT_RESIZE_ENABLE`,表示允許擴容。 **收縮(Shrink)** 觸發收縮條件: - 收縮則是當雜湊表的負載因子過低 (1/8)時觸發: - 即使禁止擴容,但元素數量與桶數的比率 達 1/32 下限,仍會觸發收縮 ```c // int dictShrinkIfNeeded(dict* d) if ((dict_can_resize == DICT_RESIZE_ENABLE && d->ht_used[0] * HASHTABLE_MIN_FILL <= DICTHT_SIZE(d->ht_size_exp[0])) || (dict_can_resize != DICT_RESIZE_FORBID && d->ht_used[0] * HASHTABLE_MIN_FILL * dict_force_resize_ratio <= DICTHT_SIZE(d->ht_size_exp[0]))) { if (dictTypeResizeAllowed(d, d->ht_used[0])) dictShrink(d, d->ht_used[0]); return DICT_OK; } ``` 相關補充 - `#define HASHTABLE_MIN_FILL 8` - `static unsigned int dict_force_resize_ratio = 4` ### rehashing 的步驟 (`dictShrink` 還有 `dictExpand`) **rehasing** 當要做擴容還是收縮時,需要創建新的雜湊表,這導致雜湊表的 size 和 sizemask 變化,而 key 的查詢與 sizemask 有關,因此此時必須對對雜湊表中的每一個 key 重新計算索引,插入新的雜湊表。rehash 指的是重新插入值到新雜湊表的整個過程。 **Incremental rehahsing** 避免確保 rehashing 阻塞主執行緒太長時間,每次字典訪問(包括查找、插入、刪除)都會推進 rehashing 進度,在 rehashing 期間,所有數據仍然可以被正確找到。 **在增刪查改中的 rehashing** - 插入(dictAdd、dictReplace):新的鍵值對總是直接插入到 `dict.ht[1]` 中。這確保了 `ht[0]` 只會縮小並最終變為空。 - 尋找、修改和刪除(dictFind、dictDelete):這些操作必須同時檢查 `dict.ht[0]` 和 `dict.ht[1]`。它們首先嘗試在 `dict.ht[0]` 中尋找鍵。如果在那裡找不到,則檢查 `dict.ht[1]`。這保證了資料的一致性,因為在 rehash 期間,鍵可能駐留在任一表中。 **rehash 的處理過程是這樣的: (`dictResizeWithOptionalCheck`)** 1. 計算新雜湊表的大小(newsize),值取決於目前要做的是擴容還是收縮: - 如果是擴容,則 newsize 為第一個大於等於 `ht[0].used+1` 的 $2^n$ - 如果收縮, newsize 是大於或等於 `ht[0].used` 的 2 的最小冪(最小大小為 4,以防止表過小)。 2. 依照 newsize 申請記憶體空間,建立 new_ht_table ,並賦值給`dict.ht[1]` 3. 設定`dict.rehashidx = 0`,標示開始 rehash。(This rehashidx acts as a pointer, indicating the next bucket in ht[0] to be rehashing) <!-- 將`dict.ht[0]`中的每一個 dictEntry 都 rehash 到 `dict.ht[1]` --> 4. 增量遷移 (`dictRehash(dict, n)` 函式) - 當如果`dict.ht[0].table[rehashidx]` 的 entry 做 rehash 到 `dict.ht[1]` 後,就將 rehashidx++。直至`dict.ht[0]` 的 n 個數都 rehash 到 `dict.ct.ht[1]` 5. rehash 的收尾。將 `dict.ht[1]` 值給 `dict.ht[0]`,給 `dict.ht[1]` 初始化為空雜湊表,釋放原來的 `dict.ht[0]` 的記憶體空間 6. 將 rehashidx 賦值為 -1,代表 rehash 結束 ```c // Performs N steps of incremental rehashing. int dictRehash(dict *d, int n) { // Perform incremental rehash while (n-- && d->ht_used[0] != 0) { while (d->ht_table[0][d->rehashidx] == NULL) { d->rehashidx++; if (--empty_visits == 0) return 1; } /* Move all the keys in this bucket from the old to the new hash HT */ rehashEntriesInBucketAtIndex(d, d->rehashidx); d->rehashidx++; } return !dictCheckRehashingCompleted(d); ``` - `rehashEntriesInBucketAtIndex()` 將指定桶(`ht[0][d->rehashidx]`)中的所有 dictEntry 從舊雜湊表搬遷到新雜湊表 當所有元素都完成搬遷後,`dictCheckRehashingCompleted()` 會檢查 `ht[0]` 是否已經空了,並將 `ht[1]` 賦值給 `ht[0]`,ht[1] 清空,並設 `d->rehashidx = -1;`,表示完成 reahsing 過程。 :::success 疑問: mt-redis 怎麼處理 rehashing? 這問題需要先看他的資料結構 ```c typedef struct q_dictEntry { unsigned type:4; // four data structure types: string, list, set, zset and hash void *key; union { void *val; int64_t s64; } v; struct cds_lfht_node node; struct rcu_head rcu_head; } q_dictEntry; typedef struct q_dict { unsigned int size; struct cds_lfht *table; void *privdata; } q_dict; typedef struct q_dictIterator { q_dict *d; struct cds_lfht_iter iter; } q_dictIterator; ``` 在 q_dict 中沒有 `ht_table[2]`,在 mt-redis 中的 q_dict 中的 lock-free hash table 本身就支持動態調整大小,不需要像傳統 dict 那樣進行手動的 incremental rehashing。 調用 `#include <urcu/rculfhash.h>`,使用 cds (Concurrent Data Structures) 的 API rculfhash 的介紹 > Lock-Free Resizable RCU Hash Table. RCU used to provide existence guarantees. Provides scalable updates, and scalable RCU read-side lookups and traversals. > Unique and duplicate keys are supported. Provides "uniquify add" and "replace add" operations, along with associated read-side traversal uniqueness guarantees. Automatic hash table resize based on number of elements is supported. See the API for more details. ::: ### 新增 element 到雜湊表之中 `dictAdd()` 流程圖 ``` dictAdd(dict, key, val) │ ├──▶ dictAddRaw(dict, key, NULL) │ │ │ ├──▶ dictFindPositionForInsert() → 找插入位置(或發現 key 已存在),其中會討論是否擴容 │ ├──▶ dictType->keyDup()(若有)→ 複製 key │ └──▶ dictInsertAtPosition() → 插入 dictEntry │ ├──▶ dictSetVal() → 設定 value(這 hashTable 非 set) └──▶ return DICT_OK ``` **`void *dictFindPositionForInsert(dict *d, const void *key, dictEntry **existing)`** (dictAddRaw 中賦值給 exisiting 值是 NULL) > dictFindPositionForInsert 用來計算 key 的雜湊值,並返回要插入的位置(如果該位置已經有相同的 key,則返回 NULL 並填充 existing 變數)。 具體操作: 1. 計算 key 的雜湊值 (`hash`),後得 idx,idx 是新元素在雜湊表(dict 中 ht0)的位置 ```c uint64_t hash = dictHashKey(d, key); idx = hash & DICTHT_SIZE_MASK(d->ht_size_exp[0]); ``` 2. 判斷是否正在 rehashing,就著就判斷 - 如果當前操作碰巧存取了 `ht[0]` 中一個還沒被 rehash 的 butcket (`idx >= d->rehashidx` 並且 `ht_table[0][idx] 不為空)`),那就 就將透過 `dictBucketRehash()` 在同一個 bucket 中做 rehashing - 否則,透過 ` dictRehashStep(d)` 將 rehash 所有的 buckets。(如果 ht0 中沒有有效位置,則需要在全表範圍內繼續進行桶的重新雜湊,這部分見 `dictRehash(d, 1)`) ```c if (dictIsRehashing(d)) { if ((long)idx >= d->rehashidx && d->ht_table[0][idx]) { dictBucketRehash(d, idx); } else { dictRehashStep(d); } } ``` - dictIsRehashing(d) 用來檢查是否正在進行雜湊表的擴展或收縮過程。 :::success Q: valkey 只有是單執行緒 ,為甚麼還需要判斷是否有正在進行 rehashing? A: 因為是 incremental rehashing,這允許 Valkey 不因為雜湊表大小變化造成了阻塞,藉由逐步完成數據的遷移,確保了高響應性和可用性 ::: 3. 遍歷兩張雜湊表(`ht_table[0]` 和 `ht_table[1]`)來檢查 idx 位置是否已經有該 key。如果已經有,則返回 NULL,並且可以填充 existing,指向原有的 dictEntry。 ```c dictExpandIfAutoResizeAllowed(d); for (table = 0; table <= 1; table++) { if (table == 0 && (long)idx < d->rehashidx) continue; idx = hash & DICTHT_SIZE_MASK(d->ht_size_exp[table]); /* Search if this slot does not already contain the given key */ he = d->ht_table[table][idx]; while (he) { void *he_key = dictGetKey(he); if (key == he_key || dictCompareKeys(d, key, he_key)) { if (existing) *existing = he; return NULL; } he = dictGetNext(he); } ``` - dictCompareKeys() : 檢查計算出的位置是否已有相同的鍵 4. 返回插入的位置 若無相同的 key,則返回該位置(`bucket`) ```c dictEntry **bucket = &d->ht_table[dictIsRehashing(d) ? 1 : 0][idx]; return bucket; ``` **`dictEntry *dictInsertAtPosition(dict *d, void *key, void *position)`** <!-- position 是 bucket。 entry 是? --> 1. htidx 決定 element 是要插入到哪一個表裡,如果正在進行 rehashing,則元素插入到 `ht_table[1]` 中,否則插入到 `ht_table[0]` 中 ```c // dictEntry *dictInsertAtPosition(dict *d, void *key, void *position) { int htidx = dictIsRehashing(d) ? 1 : 0; ``` 2. 創建 entryEntry (`entry`) 之後,就插入到 bucket 位置,並更新該表的元素數量。 ```c *bucket = entry; d->ht_used[htidx]++; ``` **dictSetVal** 會根據 dictEntry 類型(嵌入式或普通型)來設置對應的值 ```c void dictSetVal(dict *d, dictEntry *de, void *val) { DICT_SET_VALUE(de, v.val, val); } ``` ```c #define DICT_SET_VALUE(de, field, val) if (entryIsNormal(de)) { \ dictEntryNormal *_de = decodeEntryNormal(de); \ _de->field = val; ``` ```c static inline void *decodeMaskedPtr(const dictEntry *de) { return (void *)((uintptr_t)(void *)de & ~ENTRY_PTR_MASK); } // #define ENTRY_PTR_MASK 7 /* 111 */ ``` 所以 dictSetVal 等於是 (void*)(de & 111)->v.val = val (v 是指 structEntry 中型別為 union 的成員) #### dictFind 流程圖 ``` dictFind(d, key) │ ├──▶ dictSize(d) == 0? --> 是 -> 返回 NULL │ ├──▶ dictHashKey(d, key) --> 計算雜湊值 h │ ├──▶ idx = h & DICTHT_SIZE_MASK(d->ht_size_exp[0]) --> 計算雜湊索引 │ ├──▶ dictIsRehashing(d)? --> 是 │ ├──▶ 是否處理正在重雜湊的槽位 -> 如果有,則進行 `dictBucketRehash(d, idx)` │ └──▶ 否 -> 進行 incremental rehash -> `dictRehashStep(d)` │ ├──▶ 遍歷 `ht_table[0]` 和 `ht_table[1]`: │ ├──▶ 找到該槽位的 `dictEntry`,若是 linked list,則遍歷 `dictEntry` │ ├──▶ 若找到匹配的 key,返回對應的 `dictEntry` │ └──▶ 若無匹配,繼續下一個槽位或表格 │ └──▶ 最終若未找到,返回 NULL ``` #### dictDelete ``` dictDelete(dict, key) │ ├──▶ dictSize(d) == 0? --> 是 → 返回 NULL(字典為空) │ ├──▶ 計算 key 的雜湊值 (h = dictHashKey(d, key)) │ ├──▶ dictIsRehashing(d)? --> 是 │ ├──▶ idx >= rehashidx 且 ht_table[0][idx] 存在 → 在新表 (ht[1]) 中查找並刪除 │ └──▶ 否 → 調用 dictRehashStep(d) 進行增量 rehashing │ ├──▶ 遍歷 `ht[0]` 和 `ht[1]` 查找目標 key │ ├──▶ 找到 key,調用 dictDeleteEntry(d, he, idx) 刪除該元素 │ └──▶ 未找到,繼續查找或返回 NULL │ └──▶ 返回刪除結果 ``` #### 討論 dict.c 是否合適加入 RCU 以提升並行化層度? RCU 適用於讀多寫少的場景,但仔細閱讀 dict 的API 實做後發現不適合的幾點: - `dictRehash`:這涉及到雜湊表擴展,會改變雜湊表的結構 - `dictAdd`、`dictDelete`:這是對字典進行插入、刪除操作的方法,會修改雜湊表,過程中會 incremental rehash - `dictFind`:這是查找操作,並不會直接修改資料,但過程中也會 做 incremental rehash 因此對於 RCU 加入此核心的雜湊資料結構,並不適合。 然而可考慮如 mt-redis 一般,使用 lock-free hash table API,不需要實作 incremental rehash。 ## hashtable.c valkey 8.1 引入的新的雜湊表,因為發現舊有的 dict.c 實作有幾點缺點,例如如為了查找鍵值 "FOO" 並存取其值"BAR",這仍然需要從記憶體中讀取四次。如果發生雜湊衝突,它必須針對每次 collision 額外追蹤另外兩個指針,因此需要從記憶體中多讀取兩次(鍵值和下一個指針)。 <!-- 補 dict collision 的處理過程--> hashtable.c 的設計 table 不再是指向 dictEntry 而是 bucket。 ![image](https://hackmd.io/_uploads/rJxIU4BVll.png) 當 bucket 較滿,使用 bucket chain 方式去擴展,也就是最後一個元素的 slot 將替換成指向 child bucket 的指針。 ```c struct hashtable { hashtableType *type; ssize_t rehash_idx; /* -1 = rehashing not in progress. */ bucket *tables[2]; /* 0 = main table, 1 = rehashing target. */ size_t used[2]; /* Number of entries in each table. */ int8_t bucket_exp[2]; /* Exponent for num buckets (num = 1 << exp). */ int16_t pause_rehash; /* Non-zero = rehashing is paused */ // ... }; ``` ![image](https://hackmd.io/_uploads/rJXhj4BEeg.png) 在同一個 Bucket 或 Bucket Chain 的元素沒有做內部排序。當新 entry 要插入 buckt 時,可以使用任何空閒 slot。 ```c #define ENTRIES_PER_BUCKET 12 #define BUCKET_BITS_TYPE uint16_t typedef struct hashtableBucket { BUCKET_BITS_TYPE chained : 1; BUCKET_BITS_TYPE presence : ENTRIES_PER_BUCKET; uint8_t hashes[ENTRIES_PER_BUCKET]; void *entries[ENTRIES_PER_BUCKET]; } bucket; ``` bucket 模樣,其中的 meta 區域 (chain, presence 與 hashes) 大約 14 Bytes。 (32系統、64系統) `chained` 使用 bit field 的寫法表示只佔 1 Bit。表示是否有接 child bucket。`present` 表示在 Bucket 中每一個 slot 是否是有值的。`chain` 與 `present` 佔 2 Bytes。`hashes` 為每一個 entry 的 secondary hash,當 hash collision 發生會用上的成員。 ![image](https://hackmd.io/_uploads/HJ3UeUHNlx.png) 64-bit version, 7 entries per bucket: 1 bit 7 bits [1 byte] x 7 [8 bytes] x 7 = 64 bytes chained presence hashes entries 32-bit version, 12 entries per bucket: 1 bit 12 bits 3 bits [1 byte] x 12 2 bytes [4 bytes] x 12 = 64 bytes chained presence unused hashes unused entries ### 如果發生 hash collision Bucket Chaining ## Strings 字串 (String) 實作檔案: `src/t_string.c` <!-- 物件定義: `src/object.c` --> 實作檔案: `src/sds.c`, `src/sds.c` 字串存儲位元組序列,包括文本、serialized objects 和二進位陣列。字串通常用於緩存,但它們支援其他功能,這些功能**允許實現計數器並執行 bitwise 運算**。 ### C 的字串的問題 舉例: ``` char *s = "hello"; ``` 1. 獲得字串長度需要運算 (算到 `\0`) 2. 非二進位安全 (不可以出現 `\0` 等特殊字元在字串中) 3. 不可修改 ## SDS 特色 1. **二進位安全 (Binary Safe)**:SDS 字串可以包含任意位元組,包括 `\0` 字元,因為其長度是明確儲存在字串頭部 2. **自動擴容 (Automatic Resizing)**:當進行追加操作時,如果現有緩衝區空間不足,SDS 會自動擴大緩衝區,並且會預分配額外的空間 (貪婪模式,`SDS_MAX_PREALLOC`) 以減少頻繁的重新分配 3. **相容 C 字串 (C String Compatible)**:SDS 字串**始終以 `\0` 終止**,這使得它可以直接傳遞給需要 C 字串的函式 (例如 `printf()`) ## valkey 中 set command 看見與 sds 的關係 ```c robj *createStringObject(const char *ptr, size_t len) { // if len > OBJ_ENCODING_EMBSTR_SIZE_LIMIT) return createRawStringObject(ptr, len); } ``` ```c robj *createRawStringObject(const char *ptr, size_t len) { return createObject(OBJ_STRING, sdsnewlen(ptr, len)); } ``` `sdsnewlen()` 將創建一個全新的 SDS 實例並初始化其內容 ## simple dynamic string (SDS) 支援五種 sdshdr 種類: sdshdr5, sdshdr8, sdshdr16, sdshdr32, sdshdr64 (sdshdr5 不再使用) 以 sdshrd8 的結構體為例: ```c struct __attribute__ ((__packed__)) sdshrd8 { uint8_t len; uint8_t alloc; unsigned char flags; char buf[]; } ``` `__attribute__((__packed__))`使結構中沒有額外的填充位元組。 header 包含三個成員: - len 表示 buf 保存的字串長度。 - alloc 為 buf 總長度,不包含 header 與 \0 結尾。 - flag 表示 SDS 的類型,flags 是用來控制 SDS 的 header 大小 。 flag 有以下類型: <!-- `SDS_TYPE_5`: 用於短字串 (長度小於 32 個字元)。長度和類型資訊都儲存在一個 `unsigned char` 中。這種 header 不包含 `alloc` 資訊--> - `SDS_TYPE_8`: 用於長度小於 256 個字元的字串。 - `SDS_TYPE_16`: 用於長度小於 216 個字元的字串。 - `SDS_TYPE_32`: 用於長度小於 232 個字元的字串。 - `SDS_TYPE_64`: 用於長度小於 264 個字元的字串。 buf 為一個字元陣列。這個字元數組的長度等於最大容量 +1。真正有效的字串長度通常小於最大容量。因此在真正的字串資料之後會有 '\0' 字元與若有空餘其未用的位元組,這允許在不重新分配記憶體的前提下,讓字串資料向後做有限的擴展。 ### sdsnewlen 看到返回 buf 的開頭位置 sdsnewlen()(以及其他所有返回 sds 型別的函式,如 sdscat、sdsdup 等)返回的 sds 都不是返回 sdshdr 的開頭,而是 buf 開頭位置。 ```c typedef char *sds; ``` ```c /* Create a new sds string starting from a null terminated C string. */ sds sdsnew(const char *init) { size_t initlen = (init == NULL) ? 0 : strlen(init); return sdsnewlen(init, initlen); } ``` ```c /* Create a new sds string with the content specified by the 'init' pointer * and 'initlen'. * If NULL is used for 'init' the string is initialized with zero bytes. * If SDS_NOINIT is used, the buffer is left uninitialized; * * The string is always null-terminated (all the sds strings are, always) so * even if you create an sds string with: * * mystring = sdsnewlen("abc",3); * * You can print the string with printf() as there is an implicit \0 at the * end of the string. However the string is binary safe and can contain * \0 characters in the middle, as the length is stored in the sds header. */ sds _sdsnewlen(const void *init, size_t initlen, int trymalloc) { // ... return sdswrite(sh, bufsize, type, init, initlen); } ``` ```c /* Writes an sds with type `type` into a buffer `buf` of size `bufsize`. Returns * an sds handle to the string within the buffer. Use `sdsReqType(length)` to * compute the type and `sdsReqSize(length, type)` to compute the required * buffer size. You can use a larger `bufsize` than required, but usable size * can't be greater than `sdsTypeMaxSize(type)`. */ sds sdswrite(char *buf, size_t bufsize, char type, const char *init, size_t initlen) { assert(bufsize >= sdsReqSize(initlen, type)); int hdrlen = sdsHdrSize(type); size_t usable = bufsize - hdrlen - 1; sds s = buf + hdrlen; unsigned char *fp = ((unsigned char *)s) - 1; /* flags pointer. */ switch (type) { case SDS_TYPE_8: { SDS_HDR_VAR(8, s); sh->len = initlen; assert(usable <= sdsTypeMaxSize(type)); sh->alloc = usable; *fp = type; break; } } if (init == SDS_NOINIT) init = NULL; else if (!init) memset(s, 0, initlen); if (initlen && init) memcpy(s, init, initlen); s[initlen] = '\0'; return s; } ``` `sdsnew()` 設計成讓 sds 指向字串數據 (`buf`)的開頭,這樣才能與 C 語言的 char* 習慣所對齊。 因此 sds 能如 C 字串的能直接讀取。 ```c sds my_string = sdsnew("hello"); char *content = my_string; printf("%s\n", content); ``` 而讀取 sds 的 header 會以 `s[-1]` 取 flag 或 `SDS_HDR` 巨集取得 header 的開頭位置。 ``` #define SDS_HDR(T, s) ((struct sdshdr##T *)((s) - (sizeof(struct sdshdr##T)))) ``` `sizeof(struct sdshdr##T)` 會計算出對應類型 T 的 SDS header (len, alloc, flags) 的總大小 ![image](https://hackmd.io/_uploads/BJcijmTmll.png) <!-- ![image](https://hackmd.io/_uploads/B106j7pmgl.png) --> ### sds 具備動態擴容的功能 sds 分成貪婪與非貪婪模式, 前者根據是新增的字串長度新增並保有預留空間。後者為指擴容成剛剛好的大小。 ```c sds _sdsMakeRoomFor(sds s, size_t addlen, int greedy) { // ... size_t avail = sdsavail(s); /* Return ASAP if there is enough space left. */ if (avail >= addlen) return s; // get the size of header hdrlen = sdsHdrSize(type); if (greedy == 1) { if (newlen < SDS_MAX_PREALLOC) newlen *= 2; else newlen += SDS_MAX_PREALLOC; } // ... use_realloc = (oldtype == type); if (use_realloc) { // new header doesn't change newsh = s_realloc_usable(sh, hdrlen + newlen + 1, &bufsize); // ... } else { newsh = s_malloc_usable(hdrlen + newlen + 1, &bufsize); // ... } ``` - 如果新字串大小小於 1MB (`SDS_MAX_PREALLOC = 1024 * 1024` ),新空間可擴成字串長度的兩倍 + 1。 - 如果新字串大小大於 1M,新空間可擴成字串長度 + 1M + 1,採用一個較為保守的預分配策略。 見程式中的 `if (use_realloc)` 是處理的是當 SDS 擴展時,新計算出的 SDS 類型 (type) 與原始 SDS 的類型 (oldtype) 相同 的情況。這意味著 SDS header 的結構不變,因此用 s_relloc_usable() 擴展現有記憶體。然而 type 與 oldtype 不相同時,則表示 header 結構有變化,不能簡單使用 realloc,需要使用 malloc 進行重新分配並複製。 ### SDS 的惰性空間釋放機制 sdsclear() 函式將字串長度設為 0,但不釋放已分配的記憶體,而是將其保留為可用空間,以便後續的追加操作就不需要重新分配記憶體 ```c void sdsclear(sds s) { sdssetlen(s, 0); s[0] = '\0'; } ``` 新增內容的函式 (如 `sdscatlen()`) 在寫入 buf 之前,會呼叫 `sdsMakeRoomFor` ```c sds sdscatlen(sds s, const void *t, size_t len) { size_t curlen = sdslen(s); s = sdsMakeRoomFor(s,len); if (s == NULL) return NULL; memcpy(s+curlen, t, len); sdssetlen(s, curlen+len); s[curlen+len] = '\0'; return s; } ``` 在 sdsMakeRoomFor() 中,我們就可以看到記憶體重用機制。 ``` if (avail >= addlen) return s; ``` 當可用空間 (`sdsavail(s)`) 大於等於新增的長度 (`addlen`),那就直接返回原 SDS (`s`),不進行任何記憶體分配。 然而也有主動釋放記憶體的強況,如 `trimStringObjectIfNeeded()` 中能看到當可用空間超過字串長度 10% 時會調用顯性空間釋放的 `sdsRemoveFreeSpace()` 來釋放多餘空間。 `sdsRemoveFreeSpace()` 會移除多餘的可用空間,保留原本內容,返回一個新的 SDS 指針,原指針將失效。 ```c /* Reallocate the sds string so that it has no free space at the end. The * contained string remains not altered, but next concatenation operations * will require a reallocation. * * After the call, the passed sds string is no longer valid and all the * references must be substituted with the new pointer returned by the call. */ sds sdsRemoveFreeSpace(sds s, int would_regrow) { return sdsResize(s, sdslen(s), would_regrow); } ``` ```c void trimStringObjectIfNeeded(robj *o, int trim_small_values) { if (o->encoding != OBJ_ENCODING_RAW) return; /* A string may have free space in the following cases: * 1. When an arg len is greater than PROTO_MBULK_BIG_ARG the query buffer may be used directly as the SDS string. * 2. When utilizing the argument caching mechanism in Lua. * 3. When calling from RM_TrimStringAllocation (trim_small_values is true). */ size_t len = sdslen(o->ptr); if (len >= PROTO_MBULK_BIG_ARG || trim_small_values || (server.executing_client && server.executing_client->flag.script && len < LUA_CMD_OBJCACHE_MAX_LEN)) { if (sdsavail(o->ptr) > len / 10) { o->ptr = sdsRemoveFreeSpace(o->ptr, 0); } } } ``` 或是當客戶查詢的緩衝空間中,系統檢查有超過 4KB 的可用空間,並且客戶端閒置超過 2 秒沒有處理資料,即會使用 `sdsRemoveFreeSpace()` 釋放多餘空間。 ```c /* The client query buffer is an sds.c string that can end with a lot of * free space not used, this function reclaims space if needed. */ int clientsCronResizeQueryBuffer(client *c) { /* Only resize the query buffer if the buffer is actually wasting at least a * few kbytes */ if (sdsavail(c->querybuf) > 1024 * 4) { /* There are two conditions to resize the query buffer: */ if (idletime > 2) { /* 1) Query is idle for a long time. */ size_t remaining = sdslen(c->querybuf) - c->qb_pos; if (!c->flag.primary && !remaining) { /* If the client is not a primary and no data is pending, * The client can safely use the shared query buffer in the next read - free the client's querybuf. */ sdsfree(c->querybuf); /* By setting the querybuf to NULL, the client will use the shared query buffer in the next read. * We don't move the client to the shared query buffer immediately, because if we allocated a private * query buffer for the client, it's likely that the client will use it again soon. */ c->querybuf = NULL; } else { c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1); } ``` `sdsFree()` 是調用 zfree_with_size,完全釋放整個 SDS 字串的記憶體,包括 header 和 buffer,之後該 SDS 指針就不能再使用了 ### 討論 sds 是否合適加入 RCU 以提升並行化程度? ## Lists Lists are lists of strings sorted by insertion order. 實作檔案: `src/t_list.c` 底層結構: `src/quicklist.c` , `src/listpack.c` 當 listpack 過大時,會轉換為 quicklist 關於 zliplist: 能在 sever.h 能看到註解寫 No longer used: old list/hash/zset encoding. 目前 valkey 使用 ziplist ,僅用於載入舊 RDB 檔案時實現向後相容 ### listpack ```c typedef struct { unsigned char *sval; uint32_t slen; long long lval; } listpackEntry; ``` - `unsigned char *sval;`: 如果元素是一個字串,`sval` 指向其內容。 - `uint32_t slen;`: 如果元素是一個字串,`slen` 是其長度。 - `long long lval;`: 如果元素是一個整數,`lval` 存儲其值。 - 在實際使用時,`sval` 和 `lval` 只有一個會被設置,另一個為 `NULL` 或不相關。 ### quicklist quicklist 是 linked list + ziplist 結構示意圖: `[head] ⇄ [ziplist1] ⇄ [ziplist2] ⇄ [ziplist3] ⇄ [tail]` 每個 `ziplistX` 都是一塊小型的 ziplist 記憶體區塊 #### 與 quicklist 相關的四個 strcut quicklist 是容器,quicklistNode 是節點,quicklistIter 提供遍歷能力,quicklistEntry 封裝元素訊息 ``` quicklist ├── head ──→ quicklistNode ──→ quicklistNode ──→ ... ──→ tail │ │ │ │ └── entry (listpack) └── entry (listpack) │ └── quicklistIter ──→ current (指向某個 quicklistNode) │ └── quicklistEntry (當前元素訊息) ``` 主要容器 ```c typedef struct quicklist { quicklistNode *head; quicklistNode *tail; unsigned long count; /* total count of all entries in all listpacks */ unsigned long len; /* number of quicklistNodes */ signed int fill : QL_FILL_BITS; /* fill factor for individual nodes */ unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */ unsigned int bookmark_count : QL_BM_BITS; quicklistBookmark bookmarks[]; } quicklist; ``` 指向一個 listpack,有指向前後指標是為了正反遍歷 ```c typedef struct quicklistNode { struct quicklistNode *prev; struct quicklistNode *next; unsigned char *entry; // stores actual data (listpack or plain data) size_t sz; /* entry size in bytes */ unsigned int count : 16; /* count of items in listpack */ unsigned int encoding : 2; /* RAW==1 or LZF==2 */ } quicklistNode; ``` quicklistIter 提供遍歷能力 ```c typedef struct quicklistIter { quicklist *quicklist; quicklistNode *current; unsigned char *zi; /* points to the current element */ long offset; /* offset in current listpack */ int direction; } quicklistIter; ``` quicklistEntry 是提供當前元素訊息 ```c typedef struct quicklistEntry { const quicklist *quicklist; // quicklist to which this entry belongs quicklistNode *node; // quicklistNode where this entry is located unsigned char *zi; // pointer within listpack unsigned char *value; long long longval; size_t sz; int offset; } quicklistEntry; ``` ## Sets Sets are unordered collections of unique strings that act like the sets from your favorite programming language. With a Set, you can add, remove, and test for existence in O(1) time (in other words, regardless of the number of set elements). 集合 (Set) 實作檔案: src/t_set.c valkey 針對不同情況,提供有三種 set, - Intset:一種記憶體高效率的編碼方式,適用於僅包含整數的集合。使用 binary search 做搜尋。 - Listpack:一種緊湊的編碼方式,適用於小型集合。用於包含非整數值或集合規模足夠小的情況。 - Hashtable:一種更具可擴展性的編碼方式,適用於大型集合。查找時間為 O(1)。 ### IntSet 概論 當 Set 中所有元素都是整數且數量不多時,Valkey 會自動選擇使用 intset 這種緊湊的資料結構來儲存,以減少記憶體開銷。 IntSet 結構定義 ```c typedef struct intset { uint32_t encoding; uint32_t length; // the number of elements in the set int8_t contents[]; // store the elements of set } intset; ``` Valkey 定義了三種編碼類型 intset 透過 encoding 欄位來動態調整其儲存的整數大小 分別為 16 位元, 32 位元, 64 位元 的整數 ```c #define INTSET_ENC_INT16 (sizeof(int16_t)) #define INTSET_ENC_INT32 (sizeof(int32_t)) #define INTSET_ENC_INT64 (sizeof(int64_t)) ``` 所有在 insert 中的元素唯一並且是升序儲存,因此查找元素時適用 binary Search, O(log N) 的時間複雜度 **inset 範例一** 假設已經插入 [16, 2, 5] 到 intset 裡: ``` ------------------------------------------------------------------ | encoding: INTSET_ENC_INT16 | length: 3 | contents: [2, 5, 16] | ------------------------------------------------------------------ ``` **創建 intset (`intsetNew()`)** ```c /* Create an empty intset. */ intset *intsetNew(void) { intset *is = zmalloc(sizeof(intset)); is->encoding = INTSET_ENC_INT16; is->length = 0; return is; } ``` **插入元素 (`intsetAdd`)** 插入新值 (newVal)可能會有兩種情況 1. newVal 適用於目前編碼 2. 新值的大小超過目前編碼可容納的範圍,這需要做 intset 升級 情況一 1. 在 contents 做 binary search 找 newVal 的該插入的索引 (`pos`) - 如果找到 intSet 中找到與 newVal 相同的值,就不插入,返回 NULL 2. 對 contents 做擴容 (`intsetResize()`) 3. 使用 memmove 將 contents 中 `pos` 以後的所以元素都向後移一位,之後也將 newVal 插入到 contents 中 4. 更新 length 情況二舉例 假設 intset 為 [2, 5, 16] (編碼 INTSET_ENC_INT16),現在要插入一個 newVal = 50000。由於 50000 超過 int16_t 的範圍,intset 需要升級 升級操作: (`intsetUpgradeAndAdd`) 1. contents 的型別需改可容納 newVal (`intsetResize()`) 2. 倒序的方式將原有的值填回 content (例如 16 -> 5 -> 2) 3. 把 newVal 插入 content 最後一位 4. 把 encoding 與 length 更新成正確的值 最終,這個 intset 將會是 ``` -------------------------------------------------------------------------- | encoding: INTSET_ENC_INT32 | length: 4 | contents: [ 2, 5, 16, 50000] | -------------------------------------------------------------------------- ``` **刪除元素 (`intsetRemove`)** 從 value 從 intset 移除 1. 檢查是否需要刪除 檢查編碼之後,使用 binary Search 找到 value 的編碼位置 (`pos`) 2. 若非元素最後一值,就 pos 位置後的元素皆向前一位 3. 使用 `intsetResize(is, len - 1)` 來縮小 intset 的大小 4. 更新長度 len - 1 5. 返回更新後的 intset **查尋元素 (intsetFind)** 1. 檢查編碼 2. 使用 binary Search 搜尋 ```c uint8_t intsetFind(intset *is, int64_t value) { uint8_t valenc = _intsetValueEncoding(value); return valenc <= intrev32ifbe(is->encoding) && intsetSearch(is, value, NULL); } ``` #### 討論 intset 是否合適加入 RCU 以提升並行化程度? ## Hashes Hashes are record types modeled as collections of field-value pairs. As such, Hashes resemble Python dictionaries, Java HashMaps, and Ruby hashes. 雜湊表 (Hash) 實作檔案: `src/t_hash.c` - Listpack:一種用於小型雜湊的記憶體高效編碼方式。適用於雜湊值足夠小且不包含非常大的欄位或值的情況。 - Hashtable:一種用於較大雜湊值的更具可擴展性的編碼方式。提供 O(1) 的查找時間。 ## Sorted sets Sorted Sets are collections of unique strings that maintain order by each string's associated score. 有序集合 (Sorted Set/ZSet) 實作檔案: `src/t_zset.c` ## Streams A Stream is a data structure that acts like an append-only log. Streams help record events in the order they occur and then syndicate them for processing. 串流 (Stream) 實作檔案: `src/t_stream.c` Listpack 編碼:適用於元素有限且值較小的小型有序集合 Skiplist 編碼:適用於較大的有序集合或超出 Listpack 閾值的情況 ## Geospatial indexes Geospatial indexes are useful for finding locations within a given geographic radius or bounding box. ## Bitmaps ## Bitfields Bitfields efficiently encode multiple counters in a string value. Bitfields provide atomic get, set, and increment operations and support different overflow policies. ## HyperLogLog The HyperLogLog data structures provide probabilistic estimates of the cardinality (i.e., number of elements) of large sets. ## Bloom Filter Bloom filters are a space efficient probabilistic data structure that allows adding elements and checking if item/s are definitely not present, or if there is a possibility they exist (with the configured false positive rate). The Bloom filter data type / command support is provided by the valkey-bloom module.