# 2019q3 第 11 週測驗題 contributed by <`butterflyred`>, <`colinyoyo26`> ### 測驗 `1` 在 [PLDI'19](https://pldi19.sigplan.org/) 論文 [Computing Summaries of String Loops in C for Better Testing and Refactoring](https://srg.doc.ic.ac.uk/files/papers/loops-pldi-19.pdf) 提到開發者會希望對 loop summarisation 進行替換,例如 [GNU patch](http://www.gnu.org/software/patch/) 這支程式原本程式碼是: ```cpp while (*s != '\n') s++; ``` 替換為 GNU extension: ```cpp s = rawmemchr(s, '\n'); ``` 其中 [rawmemchr](https://linux.die.net/man/3/rawmemchr) 是自從 glibc-2.1 即提供的擴充函式: > The rawmemchr() function is similar to memchr(): it assumes (i.e., the programmer knows for certain) that an instance of c lies somewhere in the memory area starting at the location pointed to by s, and so performs an optimized search for c (i.e., no use of a count argument to limit the range of the search). If an instance of c is not found, the results are unpredictable. The following call is a fast means of locating a string's terminating null byte: > ```cpp > char *p = rawmemchr(s, '\0'); > ``` 以下程式碼嘗試運用 [你所不知道的C語言:數值系統篇](https://hackmd.io/@sysprog/c-numerics) 提及的 Detect NULL 技巧,來加速 `rawmemchr` 的實作,適用於 32 位元和 64 位元的 little-endian 架構: ```cpp #include <limits.h> #define UNALIGNED(X) ((long) X & (sizeof(long) - 1)) /* How many bytes are loaded each iteration of the word copy loop. */ #define LBLOCKSIZE (sizeof(long)) #if LONG_MAX == 2147483647L #define DETECTNULL(X) (((X) -0x01010101) & ~(X) &0x80808080) #else #if LONG_MAX == 9223372036854775807L #define DETECTNULL(X) (((X) -0x0101010101010101) & ~(X) &0x8080808080808080) #else #error long int is not a 32bit or 64bit type. #endif #endif #ifndef DETECTNULL #error long int is not a 32bit or 64bit byte #endif /* DETECTCHAR returns nonzero if (long)X contains the byte used to fill (long)MASK. */ #define DETECTCHAR(X, MASK) (DETECTNULL(X ^ MASK)) void *rawmemchr(const void *src_void, int c) { const unsigned char *src = (const unsigned char *) src_void; unsigned char d = c; while (UNALIGNED(src)) { if (*src == d) return (void *) src; src++; } /* If we get this far, src is word-aligned. */ unsigned long *asrc = (unsigned long *) src; unsigned long mask = d << 8 | d; mask = mask << AAA | mask; for (unsigned long i = BBB; i < LBLOCKSIZE * 8; i <<= 1) mask = (mask << i) | mask; while (1) { if (DETECTCHAR(*asrc, mask)) break; asrc++; } /* resort to a bytewise loop. */ for (src = (unsigned char *) asrc;; src++) { if (*src == d) return (void *) src; } } ``` 請補完程式碼,應要能在 x86_64 架構獲得效能提升。 ==作答區== AAA = ? * `(a)` 4 * `(b)` 8 * `(c)` 16 * `(d)` 32 * `(e)` 64 BBB = ? * `(a)` 4 * `(b)` 8 * `(c)` 16 * `(d)` 32 * `(e)` 64 :::success 延伸問題: 1. 解釋上述程式碼運作原理,並探討是否適用於 Big-endian 硬體; 2. 比照 [CS:APP Assign5.18](https://hackmd.io/@aben20807/rkdzvWJTX) 在 GNU/Linux 上透過 perf 一類的效能工具,分析上述兩種 loop summarisation 實作的效能落差,並運用 [CS:APP](https://hackmd.io/@sysprog/CSAPP) 第 5 章的 CPE 分析手法 3. 研讀論文 [Computing Summaries of String Loops in C for Better Testing and Refactoring](https://srg.doc.ic.ac.uk/files/papers/loops-pldi-19.pdf),嘗試探討裡頭 [Symbolic Execution](https://en.wikipedia.org/wiki/Symbolic_execution) 的手法,還有為何能做到自動分析程式碼並給予 refactor 建議 * 延伸閱讀: [Binary 自動分析的那些事](https://hitcon.org/2016/CMT/slide/day1-r1-a-1.pdf), [MIT 6.858 Computer Systems Security: Symbolic Execution](https://youtu.be/yRVZPvHYHzw) ::: ### 程式解析 ```cpp #define UNALIGNED(X) ((long) X & (sizeof(long) - 1)) ``` - X : memory address 假設有對齊long bytes - x implies dont care | 0 | 0 | 0 | x | | -------- | --------| -------- | -------- | -------- | -------- | - sizeof(long) - 1 | 1 | 1 | 1 | 0 | | -------- | --------| -------- | -------- | -------- | -------- | 當沒有對齊 long bytes 時,true。 當有對齊 long bytes 時,false。 ```cpp #if LONG_MAX == 2147483647L #define DETECTNULL(X) (((X)-0x01010101) & ~(X)&0x80808080) #else #if LONG_MAX == 9223372036854775807L #define DETECTNULL(X) (((X)-0x0101010101010101) & ~(X)&0x8080808080808080) ``` ```cpp= #include <stdio.h> int main(void) { for (int count = 255; count >= 0; count--) { int x = count; printf("0x%02x\t", x); int inverseX = x ^ 255; printf("0x%02x\t", inverseX); int Xminus = x - 1; printf("0x%02x\t", Xminus); int andXinverse = Xminus & inverseX; printf("0x%02x\t", andXinverse); int and80 = andXinverse & 128; printf("0x%02x\t", and80); printf("\n"); } return 0; } ``` - $2^{31}$ - 1 = 2147483647L - 32 bits 中最大值 - $2^{63}$ - 1 = 9223372036854775807L - 64 bits 中最大值 - DETECTNULL - 判斷這次抓的word中,是否含有null byte - 只有在 byte = 0 (null) 時,返回 0x80 - 只有在 byte != 0 (null) 時,返回 0x00 ```cpp void *rawmemchr(const void *src_void, int c) { const unsigned char *src = (const unsigned char *)src_void; unsigned char d = c; while (UNALIGNED(src)) { if (*src == d) return (void *)src; src++; } ``` - rawmemchr - 在字串中尋找字元 - src_void : 字串 - c : (int) 要找的字元 當字串的記憶體位址沒有對齊時,持續往下一個記憶體位址走,直到對齊為止(沒有對齊同樣讀一個 word 要抓兩次),期間順便往下查找字元,若找到,則return。 ```cpp /* If we get this far, src is word-aligned. */ unsigned long *asrc = (unsigned long *)src; unsigned long mask = d << 8 | d; mask = mask << 16 | mask; for (unsigned long i = 32; i < LBLOCKSIZE * 8; i <<= 1) mask = (mask << i) | mask; while (1) { if (DETECTCHAR(*asrc, mask)) break; asrc++; } ``` ==64 bits computer== char = 1 byte int = 4 bytes long = 8 bytes (1 word) > 以下底標單位為 byte - int c | &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; c&nbsp; &nbsp; &nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; | | -------------------------------- | 0 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 3 - char d | d | | -------- | 0 &nbsp; &nbsp; 1 - d << 8 | 0 | d | | -------- | -------- | -------- | -------- | 0&nbsp; &nbsp; &nbsp;1 &nbsp; &nbsp; &nbsp;2 &nbsp; - d << 8 | d | d | d | | -------- | -------- | -------- | -------- | 0&nbsp; &nbsp; &nbsp;1 &nbsp; &nbsp; &nbsp;2 &nbsp; - unsigned long mask = d << 8 | d; | d | d | 0 | 0 | 0 | 0 | 0 | 0 | | -------- | -------- | -------- | -------- | -------- | -------- | -------- | -------- | 0&nbsp; &nbsp; &nbsp; 1&nbsp; &nbsp; &nbsp; 2&nbsp; &nbsp; &nbsp; 3&nbsp; &nbsp; &nbsp; 4&nbsp; &nbsp; &nbsp; 5&nbsp; &nbsp; &nbsp; 6&nbsp; &nbsp; &nbsp; 7&nbsp; &nbsp; &nbsp; 8&nbsp; - mask << 16 | 0 | 0 | d | d | 0 | 0 | 0 | 0 | | -------- | -------- | -------- | -------- | -------- | -------- | -------- | -------- | 0&nbsp; &nbsp; &nbsp; 1&nbsp; &nbsp; &nbsp; 2&nbsp; &nbsp; &nbsp; 3&nbsp; &nbsp; &nbsp; 4&nbsp; &nbsp; &nbsp; 5&nbsp; &nbsp; &nbsp; 6&nbsp; &nbsp; &nbsp; 7&nbsp; &nbsp; &nbsp; 8&nbsp; - mask << 16 | mask | d | d | d | d | 0 | 0 | 0 | 0 | | -------- | -------- | -------- | -------- | -------- | -------- | -------- | -------- | 0&nbsp; &nbsp; &nbsp; 1&nbsp; &nbsp; &nbsp; 2&nbsp; &nbsp; &nbsp; 3&nbsp; &nbsp; &nbsp; 4&nbsp; &nbsp; &nbsp; 5&nbsp; &nbsp; &nbsp; 6&nbsp; &nbsp; &nbsp; 7&nbsp; &nbsp; &nbsp; 8&nbsp; - mask = mask << 16 | mask; | 0 | 0 | 0 | 0 | d | d | d | d | | -------- | -------- | -------- | -------- | -------- | -------- | -------- | -------- | 8&nbsp; &nbsp; &nbsp; 7&nbsp; &nbsp; &nbsp; 6&nbsp; &nbsp; &nbsp; 5&nbsp; &nbsp; &nbsp; 4&nbsp; &nbsp; &nbsp; 3&nbsp; &nbsp; &nbsp; 2&nbsp; &nbsp; &nbsp; 1&nbsp; &nbsp; &nbsp; 0&nbsp; - 剩下迴圈以相同邏輯把 `mask` 每個 byte 都填滿 `d` ```cpp for (unsigned long i = 32; i < LBLOCKSIZE * 8; i <<= 1) mask = (mask << i) | mask; ``` ```cpp while (1) { if (DETECTCHAR(*asrc, mask)) break; asrc++; } ``` - *asrc (假設) | 4 | a | 1 | d | 8 | f | 4 | 2 | |--------| -------- | -------- | -------- | -------- | --------| -------- | -------- | -------- | -------- | 0&nbsp; &nbsp; &nbsp; 1&nbsp; &nbsp; &nbsp; 2&nbsp; &nbsp; &nbsp; 3&nbsp; &nbsp; &nbsp; 4&nbsp; &nbsp; &nbsp; 5&nbsp; &nbsp; &nbsp; 6&nbsp; &nbsp; &nbsp; 7&nbsp; &nbsp; &nbsp; 8&nbsp; - X 為 asrc 取 64 bits 出來比對 - 依序為 - | 4 | - | a | - | 1 | - ==| d |== - | 8 | - | f | - | 4 | - | 2 | ```cpp X ^ MASK ``` *asrc xor MASK 當比對上時 xor 完欲找字元會變為 0 | 4 | a | 0 | 1 | 8 | f | 4 | 2 | --------| -------- | -------- | -------- | -------- | --------| -------- | -------- | -------- | -------- | 0&nbsp; &nbsp; &nbsp; 1&nbsp; &nbsp; &nbsp; 2&nbsp; &nbsp; &nbsp; 3&nbsp; &nbsp; &nbsp; 4&nbsp; &nbsp; &nbsp; 5&nbsp; &nbsp; &nbsp; 6&nbsp; &nbsp; &nbsp; 7&nbsp; &nbsp; &nbsp; 8&nbsp; ```cpp DETECTNULL(X ^ MASK) ``` 判斷其中是否有 0 若有,則比對上d了 ```cpp while (1) { if (DETECTCHAR(*asrc, mask)) break; asrc++; } ``` 如果比對上,則跳出迴圈 若沒有, *asrc 中 依序 取 64 bits 出來比對 ==man page== The rawmemchr() function is similar to memchr(): it assumes (i.e.,the programmer knows for certain) that an instance of c lies somewhere in the memory area starting at the location pointed to by s, and so performs an optimized search for c (i.e., no use of a count argument to limit the range of the search). If an instance of c is not found, the results are unpredictable. 指出一定找的到,想找的char b 如果找不到,是未定義行為 ```cpp for (src = (unsigned char *)asrc;; src++) { if (*src == d) return (void *)src; } ``` 若 *asrc 中,有比對上,則一個一個byte去找 d ,找到則 return address #### reference - [記憶體對齊](http://opass.logdown.com/posts/743054-about-memory-alignment) - [Alignment](https://embeddedartistry.com/blog/2017/02/22/generating-aligned-memory/) - [alignment allocation](https://books.google.com.tw/books?id=EwlpDwAAQBAJ&pg=PT341&lpg=PT341&dq=unalign+memory+long+boundary+minus&source=bl&ots=Eav77pbR9a&sig=ACfU3U3kWWAEwMl6oGeWpRslibw3J-OFgw&hl=zh-TW&sa=X&ved=2ahUKEwien6jrwp7mAhVrG6YKHc1tDXQ4ChDoATABegQICRAB#v=onepage&q=unalign%20memory%20long%20boundary%20minus&f=true) ### 探討 Big-Endian 與 Little-Endian 硬體是否適用 `*asrc == 0x1a2b3c4d5e6f7g8h` 欲找 `4d` #### little endian | 1a | 2b | 3c | 4d | 5e | 6f | 7g | 8h | --------| -------- | -------- | -------- | -------- | --------| -------- | -------- | -------- | -------- | 0&nbsp; &nbsp; &nbsp; &nbsp; 1&nbsp; &nbsp; &nbsp; &nbsp; 2&nbsp; &nbsp; &nbsp; &nbsp; 3&nbsp; &nbsp; &nbsp; &nbsp; 4&nbsp; &nbsp; &nbsp; &nbsp; 5&nbsp; &nbsp; &nbsp; &nbsp; 6&nbsp; &nbsp; &nbsp; &nbsp; 7&nbsp; &nbsp; &nbsp; &nbsp; 8&nbsp; 用 mask | 4d | 4d | 4d | 4d | 4d | 4d | 4d | 4d | --------| -------- | -------- | -------- | -------- | --------| -------- | -------- | -------- | -------- | 0&nbsp; &nbsp; &nbsp; &nbsp; 1&nbsp; &nbsp; &nbsp; &nbsp; 2&nbsp; &nbsp; &nbsp; &nbsp; 3&nbsp; &nbsp; &nbsp; &nbsp; 4&nbsp; &nbsp; &nbsp; &nbsp; 5&nbsp; &nbsp; &nbsp; &nbsp; 6&nbsp; &nbsp; &nbsp; &nbsp; 7&nbsp; &nbsp; &nbsp; &nbsp; 8&nbsp; *asrc xor MASK | 1a | 2b | 3c | 0 | 5e | 6f | 7g | 8h | --------| -------- | -------- | -------- | -------- | --------| -------- | -------- | -------- | -------- | 0&nbsp; &nbsp; &nbsp; &nbsp; 1&nbsp; &nbsp; &nbsp; &nbsp; 2&nbsp; &nbsp; &nbsp; &nbsp; 3&nbsp; &nbsp; &nbsp; &nbsp; 4&nbsp; &nbsp; &nbsp; &nbsp; 5&nbsp; &nbsp; &nbsp; &nbsp; 6&nbsp; &nbsp; &nbsp; &nbsp; 7&nbsp; &nbsp; &nbsp; &nbsp; 8&nbsp; 可找到 4d #### big endian | 8h | 7g | 6f | 5e | 4d | 3c | 2b | 1a | --------| -------- | -------- | -------- | -------- | --------| -------- | -------- | -------- | -------- | 0&nbsp; &nbsp; &nbsp; &nbsp; 1&nbsp; &nbsp; &nbsp; &nbsp; 2&nbsp; &nbsp; &nbsp; &nbsp; 3&nbsp; &nbsp; &nbsp; &nbsp; 4&nbsp; &nbsp; &nbsp; &nbsp; 5&nbsp; &nbsp; &nbsp; &nbsp; 6&nbsp; &nbsp; &nbsp; &nbsp; 7&nbsp; &nbsp; &nbsp; &nbsp; 8&nbsp; mask | 4d | 4d | 4d | 4d | 4d | 4d | 4d | 4d | --------| -------- | -------- | -------- | -------- | --------| -------- | -------- | -------- | -------- | 0&nbsp; &nbsp; &nbsp; &nbsp; 1&nbsp; &nbsp; &nbsp; &nbsp; 2&nbsp; &nbsp; &nbsp; &nbsp; 3&nbsp; &nbsp; &nbsp; &nbsp; 4&nbsp; &nbsp; &nbsp; &nbsp; 5&nbsp; &nbsp; &nbsp; &nbsp; 6&nbsp; &nbsp; &nbsp; &nbsp; 7&nbsp; &nbsp; &nbsp; &nbsp; 8&nbsp; xor MASK | 8h | 7g | 6f | 5e | 0 | 3c | 2b | 1a | --------| -------- | -------- | -------- | -------- | --------| -------- | -------- | -------- | -------- | 0&nbsp; &nbsp; &nbsp; &nbsp; 1&nbsp; &nbsp; &nbsp; &nbsp; 2&nbsp; &nbsp; &nbsp; &nbsp; 3&nbsp; &nbsp; &nbsp; &nbsp; 4&nbsp; &nbsp; &nbsp; &nbsp; 5&nbsp; &nbsp; &nbsp; &nbsp; 6&nbsp; &nbsp; &nbsp; &nbsp; 7&nbsp; &nbsp; &nbsp; &nbsp; 8&nbsp; 可找到 4d 結論是 big endian 一樣可以用,差在 mask 在被填滿 `4d` 時會從高位址往低位址填,但結果不變 ### 分析上述兩種 loop summarisation 實作的效能落差 寫簡單的 [benchmark](https://github.com/colinyoyo26/rawmemchr_perf) 以欲找字元離字串開頭的距離對 cycle 數作圖 - 每個 distance 跑 100 interations - cycle 數為用 `clock_gettime` 得到的時間和最大頻率相乘得到 - 用 `popen` 執行 shell command `lscpu | grep 'CPU max MHz' | awk '{print $4}'` 得到最大頻率 - 這邊每個 element 定為欲找字元距離字串開頭 byte 數,CPE 就是斜率,單純取每個 Case CPE 的平均 (本來變異度就不大) - `rawmemchr` 每次檢查 8 bytes `orig` 每次檢查 1 byte , 但是前者還有對齊的預處理,以及為了平行檢查額外的計算量,所以效能沒有達到理想 8 倍提昇 ![](https://i.imgur.com/VnirR7I.png) ### 論文 Computing Summaries of String Loops in C for Better Testing and Refactoring #### abstract - introduce notion of memoryless loop - using C standard library function - has application to test optimisation refactoring - prove correct - our approach can summarise over two thirds of memoryless loop in less than 5 mins per loop - show these summarise can be used to - improve symbolic execution - optimise native code - refactor code #### introduction - memoryless loop - do not carry information from one iteration to another - using C standard library function - has several advantage - in this paper, show program analysis via dynamic symbolic execution - using string solver can have significant scalability benefit - translate custom loop to use string function 1. define vocabulary that can express memoryless loop 2. algorithm : counterexample-guided synthesis 3. prove equivalent up to small bound 4. comprehensively analysis the impact of time ,program size #### technique - goal of our technique is - translate custom loop into calls to standard string function - technique use algo CEGIS - CEGIS is based on dynamic symbolic execution - dynamic symbolic execution - program analysis technique that - automatically explore multiple path through a program - by using a constraint solver ##### target loop - take as input a pointer to C string - return as output a pointer into the same string ##### which loop are translate - given targeted string loop, our technique aim to translate it into - equivalent program consist of - primitive operaton (pointer++) - and standard string library (strchr) --- custom loop function --- ```cpp #define whitespace(c) ((c==_)||(c==\t)) char* loopFunction(char* line){ char *p for (p=line;p & *p & whitespace(*p);p++) ; return p } loopFunction(line); ``` --- memoryless loop --- ```cpp line += strspn(line, _\t); ``` --- interpret vocabulary --- - s : origin string pointer - program : P_\t\0F = strspn(line, _\t) ```cpp function interpreter(s,program) { result = s skipInstr = false foreach Instr in program do { if(skipInstr) { skipInstr = false continue } else if { arg = InstrArg(Instr) switch opcode(Instr) do { case: P // P enter result = result + strspn(result,arg) case: Z // _ \t enter skipInstr = (reult!=null) case: F // \0 enter return result default // loop run out of Instr or opcode is unknown return invaildPointer } } } } ``` ```cpp result = s skipInstr = false if(skipInstr) { skipInstr = false }else{ result = result + strspn(result,_\t) } ``` ##### CEGIS - based on symbolic execution - to synthesis the adopters in vocabulary 1. take a symbolic adapter, constrain it so that correctly translates all currently known example 2. concretize the adapter - symbolic execution = dynamic symbolic execution = concolic execution - a program analysis technique that explore path in program using constraint solver - input: symbolic - statement depend on input add constraint on the input - symbolic execution terminate - constraint solver can used to generate concrete solution to the set of constraint add on the path ie. the path constraint are x>0 and x<100 - guaranteed to execute the path ie. constraint solver return x=30 - referred as conuterexample --- synthesised program --- ```cpp= // when origin(cex) != interpret(cex,prog) // put into counterexample counterexample = 0 while timeout do { prog = sysmbolicMemoryObj(max_prog_size) foreach cex in counterexample do { // add condition to current path constraint assume(origin(cex) == interpret(cex,prog)) } killAllOtherPath() prog = concretize(prog) // make symbolic input concrete,by asking constraint solver example = sysmbolicMemoryObj(max_ex_size) startMerge() isEq(origin(example) == interpret(example,prog)) endMerge() if(isAlwaysTrue(isEq)) { return prog } assume(!isEq) cex = concretize(example) counterexample = counterexample U {cex} } ``` 1-13 - each loop iteration, we create a new symbolic sequence of characters represent our program - and constraint it such that its output on all currently conuterexample match that of origin function - 先有 很多 constrain path - 選一條 14-17 - 在某限制長度下 - 測試 original & interperter 是否相等 19-26 - 若相等 則 找到synthesis program - 不相等 則 放入 counterexample 產生新的 constraint path #### evaluation ##### loop database - implement synthesis algorithm on KLEE symbolic execution engine - perform evaluation on loop from open source program - automatic filtering - compiling each program to LLVM IR - apply LLVM's mem2reg pass - apply LLVM's LoopAnalysis to iterate all the loop - manual filtering ##### loop summaries in symbolic execution - constraint solver can directly benefit symbolic execution - to be able to use these solver,constraint have to be express in term of finite vocabulary - standard string function can esily mapped to this vocabulary ##### loop summaries for optimisation - complier usually provide built-in function for standard operations - because such function can often be more efficiently - complier try to identify loop that can be translated to such build-in function - our more general technique could be helpful to complier writer recongnize more loops ##### loop summaries for refactoring - incorporated into refactoring engine - envision IDE that would hightlight certain loop and suggest to developer a change to standard string function - submit patch to different application #### conculsion - 提出一個轉換c語言中迴圈的方法 - 使用CEGIS方法去轉換 - 應用 - symbolic excution - 加速 - complier optimisation - 提昇效能 - refactoring - 提供patch ### 延伸閱讀 #### Binary 自動分析 #### Symbolic Execution Security - input不是值,是變數 - 隨著遍歷program會產生 formula - 機制 - 要如何產生formula - 要如何解 formula - satisfiability modulo theories solver - 演算法 - input : logical formula - output: 一解 或 無解 --- ### 測驗 `2` 考慮到 [cirbuf](https://github.com/sysprog21/cirbuf) 這個 [circular buffer](https://en.wikipedia.org/wiki/Circular_buffer) 實作,嘗試透過 [mmap 系統呼叫](http://man7.org/linux/man-pages/man2/mmap.2.html)以化簡緩衝區邊界處理的議題。 為何 circular buffer 初始化時要呼叫三次 mmap 呢? * 第一次呼叫 * 向作業系統請求配置一塊足以容納兩倍 circular buffer 空間的記憶體 * 第二次呼叫: * 將稍早用 mkstemp 建立的暫存檔映射到第一次呼叫 mmap 所取得的記憶體中,映射大小為 buffer 的大小 * 第三次呼叫: * 以第二次要求映射的記憶體位置 (即第一次得到的位置) 加上 buffer 大小的記憶體位置作為要求配置的位置,映射大小同樣為 buffer 的大小,並也是映射到同樣的 fd 上(注意,兩次呼叫傳入的 offset 均為0) * 如此一來,第二次與第三次映射的記憶體位置即指向同一塊記憶體 有了這個特性後,當我們使用 memcpy 在寫入/讀取 buffer 時即可不用考慮到邊界問題,進而改善存取效率,否則,我們必須考慮到目前 index 是否會超出邊界等等,這將對效能造成衝擊,在 [Using black magic to make a fast circular buffer](https://lo.calho.st/posts/black-magic-buffer/) 一文指出,這兩種方法的效率差距可達三倍之譜。 對照 [cirbuf.h](https://github.com/sysprog21/cirbuf/blob/master/cirbuf.h) 和 [test-cirbuf.c](https://github.com/sysprog21/cirbuf/blob/master/tests/test-cirbuf.c) 以得知具體用法,請補完以下程式碼: ```cpp static inline int cirbuf_offer(cirbuf_t *cb, const unsigned char *data, const int size) { /* prevent buffer from getting completely full or over commited */ if (cirbuf_unusedspace(cb) <= size) return 0; int written = cirbuf_unusedspace(cb); written = size < written ? size : written; memcpy(cb->data + cb->tail, data, written); cb->tail += written; MM1 return written; } static inline unsigned char *cirbuf_peek(const cirbuf_t *cb) { if (cirbuf_is_empty(cb)) return NULL; MM2 } ``` ==作答區== MM1 = ? * `(a)` if (cb->size > cb->tail) cb->tail = 0; * `(b)` if (cb->size < cb->tail) cb->tail = 0; * `(c)` if (cb->size < cb->tail) cb->tail -= cb->size; * `(d)` if (cb->size < cb->tail) cb->tail %= cb->size; * `(e)` if (cb->size < cb->tail) cb->tail++; * `(f)` if (cb->size < cb->tail) cb->size = 0; MM2 = ? * `(a)` return cb->data; * `(b)` returb cb->head; * `(c)` returb cb->data + cb->head; * `(d)` return NULL; :::success 延伸問題: 1. 解析 test suite 運作原理,嘗試強化現有 cirbuf 的例外處理機制,並且實作於 unit test 內部; 2. 儘管已有 12 個 test case,但涵蓋層面仍不夠廣泛,請指出現有實作的缺陷並著手改善; (提示: 數值範圍及多個 PAGE_SIZE 的空間) 3. 學習 [Using black magic to make a fast circular buffer](https://lo.calho.st/posts/black-magic-buffer/),指出 fast circular buffer 實作的技巧,並分析 cirbuf 的效能,並逐步量化及改善效率; ::: ### test suite 運作原理 一開始會先用 `gentest.sh` 產生 `main.c` 編譯完後執行 `RunAllTests` ```cpp int main() { RunAllTests(); return 0; } ``` #### `RunAllTests` function 先 call `CuSuiteNew` 建造並初始化 `CuSuite` 並回傳指標 - `CuSuite` 資料結構如下,用來紀錄每個 unit test function 的 `Cutest` ```cpp typedef struct { int count; CuTest *list[MAX_TEST_CASES]; int failCount; } CuSuite; ``` 再來會 call `SUITE_ADD_TEST` - `CuTestNew` 建造並初始化 `CuTest` 並回傳指標 - `CuTest` 資料結構如下,用於管理每個 test case ```cpp typedef struct CuTestInternal CuTest; /* define TestFunction as a type of "pointer to function (pointer to Cutest) returning void" */ typedef void (*TestFunction)(CuTest *); struct CuTestInternal { /* use to record function name */ const char *name; /* pointer to test function */ TestFunction function; /* failed to pass this test or not */ int failed; /* already ran this function or not */ int ran; /* record some message when failed */ const char *message; /* jump to right palce when exception occur */ jmp_buf *jumpBuf; }; ``` - `CuSuiteAdd` 把剛剛初始化的 `CuTest` 加入 `CuSuite` 資料結構中 ```cpp #define SUITE_ADD_TEST(SUITE, TEST) CuSuiteAdd(SUITE, CuTestNew(#TEST, TEST)) ``` 建立完所有的 `CuTest` 並都加入 `CuSuite` 後,接著執行 `CuSuiteRun` - 用一個 `for loop` 跑完所有的 `CuTest` - 並紀錄計個錯誤 `failCount` ```cpp void CuSuiteRun(CuSuite *testSuite) { for (int i = 0; i < testSuite->count; ++i) { CuTest *testCase = testSuite->list[i]; CuTestRun(testCase); if (testCase->failed) { testSuite->failCount += 1; } } } ``` - 第一次 `setjmp` 會回傳 0 ,從 `longjmp` 跳回會回傳非 0 值 ```cpp void CuTestRun(CuTest *tc) { printf(" running %s\n", tc->name); jmp_buf buf; tc->jumpBuf = &buf; if (setjmp(buf) == 0) { tc->ran = 1; (tc->function)(tc); } tc->jumpBuf = 0; } ``` 在每個 unit test function 最後都會 call `CuAssertTrue` - 其中有兩個參數第一個是 `tc` 就是紀錄該 function 的 `CuTest` - 第二個是 `cond` 表示測試結果是否正確 - `__FILE__` 會回傳 file path - `__LINE__` 會回傳所在行數 ```cpp #define CuAssertTrue(tc, cond) \ CuAssert_Line((tc), __FILE__, __LINE__, "assert failed", (cond)) ``` - 如果 `cond` 是 `true` 代表測試結果正確,會直接 return - 否則會繼續 call `CuFail_Line` - 紀錄訊息到 `CuTest` 中 - `tc->failed` 設為 `true` - 把錯誤訊息紀錄到 `tc->message` - 最後用 `longjmp` 跳回 `CuTestRun` ```cpp void CuAssert_Line(CuTest *tc, const char *file, int line, const char *message, int condition) { if (condition) return; CuFail_Line(tc, file, line, NULL, message); } ``` 最後 `CuSuiteDetails` 處理 unit test 的結果,整理到 `CuString` 中 ```cpp void RunAllTests(void) { CuString *output = CuStringNew(); CuSuite *suite = CuSuiteNew(); SUITE_ADD_TEST(suite, TestCirbuf_set_size_with_init); SUITE_ADD_TEST(suite, TestCirbuf_is_empty_after_init); SUITE_ADD_TEST(suite, TestCirbuf_is_not_empty_after_offer); SUITE_ADD_TEST(suite, TestCirbuf_is_empty_after_poll_release); SUITE_ADD_TEST(suite, TestCirbuf_spaceused_is_zero_after_poll_release); SUITE_ADD_TEST(suite, TestCirbuf_cant_offer_if_not_enough_space); SUITE_ADD_TEST(suite, TestCirbuf_cant_offer_if_buffer_will_be_completely_full); SUITE_ADD_TEST(suite, TestCirbuf_offer_and_poll); SUITE_ADD_TEST(suite, TestCirbuf_cant_poll_nonexistant); SUITE_ADD_TEST(suite, TestCirbuf_cant_poll_twice_when_released); SUITE_ADD_TEST(suite, TestCirbuf_independant_of_each_other); SUITE_ADD_TEST(suite, TestCirbuf_independant_of_each_other_with_no_polling); CuSuiteRun(suite); CuSuiteDetails(suite, output); printf("%s\n", CuStringC(output)); } ``` ### 強化現有 cirbuf 的例外處理機制 #### 增加 invalid access 例外處理 在增加 test case 的時後有發生 invalid access 的狀況,為了在例外發生時還能繼續跑完所有的 test case 所以加入了 invalid access 的處理 - 先在 `gentest.sh` 設置 `signal` ```cpp extern void segvhandler(); int main() { signal(SIGSEGV, segvhandler); RunAllTests(); return 0; } ``` - `unit-test.c` 內宣告 `CuTest *test` 全域變數用來追蹤 testing case ```cpp CuTest *testing; void CuSuiteRun(CuSuite *testSuite) { ...... testing = testCase ...... } ``` - invalid access 發生則會讓 `CuFail_Line` 接手處理 ```cpp void segvhandler() { CuFail_Line(testing, NULL, NULL, NULL, "invalid access"); } ``` - 若 test case 發生 invalid access 其他 case 還是能正常執行完 ``` There was 1 failure: 1) TestCirbuf_cant_poll_twice_when_released: (null):0: invalid access !!!FAILURES!!! Runs: 14 Passes: 13 Fails: 1 ``` :::info invalid access 例外處理沒辦法追蹤 File Line ::: ### test case 缺陷及改善 #### `cirbuf_poll` function - 一開始只有檢查是否為空,但如果裡面的 data < `size` 就會出問題了 ```cpp static inline unsigned char *cirbuf_poll(cirbuf_t *cb, const unsigned int size) { if (cirbuf_is_empty(cb)) return NULL; void *end = cb->data + cb->head; cb->head += size; if (cb->head >= cb->size) cb->head -= cb->size; return end; } ``` - 以下是原本的 test case 可以看出第二、三次 `cirbuf_poll` 都會直接被擋掉,但如果改一下參數就會出問題了,例如 `cirbuf_poll(&cb, 5)` ```cpp void TestCirbuf_cant_poll_twice_when_released(CuTest *tc) { cirbuf_t cb; cirbuf_new(&cb, 65536); cirbuf_offer(&cb, (unsigned char *) "1000", 4); cirbuf_poll(&cb, 4); cirbuf_poll(&cb, 4); CuAssertTrue(tc, NULL == cirbuf_poll(&cb, 4)); } ``` - 加入了新的 test case 檢查這個問題 - 原本會直接把 `head` 移到 `tail` 後面會造成誤判 `usedspace` ```cpp void TestCirbuf_cant_poll_more_than_datas(CuTest *tc) { cirbuf_t cb; cirbuf_new(&cb, 65536); cirbuf_offer(&cb, (unsigned char *) "1000", 4); CuAssertTrue(tc, NULL == cirbuf_poll(&cb, 5)); } ``` ``` There was 1 failure: 1) TestCirbuf_cant_poll_more_than_datas: tests/test-cirbuf.c:118: assert failed !!!FAILURES!!! Runs: 13 Passes: 12 Fails: 1 ``` - 所以一開始的判斷改成 `if (size > (unsigned int) cirbuf_usedspace(cb))` 擋掉超過 `usedspace` 的 `poll` 解決這個問題 #### offer at boundary 因為原本的 test case 沒有測試到邊界,所以加入以下 test case - 測試結果正常 ```cpp void TestCirbuf_can_offer_if_at_boundary(CuTest *tc){ cirbuf_t cb; cirbuf_new(&cb, 65536); char buf[65536]; buf[65535] = '\0'; for(int i = 0; i < 65535; i++) buf[i] = 'a' + i % 26; cirbuf_offer(&cb, buf, 65535); cirbuf_poll(&cb, 65535); cirbuf_offer(&cb, "9876", 4); CuAssertTrue(tc, !strncmp("9876", (char *) cirbuf_poll(&cb, 4), 4)); } ``` #### Not multiple of page size 如果 `size` 沒有對齊 page size `mmap` 會失敗 - 把 `create_buffer_mirro` 改成 init 失敗會把 cb->data 設成 `NULL` - 讓 `cirbuf_new` 回傳 boolean 判斷 init 成功或失敗 ```cpp static inline bool cirbuf_new(cirbuf_t *dst, const unsigned long int size) { dst->size = size; dst->head = dst->tail = 0; create_buffer_mirror(dst); return dst->data != NULL; } ``` ```cpp static void create_buffer_mirror(cirbuf_t *cb) { ...... if (cb->data == MAP_FAILED) goto fail; ...... fail: munmap(cb->data, cb->size << 1); cb->data = NULL; close(fd); } ``` - 增加以下 test case 測試沒有對齊會回傳 false ```cpp void TestCirbuf_cant_init_if_not_multiple_of_pagesize(CuTest *tc) { cirbuf_t cb; CuAssertTrue(tc, !cirbuf_new(&cb, 65535)); } ``` ### 效能分析 以下實作普通版本的 `offer` 和 `mmap` 版本比較效能 ```cpp static inline int cirbuf_offer_orig(cirbuf_t *cb, const unsigned char *data, const int size) { /* prevent buffer from getting completely full or over commited */ if (cirbuf_unusedspace(cb) <= size) return 0; long mask = (long) (size - cb->size + cb->tail) >> 63; const size_t part1 = (size & mask) + ((cb->size - cb->tail) & ~mask); const size_t part2 = size - part1; memcpy(cb->data + cb->tail, data, part1); memcpy(cb->data, data + part1, part2); cb->tail += size; if (cb->size < cb->tail) cb->tail -= cb->size; return size; } ``` - `mmap` 的效率將近原本的 1.5 倍 ![](https://i.imgur.com/WbCdmoK.png) 但其實以上只考慮到了每次寫入 32 bytes 的情況 - 以下是測試每次固定寫入不同 bytes 各跑 100000 iterations - BUFFER_SIZE == 65536 - 雖然看起來紫色的點還是在比較上方,但優化的成果並沒有想像中的好 ![](https://i.imgur.com/D34cWdE.png) ### 改善效率 `cirbuf_offer` 超過範圍的 `offer` 已經被第一個條件檔掉了 - `written = size < written ? size : written` 條件式一定成立,所以刪掉這行減少不必要的 branch ```cpp static inline int cirbuf_offer(cirbuf_t *cb, const unsigned char *data, const int size) if (cirbuf_unusedspace(cb) <= size) return 0; int written = cirbuf_unusedspace(cb); written = size < written ? size : written; memcpy(cb->data + cb->tail, data, written); cb->tail += written; if (cb->size < cb->tail) cb->tail %= cb->size; return written; } ``` #### 對 `cirbuf_offer` 的 data cpy 進行優化 有先試著用 multithread (`openmp`) 進行加速 - 推測產生 thread 的 overhead 相對 `memcpy` 太慢 - L1 data cache 之先需要 synchronization 就可能造成效率更慢 - 結論是在 thread level 做平行不適合 現在嘗試 DLP (Data Level Parallelism) - 利用 AVX SIMD 加速 data 搬移, code 如下 - 概念是每次搬 32 bytes 剩下的在用 `memcpy` 處理 - [Iintel Intrinsics Guide](https://software.intel.com/sites/landingpage/IntrinsicsGuide/#techs=AVX&cats=Store&expand=3859,2374,2375,3344,3424,3410,3401,3418,5654,3418,5672,5654) - Load 256-bits of integer data from unaligned memory into dst. This intrinsic may perform better than _mm256_loadu_si256 when the data crosses a cache line boundary. >因為寫入字串只對齊 1 byte 所以推測常常會橫跨 cache line >實際測試過 `_mm256_lddqu_si256` 的確效率更好 ```cpp #include <immintrin.h> #include <stdint.h> static inline int cirbuf_offer_opt(cirbuf_t *cb, const unsigned char *data, const int size) { /* prevent buffer from getting completely full or over commited */ if (cirbuf_unusedspace(cb) <= size) return 0; void *base = cb->data + cb->tail; int i = 0, align = sizeof(__m256i); for (; i < (size & -align); i += align) { __m256i buf = _mm256_lddqu_si256((__m256i *) (data + i)); _mm256_storeu_si256((__m256i *) (base + i), buf); } memcpy(base + i, data + i, size & (align - 1)); cb->tail += size; if (cb->size < cb->tail) cb->tail -= cb->size; return size; } ``` 雖然在特定 case 有較好的效率 (觀察只有在寫入大小夠小時有較好的效能) - 512 bytes - BUFFER_SIZE = 4096 ![](https://i.imgur.com/9qzzE3R.png) 但考慮不同寫入大小時就發現沒有優化成功了 - BUFFER_SIZE = 65536 ![](https://i.imgur.com/7peFvN5.png) #### 其他嘗試 也嘗試過從原來的小地方著手,最小化 dereference 以及修掉 branch 但並沒有得到明顯的效果 ```cpp static inline int cirbuf_offer_opt(cirbuf_t *cb, const unsigned char *data, const int size) { /* prevent buffer from getting completely full or over commited */ if (cirbuf_unusedspace(cb) <= size) return 0; typeof(cb->data) base = cb->data; typeof(cb->tail) offset = cb->tail; typeof(cb->size) capacity = cb->size, mask; memcpy(base + offset, data, size); typeof(cb->tail) newtail = offset + size; mask = ((long) (capacity - newtail)) >> 63; cb->tail = newtail - (capacity & mask); return size; } ``` ![](https://i.imgur.com/7uecdf1.png) 最後也嘗試了直接略過 cache 寫回 memory 試圖減少 data 在每層 layer copy 的 overhead 結果效能也是很差 - `void _mm256_stream_si256 (__m256i * mem_addr, __m256i a)` - Store 256-bits of integer data from a into memory using a non-temporal memory hint. mem_addr must be aligned on a 32-byte boundary or a general-protection exception may be generated. > 這個要對齊才能用 ```cpp for(; i < (size & -align); i += align){ __m256i buf = _mm256_stream_load_si256 ((__m256i *) (data + i)); _mm256_stream_si256 ((__m256i *) (base + i), buf); } ``` ![](https://i.imgur.com/WNpxKdJ.png) - 同樣的方法[這篇文](https://stackoverflow.com/questions/2963898/faster-alternative-to-memcpy)卻達到很好的效能,他是在 `Ryzen 1800X` 處理器上運行, L1 D-cache 和我規格一樣 - cache line size 為 64 byte 而我每次寫入 32 byte 第二次寫入同個 cache line 還是能利用到 spatial locality(第一次 cold miss) 才對 - 雖然 BUFFER_SIZE = 65536 > 32K 沒辦法 cache 整個 buffer 繞完一圈回到 `cb->tail == 0` 還是會 miss (capacity miss) - 但為何 stack overflow 那篇可以達到很好的效能? - 以下是我的 memory 還有 cache 規格,相比他是用 DDR4 (都是 single channel) 差別在 memory 的 clock rate - `transfer rate = 64 (single channel) * clock rate / 8 (byte / sec)` ``` $ sudo lshw -class memory *-cache:0 description: L1 cache physical id: d slot: L1 Cache size: 64KiB capacity: 64KiB capabilities: synchronous internal write-back data configuration: level=1 *-memory description: System Memory physical id: 12 slot: System board or motherboard size: 4GiB *-bank:0 description: SODIMM DDR3 Synchronous 1600 MHz (0.6 ns) product: 8KTF51264HDZ-1G9E1 vendor: Micron physical id: 0 serial: 00000000 slot: ChannelA-DIMM0 size: 4GiB width: 64 bits clock: 1600MHz (0.6ns) *-bank:1 description: [empty] physical id: 1 slot: ChannelB-DIMM0 ``` ### reference [Iintel Intrinsics Guide](https://software.intel.com/sites/landingpage/IntrinsicsGuide/#techs=AVX&cats=Store&expand=3859,2374,2375,3344,3424,3410,3401,3418,5654,3418,5672,5654) [faster alternative to memcpy](https://stackoverflow.com/questions/2963898/faster-alternative-to-memcpy)