Issei.M
    • Create new note
    • Create a note from template
      • Sharing URL Link copied
      • /edit
      • View mode
        • Edit mode
        • View mode
        • Book mode
        • Slide mode
        Edit mode View mode Book mode Slide mode
      • Customize slides
      • Note Permission
      • Read
        • Only me
        • Signed-in users
        • Everyone
        Only me Signed-in users Everyone
      • Write
        • Only me
        • Signed-in users
        • Everyone
        Only me Signed-in users Everyone
      • Engagement control Commenting, Suggest edit, Emoji Reply
      • Invitee
      • No invitee
    • Publish Note

      Publish Note

      Everyone on the web can find and read all notes of this public team.
      Once published, notes can be searched and viewed by anyone online.
      See published notes
      Please check the box to agree to the Community Guidelines.
    • Commenting
      Permission
      Disabled Forbidden Owners Signed-in users Everyone
    • Enable
    • Permission
      • Forbidden
      • Owners
      • Signed-in users
      • Everyone
    • Suggest edit
      Permission
      Disabled Forbidden Owners Signed-in users Everyone
    • Enable
    • Permission
      • Forbidden
      • Owners
      • Signed-in users
    • Emoji Reply
    • Enable
    • Versions and GitHub Sync
    • Note settings
    • Engagement control
    • Transfer ownership
    • Delete this note
    • Save as template
    • Insert from template
    • Import from
      • Dropbox
      • Google Drive
      • Gist
      • Clipboard
    • Export to
      • Dropbox
      • Google Drive
      • Gist
    • Download
      • Markdown
      • HTML
      • Raw HTML
Menu Note settings Sharing URL Create Help
Create Create new note Create a note from template
Menu
Options
Versions and GitHub Sync Engagement control Transfer ownership Delete this note
Import from
Dropbox Google Drive Gist Clipboard
Export to
Dropbox Google Drive Gist
Download
Markdown HTML Raw HTML
Back
Sharing URL Link copied
/edit
View mode
  • Edit mode
  • View mode
  • Book mode
  • Slide mode
Edit mode View mode Book mode Slide mode
Customize slides
Note Permission
Read
Only me
  • Only me
  • Signed-in users
  • Everyone
Only me Signed-in users Everyone
Write
Only me
  • Only me
  • Signed-in users
  • Everyone
Only me Signed-in users Everyone
Engagement control Commenting, Suggest edit, Emoji Reply
Invitee
No invitee
Publish Note

Publish Note

Everyone on the web can find and read all notes of this public team.
Once published, notes can be searched and viewed by anyone online.
See published notes
Please check the box to agree to the Community Guidelines.
Engagement control
Commenting
Permission
Disabled Forbidden Owners Signed-in users Everyone
Enable
Permission
  • Forbidden
  • Owners
  • Signed-in users
  • Everyone
Suggest edit
Permission
Disabled Forbidden Owners Signed-in users Everyone
Enable
Permission
  • Forbidden
  • Owners
  • Signed-in users
Emoji Reply
Enable
Import from Dropbox Google Drive Gist Clipboard
   owned this note    owned this note      
Published Linked with GitHub
Subscribed
  • Any changes
    Be notified of any changes
  • Mention me
    Be notified of mention me
  • Unsubscribe
Subscribe
# goroutine と Channels を使ってジョブを並行化した時のメモ 先日、Go で書かれたとあるバッチジョブのパフォーマンス改善として処理の並行化をするにあたって、今回初めて使った goroutine, Channels をで色々とハマりどころがあったので備忘録用にここにまとめておきます。 尚、今回は並行化を施す前のサンプルから、最終的な実装まで順を追ってコードの実装例を交えて書きますので少々長くなります。最終的な実装の部分だけ見たい人は https://github.com/issei-m/go-concurrency-test にコードを公開しているのでこちらをご覧下さい。 ## 対象読者 Go 初学者、あるいは goroutine や Channels を使って初めて並行処理を実装しようとしている方。 また、 goroutine や Channels に関しては公式ドキュメントを含め質の高い情報が豊富にあるので、本記事はそれぞれ詳細な部分は極力省いて実践的な内容としました。似た様な実装をしようとしている方の参考になれば幸いです。 ## 今回並行処理を実装した元のバッチジョブのサンプル ざっくり書くと以下の様な感じです。(色々端折ってますが詳しくは GitHub リポジトリの方をご覧ください) ```go= var bufferSize = 20 func ProcessItems(items []item.Item) { failed := false processedCount := 0 for _, targetItem := range items { result, err := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く if err != nil { failed = true break // 1個でも失敗したら処理を抜ける } logger.Info(result) // 処理成功時は結果をロガーで出力する processedCount++ if processedCount%bufferSize == 0 { // バッファをクリアする処理 } } if !failed && processedCount%bufferSize > 0 { // バッファをクリアする処理 } } ``` 処理内容をまとめると、 - 渡された `items []item.Item` から項目を1個ずつ取り出して `item.ProcessItem(targetItem item.Item) (string, error)` に渡す - 処理に成功した場合は処理結果である1番目の戻り値をロガーで記録する - 途中で1回でも失敗した場合、以降の処理を全てキャンセルする - **既に成功した部分についてはロールバックする必要はない** - ループ内の処理で別途何かのバッファリングを行っており、20件に1回クリアする必要がある と言った形になります。`item.ProcessItem(targetItem)` は内部で重いブロッキング I/O 処理を実行している為、逐次処理では効率が悪く、性能試験でボトルネックとなっていた為、この部分を goroutine を使って並行化する運びとなりました。 ## 並行化の流れ ここから実際に goroutine と Channels を使って実装を行っていきますが、冒頭にも書いた通りいくつかの段階に分けて途中のコードを記載していくので、最終実装だけ見たい人は [GitHub リポジトリ](https://github.com/issei-m/go-concurrency-test)をご覧下さい。 では早速 goroutine を使って行きましょう。今回は `item.ProcessItem` 関数の実行を並行に行いたいので、これを安直に実装するとこんな感じになります: ```go= var wg sync.WaitGroup for _, targetItem := range items { wg.Add(1) go func(targetItem item.Item) { defer wg.Done() result, _ := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く logger.Info(result) // 処理成功時は結果をロガーで出力する }(targetItem) } wg.Wait() // 全ての goroutine で wg.Done() が呼ばれるまで処理がブロックされる ``` これで全ての `item.ProcessItem(targetItem)` の処理は並行に実行されます。 goroutine によって並行に行われる `item.ProcessItem` の実行を、メインスレッド (因みにメインスレッドも goroutine の1つです) は待ってくれないので、全ての goroutine の処理が終わるまでメイン goroutine の実行をブロックする必要があります。(そうしないとプログラムが直ちに終了してしまう) この時点では [`sync.WaitGroup`](https://pkg.go.dev/sync#WaitGroup) を使っています。`wg.Add` は内部のカウンタをインクリメントし、 `wg.Done` はデクリメントを行います。 `wg.Wait` はそのカウンタが0になるまで処理をブロックしますので、この様にする事で全ての goroutine の処理が終わるのを待つ事ができます。 さて、この時点では上記サンプルはまだ不完全で、以下の問題があります。 - `items` の数だけ goroutine を無制限に作っている - `item.ProcessItem(targetItem)` のエラー処理やバッファリングが未実装 次回以降の節では Channels を使ってこれらを順に対応していきます。 ### 並行数の制御 先程の例では `items` の数だけ goroutine が作られてました。goroutine そのものは軽量なので大量に作っても結構問題ないのですが、中で実行している処理のスループットを抑えたい等 (API のレートリミットや、外部ミドルウェアへの負荷等) の理由で同時処理数を制限したくなる事があります。 並行処理数を制限する方法は主にセマフォを使って `for range items` のループを適宜止めるか、予め指定した数の goroutine を *非同期処理の worker として事前にスポーンしておく* 等があると思いますが、今回は結果処理の兼ね合いがあるので後者で実装します。 ```go= concurrency := 20 // 同時処理数は20個までとする chItems := make(chan item.Item) // メインスレッドからワーカーに `item.Item` を受け渡す channel var wg sync.WaitGroup for i := 0; i < concurrency; i++ { wg.Add(1) go func() { defer wg.Done() // close(chItems) が呼ばれるまで targetItem の取り出しが (goroutine 間で均等に) 行われる for targetItem := range chItems { result, _ := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く logger.Info(result) // 処理成功時は結果をロガーで出力する } }() } for _, targetItem := range items { chItems <- targetItem } close(chItems) // 全て送り出したら close する. これをしないと `for targetItem := range chItems` が正しく動作しない wg.Wait() // 全ての goroutine で wg.Done() が呼ばれるまで処理がブロックされる ``` `concurrency` の数だけ予め goroutine をワーカーとしてスポーンしておきます。内部では `items` の中身を処理しますが、 Go では goroutine 間で安全な値の受け渡しには通常 channel を使います。この実装では channel である `chItems` から受け取った `targetItem` を逐次処理していくと言った内容になっています。また、この時点では `chItems` に値は入っていないので、全ての goroutine はバックグラウンドで `chItems` に値が送信されるまで待機している事になります。 次の `for range` では全ての `targetItem` を `chItems` に送出します。この時点で、各ワーカーでは随時 `chItems` からの値の受信が始まり、非同期に処理が行われていきます。 また、全ての `chItems` の受信者間 (ワーカー間) で受信は均等にロードバランスされます。 全ての送出が終わった時点 (ワーカーとは非同期で動くのですぐ終わります) で `close(chItems)` をしてこれ以上処理する **item.Item** が無い事を通知します。こうする事で、やがて各ワーカーの `chItems` の受信が随時終了し、ワーカーも終了していきます。 なお、 `close` を忘れると、ワーカーの `for targetItem := range chItems` は終わらずブロックされます。メインスレッドではワーカーの終了を **sync.WaitGroup** で待機していますが、ワーカーは永久に終わらないので deadlock となり、エラーになるので注意しましょう。 ### エラー処理 ここで一番最初のスニペットを見てみましょう。 ```go= result, err := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く if err != nil { // } logger.Info(result) ``` `item.ProcessItem(targetItem)` は処理成功した場合は1番目の戻り値に結果を、失敗した場合は2番目の戻り値にエラーをセットして返す多値返却の関数です。 さて、この関数は goroutine で処理しますが、エラー検出時の処理やバッファリングを行う為、結果をメインスレッドに送り返す必要があります。ここでも channel を使って実際に結果を返してみましょう。 ```go= // 関数の戻り値が多値な為、1つの構造体にまとめる type taskResult struct { result string err error } // ... concurrency := 20 // 同時処理数は20個までとする chItems := make(chan item.Item) // メインスレッドからワーカーに `item.Item` を受け渡す channel chResults := make(chan *taskResult) // ワーカーからメインスレッドに `item.ProcessItem` の結果を返す channel var wg sync.WaitGroup for i := 0; i < concurrency; i++ { wg.Add(1) go func() { defer wg.Done() // close(chItems) が呼ばれるまで targetItem の取り出しが (goroutine 間で均等に) 行われる for targetItem := range chItems { r, err := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く chResults <- &taskResult{result: r, err: err} // 結果を構造体に入れ、 `chResults` に送出 } }() } for _, targetItem := range items { chItems <- targetItem } close(chItems) // 全て送り出したら close する. これをしないと `for targetItem := range chItems` が正しく動作しない wg.Wait() // 全ての goroutine で wg.Done() が呼ばれるまで処理がブロックされる ``` 但しこのままでは正常に動きません。**channel への値の送出は、その時点でこれを受信する別の goroutine が動いていないと処理がブロックされる**為です。ワーカー内の `chResults <- &taskResult{result: r, err: err}` は、一連の処理で `chResults` の受信者が不在な為1発でブロックされ、 deadlock と見なされエラーになります。 そこで、次の様にコードを改修します。ちょっと修正量が多いですがご容赦下さい。 ```go= concurrency := 20 // 同時処理数は20個までとする chItems := make(chan item.Item) // メインスレッドからワーカーに `item.Item` を受け渡す channel chResults := make(chan *taskResult) // ワーカーからメインスレッドに `item.ProcessItem` の結果を返す channel numWorkers := int32(concurrency) // sync.WaitGroup の代わりにカウンタを使う for i := 0; i < concurrency; i++ { go func() { defer atomic.AddInt32(&numWorkers, -1) // goroutine が終わる度にカウンタをデクリメントする. 並行処理中に安全にに処理する為 `atomic.AddInt32` を使う. // close(chItems) が呼ばれるまで targetItem の取り出しが (goroutine 間で均等に) 行われる for targetItem := range chItems { r, err := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く chResults <- &taskResult{result: r, err: err} // 結果を構造体に入れ、 `chResults` に送出 } }() } // `targetItem` の送出は別の goroutine 内で行う go func() { for _, targetItem := range items { chItems <- targetItem } close(chItems) }() // ワーカーが全て終了するまで `chResults` から値を取り出し続ける for numWorkers > 0 { result := <-chResults if result.err != nil { // TODO: エラー処理 } else { logger.Info(result.result) // 処理成功時は結果をロガーで出力する // TODO: バッファリング処理 } } ``` これで動作する様になります。今回2つポイントがあります。 まずは、`chItems` への送出自体を別の goroutine に分けた事です。理由としては先程説明した `chResults <-` と同様に、`chItems <-` による送出もこれを受信する別の goroutine が動いていないとブロックされ deadlock になってしまう為です。 この場合、 `<-chResults` による結果の受信処理、あるいは `chItems <-` への `targetItem` の送出処理のいずれかを別の goroutine で処理する必要がありますが、今回はメインスレッドでは結果処理とそれによる後続処理の制御を行いたい為、送出側を別 goroutine にしています。 次にワーカーの終了待ちを **sync.WaitGroup** から単純なカウンタに置き換えた事です。これもメインスレッドでは単にワーカーの処理を待ち続けながら別の処理も行う為そうしています。尚、カウンタのデクリメントは競合を防ぐ為、 [`atomic.AddInt32`](https://pkg.go.dev/sync/atomic#AddInt32) を使います。 さて、ここまで来たら後は TODO の部分を実装するだけです。内容は次の通りでした: - 途中で1回でも失敗した場合、以降の処理を全てキャンセルする - 既に成功した部分についてはロールバックする必要はない - ループ内の処理で別途何かのバッファリングを行っており、20件に1回クリアする必要がある バッファのクリアは大した事はないですね。問題はエラー処理です。エラーを検出した時点でワーカーを含めた全体の処理を止める必要があり、これには [`context.WithCancel`](https://pkg.go.dev/context#WithCancel) を使うのが簡単です。最後に、バッファリングとエラー処理を実装してみましょう。 ```go= concurrency := 20 // 同時処理数は20個までとする chItems := make(chan item.Item) // メインスレッドからワーカーに `item.Item` を受け渡す channel chResults := make(chan *taskResult) // ワーカーからメインスレッドに `item.ProcessItem` の結果を返す channel ctx, cancel := context.WithCancel(context.Background()) // 既に使っている `context.Context` があればそれを指定する。今回は無いので `context.Background()` を新たに作って指定 defer cancel() numWorkers := int32(concurrency) // sync.WaitGroup の代わりにカウンタを使う for i := 0; i < concurrency; i++ { go func() { defer atomic.AddInt32(&numWorkers, -1) // goroutine が終わる度にカウンタをデクリメントする. 並行処理中に安全にに処理する為 `atomic.AddInt32` を使う. // close(chItems) が呼ばれるまで targetItem の取り出しが (goroutine 間で均等に) 行われる for targetItem := range chItems { r, err := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く chResults <- &taskResult{result: r, err: err} // 結果を構造体に入れ、 `chResults` に送出 } }() } // `targetItem` の送出は別の goroutine 内で行う go func() { defer close(chItems) // goroutine 終了時に確実に close する for _, targetItem := range items { select { case <-ctx.Done(): // cancel() が実行された後は余計な `item.ProcessItem` を実行したくないので、送出を中止して goroutine を終了する return default: chItems <- targetItem } } }() failed := false processedCount := 0 // ワーカーが全て終了するまで `chResults` から値を取り出し続ける for numWorkers > 0 { select { case result := <-chResults: if failed { break } // エラーを検出したら `cancel()` を実行し、他の goroutine にそれを通知し、終了する. // 但しメイン goroutine のこの時点では終了せず全てのワーカーが閉じられるまで待つ. // (途中でループから抜けると、ワーカー内で `chResults <-` で送出している部分がブロックされてしまい、 goroutine が終了せずリークしてしまう) if result.err != nil { logger.Error(fmt.Sprintf("Error detected: %e", result.err)) cancel() failed = true break } logger.Info(result.result) // 処理成功時は結果をロガーで出力する processedCount++ if processedCount%20 == 0 { logger.Info("Flush buffer!") } default: // `default` が無いと、 `chResults` deadlock になるので注意 } } if !failed && processedCount%20 > 0 { logger.Info("Flush buffer!") } ``` ここでのポイントは、コメントにもある通り、エラーを検出しても全てのワーカーが終了するまでループを抜けないと言う事です。途中で抜けてしまうと `chResults` を受信する箇所が無くなってしまい、ワーカーの `chResults <-` 部分が永久にブロックされる事で goroutine が終了せず、リークしてしまうからです。エラーを検出したらフラグを立てて以降は `<-chResults` の結果を破棄しながらワーカーの終了を待ちます。キャンセル後は `chItems` への送出も止まるので、すぐにワーカーは終了するでしょう。 後は goroutine を使わないバージョンで実装してたとおり、一定の件数毎にバッファのフラッシュ処理をしています。当然ここでもエラーが発生する場合があると思いますが、その場合は `item.ProcessItem` のエラー処理と同様の以下の処理を書いてあげれば OK です: ```go= // ... if processedCount%20 == 0 { // 20件毎にバッファをフラッシュする. エラーが発生した時は他と同様以降の処理を止める if err := バッファのフラッシュ処理; err != nil { logger.Error(fmt.Sprintf("Error detected: %e", err)) cancel() failed = true } } // ... ``` ## まとめ 以上、 goroutine と Channels を使った並行処理についてでした。 Channels に関しては正しく使わないと deadlock を起こしたり、 goroutine がいつまでも消えずにリークし続けたりとハマりどころが多いので注意が必要です。 ## 参考 - [Concurrency - Effective Go](https://go.dev/doc/effective_go#concurrency) - [Range and Close - A Tour of Go](https://go.dev/tour/concurrency/4) - [Goroutine leak](https://medium.com/golangspec/goroutine-leak-400063aef468)

Import from clipboard

Advanced permission required

Your current role can only read. Ask the system administrator to acquire write and comment permission.

This team is disabled

Sorry, this team is disabled. You can't edit this note.

This note is locked

Sorry, only owner can edit this note.

Reach the limit

Sorry, you've reached the max length this note can be.
Please reduce the content or divide it to more notes, thank you!

Import from Gist

Import from Snippet

or

Export to Snippet

Are you sure?

Do you really want to delete this note?
All users will lose their connection.

Create a note from template

Create a note from template

Oops...
This template is not available.
Upgrade
All
  • All
  • Team
No template found.

Create custom template

Upgrade

Delete template

Do you really want to delete this template?
Turn this template into a regular note and keep its content, versions, and comments.

This page need refresh

You have an incompatible client version.
Refresh to update.
New version available!
See releases notes here
Refresh to enjoy new features.
Your user state has changed.
Refresh to load new user state.

Sign in

Forgot password

or

By clicking below, you agree to our terms of service.

Sign in via Facebook Sign in via Twitter Sign in via GitHub Sign in via Dropbox Sign in with Wallet
Wallet ( )
Connect another wallet

New to HackMD? Sign up

Help

  • English
  • 中文
  • Français
  • Deutsch
  • 日本語
  • Español
  • Català
  • Ελληνικά
  • Português
  • italiano
  • Türkçe
  • Русский
  • Nederlands
  • hrvatski jezik
  • język polski
  • Українська
  • हिन्दी
  • svenska
  • Esperanto
  • dansk

Documents

Help & Tutorial

How to use Book mode

How to use Slide mode

API Docs

Edit in VSCode

Install browser extension

Get in Touch

Feedback

Discord

Send us email

Resources

Releases

Pricing

Blog

Policy

Terms

Privacy

Cheatsheet

Syntax Example Reference
# Header Header 基本排版
- Unordered List
  • Unordered List
1. Ordered List
  1. Ordered List
- [ ] Todo List
  • Todo List
> Blockquote
Blockquote
**Bold font** Bold font
*Italics font* Italics font
~~Strikethrough~~ Strikethrough
19^th^ 19th
H~2~O H2O
++Inserted text++ Inserted text
==Marked text== Marked text
[link text](https:// "title") Link
![image alt](https:// "title") Image
`Code` Code 在筆記中貼入程式碼
```javascript
var i = 0;
```
var i = 0;
:smile: :smile: Emoji list
{%youtube youtube_id %} Externals
$L^aT_eX$ LaTeX
:::info
This is a alert area.
:::

This is a alert area.

Versions and GitHub Sync
Upgrade to Prime Plan

  • Edit version name
  • Delete

revision author avatar     named on  

More Less

No updates to save
Compare
    Choose a version
    No search result
    Version not found
Sign in to link this note to GitHub
Learn more
This note is not linked with GitHub
 

Feedback

Submission failed, please try again

Thanks for your support.

On a scale of 0-10, how likely is it that you would recommend HackMD to your friends, family or business associates?

Please give us some advice and help us improve HackMD.

 

Thanks for your feedback

Remove version name

Do you want to remove this version name and description?

Transfer ownership

Transfer to
    Warning: is a public team. If you transfer note to this team, everyone on the web can find and read this note.

      Link with GitHub

      Please authorize HackMD on GitHub
      • Please sign in to GitHub and install the HackMD app on your GitHub repo.
      • HackMD links with GitHub through a GitHub App. You can choose which repo to install our App.
      Learn more  Sign in to GitHub

      Push the note to GitHub Push to GitHub Pull a file from GitHub

        Authorize again
       

      Choose which file to push to

      Select repo
      Refresh Authorize more repos
      Select branch
      Select file
      Select branch
      Choose version(s) to push
      • Save a new version and push
      • Choose from existing versions
      Include title and tags
      Available push count

      Upgrade

      Pull from GitHub

       
      File from GitHub
      File from HackMD

      GitHub Link Settings

      File linked

      Linked by
      File path
      Last synced branch
      Available push count

      Upgrade

      Danger Zone

      Unlink
      You will no longer receive notification when GitHub file changes after unlink.

      Syncing

      Push failed

      Push successfully