## sidekiq を読む会
### 順番
前田->深谷->箕輪->伊藤->大谷->根岸
## version
v7.0.2
## 2023/01/25 箕輪ドライバー伊藤メモ
rails (5.1.7)
Sidekiq::Launcher -> Sidekiq::Manager#start
- Sidekiq::Manager
- Processorを管理している?
- #initialize
- Processorを作成する
- #start
Processor#start
- Sidekiq::Processor
- about
- スタンドアロンスレッド
- スレッド、ジョブ、カプセルを持つ
- Redisからジョブをフェッチし、ジョブを実行する
- ジョブを実行する際にジョブクラスをインスタンス化し、Sidekiqミドルウェアチェーンを実行する
- エラーが発生したら自分自身をManagerを介して新しいProcessorを作成し置き換える
- #initialize
- Manager#processor_result をblock化してcallbackとして登録している
- #start
- #run
- 実際にdequeue(fetch)して実行する
- Manager#processor_result
## 2023/01/11 深谷ドライバー箕輪メモ
- 今回: SidekiqJobがワーカーキューに登録しているので、ActiveJobとSidekiqJobのコードリーディングをして、rails等からエンキューしている部分を読む
- module ActiveJob::Base
- ActiveJob::Enqueing
- def perform_later(*args)
job_or_instantiate(*args).enqueue
end
- enqueueの呼び出し
- run_callbacks enqueueのコールバック登録が可能
- ActiveJob::QueueAdapters::SidekiqAdapter
- Base 実体はActiveJob::Executionに定義
- execute -> perform_now
- 処理実行前にインクリメントする。Guard against処理回数の上限判定などに用いるなど
- 最終的にクライアント側で実装されているperformの呼び出し
- Sidekiq::Client
- push
- normalize_item
- validate
- 主にパラメータの型チェック
- normalized_hash
- Sidekiq::Jobの継承をチェック
- default_job_options
- デフォルトがretry:true/false,とqueue名称
- wapped -> rails側のactivejob
- (一時的に)不必要なパラメータの削除
- middleware.invoke
- sidekiqのミドルウェア
- verify_json
- on_complex_arguments 厳密なJSONフォーマットかのチェックレベル(raise/warn)
- raw_push
- redisとやりとりしてpushする
- 次回、Manager追ってenqueするまで
-
## 2023/01/04 根岸ドライバー深谷メモ
- ひさしぶりに開催 2023年最初
- Launcher#runからみる
- @poller.start
- Poller.initilize config受け取ってる
- Pollerはn秒ごとにretryとscheduledなジョブをよんで、enqueueするジョブを探して、redisに入れる
- Poler#initialize
- Enqは、エンキューする役割
- ConnectionPoolは、connection_pool gem由来
- コネクションプールを行う実現するgem
- sidekiqはプールをgemを使って実現してる
- Poller#start
- リトライ、スケジュールなキューからジョブを取り出して、ワーキングキューに登録する
- スレッド作ってる
- 子プロセスがはじまるのをinitial_waitはでまってる?
- 一定期間 + ランダムな期間待つ
- @enq.enqueue_jobsを実行する:
- 1度に一つずつ実行待ちなキューからジョブを取り出す
- 取り出したジョブをパースしてclientに渡す
- clientは、redisに対して、ジョブを登録している
- clientに渡す情報には、queue, class, argsなどがある
- backtraceを有効にするバックトレースが取れる?
- clientはmiddlewareにジョブ情報を一度渡してmiddlewareから受け取ったものを、raw_pushしてる
- raw_pushでは、実行キューに登録、キュー名の登録などを行っている
-
- 次回: SidekiqJobがワーカーキューに登録しているので、ActiveJobとSidekiqJobのコードリーディングをして、rails等からエンキューしている部分を読む
-
## 2022/12/14 大谷ドライバー根岸メモ
- bin見る
- CLI見る
- include Singleton unless $TESTING面白い
- テストで依存整理する
- def parse オプション見る
- default_configuration
- Sidekiq::Config
- DEFAULTSが定義されていてそれとマージする
- polling周りが充実してる
- average_scheduled_poll_interval とかはscheduledでそうじゃないときは(直接起動したときは)どうこうみたいな話かも
- error_handlers, death_handlersへー
- ERROR_HANDLER
- contextをdump_jsonしてる
- ログですね
- initialize
- エラーハンドラ〜がなければERROR_HANDLERを呼ぶ
- ほかdirectory/redis_config/capsulesが初期化
- concurrencyの設定がある
- カプセルって何? https://github.com/mperham/sidekiq/blob/main/docs/capsule.md
- グローバルで変更可能なのでコンフィグがRactorで取り扱えない
- グローバル設定がdefault_configurationを作っておいて、configure_clientとかがカプセル化されて渡されるようになった
- 内部コンポーネントが並行実行時に有効になるようになんとかした
- 層が作られた
- config.capsule("unsafe") みたいなふうに個別に作れる
- redis周り
- Rubyのredis clientそのものは平行化できるが、接続先のredis実体そのものは単一でなければならないという制限がある
- もとはconcurrency+3のredis poolを作っていた それがSidekiqコンポーネント5つのほか、カプセルが同時実行するためのプールに変更された
- SidekiqコンポーネントではSidekiq.redisとかで呼んではいけない Sidekiq::Capsule.redis みたいに呼ぶ必要がある
- バグりそう……
- client_middleware, server_middleware, capsule, redis=, new_redis_pool, redis_info, redis, register, lookup, death_handlers(ジョブが失敗したとき), on(イベントフックの追加),
- ざっとフーンにしておいてCLIに戻る
- initialize
- setup_options → parse_options
- オプションをパースする
- initialize_logger ログレベル設定
- validate!
- run
- boot_application
- rails起動してsidekiq/railsを読み(本筋じゃないので呼んでない)config/environment.rbを読み起動
- IO.pipeやsigsなどでIOやシグナル用意する
- JRUBYだとUSR1, USR2は動かない
- signal.trapしたとき
- old_handler.callする
- self.write.puts(sig)
- runnning, redisの情報出す
- redis_versionが低かったらraise
- maxmemory_policy(redis側)がnoevictionだと警告
- カプセルのredis_pool_sizeがconcurrencyより足りなかったら警告
- server_middleware等の起動
- 以上でメインスレッドが立ち上がったので、Component#fire_eventをstartupというlifecycle_eventでコール
- oneshotでイベントを起こす マルチスレッドを立ち上げる
- launch(self_read)で起動
- launch
- Launcher.new(config)で起動
- Launcher
- カプセルを管理するなにかを起動する
- STATS_TTL 5年
- PROCTITLES psで見たときにこれを出してくれる?
- initialize
- embedded default false
- Managerをcapsuleごとに初期化
- Pollerを初期化
- run
- safe_thread method(&"start_heartbeat")
- スレッド起動してwatchdogをopen... ここでheartbeatを読んでる
- heartbeat→beatは to_dataしたものをloopで返している
- 末尾で♥というメソッドを読んでる
- ♥
- identityはhostname:pid
- curstateでWORK_STATE取ってる
- redis{|conn| conn.pipelined } (複数のコマンド送るやつ)で中身を見て
- work_key= identity:work
- transaction.unlink 一度リセット
- curstate.each_pairでworkey, tid dump_json(hash)をtransaction.hsetする
- redisのhashを入れている
- WORKSTATEはSharedWorkState
- キューとかが出てくる
- rtt check_rtt
- round trip time
- redisに接続して、pingの間隔で取ってる
- 遅かったら警告
- failsを初期化
- kbを今のメモリ使用量で初期化
- exists, msgをredis でつなげて取得してくる
- 情報をめっちゃ書き込んでる
- redisに書き込めなかったらheartbeatをfire_eventして動かす
- その後fireevent heartbeat
- msgが返ってきたらkillする
- エラー起きたら
- failureなどをincrement
- runに戻ってくる
- 次はPoller#runを行く