<style>
/* code要素の設定。バッククォーテーションで囲まれた要素に対する設定。 */
.markdown-body code {
font-size: 1em;
font-family: monospace;
color: lightslategray !important;
background-color: transparent !important;
}
/* codeの前後に挿入される空白を消す */
.markdown-body code::before,
.markdown-body code::after { content: ""; }
/* code block */
.markdown-body pre code {
color: black !important;
}
/* リンクの見た目 */
.markdown-body a {
color: inherit; /* 色は本文と同じ */
text-decoration: underline dashed #BDBDBD 0.1em; /* 下線のスタイル */
text-underline-offset: 0.3em; /* 下線の位置の調整 */
}
/* リンク要素の上にマウスが乗ったときの見た目 */
.markdown-body a:hover {
text-decoration: underline dashed #E080A0 0.1em;
text-underline-offset: 0.3em;
}
/* 下線の設定。++で囲まれた要素につく下線のスタイルを設定する。 */
ins {
text-decoration: underline solid black 0.05em;
text-underline-offset: 0.25em;
}
p {
font-family: Roboto Condensed;
}
.markdown-body mark {
padding: 0;
background: linear-gradient(transparent 60%, #e3d7a3 60%);
}
</style>
# Centralized型
Centralized型スケジューラの実装のポイントを最初に見てみる。
* agentスレッドは複数存在しているものの、Global AgentとSatellite Agentに分類され、Global Agentのみがスケジューリングを行う。
* ある時点を見ると、Global Agentは1つしか存在していない。
* そのため、スケジューラの実装では、アトミック変数によって効率的にスケジューラを実装できる。
Per-CPU型のチュートリアルでスケジューラ実装の勘所はつかめたと思うので、ここでは最初から動くスケジューラを実装する。前提として、基本的なデータ構造(`TutorialXXX`という名前のクラス)についてはPer-CPU型と同じ名前で実装することにする。
まず、`TutorialTaskState`クラスと`TurorialTask`クラスに関しては、Per-CPU型のものを再利用する。
`TutorialRq`の実装は、Mutexによる排他処理をなくした実装にした。後ほど詳しく見ていくが、++`TutorialRq`へのアクセスが競合することがないようになっているため、排他処理が不要++になっている。つまり、`TutorialRq`は`deque<TutorialTask *>`のただのラッパークラスである。
`TutorialScheduler`の実装らへんから大きく変わってくる。
まず、このクラスが持つフィールドは以下のようになっている。スケジューラがCPUごとに管理するデータはPer-CPU型ほど多くはない。そのため`TutorialCpuState`クラスは以下の実装のように2つのフィールドしか持たない実装になっている。
メンバ変数`global_cpu_`は、現在のGlobal CPUの値を保持する。このメンバ変数は**アトミック変数**で定義されており、Global CPUの切り替え処理などでアトミックな書き換え操作が行われる。詳しくは後述する。
Centralized型はメッセージキューを1つしか持たない。そのため、`LocalChannel`のインスタンスは1つだけ持っておけば良い。
```cpp
struct TutorialCpuState {
TutorialTask *current = nullptr;
Agent *agent = nullptr;
};
class TutorialScheduler : public BasicDispatchScheduler<TutorialTask> {
...
private:
// CPUごとのTutorialCpuStateインスタンス。
TutorialCpuState cpu_states_[MAX_CPUS];
// 現在のグローバルCPU。
// この値に対応するagentがGlobal Agentとなり、スケジューラの処理を行っていく。
// Global CPUで他のCFSタスクなどが実行可能状態になった場合、スケジューラは
// Global CPUを他のCPUに移動することでCPUを明け渡すことになっている。
// Global CPUの切り替わり処理は、PickNextGlobalCpuIdで行われる。
std::atomic<int> global_cpu_;
// ただ1つのメッセージキュー。
// Centralized型の場合はメッセージキューは1つしか使わない。
LocalChannel global_channel_;
// 実行可能キュー。ここに入っているタスクは、状態が実行可能状態となる。
TutorialRq rq_;
};
```
`global_cpu_`の使われ方を見ていく。まずは、`TutorialAgent::AgentThread`の実装について見ていく。このメンバ変数はprivateなので、`TutorialAgent`からアクセスされるときは`GetGlobalCpuId`を介してアクセスされることに注意してほしい。
各agentは、Global CPUと自分自身のCPUを比べて異なる場合は`LocalYield`する。一致した場合は(優先度がブーストされていない限り)`GlobalSchedule`関数を呼び、スケジューラの仕事を行う。
```cpp
void AgentThread() final {
...
auto req = enclave()->GetRunRequest(cpu());
while (!Finished()) {
// ループの最初にagentのバリア値を読み出しておく必要がある。
// (少なくともGetGlobalCpuIdの実行よりは先に!)
BarrierToken agent_barrier = barrier();
// 現在のglobal cpuの値を見て、自分がSatellite Agentなのか、
// Global Agentなのか、を確認し、それぞれの処理を行う。
if (cpu().id() != global_scheduler_->GetGlobalCpuId()) { // Satellite Agent
req->LocalYield(agent_barrier, /* flags */ 0);
} else { // Global Agent
// 優先度がboostされていれば、GlobalCPUを切り替える。
// this cpuを使いたがっているタスクにCPUを明け渡し、スケジューラは
// 別のCPUに移動する、というイメージ。
if (boosted_priority()) {
// グローバルCPUを切り替える
global_scheduler_->PickNextGlobalCpuId();
} else {
global_scheduler_->GlobalSchedule(agent_barrier);
}
}
}
}
```
きちんとアトミック操作が適切に行われていれば`GlobalSchedule`を呼び出すスレッドは任意の時点で1つに限られる。この関数の実装は後で詳しく見ていく。
さて、agentの優先度がブーストされていた場合の処理を見ていく。その場合は、++同CPU上に他のCFSタスクなどが実行可能状態で存在しているため、agentはCPUを明け渡す必要がある++。Centralized型の実装では、この処理を**Global CPUを切り替える**ことによって行う。Global CPUの切り替え処理は`PickNextGlobalCpuId`を呼び出すことで行われるように実装した。Global CPUを切り替えたagentはループの先頭に戻り、`global_cpu_`の値を再度確認することでCPUをYieldする、という流れになっている。
この関数の実装は簡易的なものにとどめてある(チュートリアルなので)。ここで実装しているアルゴリズムは、++CPU番号の小さい方から順番に走査していき、最初に見つけた利用可能なCPUを`global_cpu_`にセットする++、というもの。CPUが利用可能かどうか、についてはステータスワードを確認している。
```cpp
Cpu PickNextGlobalCpuId() {
// CPUをリストの先頭から見ていき、最初に見つけられた使用可能なCPUを
// 次のグローバルCPUとして設定する。この実装は愚直な実装であり、
// 本来はハイパースレッディングの相手➙L3キャッシュを共有しているCPU➙
// NUMAノードを共有しているCPU、といった順に検索をかけるべきである。
redo:
for (auto cpu: cpus()) {
if (Available(cpu)) {
// グローバルCPUをセットしてからPingを送る。
SetGlobalCpuId(cpu.id());
enclave()->GetAgent(cpu)->Ping();
return cpu;
}
}
goto redo;
}
```
コメントにも書いてあるが、`SetGlobalCpuId`の呼び出しと、`Ping`の呼び出しの順序を入れ替えるとうまく動作しない。ここについても後述することにする。
`GlobalSchedule`の実装を見ていく。この関数は同時刻に1つのスレッドからしか呼び出されていない、ということに注意して見てほしい。
`GlobalSchedule`の実装はいくつかの段階に分けて見ることができる。具体的には、以下の5段階に分けられる。
* メッセージ処理
* 空いているCPUの確認
* 空いているCPUそれぞれにタスクを割り振る
* すべてのトランザクションをまとめてコミット
* コミットの結果を確認し、適切な処理を行う
順番にこれらの処理を見ていく。
まず、メッセージ処理はこれまでと同様である。
```cpp
void TutorialScheduler::GlobalSchedule(const BarrierToken &agent_barrier) {
// メッセージの処理
Message msg;
while (!(msg = global_channel_.Peek()).empty()) {
DispatchMessage(msg);
global_channel_.Consume(msg);
}
```
空いているCPUの確認は、管理しているCPUを順番に見ていき利用可能なものを`available`という`CpuList`に加えていく。`available`に追加される条件としては、そのCPUにCFSタスクなどの他のスケジューリングポリシーのタスクが存在しないこと、ghOStタスクが実行中でないこと、などがある。ここらへんの実装は採用するスケジューリングアルゴリズムによって変わってくるだろう。
```cpp
// スケジュール可能なCPUを選ぶ。
CpuList available = MachineTopology()->EmptyCpuList();
for (const Cpu &cpu: cpus()) {
// グローバルCPUはグローバルagentが使用中のため、ghOStタスクをスケジュール
// することはできない。
if (cpu.id() == GetGlobalCpuId())
continue;
// 他のスケジューリングクラスのタスクが存在しているCPUには、ghOStタスクを
// スケジュールすることはできない。
if (!Available(cpu))
continue;
// もし、まだタスクが実行中だったら、タスクを切り替えることはしない。
// タスクが実行中でない、とは、cs->current==nullptrのときをいう。
auto cs = cpu_state(cpu);
if (cs->current) {
continue;
}
available.Set(cpu);
}
```
`available`にCPUをセットし終えたら、それぞれのCPUに対しタスクを割り当てていく。とはいっても、++ここではトランザクションの内容を書き込むだけ++であって、まだコミットはしないので注意。`available`のすべてのCPUにタスクが割り振られるとは限らない(`rq_`内のタスクの数が少なかった場合など)ため、実際にタスクを割り当てたCPUのリストを`assigned`で管理している。
```cpp
// availableなCPUそれぞれのトランザクションを作成する。
CpuList assigned = MachineTopology()->EmptyCpuList();
// 各CPUのnext taskを選び、トランザクションを発行する。
while (!available.Empty()) {
// 実行可能タスクが存在しなければ終了
if (rq_.Empty())
break;
// nextタスクを実行可能キューから取り出す。
TutorialTask *next = rq_.PopFront();
CHECK(next->runnable());
CHECK_EQ(next->GetCpu(), -1);
// nextタスクを割り当てるCPUを選択する。
const Cpu& next_cpu = available.Front();
TutorialCpuState* cs = cpu_state(next_cpu);
CHECK_EQ(cs->current, nullptr);
// cs->currentにnextをセットし、トランザクションをopenする。
// ※ トランザクションはまだcommitしない。あとでまとめて行う。
cs->current = next;
available.Clear(next_cpu);
assigned.Set(next_cpu);
RunRequest* req = enclave()->GetRunRequest(next_cpu);
req->Open({
.target = next->gtid,
.target_barrier = next->seqnum,
.commit_flags = COMMIT_AT_TXN_COMMIT});
}
```
トランザクションのコミットを行う。
```cpp
// まとめてコミットする。
if (!assigned.Empty()) {
enclave()->CommitRunRequests(assigned);
}
```
コミットの結果をフィードバックする。
```cpp
// コミットの結果を見て処理を行う。
for (const Cpu& next_cpu : assigned) {
TutorialCpuState* cs = cpu_state(next_cpu);
RunRequest* req = enclave()->GetRunRequest(next_cpu);
if (req->succeeded()) {
// コミットに成功した場合、晴れてcs->currentが実行状態になる。
cs->current->SetState(TutorialTaskState::kRunning);
cs->current->SetCpu(next_cpu.id());
} else {
// コミットに失敗した場合、cs->currentを実行可能キューに戻す必要がある。
rq_.PushFront(cs->current);
cs->current = nullptr;
}
}
}
```
`GlobalSchedule`の実装は以上である。この関数と連携して、各メッセージのコールバック関数も実装する必要があるが、Per-CPU型と実装はそこまで変わらないので割愛する。詳しくはコミットの内容を参照してほしい。
その他のデータ構造で変わったこととしては、`TutorialScheduler`のインスタンスを作成するとき、タスクアロケータは`SingleThreadMallocTaskAllocator`を使う、ということ。理由は上述したのと同じ。
```cpp
TutorialFullAgent(AgentConfig &config) : FullAgent(config) {
auto allocator = std::make_shared<SingleThreadMallocTaskAllocator<TutorialTask>>();
global_scheduler_ = std::make_unique<TutorialScheduler>(&enclave_,
*enclave_.cpus(),
std::move(allocator));
StartAgentTasks();
enclave_.Ready();
}
```
対応するコミット ➙ [リンク](https://github.com/ruth561/ghost-userspace/commit/b43fbf8d95af9a9722109d7f129df6615b0ff6d6)
### `global_cpu_`の扱いについて
Centralized型の実装はある時点でGlobal Agentが複数存在しないようにする必要がある。今回の実装ではアトミック変数によって実現されていた。この変数を操作する関数は`GetGlobalCpuId`と`SetGlobalCpuId`であり、それぞれ以下のようなメモリオーダーでアクセスがされている。
```cpp
int GetGlobalCpuId() {
return global_cpu_.load(std::memory_order_acquire); // Acquire
}
void SetGlobalCpuId(int next_cpu) {
global_cpu_.store(next_cpu, std::memory_order_release); // Release
}
```
`global_cpu_`を変更するとき、`SetGlobalCpuId(next_cpu)`を実行したあと、そのCPUに対応するagentに`Ping`を送る(これは`PickNextGlobalCpuId`の実装内部で行われる)。`Ping`を受け取ったagentは実行を再開し、Global Agentとしてスケジューラの仕事を行う。
`AgentThread`のループの中身を見てみる。agentのバリアを読み出したあと、`global_cpu_`の値を確認し、それが自分自身と異なっていた場合は++読み出したバリアによって`LocalYield`を行っている++。もし、`global_cpu_`を読み出してから`LocalYield`を行うまでの間に`Ping`が送られてきたら、カーネル側でagentのバリアがインクリメントされるため、++`LocalYield`が失敗する++ようになっている。そのため、++適切に処理を行えば`LocalYield`と`Ping`がブッキングすることはない++。
```cpp
...
while (!Finished()) {
BarrierToken agent_barrier = barrier();
if (cpu().id() != global_scheduler_->GetGlobalCpuId()) { // Satellite Agent
req->LocalYield(agent_barrier, /* flags */ 0);
} else { // Global Agent
...
}
}
...
```
この実装を例えば以下のようにすると、どのagentスレッドもスリープ状態になる可能性があるので注意である。
```cpp
...
while (!Finished()) {
if (cpu().id() != global_scheduler_->GetGlobalCpuId()) { // Satellite Agent
// global_cpu_を読み出したあとにagentのバリアを読み出す
req->LocalYield(barrier(), /* flags */ 0);
} else { // Global Agent
...
}
}
...
```