## Linux 核心專題:位元運算的應用 > 執行人: yy214123 > [專題解說影片](https://youtu.be/SYXcMAjyoL4) > 執行人: yenshipotato > [專題解說影片](https://youtu.be/-egLcMB8pkU) ### Reviewed by `56han` 我的作業和你們相同,也是完成第 7 週測驗題測驗。 首先,謝謝你們「第 7 週測驗題測驗 - 第二題」對 `m` 有完整的數學證明,讓我更了解第二個判別式,我有引用數學證明到我的作業上,並貼上這篇作業的網址,若有不妥,我會再修改。 另外,我不理解第二個判別式 `___res` 和 `___x` 存在的意義,還有下方 if-else 的三個情境。 ```c if (~0ULL % (___b / (___b & -___b)) == 0) { \ /* special case, can be simplified to ... */ \ ___n /= (___b & -___b); \ ___m = ~0ULL / (___b / (___b & -___b)); \ ___p = 1; \ ___bias = true; \ } else if (___res != ___x / ___b) { \ ___bias = true; \ /* Compute m = (p << 64) / b */ \ ___m = (~0ULL / ___b) * ___p; \ ___m += ((~0ULL % ___b + 1) * ___p) / ___b; \ } else { \ uint32_t ___bits = -(___m & -___m); \ ___bits |= ___m >> 32; \ ___bits = (~___bits) << 1; \ if (!___bits) { \ ___p /= (___m & -___m); \ ___m /= (___m & -___m); \ } else { \ ___p >>= ilog2(___bits); \ ___m >>= ilog2(___bits); \ } \ /* No bias needed. */ \ ___bias = false; \ } ``` > 當初解決數學推導部分後,我並沒有繼續進一步研究該判別式的其餘部分。謝謝你提出這個問題。一旦我完全理解了這部分,能夠更系統性地進行描述運作原理,會立即更新於此處。 > [name=yy214123] ## Reviewed by `yuyuan0625` 請問第七周測驗題2 的判別式使用 `__builtin_constant_p(__base)` 有什麼用意? > 這與 Constant folding 有關,可參考 [你所不知道的 C 語言:編譯器和最佳化原理 (上)-[2:29:58]](https://youtu.be/XY4YkuX_a3k?si=fpaJhRQvQvbl8LOk&t=8998),透過這個 builtin fuction ,去判斷除數是否為常數,若以組合語言序列的觀點來看,會有部分被 dead code elimination,故可以優化程式的執行時間。 > [name=yy214123] ## Reviewed by `david965154` 在 `memchr` 函式實作中建議可以加上 `UNALIGNED` 的判斷考量集及解釋,為何要先做這裡的判斷才透過 SWAR 特定字元。 > 感謝建議,已更新 `UNALIGNED` 的解釋。 > [name=yenshipotato] ## 任務簡介 探討位元運算的應用 ## TODO: 第 7 週測驗題測驗 1 > 重新作答並涵蓋延伸問題 :::success #### 延伸問題1. 解釋上述程式碼運作原理 ::: ```c #include <stdint.h> bool both_odd(uint32_t x, uint32_t y) { return (x & 1) && (y & 1); } ``` 如果 lsb 被設置為 1,則其值必為奇數,反之。 而為減少運算量,將兩個 32 位元組合為一個 64 位元 ```c static inline uint64_t bit_compound(uint32_t x, uint32_t y) { return ((0L + x) << 32) | ((y + 0L) & (-1L >> 32)); } ``` `y`= 0xAAAAAAAA `y`+ 0L = `0x00000000 00000000` = `0x00000000 AAAAAAAA` 假設 x 的十六進位表示為 `0x12345678`,則相加並左移後為:`0x12345678 00000000`。 `x` 的型別是無號 32 位元整數,而 `0L` 是 64 位元,在 c99 有以下描述: > The rank of long long int shall be greater than the rank of long int, which shall be greater than the rank of int, which shall be greater than the rank of short int, which shall be greater than the rank of signed char. > > 〈 c99 6.3.1.1 Boolean, characters, and integers 〉 由此可知 long int 的 rank 比 int 高。 > if the type of the operand with signed integer type can represent all of the values of the type of the operand with unsigned integer type,then the operand with unsigned integer type is converted to the type of the operand with signed integer type. > > 〈 c99 - 6.3.1.8 Usual arithmetic conversionse 〉 所以 `x` 的型別會被轉為 64 位元的 long,將其與 `0L` 相加後向左移 32 位元,其結果如下: > 假設 x 的十六進位表示為 `0x12345678`,則相加並左移後為:`0x12345678 00000000`。 > 假設 y 的十六進位表示為 `0x12345678`,則相加後 `0x00000000 12345678 `。 而 `(y + 0L` 同理。 我對這部分有疑惑,為甚麼不直接: ```diff static inline uint64_t bit_compound(uint32_t x, uint32_t y) { - return ((0L + x) << 32) | ((y + 0L) & (-1L >> 32)); + return ((0L + x) << 32) | (y + 0L); } ``` 如上方程式碼所示,去掉 `(-1L >> 32)` 效果不是相同嘛?那做這個 and 運算的原因為何? 設計對應測試程式如下,`bit_compound` 是原始包含 and 運算的,而 `bit_compound_2` 是對應我上方做的更動(即去掉 and 運算): ```c int main() { int x = 12345678; int y = 9012345; /*return ((0L + x) << 32) | ((y + 0L) & (-1L >> 32))*/ uint64_t xy = bit_compound(x, y); /*return ((0L + x) << 32) | (y + 0L)*/ uint64_t xy_2 = bit_compound_2(x, y); printf("%lu, %lu\n", xy, xy_2); } ``` 輸出結果: ```shell 53024283265959033, 53024283265959033 ``` **根據該結果,我是不是能說 `(-1L >> 32)` 這個運算是多餘的?還是這其中有我沒考量到的地方?** Linux 核心中的 [lib/string.c](https://github.com/torvalds/linux/blob/master/lib/string.c) 有 [memchr](https://man7.org/linux/man-pages/man3/memchr.3.html) 實作: ```c void *memchr(const void *s, int c, size_t n) { const unsigned char *p = s; while (n-- != 0) { if ((unsigned char) c == *p++) { return (void *) (p - 1); } } return NULL; } ``` 這段程式會逐字元去檢查輸入的字串,直至與 `c` 相符。在規格書提到: > The result of the postfix ++ operator is the value of the operand. After the result is obtained, the value of the operand is incremented. (That is, the value 1 of the appropriate type is added to it.) See the discussions of additive operators and compound assignment for information on constraints, types, and conversions and the effects of operations on pointers. > > 〈 c99 - 6.5.2.4 Postfix increment and decrement operators 〉 所以在 while 迴圈中,會先比對 `c` 是否等於 `*p`,接著將其增加 1(即指向下一個字元),故若有匹配,應將其指向上一個字元(`p - 1`)。 `memchr_opt` 函式有三個輸入參數: * `const void` `*str`:輸入字串 * `int` `c`:欲從輸入字串中搜尋的字元 * `size_t` `len`:輸入字串的長度 可以看到 `*str` 是 `void` 型別,用意是可以接受不同型別的輸入字串。接著看到: ```c const unsigned char *src = (const unsigned char *) str; ``` 而為了逐字元比對,所以先將其強制轉型成 `unsigned char`,並指派給同為 `unsigned char` 型別的 `src` 指標變數。 首先看到第一個 `while` 迴圈: ```c while (UNALIGNED(src)) { if (!len--) return NULL; if (*src == d) return (void *) src; src++; } ``` `while` 的執行條件是 `UNALIGNED` 這個巨集,該巨集的輸入參數為指向輸入字串的指標變數 `str` 其位址。 ```c /* Nonzero if X is not aligned on a "long" boundary */ #define UNALIGNED(X) ((long) X & (sizeof(long) - 1)) ``` 起初不懂為何這樣 and 運算後可以判斷是否對齊,故我將變數中所存的值打印出來以便觀察: ```c int main() { const char str[] = "https://wiki.csie.ncku.edu.tw"; const unsigned char *src = (const unsigned char *) str; printf("%p\n", src); printf("%ld\n", (long) src); } ``` 輸出結果 ```shell 0x7fffffffdbf0 140737488346096 ``` 經過進位轉換工具驗證後,確定了 `140737488346096` 是 `0x7fffffffdbf0` 的十進位表示,經此轉換後才可以進行對應的 bitwise 運算。 接著去思考與 `(sizeof(long) - 1)` 進行 and 運算的用意。規格書有關 `sizeof` 描述如下: > The sizeof operator yields the size (in bytes) of its operand, which may be an expression or the parenthesized name of a type. The size is determined from the type of the operand. The result is an integer. If the type of the operand is a variable length array type, the operand is evaluated; otherwise, the operand is not evaluated and the result is an integer constant. >〈 c99 - 6.5.3.4 The sizeof operator 〉 根據描述可以得知是以 bytes 為單位,而 `long` 是 8bytes,所以扣掉 1 後是 7,以二進位來看就是原先二的羃 bit 變為 0,其右邊 bits 皆為 1。若作為參數傳入其轉換為 long 型別後值為 8 之倍,與 7 進行 and 運算後必為 0,反之若非 8 之倍則進行 and 運算後必為 1。 回到 `while` 的部份,當 `UNALIGNED(src)` 為真時會進入迴圈中: * 第一個判別式用以判斷 `len` 是否為 0,若為 0 則表所有字元已經掃描過一輪,而隨著每次檢查會將其值遞減,直至迴圈條件不成立。 * 第二個判別式用以判斷當前 `src` 所指向字元與欲搜尋之字元 `d` 是否匹配。 * 最後將 `src` 改指向下一個字元所在位址,直到位址值符合對齊條件。 接著是 `memchr_opt` 的第二部份程式碼,即 `if` 判別式的部份: ```c if (!TOO_SMALL(len)) { unsigned long *asrc = (unsigned long *) src; unsigned long mask = d << 8 | d; mask |= mask << 16; for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1) mask |= BBBB; while (len >= LBLOCKSIZE) { if (DETECT_CHAR(CCCC, mask)) break; asrc++; len -= LBLOCKSIZE; } src = (unsigned char *) asrc; } ``` 此判別式的成立條件為 `!TOO_SMALL(len)`,而傳入巨集的參數即 `strlen(輸入字串)`,規格書有關 `strlen` 描述如下: > The strlen function returns the number of characters that precede the terminating null character. >〈 c99 - 6.5.3.4 The sizeof operator 〉 故可知當輸入字串長度 $\geq 8$ 時,判別式條件成立,而判別式的第一行程式碼將 `src` 轉型為 `unsigned long`,這邊我不懂確切的原因,透過程式碼上下文我推測是跟後續的 `mask` 有關。 接著透過 or 及左移運算,使變數 `mask` 為欲搜尋字元的重覆擴展(擴展次數會因不同機器上的 `long` 型別位元數而有所不同)。 接著是 `DETECT_CHAR` 及 `DETECT_NULL` 這兩個巨集,首先先將輸入字串與 `mask` 進行 xor 運算,這邊在理解的過程中也出現了疑惑,我透過 vscode 偵錯模式去監看指標變數,轉為 `long` 型別的輸入字串會指向第一個字元,**如此當初擴充 `mask` 的意義為何呢?不也是只比較最低的 byte 嘛?** 後面的流程我可以理解,如果輸入字串的某個字元與 `mask` 有匹配,那兩者進行 xor 後會出現 `00000000`,接著看到這個巨集: ```c #if LONG_MAX == 2147483647L #define DETECT_NULL(X) (((X) - (0x01010101)) & ~(X) & (0x80808080)) #else #if LONG_MAX == 9223372036854775807L /* Nonzero if X (a long int) contains a NULL byte. */ #define DETECT_NULL(X) \ (((X) - (0x0101010101010101)) & ~(X) & (0x8080808080808080)) ``` 這個巨集會根據 `long` 型別位元數之差異(32位元、64位元)進行對應的 bitwise 操作。 `(X) - (0x01010101)`: > 以 byte 的觀點出發,當 `X` 有任一個 byte 為 `0`,與 `0x01010101` 相減後對應的 byte 會是 `0xFF`。 接著上一步結果與 `~X` 進行 and 運算,當 `X` 有任一個 byte 為 `0`,則 `~X` 中對應的 byte 亦為 `0xFF`。 舉兩個例子說明: > 假設 `A`、`B` 皆為 32 位元 long,其中 `A` 為`0x00FF00FF` 、`B` 為 `0x12345678`。 首先看 `A`,特意將第一個 byte、第三個 byte 設為為 0: `A` - `0x01010101` = `0xFFFEFFFE` ; `0xFFFEFFFE` & `~A` = `0xFF00FF00` 接著看 `B`,不具為 0 的 byte: `B` - `0x01010101` = `0x11335577` ; `0x11335577` & `~B` = `0x01030107` 最後與 `0x80808080` 進行 and 運算,將 1byte 分為左右 4bits 來看,若原先具 0 byte,左邊的 4bits 其最高位必為 `1`。示意圖如下: ```graphviz digraph G { rankdir=TB; node [shape=record]; context1 [label="1|*|*|*|*|*|*|*"]; } ``` 結論來說,先透過 xor 運算來判斷欲搜尋字元是否存在,若存在必具某 byte 為 0,而此 byte 經運算後會使原先 if 判別式的條件成立,隨即 break(表示已找到),而另一方面: ```c asrc++; len -= LBLOCKSIZE; ``` 這邊起初在看時不懂,想說為什麼遞增與遞減 `LBLOCKSIZE` 不太一致,後來我也去監看兩指標其遞增及遞減所造成的記憶體位址變化後,才解決了這方面的問題。 >由於 `arsc` 型別是 `unsigned long`,其每次遞增會將記憶體位址增加 `sizeof(unsigned long)` 這麼多。 :::success #### 延伸問題 2. 比較 Linux 核心原本 (與平台無關) 的實作和 memchr_opt,設計實驗來觀察隨著字串長度和特定 pattern 變化的效能影響 ::: 先前提到 Linux 核心中的 [lib/string.c](https://github.com/torvalds/linux/blob/master/lib/string.c) 有 [memchr](https://man7.org/linux/man-pages/man3/memchr.3.html) 實作: ```c void *memchr(const void *s, int c, size_t n) { const unsigned char *p = s; while (n-- != 0) { if ((unsigned char) c == *p++) { return (void *) (p - 1); } } return NULL; } ``` 在先前 [2024q1 Homework1 (lab0)](/yI-Rh8l7Q9uzWhQgBQbsnQ) 我在排序法效能比較的部分遇到了困難,當時遇到的問題是: :::info 測試資料準備以及對應的統計分析,為什麼我設計的測試資料是 1000 筆?而每筆是由 1K 到 1000K 個長度介於 5 到 15 的隨機字串所組成,目前還是無法針對該部份提出對應的統計分析,會不會根本不需要這麼多? ::: 趁著這次比較 `memchr` 與 `memchr_opt` 我想在這個問題點上有所突破,之前在 [M01: lab0 亂數 + 論文閱讀](https://hackmd.io/@sysprog/linux2024-lab0/%2F%40sysprog%2Flinux2024-lab0-d) 提到 Linux 中有工具 `ent` 可用於計算輸入字串的 entropy,char 的最大 entropy 為 8,所以要盡可能滿足這個理論值: ```shell $ head -c 100 /dev/random | ent Entropy = 6.303856 bits per byte. Optimum compression would reduce the size of this 100 byte file by 21 percent. Chi square distribution for 100 samples is 243.04, and randomly would exceed this value 69.44 percent of the times. Arithmetic mean value of data bytes is 133.7900 (127.5 = random). Monte Carlo value for Pi is 2.000000000 (error 36.34 percent). Serial correlation coefficient is -0.121648 (totally uncorrelated = 0.0). ``` > 長度為 100 ,此時 entropy 離理論值有一段差距,但由卡方檢定所計算出的結果可知此時是符合 Uniform distrubution。 ```shell $ head -c 1K /dev/random | ent Entropy = 7.824000 bits per byte. Optimum compression would reduce the size of this 1024 byte file by 2 percent. Chi square distribution for 1024 samples is 241.50, and randomly would exceed this value 71.87 percent of the times. Arithmetic mean value of data bytes is 128.2246 (127.5 = random). Monte Carlo value for Pi is 3.152941176 (error 0.36 percent). Serial correlation coefficient is -0.030634 (totally uncorrelated = 0.0). ``` > 相較於長度 100,entropy 提高許多。 ```shell $ head -c 10K /dev/random | ent Entropy = 7.984912 bits per byte. Optimum compression would reduce the size of this 10240 byte file by 0 percent. Chi square distribution for 10240 samples is 212.55, and randomly would exceed this value 97.53 percent of the times. Arithmetic mean value of data bytes is 127.1747 (127.5 = random). Monte Carlo value for Pi is 3.172332943 (error 0.98 percent). Serial correlation coefficient is -0.006131 (totally uncorrelated = 0.0). ``` 以上述結果來看,我認為字串長最小要從 10K 開始。 根據中央極限定理(CLT),當樣本數 $\geq 30$ 時,樣本均值的分佈會接近常態分佈。然而,我在實驗設計中遇到了一些疑問。中央極限定理通常假設存在一個母體(例如,包含大量長度為 10K 的字串),我們每次從中隨機挑選 30 個字串來計算處理時間並取其平均值。經過多次挑選後,這些平均時間的分佈會趨近於常態分佈。 然而,這與我的實驗設計有所不同。我的實驗設計如下: 1. 我需要研究不同長度字串所需的處理時間。 2. 對於每個特定長度的字串(例如,10K、100K、1000K 等),我要測量多次處理時間。 3. 我會重複這個過程,計算每個長度字串的平均處理時間。 對照中央極限定理看下來,在我的實驗中並不是針對各長度字串有設計一個母體,那這樣我每個特定長度字串的樣本數量該如何訂定?是否有其他方法可用於解決這部份問題? 實驗設計: * 三種不同長度的字串:10K、100K、1000K * Pattern:遇搜尋字元位於字串首、字串末、不存在 :::success #### 延伸問題 3. 研讀 [2022 年學生報告](https://hackmd.io/@arthur-chang/linux2022-quiz8),在 Linux 核心原始程式碼找出 x86_64 或 arm64 對應的最佳化實作,跟上述程式碼比較,並嘗試舉出其中的策略和分析 > 下一步:貢獻程式碼到 Linux 核心 > [Phoronix 報導](https://www.phoronix.com/news/Linux-Kernel-Faster-memchr) ::: Linux 核心原始程式碼對應 x86_64 的實作在 [linux/arch/x86/lib/string_32.c](https://github.com/torvalds/linux/blob/master/arch/x86/lib/string_32.c#L181) 中,程式碼如下: ```c void *memchr(const void *cs, int c, size_t count) { int d0; void *res; if (!count) return NULL; asm volatile("repne\n\t" "scasb\n\t" "je 1f\n\t" "movl $1,%0\n" "1:\tdecl %0" : "=D" (res), "=&c" (d0) : "a" (c), "0" (cs), "1" (count) : "memory"); return res; } ``` 試著去理解上方 inline aseembly 各指令的用途,所以我去參考了 [Intel® 64 and IA-32 Architectures Software Developer Manuals](https://www.intel.com/content/www/us/en/developer/articles/technical/intel-sdm.html)。 ```c : "=D" (res), "=&c" (d0) : "a" (c), "0" (cs), "1" (count) ``` 這邊依程式上下文,我的解讀是輸出/入參數與 inline aseembly 間的傳遞,但我找不到 `"a"`、`"0"`、`"1"` 這些代表的意義為何? `repne` 在手冊中的 Vol. 2B 4-555 有相關描述: > 此為一種 prefix instruction, --- ## TODO: 第 7 週測驗題測驗 2 > 重新作答並涵蓋延伸問題 ### 測驗 `2` :::success #### 延伸問題1. 解釋上述程式碼運作原理 ::: `div64` 是一個接受兩個參數的巨集,用於計算被除數 `n` 與除數 `base` 相除後的商數。此巨集會根據不同的情況選擇較優的除法策略。 ##### **第一個判別式:** :::danger 注意書寫規範: * 使用 lab0 規範的程式碼書寫風格,務必用 clang-format 確認一致 * 注意看[程式碼規範](https://hackmd.io/@sysprog/linux2024-lab0-a)的「clang-format 工具和一致的程式撰寫風格」,筆記列出的程式碼依循指定的風格。 > 已修正。 ::: ```c if (__builtin_constant_p(__base) && is_power_of_2(__base)) { __rem = (n) & (__base - 1); (n) >>= ilog2(__base); } ``` 成立條件為除數是常數且是 2 的冪,若某數 為2的冪,其二進位表示法僅有一個 1,而將其 -1 後會使原先為 1 的位元變成 0,其右邊所有位元均變成 1,舉例如下: $2 = 00000010,2-1 = 00000001$ $4 = 00000100,4-1 = 00000011$ $8 = 00001000,8-1 = 00000111$ 再將被除數與 -1 後的除數做 and 運算,結果即為餘數。接著對除數取 log,以取 log 後的結果作為被除數 `n` 右移運算的次數,即可求出商數。舉例如下: 假設 $n =13,base=8=2^{3}$,分別以二進位表示: * $n=1101_{(2)}$ * $base = 0111_{(2)}$ 因為是除以 $8$,以二進位除法的觀點來看就是會把 $n$ 的 msb 消除,保留剩餘的 bit。 ##### **第二個判別式:** ```c else if (__builtin_constant_p(__base) && __base != 0) { uint32_t __res_lo, __n_lo = (n); (n) = __div64_const32(n, __base); /* the remainder can be computed with 32-bit regs */ __res_lo = (n); __rem = __n_lo - __res_lo * __base; ``` 成立條件為除數是常數且除數不為 `0` 原先被除數為 64 位元,首先將其較低 32 位元 assign 給 `__n_lo` 這個無號 32 位元整數。 巨集 `__div64_const32` 將被除數與除數作為參數傳入此巨集,首先求出一個與除數 msb 相同的數,根據註解: $m =\frac{(p \times 2^{64} + b - 1)}{b}=\frac{(p \times 2^{64})}{b}+\frac{b-1}{b}$ 又 `~0ULL` 為 64 位元全 1 的無號整數,相當於 $2^{64}-1$,所以: ```c ___m = (~0ULL / ___b) * ___p; ``` 會近似於加號左半部的 $\frac{p \times 2^{64}}{b}$。 接續的下一行程式為 ```c ___m += (((~0ULL % ___b + 1) * ___p) + ___b - 1) / ___b; ``` 而 $m =\frac{(p \times 2^{64})}{b}+\frac{b-1}{b}$,這邊我有疑惑,為何程式不是 ```c ___m += (___b - 1) / ___b; ``` 這樣不是更符合加號的右半部? > 問題已解決,可如下推導 > > 根據除法公式: > $a=bq+r$ > 其中 $a$ 為被除數、$b$ 為除數、$q$ 為商數、$r$為餘數。 > $\therefore a+1=bq+(r+1)$ > > 令: > $2^{64} - 1 = q \times b + r$ > $2^{64} =q \times b + r+1$ > > 根據註解可知: > $m =\frac{(p \times 2^{64} + b - 1)}{b}=\frac{(p \times(q \times b + r+1)+b-1)}{b}$ > > 展開可得: > $m =\frac{pqb+pr+p+b-1}{b}=\frac{pqb}{b}+\frac{(r+1)p+b-1}{b}$ > > 最後通分: > $m= pq +\frac{(r+1)p+b-1}{b}$ > 其中: > $q$ = ~0ULL / $b$ > $r$ = ~0ULL % $b$ > 由上述推導過程,即可說明註解與程式等價。總結來說,bitwise 操作相當於整數除法,不會考慮到餘數的部分,如此會使數字有誤差,所以在 do while 中才會有如此的操作來彌補損失的誤差。 :::danger 減少非必要的 `:::warning` 標示,務求清晰且明確的描述 ::: ##### **第三個判別式:** ```c else if (likely(((n) >> 32) == 0)) { __rem = (uint32_t) (n) % __base; (n) = (uint32_t) (n) / __base; } ``` 這個成立條件代表 $0\leq n \leq2^{32}-1$,所以 n 可用 32 位元表示,接著就是用傳統的運算方式去求得商及餘數。 ##### **第四個判別式:** 若前三個判別式之條件均不滿足,則會呼叫 `__div64_32` 這個函式。 `__div64_32` 初始化: * 將 64 位元無號數 `rem` 初始化為被除數的值。 * 將 64 位元無號數 `b` 初始化為除數的值。 * 透過右移運算取得 `rem` 的較高 32 位元之值並將其 assign 給 `high`。 首先判斷 `high` 是否足夠大,若夠大求其除以 `base` 後之商值,求出商值後再左移使其恢復實際值,並將實際值 assign 給 `res`。 > 相當於 `res` 的較高 32 位元為被除數較高 32 位元之商。 求算完較高 32 位元後,接著要處理較低的 32 位元,程式碼如下: ```c rem -= (uint64_t) (high * base) << 32; ``` 舉個例子來解釋,假設: * `rem` = `0x0000 0007 0000 0000` * `base` = 3 那麼 `high` 就會是 `7`(`rem` >> 32),接著求 $7 \div 3$ 之商(即 2),接著將商值存回 `high` 中,最後將 2 左移 32 位元並 assign 給 `res`。 >`res` = `0x0000 0002 0000 0000` 由前述可知,目前的 `high` 為商值,將其與 `base`(即 3)相乘並左移 32 位元,此舉等同於求出被除數較高 32 位元除以 3 後的商,最後再以 `rem` 扣掉該值。 > `rem` = `0x0000 0007 0000 0000` - `0x0000 0006 0000 0000` = `0x0000 0001 0000 0000` 接著可以看到有兩階段的 while 迴圈: :::danger 注意書寫規範: * 使用 lab0 規範的程式碼書寫風格,務必用 clang-format 確認一致 * 注意看[程式碼規範](https://hackmd.io/@sysprog/linux2024-lab0-a)的「clang-format 工具和一致的程式撰寫風格」,筆記列出的程式碼依循指定的風格。 > 已修正。 ::: ```c while ((int64_t) b > 0 && b < rem) { b = b + b; d = d + d; } do { if (rem >= b) { rem -= b; res += d; } b >>= 1; d >>= 1; } while (d); ``` 在初始化階段,`b` 被~~賦予~~指派的值即為除數,所以在第一階段的 while 中,透過倍增的方式去找到最逼近 `rem` 的 `b` 。而對於另一個初始值為 `1` 的變數 `d`,他也會以倍增的方式,記錄商值的倍數。 接著看到第二階段的 do-while 迴圈,在上一階段 `res` 紀錄著較高 32 位元除以 `base` 後的商值,所以在第二階段,透過判斷 `rem` 及 `b` 間的大小,將剩餘位元與 `base` 相除後之商值加入 `res`,最後即可得最初被除數除以 `base` 之實際商值。 :::success #### 延伸問題2. 在 Linux 核心原始程式碼找到相關的程式碼並設計實驗 (使用 Linux 核心模組) 探討程式碼的效益和限制 ::: 研讀 Linux 核心程式碼:[div4.c](https://github.com/torvalds/linux/blob/master/lib/math/div64.c) 的過程中,在 `__div64_32` 這個函式第 47 行~第 63 行的位置: ```c while ((int64_t)b > 0 && b < rem) { b = b+b; d = d+d; } do { if (rem >= b) { rem -= b; res += d; } b >>= 1; d >>= 1; } while (d); *n = res; return rem; } ``` 這部分程式碼的解釋可參考上方 [第四個判別式](https://hackmd.io/QGMPoEQZQy2Y2RW-uimBHw?view#%E7%AC%AC%E5%9B%9B%E5%80%8B%E5%88%A4%E5%88%A5%E5%BC%8F%EF%BC%9A)。此處觀察到 while 迴圈中的變數 `b` 及 `d`,兩變數倍增的行為是採用傳統的加法運算,而在 do while 迴圈部分是採 bitwise 操作來達到倍減,我的想法是可將 while 迴圈的程式修改如下: ```diff while ((int64_t)b > 0 && b < rem) { - b = b+b; - d = d+d; + b <<= 1; + d <<= 1; } ``` 與授課老師討論後,老師給予的建議是: 1. 變更程式碼後,須觀察對應的組合語言序列是否有變化。 2. 若有變化,要能夠說明此變化有實際影響。 原先我天真地認為 ```diff while ((int64_t)b > 0 && b < rem) { - b = b+b; - d = d+d; + b = b + b; + d = d + d; } ``` 調整加號前後的空白字元就可以嘗試提交 patch 了,經授課老師說明後,才知道這種沒有實質功能變更的 patch 很難被收錄。 授課老師提示用下方方式進行編譯並觀察。 ```shell $ gcc -O2 -S ``` 查看編譯後的組合語言我所用的工具為先前一對一面談老師推薦的 [godbolt](https://godbolt.org/),由於欲比較差異的程式碼位於第四個判別式,所以需設計不符合前三個判別式的參數。 > 後來沒有用 godbolt,因為顯示出來的組合語言序列並沒有變化。 所以我的用於測試的 `main` 函式如下: ```c int main() { uint64_t res = 0x100000000; uint32_t base = 10; uint32_t rem = div64(res, base); printf("Quotient: %lu, Remainder: %u\n", res, rem); } ``` 原先 Linux 核心程式碼編譯後的結果: ```x86as .section __TEXT,__text,regular,pure_instructions .build_version macos, 13, 3 sdk_version 13, 3 .globl _main ; -- Begin function main .p2align 2 _main: ; @main .cfi_startproc ; %bb.0: sub sp, sp, #32 .cfi_def_cfa_offset 32 stp x29, x30, [sp, #16] ; 16-byte Folded Spill add x29, sp, #16 .cfi_def_cfa w29, 16 .cfi_offset w30, -8 .cfi_offset w29, -16 mov w8, #6 mov w9, #39321 movk w9, #6553, lsl #16 stp x9, x8, [sp] Lloh0: adrp x0, l_.str@PAGE Lloh1: add x0, x0, l_.str@PAGEOFF bl _printf mov w0, #0 ldp x29, x30, [sp, #16] ; 16-byte Folded Reload add sp, sp, #32 ret .loh AdrpAdd Lloh0, Lloh1 .cfi_endproc ; -- End function .section __TEXT,__cstring,cstring_literals l_.str: ; @.str .asciz "Quotient: %lu, Remainder: %u\n" .subsections_via_symbols ``` 而我將加法改為 bitwise 後的編譯結果: ```x86as .section __TEXT,__text,regular,pure_instructions .build_version macos, 13, 3 sdk_version 13, 3 .globl _main ; -- Begin function main .p2align 2 _main: ; @main .cfi_startproc ; %bb.0: sub sp, sp, #32 .cfi_def_cfa_offset 32 stp x29, x30, [sp, #16] ; 16-byte Folded Spill add x29, sp, #16 .cfi_def_cfa w29, 16 .cfi_offset w30, -8 .cfi_offset w29, -16 mov w8, #6 mov w9, #39321 movk w9, #6553, lsl #16 stp x9, x8, [sp] Lloh0: adrp x0, l_.str@PAGE Lloh1: add x0, x0, l_.str@PAGEOFF bl _printf mov w0, #0 ldp x29, x30, [sp, #16] ; 16-byte Folded Reload add sp, sp, #32 ret .loh AdrpAdd Lloh0, Lloh1 .cfi_endproc ; -- End function .section __TEXT,__cstring,cstring_literals l_.str: ; @.str .asciz "Quotient: %lu, Remainder: %u\n" .subsections_via_symbols ``` 兩者並無任何差異,觀察這部分的組合語言序列: ```x86as mov w8, #6 mov w9, #39321 movk w9, #6553, lsl #16 ``` 根據 [wikipedia: Apple M1](https://en.wikipedia.org/wiki/Apple_M1) 的介紹,我這台電腦所使用的指令集架構為 ARMv8-A,參考 [arm Developer](https://developer.arm.com/documentation/den0024/a/ARMv8-Registers) 的描述,對於暫存器的部分有以下介紹: * 此架構提供 31 個 64 位元的一般目的暫存器 X0~X30 * 一般目的暫存器又可分出較低的 32 位元一般目的暫存器:W0~W30 ![2024-06-28 晚上7.14.16](https://hackmd.io/_uploads/ryqfoMn8A.png) ```graphviz digraph G { rankdir=TB; node [shape=record,width =10,height=1,xlabel="63"]; array [label="<f0>| <f1>"]; } ``` :::danger 使用 Graphivz 重新製作上圖。 ::: 我的理解為: ```x86as mov w8, #6 ``` 編譯器先將兩者相除後的餘數(即 6)存至 `w8` 暫存器。 ```x86as mov w9, #39321 movk w9, #6553, lsl #16 ``` 在 `main` 函式中,被除數為 `0x100000000`、除數為 `10`,而被除數的 10 進位表示為 $16^{8}= 4294967296$。參考 [ARM Compiler armasm Reference Guide Version 6.01](https://developer.arm.com/documentation/dui0802/b/MOVK) 對於 movk 的說明後,第一行先將 10 進位的 39321 存到 `w9` 暫存器的較低 10 位元,接著再將 10 進位的 6553 存到 `w9` 暫存器的較高 16 位元。 試著計算 `w9` 暫存器中的值: $6553_{(10)} = 1100110011001_{(2)}$ $39321_{(10)} = 1001100110011001_{(2)}$ 所以 `w9` = $0001100110011001 1001100110011001_{(2)}$ =$114631_{(10)}$ 跟預想的不太樣,我以為 `w9` 會是商值。我不懂這樣商值是怎麼求算出來的。 :::danger 查閱 Arm 手冊,注意看指令描述 ::: 去降低編譯器最佳化的等級,原先是: ```shell $ gcc -O2 -S ``` 改為 ```shell $ gcc -O1 -S ``` 再次進行觀察,發現編譯後的組合語言序列與原先 `$ gcc -O2 -S` 沒有任何差異。 回顧上方的比較實驗,由觀察到的組合語言序列可知,在使用編譯器最佳化 `-O2` 的情況下,我所做的改變並沒有帶來實際影響,這個學習過程是有趣的,但我目前還無法去解釋這個結果的主因。 接著出於好奇如果去變更原先為 bitwise 的 do-while 迴圈,將其改為傳統除法操作,那應該會有變化吧?變更如下所示: ```diff do { if (rem >= b) { rem -= b; res += d; } - b >>= 1; - d >>= 1; + b /= 2; + d /= 2; } while (d); ``` 一樣使用 `gcc -O2 -S` 進行編譯。 原先 Linux 核心程式碼編譯後的結果: ```x86as .section __TEXT,__text,regular,pure_instructions .build_version macos, 13, 3 sdk_version 13, 3 .globl _main ; -- Begin function main .p2align 2 _main: ; @main .cfi_startproc ; %bb.0: sub sp, sp, #32 .cfi_def_cfa_offset 32 stp x29, x30, [sp, #16] ; 16-byte Folded Spill add x29, sp, #16 .cfi_def_cfa w29, 16 .cfi_offset w30, -8 .cfi_offset w29, -16 mov w8, #6 mov w9, #39321 movk w9, #6553, lsl #16 stp x9, x8, [sp] Lloh0: adrp x0, l_.str@PAGE Lloh1: add x0, x0, l_.str@PAGEOFF bl _printf mov w0, #0 ldp x29, x30, [sp, #16] ; 16-byte Folded Reload add sp, sp, #32 ret .loh AdrpAdd Lloh0, Lloh1 .cfi_endproc ; -- End function .section __TEXT,__cstring,cstring_literals l_.str: ; @.str .asciz "Quotient: %lu, Remainder: %u\n" .subsections_via_symbols ``` 而我將 bitwise 改為除法運算後的編譯結果: ```x86as .section __TEXT,__text,regular,pure_instructions .build_version macos, 13, 3 sdk_version 13, 3 .globl _main ; -- Begin function main .p2align 2 _main: ; @main .cfi_startproc ; %bb.0: sub sp, sp, #32 .cfi_def_cfa_offset 32 stp x29, x30, [sp, #16] ; 16-byte Folded Spill add x29, sp, #16 .cfi_def_cfa w29, 16 .cfi_offset w30, -8 .cfi_offset w29, -16 mov w8, #6 mov w9, #39321 movk w9, #6553, lsl #16 stp x9, x8, [sp] Lloh0: adrp x0, l_.str@PAGE Lloh1: add x0, x0, l_.str@PAGEOFF bl _printf mov w0, #0 ldp x29, x30, [sp, #16] ; 16-byte Folded Reload add sp, sp, #32 ret .loh AdrpAdd Lloh0, Lloh1 .cfi_endproc ; -- End function .section __TEXT,__cstring,cstring_literals l_.str: ; @.str .asciz "Quotient: %lu, Remainder: %u\n" .subsections_via_symbols ``` 晴天霹靂,兩者仍無差異,我真的很好奇這部分又是為什麼?因為根據之前教材所學習的知識,透過 bitwise 來取代除法、乘法運算,進而提升效能,在這個實驗中理應要有變化才對,請問老師我是否忽略了什麼細節? :::danger 「透過 bitwise 來取代除法、乘法運算,進而提升效能」要看場景,若在 Linux 核心,因為無法使用 FPU,所以這策略有效,但你若在 Apple M1 這樣的晶片執行應用程式,就不是如此。 ::: 問題一:經過高等級編譯器的最佳化後,傳統的乘法和除法運算與 bitwise 操作無異,那為何不都使用高等級編譯器進行優化就好? > 授課老師回覆:參見「你所不知道的 C 語言:編譯器和最佳化原理篇」的內容和解說錄影。編譯器最佳化常採取保守的策略,連 code motion 這樣單純的策略都很難全部適用。 問題二:是否有某些情境比較嚴苛,只能使用普通的編譯器來編譯? > 授課老師回覆:有,例如要確保產生的機械碼符合預期 (形式化驗證),參見 [形式化驗證 (Formal Verification)](https://hackmd.io/@sysprog/formal-verification) ___ ## TODO: 第 9 週測驗題測驗 3 > 重新作答並涵蓋延伸問題 [第九週測驗](https://hackmd.io/@sysprog/linux2024-quiz9) ### 教材 在[並行程式設計: Atomics 操作](https://hackmd.io/@sysprog/concurrency-atomics)中提到 C11 規範了記憶體對齊的操作,在 `stdalign.h` 中定義了: * alignas : 對齊記憶體地址 * alignof : 取得變數的對齊基數 這邊我看不懂,以 `alignas(16) int x` 為例,教材中的描述是 `x` 的記憶體位址會對齊 16,是否可以理解為會是 16 的倍數?以及為何這樣操作可以避免 false sharing? :::danger 注意 L1 cacheline 的寬度 ::: #### `<stdatomic.h>` 根據 C11-7.17 中描述,此標頭檔中定義了用於執行緒間共享資料 atomic 操作的巨集及<s>函數</s>(函式)。 :::danger 區分「函數」和「函式」。 務必本課程教材規範的術語。 > 好的,現在開始會注意。 ::: 起初我不懂範例程式 `atomic.c` 的執行結果 ```c static void atomic_val_inc(int nLoops) { for (int loop = 0; loop < nLoops; loop++) { while (atomic_flag_test_and_set(&atomic_float_obj.flag)) CPU_PAUSE(); atomic_float_obj.value += 1.0f; atomic_flag_clear(&atomic_float_obj.flag); } } ``` 在 main 中: ```c atomic_val_inc(10000); ``` 傳入了 10000 作為 `nLoops` 之值,裡頭的 for 迴圈迭代 10000 次後,`atomic_float_obj.value` 應該是 10000 才對,但執行結果實際上是 20000。 後來閱讀了[並行程式設計: POSIX Thread](https://hackmd.io/@sysprog/posix-threads) 後,才知道我遺漏了主執行緒(main),實際上經由 `pthread_create` 所創建的子執行緒以及主執行緒均會執行 `atomic_val_inc(10000)`,故最終的執行結果才會是 20000 而非原先預想的 10000。 釐清這邊的觀念後,才看懂教材中描述之 若將 `while(atomic_flag_test_and_set(&atomic_float_obj.flag))` 改為 `while (0)` 後會導致輸出結果不一的由來。 ### 測驗 `3` :::success 1.研讀程式碼註解提及的論文〈[Cilk: An Efficient Multithreaded Runtime System](http://supertech.csail.mit.edu/papers/PPoPP95.pdf)〉,解釋程式碼運作原理 ::: 結構體: ```c typedef struct work_internal { task_t code; atomic_int join_count; void *args[]; } work_t; ``` 教材中對個成員的描述如下: * `code`:各執行緒要啟動該任務時所要執行的函式之指標,且輸入至該函式的參數是 work_t 本身的 reference * `join_count`:用來計算這個 work 還缺少了多少 arguments 才得以進行 * `args`:即 work 執行所需要的 arguments `thread` 初始化: 對照 `main` 函式及 `init` 函式會注意到下方程式片段: ```c init(&thread_queues[i], 8); ``` 由此可知每條執行緒一開始所維護的 deque 如下(size 為 8): ```graphviz digraph G { node [shape=record]; A [label="<f0> A[0] | A[1] | A[2] | ... | A[6] | A[7]"]; Top [label="top", shape=plaintext]; Bottom [label="bottom", shape=plaintext]; Top -> A:f0; Bottom -> A:f0; } ``` #### `push` 操作 ```c void push(deque_t *q, work_t *w) { size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed); size_t t = atomic_load_explicit(&q->top, memory_order_acquire); array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed); if (b - t > a->size - 1) { /* Full queue */ resize(q); a = atomic_load_explicit(&q->array, memory_order_relaxed); } atomic_store_explicit(&a->buffer[b % a->size], w, memory_order_relaxed); atomic_thread_fence(memory_order_release); atomic_store_explicit(&q->bottom, DDDD, memory_order_relaxed); } ``` `push` 函式的功能為將新的工作放入當前執行緒所維護之 deque 的 buttom 端。 首先看到函式中的 if ,每條執行緒隨著新工作的加入,其維護的 deque 會逐漸變化,由初始的: ```graphviz digraph G { node [shape=record]; A [label="<f0> A[0] | A[1] | A[2] | ... | A[6] | A[7]"]; Top [label="top", shape=plaintext]; Bottom [label="bottom", shape=plaintext]; Top -> A:f0; Bottom -> A:f0; } ``` 隨著新工作的加入,buttom 所指位置會遞增: ```graphviz digraph G { node [shape=record]; A [label="<f0>A[0] | A[1] | A[2] | ... | A[6] | A[7]"]; Top [label="top", shape=plaintext]; Bottom [label="bottom", shape=plaintext]; InvisibleNode [label="", width=0, style=invis]; Top -> A:f0; Bottom -> InvisibleNode; {rank=same; A InvisibleNode} } ``` 在這種狀態下,變數 `t` = 0、變數 `b` = 8,此時會大於 `size - 1`(即 7),故代表 deque 已滿,需執行 `resize` 擴充,擴充後的 deque 其 size 會大兩倍: ```graphviz digraph G { node [shape=record]; A [label="<f0> A[0] | A[1] | A[2] | ... | A[6] | A[7]|<f8> A[8]| ...| A[15]"]; Top [label="top", shape=plaintext]; Bottom [label="bottom", shape=plaintext]; Top -> A:f0; Bottom -> A:f8; } ``` 接著是將新的工作放置 bottom 所指位置(即 A[8]),並將 bottom 指向下一個位置(即 A[9]): ```graphviz digraph G { node [shape=record]; A [label="<f0> A[0] | A[1] | ... | A[6] | A[7]|<f8> A[8]| <f9> A[9]|...| A[15]"]; Top [label="top", shape=plaintext]; Bottom [label="bottom", shape=plaintext]; Top -> A:f0; Bottom -> A:f9; } ``` 故可知 `DDDD` 要填入 `b + 1`,但這邊我有點問題: :::warning 為何新的工作要存入 `&a->buffer[b % a->size]`?若改為`&a->buffer[b]` 是否也是相同效果? > 搭配 ThreadSanitizer 確認 ::: #### `take` 及 `steal` 操作: 兩者相關性較高,我將其放在一起討論,根據教材所描述: * `take`:執行緒從自己維護的 deque 之 bottom 端取工作來執行(即位於 bottom -1 的工作)。 * `steal`:閒置的執行緒從**其他執行緒**維護的 deque 之 top 端取工作來執行。 個別程式碼如下: ```c work_t *take(deque_t *q) { size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1; array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed); atomic_store_explicit(&q->bottom, b, memory_order_relaxed); atomic_thread_fence(memory_order_seq_cst); size_t t = atomic_load_explicit(&q->top, memory_order_relaxed); work_t *x; if (t <= b) { /* Non-empty queue */ x = atomic_load_explicit(&a->buffer[b % a->size], memory_order_relaxed); if (t == b) { /* Single last element in queue */ if (!atomic_compare_exchange_strong_explicit(&q->top, &t, AAAA, memory_order_seq_cst, memory_order_relaxed)) /* Failed race */ x = EMPTY; atomic_store_explicit(&q->bottom, BBBB, memory_order_relaxed); } } else { /* Empty queue */ x = EMPTY; atomic_store_explicit(&q->bottom, CCCC, memory_order_relaxed); } return x; } ``` 首先要先理解 `atomic_compare_exchange_strong_explicit` 的用法,以下是參考 c11: > `_Bool atomic_compare_exchange_strong_explicit( volatile A *object, C *expected, C desired, memory_order success, memory_order failure);` ```c if(*object == *expected) *object = desired; else *expected = *object; ``` 〈 c11 - 7.17.7.4 The atomic_compare_exchange generic functions 〉 此巨集為布林型別,會去比較 `object` 與 `expected` 是否相等,若相等回傳 true 並將 `desire` 存入 `object` 中,反之若不相等回傳 false 且將`object` 存入 `expected` 。 教材提到是刻意從 `top` 的方向去取,亦即取完後得將 `q->top + 1`,此提示顯然就是 `AAAA` 的相關資訊,示意圖如下: ```graphviz digraph G { node [shape=record]; A [label="<f0> empty |<f1> last element | empty | ... | empty "]; Top [label="t,top", shape=plaintext]; Bottom [label="bottom", shape=plaintext]; Top -> A:f1; Bottom -> A:f1; } ``` 上圖的情境為某執行緒其 deque 僅存最後一個元素,若此元素沒有被其他執行緒 `steal` ,那麼cmpxchg 成立,會將 `AAAA` assign 給 `&q->top`,故可知 `AAAA` 即為 `t + 1`: ```graphviz digraph G { node [shape=record]; A [label="<f0> empty |<f1> empty |<f2> empty | ... | empty "]; Top [label="t+1,top", shape=plaintext]; Bottom [label="bottom", shape=plaintext]; Top -> A:f2; Bottom -> A:f1; } ``` 可運用上圖來解釋程式碼中 `else` 的部份(即 t > b),此時若再次執行 `take`,示意圖如下: ```graphviz digraph G { node [shape=record]; A [label="<f0> empty |<f1> empty |<f2> empty | ... | empty "]; Top [label="t", shape=plaintext]; Bottom [label="b,bottom", shape=plaintext]; Top -> A:f2; Bottom -> A:f0; } ``` 此時 `t > b` 成立,即 deque 為空,應該要恢復 `bottom` 指向的位置,故 `CCCC` 應填入 `b + 1`。 接著看相反的情況,回到 cmpxchg 的部份,若當前 deque 也僅剩最後一個元素: ```graphviz digraph G { node [shape=record]; A [label="<f0> empty |<f1> last element | empty | ... | empty "]; Top [label="t,top", shape=plaintext]; Bottom [label="bottom", shape=plaintext]; Top -> A:f1; Bottom -> A:f1; } ``` 這邊我不懂,我的想法是: ```c size_t t = atomic_load_explicit(&q->top, memory_order_relaxed); | |<發生 work stealing> | /* Single last element in queue */ if (!atomic_compare_exchange_strong_explicit(&q->top, &t, AAAA, memory_order_seq_cst, memory_order_relaxed)) ``` 在這兩段程式碼期間發生了 work stealing,在 `steal` 函式可以看到其也具備相同的 cmpxchg 程式碼,對於如此的設定,我的想法是若 `steal` 成功執行,會將 `t + 1` assign 給 `&q->top`,而 `take` 中的 cmpxchg 就會出現 `*object != *expected` 的情況。 :::danger 觀察工作佇列的任務數量變化。 ::: ```c work_t *steal(deque_t *q) { size_t t = atomic_load_explicit(&q->top, memory_order_acquire); atomic_thread_fence(memory_order_seq_cst); size_t b = atomic_load_explicit(&q->bottom, memory_order_acquire); work_t *x = EMPTY; if (t < b) { /* Non-empty queue */ array_t *a = atomic_load_explicit(&q->array, memory_order_consume); x = atomic_load_explicit(&a->buffer[t % a->size], memory_order_relaxed); if (!atomic_compare_exchange_strong_explicit( &q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) /* Failed race */ return ABORT; } return x; } ``` 與前面 `push` 一樣,也有 `atomic_load_explicit` 的疑問,為何需要 `% a->size`? :::danger 本質上是 ring buffer ::: #### `thread` 這個函式整合了前面的`take` 及 `steal`,各執行緒的 while 迴圈會去判斷其維護的 deque 是否為空,非空執行 `take`,若為空則會依序的去巡察其他執行緒的 deque,若其他執行緒的 deque 有多餘的工作,則協助該工作的執行。 #### `main` 接著看到主程式的部份,前幾行的程式用於生成執行緒及各自維護的 deque,因為`#define N_THREADS 24`,故總共會有 24 條執行緒。生成執行緒後,因 `nprint` 宣告為 10,可理解為每個 deque 初始狀態下會有 10 個工作。 而 `donework->join_count` 即為工作數的總和,經上述可知其值為 240(`N_THREADS * nprint`),可留意下方兩個函式: ```c work_t *print_task(work_t *w) { int *payload = (int *) w->args[0]; int item = *payload; printf("Did item %p with payload %d\n", w, item); work_t *cont = (work_t *) w->args[1]; free(payload); free(w); return join_work(cont); } ``` 當 `print_task` 完成時,會呼叫下方的: ```c work_t *join_work(work_t *work) { int old_join_count = atomic_fetch_sub(&work->join_count, 1); if (old_join_count == 1) return work; return NULL; } ``` 會將 `join_count` 這個紀錄工作總數的計數器減 1。 以目前的理解來看,整體架構上有兩種 work: * 普通的 work * 紀錄工作的總數的 done_work 而每個普通的 work 其 `join_count` 恆為 0,且 `args[1]` 為 `done_work`,這樣看下來,為何 `work->join_count` 還有存在的必要性? :::danger 為何不修改程式碼並充分實驗?不要「舉燭」。 ::: 按照 [2024q1 第 9 週測驗題](https://hackmd.io/@sysprog/linux2024-quiz9#%E6%B8%AC%E9%A9%97-3) 進行編譯: ```shell $ gcc -O2 -Wall -std=c11 -o work-steal work-steal.c -lpthread ``` 出現了下方的警告資訊: ```shell work-steal.c: In function ‘take’: work-steal.c:111:11: warning: assignment to ‘work_t *’ {aka ‘struct work_internal *’} from incompatible pointer type ‘_Atomic work_t *’ {aka ‘_Atomic struct work_internal *’} [-Wincompatible-pointer-types] 111 | x = atomic_load_explicit(&a->buffer[b % a->size], memory_order_relaxed); | ^ work-steal.c: In function ‘push’: work-steal.c:137:5: warning: initialization of ‘_Atomic work_t *’ {aka ‘_Atomic struct work_internal *’} from incompatible pointer type ‘work_t *’ {aka ‘struct work_internal *’} [-Wincompatible-pointer-types] 137 | atomic_store_explicit(&a->buffer[b % a->size], w, memory_order_relaxed); | ^~~~~~~~~~~~~~~~~~~~~ work-steal.c: In function ‘steal’: work-steal.c:151:11: warning: assignment to ‘work_t *’ {aka ‘struct work_internal *’} from incompatible pointer type ‘_Atomic work_t *’ {aka ‘_Atomic struct work_internal *’} [-Wincompatible-pointer-types] 151 | x = atomic_load_explicit(&a->buffer[t % a->size], memory_order_relaxed); ``` :::warning 目前還沒理解造成這些警告的原因。 > 注意 `_Atomic` 標示的方式 ::: 接著進行測試: ```shell $ ./work-steal ... work item 17 finished work item 6 finished work item 13 finished work item 12 finished work item 3 finished Expect 506 lines of output (including this one) ``` :::success 2.指出上述程式碼可改進之處並著手進行,如記憶體釋放和量化分析性能和延展性 (scalability) ::: :::danger 工程人員說話要精準,避免說「覺得」。先推論後驗證。 > 好的,現在開始會注意。 ::: 目前<s>我覺得</s> deque 的 size 有可改進之處,在 `main` 可以看到: ```c init(&thread_queues[i], 8); ``` 這代表每個 deque 其初始大小為 8,但實際上: ```c for (int j = 0; j < nprints; ++j) ``` >`nprint` 宣告為 10。 我在 `main` 中呼叫 `push` 函式前加入了下列程式碼: ```diff + printf("t: %d w: %d\n",i,j); ``` 如此可印出是第幾條執行緒的第幾個 work 的資訊。接著在 `push` 函式中的判別式加入下列程式碼: ```diff if (b - t > a->size - 1) { /* Full queue */ + printf("do resize\n"); resize(q); a = atomic_load_explicit(&q->array, memory_order_relaxed); } ``` 一旦有發生須擴充 deque 大小的情況,也會印出相關資訊。 而後看到終端機的輸出結果: ```shell $ ./work-steal t: 0 w: 0 t: 0 w: 1 t: 0 w: 2 t: 0 w: 3 t: 0 w: 4 t: 0 w: 5 t: 0 w: 6 t: 0 w: 7 t: 0 w: 8 do resize t: 0 w: 9 t: 1 w: 0 t: 1 w: 1 t: 1 w: 2 t: 1 w: 3 t: 1 w: 4 t: 1 w: 5 t: 1 w: 6 t: 1 w: 7 t: 1 w: 8 do resize t: 1 w: 9 ... ``` 我的想法是能不能改為 `init(&thread_queues[i], nprint);` 如此避免每個 deque 在最初就必須 resize 一次。 :::success 3.利用 work stealing 改寫第 6 次作業提及的並行化快速排序程式碼,在 Linux 使用者層級實作,並確保能充分運用硬體資源 ::: :::success 4.研讀 Linux 核心的 CMWQ,討論其 work stealing 的實作手法 ::: 相關教材: * [CMWQ](https://www.kernel.org/doc/html/latest/core-api/workqueue.html) * [2024 年 Linux 核心設計/實作課程作業 —— integration(C)](https://hackmd.io/@sysprog/linux2024-integration/%2F%40sysprog%2Flinux2024-integration-c) CMWQ 關鍵資料結構: `work_queue` 我發現在 [2024 年 Linux 核心設計/實作課程作業 —— integration(C)](https://hackmd.io/@sysprog/linux2024-integration/%2F%40sysprog%2Flinux2024-integration-c) 所提及的: ```c struct work_struct { atomic_long_t data; struct list_head entry; work_func_t func; #ifdef CONFIG_LOCKDEP struct lockdep_map lockdep_map; ``` 其所在位置有變化,在比較早的 kernel 版本(V5),相關程式碼位於 [workqueue.h](https://elixir.bootlin.com/linux/v5.19.17/source/include/linux/workqueue.h) ,而在較新版本(V6.8),相關程式碼位於 [workqueue_types.h](https://elixir.bootlin.com/linux/v6.8/source/include/linux/workqueue_types.h),而函式實作的相關程式碼位於 [linux / kernel / workqueue.c](https://github.com/torvalds/linux/blob/master/kernel/workqueue.c) 觀察上述結構體可知,在 work_struct 結構體中置入了 list_head 物件,此即作業一指定佇列操作所用的雙向環狀鍊結串列用以將 workqueue 中的 work item 連接起來, 而用於執行這些 work item 的 worker 定義在[workqueue_internal.h](https://elixir.bootlin.com/linux/latest/source/kernel/workqueue_internal.h),其結構體如下: :::danger 注意書寫規範: * 使用 lab0 規範的程式碼書寫風格,務必用 clang-format 確認一致 * 注意看[程式碼規範](https://hackmd.io/@sysprog/linux2024-lab0-a)的「clang-format 工具和一致的程式撰寫風格」,筆記列出的程式碼依循指定的風格。 > 已修正。 ::: ```c struct worker { /* on idle list while idle, on busy hash table while busy */ union { struct list_head entry; /* L: while idle */ struct hlist_node hentry; /* L: while busy */ }; struct work_struct *current_work; /* K: work being processed and its */ work_func_t current_func; /* K: function */ struct pool_workqueue *current_pwq; /* K: pwq */ u64 current_at; /* K: runtime at start or last wakeup */ unsigned int current_color; /* K: color */ int sleeping; /* S: is worker sleeping? */ /* used by the scheduler to determine a worker's last known identity */ work_func_t last_func; /* K: last work's fn */ struct list_head scheduled; /* L: scheduled works */ struct task_struct *task; /* I: worker task */ struct worker_pool *pool; /* A: the associated pool */ /* L: for rescuers */ struct list_head node; /* A: anchored at pool->workers */ /* A: runs through worker->node */ unsigned long last_active; /* K: last active timestamp */ unsigned int flags; /* L: flags */ int id; /* I: worker id */ /* * Opaque string set with work_set_desc(). Printed out with task * dump for debugging - WARN, BUG, panic or sysrq. */ char desc[WORKER_DESC_LEN]; /* used only by rescuers to point to the target workqueue */ struct workqueue_struct *rescue_wq; /* I: the workqueue to rescue */ }; ``` 在註解中有提到,L、I、K 等等相關的細節可參考 `workqueue.c`: > L: pool->lock protected. Access with pool->lock held. > K: Only modified by worker while holding pool->lock. Can be safely read by self, while holding pool->lock or from IRQ context if %current is the kworker. > S: Only modified by worker self. > A: wq_pool_attach_mutex protected. > I: Modifiable by initialization/destruction paths and read-only for everyone else. > `workqueue_struct` 此結構體程式碼可在 `workqueue.c` 中找到 ```c struct workqueue_struct { struct list_head pwqs; /* WR: all pwqs of this wq */ struct list_head list; /* PR: list of all workqueues */ struct mutex mutex; /* protects this wq */ int work_color; /* WQ: current work color */ int flush_color; /* WQ: current flush color */ atomic_t nr_pwqs_to_flush; /* flush in progress */ struct wq_flusher *first_flusher; /* WQ: first flusher */ struct list_head flusher_queue; /* WQ: flush waiters */ struct list_head flusher_overflow; /* WQ: flush overflow list */ struct list_head maydays; /* MD: pwqs requesting rescue */ struct worker *rescuer; /* MD: rescue worker */ int nr_drainers; /* WQ: drain in progress */ int saved_max_active; /* WQ: saved pwq max_active */ struct workqueue_attrs *unbound_attrs; /* PW: only for unbound wqs */ struct pool_workqueue *dfl_pwq; /* PW: only for unbound wqs */ #ifdef CONFIG_SYSFS struct wq_device *wq_dev; /* I: for sysfs interface */ #endif #ifdef CONFIG_LOCKDEP char *lock_name; struct lock_class_key key; struct lockdep_map lockdep_map; #endif char name[WQ_NAME_LEN]; /* I: workqueue name */ /* * Destruction of workqueue_struct is RCU protected to allow walking * the workqueues list without grabbing wq_pool_mutex. * This is used to dump all workqueues from sysrq. */ struct rcu_head rcu; /* hot fields used during command issue, aligned to cacheline */ unsigned int flags ____cacheline_aligned; /* WQ: WQ_* flags */ struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */ struct pool_workqueue __rcu *numa_pwq_tbl[]; /* PWR: unbound pwqs indexed by node */ }; ``` `worker_pool` ```c struct worker_pool { raw_spinlock_t lock; /* the pool lock */ int cpu; /* I: the associated cpu */ int node; /* I: the associated node ID */ int id; /* I: pool ID */ unsigned int flags; /* X: flags */ unsigned long watchdog_ts; /* L: watchdog timestamp */ /* * The counter is incremented in a process context on the associated CPU * w/ preemption disabled, and decremented or reset in the same context * but w/ pool->lock held. The readers grab pool->lock and are * guaranteed to see if the counter reached zero. */ int nr_running; struct list_head worklist; /* L: list of pending works */ int nr_workers; /* L: total number of workers */ int nr_idle; /* L: currently idle workers */ struct list_head idle_list; /* L: list of idle workers */ struct timer_list idle_timer; /* L: worker idle timeout */ struct timer_list mayday_timer; /* L: SOS timer for workers */ /* a workers is either on busy_hash or idle_list, or the manager */ DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER); /* L: hash of busy workers */ struct worker *manager; /* L: purely informational */ struct list_head workers; /* A: attached workers */ struct completion *detach_completion; /* all workers detached */ struct ida worker_ida; /* worker IDs for task name */ struct workqueue_attrs *attrs; /* I: worker attributes */ struct hlist_node hash_node; /* PL: unbound_pool_hash node */ int refcnt; /* PL: refcnt for unbound pools */ /* * Destruction of pool is RCU protected to allow dereferences * from get_work_pool(). */ struct rcu_head rcu; }; ``` * `struct list_head workers;` >如教材所述,會管理所有的 workers,所以這個串列可以視為 workers 們的集合。 `pool_workqueue` ```c /* * The per-pool workqueue. While queued, the lower WORK_STRUCT_FLAG_BITS * of work_struct->data are used for flags and the remaining high bits * point to the pwq; thus, pwqs need to be aligned at two's power of the * number of flag bits. */ struct pool_workqueue { struct worker_pool *pool; /* I: the associated pool */ struct workqueue_struct *wq; /* I: the owning workqueue */ int work_color; /* L: current color */ int flush_color; /* L: flushing color */ int refcnt; /* L: reference count */ int nr_in_flight[WORK_NR_COLORS]; /* L: nr of in_flight works */ /* * nr_active management and WORK_STRUCT_INACTIVE: * * When pwq->nr_active >= max_active, new work item is queued to * pwq->inactive_works instead of pool->worklist and marked with * WORK_STRUCT_INACTIVE. * * All work items marked with WORK_STRUCT_INACTIVE do not participate * in pwq->nr_active and all work items in pwq->inactive_works are * marked with WORK_STRUCT_INACTIVE. But not all WORK_STRUCT_INACTIVE * work items are in pwq->inactive_works. Some of them are ready to * run in pool->worklist or worker->scheduled. Those work itmes are * only struct wq_barrier which is used for flush_work() and should * not participate in pwq->nr_active. For non-barrier work item, it * is marked with WORK_STRUCT_INACTIVE iff it is in pwq->inactive_works. */ int nr_active; /* L: nr of active works */ int max_active; /* L: max active works */ struct list_head inactive_works; /* L: inactive works */ struct list_head pwqs_node; /* WR: node on wq->pwqs */ struct list_head mayday_node; /* MD: node on wq->maydays */ /* * Release of unbound pwq is punted to system_wq. See put_pwq() * and pwq_unbound_release_workfn() for details. pool_workqueue * itself is also RCU protected so that the first pwq can be * determined without grabbing wq->mutex. */ struct work_struct unbound_release_work; struct rcu_head rcu; } __aligned(1 << WORK_STRUCT_FLAG_BITS); ``` :::danger 避免「舉燭」,明確說明 Linux 核心原始程式碼和上述論文行為的關聯。你可撰寫對應的 Linux 核心模組來測試。 :::