<style> u { text-decoration-color: gray; text-decoration-style: wavy; /* 波線 */ } </style> # 目標 ghost-userspace の実装で Agent クラスや FullAgent クラスの実装がイマイチよく分からなかった。なので、実際の使われ方を見て、これらのクラスの理解を目指す。 ghOStスケジューラの実装は、初期化処理と実際にスケジューリングする処理に分けて見ることができる。 # 初期化処理 ## main main関数で行うことは以下の3つ。2番目の処理は後で詳しく見ていく。 * コマンドライン引数をパースし AgentConfig を作成する。 * AgentProcess によって agent を実行状態にする。 * シグナルを検知したらプログラムを終了する。 ```cpp ABSL_FLAG(std::string, ghost_cpus, "1-5", "cpulist"); ABSL_FLAG(std::string, enclave, "", "Connect to preexisting enclave directory"); int main(int argc, char* argv[]) { // ① コマンドライン引数のパース absl::InitializeSymbolizer(argv[0]); absl::ParseCommandLine(argc, argv); // ② AgentConfigの作成 ghost::AgentConfig config; ghost::ParseAgentConfig(&config); printf("Initializing...\n"); // ③ 各CPUごとにagentを起動 //   agentを実行するに先立って必要な処理(enclaveの作成など)も全てここで行われる。 auto uap = new ghost::AgentProcess<ghost::FullFifoAgent<ghost::LocalEnclave>, ghost::AgentConfig>(config); // ④ シグナルハンドラの設定 ghost::GhostHelper()->InitCore(); printf("Initialization complete, ghOSt active.\n"); [ ... ] ghost::Notification exit; ghost::GhostSignals::AddHandler(SIGINT, [&exit](int) { static bool first = true; // We only modify the first SIGINT. if (first) { exit.Notify(); first = false; return false; // We'll exit on subsequent SIGTERMs. } return true; }); ghost::GhostSignals::AddHandler(SIGUSR1, [uap](int) { uap->Rpc(ghost::FifoScheduler::kDebugRunqueue); return false; }); // ⑤ シグナルハンドラが発火されるまでブロック exit.WaitForNotification(); delete uap; printf("\nDone!\n"); return 0; } ``` ## AgentProcess main 関数からの呼び出しは以下のようになっていた。 ```cpp // ③ 各CPUごとにagentを起動 //   agentを実行するに先立って必要な処理(enclaveの作成など)も全てここで行われる。 auto uap = new ghost::AgentProcess<ghost::FullFifoAgent<ghost::LocalEnclave>, ghost::AgentConfig>(config); ``` AgentProcess のコンストラクタでは、まず最初に fork を行い、子プロセス上で FullAgent インスタンスを生成する。 ```cpp explicit AgentProcess(AgentConfigType config) { // ① 親プロセスと子プロセス間の同期処理で必要なSharedBlobインスタンスの作成 sb_ = std::make_unique<SharedBlob>(); // ② fork agent_proc_ = std::make_unique<ForkedProcess>(config.stderr_fd_); if (!agent_proc_->IsChild()) { // ③ 親プロセスの場合はagentの起動処理が完了するまで待機 sb_->agent_ready_.WaitForNotification(); return; } ... // ④ 子プロセスの場合はFullAgentインスタンスを生成する //   このコンストラクタの内部でagentスレッドが生成される full_agent_ = std::make_unique<FullAgentType>(config); ... }; ``` :::info なぜ agent スレッドだけ別のプロセス上で実行するのか、という疑問については AgentProcess の実装に書かれたコメントを読むといい。 ::: ## FullFifoAgent FullAgent の派生クラスとして実装されたクラス。 クラス定義は以下のようになっている。 ```cpp template <class EnclaveType> class FullFifoAgent : public FullAgent<EnclaveType> { public: // ★ コンストラクタ explicit FullFifoAgent(AgentConfig config) : FullAgent<EnclaveType>(config) { ... } ... // ★ FIFOスケジューラ専用のAgentの派生クラスのインスタンスを生成する処理を実装 std::unique_ptr<Agent> MakeAgent(const Cpu& cpu) override { return std::make_unique<FifoAgent>(&this->enclave_, cpu, scheduler_.get()); } ... private: // ★ FullFifoAgentに対してスケジューラは1つ!! std::unique_ptr<FifoScheduler> scheduler_; }; ``` コンストラクタの実装を見ていく。 コンストラクタで行っている処理は大まかに以下の4つの処理に分けられる。 ```cpp explicit FullFifoAgent(AgentConfig config) : FullAgent<EnclaveType>(config) { // ① LocalEnclaveを作成 // ② スケジューラを作成 scheduler_ = MultiThreadedFifoScheduler(&this->enclave_, *this->enclave_.cpus()); // ③ 各CPUごとにagentスレッドを生成 this->StartAgentTasks(); // ④ enclaveに対しユーザー側の準備が完了したことを通知する this->enclave_.Ready(); } ``` それぞれの処理を見ていく。 ### ① LocalEnclaveの生成 FullAgent のコンストラクタで LocalEnclave のコンストラクタが呼ばれている。 ```cpp explicit FullAgent(AgentConfigType config) : enclave_(config) { GhostHelper()->InitCore(); } ``` LocalEnclave のコンストラクタを見ていく。 ```cpp LocalEnclave::LocalEnclave(AgentConfig config) : Enclave(config), dir_fd_(config.enclave_fd_) { // ① ghOStFS上のenclaveディレクトリと自身を紐付けたり、カーネル⇄ユーザー間のデータ //   共有設定をしたりする。 if (dir_fd_ == -1) { // 新しくenclaveを作る場合 CreateAndAttachToEnclave(); // ★ } else { // 既存のenclaveにアタッチする場合 AttachToExistingEnclave(); } // ② cpus_メンバの初期化を行う。 //   具体的には各CPUに対しRunRequestの設定をする。 BuildCpuReps(); if (config_.tick_config_ == CpuTickConfig::kAllTicks) { // ③ config_に指定されていれば、CPU_TICKメッセージを発行するようにする SetDeliverTicks(true); } // ④ BPF用の処理 CHECK_EQ(agent_bpf_init(), 0); } ``` ★の部分を詳しく見ていく。 ```cpp void LocalEnclave::CreateAndAttachToEnclave() { destroy_when_destructed_ = true; // ① /sys/fs/ghost/ctlに対し"create n 83"を書き込み、enclaveディレクトリを作成する ctl_fd_ = LocalEnclave::MakeNextEnclave(); // ② 作成されたenclave_nディレクトリのfdをdir_fd_にセットする dir_fd_ = LocalEnclave::GetEnclaveDirectory(ctl_fd_); // ③ 新しくenclaveを作るかに関係なく行う初期化処理 //   カーネルとユーザーで共有するcpu_dataのmmapとSW領域の設定を行う CommonInit(); // ④ enclave_dir/cpumaskに管理するCPUのリストを渡して設定する //   成功するまで何回か試す。 int cpumask_fd = openat(dir_fd_, "cpumask", O_RDWR); std::string cpumask = enclave_cpus_.CpuMaskStr(); constexpr int kMaxRetries = 10; int retries = 0; do { int ret = write(cpumask_fd, cpumask.c_str(), cpumask.length()); if (ret == cpumask.length()) { break; } absl::SleepFor(absl::Milliseconds(50)); } while (++retries < kMaxRetries); close(cpumask_fd); } ``` ③ の CommontInit は CreateAndAttachToEnclave と AttachToExistingEnclave の両方から呼び出される処理。カーネルとユーザーが共有するメモリの設定を行う。 ```cpp void LocalEnclave::CommonInit() { // ① enclave_n/cpu_dataファイルを読み書き可でopenする int data_fd = LocalEnclave::GetCpuDataRegion(dir_fd_); // ② cpu_dataをmmapする data_region_size_ = GetFileSize(data_fd); data_region_ = static_cast<ghost_cpu_data*>( mmap(/*addr=*/nullptr, data_region_size_, PROT_READ | PROT_WRITE, MAP_SHARED, data_fd, /*offset=*/0)); close(data_fd); // ③ SW領域も初期化する GhostHelper()->SetGlobalStatusWordTable( new LocalStatusWordTable(dir_fd_, /*id=*/0, config_.numa_node_)); } ``` ### ② スケジューラの作成 ```cpp std::unique_ptr<FifoScheduler> MultiThreadedFifoScheduler(Enclave* enclave, CpuList cpulist) { // ① スレッド安全なタスクアロケータを生成 auto allocator = std::make_shared<ThreadSafeMallocTaskAllocator<FifoTask>>(); // ② FifoSchedulerの生成(①のタスクアロケータはこれに必要) auto scheduler = std::make_unique<FifoScheduler>(enclave, std::move(cpulist), std::move(allocator)); return scheduler; } // ②の中身 FifoSchedulerのコンストラクタ FifoScheduler::FifoScheduler(Enclave* enclave, CpuList cpulist, std::shared_ptr<TaskAllocator<FifoTask>> allocator) : BasicDispatchScheduler(enclave, std::move(cpulist), std::move(allocator)) { for (const Cpu& cpu : cpus()) { // TODO: extend Cpu to get numa node. int node = 0; CpuState* cs = cpu_state(cpu); // ②-① CPUごとにLocalChannelを作成 cs->channel = enclave->MakeChannel(GHOST_MAX_QUEUE_ELEMS, node, MachineTopology()->ToCpuList({cpu})); if (!default_channel_) { // ②-② 最初に作成したLocalChannelをデフォルトチャンネルに設定する default_channel_ = cs->channel.get(); } } } ``` ### ③ 各CPUごとにAgentスレッドを生成 StartAgentTasks の実装を見ていく。 ```cpp void StartAgentTasks() { for (const Cpu& cpu : *enclave_.cpus()) { // ① CPUごとにAgentのインスタンスを作成する //   Agentは抽象基底クラスなのでMakeAgentで作成されるのは、その派生クラスである //   この実装は単純にFifoAgentのコンストラクタを実行するだけである agents_.push_back(MakeAgent(cpu)); // ② agentスレッドを生成する agents_.back()->StartBegin(); } for (auto& agent : agents_) { // ③ agentスレッドの起動が完了するまで待機する agent->StartComplete(); } } ``` ②の実装は以下のようになっている。 ```cpp void Agent::StartBegin() { thread_ = std::thread(&Agent::ThreadBody, this); } ``` ③ではready_メンバによって待機する。③のブロックを解放するには、agentスレッドからAgent::SignalReady()を発行する必要がある(FifoAgent::AgentThreadでもきちんと実装されている)。 ```cpp void Agent::StartComplete() { ready_.WaitForNotification(); } void SignalReady() { ready_.Notify(); } ``` ### ④ 初期化処理の最後(Enclave の処理) Readyの宣言に書いてあるコメントを要約してみる。 > Agent と Scheduler は相互依存しているため、ダイヤモンド型の初期化パターンを実装した。相互依存とは例えば、 > * Scheduler はスケジューリングするときに Agent の *status word* が必要。 > * Agent は対話をする Scheduler が必要。 > > のような依存関係のことである。 > このような依存関係を解消するために、以下の順序で初期化処理を行っていくことになっている。 > 1. 全ての Agent をカーネルにアタッチする > 2. Enclave->DerivedReady() > 3. Scheduler::EnclaveReady() を実行する > 4. Agent::EnclaveReady() を実行する Enclave::Ready() の実装を見ていく。 ```cpp void Enclave::Ready() { // ① Enclaveが管理する全てのCPUに対してAgentインスタンスが用意できるまで待機する //   agents_にインスタンスが追加されるのは、Agent::ThreadBodyの最後の方で //   enclave_->AttachAgent(cpu_, this) //   を呼び出したときである。 auto ready = [this]() { mu_.AssertHeld(); return agents_.size() == enclave_cpus_.Size(); }; mu_.LockWhen(absl::Condition(&ready)); // ② Encalveの派生クラスで実装された初期化処理を行う。 //   LocalEnclaveでは実装がなかったので、ここでは何も行わないと考えてよさそう。 DerivedReady(); // ③ デフォルトのChannelをカーネルに設定する。 if (!schedulers_.empty()) { // ここでif文を行っているが、schedulers_が空になることは基本的にはない。 // schedulerのないagentをテストするためにif文が追加されている CHECK(schedulers_.front()->GetDefaultChannel().SetEnclaveDefault()); } InsertBpfPrograms(); DisableMyBpfProgLoad(); // ⇣設計に関するコメント // On the topic of multithreaded Discovery: the main issue is that global // schedulers can't handle concurrent discovery and scheduling. For instance, // they use the SingleThreadMallocTaskAllocator and probably have other // scheduler-specific assumptions. If they ever can handle concurrency, then // we can move DiscoverTasks elsewhere - perhaps the schedulers themselves can // call DiscoverTasks from e.g. a non-global-agent. // // Even per-cpu agents can't easily handle concurrent Discovery. Consider the // lifetime of a Task. The only agent task that is allowed to muck with Task // is whoever handles the queue that the task is associated to. For example, // what happens when we receive a TaskDead? We want to call ~Task(). But you // can't do that if another thread - including the Discoverer - is accessing // it. So long as we have this assumption (queue owner -> can muck with // Task), we cannot do concurrent discovery. // // As another case, consider what happens when the Discoverer finds a task // that is in the process of departing while another thread is handling // messages. If the Discoverer calls ~Task(), then the thread that called // GetTask has an unreference-counted reference! If we knew that there were // *no* messages for the task, then we could call ~Task(). But we can't be // sure of that: it's possible that task was new and used our new agent's // default channel - so we'll eventually get those messages. // // As a final consideration, take EDF: we need to scrape the PrioTable before // scheduling. It's easiest to scrape after discovery completes, so that // scheduler won't practically be able to schedule during discovery. // // Without redesigning Tasks and the way schedulers work, we'll have to stick // to non-concurrent discovery and scheduling. We'd probably need something // like a kref and RCU too. // // Right now we're single threaded. All Agents are in WaitForEnclaveReady, // which is triggered by agent->EnclaveReady() below (not // scheduler->EnclaveReady()). That means we can't schedule until Discovery // is complete, which may take time. for (auto scheduler : schedulers_) scheduler->DiscoverTasks(); for (auto scheduler : schedulers_) scheduler->EnclaveReady(); // enclaveの初期化が完了したことを各agentに連絡する // この連絡をするまでagentはスケジューリングを開始しない for (const Cpu& cpu : enclave_cpus_) { Agent* agent = GetAgent(cpu); agent->EnclaveReady(); } mu_.Unlock(); // ユーザー空間側の準備が整ったことをカーネル側に連絡する AdvertiseOnline(); } ``` ## FifoAgent FifoAgent の初期化処理を見ていく。 まず、[ここ](#③-各CPUごとにAgentスレッドを生成)で生成された Agent スレッドは LocalAgent::ThreadBody の処理を行う。 ```cpp void LocalAgent::ThreadBody() { int queue_fd; Scheduler* s = AgentScheduler(); if (!s) { // テストケース用 queue_fd = -1; } else { // ① agentのキューに対応するfdを取得 queue_fd = s->GetAgentChannel(cpu_).GetFd(); } // ② プロセスの名前を"ap_task_<cpu>"にセット。 //   ここで設定された値は /proc/<pid>/comm から確認することができる。 CHECK_EQ(prctl(PR_SET_NAME, absl::StrCat("ap_task_", cpu().id()).c_str()), 0); gtid_ = Gtid::Current(); // ③ 以前のagentがCPUから関連付けられなくなるまで待機(agent_onlineが0になるまで待機) enclave_->WaitForOldAgent(); int ret; do { // ④ enclave_n/ctlファイルに"become agent <cpu> <qfd>"を書き込むことで //   自スレッドをagentとしてカーネルに登録する。 //   登録に成功したらアフィニティの設定も行う(TODO:)。 ret = GhostHelper()->SchedAgentEnterGhost(enclave_->GetCtlFd(), cpu_, queue_fd); } while (ret && errno == EBUSY); // ⑤ agentのghost_sw_infoを取得する status_word_ = LocalStatusWord(StatusWord::AgentSW{}); // ⑥ Enclave.agent_にthisをプッシュする enclave_->AttachAgent(cpu_, this); // ⑦ agentのメイン処理へ(LocalAgentの派生クラスで実装される) AgentThread(); WaitForExitNotification(); } ``` FifoAgent に実装されている AgentThread をみる。 ```cpp void FifoAgent::AgentThread() { ... // ① agentの準備ができたことをmainスレッドに伝える SignalReady(); // ② enclaveの準備ができるまで待機する //   enclaveのいくつかの初期化処理はagentの初期化処理に依存しているため、 //   この間にそれらの処理を行うことになっている。 WaitForEnclaveReady(); // ③ 初期化が完全に終了したのでスケジューリングを開始する(詳しくは後ほど) while (!Finished() || !scheduler_->Empty(cpu())) { ... } } ``` Enclave の初期化処理と Agent の初期化処理は共依存関係にある。そのため agent がスケジューリングを開始する前に、<u>両者の初期化が終了していることを互いに認知させる必要がある</u>。以下は enclave の初期化処理を行う main スレッドと agent のスレッドのシーケンス図である。 ![0A477EA6-134E-4785-B31D-D960429EFB91](https://hackmd.io/_uploads/SJXBiYeN6.jpg) ## まとめ ここまでの初期化処理の概要を以下に示す。 ![01C31834-9739-4459-BDA6-924342C4C8B6](https://hackmd.io/_uploads/ByQXc3KVa.jpg) # 初期化処理後の周期的実行 ## FifoAgent::AgentThread FifoAgent::AgentTread の実装の後半を見ていく。 ```cpp void FifoAgent::AgentThread() { ... // ひたすら無限ループで FifoScheduler::Schedule を呼び出している while (!Finished() || !scheduler_->Empty(cpu())) { // Scheduleを呼び出すときは、自CPUの番号を渡す scheduler_->Schedule(cpu(), status_word()); ... } } ``` ## FifoScheduler::Schedule FifoScheduler::Schedule の実装を見ていく。 ```cpp void FifoScheduler::Schedule(const Cpu& cpu, const StatusWord& agent_sw) { // ① agentのSWからbarrier値を取得する(後ほどコミット時に使う) BarrierToken agent_barrier = agent_sw.barrier(); CpuState* cs = cpu_state(cpu); // ② キューが空になるまでメッセージを取り出していき、各メッセージごとに処理を行う Message msg; while (!(msg = Peek(cs->channel.get())).empty()) { // ③ BasicDispatchSchedulerで実装されている関数 //   それぞれのメッセージタイプに応じたコールバック(TaskNewなど)が呼ばれる DispatchMessage(msg); // ④ キューからメッセージを取り出す Consume(cs->channel.get(), msg); } // ⑤ 到着していたメッセージの処理を全て完了したので、次のタスクを決める FifoSchedule(cpu, agent_barrier, agent_sw.boosted_priority()); } ``` ① でagentのバリア値を読み出しているが、これはコミットのときにカーネルに渡される。agentがスケジューリングの処理をしている間にカーネル側でメッセージが追加されるなどして、コミット時点での状態が最新のものではなくなる場合を検知できるようにするために使われる。 各メッセージの処理は後半でまとめる。 ## FifoScheduler::FifoSchedule FifoScheduler::FifoSchedule の実装を見ていく。 ```cpp void FifoScheduler::FifoSchedule(const Cpu& cpu, BarrierToken agent_barrier, bool prio_boost) { CpuState* cs = cpu_state(cpu); FifoTask* next = nullptr; // ① prio_boostされていないときはghOStスレッドをスケジュールすることができる。 //   prio_boostについて詳しくは後述する。 if (!prio_boost) { // ② 現在のタスクがまだ実行中ならそのまま処理を継続する(FIFOなので)。 next = cs->current; // ③ CPUが空き状態なら、キューの先頭からタスクを取り出し、それをnextに設定する。 if (!next) next = cs->run_queue.Dequeue(); } // ④ nextが存在する場合はコミットし、存在しない場合はYieldしてCPUを明け渡す RunRequest* req = enclave()->GetRunRequest(cpu); if (next) { while (next->status_word.on_cpu()) { // Pauseは、x86_64のpause命令を実行するだけの関数。 // スピンロック時のビジーループをCPUに対して伝達する役割を担う。 // これがあると、CPUは省電力になるらしい。 Pause(); } // nextをコミットするRunRequestを作成 req->Open({ .target = next->gtid, .target_barrier = next->seqnum, .agent_barrier = agent_barrier, .commit_flags = COMMIT_AT_TXN_COMMIT, }); // コミットする if (req->Commit()) { // コミットに成功したらnextがCPU上で実行状態になったということなので、 // スケジューラ側の状態を更新する。 TaskOnCpu(next, cpu); } else { // コミットに失敗 if (next == cs->current) { TaskOffCpu(next, /*blocked=*/false, /*from_switchto=*/false); } next->prio_boost = true; cs->run_queue.Enqueue(next); } } else { // ==== nextが存在しない場合 ==== // ④-① 実行可能なghOStスレッドが残っているにも関わらずprio_boostが要因で //   CPUを明け渡すときは、次にCPUがidle状態になるときにagentを起こすように //   要請する。 //    これが必要な理由としては、pick_next_taskのときにfairクラスがghostクラス //    を飛ばして勝手にidleスレッドをスケジュールしてしまったりするから int flags = 0; if (prio_boost && (cs->current || !cs->run_queue.Empty())) { flags = RTLA_ON_IDLE; } // ④-② agentはCPUを他のタスクに明け渡す //    このあとにスケジュールされるスレッドはghOSt以外のポリシーのスレッドである。 req->LocalYield(agent_barrier, flags); // ④-③ ここに到達するのはagentが再度実行状態になったときである } } ``` ## prio_boost について この関数の引数で渡されている prio_boost というブール値は、agent スレッドが優先度をブーストしてスケジュールされているかどうかを示している。**優先度がブーストされる**、とはどういうことか、についてはカーネル側の実装を見ていく必要がある。 カーネルでは ghOSt スレッドは IDLE スレッドの次に優先度が低く設定されている。そのため、通常 ghOSt スレッドは CFS スレッドなどによってプリエンプトされてしまう。しかし、agent に関しては CFS や RT のスレッドが存在していようとも、最高優先度でスケジュールされるようになっている。 このように **ghOSt ポリシーよりも優先度の高いスレッドが1つ以上存在している状況で agent スレッドがスケジュールされていることを「優先度がブーストされている」としている**のだ。 agent スレッドの優先度ブーストのイメージ図を以下に示す。 ![DD1BB8D8-F6B4-42F8-90F1-D33E23F4A025](https://hackmd.io/_uploads/B1S3ksE4T.jpg) :::info prio_boost かどうかは agent の SW で agent に公開される。 ここらへんの詳しい内容については、カーネル側の pick_next_ghost_agent 関数の実装を見るとよい。 ::: # メッセージごとの処理 ## TASK_NEW TASK_NEW メッセージは、<u>スレッドが新しく ghOSt スレッドとなったときに一番最初に発行されるメッセージ</u>である。TASK_NEW メッセージを受け取った BasicDispatchScheduler は、タスクアロケータによって FifoTask のインスタンスを作成し、引数の task に渡すことになっている。 ```cpp void FifoScheduler::TaskNew(FifoTask* task, const Message& msg) { // ① メッセージ内のペイロードを専用のデータ構造にキャスト const ghost_msg_payload_task_new* payload = static_cast<const ghost_msg_payload_task_new*>(msg.payload()); // ② taskの初期化を行う task->seqnum = msg.seqnum(); task->run_state = FifoTaskState::kBlocked; // ③ すでに実行可能状態のときはWAKEUP相当の処理をここで行ってしまう if (payload->runnable) { // ④ FIFOスケジューリングにおける状態をkRunnableに変更する task->run_state = FifoTaskState::kRunnable; // ⑤ taskを割り当てるCPUを探す Cpu cpu = AssignCpu(task); // ⑥ taskをcpuと関連付ける Migrate(task, cpu, msg.seqnum()); } else { // Wait until task becomes runnable to avoid race between migration // and MSG_TASK_WAKEUP showing up on the default channel. } } ``` ⑤ ⑥ で呼び出している AssignCpu と Migrate はタスクが最初に ghOSt スレッドになって実行状態に遷移したときに呼ばれる関数である。 ### ★ AssignCpu AssignCpu 関数の実装を見ていく。TASK_NEW メッセージは必ずデフォルトのキューにプッシュされるので、この関数の呼び出しはスレッドセーフである。 ```cpp Cpu FifoScheduler::AssignCpu(FifoTask* task) { static auto begin = cpus().begin(); static auto end = cpus().end(); static auto next = end; if (next == end) { next = begin; } // nextを返してインクリメントする return next++; } ``` つまり、<u>呼び出されるごとにCPUの番号を1つずらす</u>、というような実装になっている。 例をあげると、番号が 0-1, 4 のCPUを管理するスケジューラは 0 ➙ 1 ➙ 4 ➙ 0 ➙ 1 ➙ 4 ➙ ... のように返す関数である。 ### ★ Migrate Migrate の実装を見ていく。 ```cpp void FifoScheduler::Migrate(FifoTask* task, Cpu cpu, BarrierToken seqnum) { ... // ① cpu専用のキューにtaskを関連付ける。 //   この処理が無事に完了すれば、今後taskに関係するメッセージはそのキューにプッシュされる CpuState* cs = cpu_state(cpu); const Channel* channel = cs->channel.get(); CHECK(channel->AssociateTask(task->gtid, seqnum, /*status=*/nullptr)); task->cpu = cpu.id(); // ② 移動先のRUNキューにtaskをプッシュする cs->run_queue.Enqueue(task); // ③ Pingしてagentを起動させる enclave()->GetAgent(cpu)->Ping(); } ``` ## TASK_WAKEUP 対応する処理は TaskRunnable である。 ```cpp void FifoScheduler::TaskRunnable(FifoTask* task, const Message& msg) { const ghost_msg_payload_task_wakeup* payload = static_cast<const ghost_msg_payload_task_wakeup*>(msg.payload()); CHECK(task->blocked()); task->run_state = FifoTaskState::kRunnable; task->prio_boost = !payload->deferrable; if (task->cpu < 0) { // TASK_NEWを受け取ってから一度も実行状態になっていない場合、ここでMigrateを // 行う。task->cpuは-1で初期化される。 Cpu cpu = AssignCpu(task); Migrate(task, cpu, msg.seqnum()); } else { CpuState* cs = cpu_state_of(task); cs->run_queue.Enqueue(task); } } ``` ## TASK_BLOCKED タスクがスリープ状態に遷移したときに発行されるメッセージである。 ```cpp void FifoScheduler::TaskBlocked(FifoTask* task, const Message& msg) { const ghost_msg_payload_task_blocked* payload = static_cast<const ghost_msg_payload_task_blocked*>(msg.payload()); TaskOffCpu(task, /*blocked=*/true, payload->from_switchto); if (payload->from_switchto) { Cpu cpu = topology()->cpu(payload->cpu); enclave()->GetAgent(cpu)->Ping(); } } ``` ### ★ TaskOffCpu タスク task をCPUから取り外すときに呼ばれる関数。 遷移先の状態は blocked 引数によって決まり、true のときは kBlocked に、false の場合は kRunnable に遷移する。 ```cpp void FifoScheduler::TaskOffCpu(FifoTask* task, bool blocked, bool from_switchto) { GHOST_DPRINT(3, stderr, "Task %s offcpu %d", task->gtid.describe(), task->cpu); CpuState* cs = cpu_state_of(task); if (task->oncpu()) { CHECK_EQ(cs->current, task); cs->current = nullptr; } else { CHECK(from_switchto); CHECK_EQ(task->run_state, FifoTaskState::kBlocked); } task->run_state = blocked ? FifoTaskState::kBlocked : FifoTaskState::kRunnable; } ``` ## TASK_PREEMPT ```cpp void FifoScheduler::TaskPreempted(FifoTask* task, const Message& msg) { const ghost_msg_payload_task_preempt* payload = static_cast<const ghost_msg_payload_task_preempt*>(msg.payload()); // ① taskをCPUから取り外す。ただし、状態は実行可能状態のまま。 TaskOffCpu(task, /*blocked=*/false, payload->from_switchto); // ② taskのメンバに値をセット //   ここでセットされているprio_boostはagentのprio_boostとは違うもの。 //   単純にFIFOスケジューリングの中で、優先度を最高にするという意味。 //   詳しくは後半のFIFOキューなどを参照。 task->preempted = true; task->prio_boost = true; // ③ kRunnable状態なのでRUNキューに再度プッシュする CpuState* cs = cpu_state_of(task); cs->run_queue.Enqueue(task); if (payload->from_switchto) { // ここには到達しない Cpu cpu = topology()->cpu(payload->cpu); enclave()->GetAgent(cpu)->Ping(); } } ``` ## TASK_YIELD 単純に task をCPUから切り離し、RUNキューの最後尾にプッシュする。 ```cpp void FifoScheduler::TaskYield(FifoTask* task, const Message& msg) { const ghost_msg_payload_task_yield* payload = static_cast<const ghost_msg_payload_task_yield*>(msg.payload()); TaskOffCpu(task, /*blocked=*/false, payload->from_switchto); CpuState* cs = cpu_state_of(task); cs->run_queue.Enqueue(task); if (payload->from_switchto) { Cpu cpu = topology()->cpu(payload->cpu); enclave()->GetAgent(cpu)->Ping(); } } ``` ## TASK_DEAD ```cpp void FifoScheduler::TaskDead(FifoTask* task, const Message& msg) { CHECK(task->blocked()); allocator()->FreeTask(task); } ``` ## TASK_DEPARTED ```cpp void FifoScheduler::TaskDeparted(FifoTask* task, const Message& msg) { const ghost_msg_payload_task_departed* payload = static_cast<const ghost_msg_payload_task_departed*>(msg.payload()); // taskの状態をBLOCKEDにしてから消す if (task->oncpu() || payload->from_switchto) { TaskOffCpu(task, /*blocked=*/false, payload->from_switchto); } else if (task->queued()) { CpuState* cs = cpu_state_of(task); cs->run_queue.Erase(task); } else { CHECK(task->blocked()); } if (payload->from_switchto) { Cpu cpu = topology()->cpu(payload->cpu); enclave()->GetAgent(cpu)->Ping(); } allocator()->FreeTask(task); } ``` # 重要なデータ構造 ## FifoTask Task クラスからの派生クラス。 新しく以下の4つのメンバを持つ。 ||| |-|-| | **cpu** | そのタスクが関連付けられているCPUの番号。 | **run_state** | タスクの状態。FifoTaskState 列挙型で4つの状態を取りうる。 | **preempted** | そのタスクの最後の実行がプリエンプトされてかどうかを示すブール値。 | **prio_boost**| タスクの優先度が一時的にブーストしているかどうかを示す値。FIFOスケジューリングでは、RUNキューの先頭に配置されているタスクが優先的にスケジュールされる。カーネルスレッドやCFSスレッドの出現によりプリエンプトされたとき、RUNキューの先頭に戻してあげることで優先度を高く保つことができる。 run_state に設定される値は以下のように定義されている。 少しまわりくどく見えるのはメッセージを処理するごとに状態が遷移するようにするため。 ```cpp enum class FifoTaskState { kBlocked, // not on runqueue. kRunnable, // transitory state: // 1. kBlocked->kRunnable->kQueued // 2. kQueued->kRunnable->kOnCpu kQueued, // on runqueue. kOnCpu, // running on cpu. }; ``` ## FifoRq FIFO用のRUNキュー。 ```cpp class FifoRq { ... private: mutable absl::Mutex mu_; std::deque<FifoTask*> rq_ ABSL_GUARDED_BY(mu_); }; ``` 主要なメンバ関数は以下の3つ。 ### ★ Enqueue ```cpp void FifoRq::Enqueue(FifoTask* task) { // ① 状態が kRunnable --> kQueued に遷移する task->run_state = FifoTaskState::kQueued; // ② 優先度ブーストがかかっていればRUNキューの先頭にプッシュ、 //  そうでない場合は最後尾にプッシュする。 absl::MutexLock lock(&mu_); if (task->prio_boost) rq_.push_front(task); else rq_.push_back(task); } ``` ### ★ Dequeue RUNキューの先頭からタスクを取り出す関数。 ```cpp FifoTask* FifoRq::Dequeue() { absl::MutexLock lock(&mu_); if (rq_.empty()) return nullptr; FifoTask* task = rq_.front(); task->run_state = FifoTaskState::kRunnable; rq_.pop_front(); return task; } ``` ### ★ Erase RUNキューの中からタスクをサーチして削除する関数。 ```cpp void FifoRq::Erase(FifoTask* task) { CHECK_EQ(task->run_state, FifoTaskState::kQueued); absl::MutexLock lock(&mu_); size_t size = rq_.size(); if (size > 0) { // Check if 'task' is at the back of the runqueue (common case). size_t pos = size - 1; if (rq_[pos] == task) { rq_.erase(rq_.cbegin() + pos); task->run_state = FifoTaskState::kRunnable; return; } // Now search for 'task' from the beginning of the runqueue. for (pos = 0; pos < size - 1; pos++) { if (rq_[pos] == task) { rq_.erase(rq_.cbegin() + pos); task->run_state = FifoTaskState::kRunnable; return; } } } CHECK(false); } ```