Try   HackMD

2024q1 Homework2 (quiz1+2)

contributed by < teresasa0731 >

教材閱讀

Problem 1-1: non-recursive QuickSort

運作原理

基於參考資料 Optimized QuickSort — C Implementation (Non-Recursive) ,將資料儲存由陣列改為鏈結串列後,其運作原理如下分析:

每次挑選最左邊節點作為基準值 pivot 分割序列,以指針p走訪序列,,並用LR標示待排序序列的起訖點。為了以非遞迴方式實現每輪操作,使用陣列實現堆疊,將當前子問題之開始/結束節點存入begin[]end[]中,並用while(i)迴圈模擬堆疊操作,當非空時(i>=0)進入循環。







structs



pivot
pivot



struct0

4



pivot->struct0





L
L



L->struct0





R
R



struct5

7



R->struct5





p
p



struct3

5



p->struct3





struct1

1



struct0->struct1





struct2

3



struct1->struct2





struct2->struct3





struct4

2



struct3->struct4





struct4->struct5





在迴圈內,若尚有未處理序列(子問題內元素個數多於一個),則依照當前子問題的pivot分割為左右子序列:指針p由左向右走訪,大於pivot者放入right,小(等)於者放入left中,後依序將 left->pivot->right 放入堆疊紀錄;否則將此pivot放入result中,表已排序過。







structs



left
left



struct1

1



left->struct1





pivot
pivot



struct0

4



pivot->struct0





right
right



struct3

5



right->struct3





struct2

3



struct1->struct2





struct4

2



struct2->struct4





struct5

7



struct3->struct5





在更新堆疊位置後,繼續進行新一輪的子問題直至堆疊為空,此時串列排序完成。

根據以上,可以歸納出此種實作手法的特點:

  • 堆疊的最大層級設為節點數的兩倍: max_level = 2 * n
  • 基於 Lomuto partition scheme 原理,選擇第一個節點(最左)作為 pivot
  • 依放入堆疊性質,優先從右序列right開始排序

改進方案

延伸上一節提到的實作手法分析,提出以下的改進方向:

  • 引入 lab0-c/list.h 的雙向鍵結串列加快排序&搜尋速度
  • 儲存 begin[]end[] 的堆疊大小 max_level 理論上不須使用到
    2n
  • pivot 的位置會決定此輪排序的效能,以隨機分佈的數列來看挑選的位置並無差異,但若是恰好遇到已排序串列如降序數列,則挑選第一個節點會有 worst case 產生

雙向鏈結串列

在引入 lab0-c/list.h 的雙向鏈結串列後,除了不需要 end[] 紀錄序列尾端外(減少堆疊使用空間),也將尋找隊列末的速度從遍歷序列須花

O(n)降至使用 head->prev
O(1)
。雖然相對單向鏈結串列而言在維護結構上較費時,但根據對 quick sort 函式的測試結果可看出,雙向鏈結串列的效率有提昇。

完整更動可見 commit 421c25e

## random input sequence with 100000 items
## Original version: single-linked list
> quick sort time [original] : 36316 
## Improved Version: double-linked list
> quick sort time [double-linked]: 30261

堆疊大小改善

先測試序列大小與堆疊深度的關係:

$ ./check
> size = 10       max level 4
> size = 100      max level 14
> size = 1000     max level 26
> size = 10000    max level 38
> size = 100000   max level 48

可以觀察到大小增加10倍時,堆疊深度大約會多10層,根據以上觀察,設計一個計算堆疊的函式:

+int count(int size) {
+    int cnt = 0;
+    while (size != 0)
+    {
+        cnt++;
+        size /= 10;
+    }
+    return cnt * 10;
+}
+
 long int quick_sort(struct list_head **head)
 {
     clock_t time = clock();
     long int quick_sort(struct list_head **head)
 
     int n = list_length(*head);
     int i = 0;
-    int max_level = 2 * n;
+    int max_level = count(n);

pivot 挑選位置

Quick Sort 的效率受到 pivot 位置選擇影響很大,若不斷地選到最大/最小元素作為 pivot 則會產生 worst-case ,時間複雜度退化成跟 Selection Sort 一樣的

O(n2)
以下實作以隨機挑選 pivot 方式來驗證其對程式的改善效果。

void random_pivot(struct list_head *head)
{
    if (!head || list_is_singular(head))
        return;
    int r = rand() % list_length(head);
    struct list_head *pick;
    list_for_each (pick, head) {
        if (r == 0)
            break;
        r--;
    }
    if (head->next != pick)
        list_swap(head->next, pick);
}

為驗證效果設計了兩個實驗,其一輸入隨機序列並將大小以10倍的級距增加;另一個則是分別測試在輸入 100000個元素串列下,best, average, worse case 下的效能改進:

$ ./check
> max level = 4   size = 10       quick sort time 6  
> max level = 14  size = 100      quick sort time 33  
> max level = 26  size = 1000     quick sort time 409  
> max level = 38  size = 10000    quick sort time 6733  
> max level = 48  size = 100000   quick sort time 33866  

$ ./test
> Segmentation fault (core dumped)

$ ./test
> max level = 50000       quick sort time (avg.  case): 6359356  
> max level = 100000      quick sort time (worst case): 6359356  
> max level = 100000      quick sort time (best  case): 14961502 

可以看到當輸入量達到100000個元素時,使用上述減少堆疊深度函式 count() 會產生 seg. fault,故先將堆疊大小改回

2n後再次測試,可以看到其最大堆疊深度達到
O(n)
,除了佔用空間量大外,在排序時也即耗時(相較於隨機序列的平均結果)

$ ./check
> max level = 8   size = 10       quick sort time 7  
> max level = 18  size = 100      quick sort time 39  
> max level = 24  size = 1000     quick sort time 488  
> max level = 36  size = 10000    quick sort time 3591  
> max level = 52  size = 100000   quick sort time 54835   

$ ./test
> max level = 50  quick sort time (avg.  case): 30540  
> max level = 48  quick sort time (worst case): 30540  
> max level = 48  quick sort time (best  case): 24480 

將 pivot 改成隨機挑選後,可以看到雖然須犧牲小部份時間在 pivot 處理上,但對於 worst case 有很顯著的效能提昇,其使用的堆疊空間亦減少至前面設計的

O(logn)的假設範圍內。


Problem 1-2: Timsort

運作原理

  • find_run() 先把整個資料分成小區塊(稱作 Run )

    • 搜尋連續單調遞增序列(若是連續單調遞減則在完成後翻轉),並將長度存在result.head->prev
    • 此串列頭存在recieve.head中,,未分割串列則存在recieve.next,最後回傳結構體struct pair
  • merge_collapse檢查tp中的前三個 run 是否滿足條件:

    • A > B + C
    • B > C

    當以上條件成立時,A 會放入堆疊中,並繼續取出前三個 B,C,D 比對;上述不成立則取 A,C 中較小者與 B 進行merge_at()合併,保持 stable sorting 的性質,並達到每次合併的 run 大小盡量相同

  • merge()的作法程式碼似乎與測驗二提到的合併方法(暫存較短之序列再進行逐對比較合併)不同,而是引入 lib/list_sort.c 的手法,不須額外配置,在空間使用上更據優勢。

改進方案

根據題目敘述,基於設計者認為現實世界中序列大多為已部份排序(partial sorted),故在合併兩個 run 時引入 Galloping mode 取代一般 merge sort 使用的 one-pair-at-a-time,可以減少比較次數加速合併。

範例:假設兩個已排序 run
A: [3, 7, 11, 12, 13, 31, 45, 221]
B: [21, 22, 24, 24, 29, 1000]
可以看到 A 中 [3, 7, 11, 12, 13] 可以整組直接排在 B 前面。

為了避免序列沒有可以整組排列的性質時,使用 galloping 會比 one-pair-at-a-time 更耗時與耗空間,timsort 引入固定參數 MIN_GALLOP=7 與變數 minGallop=MIN_GALLOP 來決定使用模式,當一組合併比較時發現連續7次都來自同一組 run 時切換為 galloping 模式,且每次進入 galloping 模式會使min_gallop降低,更易觸發 galloping 模式。

Galloping search (expotential search)
在查找整組可插入位置時需要使用到 galloping search ,是以

(2k+1)th方式查找元素。
根據作者實驗結果比較 galloping search 對比 linear search 與 Binary Search 的效能差異,得出在 Index = 7 之前 galloping search 相較來說會需要更多的比較次數,故MIN_GALLOP預設是 7。

實作挑戰

Linked list 不支援隨機存取情況下,使用 Galloping search 可以降低比較次數,但可能會增加記憶體存取次數與時間,且模式切換的判斷函式也會耗時。
參考原始碼是使用 array 做為儲存結構體,但則須使用額外的記憶體空間暫存陣列,如下圖範例(圖源來自 Timsort-wikipedia)

Galloping mode
故此處對於引入 Galloping mode 對整體效能的改進還須加入其他方面考量,如記憶體使用量、排序速度、比較次數等。

整合 sysprog21/lab0-c

To be processed after completing lab0-c


Problem 2-1: Construct Binary Tree

運作原理

基於 preorder & inorder 來重建二元樹的方法,先以 preorder 當前第一個節點為 root,在 inorder 節點中找尋並將其分成左右子樹,依此方法迭代重建二元樹。

hlist

hlist 搭配以 hlist_ 開頭的函式是 Linux 設計用於 hash table 實作的雙向鏈結串列,其定義在 include/linux/types.h 之中:

struct hlist_node {
    struct hlist_node *next, **pprev;
};

struct hlist_head {
    struct hlist_node *first;
};

:bulb:與 double-linked list 不同之處

  • *pprev 指向指自身節點的指標。
  • hlist_head 中的 *first 指向表頭之節點。

透過 node_add 將節點雜湊到表中,以order_node的型態儲存並以鏈結串列處理碰撞機制;存取節點時再以巨集 list_for_each 走訪對應的雜湊串列。







G


cluster_1

order_node 1


cluster_2

order_node 2


cluster_3

order_node 3



null1
NULL



null2
NULL



list_head

hlist_head heads[]

 

list_head.first

 

 

 

 

 



hn1

hlist_node

pprev

next



list_head:ht1->hn1





hn3

hlist_node

pprev

next



list_head:ht4->hn3





hn1:s->list_head:ht1





hn2

hlist_node

pprev

next



hn1:next->hn2





hn2:next->null1





hn2:s->hn1:s





hn3:next->null2





hn3:s->list_head:ht4





dfs

已知深度優先搜尋法(dfs)對樹的存取就是 preorder 順序,在每次切割的前序/中序非空情況下,

  1. preorder 當前節點 [pre_low] 即為該子樹的 root,
  2. 在 inorder 所在的 hash table 中找到根節點後,
  3. 將中序切割成左右子樹,其中 preorder[pre_low + 1] ~ preorder[pre_low + left_size] 屬於左子樹、preorder[pre_high - right_size + 1] ~ preorder[pre_high] 屬於右子樹(左右子樹大小可以透過根節點的位置計算後得到),
  4. 將以上分別傳入dfs遞迴重建。

測試程式

測試程式碼 main.c

先隨機生成一個二元樹並紀錄其 preorder & inorder 順序後,使用此測驗程式碼重建二元樹,並與隨機生成樹的 postorder 進行比較,確認重建樹的正確性。

$ ./main
tree height = 5, tree nodes = 5
Reconstruct time : 2
successfully rebuilt
$ ./main
tree height = 5, tree nodes = 13
Reconstruct time : 3
successfully rebuilt

改進方案

commit 1085173

因 preorder 的順序即為 dfs 的順序,使用遞迴重建僅有程式易讀性一項好處,故將 buildTree() 函式改以非遞迴方法依 preorder 順序,並將每次分割出的子樹放入堆疊。

設計實驗

測試程式碼 analyze.c

隨機生成的二元樹 10,000 次,其中因小於1000個節點的二元樹較看不出差異故先去除,且為比較遞迴效能,故設計不重複節點以避免 hash table 碰撞問題。
buildTree()函式的執行時間與節點數量關係做成下圖,可以看出在節點數逐漸增加後, non-recursive dfs 的效能較 recursive 版本平均而言較佳。

analyze

Linux kernel: cgroups 相關程式碼

pre-order walk 程式碼位於 /kernel/cgroup/cgroup.c css_next_descendant_pre的部份

結構體 struct cgroup_subsys_state *

struct cgroup_subsys_state {
    ...    
    /* siblings list anchored at the parent's ->children */
    struct list_head sibling;
    struct list_head children;
    ...
        
    u64 serial_nr;
    ...
    /*
     * PI: the parent css.	Placed here for cache proximity to following
     * fields of the containing structure.
     */
    struct cgroup_subsys_state *parent;
};

css結構體包含 siblingchildrenparent指標,其變數關係如下:







G



hn1

css_01

sibling

child

...

parent



hn2

css_02

sibling

child

...

parent



hn1:c->hn2:n





hn3

css_03

sibling

child

...

parent



hn1:s->hn3:n





hn2:p->hn1:w





null
NULL



hn2:c->null





hn3:c->null





將 CSS 簡化成樹狀結構可表示成:







BST



l1

0



l2_1

1



l1->l2_1





l2_2

3



l2_1->l2_2





l3

2



l2_1->l3





NULL2
NULL



l2_2->NULL2





NULL1
NULL



l3->NULL1





巨集 css_for_each_descendant_pre

#define css_for_each_descendant_pre(pos, css)				\
	for ((pos) = css_next_descendant_pre(NULL, (css)); (pos);	\
	     (pos) = css_next_descendant_pre((pos), (css)))

巨集中以迴圈呼叫css_next_descendant_pre(),並回傳 pre-order walk 中的下一個元素

css_next_descendant_pre(struct cgroup_subsys_state *pos,
			struct cgroup_subsys_state *root)
{
	struct cgroup_subsys_state *next;

	cgroup_assert_mutex_or_rcu_locked();

	/* if first iteration, visit @root */
	if (!pos)
		return root;

	/* visit the first child if exists */
	next = css_next_child(NULL, pos);
	if (next)
		return next;

	/* no child, visit my or the closest ancestor's next sibling */
	while (pos != root) {
		next = css_next_child(pos, pos->parent);
		if (next)
			return next;
		pos = pos->parent;
	}

	return NULL;
}

root開始存取,利用css_next_child 尋找下一個節點並回傳;當抵達leaf後使用pos->parent回頭尋找下一個節點,因 CSS 結構體依 serial_nr 排序避免了重複走訪的問題。


Problem 2-2: LRU cache

運作原理

LRU(Least Recently Used Cache) 一般會透過雜湊表結合雙向鏈結串列,但在 worst case 下可能會達到

O(n)的時間複雜度,而此題使用hlist來減少快取存取的時間,在選擇合適的 hash function 下,讓讀取/寫入時間複雜度降至
O(1)

結構體

typedef struct {
    int capacity;                // 緩存的最大容量
    int count;                   // 當前緩存中的資料數量
    struct list_head dhead;      // 環狀串列串接所有資料,以最近使用項目來排序
    struct hlist_head hhead[];   // 頭節點數組,用於快速訪問緩存項目
} LRUCache;

typedef struct {
    int key;                     // 索引
    int value;                   // 儲存內容
    struct hlist_node node;
    struct list_head link;
} LRUNode;






G


cluster_2

LRUNode_02


cluster_0

LRUCache


cluster_3

LRUNode_03


cluster_1

LRUNode_01



hhead_ptr

*hhead



table

hhead[0]

[1]

[2]

[3]

[4]

[5]

...



hhead_ptr->table:t0





dhead

dhead

prev

next



hnode1

node

pprev

next



table:t1->hnode1





hnode2

node

pprev

next



table:t5->hnode2





lhead1

link

prev

next



hnode1:p->table:t1





hnode3

node

pprev

next



hnode1:n->hnode3





lhead2

link

prev

next



hnode2:p->table:t5





null2
NULL



hnode2:n->null2





lhead3

link

prev

next



hnode3:p->hnode1





null1
NULL



hnode3:n->null1





依照下圖dhead的順序,可以得知最近存取的節點為LRUNode_02







G


cluster_1

LRUNode_01


cluster_2

LRUNode_02


cluster_3

LRUNode_03



dhead

dhead

prev

next



lhead1

link

prev

next



dhead:n->lhead1:t





lhead2

link

prev

next



lhead1:n->lhead2:t





lhead2:p->lhead1:t





lhead3

link

prev

next



lhead2:n->lhead3:t





lhead3:p->lhead2:t





API in LRU.h

  • 存取資料:lRUCacheGet(LRUCache *obj, int key)
    將輸入 key 雜湊後查表 hhead[hash] 得到資料儲存位置,使用 hlist_for_each 走訪 dhead 尋找該索引值,並將其移至串列開頭以維持 LRU 存取順序。
  • 新增資料:lRUCachePut(LRUCache *obj, int key, int value)
    將輸入 key 經雜湊查表 hhead[hash] 後開始走訪,若該索引已儲存在快取中,則將其資料更新並移至串列 dhead 開頭;反之則先檢查快取是否已達上限,已滿則將串列末尾的資料先移除(即 Least Used Node),再將節點新增至dheadhhead 之中。

測試程式

測試程式碼 main.c

為了測試LRU.h 的 API 功能是否正確運行,在測試程式中新增一個大小為5的快取,並隨機產生測資100筆,執行放入/存取的呼叫,檢驗其正確性。

改進方案

hhead 串列儲存順序

可以看到無論是在 lRUCachePut 中新增節點時,或是 lRUCacheGet查找節點時, hhead 內的資料並不會隨之更新,也就是沒有依循 LRU 的順序。
而在 hhead 中是否有需要多花費時間來維持串列,則須討論到 hash table 碰撞的機率與產生的成本,在高度碰撞與其可能產生較長的碰撞串列下,維護其存取順序可以提高效率;反之則增加時間成本。

int lRUCacheGet(LRUCache *obj, int key)
{
    int hash = key % obj->capacity;
    struct hlist_node *pos;
    hlist_for_each(pos, &obj->hhead[hash])
    {
        LRUNode *cache = list_entry(pos, LRUNode, node);  // HHHH
        if (cache->key == key) {
            list_move(&cache->link, &obj->dhead);  // IIII
+            hlist_del(&cache->node);
+            hlist_add_head(&cache->node, &obj->hhead[hash]);
            return cache->value;
        }
    }
    return -1;
}

void lRUCachePut(LRUCache *obj, int key, int value)
{
    LRUNode *cache = NULL;
    int hash = key % obj->capacity;
    struct hlist_node *pos;
    hlist_for_each(pos, &obj->hhead[hash])
    {
        LRUNode *c = list_entry(pos, LRUNode, node);  // JJJJ
        if (c->key == key) {
            list_move(&c->link, &obj->dhead);  // KKKK
+            hlist_del(&c->node);
+            hlist_add_head(&c->node, &obj->hhead[hash]);
            cache = c;
        }
    }
    ...

Linux kernel: LRU 相關程式碼

Clock Algorithm

Linux kernel /drivers/md/dm-bufio.c

Linux應用在 buffer-cache replacement 中,並根據程式碼註解說明可以得知,其不是使用傳統的 LRU List 的替換方法(優先替換最久未使用的),而是使用 Clock Algorithm 來實作。

此算法使用環形列表 (circular list),當該頁框 (page frame) 被訪問到時會標記 reference bit = 1;另有一個指針 cursor 在列表中移動,當 page fault 發生時選擇被替換頁面 (victim page):

  • R == 0 則該頁面被置換成新頁面,cursor 會移往下一個位置
  • R == 1 則將其重置為 0 ,然後 cursor 會持續移動並檢查,直到有頁面被替換。

wiki: Page replacement algorithm

LRU cache

Linux kernel /lib/lru_cache.c

另也可以看到使用類似 LRU policy 來實現 cache,其流程可以大致歸納成以下:







G


cluster_1




fault

page fault



a




fault->a





active

active

Ref

...

Ref



a->active

demotion



reclaim
reclaim



inactive

inactive

Ref

...

Ref



reclaim->inactive





inactive->a





inactive:m->active:t


promotion



根據Linux kernel 原始碼得知在 v2.6.28 後為了更高效的管理記憶體,將 LRU 串列又區分出 LRU_ANON LRU_FILE,至此 Linux 的頁面置換演算法結構大致成形。

vmscan: split LRU lists into anon & file sets
Split the LRU lists in two, one set for pages that are backed by real file
systems ("file") and one for pages that are backed by memory and swap
("anon"). The latter includes tmpfs.

commit 4f98a2f


Problem 2-3: find nth-bit

運作原理

目標 : 為了尋找在指定的記憶體空間內,第 n 個設定的位元位置。

完整函式與測試程式 5_find_nth_bit

在供外界呼叫的 API 中會先根據輸入值的大小做處理,透過巨集 small_const_nbits(size)
檢查是否符合小的常數位元數必須是一個 Compile-time constants,且值必須小於等於 BITS_PER_LONG 並大於0。

#define small_const_nbits(nbits) \
    (__builtin_constant_p(nbits) && (nbits) <= BITS_PER_LONG && (nbits) > 0)

Compile-time constants

巨集所使用的 __builtin_constant_p(nbits) 是來自於編譯器 GCC 的內建函數,用來判斷傳入表示式(expression, 此處為輸入值大小),若回傳 1 代表是 Compile-time constants,則可以在編譯時對該表示式進行常數折疊(constant-floading);另提到回傳 0 不代表其表示式不是常數,但只要有 side effects 的表示式皆會回傳 0

Built-in Function: int __builtin_constant_p (exp)
You can use the built-in function __builtin_constant_p to determine if the expression exp is known to be constant at compile time and hence that GCC can perform constant-folding on expressions involving that value. The argument of the function is the expression to test. The expression is not evaluated, side-effects are discarded. The function returns the integer 1 if the argument is known to be a compile-time constant and 0 if it is not known to be a compile-time constant. Any expression that has side-effects makes the function return 0. A return of 0 does not indicate that the expression is not a constant, but merely that GCC cannot prove it is a constant within the constraints of the active set of optimization options.
gcc/Other-Builtins

Side effect
根據 C99 Standards,可以知道符合 side effects 的操作有以下四種:

  1. Accessing a volatile object
  2. modifying an object
  3. modifying a file
  4. calling a function that does any of those operations

Accessing a volatile object, modifying an object, modifying a file, or calling a function that does any of those operations are all side effects,11) which are changes in the state of the execution environment. Evaluation of an expression may produce side effects. At certain specified points in the execution sequence called sequence points, all side effects
of previous evaluations shall be complete and no side effects of subsequent evaluations shall have taken place. (A summary of the sequence points is given in annex C.)
C99 §5.1.2.3-2

如果為 Compile-time constants,則將輸入值 & 與判斷式GENMASK(size - 1, 0) 後返回答案。
可以發現此處在巨集 GENMASK(h, l) 的行為就是將輸入值根據我們所設定的 size,來取出有效範圍 [size-1, 0]

#define GENMASK(h, l) \
    (((~0UL) - (1UL << (l)) + 1) & (~0UL >> (BITS_PER_LONG - 1 - (h))))
...

if (small_const_nbits(size)) {
    unsigned long val = *addr & GENMASK(size - 1, 0);

    return val ? fns(val, n) : size;
}

在找出 word 中由右數來第一個 set bit 的位置時,使用 linear search 在 worst case 的效率太差,此處採用類似 binary search 的方法,每次對剩餘數值的一半做判斷並捨棄全零部份。

e.g. 假設輸入為

0b00001111001010000000000000000000

  • word & 0xffff 0000 0000 0000 0000 0010 1100 0000 0000 -> num = 0
  • word & 0xff 0000 0000 0000 0000 0010 1100 0000 0000 -> num = 0 + 8, shift 8 bit
  • word & 0xf 0000 0000 0000 0000 0010 1100 0000 0000 -> num = 8
  • word & 0x3 0000 0000 0000 0000 0010 1100 0000 0000 -> num = 8 + 2 = 10, shift 2 bit
  • word & 0x1 0000 0000 0000 0000 0010 1100 0000 0000 -> num = 10
static inline unsigned long __ffs(unsigned long word)
{
    int num = 0;

#if BITS_PER_LONG == 64
    if ((word & 0xffffffff) == 0) {   // AAAA
        num += 32;
        word >>= 32;
    }
#endif
    if ((word & 0xffff) == 0) {
        num += 16;
        word >>= 16;
    }
    if ((word & 0xff) == 0) {
        num += 8;
        word >>= 8;
    }
    if ((word & 0xf) == 0) {
        num += 4;
        word >>= 4;
    }
    if ((word & 0x3) == 0) {
        num += 2;
        word >>= 2;
    }
    if ((word & 0x1) == 0)
        num += 1;
    return num;
}

Linux kernel: find_nth_bit 應用

bitmap_remap()

Linux kernel /lib/bitmap.c

主要是用來做 bitmap 的映射,而find_nth_bit的作用是在找出新的 bitmap 對應位。

void bitmap_remap(unsigned long *dst, const unsigned long *src,
		const unsigned long *old, const unsigned long *new,
		unsigned int nbits)
{
	unsigned int oldbit, w;

	if (dst == src)		/* following doesn't handle inplace remaps */
		return;
	bitmap_zero(dst, nbits);

	w = bitmap_weight(new, nbits);
	for_each_set_bit(oldbit, src, nbits) {
		int n = bitmap_pos_to_ord(old, oldbit, nbits);

		if (n < 0 || w == 0)
			set_bit(oldbit, dst);	/* identity map */
		else
			set_bit(find_nth_bit(new, nbits, n % w), dst);
	}
}

CPU affinity

  • CPU affinity 是儘量長時間運行在某個指定的 CPU 上 ,且不被轉移至其他 CPU 的傾向性。
  • 在 NUMA( Non-Uniform Memory Access,非統一記憶體存取架構)下,cpu 對於訪問某些資源 (RAM, I/O) 的成本不一。

根據以上可以推測在 NUMA 系統中,透過考慮 CPU 與記憶體之間的距離將特定的執行緒綁定到 CPU 上,以最大限度地減少存取遠端記憶體所帶來的效能損失。而與 find_nth_bit() 相關的應用,可將任務綁定在數據所在之 NUMA 節點的相應 CPU,透過 CPU affinity 達到最小化訪問記憶體延遲。

Because of the high cost of invalidating and repopulating caches, most operating systems with SMP support try to avoid migrating a thread from one processor to another and instead attempt to keep a thread running on the same processor and take advantage of a warm cache. This is known as processor affinit —that is, a process has an affinity for the processor on which it is currently running.
Operating System Concepts §5.5.4 Processor Affinity

The access to certain resources (RAM, I/O ports) has different costs from different CPUs. This is the case in NUMA (Non-Uniform Memory Architecture) machines. Preferably memory should be accessed locally but this requirement is usually not visible to the scheduler. Therefore forcing a process or thread to the CPUs which have local access to the most-used memory helps to significantly boost the performance.
GNU manual §22.3.5 Limiting execution to certain CPUs

sched_numa_find_nth_cpu()

Linux kernel /kernel/sched/topology.c

函式用於在 NUMA 中找到與指定 CPU 最接近的第 n 個 CPU ,其中 cpumask_nth_and() 便引用到在 /include/linux/cpumask.h 中的 find_nth_bit()

/**
 * sched_numa_find_nth_cpu() - given the NUMA topology, find the Nth closest CPU
 *                             from @cpus to @cpu, taking into account distance
 *                             from a given @node.
 * @cpus: cpumask to find a cpu from
 * @cpu: CPU to start searching
 * @node: NUMA node to order CPUs by distance
 *
 * Return: cpu, or nr_cpu_ids when nothing found.
 */
int sched_numa_find_nth_cpu(const struct cpumask *cpus, int cpu, int node)
{
	struct __cmp_key k = { .cpus = cpus, .cpu = cpu };
	struct cpumask ***hop_masks;
	int hop, ret = nr_cpu_ids;

	if (node == NUMA_NO_NODE)
		return cpumask_nth_and(cpu, cpus, cpu_online_mask);

	rcu_read_lock();

	/* CPU-less node entries are uninitialized in sched_domains_numa_masks */
	node = numa_nearest_node(node, N_CPU);
	k.node = node;

	k.masks = rcu_dereference(sched_domains_numa_masks);
	if (!k.masks)
		goto unlock;

	hop_masks = bsearch(&k, k.masks, sched_domains_numa_levels, sizeof(k.masks[0]), hop_cmp);
	hop = hop_masks	- k.masks;

	ret = hop ?
		cpumask_nth_and_andnot(cpu - k.w, cpus, k.masks[hop][node], k.masks[hop-1][node]) :
		cpumask_nth_and(cpu, cpus, k.masks[0][node]);
unlock:
	rcu_read_unlock();
	return ret;
}
EXPORT_SYMBOL_GPL(sched_numa_find_nth_cpu);