來用 Doubly Linked-List 和 Hash table 實現一個 LRU cache 吧! === 最近工作中遇到個有趣的問題,因為使用者的 session 儲存在 redis 中,有一個 api 非常頻繁的被調用,而這個 api 需要去 redis 找到 token id 對應的使用者資訊,但因為太多次調用且 redis 本身是單線程的關係,導致其他服務要存取 redis 時速度異常緩慢,因此決定在 redis 之前實現一個後端自己的記憶體儲存使用者資訊,類似 L1、L2 快取的概念,不過也不能讓這個記憶體佔用太多空間因此需要有定時刪除的機制,這個時候 LRU 算法就可以被很好的運用了。 Doubly Linked-List --- **Doubly Linked-List** 有別於一般的 Linked-List 他在連結下一筆資料的同時,下一筆資料也有一個 link 連結到自己,大概可以看成這樣的資料結構: ``` mermaid graph LR; head-->node1-->node2-->node3-->tail; tail-->node3-->node2-->node1-->head; ``` head 和 tail 一般不記錄資料,只用來標記頭尾所在,有了他就可以很方便的記錄誰最近被用,誰好久沒用了,至於記錄方法就是將最近使用的 node 插入頭部,當需要移除資料時只需要將最後一個節點移除即可,這樣就達到 LRU 的要求啦! Hash Table --- Linked-List 最大的缺點大概就是要找到資料只能一個一個往下找,所以為了加快找到的速度,可以多建立一個 hash table,這樣就可以在 O(1) 的時間內知道當前資料是否有在 List 中,並且快速的拿到資料。 建立一個節點結構和快取結構 --- ```go= type Node[K comparable, V any] struct { key K value V prev, next *Node[K, V] } type LRUCache[K comparable, V any] struct { capacity int cache map[K]*Node[K, V] head, tail *Node[K, V] mu sync.RWMutex } ``` 這裡用到 go 1.18 才有的泛型定義 node,為了滿足任何資料類型都可以使用這個 cache,不過要注意 key 必須要是可比較的。 這裡的 Node 對應上面 Linked-List 的節點結構,包含 key 及對應的 value,如果是對應上面說的使用情境那麼 key 就是 token id,value 就是使用者資訊,同時記錄前一節點與後一節點的記憶體位址。 #### LRUCache 1. **capacity:** 定義這個記憶體要使用的容量 2. **cache:** 快取對應的 hash table,只要輸入 key 就可以查到對應的 node 節點記憶體位址 3. **head, tail:** 頭尾節點,沒有 value 值 4. **mu:** 用來處理並發問題的讀寫鎖 開始寫讀取資料與寫入資料吧 --- #### 讀取資料 首先是讀取資料,相當單純,只要把 key 帶入 hash table 中即可,記得處理並發讀寫問題: ```go= func (c *LRUCache[K, V]) Get(key K) (value V, ok bool) { c.mu.Rlock() node, exists := c.cache[key] // 查看是否有在 hash table 中 if !exists { c.mu.RUnlock() return } c.mu.RUnlock() c.mu.Lock() c.moveToHead(node) // 這裡先知道這個動作會把節點移動到頭部即可 c.mu.Unlock() return node.value, true } ``` 基本邏輯就是看有沒有在 hashTable 中,沒有就跳出,有就把這個節點移動到頭然後傳出值。 #### 寫入資料 然後是寫入資料,這裡要考慮的事情比較多一些,例如如果資料本來就存在那麼就要更新,長度超過 capacity 就要刪除,具體的寫法如下: ```go= func (c *LRUCache[K, V]) Put(key K, value V) error { c.mu.Lock() defer c.Mu.Unlock() if node, ok := c.cache[key]; ok { node.value = value c.moveToHead() return nil } // 代表不在原本的快取中,需要寫入資料 newNode := &Node[K, V]{ key: key, value: value, } c.cache[key] = newNode c.moveToHead(newNode) // 檢查長度是否超過 capacity if len(c.cache) > c.capacity { removed := c.removeTail() // 假設這個函式會回傳要刪除的節點位址 delete(c.cache, removed.key) } return nil } ``` 現在完成 Doubly Linked-List 的基礎建設吧! --- 實際上要實現 `moveToHead` 和 `removeTail` 兩個函式,我們還需要兩個輔助函式,注意這四個函式必須是私有函式,要避免被外部調用導致整個 list 出問題。先來看 `moveToHead`: ```go= func (c *LRUCache[K, V]) moveToHead(node *Node[K, V]) { c.removeNode(node) c.addToHead(node) } ``` 這裡剛好讓兩個輔助函式都出來了,其實 `moveToHead` 的邏輯很簡單,就是先移除 node 節點,然後加到頭的位置就好,注意移除 node 節點並不是真的把資料清除,只是從 linked-list 中取出來,可以看一下 `removeNode` 的具體做法: ```go= func (c *LRUCache[K, V]) removeNode(node *Node[K, V]) { node.prev.next = node.next node.next.prev = node.prev } ``` 用圖示好像更好理解一點,就像是原本的串列為: ``` mermaid graph LR; head-->node1-->node2-->node3-->tail; tail-->node3-->node2-->node1-->head; ``` 調用 `removeNode` 刪除 node2 後變成: ``` mermaid graph LR; head-->node1-->node3-->tail; tail-->node3-->node1-->head; node2; ``` 這裡的 `node2` 找得到,但已經不在串列中了。再來看 `addToHead`: ```go= func (c *LRUCache[K, V]) addToHead(node *Node[K, V]) { // 先插入節點 node.prev = c.head node.next = c.head.next // 再將原節點連過來 c.head.next.prev = node c.head.next = node } ``` 注意這裡的順序,如果順序錯了有可能導致資料丟失找不到,圖示第一階段就是: ``` mermaid graph LR; head-->node1-->node3-->tail; tail-->node3-->node1-->head; node2-->head; node2-->node1; ``` 然後再把 `node1` 和 `head` 彼此互連的 link 連到 `node2` 上就加入了。最後再來看一下 `removeTail`: ```go= func (c *LRUCache[K, V]) removeTail(node *Node[K, V]) *Node[K, V] { node := c.tail.prev removeNode(node) return node } ``` 也是非常簡單,找到 `tail` 節點的前一節點記憶體位址取出,然後用 `removeNode` 孤立這個節點,透過 go 語言本身的 GC 機制就會釋放這個節點使用的記憶體了,當然傳出去的目的是告訴 `Put` 函式這個要被刪除的節點 key 是什麼,讓 `Put` 函式可以根據這個 key 刪除 hash table 中當前 key 的資料。 如果想看全部組合起來的程式碼大概會長這樣: ```go package cache import ( "sync" ) // This LRU cache is implemented using doubly linked-list and hash table type Node[K comparable, V any] struct { key K value V prev, next *Node[K, V] } type LRUCache[K comparable, V any] struct { capacity int // LRU cache capacity cache map[K]*Node[K, V] // hash table to get value head, tail *Node[K, V] // head and tail are empty node mu sync.RWMutex // read/write mutex, to support concurrency read } // NewLRUCache To use this cache, input key type and value type(use generic), key must be comparable func NewLRUCache[K comparable, V any](capacity int) *LRUCache[K, V] { if capacity <= 0 { panic("capacity must be greater then zero") } lru := &LRUCache[K, V]{ capacity: capacity, cache: make(map[K]*Node[K, V]), head: &Node[K, V]{}, tail: &Node[K, V]{}, } lru.head.next = lru.tail lru.tail.prev = lru.head return lru } func (c *LRUCache[K, V]) Get(key K) (value V, ok bool) { c.mu.RLock() node, exists := c.cache[key] if !exists { c.mu.RUnlock() return } c.mu.RUnlock() c.mu.Lock() c.moveToHead(node) c.mu.Unlock() return node.value, true } func (c *LRUCache[K, V]) Put(key K, value V) error { c.mu.Lock() defer c.mu.Unlock() if node, ok := c.cache[key]; ok { node.value = value c.moveToHead(node) } else { // if not in cache, create new node newNode := &Node[K, V]{ key: key, value: value, } c.cache[key] = newNode c.addToHead(newNode) // check if the cache exceeds its capacity if len(c.cache) > c.capacity { removed := c.removeTail() delete(c.cache, removed.key) } } return nil } func (c *LRUCache[K, V]) moveToHead(node *Node[K, V]) { c.removeNode(node) c.addToHead(node) } func (c *LRUCache[K, V]) addToHead(node *Node[K, V]) { node.prev = c.head node.next = c.head.next c.head.next.prev = node c.head.next = node } func (c *LRUCache[K, V]) removeTail() *Node[K, V] { node := c.tail.prev c.removeNode(node) return node } func (c *LRUCache[K, V]) removeNode(node *Node[K, V]) { node.prev.next = node.next node.next.prev = node.prev } ``` 這就是一個可以直接拿去用的 LRU cache 啦!