Try   HackMD

2021q1 Homework7 (quiz7) 測驗三

contributed by < chiehen >

tags:linux2021

程式碼解釋

alignment

當 cache miss 發生時, cache 會將 cache line 大小的資料從 memory 移到 cache,將變數做 cache line alignment 可減少 cache miss,進而增進效能。

  1. 對 struct 做 alignment
void *ring[] __attribute__((__aligned__(CACHE_LINE_SIZE)));

使用 __attribute__ 進行 alignment
根據 gcc 文件

The keyword attribute allows you to specify special properties of variables, function parameters, or structure, union, and, in C++, class members.

The aligned attribute specifies a minimum alignment for the variable or structure field, measured in bytes.

For example, the declaration: int x attribute ((aligned (16))) = 0; causes the compiler to allocate the global variable x on a 16-byte boundary.

因此透過 attribute ((aligned (n))) 可以讓編譯器對變數依我們指定的大小自動做 alignment。

  1. 對 ring 做 alignment

為了做 cache alignment 將配置的記憶體大小調整為 cache line 的倍數,因此將原本的大小進位到 cache line 的倍數

#define ALIGN_CEIL(val, align) \
    (typeof(val))((val) + (-(typeof(val))(val) & ((align) -1)))

加數的部份可計算出val到下一個倍數的差
從二進位來看, 差為原本的數在小於 align 的 bit 為 0 的地方須為 1,並再加 1。
假設 val = 10 = 10102, align = 8 = 10002:
則要關心的 bit 為後三個 bit , 而進行 & align -1 可以達成此目標,
而 val 在後三個 bit 為 010 可以看出要進位到 16 = 100002 需要 1012 + 0012, 而利用二補數-x = ~x+1, 則可以計算出到下一個倍數的差在這個例子為 1102 = 6

另一個進位的方法:

#define ALIGN_CEIL(val, align) \
     (val + align-1) & ~(align-1)         

驗證

使用平台

jane@jane-NB:~/linux2021/linux2021-quiz7C$ uname -a
Linux jane-NB 5.4.0-74-generic #83-Ubuntu SMP Sat May 8 02:35:39 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

移除 cache alignment 相關程式碼,觀看記憶體的配置差別

// 更動的程式碼
@@ -44,15 +44,15 @@ struct __ringbuffer {
         uint32_t size;                /**< Size of ring. */
         uint32_t mask;                /**< Mask (size - 1) of ring. */
         volatile uint32_t head, tail; /**< Producer head and tail. */
-    } prod __attribute__((__aligned__(CACHE_LINE_SIZE)));
+    } prod;
 
     struct {                          /** Ring consumer status. */
         uint32_t size;                /**< Size of the ring. */
         uint32_t mask;                /**< Mask (size - 1) of ring. */
         volatile uint32_t head, tail; /**< Consumer head and tail. */
-    } cons __attribute__((__aligned__(CACHE_LINE_SIZE)));
+    } cons;
 
-    void *ring[] __attribute__((__aligned__(CACHE_LINE_SIZE)));
+    void *ring[];
 };
 
 /* true if x is a power of 2 */
@@ -86,7 +86,7 @@ ssize_t ringbuf_get_memsize(const unsigned count)
         return -EINVAL;
 
     ssize_t sz = sizeof(ringbuf_t) + count * sizeof(void *);
-    sz = ALIGN_CEIL(sz, CACHE_LINE_SIZE);
+    //sz = ALIGN_CEIL(sz, CACHE_LINE_SIZE);
     return sz;
 }

  • 沒有 cacheline aligment:

    ringbuffer 位址

    ​​​​1: ring_size = 552
    ​​​​2: r = (ringbuf_t *) 0x55555555a6b0
    ​​​​3: &r.prod = (struct {...} *) 0x55555555a6b0
    ​​​​4: &r.cons = (struct {...} *) 0x55555555a6c4
    ​​​​5: &r.ring = (void *(*)[]) 0x55555555a6d8
    

    可看出 prod與 cons, cons 與 ring 相差了 20 bytes, ring size 也並非 cache line(64) 的倍數

  • 有 cacheline aligment:

    ​​​​1: ring_size = 640
    ​​​​2: r = (ringbuf_t *) 0x55555555a6b0
    ​​​​3: &r.prod = (struct {...} *) 0x55555555a6b0
    ​​​​4: &r.cons = (struct {...} *) 0x55555555a6f0
    ​​​​5: &r.ring = (void *(*)[]) 0x55555555a730
    

    可發現結構間相距 64 bytes, 而 ring size 也是 cache line 的倍數

比較兩者效能差異

使用 perf 察看執行時間及 cache miss 等表現

  • 沒有 cacheline aligment:
    ​​​​Performance counter stats for './pc-test' (5 runs):
    
    ​​​​           11,1823      cache-misses              #    0.025 % of all cache refs      ( +- 15.99% )
    ​​​​       4,5425,0625      cache-references                                              ( +- 11.17% )
    ​​​​      24,5119,8253      instructions              #    0.39  insn per cycle           ( +-  1.68% )
    ​​​​      62,7786,3980      cycles                                                        ( +-  5.51% )
    
    ​​​​            0.9011 +- 0.0499 seconds time elapsed  ( +-  5.54% )
    
  • 有 cacheline aligment:
    ​​​​Performance counter stats for './pc-test' (5 runs):
    
    ​​​​            9,2835      cache-misses              #    0.021 % of all cache refs      ( +-  8.76% )
    ​​​​       4,3471,8751      cache-references                                              ( +-  9.24% )
    ​​​​      23,9569,3419      instructions              #    0.38  insn per cycle           ( +-  2.52% )
    ​​​​      63,4246,6307      cycles                                                        ( +-  4.71% )
    
    ​​​​            0.9106 +- 0.0430 seconds time elapsed  ( +-  4.72% )
    

不如預期,數字上並沒有看出有顯著差異。

Watermark

當 object enqueue 後會超過 water mark 時, 回傳 Quota exceeded 的 error code

/* if we exceed the watermark */
return ((mask + 1) - free_entries + n) > r->prod.watermark ? -EDQUOT : 0;

((mask + 1) - free_entries + n) 計算這次 enqueue 後所被使用的 entries 數量,如果超過則回傳 -EDQUOT

猜測 Quota exceeded 是為了重新配置 ringbuffer

經過與老師的討論後,發現原本的猜想並不正確,並不符合原先使用 circular buffer 這種資料結構的使用情境。

這篇 The design and implementation of a lock-free ring-buffer with contiguous reservations 便講解了 circular buffer 在實際世界中的角色和 watermark 的作用。

在 embedded systems 中因為記憶體特別有限,通常不會使用 heap 來配置記憶體,而是會靜態配置或使用 stack 配置。而Circular Buffer 可以事先確定大小的特性便很適合在這種系統,並且在這些系統中也常用為模擬動態記憶體空間。
因此可知道 watermark 如果是為了在執行時期改變 ringuffer 的大小是不符合使用情境的。

實際上,watermark 的作用是為了能讓一段資料能以連續的方式放入記憶體。而 Direct Memory access(DMA) 這個常見嵌入式系統的機制就需要這個特性。DMA 是外部裝置 (Peripheral) 不透過 CPU 能自動讀寫主記憶體的行為,這個行為可以節省大量的 CPU 時間,對於通常是單核的嵌入式系統特別重要。但因為 DMA 傳輸只認得資料起始位置及資料長度,並不能理解 circular buffer 中 wrap around 的機制,因此需要 watermark 協助。

當要放入的資料長度大(藍色區域)於 circular buffer 尾段的空間

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

則用 watermark 標註 valid data 的位置,將資料從 buffer 開頭開始放入,如下圖所示:

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

計算 ringbuffer 已使用/剩餘空間

在 dequeue 時計算:

uint32_t entries = prod_tail - cons_head;

在 enqueue 時,

uint32_t free_entries = mask + cons_tail - prod_head;

如下圖
(以下示範 index 為 unsigned 16-bit integers)

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

used_entries:
已使剩餘空間為用空間為綠色區域,因此很明顯的便是 producer tail(pt) - consumer head(ch)
使用 producer tail 才可確保元素已被放入 buffer。

free_entries:
從圖片上來看,可知剩餘空間為該圓角矩形剩下的黃色區域,因此將 consumer tail(ct) + mask 可得到在下個圓角矩形的相對位置,所以再將此數減去 producer head(ch) 便可以得到兩段黃色區域的長度,也就是剩餘空間的長度。
使用 consumer tail 才可確保元素已被拿出 buffer,使用 producer head 才能得到這個 producer 要操作的正確位置。

以下解說 16-bit modulo
C99 §6.2.5/9:

A computation involving unsigned operands can never overflow,because a result that cannot be represented by the resulting unsigned integer type is reduced modulo to the number that is one greater than the largest value that can be represented by the resulting type.

因此當進行計算時, 如果結果無法以這個型態儲存, 則會對其進行 Modular arithmetic, 則
UINT_MAX + 1 == 0
0 - 1 == UINT_MAX

因此在下圖範例中

image alt

當進行 pt - ch 時,會將結果自動 modulo 216

used_entries=(ptch)mod216=[(ptmod216+(chmod216)]mod216=6000+6536==12536

ENQUEUE_PTRS

利用 size 及 4 是 2 的冪:
第 4 行中prod_head & mask 相當於 prod_head % size
第 7 行n & ((~(unsigned) 0x3 等同 floor(n/4)
第 14 行n & 0x3n%4

#define ENQUEUE_PTRS() \ do { \ const uint32_t size = r->prod.size; \ uint32_t i, idx = prod_head & mask; \ if (idx + n < size) { \ /* 假設 n=10, 處理 [0-7] */ for (i = 0; i < (n & ((~(unsigned) 0x3))); i += 4, idx += 4) { \ r->ring[idx] = obj_table[i]; \ r->ring[idx + 1] = obj_table[i + 1]; \ r->ring[idx + 2] = obj_table[i + 2]; \ r->ring[idx + 3] = obj_table[i + 3]; \ } \ /* 假設 n=10, 處理 [7-10]*/ switch (n & 0x3) { \ case 3: \ r->ring[idx++] = obj_table[i++]; \ case 2: \ r->ring[idx++] = obj_table[i++]; \ case 1: \ r->ring[idx++] = obj_table[i++]; \ } \ } else { \ /* 須注意線性空間中的邊界 */ for (i = 0; idx < size; i++, idx++) \ r->ring[idx] = obj_table[i]; \ /* 超過線性邊界, 返回線性空間的起始位置: ring[0] */ for (idx = 0; i < n; i++, idx++) \ r->ring[idx] = obj_table[i]; \ } \ } while (0)

Full or empty

常見的 circular buffer 只儲存 (size - 1) 個物件,當 start(consumer) - end(producer) == 1 時,代表 buffer 已滿。當 start 小於 end 時, 會進行 32-bit modulo, 因此要做 mod size, 計算出的值才會在 [0-size] 之間。

static inline bool ringbuf_is_full(const ringbuf_t *r)
{
    uint32_t prod_tail = r->prod.tail, cons_tail = r->cons.tail;
    return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0;
}

當 buffer 的 start 跟 end 指向同個位置時代表 buffer 是空的。

static inline bool ringbuf_is_empty(const ringbuf_t *r)
{
    uint32_t prod_tail = r->prod.tail, cons_tail = r->cons.tail;
    return cons_tail == prod_tail;
}

測試程式

不明白為何在測試程式的部份, 先將要放入值 casting void** 再做 dereference

for (int i = 0; !ringbuf_is_full(r); i++)
    ringbuf_sp_enqueue(r, *(void **) &i);

因此將程式改為

int i;
for (i = 0; !ringbuf_is_full(r); i++)
    ringbuf_sp_enqueue(r, (void *) &i);

結果發現程式放入的便不會如預期的為 [0,163]的地址

使用 gdb 查看發現更改動的程式
在每次的迴圈中, 放入的都為值 0x7fffffffdcd0

1: in = 0                                   
2: &in = (int *) 0x7fffffffdcd0 
ringbuf_sp_enqueue (r=0x5555555592a0, obj=0x7fffffffdcd0) at ringbuffer.c:356
Breakpoint 4, ringbuffer_sp_do_enqueue (r=0x5555555592a0, obj_table=0x7fffffffdcb0, n=1) at ringbuffer.c:238
4: obj_table[0] = (void * const) 0x7fffffffdcd0

1: in = 1
2: &in = (int *) 0x7fffffffdcd0
Breakpoint 2, ringbuf_sp_enqueue (r=0x5555555592a0, obj=0x7fffffffdcd0) at ringbuffer.c:356
Breakpoint 4, ringbuffer_sp_do_enqueue (r=0x5555555592a0, obj_table=0x7fffffffdcb0, n=1) at ringbuffer.c:238
4: obj_table[0] = (void * const) 0x7fffffffdcd0

而原來的程式, 經過 casting 與 deference 實際放入的值為遞增

1: in = 0               
2: &in = (int *) 0x7fffffffdcd8             
3: *(void **)&in = (void *) 0x555500000000

Breakpoint 2, ringbuf_sp_enqueue (r=0x5555555592a0, obj=0x555500000000) at ringbuffer.c:356
Breakpoint 1, ringbuffer_sp_do_enqueue (r=0x5555555592a0, obj_table=0x7fffffffdcb0, n=1) at ringbuffer.c:238
5: obj_table[0] = (void * const) 0x555500000000

1: in = 1
2: &in = (int *) 0x7fffffffdcd8
3: *(void **)&in = (void *) 0x555500000001
Breakpoint 2, ringbuf_sp_enqueue (r=0x5555555592a0, obj=0x555500000001) at ringbuffer.c:356
Breakpoint 1, ringbuffer_sp_do_enqueue (r=0x5555555592a0, obj_table=0x7fffffffdcb0, n=1) at ringbuffer.c:238
5: obj_table[0] = (void * const) 0x555500000001

改寫程式碼

SPMC

將原有碼程式擴展為 single producer + multiple consumer (SPMC)

利用 consumer 的 head 和 tail 達到多個 consumer的 concurrency, 複製前先更新 head 標注將要複製, 複製後更新 tail 標示完成複製
參考DPDK Ring Library

  1. 將 consumer head 複製到本地(cons_head),計算複製後 consumer head & tail 應在的位置(cons_next)
  2. 如本地的 consumer 的 head 是正確的, 則更新共同的 consumer head 為 cons_next(比較與更新須使用 atomic instruction), 如本地的 cons_head 已過時, 則回到步驟 1。
  3. 將 ringbuffer 內容複製至 obj_table
  4. 更新共同的 consumer tail 為 cons_next, 更動順序應與consumer head 的順序相同, 因此能更新 tail 的 consumer, cons_head 須與 tail 相同。
/* internal dequeue multiple element (multiple consumer safe)*/ static inline int ringbuffer_mc_do_dequeue(ringbuf_t *r, void **obj_table, const unsigned n) { uint32_t mask = r->prod.mask; uint32_t cons_head; retry: cons_head = atomic_load(&r->cons.head); uint32_t prod_tail = atomic_load(&r->prod.tail); uint32_t entries = prod_tail - cons_head; if (n > entries) return -ENOENT; uint32_t cons_next = cons_head + n; if(!atomic_compare_exchange_strong(&r->cons.head, &cons_head, cons_next)) goto retry; /* copy in table */ DEQUEUE_PTRS(); __compiler_barrier(); while(!atomic_compare_exchange_weak(&r->cons.tail, &cons_head, cons_next)) ; return 0; }

測試

測試程式 pc-test.c:

static void *test_consumer(void *arg) { RingTest *test = (RingTest *)arg; while (atomic_load(&test->consumer_count) < THREAD_COUNT) { if (!ringbuf_is_empty(test->r)) { void *out; atomic_fetch_add(&test->consumer_count, 1); int result = ringbuf_mc_dequeue(test->r, &out); assert(result == 0); } } return NULL; }

執行結果:

** Basic operations **
Verified OK!

** Stress test **
pc-test: pc-test.c:75: test_consumer: Assertion `result == 0' failed.
中止 (核心已傾印)

在 75 行時回傳值不為 0, 代表函式回傳 -ENOENT, 也就是 n > entries

有可能是因為有大於 ringbuffer 中 entries 數量的 consumer 通過第 71 行

更改測試程式 pc_test.c:

static void *test_consumer(void *arg) { RingTest *test = (RingTest *)arg; - while (atomic_load(&test->consumer_count) < THREAD_COUNT) { + int cons_count; + while ((cons_count=atomic_load(&test->consumer_count)) < THREAD_COUNT) { if (!ringbuf_is_empty(test->r)) { void *out; - atomic_fetch_add(&test->consumer_count, 1); + atomic_compare_exchange_strong(&test->consumer_count, &cons_count, cons_count + 1); int result = ringbuf_mc_dequeue(test->r, &out); assert(result == 0); }

確認不會有兩個拿同個號碼牌(consumer_count)的 thread 執行 dequeue

執行結果:
依然在在 75 行時回傳值不為 0

現在的問題為拿不同 consumer_count 的 thread 同時通過 ringbuf_is_empty 的檢查。

如何保證 !ringbuf_is_empty 在 dequeue 時仍成立

Reference