--- tags: Data Engineering, BigQuery lang: ja-jp --- # Survey: BigQuery Partial Update ## Memo - [Diagram: Overview](https://drive.google.com/open?id=1I28auDTKeLEdQuNlj7CoWUR2ilnh5E-5) - Pipeline Overview: - Apache Beam Tasks - **A** S3に保存されているAurora DBのテーブルデータをパイプラインの読み込む - **B** 読み込んだデータをCloud Storageに書き出す - **C** データに対するマスキングといった変換処理をかける - **D** 読み込んだデータをBigQueryに(一時的なデータとして)書き出す - **E** BigQueryの一時的なテーブルと既存のテーブルを結合し更新差分を取得する([Pipeline](#Pipeline)セクションを参照:APIの仕様に応じてBigQueryに`Airflow: BigQueryCreateEmptyTable`を使って一時的に書き出すかもしれない) - **F** 更新差分を既存のテーブルに反映する([Upsert the records](#Upsert-the-records)セクションを参照) - Cloud Composer Workflow 1. [Airflow: DataflowPythonOperator](https://drive.google.com/open?id=1I28auDTKeLEdQuNlj7CoWUR2ilnh5E-5)のAPIを使用して`A`のタスクを実行する 2. `1`の完了後、`A`のタスクで読み込んだデータを対象に`B`のタスクを実行する 3. `1`の完了後、`A`のタスクで読み込んだデータを対象に`C`のタスクを実行する 4. `3`の完了後、その結果を対象に`D`のタスクを実行する 5. `4`の完了後、[Airflow: BigQueryOperator](https://drive.google.com/open?id=1I28auDTKeLEdQuNlj7CoWUR2ilnh5E-5)のAPIを使用して`E`のタスクを実行する 6. `5`の完了後、Airflow: BigQueryOperatorを使用して`F`のタスクを実行する ## Problems - 既存のテーブルを破棄し、Full Dumpしたものを新規に更新することは避けたい - データが一時的に利用不可能になる - データの転送コストが高くなる - 転送にかかる時間が長くなる ## Overview 1. updated_atが前回同期実行時よりも後のレコードについてS3(or GCS)のダンプから取得し、BigQueryに書き出す(tmp_usersとか) 2. 現在BigQueryのデータセットに書き込まれているテーブル(users)と先ほど書き出したものをJOINする 3. 記事にあるクエリを実行し、結果を書き出す(この時に再度パーティショニングできると良さそう) ### DAG ```flow st=>start: Start e=>end: End op1=>operation: Snapshot op2=>operation: Read Parquet Files op3=>operation: Extract Updated Records op4=>operation: Write to BigQuery temp tables op5=>operation: Update the table partially st->op1->op2->op3->op4->op5->e ``` ### Pipeline - スナップショットはParquet形式でバケットにFull Dumpする - Apache Beamのパイプラインで`updated_at`カラムが前回の実行時より後のレコードを抽出する - 抽出したレコードをBigQueryの`tmp`データセットあたりに書き込む - `tmp`と既存のテーブルを以下のクエリで更新差分を取得する ```sql= WITH merged_users AS ( SELECT * FROM `production.tmp_users` UNION ALL SELECT * FROM `production.users` ), partitioned_users AS ( SELECT * , ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `updated_at` DESC) AS rn FROM `merged_users` ) SELECT * EXCEPT(rn) FROM `partitioned_users` WHERE `rn` = 1; ``` ### Upsert the records [Google Cloud: MERGE ステートメント](https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax?hl=ja#merge_statement)を利用して、`INSERT`と`UPDATE`の操作をアトミックに保つ。 - 同期で新規に作成されるレコードについては`INSERT`をかける - 同期で更新される既存のレコードについては`UPDATE`をかける といった処理を記述できる。具体例としては以下のようなイメージ(テーブル内のレコードを直接更新することになる): ```sql= MERGE dataset.Inventory T USING dataset.NewArrivals S ON T.product = S.product WHEN MATCHED THEN UPDATE SET quantity = T.quantity + S.quantity WHEN NOT MATCHED THEN INSERT (product, quantity) VALUES(product, quantity) ``` - [Apache Airflow: BigQuery](https://airflow.apache.org/docs/stable/integration.html#bigquery) - [Google Cloud: クイックスタート:クライアント ライブラリの使用](https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries?hl=ja) Airflowのオペレータでは、テーブルに対する操作そのものの実装はないので、上述のDMLの実行にはPythonのBigQuery SDKを利用するのが良さそう。 ### BigQuery Table [Terraform: google_bigquery_table](https://www.terraform.io/docs/providers/google/r/bigquery_table.html) `time_partitioning`フィールドを定義する。`field`や`type`を指定する(`type`は"DAY"のみがサポートされているとのこと)。 ## Tasks - バケットとしてS3を利用するのか、Cloud Storageを利用するのか決める - いずれの場合にも、入力形式は`Parquet`にする - Terraformで`users`テーブルを宣言する - スキーマ情報とパーティショニングの設定を含める - Cloud ComposerのTerraformとの統合について調べる - Apache AirflowでCloud Dataflowを起動する方法の詳細を調べる:[link](https://airflow.apache.org/docs/stable/integration.html#cloud-dataflow) - Cloud DataflowのTerraformとの統合について調べる - Apache BeamでBigQueryにテーブルを書き出す方法について調べる:[link](https://beam.apache.org/releases/pydoc/2.19.0/apache_beam.io.gcp.bigquery.html#writing-data-to-bigquery) ## References - [Mercari Tech Blog: 数百GBのデータをMySQLからBigQueryへ同期する](https://tech.mercari.com/entry/2018/06/28/100000)
×
Sign in
Email
Password
Forgot password
or
By clicking below, you agree to our
terms of service
.
Sign in via Facebook
Sign in via Twitter
Sign in via GitHub
Sign in via Dropbox
Sign in with Wallet
Wallet (
)
Connect another wallet
New to HackMD?
Sign up