# Linux 核心設計: Concurrency Managed Workqueue(CMWQ)
## Why CMWQ?
> [Concurrency Managed Workqueue (cmwq)](https://www.kernel.org/doc/html/latest/core-api/workqueue.html)
在 Linux 中提供 workqueue 介面以實現非同步的流程執行。使用者以一個 "work item" 來描述需被執行的 context,將其加入到所建立 queue 之中。則 kernel 會選出合適的 "worker" thread 來完成之。
在 CMWQ 之前的 workqueue 實作中,worker thread 的數量是與被建立的 workqueue 數量直接掛勾的。意味著使用者每建立一個新的 workqueue,系統上的 worker thread 總量便會增加。實作中包含了兩種 worker thread 的建立方式:
* multi threaded (MT) wq: 在每個 CPU 上會有一個 worker thread,work item 可以分配不同的 CPU 上執行
* single threaded (ST) wq: 僅以單一的 worker thread 來完成系統上所有的 work item
在 MT wq 的部份,這代表系統需要建立與 CPU 數量乘上 workqueue 總量等價的 worker thread,這是很嚴重的的問題。畢竟隨著 Linux 的演進,一方面 workqueue 的使用者逐漸增加,而現代電腦的 CPU 核心數量同時也不斷增加。這代表一些系統在剛啟動(booting)時,其 32k 的 PID 空間就會被 worker thread 的 process 給佔滿。
資源限制和 concurrency level 之間存在取捨關係,ST wq 是為整個系統提供唯一的 context,因此所有的 work 都是一一排隊執行,造成 concurrency level 不佳,雖然對資源的使用量比較低。而 MT wq 為每個 CPU 提供一個 context,代價是消耗了大量資源。但即便如此它提供的 concurrency 效果卻不盡如人意。一個關鍵的原因是 MT wq 的 worker thread 的數目是與 CPU 數量嚴格綁定的,且 worker thread 的 work item 也不能互相轉移。另一方面,傳統 workqueue 也容易出現 deadlock 問題。
總結來說,使用 ST wq 得到的 concurrency level 較低,但資源上較為節省。然而使用 MT wq 固然嚴重的消耗資源,能提高的 concurrency level 卻很有限。這樣不對等的 trade off 讓過去的 workqueue 並不是那麼好用。
CMWQ 就在此背景下誕生,並專注在以下目標:
* 與原始 workqueue API 的相容性
* 捨棄各個 workqueue 獨立對應一組 worker-pools 的作法。取而代之,所有 workqueue 會共享 per-CPU worker pools,並按需提供靈活的 concurrency level,避免大量的資源耗損
* worker pool 和 concurrency level 間的調節由系統內部處理,對使用者來說可以視為一個黑盒子
## CMWQ 的設計
在 CMWQ 中,每一項函式流程的執行被抽象為一個 work item ([`work_struct`](https://elixir.bootlin.com/linux/v6.4/source/include/linux/workqueue.h#L98))。
```cpp
typedef void (*work_func_t)(struct work_struct *work);
struct work_struct {
atomic_long_t data;
struct list_head entry;
work_func_t func;
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};
```
work itme 中包含一個 `func` 成員,是所要被執行的函式指標。當 Linux driver 或 subsystem 想要非同步式的執行某個函數,它就必須建立 `work_struct` 並設置 `func` 以指向要運行的函數,並將其加入到 workqueue 中。
worker thread 負責從 queue 中一個接一個地取出 work item 並執行其中的 `func`。如果沒有 work item 在排隊,則 worker thread 就會處於 idle 狀態。在 worker thread 之上對其管理的結構稱為 worker-pools。
設計上,CMWQ 中對於每個 CPU 會有兩個 worker-pools。一個針對普通的 work,另一個則針對高優先級的 work。此外還有一些未綁定 CPU 的額外 worker-pools,這些 pools 的數量則是動態的。使用者通過 workqueue API 建立 work item 並進行排隊,並且允許設置 flags 來影響部份 work item 的執行方式。例如 CPU locality、concurrency 的限制、優先級等。
當將 work item 排隊到 workqueue 時,會根據 queue 參數和 workqueue 的屬性決定目標的 worker-pools,比如可以設定 workqueue 中的 work item 僅由指定 CPU 之普通或高優先級的 worker-pools 來執行。
對於限定 CPU 的 worker-pools,其 concurrency 的管理方式是將自己與 scheduler 掛勾。則每當任一 worker 被喚醒或是進入睡眠時,worker-pools 會收到通知,藉此來跟踪當前可運行的 woker 數量。一般而言,work item 不太會佔用 CPU 資源,因此設計上傾向使用最少數量的 worker thread 但避免 bandwidth 的損失。實作上,只要該 CPU 上有一個或多個 runnable worker thread,worker-pools 就暫時不會執行新的 work。一直到最後一個正在運行的 worker thread 進入睡眠狀態時,才立即排程一個新的 worker thread,這樣 CPU 就不會在仍有尚未處理的 work item 時無所事事,但也不至於過度的建立大量 worker thread。
另一方面,除了 kthreads 的內存空間之外,保留 idle 的 worker thread 不會消耗其他資源,因此 CWMQ 會保留一個 idle 的 worker thread 一段時間。這樣如果隨即有 work 要處理則可以直接沿用,不必再重新建立。
如果使用不限定 CPU 的 workqueue,使用者可以藉由對應 API 來為此設定自定義的屬性,則 workqueue 將自動建立與屬性對應的 worker-pools。此種 workqueue 對 concurrency level 的調節將由使用者定義。此外,如果想讓綁定 CPU 的 workqueue 也有此特性,API 也提供相應的 flag 以忽略 concurrency 的管理。
## CMWQ 的初始化
要允許核心內部或者使用者能夠註冊 workqueue,kernel 首先要透過 [`workqueue_init_early`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L6253) 和 [`workqueue_init`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L6331) 兩個階段的初始化來將整個子系統準備好。
### 上半部份: `workqueue_init_early`
作為上半部份的 `workqueue_init_early` 主要目的在於盡早初始化好 kernel 中的相關資料結構,以利於 kernel 的其他部份可以在早期就能建立 workqueue 和註冊 work item。由於 kthread 尚不能建立,這時候 work 還不能被完成,必須等到下半部份的 `workqueue_init` 也完成後才能繼續進行。
```cpp
void __init workqueue_init_early(void)
{
int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
int i, cpu;
BUILD_BUG_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));
BUG_ON(!alloc_cpumask_var(&wq_unbound_cpumask, GFP_KERNEL));
cpumask_copy(wq_unbound_cpumask, housekeeping_cpumask(HK_TYPE_WQ));
cpumask_and(wq_unbound_cpumask, wq_unbound_cpumask, housekeeping_cpumask(HK_TYPE_DOMAIN));
pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);
```
具體初始化的項目其一是 [`wq_unbound_mask`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L337),這標示可以拿來服務 unbound workqueue 的 CPU 有哪些。
而 [`pwq_cache`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L308) 是建立可用來配置 `pool_workqueue` 的 memory pool。若預期未來會有頻繁建立和釋放 `pool_workqueue` 空間的需求,使用 `kmem_cache` 相較於直接採通用的 `kmalloc` 配置可以獲得效率上的優勢或是減輕記憶體碎片化的問題。
```cpp
/* initialize CPU pools */
for_each_possible_cpu(cpu) {
struct worker_pool *pool;
i = 0;
for_each_cpu_worker_pool(pool, cpu) {
BUG_ON(init_worker_pool(pool));
pool->cpu = cpu;
cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
pool->attrs->nice = std_nice[i++];
pool->node = cpu_to_node(cpu);
/* alloc pool ID */
mutex_lock(&wq_pool_mutex);
BUG_ON(worker_pool_assign_id(pool));
mutex_unlock(&wq_pool_mutex);
}
}
```
接著會為每個 CPU 建立個別的 `struct worker_pool`。內層 for loop 是因為前面有提到的會有一個針對普通的 work 和另一個針對高優先級的 work
```cpp
/* create default unbound and ordered wq attrs */
for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
struct workqueue_attrs *attrs;
BUG_ON(!(attrs = alloc_workqueue_attrs()));
attrs->nice = std_nice[i];
unbound_std_wq_attrs[i] = attrs;
/*
* An ordered wq should have only one pwq as ordering is
* guaranteed by max_active which is enforced by pwqs.
* Turn off NUMA so that dfl_pwq is used for all nodes.
*/
BUG_ON(!(attrs = alloc_workqueue_attrs()));
attrs->nice = std_nice[i];
attrs->no_numa = true;
ordered_wq_attrs[i] = attrs;
}
```
unbound 的部份不會建先建立 worker pool,不過會建立預設的 attribute [`unbound_std_wq_attrs`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L363) 和 [`ordered_wq_attrs`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L366)。
```cpp
system_wq = alloc_workqueue("events", 0, 0);
system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
system_long_wq = alloc_workqueue("events_long", 0, 0);
system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
WQ_UNBOUND_MAX_ACTIVE);
system_freezable_wq = alloc_workqueue("events_freezable",
WQ_FREEZABLE, 0);
system_power_efficient_wq = alloc_workqueue("events_power_efficient",
WQ_POWER_EFFICIENT, 0);
system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",
WQ_FREEZABLE | WQ_POWER_EFFICIENT,
0);
BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
!system_unbound_wq || !system_freezable_wq ||
!system_power_efficient_wq ||
!system_freezable_power_efficient_wq);
}
```
在 worker pool 被創建完成之後,就能利用 [`alloc_workqueue`](#建立-workqueue) 來建立 workqueue `struct workqueue_struct` 了。這裡可以看到 kernel 內部事先建立出了各種不同類型的 workqueue。
### `workqueue_init`
[`workqueue_init`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L6331) 是 workqueue 子系統的下半部分。此階段主要在建立 workqueue 所需的 worker thread,且也允許日後建立更多的 thread。
```cpp
void __init workqueue_init(void)
{
struct workqueue_struct *wq;
struct worker_pool *pool;
int cpu, bkt;
/*
* It'd be simpler to initialize NUMA in workqueue_init_early() but
* CPU to node mapping may not be available that early on some
* archs such as power and arm64. As per-cpu pools created
* previously could be missing node hint and unbound pools NUMA
* affinity, fix them up.
*
* Also, while iterating workqueues, create rescuers if requested.
*/
wq_numa_init();
mutex_lock(&wq_pool_mutex);
for_each_possible_cpu(cpu) {
for_each_cpu_worker_pool(pool, cpu) {
pool->node = cpu_to_node(cpu);
}
}
```
這裡先將各個 pool 對應到正確的 node id。細心觀察可以發現 `workqueue_init_early` 已經做過類似的初始化,這裡還需進行的理由可以參照註解。
```cpp
list_for_each_entry(wq, &workqueues, list) {
wq_update_unbound_numa(wq, smp_processor_id(), true);
WARN(init_rescuer(wq),
"workqueue: failed to create early rescuer for %s",
wq->name);
}
mutex_unlock(&wq_pool_mutex);
```
`list_for_each_entry` 走訪所有 [`workqueues`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L333)(每個用 `alloc_workqueue` 建立的 workqueue 會 pending 到 list 中) list 中的 `workqueue_struct`,並逐一進行以下兩項操作:
* `wq_update_unbound_numa`: CPU 熱插拔相關?
* `init_rescuer`: 假設 workqueue 中的 work 可以釋放記憶體,但資源吃緊情況下,為了順利進行需要釋放記憶體建立更多的 worker thread,這就形成 deadlock 的狀況。為了避免這種問題,CMWQ 針對與記憶體釋放有關的 workqueue 保留了一個 rescuer thread,後者保證了工作的進展。這就是透過 `init_rescuer` 建立的。如果 workqueue 的 flag 設定含 `WQ_MEM_RECLAIM`,會為這類型的 wq 嘗試建立 rescuer。
```cpp
/* create the initial workers */
for_each_online_cpu(cpu) {
for_each_cpu_worker_pool(pool, cpu) {
pool->flags &= ~POOL_DISASSOCIATED;
BUG_ON(!create_worker(pool));
}
}
```
這裡與 CPU-bounded worker pool 有關。[`create_worker`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L1938) 正式建立 `worker_pool` 底下的 [`worker`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue_internal.h#L24),`worker` 也就包含了處理 worker item 的 kernel thread。

```cpp
hash_for_each(unbound_pool_hash, bkt, pool, hash_node)
BUG_ON(!create_worker(pool));
```
初始化也應用在目前存在的 unboued worker pool,也就是一開始在 `workqueue_init_early` 設置的那些,這些是透過 hash table 來管理。
```cpp
wq_online = true;
wq_watchdog_init();
}
```
上下兩個階段的 workqueue 初始化完成後,可以將 `wq_online` 設置為 true。主要可以幫助 kernel 當下是否可以直接建立 worker 或 rescuer kthread。
## API 的使用
基於 6.4 版本之 Linux kernel 進行探討
### 建立 workqueue
workqueue 的建立需透過 [`alloc_workqueue`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L4425),可藉之得到配置的 [`struct workqueue_struct`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L264)
```cpp
struct workqueue_struct *alloc_workqueue(const char *fmt,
unsigned int flags,
int max_active, ...);
```
`fmt` 和最後的 Variable Length Argument (`...`) 和 workqueue 的命名有關。`max_active` 則是限定 **per-CPU(不是 global!)** 可處理 work item 的最大context 數量。
參數的 `flags` 可用來設置 workqueue 的特性,在 [Concurrency Managed Workqueue (cmwq): flags](https://www.kernel.org/doc/html/latest/core-api/workqueue.html#flags) 一節有更詳細的解釋。
#### `WQ_UNBOUND`
表示被加入到該 queue 中的 work item 是由不指定 CPU 的特殊 worker-pools 所服務的。這種情況下 kernel 不會對該 workqueue 提供 concurrent 管理。worker-pools 會嘗試盡快開始執行 work item。
#### `WQ_FREEZABLE`
Linux 的電源管理機制會讓系統進入 suspend 狀態,此情況下 userspace 的 thread 和部分的 kernel thread 會被凍結。設置此 flag 則 workqueue 也會在 suspend 時暫停: kernel 會處理完當前的所有 work 且不接受新的 work,直到解凍。
#### `WQ_MEM_RECLAIM`
CMWQ 中存在所謂 **rescuer thread** 的機制。一般來說,為了保證每個 workqueue 都能順利有進度,這依賴於 kernel 可以建立更多的 worker thread。然而在記憶體資源吃緊的狀態下,worker thread 並不一定可以成功建立。這可能導致 deadlock 的狀況。
舉例來說,workqueue A 目前正被處理,但其依賴於 workqueue B 釋放記憶體資源。結果 workqueue B 此時卻因為記憶體不足導致建不出 worker thread 來處理,自然沒機會將記憶體釋放出去。
當設置 `WQ_MEM_RECLAIM`,則 kernel 將預先考慮此狀況讓建立的 workqueue 可以無視記憶體的壓力來建立 **rescuer thread** 以避免 deadlock 的發生。
#### `WQ_HIGHPRI`
使用此 flag 的 workqueue 將使用高優先級的 worker-pools 去處理(nice value 較低的 worker thread)。
注意到高優先級與普通的 worker-pools 之間是不能互通的。
#### `WQ_CPU_INTENSIVE`
被設置此 flag 的 workqueue 中的 work 與其是特別消耗 CPU 資源的,則 kernel 將不會將其造成的壓力計算到 concurrent level。具體的影響是,這些 workqueue 中的 work item 將歸 scheduler 管理,相關的 work item 不會阻止同一 worker-pools 中的其他 work item 被執行。
#### `WQ_SYSFS`
使 workqueue 信息在 sysfs 中可以被 user 查看到。
#### `WQ_POWER_EFFICIENT`
per-CPU 的 workqueue 由於 cache locality,往往會表現出更好的性能。這也使 scheduler 無法選擇 CPU 來執行 worker thread。然而這也不是那麼完美的,其中一個不幸的副作用是造成功耗的增加。
在省電的考量上,如果特定 CPU 上沒有任何任務要執行,則 scheduler 會認為該 CPU 是 idle,並嘗試維持 idle 狀態。然而,例如在 idle CPU 上的 interrupt handler 可能在其對應的 workqueue 上添加新的 work item,進而強制 scheduler 打破 idle 狀態。換句話說,系統在省電的選擇就受限於 workqueue 而可能無法選擇最佳選項。
針對此狀況,標有 `WQ_POWER_EFFICIENT` 的 workqueue 預設是綁定 CPU 的,但如果指定了 `workqueue.power_efficient` 這個 kernel 參數,可以讓該 workqueue 變為未綁定。則可以一定程度的應付上述問題。
:::warning
`WQ_SYSFS` 和 `WQ_POWER_EFFICIENT` 的詳細解釋目前在 [Concurrency Managed Workqueue (cmwq)](https://www.kernel.org/doc/html/latest/core-api/workqueue.html) 是沒有的
:::
```cpp
__printf(1, 4)
struct workqueue_struct *alloc_workqueue(const char *fmt,
unsigned int flags,
int max_active, ...)
{
size_t tbl_size = 0;
va_list args;
struct workqueue_struct *wq;
struct pool_workqueue *pwq;
/*
* Unbound && max_active == 1 used to imply ordered, which is no
* longer the case on NUMA machines due to per-node pools. While
* alloc_ordered_workqueue() is the right way to create an ordered
* workqueue, keep the previous behavior to avoid subtle breakages
* on NUMA.
*/
if ((flags & WQ_UNBOUND) && max_active == 1)
flags |= __WQ_ORDERED;
/* see the comment above the definition of WQ_POWER_EFFICIENT */
if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
flags |= WQ_UNBOUND;
```
首先,根據參數,`alloc_workqueue` 會加上一些額外的 flag。`__WQ_ORDERED` 表示 work item 會嚴格的按照在 queue 中的順序運行。這個 flag 是通過設置 `WQ_UNBOUND` 和 `max_active` 為 1 來隱式添加的。
此外,標有 `WQ_POWER_EFFICIENT` 的 workqueue 如果指定了 `workqueue.power_efficient` 會變成 unbound,這在前面有先說明。
```cpp
/* allocate wq and format name */
if (flags & WQ_UNBOUND)
tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
if (!wq)
return NULL;
if (flags & WQ_UNBOUND) {
wq->unbound_attrs = alloc_workqueue_attrs();
if (!wq->unbound_attrs)
goto err_free_wq;
}
```
配置 `workqueue_struct` 所需要的空間。對於 unbounded wq,worker-pools 需要額外的空間來描述,此外也需要空間來記錄自定義的屬性。
```cpp
va_start(args, max_active);
vsnprintf(wq->name, sizeof(wq->name), fmt, args);
va_end(args);
```
通過參數設置 wq 的名稱的字串。
```cpp
max_active = max_active ?: WQ_DFL_ACTIVE;
max_active = wq_clamp_max_active(max_active, flags, wq->name);
```
參數的 `max_active` 如果給 0 的話,則系統會使用預設的 [`WQ_DFL_ACTIVE`](https://elixir.bootlin.com/linux/v6.4/source/include/linux/workqueue.h#L347)。然後再由 `wq_clamp_max_active` 校正 `max_active` 至合理範圍。
```cpp
if (alloc_and_link_pwqs(wq) < 0)
goto err_unreg_lockdep;
```
最後部分大多與底下資料結構的初始化相關,暫且先略過大部分。但需要留意 [`alloc_and_link_pwqs`](#`alloc_and_link_pwqs`) 這個地方會建立 `workqueue_struct`、`pool_workqueue`、`worker_pool` 之間的關聯。
### `alloc_and_link_pwqs`
[`alloc_and_link_pwqs`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L4336) 把將在 `alloc_workqueue` 配置的 `workqueue_struct` 與 `pool_workqueue`、`worker_pool` 的關係連結在一起。
```cpp
static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
bool highpri = wq->flags & WQ_HIGHPRI;
int cpu, ret;
if (!(wq->flags & WQ_UNBOUND)) {
wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
if (!wq->cpu_pwqs)
return -ENOMEM;
for_each_possible_cpu(cpu) {
struct pool_workqueue *pwq =
per_cpu_ptr(wq->cpu_pwqs, cpu);
struct worker_pool *cpu_pools =
per_cpu(cpu_worker_pools, cpu);
init_pwq(pwq, wq, &cpu_pools[highpri]);
mutex_lock(&wq->mutex);
link_pwq(pwq);
mutex_unlock(&wq->mutex);
}
return 0;
}
```
如果是 bounded waitqueue,workqueue 下會配置與 CPU 數量等量的 `pool_workqueue`,維護在 `wq->cpu_pwqs` 中。且每個 `pool_workqueue` 會各自對應到前面在 `workqueue_init_early` 建立的 `worker_pool`,根據 flag 可能是普通或高優先級的 workqueue。

```cpp
cpus_read_lock();
if (wq->flags & __WQ_ORDERED) {
ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
/* there should only be single pwq for ordering guarantee */
WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
"ordering guarantee broken for workqueue %s\n", wq->name);
} else {
ret = apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
}
cpus_read_unlock();
return ret;
}
```
unbouded workqueue 則是根據是否是需要嚴格按照 work item 在 queue 中的順序運行的 `__WQ_ORDERED`。若是,則使用 `ordered_wq_attrs`,否的或會使用 `unbound_std_wq_attrs`。對應的 worker pool 則是透過 attribute 去取得([`get_unbound_pool`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L3763))。
### worker thread
```cpp
/**
* worker_thread - the worker thread function
* @__worker: self
*
* The worker thread function. All workers belong to a worker_pool -
* either a per-cpu one or dynamic unbound one. These workers process all
* work items regardless of their specific target workqueue. The only
* exception is work items which belong to workqueues with a rescuer which
* will be explained in rescuer_thread().
*
* Return: 0
*/
static int worker_thread(void *__worker)
{
struct worker *worker = __worker;
struct worker_pool *pool = worker->pool;
/* tell the scheduler that this is a workqueue worker */
set_pf_worker(true);
```
在 CMWQ 中,每個 worker-pool 中各自會建立自己的 [`worker_thread`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L2497)。需要知道 workqueue 只是對使用者端的抽象,對後端的 `worker_thread` 來說,無論是屬於 bounded 還是 unbounded 的 worker-pools,`worker_thread` 並不會區分 workqueue(除了需要 rescuer thread 的 workqueue),對其來說就只是有個 work list 由它負責處理與完成而已。
[`set_pf_worker`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L2475) 對當前的 task 標注 [`PF_WQ_WORKER`](https://elixir.bootlin.com/linux/v6.4/source/include/linux/sched.h#L1729) 這個 flag,這讓 scheduler 可以和 CMWQ 機制掛勾並為其提供特殊的處理。
```cpp
woke_up:
raw_spin_lock_irq(&pool->lock);
/* am I supposed to die? */
if (unlikely(worker->flags & WORKER_DIE)) {
raw_spin_unlock_irq(&pool->lock);
set_pf_worker(false);
set_task_comm(worker->task, "kworker/dying");
ida_free(&pool->worker_ida, worker->id);
worker_detach_from_pool(worker);
WARN_ON_ONCE(!list_empty(&worker->entry));
kfree(worker);
return 0;
}
worker_leave_idle(worker);
```
`woke_up` tag 下的流程是 worker thread 重新啟動時需進行的,包含 `WORKER_DIE` 被設置時需要將其從 pool 中移除,並回收此 worker。
若非為了結束該 worker 而喚醒之,那麼 [`worker_leave_idle`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L1837) 將自己的狀態設為非 idle,並把自己從對應 worker_pool 的 idle list 中移除出來。
```cpp
recheck:
/* no more worker necessary? */
if (!need_more_worker(pool))
goto sleep;
/* do we need to manage? */
if (unlikely(!may_start_working(pool)) && manage_workers(worker))
goto recheck;
/*
* ->scheduled list can only be filled while a worker is
* preparing to process a work or actually processing it.
* Make sure nobody diddled with it while I was sleeping.
*/
WARN_ON_ONCE(!list_empty(&worker->scheduled));
```
`recheck` 會進行一系列的檢查以動態調整 worker thread。
[`need_more_worker`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L806) 根據待解的 work 數量和已經 running 的 worker 數量(`nr_running`),決定此 worker thread 是否也需被啟動,又或者返回 sleep 狀態。
如果沒有 idle worker ([`!may_start_working()`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L812)),會判斷此 worker thread 是否需要作為 manager 以 [`manage_workers`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L2268) 來建立 idle thread。
被喚醒的 worker thread 之 `scheduled` list 不應該有任何的 work,若發生則這裡 `WARN_ON_ONCE` 對此異常做出警告。
```cpp
/*
* Finish PREP stage. We're guaranteed to have at least one idle
* worker or that someone else has already assumed the manager
* role. This is where @worker starts participating in concurrency
* management if applicable and concurrency management is restored
* after being rebound. See rebind_workers() for details.
*/
worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
```
[`worker_clr_flags`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L1006) 清除 `WORKER_PREP` 和 `WORKER_REBOUND` 兩個 flag。`WORKER_PREP` 表示 worker thread 是準備可以執行 work 的狀態,清除之意外這 worker thread 將開始挑選 work 來處理。`WORKER_REBOUND` 則與 [`rebind_workers`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L5225) 有關。
如果清除這些 flag 使得 worker 不再具有 `WORKER_NOT_RUNNING`,同時也將 pool 的 `nr_running` + 1。
:::info
```cpp
WORKER_NOT_RUNNING = WORKER_PREP |
WORKER_CPU_INTENSIVE |
WORKER_UNBOUND |
WORKER_REBOUND;
```
:::
```cpp
do {
struct work_struct *work =
list_first_entry(&pool->worklist,
struct work_struct, entry);
pool->watchdog_ts = jiffies;
if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
/* optimization path, not strictly necessary */
process_one_work(worker, work);
if (unlikely(!list_empty(&worker->scheduled)))
process_scheduled_works(worker);
} else {
move_linked_works(work, &worker->scheduled, NULL);
process_scheduled_works(worker);
}
} while (keep_working(pool));
worker_set_flags(worker, WORKER_PREP);
```
這裡是 worker thread 挑選 work item 以進行的流程。具體所為是從對應的 worker_pool 之 linked list `pool->worklist` 上一個個取得 `work` 來處裡,直到所有 work 都被完成。( [`!keep_working()`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L818),但每個 pool 會留下至少一個 worker thread)。
對於每個 `work_struct`,如果該其 data bitmask 未設置 `WORK_STRUCT_LINKED`,這表示 work 與 work 之間沒有依賴關係,可以直接以 `process_one_work` 來執行該 work 所想執行的內容即可。反之則表示 work 之間存在依賴關係,[`move_linked_works`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L1089) 會將一系列標註 `WORK_STRUCT_LINKED` 的 work 移轉到另一個 list `worker->scheduled`,並按次序執行再用 `process_one_work` 去完成該 list 上的 work。換言之,work 間沒有依賴關係的情形就用優化的 code path 來進行即可。
返回 sleep 狀態情形下,用 [`worker_set_flags`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L981) 去設置回 `WORKER_PREP`。
```cpp
sleep:
/*
* pool->lock is held and there's no work to process and no need to
* manage, sleep. Workers are woken up only while holding
* pool->lock or from local cpu, so setting the current state
* before releasing pool->lock is enough to prevent losing any
* event.
*/
worker_enter_idle(worker);
__set_current_state(TASK_IDLE);
raw_spin_unlock_irq(&pool->lock);
schedule();
goto woke_up;
}
```
與 [`worker_leave_idle`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L1837) 相對,[`worker_enter_idle`](https://elixir.bootlin.com/linux/v6.4/source/kernel/workqueue.c#L1804) 根據 worker 回到 idle 的情況設置對應的狀態,並安排下次被喚醒的時間。
### 初始化
workqueue_init -> create_worker -> worker_thread
### 加入新任務至 workqueue
`struct workqueue_struct`
## Reference
* [Concurrency Managed Workqueue (cmwq)](https://www.kernel.org/doc/html/latest/core-api/workqueue.html)
* [Concurrency Managed Workqueue之(二):CMWQ概述](http://www.wowotech.net/irq_subsystem/cmwq-intro.html)