# 2019q3 第 11 週測驗題
contributed by <`butterflyred`>, <`colinyoyo26`>
### 測驗 `1`
在 [PLDI'19](https://pldi19.sigplan.org/) 論文 [Computing Summaries of String Loops in C for Better Testing and Refactoring](https://srg.doc.ic.ac.uk/files/papers/loops-pldi-19.pdf) 提到開發者會希望對 loop summarisation 進行替換,例如 [GNU patch](http://www.gnu.org/software/patch/) 這支程式原本程式碼是:
```cpp
while (*s != '\n')
s++;
```
替換為 GNU extension:
```cpp
s = rawmemchr(s, '\n');
```
其中 [rawmemchr](https://linux.die.net/man/3/rawmemchr) 是自從 glibc-2.1 即提供的擴充函式:
> The rawmemchr() function is similar to memchr(): it assumes (i.e., the programmer knows for certain) that an instance of c lies somewhere in the memory area starting at the location pointed to by s, and so performs an optimized search for c (i.e., no use of a count argument to limit the range of the search). If an instance of c is not found, the results are unpredictable. The following call is a fast means of locating a string's terminating null byte:
> ```cpp
> char *p = rawmemchr(s, '\0');
> ```
以下程式碼嘗試運用 [你所不知道的C語言:數值系統篇](https://hackmd.io/@sysprog/c-numerics) 提及的 Detect NULL 技巧,來加速 `rawmemchr` 的實作,適用於 32 位元和 64 位元的 little-endian 架構:
```cpp
#include <limits.h>
#define UNALIGNED(X) ((long) X & (sizeof(long) - 1))
/* How many bytes are loaded each iteration of the word copy loop. */
#define LBLOCKSIZE (sizeof(long))
#if LONG_MAX == 2147483647L
#define DETECTNULL(X) (((X) -0x01010101) & ~(X) &0x80808080)
#else
#if LONG_MAX == 9223372036854775807L
#define DETECTNULL(X) (((X) -0x0101010101010101) & ~(X) &0x8080808080808080)
#else
#error long int is not a 32bit or 64bit type.
#endif
#endif
#ifndef DETECTNULL
#error long int is not a 32bit or 64bit byte
#endif
/* DETECTCHAR returns nonzero if (long)X contains the byte used
to fill (long)MASK. */
#define DETECTCHAR(X, MASK) (DETECTNULL(X ^ MASK))
void *rawmemchr(const void *src_void, int c) {
const unsigned char *src = (const unsigned char *) src_void;
unsigned char d = c;
while (UNALIGNED(src)) {
if (*src == d)
return (void *) src;
src++;
}
/* If we get this far, src is word-aligned. */
unsigned long *asrc = (unsigned long *) src;
unsigned long mask = d << 8 | d;
mask = mask << AAA | mask;
for (unsigned long i = BBB; i < LBLOCKSIZE * 8; i <<= 1)
mask = (mask << i) | mask;
while (1) {
if (DETECTCHAR(*asrc, mask))
break;
asrc++;
}
/* resort to a bytewise loop. */
for (src = (unsigned char *) asrc;; src++) {
if (*src == d)
return (void *) src;
}
}
```
請補完程式碼,應要能在 x86_64 架構獲得效能提升。
==作答區==
AAA = ?
* `(a)` 4
* `(b)` 8
* `(c)` 16
* `(d)` 32
* `(e)` 64
BBB = ?
* `(a)` 4
* `(b)` 8
* `(c)` 16
* `(d)` 32
* `(e)` 64
:::success
延伸問題:
1. 解釋上述程式碼運作原理,並探討是否適用於 Big-endian 硬體;
2. 比照 [CS:APP Assign5.18](https://hackmd.io/@aben20807/rkdzvWJTX) 在 GNU/Linux 上透過 perf 一類的效能工具,分析上述兩種 loop summarisation 實作的效能落差,並運用 [CS:APP](https://hackmd.io/@sysprog/CSAPP) 第 5 章的 CPE 分析手法
3. 研讀論文 [Computing Summaries of String Loops in C for Better Testing and Refactoring](https://srg.doc.ic.ac.uk/files/papers/loops-pldi-19.pdf),嘗試探討裡頭 [Symbolic Execution](https://en.wikipedia.org/wiki/Symbolic_execution) 的手法,還有為何能做到自動分析程式碼並給予 refactor 建議
* 延伸閱讀: [Binary 自動分析的那些事](https://hitcon.org/2016/CMT/slide/day1-r1-a-1.pdf), [MIT 6.858 Computer Systems Security: Symbolic Execution](https://youtu.be/yRVZPvHYHzw)
:::
### 程式解析
```cpp
#define UNALIGNED(X) ((long) X & (sizeof(long) - 1))
```
- X : memory address
假設有對齊long bytes
- x implies dont care
| 0 | 0 | 0 | x |
| -------- | --------| -------- | -------- | -------- | -------- |
- sizeof(long) - 1
| 1 | 1 | 1 | 0 |
| -------- | --------| -------- | -------- | -------- | -------- |
當沒有對齊 long bytes 時,true。
當有對齊 long bytes 時,false。
```cpp
#if LONG_MAX == 2147483647L
#define DETECTNULL(X) (((X)-0x01010101) & ~(X)&0x80808080)
#else
#if LONG_MAX == 9223372036854775807L
#define DETECTNULL(X) (((X)-0x0101010101010101) & ~(X)&0x8080808080808080)
```
```cpp=
#include <stdio.h>
int main(void) {
for (int count = 255; count >= 0; count--) {
int x = count;
printf("0x%02x\t", x);
int inverseX = x ^ 255;
printf("0x%02x\t", inverseX);
int Xminus = x - 1;
printf("0x%02x\t", Xminus);
int andXinverse = Xminus & inverseX;
printf("0x%02x\t", andXinverse);
int and80 = andXinverse & 128;
printf("0x%02x\t", and80);
printf("\n");
}
return 0;
}
```
- $2^{31}$ - 1 = 2147483647L
- 32 bits 中最大值
- $2^{63}$ - 1 = 9223372036854775807L
- 64 bits 中最大值
- DETECTNULL
- 判斷這次抓的word中,是否含有null byte
- 只有在 byte = 0 (null) 時,返回 0x80
- 只有在 byte != 0 (null) 時,返回 0x00
```cpp
void *rawmemchr(const void *src_void, int c) {
const unsigned char *src = (const unsigned char *)src_void;
unsigned char d = c;
while (UNALIGNED(src)) {
if (*src == d)
return (void *)src;
src++;
}
```
- rawmemchr
- 在字串中尋找字元
- src_void : 字串
- c : (int) 要找的字元
當字串的記憶體位址沒有對齊時,持續往下一個記憶體位址走,直到對齊為止(沒有對齊同樣讀一個 word 要抓兩次),期間順便往下查找字元,若找到,則return。
```cpp
/* If we get this far, src is word-aligned. */
unsigned long *asrc = (unsigned long *)src;
unsigned long mask = d << 8 | d;
mask = mask << 16 | mask;
for (unsigned long i = 32; i < LBLOCKSIZE * 8; i <<= 1)
mask = (mask << i) | mask;
while (1) {
if (DETECTCHAR(*asrc, mask))
break;
asrc++;
}
```
==64 bits computer==
char = 1 byte
int = 4 bytes
long = 8 bytes (1 word)
> 以下底標單位為 byte
- int c
| c |
| -------------------------------- |
0 3
- char d
| d |
| -------- |
0 1
- d << 8
| 0 | d |
| -------- | -------- | -------- | -------- |
0 1 2
- d << 8 | d
| d | d |
| -------- | -------- | -------- | -------- |
0 1 2
- unsigned long mask = d << 8 | d;
| d | d | 0 | 0 | 0 | 0 | 0 | 0 |
| -------- | -------- | -------- | -------- | -------- | -------- | -------- | -------- |
0 1 2 3 4 5 6 7 8
- mask << 16
| 0 | 0 | d | d | 0 | 0 | 0 | 0 |
| -------- | -------- | -------- | -------- | -------- | -------- | -------- | -------- |
0 1 2 3 4 5 6 7 8
- mask << 16 | mask
| d | d | d | d | 0 | 0 | 0 | 0 |
| -------- | -------- | -------- | -------- | -------- | -------- | -------- | -------- |
0 1 2 3 4 5 6 7 8
- mask = mask << 16 | mask;
| 0 | 0 | 0 | 0 | d | d | d | d |
| -------- | -------- | -------- | -------- | -------- | -------- | -------- | -------- |
8 7 6 5 4 3 2 1 0
- 剩下迴圈以相同邏輯把 `mask` 每個 byte 都填滿 `d`
```cpp
for (unsigned long i = 32; i < LBLOCKSIZE * 8; i <<= 1)
mask = (mask << i) | mask;
```
```cpp
while (1) {
if (DETECTCHAR(*asrc, mask))
break;
asrc++;
}
```
- *asrc (假設)
| 4 | a | 1 | d | 8 | f | 4 | 2 |
|--------| -------- | -------- | -------- | -------- | --------| -------- | -------- | -------- | -------- |
0 1 2 3 4 5 6 7 8
- X 為 asrc 取 64 bits 出來比對
- 依序為
- | 4 |
- | a |
- | 1 |
- ==| d |==
- | 8 |
- | f |
- | 4 |
- | 2 |
```cpp
X ^ MASK
```
*asrc xor MASK
當比對上時 xor 完欲找字元會變為 0
| 4 | a | 0 | 1 | 8 | f | 4 | 2 |
--------| -------- | -------- | -------- | -------- | --------| -------- | -------- | -------- | -------- |
0 1 2 3 4 5 6 7 8
```cpp
DETECTNULL(X ^ MASK)
```
判斷其中是否有 0
若有,則比對上d了
```cpp
while (1) {
if (DETECTCHAR(*asrc, mask))
break;
asrc++;
}
```
如果比對上,則跳出迴圈
若沒有, *asrc 中 依序 取 64 bits 出來比對
==man page==
The rawmemchr() function is similar to memchr(): it assumes (i.e.,the programmer knows for certain) that an instance of c lies somewhere in the memory area starting at the location pointed to by s, and so performs an optimized search for c (i.e., no use of a count argument to limit the range of the search). If an instance of c is not found, the results are unpredictable.
指出一定找的到,想找的char b
如果找不到,是未定義行為
```cpp
for (src = (unsigned char *)asrc;; src++) {
if (*src == d)
return (void *)src;
}
```
若 *asrc 中,有比對上,則一個一個byte去找 d ,找到則 return address
#### reference
- [記憶體對齊](http://opass.logdown.com/posts/743054-about-memory-alignment)
- [Alignment](https://embeddedartistry.com/blog/2017/02/22/generating-aligned-memory/)
- [alignment allocation](https://books.google.com.tw/books?id=EwlpDwAAQBAJ&pg=PT341&lpg=PT341&dq=unalign+memory+long+boundary+minus&source=bl&ots=Eav77pbR9a&sig=ACfU3U3kWWAEwMl6oGeWpRslibw3J-OFgw&hl=zh-TW&sa=X&ved=2ahUKEwien6jrwp7mAhVrG6YKHc1tDXQ4ChDoATABegQICRAB#v=onepage&q=unalign%20memory%20long%20boundary%20minus&f=true)
### 探討 Big-Endian 與 Little-Endian 硬體是否適用
`*asrc == 0x1a2b3c4d5e6f7g8h`
欲找 `4d`
#### little endian
| 1a | 2b | 3c | 4d | 5e | 6f | 7g | 8h |
--------| -------- | -------- | -------- | -------- | --------| -------- | -------- | -------- | -------- |
0 1 2 3 4 5 6 7 8
用 mask
| 4d | 4d | 4d | 4d | 4d | 4d | 4d | 4d |
--------| -------- | -------- | -------- | -------- | --------| -------- | -------- | -------- | -------- |
0 1 2 3 4 5 6 7 8
*asrc xor MASK
| 1a | 2b | 3c | 0 | 5e | 6f | 7g | 8h |
--------| -------- | -------- | -------- | -------- | --------| -------- | -------- | -------- | -------- |
0 1 2 3 4 5 6 7 8
可找到 4d
#### big endian
| 8h | 7g | 6f | 5e | 4d | 3c | 2b | 1a |
--------| -------- | -------- | -------- | -------- | --------| -------- | -------- | -------- | -------- |
0 1 2 3 4 5 6 7 8
mask
| 4d | 4d | 4d | 4d | 4d | 4d | 4d | 4d |
--------| -------- | -------- | -------- | -------- | --------| -------- | -------- | -------- | -------- |
0 1 2 3 4 5 6 7 8
xor MASK
| 8h | 7g | 6f | 5e | 0 | 3c | 2b | 1a |
--------| -------- | -------- | -------- | -------- | --------| -------- | -------- | -------- | -------- |
0 1 2 3 4 5 6 7 8
可找到 4d
結論是 big endian 一樣可以用,差在 mask 在被填滿 `4d` 時會從高位址往低位址填,但結果不變
### 分析上述兩種 loop summarisation 實作的效能落差
寫簡單的 [benchmark](https://github.com/colinyoyo26/rawmemchr_perf) 以欲找字元離字串開頭的距離對 cycle 數作圖
- 每個 distance 跑 100 interations
- cycle 數為用 `clock_gettime` 得到的時間和最大頻率相乘得到
- 用 `popen` 執行 shell command `lscpu | grep 'CPU max MHz' | awk '{print $4}'` 得到最大頻率
- 這邊每個 element 定為欲找字元距離字串開頭 byte 數,CPE 就是斜率,單純取每個 Case CPE 的平均 (本來變異度就不大)
- `rawmemchr` 每次檢查 8 bytes `orig` 每次檢查 1 byte , 但是前者還有對齊的預處理,以及為了平行檢查額外的計算量,所以效能沒有達到理想 8 倍提昇

### 論文 Computing Summaries of String Loops in C for Better Testing and Refactoring
#### abstract
- introduce notion of memoryless loop
- using C standard library function
- has application to test optimisation refactoring
- prove correct
- our approach can summarise over two thirds of memoryless loop in less than 5 mins per loop
- show these summarise can be used to
- improve symbolic execution
- optimise native code
- refactor code
#### introduction
- memoryless loop
- do not carry information from one iteration to another
- using C standard library function
- has several advantage
- in this paper, show program analysis via dynamic symbolic execution
- using string solver can have significant scalability benefit
- translate custom loop to use string function
1. define vocabulary that can express memoryless loop
2. algorithm : counterexample-guided synthesis
3. prove equivalent up to small bound
4. comprehensively analysis the impact of time ,program size
#### technique
- goal of our technique is
- translate custom loop into calls to standard string function
- technique use algo CEGIS
- CEGIS is based on dynamic symbolic execution
- dynamic symbolic execution
- program analysis technique that
- automatically explore multiple path through a program
- by using a constraint solver
##### target loop
- take as input a pointer to C string
- return as output a pointer into the same string
##### which loop are translate
- given targeted string loop, our technique aim to translate it into
- equivalent program consist of
- primitive operaton (pointer++)
- and standard string library (strchr)
--- custom loop function ---
```cpp
#define whitespace(c) ((c==_)||(c==\t))
char* loopFunction(char* line){
char *p
for (p=line;p & *p & whitespace(*p);p++)
;
return p
}
loopFunction(line);
```
--- memoryless loop ---
```cpp
line += strspn(line, _\t);
```
--- interpret vocabulary ---
- s : origin string pointer
- program : P_\t\0F = strspn(line, _\t)
```cpp
function interpreter(s,program) {
result = s
skipInstr = false
foreach Instr in program do {
if(skipInstr) {
skipInstr = false
continue
}
else if {
arg = InstrArg(Instr)
switch opcode(Instr) do {
case: P // P enter
result = result + strspn(result,arg)
case: Z // _ \t enter
skipInstr = (reult!=null)
case: F // \0 enter
return result
default // loop run out of Instr or opcode is unknown
return invaildPointer
}
}
}
}
```
```cpp
result = s
skipInstr = false
if(skipInstr) {
skipInstr = false
}else{
result = result + strspn(result,_\t)
}
```
##### CEGIS
- based on symbolic execution
- to synthesis the adopters in vocabulary
1. take a symbolic adapter, constrain it so that correctly translates all currently known example
2. concretize the adapter
- symbolic execution = dynamic symbolic execution = concolic execution
- a program analysis technique that explore path in program using constraint solver
- input: symbolic
- statement depend on input add constraint on the input
- symbolic execution terminate
- constraint solver can used to generate concrete solution to the set of constraint add on the path
ie. the path constraint are x>0 and x<100
- guaranteed to execute the path
ie. constraint solver return x=30
- referred as conuterexample
--- synthesised program ---
```cpp=
// when origin(cex) != interpret(cex,prog)
// put into counterexample
counterexample = 0
while timeout do {
prog = sysmbolicMemoryObj(max_prog_size)
foreach cex in counterexample do {
// add condition to current path constraint
assume(origin(cex) == interpret(cex,prog))
}
killAllOtherPath()
prog = concretize(prog) // make symbolic input concrete,by asking constraint solver
example = sysmbolicMemoryObj(max_ex_size)
startMerge()
isEq(origin(example) == interpret(example,prog))
endMerge()
if(isAlwaysTrue(isEq)) {
return prog
}
assume(!isEq)
cex = concretize(example)
counterexample = counterexample U {cex}
}
```
1-13
- each loop iteration, we create a new symbolic sequence of characters represent our program
- and constraint it such that its output on all currently conuterexample match that of origin function
- 先有 很多 constrain path
- 選一條
14-17
- 在某限制長度下
- 測試 original & interperter 是否相等
19-26
- 若相等 則 找到synthesis program
- 不相等 則 放入 counterexample 產生新的 constraint path
#### evaluation
##### loop database
- implement synthesis algorithm on KLEE symbolic execution engine
- perform evaluation on loop from open source program
- automatic filtering
- compiling each program to LLVM IR
- apply LLVM's mem2reg pass
- apply LLVM's LoopAnalysis to iterate all the loop
- manual filtering
##### loop summaries in symbolic execution
- constraint solver can directly benefit symbolic execution
- to be able to use these solver,constraint have to be express in term of finite vocabulary
- standard string function can esily mapped to this vocabulary
##### loop summaries for optimisation
- complier usually provide built-in function for standard operations
- because such function can often be more efficiently
- complier try to identify loop that can be translated to such build-in function
- our more general technique could be helpful to complier writer recongnize more loops
##### loop summaries for refactoring
- incorporated into refactoring engine
- envision IDE that would hightlight certain loop and suggest to developer a change to standard string function
- submit patch to different application
#### conculsion
- 提出一個轉換c語言中迴圈的方法
- 使用CEGIS方法去轉換
- 應用
- symbolic excution
- 加速
- complier optimisation
- 提昇效能
- refactoring
- 提供patch
### 延伸閱讀
#### Binary 自動分析
#### Symbolic Execution Security
- input不是值,是變數
- 隨著遍歷program會產生 formula
- 機制
- 要如何產生formula
- 要如何解 formula
- satisfiability modulo theories solver
- 演算法
- input : logical formula
- output: 一解 或 無解
---
### 測驗 `2`
考慮到 [cirbuf](https://github.com/sysprog21/cirbuf) 這個 [circular buffer](https://en.wikipedia.org/wiki/Circular_buffer) 實作,嘗試透過 [mmap 系統呼叫](http://man7.org/linux/man-pages/man2/mmap.2.html)以化簡緩衝區邊界處理的議題。
為何 circular buffer 初始化時要呼叫三次 mmap 呢?
* 第一次呼叫
* 向作業系統請求配置一塊足以容納兩倍 circular buffer 空間的記憶體
* 第二次呼叫:
* 將稍早用 mkstemp 建立的暫存檔映射到第一次呼叫 mmap 所取得的記憶體中,映射大小為 buffer 的大小
* 第三次呼叫:
* 以第二次要求映射的記憶體位置 (即第一次得到的位置) 加上 buffer 大小的記憶體位置作為要求配置的位置,映射大小同樣為 buffer 的大小,並也是映射到同樣的 fd 上(注意,兩次呼叫傳入的 offset 均為0)
* 如此一來,第二次與第三次映射的記憶體位置即指向同一塊記憶體
有了這個特性後,當我們使用 memcpy 在寫入/讀取 buffer 時即可不用考慮到邊界問題,進而改善存取效率,否則,我們必須考慮到目前 index 是否會超出邊界等等,這將對效能造成衝擊,在 [Using black magic to make a fast circular buffer](https://lo.calho.st/posts/black-magic-buffer/) 一文指出,這兩種方法的效率差距可達三倍之譜。
對照 [cirbuf.h](https://github.com/sysprog21/cirbuf/blob/master/cirbuf.h) 和 [test-cirbuf.c](https://github.com/sysprog21/cirbuf/blob/master/tests/test-cirbuf.c) 以得知具體用法,請補完以下程式碼:
```cpp
static inline int cirbuf_offer(cirbuf_t *cb,
const unsigned char *data,
const int size)
{
/* prevent buffer from getting completely full or over commited */
if (cirbuf_unusedspace(cb) <= size)
return 0;
int written = cirbuf_unusedspace(cb);
written = size < written ? size : written;
memcpy(cb->data + cb->tail, data, written);
cb->tail += written;
MM1
return written;
}
static inline unsigned char *cirbuf_peek(const cirbuf_t *cb)
{
if (cirbuf_is_empty(cb))
return NULL;
MM2
}
```
==作答區==
MM1 = ?
* `(a)` if (cb->size > cb->tail) cb->tail = 0;
* `(b)` if (cb->size < cb->tail) cb->tail = 0;
* `(c)` if (cb->size < cb->tail) cb->tail -= cb->size;
* `(d)` if (cb->size < cb->tail) cb->tail %= cb->size;
* `(e)` if (cb->size < cb->tail) cb->tail++;
* `(f)` if (cb->size < cb->tail) cb->size = 0;
MM2 = ?
* `(a)` return cb->data;
* `(b)` returb cb->head;
* `(c)` returb cb->data + cb->head;
* `(d)` return NULL;
:::success
延伸問題:
1. 解析 test suite 運作原理,嘗試強化現有 cirbuf 的例外處理機制,並且實作於 unit test 內部;
2. 儘管已有 12 個 test case,但涵蓋層面仍不夠廣泛,請指出現有實作的缺陷並著手改善; (提示: 數值範圍及多個 PAGE_SIZE 的空間)
3. 學習 [Using black magic to make a fast circular buffer](https://lo.calho.st/posts/black-magic-buffer/),指出 fast circular buffer 實作的技巧,並分析 cirbuf 的效能,並逐步量化及改善效率;
:::
### test suite 運作原理
一開始會先用 `gentest.sh` 產生 `main.c` 編譯完後執行 `RunAllTests`
```cpp
int main() {
RunAllTests();
return 0;
}
```
#### `RunAllTests` function
先 call `CuSuiteNew` 建造並初始化 `CuSuite` 並回傳指標
- `CuSuite` 資料結構如下,用來紀錄每個 unit test function 的 `Cutest`
```cpp
typedef struct {
int count;
CuTest *list[MAX_TEST_CASES];
int failCount;
} CuSuite;
```
再來會 call `SUITE_ADD_TEST`
- `CuTestNew` 建造並初始化 `CuTest` 並回傳指標
- `CuTest` 資料結構如下,用於管理每個 test case
```cpp
typedef struct CuTestInternal CuTest;
/* define TestFunction as a type of "pointer to function (pointer to Cutest) returning void" */
typedef void (*TestFunction)(CuTest *);
struct CuTestInternal {
/* use to record function name */
const char *name;
/* pointer to test function */
TestFunction function;
/* failed to pass this test or not */
int failed;
/* already ran this function or not */
int ran;
/* record some message when failed */
const char *message;
/* jump to right palce when exception occur */
jmp_buf *jumpBuf;
};
```
- `CuSuiteAdd` 把剛剛初始化的 `CuTest` 加入 `CuSuite` 資料結構中
```cpp
#define SUITE_ADD_TEST(SUITE, TEST) CuSuiteAdd(SUITE, CuTestNew(#TEST, TEST))
```
建立完所有的 `CuTest` 並都加入 `CuSuite` 後,接著執行 `CuSuiteRun`
- 用一個 `for loop` 跑完所有的 `CuTest`
- 並紀錄計個錯誤 `failCount`
```cpp
void CuSuiteRun(CuSuite *testSuite)
{
for (int i = 0; i < testSuite->count; ++i) {
CuTest *testCase = testSuite->list[i];
CuTestRun(testCase);
if (testCase->failed) {
testSuite->failCount += 1;
}
}
}
```
- 第一次 `setjmp` 會回傳 0 ,從 `longjmp` 跳回會回傳非 0 值
```cpp
void CuTestRun(CuTest *tc)
{
printf(" running %s\n", tc->name);
jmp_buf buf;
tc->jumpBuf = &buf;
if (setjmp(buf) == 0) {
tc->ran = 1;
(tc->function)(tc);
}
tc->jumpBuf = 0;
}
```
在每個 unit test function 最後都會 call `CuAssertTrue`
- 其中有兩個參數第一個是 `tc` 就是紀錄該 function 的 `CuTest`
- 第二個是 `cond` 表示測試結果是否正確
- `__FILE__` 會回傳 file path
- `__LINE__` 會回傳所在行數
```cpp
#define CuAssertTrue(tc, cond) \
CuAssert_Line((tc), __FILE__, __LINE__, "assert failed", (cond))
```
- 如果 `cond` 是 `true` 代表測試結果正確,會直接 return
- 否則會繼續 call `CuFail_Line`
- 紀錄訊息到 `CuTest` 中
- `tc->failed` 設為 `true`
- 把錯誤訊息紀錄到 `tc->message`
- 最後用 `longjmp` 跳回 `CuTestRun`
```cpp
void CuAssert_Line(CuTest *tc,
const char *file,
int line,
const char *message,
int condition)
{
if (condition)
return;
CuFail_Line(tc, file, line, NULL, message);
}
```
最後 `CuSuiteDetails` 處理 unit test 的結果,整理到 `CuString` 中
```cpp
void RunAllTests(void) {
CuString *output = CuStringNew();
CuSuite *suite = CuSuiteNew();
SUITE_ADD_TEST(suite, TestCirbuf_set_size_with_init);
SUITE_ADD_TEST(suite, TestCirbuf_is_empty_after_init);
SUITE_ADD_TEST(suite, TestCirbuf_is_not_empty_after_offer);
SUITE_ADD_TEST(suite, TestCirbuf_is_empty_after_poll_release);
SUITE_ADD_TEST(suite, TestCirbuf_spaceused_is_zero_after_poll_release);
SUITE_ADD_TEST(suite, TestCirbuf_cant_offer_if_not_enough_space);
SUITE_ADD_TEST(suite, TestCirbuf_cant_offer_if_buffer_will_be_completely_full);
SUITE_ADD_TEST(suite, TestCirbuf_offer_and_poll);
SUITE_ADD_TEST(suite, TestCirbuf_cant_poll_nonexistant);
SUITE_ADD_TEST(suite, TestCirbuf_cant_poll_twice_when_released);
SUITE_ADD_TEST(suite, TestCirbuf_independant_of_each_other);
SUITE_ADD_TEST(suite, TestCirbuf_independant_of_each_other_with_no_polling);
CuSuiteRun(suite);
CuSuiteDetails(suite, output);
printf("%s\n", CuStringC(output));
}
```
### 強化現有 cirbuf 的例外處理機制
#### 增加 invalid access 例外處理
在增加 test case 的時後有發生 invalid access 的狀況,為了在例外發生時還能繼續跑完所有的 test case 所以加入了 invalid access 的處理
- 先在 `gentest.sh` 設置 `signal`
```cpp
extern void segvhandler();
int main() {
signal(SIGSEGV, segvhandler);
RunAllTests();
return 0;
}
```
- `unit-test.c` 內宣告 `CuTest *test` 全域變數用來追蹤 testing case
```cpp
CuTest *testing;
void CuSuiteRun(CuSuite *testSuite)
{
......
testing = testCase
......
}
```
- invalid access 發生則會讓 `CuFail_Line` 接手處理
```cpp
void segvhandler() {
CuFail_Line(testing, NULL, NULL, NULL, "invalid access");
}
```
- 若 test case 發生 invalid access 其他 case 還是能正常執行完
```
There was 1 failure:
1) TestCirbuf_cant_poll_twice_when_released: (null):0: invalid access
!!!FAILURES!!!
Runs: 14 Passes: 13 Fails: 1
```
:::info
invalid access 例外處理沒辦法追蹤 File Line
:::
### test case 缺陷及改善
#### `cirbuf_poll` function
- 一開始只有檢查是否為空,但如果裡面的 data < `size` 就會出問題了
```cpp
static inline unsigned char *cirbuf_poll(cirbuf_t *cb, const unsigned int size)
{
if (cirbuf_is_empty(cb))
return NULL;
void *end = cb->data + cb->head;
cb->head += size;
if (cb->head >= cb->size)
cb->head -= cb->size;
return end;
}
```
- 以下是原本的 test case 可以看出第二、三次 `cirbuf_poll` 都會直接被擋掉,但如果改一下參數就會出問題了,例如 `cirbuf_poll(&cb, 5)`
```cpp
void TestCirbuf_cant_poll_twice_when_released(CuTest *tc)
{
cirbuf_t cb;
cirbuf_new(&cb, 65536);
cirbuf_offer(&cb, (unsigned char *) "1000", 4);
cirbuf_poll(&cb, 4);
cirbuf_poll(&cb, 4);
CuAssertTrue(tc, NULL == cirbuf_poll(&cb, 4));
}
```
- 加入了新的 test case 檢查這個問題
- 原本會直接把 `head` 移到 `tail` 後面會造成誤判 `usedspace`
```cpp
void TestCirbuf_cant_poll_more_than_datas(CuTest *tc)
{
cirbuf_t cb;
cirbuf_new(&cb, 65536);
cirbuf_offer(&cb, (unsigned char *) "1000", 4);
CuAssertTrue(tc, NULL == cirbuf_poll(&cb, 5));
}
```
```
There was 1 failure:
1) TestCirbuf_cant_poll_more_than_datas: tests/test-cirbuf.c:118: assert failed
!!!FAILURES!!!
Runs: 13 Passes: 12 Fails: 1
```
- 所以一開始的判斷改成 `if (size > (unsigned int) cirbuf_usedspace(cb))` 擋掉超過 `usedspace` 的 `poll` 解決這個問題
#### offer at boundary
因為原本的 test case 沒有測試到邊界,所以加入以下 test case
- 測試結果正常
```cpp
void TestCirbuf_can_offer_if_at_boundary(CuTest *tc){
cirbuf_t cb;
cirbuf_new(&cb, 65536);
char buf[65536];
buf[65535] = '\0';
for(int i = 0; i < 65535; i++)
buf[i] = 'a' + i % 26;
cirbuf_offer(&cb, buf, 65535);
cirbuf_poll(&cb, 65535);
cirbuf_offer(&cb, "9876", 4);
CuAssertTrue(tc, !strncmp("9876", (char *) cirbuf_poll(&cb, 4), 4));
}
```
#### Not multiple of page size
如果 `size` 沒有對齊 page size `mmap` 會失敗
- 把 `create_buffer_mirro` 改成 init 失敗會把 cb->data 設成 `NULL`
- 讓 `cirbuf_new` 回傳 boolean 判斷 init 成功或失敗
```cpp
static inline bool cirbuf_new(cirbuf_t *dst, const unsigned long int size)
{
dst->size = size;
dst->head = dst->tail = 0;
create_buffer_mirror(dst);
return dst->data != NULL;
}
```
```cpp
static void create_buffer_mirror(cirbuf_t *cb)
{
......
if (cb->data == MAP_FAILED)
goto fail;
......
fail:
munmap(cb->data, cb->size << 1);
cb->data = NULL;
close(fd);
}
```
- 增加以下 test case 測試沒有對齊會回傳 false
```cpp
void TestCirbuf_cant_init_if_not_multiple_of_pagesize(CuTest *tc)
{
cirbuf_t cb;
CuAssertTrue(tc, !cirbuf_new(&cb, 65535));
}
```
### 效能分析
以下實作普通版本的 `offer` 和 `mmap` 版本比較效能
```cpp
static inline int cirbuf_offer_orig(cirbuf_t *cb,
const unsigned char *data,
const int size)
{
/* prevent buffer from getting completely full or over commited */
if (cirbuf_unusedspace(cb) <= size)
return 0;
long mask = (long) (size - cb->size + cb->tail) >> 63;
const size_t part1 = (size & mask) + ((cb->size - cb->tail) & ~mask);
const size_t part2 = size - part1;
memcpy(cb->data + cb->tail, data, part1);
memcpy(cb->data, data + part1, part2);
cb->tail += size;
if (cb->size < cb->tail)
cb->tail -= cb->size;
return size;
}
```
- `mmap` 的效率將近原本的 1.5 倍

但其實以上只考慮到了每次寫入 32 bytes 的情況
- 以下是測試每次固定寫入不同 bytes 各跑 100000 iterations
- BUFFER_SIZE == 65536
- 雖然看起來紫色的點還是在比較上方,但優化的成果並沒有想像中的好

### 改善效率
`cirbuf_offer` 超過範圍的 `offer` 已經被第一個條件檔掉了
- `written = size < written ? size : written` 條件式一定成立,所以刪掉這行減少不必要的 branch
```cpp
static inline int cirbuf_offer(cirbuf_t *cb,
const unsigned char *data,
const int size)
if (cirbuf_unusedspace(cb) <= size)
return 0;
int written = cirbuf_unusedspace(cb);
written = size < written ? size : written;
memcpy(cb->data + cb->tail, data, written);
cb->tail += written;
if (cb->size < cb->tail)
cb->tail %= cb->size;
return written;
}
```
#### 對 `cirbuf_offer` 的 data cpy 進行優化
有先試著用 multithread (`openmp`) 進行加速
- 推測產生 thread 的 overhead 相對 `memcpy` 太慢
- L1 data cache 之先需要 synchronization 就可能造成效率更慢
- 結論是在 thread level 做平行不適合
現在嘗試 DLP (Data Level Parallelism)
- 利用 AVX SIMD 加速 data 搬移, code 如下
- 概念是每次搬 32 bytes 剩下的在用 `memcpy` 處理
- [Iintel Intrinsics Guide](https://software.intel.com/sites/landingpage/IntrinsicsGuide/#techs=AVX&cats=Store&expand=3859,2374,2375,3344,3424,3410,3401,3418,5654,3418,5672,5654)
- Load 256-bits of integer data from unaligned memory into dst. This intrinsic may perform better than _mm256_loadu_si256 when the data crosses a cache line boundary.
>因為寫入字串只對齊 1 byte 所以推測常常會橫跨 cache line
>實際測試過 `_mm256_lddqu_si256` 的確效率更好
```cpp
#include <immintrin.h>
#include <stdint.h>
static inline int cirbuf_offer_opt(cirbuf_t *cb,
const unsigned char *data,
const int size)
{
/* prevent buffer from getting completely full or over commited */
if (cirbuf_unusedspace(cb) <= size)
return 0;
void *base = cb->data + cb->tail;
int i = 0, align = sizeof(__m256i);
for (; i < (size & -align); i += align) {
__m256i buf = _mm256_lddqu_si256((__m256i *) (data + i));
_mm256_storeu_si256((__m256i *) (base + i), buf);
}
memcpy(base + i, data + i, size & (align - 1));
cb->tail += size;
if (cb->size < cb->tail)
cb->tail -= cb->size;
return size;
}
```
雖然在特定 case 有較好的效率 (觀察只有在寫入大小夠小時有較好的效能)
- 512 bytes
- BUFFER_SIZE = 4096

但考慮不同寫入大小時就發現沒有優化成功了
- BUFFER_SIZE = 65536

#### 其他嘗試
也嘗試過從原來的小地方著手,最小化 dereference 以及修掉 branch 但並沒有得到明顯的效果
```cpp
static inline int cirbuf_offer_opt(cirbuf_t *cb,
const unsigned char *data,
const int size)
{
/* prevent buffer from getting completely full or over commited */
if (cirbuf_unusedspace(cb) <= size)
return 0;
typeof(cb->data) base = cb->data;
typeof(cb->tail) offset = cb->tail;
typeof(cb->size) capacity = cb->size, mask;
memcpy(base + offset, data, size);
typeof(cb->tail) newtail = offset + size;
mask = ((long) (capacity - newtail)) >> 63;
cb->tail = newtail - (capacity & mask);
return size;
}
```

最後也嘗試了直接略過 cache 寫回 memory 試圖減少 data 在每層 layer copy 的 overhead 結果效能也是很差
- `void _mm256_stream_si256 (__m256i * mem_addr, __m256i a)`
- Store 256-bits of integer data from a into memory using a non-temporal memory hint. mem_addr must be aligned on a 32-byte boundary or a general-protection exception may be generated.
> 這個要對齊才能用
```cpp
for(; i < (size & -align); i += align){
__m256i buf = _mm256_stream_load_si256 ((__m256i *) (data + i));
_mm256_stream_si256 ((__m256i *) (base + i), buf);
}
```

- 同樣的方法[這篇文](https://stackoverflow.com/questions/2963898/faster-alternative-to-memcpy)卻達到很好的效能,他是在 `Ryzen 1800X` 處理器上運行, L1 D-cache 和我規格一樣
- cache line size 為 64 byte 而我每次寫入 32 byte 第二次寫入同個 cache line 還是能利用到 spatial locality(第一次 cold miss) 才對
- 雖然 BUFFER_SIZE = 65536 > 32K 沒辦法 cache 整個 buffer 繞完一圈回到 `cb->tail == 0` 還是會 miss (capacity miss)
- 但為何 stack overflow 那篇可以達到很好的效能?
- 以下是我的 memory 還有 cache 規格,相比他是用 DDR4 (都是 single channel) 差別在 memory 的 clock rate
- `transfer rate = 64 (single channel) * clock rate / 8 (byte / sec)`
```
$ sudo lshw -class memory
*-cache:0
description: L1 cache
physical id: d
slot: L1 Cache
size: 64KiB
capacity: 64KiB
capabilities: synchronous internal write-back data
configuration: level=1
*-memory
description: System Memory
physical id: 12
slot: System board or motherboard
size: 4GiB
*-bank:0
description: SODIMM DDR3 Synchronous 1600 MHz (0.6 ns)
product: 8KTF51264HDZ-1G9E1
vendor: Micron
physical id: 0
serial: 00000000
slot: ChannelA-DIMM0
size: 4GiB
width: 64 bits
clock: 1600MHz (0.6ns)
*-bank:1
description: [empty]
physical id: 1
slot: ChannelB-DIMM0
```
### reference
[Iintel Intrinsics Guide](https://software.intel.com/sites/landingpage/IntrinsicsGuide/#techs=AVX&cats=Store&expand=3859,2374,2375,3344,3424,3410,3401,3418,5654,3418,5672,5654)
[faster alternative to memcpy](https://stackoverflow.com/questions/2963898/faster-alternative-to-memcpy)