<style> /* code要素の設定。バッククォーテーションで囲まれた要素に対する設定。 */ .markdown-body code { font-size: 1em; font-family: monospace; color: lightslategray !important; background-color: transparent !important; } /* codeの前後に挿入される空白を消す */ .markdown-body code::before, .markdown-body code::after { content: ""; } /* code block */ .markdown-body pre code { color: black !important; } /* リンクの見た目 */ .markdown-body a { color: inherit; /* 色は本文と同じ */ text-decoration: underline dashed #BDBDBD 0.1em; /* 下線のスタイル */ text-underline-offset: 0.3em; /* 下線の位置の調整 */ } /* リンク要素の上にマウスが乗ったときの見た目 */ .markdown-body a:hover { text-decoration: underline dashed #E080A0 0.1em; text-underline-offset: 0.3em; } /* 下線の設定。++で囲まれた要素につく下線のスタイルを設定する。 */ ins { text-decoration: underline solid black 0.05em; text-underline-offset: 0.25em; } p { font-family: Roboto Condensed; } .markdown-body mark { padding: 0; background: linear-gradient(transparent 60%, #e3d7a3 60%); } </style> # Centralized型 Centralized型スケジューラの実装のポイントを最初に見てみる。 * agentスレッドは複数存在しているものの、Global AgentとSatellite Agentに分類され、Global Agentのみがスケジューリングを行う。 * ある時点を見ると、Global Agentは1つしか存在していない。 * そのため、スケジューラの実装では、アトミック変数によって効率的にスケジューラを実装できる。 Per-CPU型のチュートリアルでスケジューラ実装の勘所はつかめたと思うので、ここでは最初から動くスケジューラを実装する。前提として、基本的なデータ構造(`TutorialXXX`という名前のクラス)についてはPer-CPU型と同じ名前で実装することにする。 まず、`TutorialTaskState`クラスと`TurorialTask`クラスに関しては、Per-CPU型のものを再利用する。 `TutorialRq`の実装は、Mutexによる排他処理をなくした実装にした。後ほど詳しく見ていくが、++`TutorialRq`へのアクセスが競合することがないようになっているため、排他処理が不要++になっている。つまり、`TutorialRq`は`deque<TutorialTask *>`のただのラッパークラスである。 `TutorialScheduler`の実装らへんから大きく変わってくる。  まず、このクラスが持つフィールドは以下のようになっている。スケジューラがCPUごとに管理するデータはPer-CPU型ほど多くはない。そのため`TutorialCpuState`クラスは以下の実装のように2つのフィールドしか持たない実装になっている。  メンバ変数`global_cpu_`は、現在のGlobal CPUの値を保持する。このメンバ変数は**アトミック変数**で定義されており、Global CPUの切り替え処理などでアトミックな書き換え操作が行われる。詳しくは後述する。  Centralized型はメッセージキューを1つしか持たない。そのため、`LocalChannel`のインスタンスは1つだけ持っておけば良い。 ```cpp struct TutorialCpuState { TutorialTask *current = nullptr; Agent *agent = nullptr; }; class TutorialScheduler : public BasicDispatchScheduler<TutorialTask> { ... private: // CPUごとのTutorialCpuStateインスタンス。 TutorialCpuState cpu_states_[MAX_CPUS]; // 現在のグローバルCPU。 // この値に対応するagentがGlobal Agentとなり、スケジューラの処理を行っていく。 // Global CPUで他のCFSタスクなどが実行可能状態になった場合、スケジューラは // Global CPUを他のCPUに移動することでCPUを明け渡すことになっている。 // Global CPUの切り替わり処理は、PickNextGlobalCpuIdで行われる。 std::atomic<int> global_cpu_; // ただ1つのメッセージキュー。 // Centralized型の場合はメッセージキューは1つしか使わない。 LocalChannel global_channel_; // 実行可能キュー。ここに入っているタスクは、状態が実行可能状態となる。 TutorialRq rq_; }; ``` `global_cpu_`の使われ方を見ていく。まずは、`TutorialAgent::AgentThread`の実装について見ていく。このメンバ変数はprivateなので、`TutorialAgent`からアクセスされるときは`GetGlobalCpuId`を介してアクセスされることに注意してほしい。  各agentは、Global CPUと自分自身のCPUを比べて異なる場合は`LocalYield`する。一致した場合は(優先度がブーストされていない限り)`GlobalSchedule`関数を呼び、スケジューラの仕事を行う。 ```cpp void AgentThread() final { ... auto req = enclave()->GetRunRequest(cpu()); while (!Finished()) { // ループの最初にagentのバリア値を読み出しておく必要がある。 // (少なくともGetGlobalCpuIdの実行よりは先に!) BarrierToken agent_barrier = barrier(); // 現在のglobal cpuの値を見て、自分がSatellite Agentなのか、 // Global Agentなのか、を確認し、それぞれの処理を行う。 if (cpu().id() != global_scheduler_->GetGlobalCpuId()) { // Satellite Agent req->LocalYield(agent_barrier, /* flags */ 0); } else { // Global Agent // 優先度がboostされていれば、GlobalCPUを切り替える。 // this cpuを使いたがっているタスクにCPUを明け渡し、スケジューラは // 別のCPUに移動する、というイメージ。 if (boosted_priority()) { // グローバルCPUを切り替える global_scheduler_->PickNextGlobalCpuId(); } else { global_scheduler_->GlobalSchedule(agent_barrier); } } } } ``` きちんとアトミック操作が適切に行われていれば`GlobalSchedule`を呼び出すスレッドは任意の時点で1つに限られる。この関数の実装は後で詳しく見ていく。 さて、agentの優先度がブーストされていた場合の処理を見ていく。その場合は、++同CPU上に他のCFSタスクなどが実行可能状態で存在しているため、agentはCPUを明け渡す必要がある++。Centralized型の実装では、この処理を**Global CPUを切り替える**ことによって行う。Global CPUの切り替え処理は`PickNextGlobalCpuId`を呼び出すことで行われるように実装した。Global CPUを切り替えたagentはループの先頭に戻り、`global_cpu_`の値を再度確認することでCPUをYieldする、という流れになっている。 この関数の実装は簡易的なものにとどめてある(チュートリアルなので)。ここで実装しているアルゴリズムは、++CPU番号の小さい方から順番に走査していき、最初に見つけた利用可能なCPUを`global_cpu_`にセットする++、というもの。CPUが利用可能かどうか、についてはステータスワードを確認している。 ```cpp Cpu PickNextGlobalCpuId() { // CPUをリストの先頭から見ていき、最初に見つけられた使用可能なCPUを // 次のグローバルCPUとして設定する。この実装は愚直な実装であり、 // 本来はハイパースレッディングの相手➙L3キャッシュを共有しているCPU➙ // NUMAノードを共有しているCPU、といった順に検索をかけるべきである。 redo: for (auto cpu: cpus()) { if (Available(cpu)) { // グローバルCPUをセットしてからPingを送る。 SetGlobalCpuId(cpu.id()); enclave()->GetAgent(cpu)->Ping(); return cpu; } } goto redo; } ``` コメントにも書いてあるが、`SetGlobalCpuId`の呼び出しと、`Ping`の呼び出しの順序を入れ替えるとうまく動作しない。ここについても後述することにする。 `GlobalSchedule`の実装を見ていく。この関数は同時刻に1つのスレッドからしか呼び出されていない、ということに注意して見てほしい。  `GlobalSchedule`の実装はいくつかの段階に分けて見ることができる。具体的には、以下の5段階に分けられる。 * メッセージ処理 * 空いているCPUの確認 * 空いているCPUそれぞれにタスクを割り振る * すべてのトランザクションをまとめてコミット * コミットの結果を確認し、適切な処理を行う 順番にこれらの処理を見ていく。 まず、メッセージ処理はこれまでと同様である。 ```cpp void TutorialScheduler::GlobalSchedule(const BarrierToken &agent_barrier) { // メッセージの処理 Message msg; while (!(msg = global_channel_.Peek()).empty()) { DispatchMessage(msg); global_channel_.Consume(msg); } ``` 空いているCPUの確認は、管理しているCPUを順番に見ていき利用可能なものを`available`という`CpuList`に加えていく。`available`に追加される条件としては、そのCPUにCFSタスクなどの他のスケジューリングポリシーのタスクが存在しないこと、ghOStタスクが実行中でないこと、などがある。ここらへんの実装は採用するスケジューリングアルゴリズムによって変わってくるだろう。 ```cpp // スケジュール可能なCPUを選ぶ。 CpuList available = MachineTopology()->EmptyCpuList(); for (const Cpu &cpu: cpus()) { // グローバルCPUはグローバルagentが使用中のため、ghOStタスクをスケジュール // することはできない。 if (cpu.id() == GetGlobalCpuId()) continue; // 他のスケジューリングクラスのタスクが存在しているCPUには、ghOStタスクを // スケジュールすることはできない。 if (!Available(cpu)) continue; // もし、まだタスクが実行中だったら、タスクを切り替えることはしない。 // タスクが実行中でない、とは、cs->current==nullptrのときをいう。 auto cs = cpu_state(cpu); if (cs->current) { continue; } available.Set(cpu); } ``` `available`にCPUをセットし終えたら、それぞれのCPUに対しタスクを割り当てていく。とはいっても、++ここではトランザクションの内容を書き込むだけ++であって、まだコミットはしないので注意。`available`のすべてのCPUにタスクが割り振られるとは限らない(`rq_`内のタスクの数が少なかった場合など)ため、実際にタスクを割り当てたCPUのリストを`assigned`で管理している。 ```cpp // availableなCPUそれぞれのトランザクションを作成する。 CpuList assigned = MachineTopology()->EmptyCpuList(); // 各CPUのnext taskを選び、トランザクションを発行する。 while (!available.Empty()) { // 実行可能タスクが存在しなければ終了 if (rq_.Empty()) break; // nextタスクを実行可能キューから取り出す。 TutorialTask *next = rq_.PopFront(); CHECK(next->runnable()); CHECK_EQ(next->GetCpu(), -1); // nextタスクを割り当てるCPUを選択する。 const Cpu& next_cpu = available.Front(); TutorialCpuState* cs = cpu_state(next_cpu); CHECK_EQ(cs->current, nullptr); // cs->currentにnextをセットし、トランザクションをopenする。 // ※ トランザクションはまだcommitしない。あとでまとめて行う。 cs->current = next; available.Clear(next_cpu); assigned.Set(next_cpu); RunRequest* req = enclave()->GetRunRequest(next_cpu); req->Open({ .target = next->gtid, .target_barrier = next->seqnum, .commit_flags = COMMIT_AT_TXN_COMMIT}); } ``` トランザクションのコミットを行う。 ```cpp // まとめてコミットする。 if (!assigned.Empty()) { enclave()->CommitRunRequests(assigned); } ``` コミットの結果をフィードバックする。 ```cpp // コミットの結果を見て処理を行う。 for (const Cpu& next_cpu : assigned) { TutorialCpuState* cs = cpu_state(next_cpu); RunRequest* req = enclave()->GetRunRequest(next_cpu); if (req->succeeded()) { // コミットに成功した場合、晴れてcs->currentが実行状態になる。 cs->current->SetState(TutorialTaskState::kRunning); cs->current->SetCpu(next_cpu.id()); } else { // コミットに失敗した場合、cs->currentを実行可能キューに戻す必要がある。 rq_.PushFront(cs->current); cs->current = nullptr; } } } ``` `GlobalSchedule`の実装は以上である。この関数と連携して、各メッセージのコールバック関数も実装する必要があるが、Per-CPU型と実装はそこまで変わらないので割愛する。詳しくはコミットの内容を参照してほしい。 その他のデータ構造で変わったこととしては、`TutorialScheduler`のインスタンスを作成するとき、タスクアロケータは`SingleThreadMallocTaskAllocator`を使う、ということ。理由は上述したのと同じ。 ```cpp TutorialFullAgent(AgentConfig &config) : FullAgent(config) { auto allocator = std::make_shared<SingleThreadMallocTaskAllocator<TutorialTask>>(); global_scheduler_ = std::make_unique<TutorialScheduler>(&enclave_, *enclave_.cpus(), std::move(allocator)); StartAgentTasks(); enclave_.Ready(); } ``` 対応するコミット ➙ [リンク](https://github.com/ruth561/ghost-userspace/commit/b43fbf8d95af9a9722109d7f129df6615b0ff6d6) ### `global_cpu_`の扱いについて Centralized型の実装はある時点でGlobal Agentが複数存在しないようにする必要がある。今回の実装ではアトミック変数によって実現されていた。この変数を操作する関数は`GetGlobalCpuId`と`SetGlobalCpuId`であり、それぞれ以下のようなメモリオーダーでアクセスがされている。 ```cpp int GetGlobalCpuId() { return global_cpu_.load(std::memory_order_acquire); // Acquire } void SetGlobalCpuId(int next_cpu) { global_cpu_.store(next_cpu, std::memory_order_release); // Release } ``` `global_cpu_`を変更するとき、`SetGlobalCpuId(next_cpu)`を実行したあと、そのCPUに対応するagentに`Ping`を送る(これは`PickNextGlobalCpuId`の実装内部で行われる)。`Ping`を受け取ったagentは実行を再開し、Global Agentとしてスケジューラの仕事を行う。 `AgentThread`のループの中身を見てみる。agentのバリアを読み出したあと、`global_cpu_`の値を確認し、それが自分自身と異なっていた場合は++読み出したバリアによって`LocalYield`を行っている++。もし、`global_cpu_`を読み出してから`LocalYield`を行うまでの間に`Ping`が送られてきたら、カーネル側でagentのバリアがインクリメントされるため、++`LocalYield`が失敗する++ようになっている。そのため、++適切に処理を行えば`LocalYield`と`Ping`がブッキングすることはない++。 ```cpp ... while (!Finished()) { BarrierToken agent_barrier = barrier(); if (cpu().id() != global_scheduler_->GetGlobalCpuId()) { // Satellite Agent req->LocalYield(agent_barrier, /* flags */ 0); } else { // Global Agent ... } } ... ``` この実装を例えば以下のようにすると、どのagentスレッドもスリープ状態になる可能性があるので注意である。 ```cpp ... while (!Finished()) { if (cpu().id() != global_scheduler_->GetGlobalCpuId()) { // Satellite Agent // global_cpu_を読み出したあとにagentのバリアを読み出す req->LocalYield(barrier(), /* flags */ 0); } else { // Global Agent ... } } ... ```