--- lang: ja-jp tags: Terraform, Cloud Dataflow, Apache Beam --- # Cloud Dataflow w/Terraform Apache BeamのフルマネージドサービスであるCloud Dataflowにパイプラインのテンプレートを登録し、Terraformでそれらの実行を制御する方法について学ぶ。 ## Goals - Apache Beamによるパイプライン処理の定義方法を知る - Cloud Dataflowにテンプレート化されたパイプラインを登録する方法を知る - Terraformを利用してパイプラインの実行を宣言・制御する ## Index - [Apache Beam:オンボーディング](#Apache-Beam:オンボーディング) - [Cloud Dataflow:テンプレートの作成](#Cloud-Dataflow:テンプレートの作成) - [Terraform:リソースの宣言](#Terraform:リソースの宣言) - [翻訳:google_dataflow_job](#翻訳:google_dataflow_job) - [Hands-on:AWS RDSからBigQueryへのデータ転送](#Hands-on:AWS-RDSからBigQueryへのデータ転送) - [Appendix](#Appendix) ## Apache Beam:オンボーディング [Apache Beam: Documentation](https://beam.apache.org/documentation/) [Qiita: Apache Beam (Dataflow)実践入門【Python】](https://qiita.com/esakik/items/3c5c18d4a645db7a8634) ## Cloud Dataflow:テンプレートの作成 [Google Cloud: テンプレートの作成](https://cloud.google.com/dataflow/docs/guides/templates/creating-templates?hl=ja) テンプレートとは、パイプライン中の処理を記述したもので、パラメータを受け取ることで挙動を変更できるもの。Cloud Dataflowでは、Googleがユースケースに合わせて事前に用意しているものと、ユーザがカスタマイズしたものの両者をサポートしている。 Cloud Dataflowのテンプレートを登録するまでの一般的な流れは次の通り: 1. Java or PythonでApache Beamのパイプラインを記述する 2. パイプラインのパラメータ化を設定する 3. コマンドを実行してテンプレート化したパイプラインをCloud Storageにアップロードする ## Terraform:リソースの宣言 1. Cloud DataflowのAPIを有効化する 2. Cloud Storage上のテンプレートのパスを確認する 3. ジョブの実行リソースを宣言する ==WIP== ## 翻訳:google_dataflow_job [Terraform: google_dataflow_job](https://www.terraform.io/docs/providers/google/r/dataflow_job.html) Cloud Dataflow上にジョブを作成します。Cloud DataflowはGoogle Compute Engine上で動作するApache Beamのフルマネージドサービスです。より詳細な情報については、[Apache Beam](https://beam.apache.org/)または[Cloud Dataflow](https://cloud.google.com/dataflow?hl=ja)の公式ドキュメントを参照してください。 ### Example Usage ```hcl= resource "google_dataflow_job" "big_data_job" { name = "dataflow-job" template_gcs_path = "gs://my-bucket/templates/template_file" temp_gcs_location = "gs://my-bucket/tmp_dir" parameters = { foo = "bar" baz = "qux" } } ``` ### Note on "destroy" / "apply" Cloud Dataflowのジョブには数多くの種類が存在します。常に実行され、Cloud Storageといったストレージから新しいデータを取得し、データを継続的に出力するといったジョブを設定することが可能です。また、決められた量のデータを処理し、終了するといったジョブも設定できます。全てのジョブはプログラミングエラーまたはその他の原因により実行を中断することがあります。この点において、Cloud Dataflowのジョブはその他のTerraform・Google Cloudのリソースとは異なります。 Cloud Dataflowのリソースは「非終了状態」にある場合に「存在している」とみなされます。もし(`FAILED`、`COMPLETE`、`CANCELLED`といった)「終了状態」に到達した場合には、次回の`apply`(訳注:`terraform apply`コマンドのことを指す)のタイミングで再度作成されることになります。この挙動は継続的に実行される必要のあるジョブにとっては期待されるものですが、その他の用途でCloud Dataflowのジョブを作成しようとしているユーザにとっては驚くべきものかもしれません。 削除されるべきCloud Dataflowジョブはキャンセルされた、あるいはすでに利用が完了しているものでしょう。キャンセルされたものについては、ジョブは終了し、この場合にはすでに書き込まれたデータは残り、これから書き込まれる予定であったものについては破棄されます。もしすでに利用が完了している場合には、一切の新しいデータがパイプラインに流れ込むことはありませんが、その時点でパイプラインに残っているデータについては処理が実行されます。デフォルトの挙動は`cancel`ですが、もし`on_delete`フィールドを`drain`に設定した場合には、`terraform destroy`の実行時に(処理の完了のために)長い時間を要する可能性があります。 ### Argument Reference 次の引数がサポートされています: - `name` : (必須)リソースにつけるユニークな名称であり、Cloud Dataflowにより必須の引数です - `template_gcs_path` : (必須)Cloud Storage上のCloud Dataflowジョブのテンプレートへのパスです - `temp_gcs_location` : (必須)Cloud Dataflowのジョブが一時的にデータを書き込むCloud Storage上の書き込み可能な場所(ディレクトリ)です - - - - `parameters` : (任意)Cloud Dataflowのジョブに渡されるキー・バリューの組で、テンプレートの値を埋めるために利用されます - `labels` : (任意)ジョブに対して付与されるユーザのラベルです。キーとバリューはそれぞれ[ラベル名の制約](https://cloud.google.com/compute/docs/labeling-resources#restrictions)のページに記載されるルールに従っている必要があります。また、Googleが提供するCloud Dataflowのテンプレートには、大抵デフォルトで`goog-dataflow-provided`から始まるラベルが提供されている場合が多いです。設定の中で明示的に設定されない限り、再適用の際に差異が生じることを防ぐためにそれらラベルは無視されます - `max_workers` : (任意)ジョブの実行時に許容されるワーカーの数です。ワーカーを追加するほど処理速度は向上しますが、コストもかかるようになります - `on_delete` : (任意)"drain"または"cancel"のどちらかの値を取ります。`terraform destroy`の実行時の削除の挙動を指定するものです。上記の注釈を確認してください - `project` : (任意)リソースが紐付けられるプロジェクトです。指定されなかった場合には`provider`の設定でのプロジェクトの値が利用されます - `zone` : (任意)作成されるジョブが実行されるゾーンを指定します。指定されなかった場合には`provider`の設定でのゾーンの値が利用されます - `service_account_email` : (任意)ジョブの作成に利用されるサービスアカウントのメールアドレスです - `network` : (任意)仮想マシンに割り当てられるネットワークです。指定されない場合には"default"という値が設定されます - `subnetwork` : (任意)仮想マシンに割り当てられるサブネットワークです。`regions/REGION/subnetworks/SUBNETWORK`という形式で指定する必要があります - `machine_type` : (任意)ジョブの実行で利用されるマシンタイプです - `ip_configuration` : (任意)仮想マシンのIPアドレスの設定です。"WORKER_IP_PUBLIC"または"WORKER_IP_PRIVATE"のいずれかを指定してください ### Attributes Reference - `job_id` : ジョブのユニークな識別子です - `type` : このジョブの種類です。[JobType enum](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobType)のいずれかの値をとります - `state` : リソースの現時点での状態です。[JobState enum](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState)のいずれかの値をとります ## Hands-on:AWS RDSからBigQueryへのデータ転送 AWSアカウントとGCPのプロジェクトの立ち上げ、リソースの管理については、ここではTerraformを使用しない。 ### AWSアカウントのセットアップ [AWS:ご利用開始のためのリソースセンター](https://aws.amazon.com/jp/getting-started/)のドキュメントを参考に、AWSアカウント(GCPでいうプロジェクト)を作成する。 ここではルートユーザを利用したアカウントの作成を行った。 ### GCPプロジェクトのセットアップ 管理者権限で新期のプロジェクトを立てた。 ### AWS RDSのインスタンスの作成 `MySQL 5.7.28`のMySQL Communityエディションを無料利用枠で作成した。テストの環境であるため、データベースの設定は「簡単作成」を選択している。 DBインスタンス識別子は`cloud-dataflow-data-transfer-sample`などとした。 ### サンプルデータベースの生成 [Qiita: 【SQL】MySQL公式サンプルデータベースを使う](https://qiita.com/yukibe/items/fc6016348ecf4f3b10bf)の記事を参考に、ソースファイルをRDS上のデータベースにインポートする。 大まかな手順は以下の通り: 1. ソースファイルを[MySQL: Sakila Sample Database](https://dev.mysql.com/doc/sakila/en/)からダウンロードする 2. AWS RDSのデータベースへの接続情報を確認する 3. ローカル環境のMySQLクライアントからデータをインポートする #### 1. データのダウンロード [MySQL: Other MySQL Documentation](https://dev.mysql.com/doc/index-other.html)から`sakila database`のZIPファイルをダウンロードし、展開する。 ![](https://i.imgur.com/IeYz4bc.png) #### 2. データベースへの接続 AWS RDSの「データベース」ページから、作成したデータベースへの接続情報を確認し、MySQLクライアントを利用して接続の確認を行う。 - `$USERNAME` : 接続に利用するユーザ名 - `$PASSWORD` : ユーザのパスワード - `$HOST` : 作成したデータベースの接続エンドポイント ![](https://i.imgur.com/8H6MjaQ.png) :::warning 「簡単設定」で作成したデータベースインスタンスでは、「パブリックアクセシビリティ」が「なし」に設定されている(デフォルト値)。ローカルPCの環境からインスタンス上のデータベースにアクセスするには、この設定を「あり」にする必要がある。今回は動作確認のための設定であるが、本番環境では仮想ネットワークを介してよりセキュアな環境で接続される必要がある。 ::: ```console % mysql -u $USERNAME -h $HOST -P 3306 -p Enter password: $PASSWORD ``` #### 3. データのインポート 無事に接続が完了したら、[MySQL: Sakila Sample Database > 4 Installation](https://dev.mysql.com/doc/sakila/en/sakila-installation.html)の手順に従って、ローカルに存在するマイグレーション&シードのファイルをデータベースに反映させる。 展開されたソースファイルの構成は次の通り: ``` . └── sakila-db ├── sakila-data.sql ├── sakila-schema.sql └── sakila.mwb ``` これらのソースファイルを読み込み、データベースを作成する。 ```console mysql> SOURCE /path/to/sakila-db/sakila-schema.sql; Query OK, 0 rows affected (0.01 sec) ... mysql> SOURCE /path/to/sakila-db/sakila-data.sql; Query OK, 0 rows affected (0.23 sec) ... # 'sakila'という名称のデータベースが確認できればOK mysql> show databases; +--------------------+ | Database | +--------------------+ | information_schema | | innodb | | mysql | | performance_schema | | sakila | | sys | +--------------------+ 6 rows in set (0.12 sec) ``` ### Apache Beam スクリプトの作成 パイプラインはJava or Golang or Pythonで記述できる。ここではPythonによるパイプラインの定義を行う。 #### 開発環境の構築 Pythonの仮想環境の構築と依存パッケージの宣言とインストールを行う。ここでは[Pipenv](https://pipenv-ja.readthedocs.io/ja/translate-ja/)を活用する。 2020年4月時点での[Apache Beam: Python SDK Quickstart](https://beam.apache.org/get-started/quickstart-py/)によると、Python 3.7までのバージョンをサポートしていると記載されているため、Pipenvで条件を満たす環境を作成する: ```console % pipenv --python 3.7 ``` Apache Beamを作成した仮想環境にインストールする(Cloud Dataflowにアップロードされることを前提としているので、`extra_requirements`として`gcp`を指定する): ```console % pipenv install apache-beam[gcp] ``` #### スクリプトの作成 ETL(Extraction, Transformation, Loading)のパイプラインを定義し、実行するスクリプトを作成する: ```python= ``` ## Appendix ### BigQuery:パーティショニング [Google Cloud: パーティション分割テーブルの概要](https://cloud.google.com/bigquery/docs/partitioned-tables?hl=ja) パーティショニングの基準は次の通り(抜粋): - 「取り込み時間」:データを入力した日時に基づいてテーブルが分割される - 「日付とタイムスタンプ」:`TIMESTAMP`または`DATE`の型を持つカラムの値に基づいてテーブルが分割される - 「整数範囲」:整数型を持つカラムの値に基づいてテーブルが分割される ### Apache Beam:Pythonの静的型付 ### Parquet:from RDS to S3 [Depelopers.IO: [新機能]RDSのスナップショットがS3にエクスポートできるようになりました。](https://dev.classmethod.jp/articles/rds-snapshot-s3-export/) ### Parquetとは?