owned this note
owned this note
Published
Linked with GitHub
# goroutine と Channels を使ってジョブを並行化した時のメモ
先日、Go で書かれたとあるバッチジョブのパフォーマンス改善として処理の並行化をするにあたって、今回初めて使った goroutine, Channels をで色々とハマりどころがあったので備忘録用にここにまとめておきます。
尚、今回は並行化を施す前のサンプルから、最終的な実装まで順を追ってコードの実装例を交えて書きますので少々長くなります。最終的な実装の部分だけ見たい人は https://github.com/issei-m/go-concurrency-test にコードを公開しているのでこちらをご覧下さい。
## 対象読者
Go 初学者、あるいは goroutine や Channels を使って初めて並行処理を実装しようとしている方。
また、 goroutine や Channels に関しては公式ドキュメントを含め質の高い情報が豊富にあるので、本記事はそれぞれ詳細な部分は極力省いて実践的な内容としました。似た様な実装をしようとしている方の参考になれば幸いです。
## 今回並行処理を実装した元のバッチジョブのサンプル
ざっくり書くと以下の様な感じです。(色々端折ってますが詳しくは GitHub リポジトリの方をご覧ください)
```go=
var bufferSize = 20
func ProcessItems(items []item.Item) {
failed := false
processedCount := 0
for _, targetItem := range items {
result, err := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く
if err != nil {
failed = true
break // 1個でも失敗したら処理を抜ける
}
logger.Info(result) // 処理成功時は結果をロガーで出力する
processedCount++
if processedCount%bufferSize == 0 {
// バッファをクリアする処理
}
}
if !failed && processedCount%bufferSize > 0 {
// バッファをクリアする処理
}
}
```
処理内容をまとめると、
- 渡された `items []item.Item` から項目を1個ずつ取り出して `item.ProcessItem(targetItem item.Item) (string, error)` に渡す
- 処理に成功した場合は処理結果である1番目の戻り値をロガーで記録する
- 途中で1回でも失敗した場合、以降の処理を全てキャンセルする
- **既に成功した部分についてはロールバックする必要はない**
- ループ内の処理で別途何かのバッファリングを行っており、20件に1回クリアする必要がある
と言った形になります。`item.ProcessItem(targetItem)` は内部で重いブロッキング I/O 処理を実行している為、逐次処理では効率が悪く、性能試験でボトルネックとなっていた為、この部分を goroutine を使って並行化する運びとなりました。
## 並行化の流れ
ここから実際に goroutine と Channels を使って実装を行っていきますが、冒頭にも書いた通りいくつかの段階に分けて途中のコードを記載していくので、最終実装だけ見たい人は [GitHub リポジトリ](https://github.com/issei-m/go-concurrency-test)をご覧下さい。
では早速 goroutine を使って行きましょう。今回は `item.ProcessItem` 関数の実行を並行に行いたいので、これを安直に実装するとこんな感じになります:
```go=
var wg sync.WaitGroup
for _, targetItem := range items {
wg.Add(1)
go func(targetItem item.Item) {
defer wg.Done()
result, _ := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く
logger.Info(result) // 処理成功時は結果をロガーで出力する
}(targetItem)
}
wg.Wait() // 全ての goroutine で wg.Done() が呼ばれるまで処理がブロックされる
```
これで全ての `item.ProcessItem(targetItem)` の処理は並行に実行されます。
goroutine によって並行に行われる `item.ProcessItem` の実行を、メインスレッド (因みにメインスレッドも goroutine の1つです) は待ってくれないので、全ての goroutine の処理が終わるまでメイン goroutine の実行をブロックする必要があります。(そうしないとプログラムが直ちに終了してしまう)
この時点では [`sync.WaitGroup`](https://pkg.go.dev/sync#WaitGroup) を使っています。`wg.Add` は内部のカウンタをインクリメントし、 `wg.Done` はデクリメントを行います。 `wg.Wait` はそのカウンタが0になるまで処理をブロックしますので、この様にする事で全ての goroutine の処理が終わるのを待つ事ができます。
さて、この時点では上記サンプルはまだ不完全で、以下の問題があります。
- `items` の数だけ goroutine を無制限に作っている
- `item.ProcessItem(targetItem)` のエラー処理やバッファリングが未実装
次回以降の節では Channels を使ってこれらを順に対応していきます。
### 並行数の制御
先程の例では `items` の数だけ goroutine が作られてました。goroutine そのものは軽量なので大量に作っても結構問題ないのですが、中で実行している処理のスループットを抑えたい等 (API のレートリミットや、外部ミドルウェアへの負荷等) の理由で同時処理数を制限したくなる事があります。
並行処理数を制限する方法は主にセマフォを使って `for range items` のループを適宜止めるか、予め指定した数の goroutine を *非同期処理の worker として事前にスポーンしておく* 等があると思いますが、今回は結果処理の兼ね合いがあるので後者で実装します。
```go=
concurrency := 20 // 同時処理数は20個までとする
chItems := make(chan item.Item) // メインスレッドからワーカーに `item.Item` を受け渡す channel
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// close(chItems) が呼ばれるまで targetItem の取り出しが (goroutine 間で均等に) 行われる
for targetItem := range chItems {
result, _ := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く
logger.Info(result) // 処理成功時は結果をロガーで出力する
}
}()
}
for _, targetItem := range items {
chItems <- targetItem
}
close(chItems) // 全て送り出したら close する. これをしないと `for targetItem := range chItems` が正しく動作しない
wg.Wait() // 全ての goroutine で wg.Done() が呼ばれるまで処理がブロックされる
```
`concurrency` の数だけ予め goroutine をワーカーとしてスポーンしておきます。内部では `items` の中身を処理しますが、 Go では goroutine 間で安全な値の受け渡しには通常 channel を使います。この実装では channel である `chItems` から受け取った `targetItem` を逐次処理していくと言った内容になっています。また、この時点では `chItems` に値は入っていないので、全ての goroutine はバックグラウンドで `chItems` に値が送信されるまで待機している事になります。
次の `for range` では全ての `targetItem` を `chItems` に送出します。この時点で、各ワーカーでは随時 `chItems` からの値の受信が始まり、非同期に処理が行われていきます。
また、全ての `chItems` の受信者間 (ワーカー間) で受信は均等にロードバランスされます。
全ての送出が終わった時点 (ワーカーとは非同期で動くのですぐ終わります) で `close(chItems)` をしてこれ以上処理する **item.Item** が無い事を通知します。こうする事で、やがて各ワーカーの `chItems` の受信が随時終了し、ワーカーも終了していきます。
なお、 `close` を忘れると、ワーカーの `for targetItem := range chItems` は終わらずブロックされます。メインスレッドではワーカーの終了を **sync.WaitGroup** で待機していますが、ワーカーは永久に終わらないので deadlock となり、エラーになるので注意しましょう。
### エラー処理
ここで一番最初のスニペットを見てみましょう。
```go=
result, err := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く
if err != nil {
//
}
logger.Info(result)
```
`item.ProcessItem(targetItem)` は処理成功した場合は1番目の戻り値に結果を、失敗した場合は2番目の戻り値にエラーをセットして返す多値返却の関数です。
さて、この関数は goroutine で処理しますが、エラー検出時の処理やバッファリングを行う為、結果をメインスレッドに送り返す必要があります。ここでも channel を使って実際に結果を返してみましょう。
```go=
// 関数の戻り値が多値な為、1つの構造体にまとめる
type taskResult struct {
result string
err error
}
// ...
concurrency := 20 // 同時処理数は20個までとする
chItems := make(chan item.Item) // メインスレッドからワーカーに `item.Item` を受け渡す channel
chResults := make(chan *taskResult) // ワーカーからメインスレッドに `item.ProcessItem` の結果を返す channel
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// close(chItems) が呼ばれるまで targetItem の取り出しが (goroutine 間で均等に) 行われる
for targetItem := range chItems {
r, err := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く
chResults <- &taskResult{result: r, err: err} // 結果を構造体に入れ、 `chResults` に送出
}
}()
}
for _, targetItem := range items {
chItems <- targetItem
}
close(chItems) // 全て送り出したら close する. これをしないと `for targetItem := range chItems` が正しく動作しない
wg.Wait() // 全ての goroutine で wg.Done() が呼ばれるまで処理がブロックされる
```
但しこのままでは正常に動きません。**channel への値の送出は、その時点でこれを受信する別の goroutine が動いていないと処理がブロックされる**為です。ワーカー内の `chResults <- &taskResult{result: r, err: err}` は、一連の処理で `chResults` の受信者が不在な為1発でブロックされ、 deadlock と見なされエラーになります。
そこで、次の様にコードを改修します。ちょっと修正量が多いですがご容赦下さい。
```go=
concurrency := 20 // 同時処理数は20個までとする
chItems := make(chan item.Item) // メインスレッドからワーカーに `item.Item` を受け渡す channel
chResults := make(chan *taskResult) // ワーカーからメインスレッドに `item.ProcessItem` の結果を返す channel
numWorkers := int32(concurrency) // sync.WaitGroup の代わりにカウンタを使う
for i := 0; i < concurrency; i++ {
go func() {
defer atomic.AddInt32(&numWorkers, -1) // goroutine が終わる度にカウンタをデクリメントする. 並行処理中に安全にに処理する為 `atomic.AddInt32` を使う.
// close(chItems) が呼ばれるまで targetItem の取り出しが (goroutine 間で均等に) 行われる
for targetItem := range chItems {
r, err := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く
chResults <- &taskResult{result: r, err: err} // 結果を構造体に入れ、 `chResults` に送出
}
}()
}
// `targetItem` の送出は別の goroutine 内で行う
go func() {
for _, targetItem := range items {
chItems <- targetItem
}
close(chItems)
}()
// ワーカーが全て終了するまで `chResults` から値を取り出し続ける
for numWorkers > 0 {
result := <-chResults
if result.err != nil {
// TODO: エラー処理
} else {
logger.Info(result.result) // 処理成功時は結果をロガーで出力する
// TODO: バッファリング処理
}
}
```
これで動作する様になります。今回2つポイントがあります。
まずは、`chItems` への送出自体を別の goroutine に分けた事です。理由としては先程説明した `chResults <-` と同様に、`chItems <-` による送出もこれを受信する別の goroutine が動いていないとブロックされ deadlock になってしまう為です。
この場合、 `<-chResults` による結果の受信処理、あるいは `chItems <-` への `targetItem` の送出処理のいずれかを別の goroutine で処理する必要がありますが、今回はメインスレッドでは結果処理とそれによる後続処理の制御を行いたい為、送出側を別 goroutine にしています。
次にワーカーの終了待ちを **sync.WaitGroup** から単純なカウンタに置き換えた事です。これもメインスレッドでは単にワーカーの処理を待ち続けながら別の処理も行う為そうしています。尚、カウンタのデクリメントは競合を防ぐ為、 [`atomic.AddInt32`](https://pkg.go.dev/sync/atomic#AddInt32) を使います。
さて、ここまで来たら後は TODO の部分を実装するだけです。内容は次の通りでした:
- 途中で1回でも失敗した場合、以降の処理を全てキャンセルする
- 既に成功した部分についてはロールバックする必要はない
- ループ内の処理で別途何かのバッファリングを行っており、20件に1回クリアする必要がある
バッファのクリアは大した事はないですね。問題はエラー処理です。エラーを検出した時点でワーカーを含めた全体の処理を止める必要があり、これには [`context.WithCancel`](https://pkg.go.dev/context#WithCancel) を使うのが簡単です。最後に、バッファリングとエラー処理を実装してみましょう。
```go=
concurrency := 20 // 同時処理数は20個までとする
chItems := make(chan item.Item) // メインスレッドからワーカーに `item.Item` を受け渡す channel
chResults := make(chan *taskResult) // ワーカーからメインスレッドに `item.ProcessItem` の結果を返す channel
ctx, cancel := context.WithCancel(context.Background()) // 既に使っている `context.Context` があればそれを指定する。今回は無いので `context.Background()` を新たに作って指定
defer cancel()
numWorkers := int32(concurrency) // sync.WaitGroup の代わりにカウンタを使う
for i := 0; i < concurrency; i++ {
go func() {
defer atomic.AddInt32(&numWorkers, -1) // goroutine が終わる度にカウンタをデクリメントする. 並行処理中に安全にに処理する為 `atomic.AddInt32` を使う.
// close(chItems) が呼ばれるまで targetItem の取り出しが (goroutine 間で均等に) 行われる
for targetItem := range chItems {
r, err := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く
chResults <- &taskResult{result: r, err: err} // 結果を構造体に入れ、 `chResults` に送出
}
}()
}
// `targetItem` の送出は別の goroutine 内で行う
go func() {
defer close(chItems) // goroutine 終了時に確実に close する
for _, targetItem := range items {
select {
case <-ctx.Done():
// cancel() が実行された後は余計な `item.ProcessItem` を実行したくないので、送出を中止して goroutine を終了する
return
default:
chItems <- targetItem
}
}
}()
failed := false
processedCount := 0
// ワーカーが全て終了するまで `chResults` から値を取り出し続ける
for numWorkers > 0 {
select {
case result := <-chResults:
if failed {
break
}
// エラーを検出したら `cancel()` を実行し、他の goroutine にそれを通知し、終了する.
// 但しメイン goroutine のこの時点では終了せず全てのワーカーが閉じられるまで待つ.
// (途中でループから抜けると、ワーカー内で `chResults <-` で送出している部分がブロックされてしまい、 goroutine が終了せずリークしてしまう)
if result.err != nil {
logger.Error(fmt.Sprintf("Error detected: %e", result.err))
cancel()
failed = true
break
}
logger.Info(result.result) // 処理成功時は結果をロガーで出力する
processedCount++
if processedCount%20 == 0 {
logger.Info("Flush buffer!")
}
default:
// `default` が無いと、 `chResults` deadlock になるので注意
}
}
if !failed && processedCount%20 > 0 {
logger.Info("Flush buffer!")
}
```
ここでのポイントは、コメントにもある通り、エラーを検出しても全てのワーカーが終了するまでループを抜けないと言う事です。途中で抜けてしまうと `chResults` を受信する箇所が無くなってしまい、ワーカーの `chResults <-` 部分が永久にブロックされる事で goroutine が終了せず、リークしてしまうからです。エラーを検出したらフラグを立てて以降は `<-chResults` の結果を破棄しながらワーカーの終了を待ちます。キャンセル後は `chItems` への送出も止まるので、すぐにワーカーは終了するでしょう。
後は goroutine を使わないバージョンで実装してたとおり、一定の件数毎にバッファのフラッシュ処理をしています。当然ここでもエラーが発生する場合があると思いますが、その場合は `item.ProcessItem` のエラー処理と同様の以下の処理を書いてあげれば OK です:
```go=
// ...
if processedCount%20 == 0 {
// 20件毎にバッファをフラッシュする. エラーが発生した時は他と同様以降の処理を止める
if err := バッファのフラッシュ処理; err != nil {
logger.Error(fmt.Sprintf("Error detected: %e", err))
cancel()
failed = true
}
}
// ...
```
## まとめ
以上、 goroutine と Channels を使った並行処理についてでした。 Channels に関しては正しく使わないと deadlock を起こしたり、 goroutine がいつまでも消えずにリークし続けたりとハマりどころが多いので注意が必要です。
## 参考
- [Concurrency - Effective Go](https://go.dev/doc/effective_go#concurrency)
- [Range and Close - A Tour of Go](https://go.dev/tour/concurrency/4)
- [Goroutine leak](https://medium.com/golangspec/goroutine-leak-400063aef468)