來用 Doubly Linked-List 和 Hash table 實現一個 LRU cache 吧!
===
最近工作中遇到個有趣的問題,因為使用者的 session 儲存在 redis 中,有一個 api 非常頻繁的被調用,而這個 api 需要去 redis 找到 token id 對應的使用者資訊,但因為太多次調用且 redis 本身是單線程的關係,導致其他服務要存取 redis 時速度異常緩慢,因此決定在 redis 之前實現一個後端自己的記憶體儲存使用者資訊,類似 L1、L2 快取的概念,不過也不能讓這個記憶體佔用太多空間因此需要有定時刪除的機制,這個時候 LRU 算法就可以被很好的運用了。
Doubly Linked-List
---
**Doubly Linked-List** 有別於一般的 Linked-List 他在連結下一筆資料的同時,下一筆資料也有一個 link 連結到自己,大概可以看成這樣的資料結構:
``` mermaid
graph LR;
head-->node1-->node2-->node3-->tail;
tail-->node3-->node2-->node1-->head;
```
head 和 tail 一般不記錄資料,只用來標記頭尾所在,有了他就可以很方便的記錄誰最近被用,誰好久沒用了,至於記錄方法就是將最近使用的 node 插入頭部,當需要移除資料時只需要將最後一個節點移除即可,這樣就達到 LRU 的要求啦!
Hash Table
---
Linked-List 最大的缺點大概就是要找到資料只能一個一個往下找,所以為了加快找到的速度,可以多建立一個 hash table,這樣就可以在 O(1) 的時間內知道當前資料是否有在 List 中,並且快速的拿到資料。
建立一個節點結構和快取結構
---
```go=
type Node[K comparable, V any] struct {
key K
value V
prev, next *Node[K, V]
}
type LRUCache[K comparable, V any] struct {
capacity int
cache map[K]*Node[K, V]
head, tail *Node[K, V]
mu sync.RWMutex
}
```
這裡用到 go 1.18 才有的泛型定義 node,為了滿足任何資料類型都可以使用這個 cache,不過要注意 key 必須要是可比較的。
這裡的 Node 對應上面 Linked-List 的節點結構,包含 key 及對應的 value,如果是對應上面說的使用情境那麼 key 就是 token id,value 就是使用者資訊,同時記錄前一節點與後一節點的記憶體位址。
#### LRUCache
1. **capacity:** 定義這個記憶體要使用的容量
2. **cache:** 快取對應的 hash table,只要輸入 key 就可以查到對應的 node 節點記憶體位址
3. **head, tail:** 頭尾節點,沒有 value 值
4. **mu:** 用來處理並發問題的讀寫鎖
開始寫讀取資料與寫入資料吧
---
#### 讀取資料
首先是讀取資料,相當單純,只要把 key 帶入 hash table 中即可,記得處理並發讀寫問題:
```go=
func (c *LRUCache[K, V]) Get(key K) (value V, ok bool) {
c.mu.Rlock()
node, exists := c.cache[key] // 查看是否有在 hash table 中
if !exists {
c.mu.RUnlock()
return
}
c.mu.RUnlock()
c.mu.Lock()
c.moveToHead(node) // 這裡先知道這個動作會把節點移動到頭部即可
c.mu.Unlock()
return node.value, true
}
```
基本邏輯就是看有沒有在 hashTable 中,沒有就跳出,有就把這個節點移動到頭然後傳出值。
#### 寫入資料
然後是寫入資料,這裡要考慮的事情比較多一些,例如如果資料本來就存在那麼就要更新,長度超過 capacity 就要刪除,具體的寫法如下:
```go=
func (c *LRUCache[K, V]) Put(key K, value V) error {
c.mu.Lock()
defer c.Mu.Unlock()
if node, ok := c.cache[key]; ok {
node.value = value
c.moveToHead()
return nil
}
// 代表不在原本的快取中,需要寫入資料
newNode := &Node[K, V]{
key: key,
value: value,
}
c.cache[key] = newNode
c.moveToHead(newNode)
// 檢查長度是否超過 capacity
if len(c.cache) > c.capacity {
removed := c.removeTail() // 假設這個函式會回傳要刪除的節點位址
delete(c.cache, removed.key)
}
return nil
}
```
現在完成 Doubly Linked-List 的基礎建設吧!
---
實際上要實現 `moveToHead` 和 `removeTail` 兩個函式,我們還需要兩個輔助函式,注意這四個函式必須是私有函式,要避免被外部調用導致整個 list 出問題。先來看 `moveToHead`:
```go=
func (c *LRUCache[K, V]) moveToHead(node *Node[K, V]) {
c.removeNode(node)
c.addToHead(node)
}
```
這裡剛好讓兩個輔助函式都出來了,其實 `moveToHead` 的邏輯很簡單,就是先移除 node 節點,然後加到頭的位置就好,注意移除 node 節點並不是真的把資料清除,只是從 linked-list 中取出來,可以看一下 `removeNode` 的具體做法:
```go=
func (c *LRUCache[K, V]) removeNode(node *Node[K, V]) {
node.prev.next = node.next
node.next.prev = node.prev
}
```
用圖示好像更好理解一點,就像是原本的串列為:
``` mermaid
graph LR;
head-->node1-->node2-->node3-->tail;
tail-->node3-->node2-->node1-->head;
```
調用 `removeNode` 刪除 node2 後變成:
``` mermaid
graph LR;
head-->node1-->node3-->tail;
tail-->node3-->node1-->head;
node2;
```
這裡的 `node2` 找得到,但已經不在串列中了。再來看 `addToHead`:
```go=
func (c *LRUCache[K, V]) addToHead(node *Node[K, V]) {
// 先插入節點
node.prev = c.head
node.next = c.head.next
// 再將原節點連過來
c.head.next.prev = node
c.head.next = node
}
```
注意這裡的順序,如果順序錯了有可能導致資料丟失找不到,圖示第一階段就是:
``` mermaid
graph LR;
head-->node1-->node3-->tail;
tail-->node3-->node1-->head;
node2-->head;
node2-->node1;
```
然後再把 `node1` 和 `head` 彼此互連的 link 連到 `node2` 上就加入了。最後再來看一下 `removeTail`:
```go=
func (c *LRUCache[K, V]) removeTail(node *Node[K, V]) *Node[K, V] {
node := c.tail.prev
removeNode(node)
return node
}
```
也是非常簡單,找到 `tail` 節點的前一節點記憶體位址取出,然後用 `removeNode` 孤立這個節點,透過 go 語言本身的 GC 機制就會釋放這個節點使用的記憶體了,當然傳出去的目的是告訴 `Put` 函式這個要被刪除的節點 key 是什麼,讓 `Put` 函式可以根據這個 key 刪除 hash table 中當前 key 的資料。
如果想看全部組合起來的程式碼大概會長這樣:
```go
package cache
import (
"sync"
)
// This LRU cache is implemented using doubly linked-list and hash table
type Node[K comparable, V any] struct {
key K
value V
prev, next *Node[K, V]
}
type LRUCache[K comparable, V any] struct {
capacity int // LRU cache capacity
cache map[K]*Node[K, V] // hash table to get value
head, tail *Node[K, V] // head and tail are empty node
mu sync.RWMutex // read/write mutex, to support concurrency read
}
// NewLRUCache To use this cache, input key type and value type(use generic), key must be comparable
func NewLRUCache[K comparable, V any](capacity int) *LRUCache[K, V] {
if capacity <= 0 {
panic("capacity must be greater then zero")
}
lru := &LRUCache[K, V]{
capacity: capacity,
cache: make(map[K]*Node[K, V]),
head: &Node[K, V]{},
tail: &Node[K, V]{},
}
lru.head.next = lru.tail
lru.tail.prev = lru.head
return lru
}
func (c *LRUCache[K, V]) Get(key K) (value V, ok bool) {
c.mu.RLock()
node, exists := c.cache[key]
if !exists {
c.mu.RUnlock()
return
}
c.mu.RUnlock()
c.mu.Lock()
c.moveToHead(node)
c.mu.Unlock()
return node.value, true
}
func (c *LRUCache[K, V]) Put(key K, value V) error {
c.mu.Lock()
defer c.mu.Unlock()
if node, ok := c.cache[key]; ok {
node.value = value
c.moveToHead(node)
} else {
// if not in cache, create new node
newNode := &Node[K, V]{
key: key,
value: value,
}
c.cache[key] = newNode
c.addToHead(newNode)
// check if the cache exceeds its capacity
if len(c.cache) > c.capacity {
removed := c.removeTail()
delete(c.cache, removed.key)
}
}
return nil
}
func (c *LRUCache[K, V]) moveToHead(node *Node[K, V]) {
c.removeNode(node)
c.addToHead(node)
}
func (c *LRUCache[K, V]) addToHead(node *Node[K, V]) {
node.prev = c.head
node.next = c.head.next
c.head.next.prev = node
c.head.next = node
}
func (c *LRUCache[K, V]) removeTail() *Node[K, V] {
node := c.tail.prev
c.removeNode(node)
return node
}
func (c *LRUCache[K, V]) removeNode(node *Node[K, V]) {
node.prev.next = node.next
node.next.prev = node.prev
}
```
這就是一個可以直接拿去用的 LRU cache 啦!