# ringbuffer 期末報告 contributed by < `rebvivi、Laurora` > ###### tags: `linux2019` * [F11: ringbuffer](https://hackmd.io/s/SkYLI9CiN) ## 作業要求 - 完成 [第 11 週測驗題 (上)](https://hackmd.io/@sysprog/HypUB7HjV?type=view#2019q1-%E7%AC%AC-11-%E9%80%B1%E6%B8%AC%E9%A9%97%E9%A1%8C-%E4%B8%8A) 和所有延伸題目 - 在 Linux 核心原始程式碼指出類似的 ring buffer 實作,設計 Linux 核心模組的實驗,需要探討對應的原理 - 需要涵蓋 kernel API 同步機制的運用 - 執行時期的分析 (提示: 可善用 eBPF) ## 測驗 `1` ### create_buffer_mirror () - 在 `cirbuf.h` 中的 `create_buffer_mirror()` 函式中有用到 mmap >在 [mmap 的 Linux Man Page](http://man7.org/linux/man-pages/man2/mmap.2.html) 中有提到: > >mmap() creates a new mapping in the virtual address space of the calling process. > >將檔案內容 mapping 到一段 virtual memory 上,透過對這段 memory 的讀取和修改,實現對檔案的讀取和修改 > >>#include <sys/mman.h> void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset); > >- `void *addr`:指向 new mapping 的 starting address ,通常設為 NULL,代表讓 kernel 自動選定 address > >- `size_t length`: mapping 的長度 > >- `int prot`: mapping 區域所想要的 memory protection 方式 >`PROT_EXEC`:pages 可被 execute `PROT_READ`:pages 可被 read `PROT_WRITE`:pages 可被 write `PROT_NONE`:pages 不能被 access > >- `int flags`:影響 mapping 區域的各種特性 >`MAP_ANONYMOUS`:建立匿名 mapping ,會忽略參數 fd,不涉及檔案,而且 mapping 區域無法和其他 process 共享 >`MAP_PRIVATE`:不允許其他 mapping 該檔案的 process 共享,對 mapping 區域的寫入操作會產生一個 copy-on-write 的 mapping ,對此區域所做的修改不會寫回原檔案 > >- `int fd`:由 open 返回的 file descriptor ,代表要 mapping 到 kernel 的檔案,如果 `flags` 中設置了`MAP_ANONYMOUS`, `fd` 設為 -1 > >- `off_t offset`:從檔案 mapping 開始處的 offset ,通常為 0 ,代表從檔案最前方開始 mapping > >如果 mapping 成功則 return mapping 區域的 kernel starting address,否則 return MAP_FAILED(-1) ```cpp= static int create_buffer_mirror(cirbuf_t *cb) { char path[] = "/tmp/cirbuf-XXXXXX"; int fd = mkstemp(path); int vu = unlink(path); int vf = ftruncate(fd, cb->size); /* FIXME: validate if mkstemp, unlink, ftruncate failed */ if (fd < 0) return -1; if (vu) return -1; if (vf) return -1; /* create the array of data */ cb->data = mmap(NULL, cb->size << 1, PROT_NONE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0); /* FIXME: validate if cb->data != MAP_FAILED */ if (cb->data == MAP_FAILED) return -1; void *address = mmap(cb->data, cb->size, PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0); /* FIXME: validate if address == cb->data */ if (address != cb->data) return -1; address = mmap(cb->data + cb->size, cb->size, PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0); /* FIXME: validate if address == cb->data + cb->size */ if (address != cb->data + cb->size) return -1; close(fd); return 0; } ``` - 第 4 行中,根據 path 建立檔名不重複的暫存檔 ```cpp int fd = mkstemp(path); ``` >在 [mkstemp 的 Linux Man Page](http://man7.org/linux/man-pages/man3/mkstemp.3.html) 中有提到: > >The mkstemp() function generates a unique temporary filename from template, creates and opens the file, and returns an open file descriptor for the file. > >>#include <stdlib.h> int mkstemp(char *template); > - 第 5 行中,刪除參數 path 所指定的檔案 如果該檔名為最後連接點,但有其他 process 打開了此檔案,則在所有關於此檔案的 file descriptor 皆關閉後才會刪除 ```cpp int vu = unlink(path); ``` >在 [unlink 的 Linux Man Page](https://linux.die.net/man/2/unlink) 中有提到: > >unlink() deletes a name from the file system. If that name was the last link to a file and no processes have the file open the file is deleted and the space it was using is made available for reuse. > >If the name was the last link to a file but any processes still have the file open the file will remain in existence until the last file descriptor referring to it is closed. > >>#include <unistd.h> int unlink(const char *pathname); - 第 6 行中,將檔案的大小指定為 cb->size ```cpp int vf = ftruncate(fd, cb->size); ``` >在 [ftruncate 的 Linux Man Page](https://linux.die.net/man/2/ftruncate) 中有提到: > >The truncate() and ftruncate() functions cause the regular file named by path or referenced by fd to be truncated to a size of precisely length bytes. > >If the file previously was larger than this size, the extra data is lost. If the file previously was shorter, it is extended, and the extended part reads as null bytes ('\0'). > >>#include <unistd.h> #include <sys/types.h> int ftruncate(int fd, off_t length); > >ftruncate() 會將參數 fd 指定的檔案大小改為參數 length 指定的大小。參數 fd 為已打開的 file descriptor ,而且必須是以寫入模式打開的檔案。如果原來的檔案容量比參數 length 大,則超過的部分會被刪去 - 第 18 行中,使用 `MAP_ANONYMOUS` 和 ` MAP_PRIVATE` 的方式分配了一塊為指定緩衝區大小 2 倍的 memory space ```cpp cb->data = mmap(NULL, cb->size << 1, PROT_NONE, MAP_ANONYMOUS | MAP_PRIVATE,-1, 0); ``` - 第 25 行中,將 mkstemp 所建立的暫存檔 mapping 到 address `data[0] ~ data[size]` ```cpp void *address = mmap(cb->data, cb->size, PROT_READ | PROT_WRITE,MAP_FIXED | MAP_SHARED, fd, 0); ``` - 第 32 行中,將 mkstemp 所建立的暫存檔 mapping 到 address `data[size] ~ data[2 * size]` ```cpp address = mmap(cb->data + cb->size, cb->size, PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0); ``` 這樣對 `data[i]` 的讀寫操作就等同於對 `data [i+size]` 的讀寫操作,達到了 ring buffer 這樣形同環狀資料的處理 --- ### cirbuf_offer () ```cpp= /** Write data to the tail of the circular buffer. * Increases the position of the tail. * This copies data to the circular buffer using memcpy. * After calling you should free the data if you don't need it anymore. * * @param cb The circular buffer. * @param data Buffer to be written to the circular buffer. * @param size Size of the buffer to be added to the circular buffer in bytes. * @return number of bytes offered */ static inline int cirbuf_offer(cirbuf_t *cb, const unsigned char *data, const int size) { /* prevent buffer from getting completely full or over commited */ if (cirbuf_unusedspace(cb) <= size) return 0; int written = cirbuf_unusedspace(cb); written = size < written ? size : written; memcpy(cb->data + cb->tail, data, written); cb->tail += written; if (cb->size < cb->tail) cb->tail %= cb->size; return written; } ``` - 第 21 行中,將 `data` copy 長度 `written` 到 `data[tail]` ```cpp memcpy(cb->data + cb->tail, data, written); ``` - 第 22 ~ 23 行中,改變 cb->tail 所在的 index,也就是 ``` data[tail]=data[tail + written] if(size < tail) data[tail]=data[tail % size] ``` ```cpp cb->tail += written; if (cb->size < cb->tail) cb->tail %= cb->size; ``` --- ### cirbuf_peek () ```cpp= /** Look at data at the circular buffer's head. * Use cirbuf_usedspace to determine how much data in bytes can be read. * @param cb The circular buffer. * @return pointer to the head of the circular buffer */ static inline unsigned char *cirbuf_peek(const cirbuf_t *cb) { if (cirbuf_is_empty(cb)) return NULL; return cb->data + cb->head; } ``` - 第 11 行中, return `data[head]` ```cpp return cb->data + cb->head; ``` --- ### cirbuf_usedspace () 此處彙整 <`flawless0714`> 同學的共筆: ```cpp= /** Tell how much data has been written in bytes to the circular buffer. * @param cb The circular buffer. * @return number of bytes of how data has been written */ static inline int cirbuf_usedspace(const cirbuf_t *cb) { if (cb->head <= cb->tail) return cb->tail - cb->head; return cb->size - (cb->head - cb->tail); } ``` - 第 7 ~ 8 行中,如果 `head <= tail` ,表示說 `tail` 還沒有經過 circular buffer `cb` 的起點,所以 usedspace 就是 `tail - head` - 第 9 行中,表示說 `tail` 經過 circular buffer `cb` 的起點,所以 usedspace 就是 `(head ~ cb起點) + (cb起點 ~ tail)` 的空間,也就是 `[size - (head - cb起點)] + (tail - cb起點) = size - (head - tail) ` ## 解析 test suite 運作原理 此處彙整 <`grant7163`> 同學的共筆: - 如果我們在 cirbuf 的目錄底下輸入 make ,會產生一個 `main.c` 的檔案 ``` $ make bash tests/gentest.sh tests/test-*.c > main.c cc -I. -Itests -g -O2 -Wall -W -o driver main.c tests/test-cirbuf.c tests/unit-test.c ./driver running TestCirbuf_set_size_with_init running TestCirbuf_is_empty_after_init running TestCirbuf_is_not_empty_after_offer running TestCirbuf_is_empty_after_poll_release running TestCirbuf_spaceused_is_zero_after_poll_release running TestCirbuf_cant_offer_if_not_enough_space running TestCirbuf_cant_offer_if_buffer_will_be_completely_full running TestCirbuf_offer_and_poll running TestCirbuf_cant_poll_nonexistant running TestCirbuf_cant_poll_twice_when_released running TestCirbuf_independant_of_each_other running TestCirbuf_independant_of_each_other_with_no_polling OK (12 tests) ``` - 在 `Makefile` 當中,可以看到我們是透過 `gentest.sh` 這個 shell script 產生 `main.c` 的 ``` main.c: bash tests/gentest.sh tests/test-*.c > $@ driver: main.c tests/test-cirbuf.c tests/unit-test.c main.c $(CC) $(CFLAGS) -o $@ $^ ./$@ ``` --- 以下是 `gentest.sh` 的程式碼: ```cpp= #!/bin/bash # Automatically generate single AllTests file for C-based unit test. FILES=$1 echo \ ' /* This is auto-generated code. */ #include <stdio.h> #include "unit-test.h" ' cat $FILES | grep '^void Test' | sed -e 's/(.*$//' \ -e 's/$/(CuTest*);/' \ -e 's/^/extern /' echo \ ' void RunAllTests(void) { CuString *output = CuStringNew(); CuSuite *suite = CuSuiteNew(); ' cat $FILES | grep '^void Test' | sed -e 's/^void //' \ -e 's/(.*$//' \ -e 's/^/ SUITE_ADD_TEST(suite, /' \ -e 's/$/);/' echo \ ' CuSuiteRun(suite); CuSuiteDetails(suite, output); printf("%s\n", CuStringC(output)); } int main() { RunAllTests(); return 0; } ' ``` - 第 1 行中,宣告這個 script 使用的 shell 名稱,因為我們是使用 bash, 所以用 `#!/bin/bash` 來宣告這個檔案內的語法使用 bash 的語法 - 第 4 行中,將 `FILE` 指定為 command line 的第一個參數 (這裡指的是 `test-cirbuf.c`) - 第 6 行中,有使用到 `echo` ,是用來顯示輸出的命令 - 第 14 行中,利用 `grep` ,將 `test-cirbuf.c` 中,有出現 `void Test` 的那行取出來 > `^`:代表 expression 的開頭 > `$`:代表 expression 的結尾 - 第 15 ~ 17 行中,利用 `sed` 字串編輯器,將 `void Test` 的那行,`(` 之後的字串取代成 `(CuTest*);`,然後將開頭取代成 `extern` --- ### test 相關程式碼解析 - 在 `unit-test.c` 中的 `CuStringAppendFormat()` 函式: ```cpp= void CuStringAppendFormat(CuString *str, const char *format, ...) { va_list argp; char buf[HUGE_STRING_LEN]; va_start(argp, format); vsprintf(buf, format, argp); va_end(argp); CuStringAppend(str, buf); } ``` >定義一個 function prototype 時,若是將 argument list 以"..."代入,就代表了這個 function 使用 variant argument: >`void func(...);` > >這樣子可以讓 compiler 不檢驗傳入 function 裡的 argument 的型態和數量, compile 出來的程式碼在呼叫端就能夠把各式各樣 argument 傳入: >`void func(0, 1, 2, 3, 0.4, 0.5 "6789");` > >但是要存取 variant argument 不可能像一般程式一樣直接存取它們,因為缺少 argument variable ,所以我們改用 <stdarg.h> 裡面所提供的 macro 與 type > >在 [stdarg 的 Linux Man Page](http://man7.org/linux/man-pages/man3/stdarg.3.html) 中有提到: > >A function may be called with a varying number of arguments of varying types. The include file <stdarg.h> declares a type va_list and defines three macros for stepping through a list of arguments whose number and types are not known to the called function. > >The called function must declare an object of type va_list which is used by the macros va_start(), va_arg(), and va_end(). > > >>#include <stdarg.h> void va_start(va_list ap, last); type va_arg(va_list ap, type); void va_end(va_list ap); > >- `va_list`:宣告一個指向 variable argument list 的 pointer `ap` > >- `va_start`:初始化 `ap` ,讓它指向正確的 variable argument list 開頭 > `last`:variable argument list 中,",..." 之前的那個 argument > >- `va_end`:清除 pointer `ap`,把它設為NULL > >在 [vsprintf 的 Linux Man Page](https://linux.die.net/man/3/vsprintf) 中有提到: > >The functions vprintf(), vfprintf(), vsprintf(), vsnprintf() are equivalent to the functions printf(), fprintf(), sprintf(), snprintf(), respectively, except that they are called with a va_list instead of a variable number of arguments. These functions do not call the va_end macro. Because they invoke the va_arg macro, the value of ap is undefined after the call. > > >>#include <stdarg.h> int vsprintf(char *str, const char *format, va_list ap); > >vsprintf() 會根據 `format` 來轉換並格式化數據,然後將結果 copy 到 `str`,直到出現 `'\0'`為止 > - 第 6 行中,根據 `format` 來轉換並格式化數據,然後將結果 copy 到 `buf` ```cpp vsprintf(buf, format, argp); ``` - 第 8 行中,將根據 `format` 轉換完成並格式的化數據,利用 `CuStringAppend()` 函式,加到 `str` 的後面 ```cpp CuStringAppend(str, buf); ``` --- - 在 `unit-test.c` 中的 `CuTestRun()` 函式: ```cpp= void CuTestRun(CuTest *tc) { printf(" running %s\n", tc->name); jmp_buf buf; tc->jumpBuf = &buf; if (setjmp(buf) == 0) { tc->ran = 1; (tc->function)(tc); } tc->jumpBuf = 0; } ``` >[C99 的規格書](http://www.dii.uchile.cl/~daespino/files/Iso_C_1999_definition.pdf) 的 254 頁中有提到: > >The setjmp macro saves its calling environment in its jmp_buf argument for later use by the longjmp function. > >>#include <setjmp.h> int setjmp(jmp_buf env); > >- `env`:保存現有執行環境資訊,型態為 jmp_buf > > >If the return is from a direct invocation, the setjmp macro returns the value zero. If the return is from a call to the longjmp function, the setjmp macro returns a nonzero value. > >如果 return 0 表示第一次呼叫 setjmp() , return nonzero 表示它是從 longjmp() 返回的 > --- - 在 `test-cirbuf.c` 中,多次用到 `CuAssertTrue()`,例如: ```cpp= void TestCirbuf_set_size_with_init(CuTest *tc) { cirbuf_t cb; cirbuf_new(&cb, 65536u); CuAssertTrue(tc, 65536u == cirbuf_size(&cb)); } ``` - `CuAssertTrue()` 的 macro 定義在 `unit-test.h` 中 ```cpp #define CuAssertTrue(tc, cond) \ CuAssert_Line((tc), __FILE__, __LINE__, "assert failed", (cond)) ``` >[C99 的規格書](http://www.dii.uchile.cl/~daespino/files/Iso_C_1999_definition.pdf) 的 172 頁中有提到: > >- `__FILE__` :The presumed name of the current source file (a character string literal). >- `__LINE__` : The presumed line number (within the current source file) of the current source line (an integer constant). > - 如果 `TestCirbuf_set_size_with_init()` 的第 5 行 `65536u == cirbuf_size(&cb)` 有成立,代表第 4 行 `cirbuf_new(&cb, 65536u)`有成功: ```cpp= void CuAssert_Line(CuTest *tc, const char *file, int line, const char *message, int condition) { if (condition) return; CuFail_Line(tc, file, line, NULL, message); } ``` - `CuAssert_Line()`的第 7 行就會成立,return 回這個 function - 之後 `main.c` 就會繼續進行下一個測試 - 如果 `TestCirbuf_set_size_with_init()` 的第 5 行 `65536u == cirbuf_size(&cb)` 不成立,代表第 4 行 `cirbuf_new(&cb, 65536u)` 沒有成功: - `CuAssert_Line()` 的第 7 行就不會成立 - 執行 `CuAssert_Line()` 的第 9 行,也就是 `CuFail_Line()` 函式: ```cpp CuFail_Line(tc, __FILE__, __LINE__, NULL, "assert failed"); ``` ```cpp= void CuFail_Line(CuTest *tc, const char *file, int line, const char *message2, const char *message) { CuString string; CuStringInit(&string); if (message2) { CuStringAppend(&string, message2); CuStringAppend(&string, ": "); } CuStringAppend(&string, message); CuFailInternal(tc, file, line, &string); } ``` - 之後 `CuFail_Line()` 的第 14 行,將錯誤訊息 `"assert failed"` 附加到 `string` - `CuFail_Line()` 的第 15 行,將`string` 傳給 `CuFailInternal()` 函式: ```cpp CuFailInternal(tc, __FILE__, __LINE__, &string); ``` ```cpp= static void CuFailInternal(CuTest *tc, const char *file, int line, CuString *string) { char buf[HUGE_STRING_LEN]; sprintf(buf, "%s:%d: ", file, line); CuStringInsert(string, buf, 0); tc->failed = 1; tc->message = string->buffer; if (tc->jumpBuf != 0) longjmp(*(tc->jumpBuf), 0); } ``` - `CuFailInternal()` 函式的第 9 行,利用 `CuStringInsert()` 將 function name 和 line number,追加到錯誤消息 `"assert failed"` 的後面 - 第 12 行中,將完整的錯誤消息給 `tc->message` - 第 14 行中,由 `longjmp` 返回 --- - 在 `main.c` 中,多次用到 `SUITE_ADD_TEST`: ```cpp SUITE_ADD_TEST(suite, TestCirbuf_set_size_with_init); SUITE_ADD_TEST(suite, TestCirbuf_is_empty_after_init); SUITE_ADD_TEST(suite, TestCirbuf_is_not_empty_after_offer); SUITE_ADD_TEST(suite, TestCirbuf_is_empty_after_poll_release); SUITE_ADD_TEST(suite, TestCirbuf_spaceused_is_zero_after_poll_release); SUITE_ADD_TEST(suite, TestCirbuf_cant_offer_if_not_enough_space); SUITE_ADD_TEST(suite, TestCirbuf_cant_offer_if_buffer_will_be_completely_full); SUITE_ADD_TEST(suite, TestCirbuf_offer_and_poll); SUITE_ADD_TEST(suite, TestCirbuf_cant_poll_nonexistant); SUITE_ADD_TEST(suite, TestCirbuf_cant_poll_twice_when_released); SUITE_ADD_TEST(suite, TestCirbuf_independant_of_each_other); SUITE_ADD_TEST(suite, TestCirbuf_independant_of_each_other_with_no_polling); ``` - `SUITE_ADD_TEST` 的 macro 定義在 `unit-test.h` 中: ```cpp CuTest *CuTestNew(const char *name, TestFunction function); #define SUITE_ADD_TEST(SUITE, TEST) CuSuiteAdd(SUITE, CuTestNew(#TEST, TEST)) ``` - `#TEST` 等於將 `TEST` 内容轉為 `const char *` ```cpp= void CuSuiteAdd(CuSuite *testSuite, CuTest *testCase) { assert(testSuite->count < MAX_TEST_CASES); testSuite->list[testSuite->count] = testCase; testSuite->count++; } ``` - 利用 `CuSuiteAdd()` ,將多個 `testCase` 合併成 `testSuite` --- - 在 `main.c` 中,測試的執行是靠 `CuSuiteRun()` 函式: ```cpp CuSuiteRun(suite); ``` ```cpp= void CuSuiteRun(CuSuite *testSuite) { for (int i = 0; i < testSuite->count; ++i) { CuTest *testCase = testSuite->list[i]; CuTestRun(testCase); if (testCase->failed) { testSuite->failCount += 1; } } } ``` - `CuSuiteRun()` 的第 3 行中,for 迴圈是依序測試 `testSuite` 裡頭的各個 `testCase` - `CuSuiteRun()` 的第 5 行中,對各個 `testCase` 進行 `CuTestRun()` ```cpp= void CuTestRun(CuTest *tc) { printf(" running %s\n", tc->name); jmp_buf buf; tc->jumpBuf = &buf; if (setjmp(buf) == 0) { tc->ran = 1; (tc->function)(tc); } tc->jumpBuf = 0; } ``` - `CuTestRun()` 的第 7 行的 `setjmp(buf) == 0` ,設下跳轉點 - 之後第 8 ~ 9 行開始測試 `testCase`,如果測試失敗就紀錄錯誤訊息,使用 `longjmp` 回到第 7 行的 `setjmp(buf) == 0` - `CuSuiteRun()` 的第 6 ~ 7 行中,會對 `testCase` fail 的次數進行 count,以便全部 `testCase` 運行完後,進而運行 `CuSuiteDetails()` ```cpp void CuSuiteDetails(CuSuite *testSuite, CuString *details) { if (testSuite->failCount == 0) { int passCount = testSuite->count - testSuite->failCount; const char *testWord = passCount == 1 ? "test" : "tests"; CuStringAppendFormat(details, "OK (%d %s)\n", passCount, testWord); return; } if (testSuite->failCount == 1) CuStringAppend(details, "There was 1 failure:\n"); else CuStringAppendFormat(details, "There were %d failures:\n", testSuite->failCount); for (int i = 0, failCount = 0; i < testSuite->count; ++i) { CuTest *testCase = testSuite->list[i]; if (testCase->failed) { failCount++; CuStringAppendFormat(details, "%d) %s: %s\n", failCount, testCase->name, testCase->message); } } CuStringAppend(details, "\n!!!FAILURES!!!\n"); CuStringAppendFormat(details, "Runs: %d ", testSuite->count); CuStringAppendFormat(details, "Passes: %d ", testSuite->count - testSuite->failCount); CuStringAppendFormat(details, "Fails: %d\n", testSuite->failCount); } ``` - `CuSuiteDetails()` 利用 `CuStringAppendFormat()` 將結果放到 `details` - 之後利用 `printf` 將 `details` 中 `buffer` 的內容印出來 ## [fast circular buffer](https://lo.calho.st/quick-hacks/employing-black-magic-in-the-linux-page-table/) ### 實作技巧 ```cpp= #include <stdint.h> #include <stdio.h> #include <string.h> #include <sys/time.h> #define BUFFER_SIZE (1024) #define MESSAGE_SIZE (32) #define NUMBER_RUNS (1000000) static inline double microtime() { struct timeval tv; gettimeofday(&tv, NULL); return 1e6 * tv.tv_sec + tv.tv_usec; } static inline off_t min(off_t a, off_t b) { return a < b ? a : b; } int main() { uint8_t message[MESSAGE_SIZE]; uint8_t buffer[2 * BUFFER_SIZE]; size_t offset; double slow_start = microtime(); offset = 0; for (int i = 0; i < NUMBER_RUNS; i++) { const size_t part1 = min(MESSAGE_SIZE, BUFFER_SIZE - offset); const size_t part2 = MESSAGE_SIZE - part1; memcpy(buffer + offset, message, part1); memcpy(buffer, message + part1, part2); offset = (offset + MESSAGE_SIZE) % BUFFER_SIZE; } double slow_stop = microtime(); double fast_start = microtime(); offset = 0; for (int i = 0; i < NUMBER_RUNS; i++) { memcpy(&buffer[offset], message, MESSAGE_SIZE); offset = (offset + MESSAGE_SIZE) % BUFFER_SIZE; } double fast_stop = microtime(); printf("slow: %f microseconds per write\n", (slow_stop - slow_start) / NUMBER_RUNS); printf("fast: %f microseconds per write\n", (fast_stop - fast_start) / NUMBER_RUNS); return 0; } ``` - 第 29 ~ 36 行中,是第一種將 message 寫入 circular buffer 的方法: - 因為 circular buffer 的空間只有 BUFFER_SIZE,所以寫入時要小心不能寫到沒有宣告的空間 - 如果 circular buffer 尾端的空間不夠一次寫入所有的 message ,就會將 message 分成兩半 - 第 31 行中,比較 message 的長度 `MESSAGE_SIZE` 和 circular buffer 尾端的空間 `BUFFER_SIZE - offset` 誰大誰小 - 如果 `MESSAGE_SIZE` 比較大,就會將 message 分成兩半,分兩次 copy - 第 33 行中,先 copy 前半到 circular buffer 的尾端 - 第 34 行中,再 copy 後半到 circular buffer 的開頭 - 第 40 ~ 44 行中,是第二種將 message 寫入 circular buffer 的方法: - 因為這時候 circular buffer 的空間有 (2 * BUFFER_SIZE),所以寫入時不用分成兩次 copy - 第 42 行中,直接 copy message 到 circular buffer 的尾端 >以下是執行結果: ```cpp slow: 0.055107 microseconds per write fast: 0.004035 microseconds per write ``` 第一種方法比第二種方法慢了 13.6 倍左右 - 第二種方法不用考慮到 message 超出所宣告的 `BUFFER_SIZE` 的問題,大大的提升存取的效率 --- ```cpp #include <stdint.h> #include <stdio.h> #include <string.h> #include <sys/time.h> #define BUFFER_SIZE (1024) //#define MESSAGE_SIZE (32) #define NUMBER_RUNS (1000000) static inline double microtime() { struct timeval tv; gettimeofday(&tv, NULL); return 1e6 * tv.tv_sec + tv.tv_usec; } static inline off_t min(off_t a, off_t b) { return a < b ? a : b; } int main() { uint8_t buffer[2 * BUFFER_SIZE]; size_t offset; FILE *fp = fopen("time.txt", "wb+"); for (int MESSAGE_SIZE = 0; MESSAGE_SIZE < 1025; MESSAGE_SIZE += 4) { uint8_t message[MESSAGE_SIZE]; double slow_start = microtime(); offset = 0; for (int i = 0; i < NUMBER_RUNS; i++) { const size_t part1 = min(MESSAGE_SIZE, BUFFER_SIZE - offset); const size_t part2 = MESSAGE_SIZE - part1; memcpy(buffer + offset, message, part1); memcpy(buffer, message + part1, part2); offset = (offset + MESSAGE_SIZE) % BUFFER_SIZE; } double slow_stop = microtime(); double fast_start = microtime(); offset = 0; for (int i = 0; i < NUMBER_RUNS; i++) { memcpy(&buffer[offset], message, MESSAGE_SIZE); offset = (offset + MESSAGE_SIZE) % BUFFER_SIZE; } double fast_stop = microtime(); fprintf(fp, "%d\t\t%f\t\t\t%f\n", MESSAGE_SIZE, (slow_stop - slow_start) / NUMBER_RUNS, (fast_stop - fast_start) / NUMBER_RUNS); } fclose(fp); return 0; } ``` >隨著 `MESSAGE_SIZE` 大小的增加,兩種方法的 runtime 變動的趨勢都差不多,大概都隨著 `MESSAGE_SIZE` 變大而跟著線性遞增 > ![](https://i.imgur.com/pXtwRdL.png) --- ### 改善效率 如果將原本 wrap around 的地方 ```cpp offset = (offset + MESSAGE_SIZE) % BUFFER_SIZE; ``` 改成 ```cpp offset = offset + MESSAGE_SIZE; if (offset > BUFFER_SIZE) offset -= BUFFER_SIZE; ``` 速度會變快 ```cpp slow: 0.055107 -> 0.033718 microseconds per write fast: 0.004035 -> 0.003003 microseconds per write ``` --- ## OP-TEE [Benchmark framework](https://optee.readthedocs.io/debug/benchmark.html) Benchmark framework 由以下部份組成 1. Benchmark Client Application (CA) - client 端 專用的應用程式,負責分配 timestamp circular buffers - 這些 buffer 在 Benchmark PTA 中註冊,使用所有OP-TEE層生成的所有時間標記 (timestamp) 資料。 - 將時間標記資料放入 `.ts` 的 extension 中。 2. Benchmark Pseudo Trusted Application (PTA) - 擁有來自 shared memory 的所有 per-cpu circular non-secure buffers > per-CPU: 建立 per-CPU 變數時,系統中的每個處理器都會獲得這個變數的副本。存取 per-CPU 變數時幾乎不需要加鎖,因為每個處理器使用的都是自己的副本。 - 必須由 CA 呼叫 Benchmark PTA 註冊時間標記的 circular buffers - Benchmark PTA 呼叫 OP-TEE Linux 驅動程式在 Linux 核心層中註冊這個 circular buffer 3. libTEEC and Linux kernel OP-TEE driver 用來處理從 Benchmark PTA 發出的時間標記緩衝區註冊請求 **Timestamp source** * Arm Performance Monitor Units 作為Timestamp source 的主要來源 * 提供精確的 per-CPU 週期數值 * 對所有事件啟用 EL0 的存取,因此建立於用戶模式的應用程式可以直接從共處理器的暫存器讀取 cpu counter 值,為了使延遲最小,避免對 EL1 核心執行額外的系統呼叫。 **流程圖** ![](https://optee.readthedocs.io/_images/benchmark_sequence.png)