## sidekiq を読む会 ### 順番 前田->深谷->箕輪->伊藤->大谷->根岸 ## version v7.0.2 ## 2023/01/25 箕輪ドライバー伊藤メモ rails (5.1.7) Sidekiq::Launcher -> Sidekiq::Manager#start - Sidekiq::Manager - Processorを管理している? - #initialize - Processorを作成する - #start Processor#start - Sidekiq::Processor - about - スタンドアロンスレッド - スレッド、ジョブ、カプセルを持つ - Redisからジョブをフェッチし、ジョブを実行する - ジョブを実行する際にジョブクラスをインスタンス化し、Sidekiqミドルウェアチェーンを実行する - エラーが発生したら自分自身をManagerを介して新しいProcessorを作成し置き換える - #initialize - Manager#processor_result をblock化してcallbackとして登録している - #start - #run - 実際にdequeue(fetch)して実行する - Manager#processor_result ## 2023/01/11 深谷ドライバー箕輪メモ - 今回: SidekiqJobがワーカーキューに登録しているので、ActiveJobとSidekiqJobのコードリーディングをして、rails等からエンキューしている部分を読む - module ActiveJob::Base - ActiveJob::Enqueing - def perform_later(*args) job_or_instantiate(*args).enqueue end - enqueueの呼び出し - run_callbacks enqueueのコールバック登録が可能 - ActiveJob::QueueAdapters::SidekiqAdapter - Base 実体はActiveJob::Executionに定義 - execute -> perform_now - 処理実行前にインクリメントする。Guard against処理回数の上限判定などに用いるなど - 最終的にクライアント側で実装されているperformの呼び出し - Sidekiq::Client - push - normalize_item - validate - 主にパラメータの型チェック - normalized_hash - Sidekiq::Jobの継承をチェック - default_job_options - デフォルトがretry:true/false,とqueue名称 - wapped -> rails側のactivejob - (一時的に)不必要なパラメータの削除 - middleware.invoke - sidekiqのミドルウェア - verify_json - on_complex_arguments 厳密なJSONフォーマットかのチェックレベル(raise/warn) - raw_push - redisとやりとりしてpushする - 次回、Manager追ってenqueするまで - ## 2023/01/04 根岸ドライバー深谷メモ - ひさしぶりに開催 2023年最初 - Launcher#runからみる - @poller.start - Poller.initilize config受け取ってる - Pollerはn秒ごとにretryとscheduledなジョブをよんで、enqueueするジョブを探して、redisに入れる - Poler#initialize - Enqは、エンキューする役割 - ConnectionPoolは、connection_pool gem由来 - コネクションプールを行う実現するgem - sidekiqはプールをgemを使って実現してる - Poller#start - リトライ、スケジュールなキューからジョブを取り出して、ワーキングキューに登録する - スレッド作ってる - 子プロセスがはじまるのをinitial_waitはでまってる? - 一定期間 + ランダムな期間待つ - @enq.enqueue_jobsを実行する: - 1度に一つずつ実行待ちなキューからジョブを取り出す - 取り出したジョブをパースしてclientに渡す - clientは、redisに対して、ジョブを登録している - clientに渡す情報には、queue, class, argsなどがある - backtraceを有効にするバックトレースが取れる? - clientはmiddlewareにジョブ情報を一度渡してmiddlewareから受け取ったものを、raw_pushしてる - raw_pushでは、実行キューに登録、キュー名の登録などを行っている - - 次回: SidekiqJobがワーカーキューに登録しているので、ActiveJobとSidekiqJobのコードリーディングをして、rails等からエンキューしている部分を読む - ## 2022/12/14 大谷ドライバー根岸メモ - bin見る - CLI見る - include Singleton unless $TESTING面白い - テストで依存整理する - def parse オプション見る - default_configuration - Sidekiq::Config - DEFAULTSが定義されていてそれとマージする - polling周りが充実してる - average_scheduled_poll_interval とかはscheduledでそうじゃないときは(直接起動したときは)どうこうみたいな話かも - error_handlers, death_handlersへー - ERROR_HANDLER - contextをdump_jsonしてる - ログですね - initialize - エラーハンドラ〜がなければERROR_HANDLERを呼ぶ - ほかdirectory/redis_config/capsulesが初期化 - concurrencyの設定がある - カプセルって何? https://github.com/mperham/sidekiq/blob/main/docs/capsule.md - グローバルで変更可能なのでコンフィグがRactorで取り扱えない - グローバル設定がdefault_configurationを作っておいて、configure_clientとかがカプセル化されて渡されるようになった - 内部コンポーネントが並行実行時に有効になるようになんとかした - 層が作られた - config.capsule("unsafe") みたいなふうに個別に作れる - redis周り - Rubyのredis clientそのものは平行化できるが、接続先のredis実体そのものは単一でなければならないという制限がある - もとはconcurrency+3のredis poolを作っていた それがSidekiqコンポーネント5つのほか、カプセルが同時実行するためのプールに変更された - SidekiqコンポーネントではSidekiq.redisとかで呼んではいけない Sidekiq::Capsule.redis みたいに呼ぶ必要がある - バグりそう…… - client_middleware, server_middleware, capsule, redis=, new_redis_pool, redis_info, redis, register, lookup, death_handlers(ジョブが失敗したとき), on(イベントフックの追加), - ざっとフーンにしておいてCLIに戻る - initialize - setup_options → parse_options - オプションをパースする - initialize_logger ログレベル設定 - validate! - run - boot_application - rails起動してsidekiq/railsを読み(本筋じゃないので呼んでない)config/environment.rbを読み起動 - IO.pipeやsigsなどでIOやシグナル用意する - JRUBYだとUSR1, USR2は動かない - signal.trapしたとき - old_handler.callする - self.write.puts(sig) - runnning, redisの情報出す - redis_versionが低かったらraise - maxmemory_policy(redis側)がnoevictionだと警告 - カプセルのredis_pool_sizeがconcurrencyより足りなかったら警告 - server_middleware等の起動 - 以上でメインスレッドが立ち上がったので、Component#fire_eventをstartupというlifecycle_eventでコール - oneshotでイベントを起こす マルチスレッドを立ち上げる - launch(self_read)で起動 - launch - Launcher.new(config)で起動 - Launcher - カプセルを管理するなにかを起動する - STATS_TTL 5年 - PROCTITLES psで見たときにこれを出してくれる? - initialize - embedded default false - Managerをcapsuleごとに初期化 - Pollerを初期化 - run - safe_thread method(&"start_heartbeat") - スレッド起動してwatchdogをopen... ここでheartbeatを読んでる - heartbeat→beatは to_dataしたものをloopで返している - 末尾で♥というメソッドを読んでる - ♥ - identityはhostname:pid - curstateでWORK_STATE取ってる - redis{|conn| conn.pipelined } (複数のコマンド送るやつ)で中身を見て - work_key= identity:work - transaction.unlink 一度リセット - curstate.each_pairでworkey, tid dump_json(hash)をtransaction.hsetする - redisのhashを入れている - WORKSTATEはSharedWorkState - キューとかが出てくる - rtt check_rtt - round trip time - redisに接続して、pingの間隔で取ってる - 遅かったら警告 - failsを初期化 - kbを今のメモリ使用量で初期化 - exists, msgをredis でつなげて取得してくる - 情報をめっちゃ書き込んでる - redisに書き込めなかったらheartbeatをfire_eventして動かす - その後fireevent heartbeat - msgが返ってきたらkillする - エラー起きたら - failureなどをincrement - runに戻ってくる - 次はPoller#runを行く