# Linux 核心設計: Concurrency Managed Workqueue(CMWQ) ## Why CMWQ? > [Concurrency Managed Workqueue (cmwq)](https://www.kernel.org/doc/html/latest/core-api/workqueue.html) 在 Linux 中提供 workqueue 介面以實現非同步的流程執行。使用者以一個 "work item" 來描述需被執行的 context,將其加入到所建立 queue 之中。則 kernel 會選出合適的 "worker" thread 來完成之。 在 CMWQ 之前的 workqueue 實作中,worker thread 的數量是與被建立的 workqueue 數量直接掛勾的。意味著使用者每建立一個新的 workqueue,系統上的 worker thread 總量便會增加。實作中包含了兩種 worker thread 的建立方式: * multi threaded (MT) wq: 在每個 CPU 上會有一個 worker thread,work item 可以分配不同的 CPU 上執行 * single threaded (ST) wq: 僅以單一的 worker thread 來完成系統上所有的 work item 在 MT wq 的部份,這代表系統需要建立與 CPU 數量乘上 workqueue 總量等價的 worker thread,這是很嚴重的的問題。畢竟隨著 Linux 的演進,一方面 workqueue 的使用者逐漸增加,而現代電腦的 CPU 核心數量同時也不斷增加。這代表一些系統在剛啟動(booting)時,其 32k 的 PID 空間就會被 worker thread 的 process 給佔滿。 資源限制和 concurrency level 之間存在取捨關係,ST wq 是為整個系統提供唯一的 context,因此所有的 work 都是一一排隊執行,造成 concurrency level 不佳,雖然對資源的使用量比較低。而 MT wq 為每個 CPU 提供一個 context,代價是消耗了大量資源。但即便如此它提供的 concurrency 效果卻不盡如人意。一個關鍵的原因是 MT wq 的 worker thread 的數目是與 CPU 數量嚴格綁定的,且 worker thread 的 work item 也不能互相轉移。另一方面,傳統 workqueue 也容易出現 deadlock 問題。 總結來說,使用 ST wq 得到的 concurrency level 較低,但資源上較為節省。然而使用 MT wq 固然嚴重的消耗資源,能提高的 concurrency level 卻很有限。這樣不對等的 trade off 讓過去的 workqueue 並不是那麼好用。 CMWQ 就在此背景下誕生,並專注在以下目標: * 與原始 workqueue API 的相容性 * 捨棄各個 workqueue 獨立對應一組 worker-pools 的作法。取而代之,所有 workqueue 會共享 per-CPU worker pools,並按需提供靈活的 concurrency level,避免大量的資源耗損 * worker pool 和 concurrency level 間的調節由系統內部處理,對使用者來說可以視為一個黑盒子 ## CMWQ 的設計 在 CMWQ 中,每一項函式流程的執行被抽象為一個 work item ([`work_struct`](https://elixir.bootlin.com/linux/v6.4/source/include/linux/workqueue.h#L98))。 ```cpp typedef void (*work_func_t)(struct work_struct *work); struct work_struct { atomic_long_t data; struct list_head entry; work_func_t func; #ifdef CONFIG_LOCKDEP struct lockdep_map lockdep_map; #endif }; ``` work itme 中包含一個 `func` 成員,是所要被執行的函式指標。當 Linux driver 或 subsystem 想要非同步式的執行某個函數,它就必須建立 `work_struct` 並設置 `func` 以指向要運行的函數,並將其加入到 workqueue 中。 worker thread 負責從 queue 中一個接一個地取出 work item 並執行其中的 `func`。如果沒有 work item 在排隊,則 worker thread 就會處於 idle 狀態。在 worker thread 之上對其管理的結構稱為 worker-pools。 設計上,CMWQ 中對於每個 CPU 會有兩個 worker-pools。一個針對普通的 work,另一個則針對高優先級的 work。此外還有一些未綁定 CPU 的額外 worker-pools,這些 pools 的數量則是動態的。使用者通過 workqueue API 建立 work item 並進行排隊,並且允許設置 flags 來影響部份 work item 的執行方式。例如 CPU locality、concurrency 的限制、優先級等。 當將 work item 排隊到 workqueue 時,會根據 queue 參數和 workqueue 的屬性決定目標的 worker-pools,比如可以設定 workqueue 中的 work item 僅由指定 CPU 之普通或高優先級的 worker-pools 來執行。 對於限定 CPU 的 worker-pools,其 concurrency 的管理方式是將自己與 scheduler 掛勾。則每當任一 worker 被喚醒或是進入睡眠時,worker-pools 會收到通知,藉此來跟踪當前可運行的 woker 數量。一般而言,work item 不太會佔用 CPU 資源,因此設計上傾向使用最少數量的 worker thread 但避免 bandwidth 的損失。實作上,只要該 CPU 上有一個或多個 runnable worker thread,worker-pools 就暫時不會執行新的 work。一直到最後一個正在運行的 worker thread 進入睡眠狀態時,才立即排程一個新的 worker thread,這樣 CPU 就不會在仍有尚未處理的 work item 時無所事事,但也不至於過度的建立大量 worker thread。 另一方面,除了 kthreads 的內存空間之外,保留 idle 的 worker thread 不會消耗其他資源,因此 CWMQ 會保留一個 idle 的 worker thread 一段時間。這樣如果隨即有 work 要處理則可以直接沿用,不必再重新建立。 如果使用不限定 CPU 的 workqueue,使用者可以藉由對應 API 來為此設定自定義的屬性,則 workqueue 將自動建立與屬性對應的 worker-pools。此種 workqueue 對 concurrency level 的調節將由使用者定義。此外,如果想讓綁定 CPU 的 workqueue 也有此特性,API 也提供相應的 flag 以忽略 concurrency 的管理。 ## CMWQ 的初始化 要允許核心內部或者使用者能夠註冊 workqueue,kernel 首先要透過 [`workqueue_init_early`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L6253) 和 [`workqueue_init`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L6331) 兩個階段的初始化來將整個子系統準備好。 ### 上半部份: `workqueue_init_early` 作為上半部份的 `workqueue_init_early` 主要目的在於盡早初始化好 kernel 中的相關資料結構,以利於 kernel 的其他部份可以在早期就能建立 workqueue 和註冊 work item。由於 kthread 尚不能建立,這時候 work 還不能被完成,必須等到下半部份的 `workqueue_init` 也完成後才能繼續進行。 ```cpp void __init workqueue_init_early(void) { int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL }; int i, cpu; BUILD_BUG_ON(__alignof__(struct pool_workqueue) < __alignof__(long long)); BUG_ON(!alloc_cpumask_var(&wq_unbound_cpumask, GFP_KERNEL)); cpumask_copy(wq_unbound_cpumask, housekeeping_cpumask(HK_TYPE_WQ)); cpumask_and(wq_unbound_cpumask, wq_unbound_cpumask, housekeeping_cpumask(HK_TYPE_DOMAIN)); pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC); ``` 具體初始化的項目其一是 [`wq_unbound_mask`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L337),這標示可以拿來服務 unbound workqueue 的 CPU 有哪些。 而 [`pwq_cache`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L308) 是建立可用來配置 `pool_workqueue` 的 memory pool。若預期未來會有頻繁建立和釋放 `pool_workqueue` 空間的需求,使用 `kmem_cache` 相較於直接採通用的 `kmalloc` 配置可以獲得效率上的優勢或是減輕記憶體碎片化的問題。 ```cpp /* initialize CPU pools */ for_each_possible_cpu(cpu) { struct worker_pool *pool; i = 0; for_each_cpu_worker_pool(pool, cpu) { BUG_ON(init_worker_pool(pool)); pool->cpu = cpu; cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu)); pool->attrs->nice = std_nice[i++]; pool->node = cpu_to_node(cpu); /* alloc pool ID */ mutex_lock(&wq_pool_mutex); BUG_ON(worker_pool_assign_id(pool)); mutex_unlock(&wq_pool_mutex); } } ``` 接著會為每個 CPU 建立個別的 `struct worker_pool`。內層 for loop 是因為前面有提到的會有一個針對普通的 work 和另一個針對高優先級的 work ```cpp /* create default unbound and ordered wq attrs */ for (i = 0; i < NR_STD_WORKER_POOLS; i++) { struct workqueue_attrs *attrs; BUG_ON(!(attrs = alloc_workqueue_attrs())); attrs->nice = std_nice[i]; unbound_std_wq_attrs[i] = attrs; /* * An ordered wq should have only one pwq as ordering is * guaranteed by max_active which is enforced by pwqs. * Turn off NUMA so that dfl_pwq is used for all nodes. */ BUG_ON(!(attrs = alloc_workqueue_attrs())); attrs->nice = std_nice[i]; attrs->no_numa = true; ordered_wq_attrs[i] = attrs; } ``` unbound 的部份不會建先建立 worker pool,不過會建立預設的 attribute [`unbound_std_wq_attrs`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L363) 和 [`ordered_wq_attrs`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L366)。 ```cpp system_wq = alloc_workqueue("events", 0, 0); system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0); system_long_wq = alloc_workqueue("events_long", 0, 0); system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND, WQ_UNBOUND_MAX_ACTIVE); system_freezable_wq = alloc_workqueue("events_freezable", WQ_FREEZABLE, 0); system_power_efficient_wq = alloc_workqueue("events_power_efficient", WQ_POWER_EFFICIENT, 0); system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient", WQ_FREEZABLE | WQ_POWER_EFFICIENT, 0); BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq || !system_unbound_wq || !system_freezable_wq || !system_power_efficient_wq || !system_freezable_power_efficient_wq); } ``` 在 worker pool 被創建完成之後,就能利用 [`alloc_workqueue`](#建立-workqueue) 來建立 workqueue `struct workqueue_struct` 了。這裡可以看到 kernel 內部事先建立出了各種不同類型的 workqueue。 ### `workqueue_init` [`workqueue_init`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L6331) 是 workqueue 子系統的下半部分。此階段主要在建立 workqueue 所需的 worker thread,且也允許日後建立更多的 thread。 ```cpp void __init workqueue_init(void) { struct workqueue_struct *wq; struct worker_pool *pool; int cpu, bkt; /* * It'd be simpler to initialize NUMA in workqueue_init_early() but * CPU to node mapping may not be available that early on some * archs such as power and arm64. As per-cpu pools created * previously could be missing node hint and unbound pools NUMA * affinity, fix them up. * * Also, while iterating workqueues, create rescuers if requested. */ wq_numa_init(); mutex_lock(&wq_pool_mutex); for_each_possible_cpu(cpu) { for_each_cpu_worker_pool(pool, cpu) { pool->node = cpu_to_node(cpu); } } ``` 這裡先將各個 pool 對應到正確的 node id。細心觀察可以發現 `workqueue_init_early` 已經做過類似的初始化,這裡還需進行的理由可以參照註解。 ```cpp list_for_each_entry(wq, &workqueues, list) { wq_update_unbound_numa(wq, smp_processor_id(), true); WARN(init_rescuer(wq), "workqueue: failed to create early rescuer for %s", wq->name); } mutex_unlock(&wq_pool_mutex); ``` `list_for_each_entry` 走訪所有 [`workqueues`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L333)(每個用 `alloc_workqueue` 建立的 workqueue 會 pending 到 list 中) list 中的 `workqueue_struct`,並逐一進行以下兩項操作: * `wq_update_unbound_numa`: CPU 熱插拔相關? * `init_rescuer`: 假設 workqueue 中的 work 可以釋放記憶體,但資源吃緊情況下,為了順利進行需要釋放記憶體建立更多的 worker thread,這就形成 deadlock 的狀況。為了避免這種問題,CMWQ 針對與記憶體釋放有關的 workqueue 保留了一個 rescuer thread,後者保證了工作的進展。這就是透過 `init_rescuer` 建立的。如果 workqueue 的 flag 設定含 `WQ_MEM_RECLAIM`,會為這類型的 wq 嘗試建立 rescuer。 ```cpp /* create the initial workers */ for_each_online_cpu(cpu) { for_each_cpu_worker_pool(pool, cpu) { pool->flags &= ~POOL_DISASSOCIATED; BUG_ON(!create_worker(pool)); } } ``` 這裡與 CPU-bounded worker pool 有關。[`create_worker`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L1938) 正式建立 `worker_pool` 底下的 [`worker`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue_internal.h#L24),`worker` 也就包含了處理 worker item 的 kernel thread。 ![](https://hackmd.io/_uploads/SJr0DbqA3.png) ```cpp hash_for_each(unbound_pool_hash, bkt, pool, hash_node) BUG_ON(!create_worker(pool)); ``` 初始化也應用在目前存在的 unboued worker pool,也就是一開始在 `workqueue_init_early` 設置的那些,這些是透過 hash table 來管理。 ```cpp wq_online = true; wq_watchdog_init(); } ``` 上下兩個階段的 workqueue 初始化完成後,可以將 `wq_online` 設置為 true。主要可以幫助 kernel 當下是否可以直接建立 worker 或 rescuer kthread。 ## API 的使用 基於 6.4 版本之 Linux kernel 進行探討 ### 建立 workqueue workqueue 的建立需透過 [`alloc_workqueue`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L4425),可藉之得到配置的 [`struct workqueue_struct`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L264) ```cpp struct workqueue_struct *alloc_workqueue(const char *fmt, unsigned int flags, int max_active, ...); ``` `fmt` 和最後的 Variable Length Argument (`...`) 和 workqueue 的命名有關。`max_active` 則是限定 **per-CPU(不是 global!)** 可處理 work item 的最大context 數量。 參數的 `flags` 可用來設置 workqueue 的特性,在 [Concurrency Managed Workqueue (cmwq): flags](https://www.kernel.org/doc/html/latest/core-api/workqueue.html#flags) 一節有更詳細的解釋。 #### `WQ_UNBOUND` 表示被加入到該 queue 中的 work item 是由不指定 CPU 的特殊 worker-pools 所服務的。這種情況下 kernel 不會對該 workqueue 提供 concurrent 管理。worker-pools 會嘗試盡快開始執行 work item。 #### `WQ_FREEZABLE` Linux 的電源管理機制會讓系統進入 suspend 狀態,此情況下 userspace 的 thread 和部分的 kernel thread 會被凍結。設置此 flag 則 workqueue 也會在 suspend 時暫停: kernel 會處理完當前的所有 work 且不接受新的 work,直到解凍。 #### `WQ_MEM_RECLAIM` CMWQ 中存在所謂 **rescuer thread** 的機制。一般來說,為了保證每個 workqueue 都能順利有進度,這依賴於 kernel 可以建立更多的 worker thread。然而在記憶體資源吃緊的狀態下,worker thread 並不一定可以成功建立。這可能導致 deadlock 的狀況。 舉例來說,workqueue A 目前正被處理,但其依賴於 workqueue B 釋放記憶體資源。結果 workqueue B 此時卻因為記憶體不足導致建不出 worker thread 來處理,自然沒機會將記憶體釋放出去。 當設置 `WQ_MEM_RECLAIM`,則 kernel 將預先考慮此狀況讓建立的 workqueue 可以無視記憶體的壓力來建立 **rescuer thread** 以避免 deadlock 的發生。 #### `WQ_HIGHPRI` 使用此 flag 的 workqueue 將使用高優先級的 worker-pools 去處理(nice value 較低的 worker thread)。 注意到高優先級與普通的 worker-pools 之間是不能互通的。 #### `WQ_CPU_INTENSIVE` 被設置此 flag 的 workqueue 中的 work 與其是特別消耗 CPU 資源的,則 kernel 將不會將其造成的壓力計算到 concurrent level。具體的影響是,這些 workqueue 中的 work item 將歸 scheduler 管理,相關的 work item 不會阻止同一 worker-pools 中的其他 work item 被執行。 #### `WQ_SYSFS` 使 workqueue 信息在 sysfs 中可以被 user 查看到。 #### `WQ_POWER_EFFICIENT` per-CPU 的 workqueue 由於 cache locality,往往會表現出更好的性能。這也使 scheduler 無法選擇 CPU 來執行 worker thread。然而這也不是那麼完美的,其中一個不幸的副作用是造成功耗的增加。 在省電的考量上,如果特定 CPU 上沒有任何任務要執行,則 scheduler 會認為該 CPU 是 idle,並嘗試維持 idle 狀態。然而,例如在 idle CPU 上的 interrupt handler 可能在其對應的 workqueue 上添加新的 work item,進而強制 scheduler 打破 idle 狀態。換句話說,系統在省電的選擇就受限於 workqueue 而可能無法選擇最佳選項。 針對此狀況,標有 `WQ_POWER_EFFICIENT` 的 workqueue 預設是綁定 CPU 的,但如果指定了 `workqueue.power_efficient` 這個 kernel 參數,可以讓該 workqueue 變為未綁定。則可以一定程度的應付上述問題。 :::warning `WQ_SYSFS` 和 `WQ_POWER_EFFICIENT` 的詳細解釋目前在 [Concurrency Managed Workqueue (cmwq)](https://www.kernel.org/doc/html/latest/core-api/workqueue.html) 是沒有的 ::: ```cpp __printf(1, 4) struct workqueue_struct *alloc_workqueue(const char *fmt, unsigned int flags, int max_active, ...) { size_t tbl_size = 0; va_list args; struct workqueue_struct *wq; struct pool_workqueue *pwq; /* * Unbound && max_active == 1 used to imply ordered, which is no * longer the case on NUMA machines due to per-node pools. While * alloc_ordered_workqueue() is the right way to create an ordered * workqueue, keep the previous behavior to avoid subtle breakages * on NUMA. */ if ((flags & WQ_UNBOUND) && max_active == 1) flags |= __WQ_ORDERED; /* see the comment above the definition of WQ_POWER_EFFICIENT */ if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient) flags |= WQ_UNBOUND; ``` 首先,根據參數,`alloc_workqueue` 會加上一些額外的 flag。`__WQ_ORDERED` 表示 work item 會嚴格的按照在 queue 中的順序運行。這個 flag 是通過設置 `WQ_UNBOUND` 和 `max_active` 為 1 來隱式添加的。 此外,標有 `WQ_POWER_EFFICIENT` 的 workqueue 如果指定了 `workqueue.power_efficient` 會變成 unbound,這在前面有先說明。 ```cpp /* allocate wq and format name */ if (flags & WQ_UNBOUND) tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]); wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL); if (!wq) return NULL; if (flags & WQ_UNBOUND) { wq->unbound_attrs = alloc_workqueue_attrs(); if (!wq->unbound_attrs) goto err_free_wq; } ``` 配置 `workqueue_struct` 所需要的空間。對於 unbounded wq,worker-pools 需要額外的空間來描述,此外也需要空間來記錄自定義的屬性。 ```cpp va_start(args, max_active); vsnprintf(wq->name, sizeof(wq->name), fmt, args); va_end(args); ``` 通過參數設置 wq 的名稱的字串。 ```cpp max_active = max_active ?: WQ_DFL_ACTIVE; max_active = wq_clamp_max_active(max_active, flags, wq->name); ``` 參數的 `max_active` 如果給 0 的話,則系統會使用預設的 [`WQ_DFL_ACTIVE`](https://elixir.bootlin.com/linux/v6.4/source/include/linux/workqueue.h#L347)。然後再由 `wq_clamp_max_active` 校正 `max_active` 至合理範圍。 ```cpp if (alloc_and_link_pwqs(wq) < 0) goto err_unreg_lockdep; ``` 最後部分大多與底下資料結構的初始化相關,暫且先略過大部分。但需要留意 [`alloc_and_link_pwqs`](#`alloc_and_link_pwqs`) 這個地方會建立 `workqueue_struct`、`pool_workqueue`、`worker_pool` 之間的關聯。 ### `alloc_and_link_pwqs` [`alloc_and_link_pwqs`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L4336) 把將在 `alloc_workqueue` 配置的 `workqueue_struct` 與 `pool_workqueue`、`worker_pool` 的關係連結在一起。 ```cpp static int alloc_and_link_pwqs(struct workqueue_struct *wq) { bool highpri = wq->flags & WQ_HIGHPRI; int cpu, ret; if (!(wq->flags & WQ_UNBOUND)) { wq->cpu_pwqs = alloc_percpu(struct pool_workqueue); if (!wq->cpu_pwqs) return -ENOMEM; for_each_possible_cpu(cpu) { struct pool_workqueue *pwq = per_cpu_ptr(wq->cpu_pwqs, cpu); struct worker_pool *cpu_pools = per_cpu(cpu_worker_pools, cpu); init_pwq(pwq, wq, &cpu_pools[highpri]); mutex_lock(&wq->mutex); link_pwq(pwq); mutex_unlock(&wq->mutex); } return 0; } ``` 如果是 bounded waitqueue,workqueue 下會配置與 CPU 數量等量的 `pool_workqueue`,維護在 `wq->cpu_pwqs` 中。且每個 `pool_workqueue` 會各自對應到前面在 `workqueue_init_early` 建立的 `worker_pool`,根據 flag 可能是普通或高優先級的 workqueue。 ![](https://hackmd.io/_uploads/SkQWw9FRn.png) ```cpp cpus_read_lock(); if (wq->flags & __WQ_ORDERED) { ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]); /* there should only be single pwq for ordering guarantee */ WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node || wq->pwqs.prev != &wq->dfl_pwq->pwqs_node), "ordering guarantee broken for workqueue %s\n", wq->name); } else { ret = apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]); } cpus_read_unlock(); return ret; } ``` unbouded workqueue 則是根據是否是需要嚴格按照 work item 在 queue 中的順序運行的 `__WQ_ORDERED`。若是,則使用 `ordered_wq_attrs`,否的或會使用 `unbound_std_wq_attrs`。對應的 worker pool 則是透過 attribute 去取得([`get_unbound_pool`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L3763))。 ### worker thread ```cpp /** * worker_thread - the worker thread function * @__worker: self * * The worker thread function. All workers belong to a worker_pool - * either a per-cpu one or dynamic unbound one. These workers process all * work items regardless of their specific target workqueue. The only * exception is work items which belong to workqueues with a rescuer which * will be explained in rescuer_thread(). * * Return: 0 */ static int worker_thread(void *__worker) { struct worker *worker = __worker; struct worker_pool *pool = worker->pool; /* tell the scheduler that this is a workqueue worker */ set_pf_worker(true); ``` 在 CMWQ 中,每個 worker-pool 中各自會建立自己的 [`worker_thread`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L2497)。需要知道 workqueue 只是對使用者端的抽象,對後端的 `worker_thread` 來說,無論是屬於 bounded 還是 unbounded 的 worker-pools,`worker_thread` 並不會區分 workqueue(除了需要 rescuer thread 的 workqueue),對其來說就只是有個 work list 由它負責處理與完成而已。 [`set_pf_worker`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L2475) 對當前的 task 標注 [`PF_WQ_WORKER`](https://elixir.bootlin.com/linux/v6.4/source/include/linux/sched.h#L1729) 這個 flag,這讓 scheduler 可以和 CMWQ 機制掛勾並為其提供特殊的處理。 ```cpp woke_up: raw_spin_lock_irq(&pool->lock); /* am I supposed to die? */ if (unlikely(worker->flags & WORKER_DIE)) { raw_spin_unlock_irq(&pool->lock); set_pf_worker(false); set_task_comm(worker->task, "kworker/dying"); ida_free(&pool->worker_ida, worker->id); worker_detach_from_pool(worker); WARN_ON_ONCE(!list_empty(&worker->entry)); kfree(worker); return 0; } worker_leave_idle(worker); ``` `woke_up` tag 下的流程是 worker thread 重新啟動時需進行的,包含 `WORKER_DIE` 被設置時需要將其從 pool 中移除,並回收此 worker。 若非為了結束該 worker 而喚醒之,那麼 [`worker_leave_idle`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L1837) 將自己的狀態設為非 idle,並把自己從對應 worker_pool 的 idle list 中移除出來。 ```cpp recheck: /* no more worker necessary? */ if (!need_more_worker(pool)) goto sleep; /* do we need to manage? */ if (unlikely(!may_start_working(pool)) && manage_workers(worker)) goto recheck; /* * ->scheduled list can only be filled while a worker is * preparing to process a work or actually processing it. * Make sure nobody diddled with it while I was sleeping. */ WARN_ON_ONCE(!list_empty(&worker->scheduled)); ``` `recheck` 會進行一系列的檢查以動態調整 worker thread。 [`need_more_worker`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L806) 根據待解的 work 數量和已經 running 的 worker 數量(`nr_running`),決定此 worker thread 是否也需被啟動,又或者返回 sleep 狀態。 如果沒有 idle worker ([`!may_start_working()`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L812)),會判斷此 worker thread 是否需要作為 manager 以 [`manage_workers`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L2268) 來建立 idle thread。 被喚醒的 worker thread 之 `scheduled` list 不應該有任何的 work,若發生則這裡 `WARN_ON_ONCE` 對此異常做出警告。 ```cpp /* * Finish PREP stage. We're guaranteed to have at least one idle * worker or that someone else has already assumed the manager * role. This is where @worker starts participating in concurrency * management if applicable and concurrency management is restored * after being rebound. See rebind_workers() for details. */ worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND); ``` [`worker_clr_flags`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L1006) 清除 `WORKER_PREP` 和 `WORKER_REBOUND` 兩個 flag。`WORKER_PREP` 表示 worker thread 是準備可以執行 work 的狀態,清除之意外這 worker thread 將開始挑選 work 來處理。`WORKER_REBOUND` 則與 [`rebind_workers`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L5225) 有關。 如果清除這些 flag 使得 worker 不再具有 `WORKER_NOT_RUNNING`,同時也將 pool 的 `nr_running` + 1。 :::info ```cpp WORKER_NOT_RUNNING = WORKER_PREP | WORKER_CPU_INTENSIVE | WORKER_UNBOUND | WORKER_REBOUND; ``` ::: ```cpp do { struct work_struct *work = list_first_entry(&pool->worklist, struct work_struct, entry); pool->watchdog_ts = jiffies; if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) { /* optimization path, not strictly necessary */ process_one_work(worker, work); if (unlikely(!list_empty(&worker->scheduled))) process_scheduled_works(worker); } else { move_linked_works(work, &worker->scheduled, NULL); process_scheduled_works(worker); } } while (keep_working(pool)); worker_set_flags(worker, WORKER_PREP); ``` 這裡是 worker thread 挑選 work item 以進行的流程。具體所為是從對應的 worker_pool 之 linked list `pool->worklist` 上一個個取得 `work` 來處裡,直到所有 work 都被完成。( [`!keep_working()`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L818),但每個 pool 會留下至少一個 worker thread)。 對於每個 `work_struct`,如果該其 data bitmask 未設置 `WORK_STRUCT_LINKED`,這表示 work 與 work 之間沒有依賴關係,可以直接以 `process_one_work` 來執行該 work 所想執行的內容即可。反之則表示 work 之間存在依賴關係,[`move_linked_works`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L1089) 會將一系列標註 `WORK_STRUCT_LINKED` 的 work 移轉到另一個 list `worker->scheduled`,並按次序執行再用 `process_one_work` 去完成該 list 上的 work。換言之,work 間沒有依賴關係的情形就用優化的 code path 來進行即可。 返回 sleep 狀態情形下,用 [`worker_set_flags`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L981) 去設置回 `WORKER_PREP`。 ```cpp sleep: /* * pool->lock is held and there's no work to process and no need to * manage, sleep. Workers are woken up only while holding * pool->lock or from local cpu, so setting the current state * before releasing pool->lock is enough to prevent losing any * event. */ worker_enter_idle(worker); __set_current_state(TASK_IDLE); raw_spin_unlock_irq(&pool->lock); schedule(); goto woke_up; } ``` 與 [`worker_leave_idle`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L1837) 相對,[`worker_enter_idle`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L1804) 根據 worker 回到 idle 的情況設置對應的狀態,並安排下次被喚醒的時間。 ### 初始化 workqueue_init -> create_worker -> worker_thread ### 加入新任務至 workqueue `struct workqueue_struct` ## Reference * [Concurrency Managed Workqueue (cmwq)](https://www.kernel.org/doc/html/latest/core-api/workqueue.html) * [Concurrency Managed Workqueue之(二):CMWQ概述](http://www.wowotech.net/irq_subsystem/cmwq-intro.html)