# Prefect on ECSの取扱まとめ ## 役割(どうあることになるのか) ### Agent - Agentは1つ立っていれば良い - >A single agent can manage many concurrent flow runs - the only reason to have multiple active agents is if you need to support flow runs on different deployment platforms. - → ECSのサービスとして1つ `ECSagent(…).start()` されている状態 - デプロイ方法 1. ECSAgentがスタートするDockerイメージを作成 - labelを設定する - 実行対象になるFlowは同じlabelを持たせる必要がある 3. ECRにプッシュ 4. ECSのサービスとして、上記のDockerイメージを使用したタスクを登録 - 環境変数として以下が必要 - `PREFECT__CLOUD__AGENT__AUTH_TOKEN=<RUNNER TOKEN>` - `AWS_ACCESS_KEY_ID=...` - `AWS_SECRET_ACCESS_KEY=...` - `AWS_DEFAULT_REGION=...` - ref. - [Overview | Prefect Docs](https://docs.prefect.io/orchestration/agents/overview.html#agent-types) - [ECS Agent | Prefect Docs](https://docs.prefect.io/orchestration/agents/ecs.html) ### Flow #### - ECSでのタスクとして実行される - `ECSRun(…)` でECSでのタスク定義をする - AWS Auroraとの接続をする必要がある場合、`run_task_kwards` プロパティで `subnets`,`securityGroups`を適切に設定する必要があり、`assignPublicIp` は `ENABLED` である必要がある - Flowの実行内容(コード)は、`Storage` として指定した場所に保管され、実行のタイミングで参照されることになる - 今回はDocker Storageとして、ECRにイメージとして登録する方法をとる - `flow.register()` で、上記の設定を登録する - ECRにイメージをプッシュするため、それができるAWSクレデンシャル及び認証が必要 - `aws ecr get-login-password --region region | docker login --username AWS --password-stdin aws_account_id.dkr.ecr.region.amazonaws.com` - ref. [プライベートレジストリの認証 - Amazon ECR](https://docs.aws.amazon.com/ja_jp/AmazonECR/latest/userguide/registry_auth.html) - Flowをつなげる[Running dependent flows | Prefect Docs](https://docs.prefect.io/core/idioms/flow-to-flow.html)では、一つのFlowの結果を次のFlowへパラメータを使ってつなげることはできないので、それが一つのFlowとしての粒度の基準となる #### 一つのFlowの基本的な動作(案) 1. ステータスの取得(from Aurora)(Aurora連携前は、S3上でのファイルでの管理) 1. 対象ファイルの取得(from S3) 1. Flowのメイン処理の実行 1. 対象ファイルの処理結果のアップロード(to ongoing bucket on S3) 1. ステータスの更新(to Aurora)(Aurora連携前は、S3上でのファイルでの管理) #### データ以外の依存関係を作る > **You can specify non-data dependencies with the functional API** > A common misconception is that the functional API does not allow users to specify non-data dependent tasks (Task B should run after Task A, but no data is exchanged). In fact, this is possible using the special upstream_tasks keyword argument to the task's call method. Here is an example: ```python= from prefect import task, Flow @task(name="A") def task_A(): # does something interesting and stateful return None @task(name="B") def task_B(): # also does something interesting and stateful return None with Flow("functional-example") as flow: result = task_B(upstream_tasks=[task_A]) ``` https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows ### ECSまわりのロールについて - Fargateで設定するロールは2種類 - Task Execution Role - [Amazon ECS タスク実行 IAM ロール - Amazon Elastic Container Service](https://docs.aws.amazon.com/ja_jp/AmazonECS/latest/developerguide/task_execution_IAM_role.html) - > ECS上でコンテナを実行するために必要な操作を行うためのIAM Role - Task Role - [タスク用の IAM ロール - Amazon Elastic Container Service](https://docs.aws.amazon.com/ja_jp/AmazonECS/latest/developerguide/task-iam-roles.html) - > ECS上で稼働するコンテナアプリケーションからAWSサービスを利用する際に利用するIAM Role - ref. [【ECS】ECSに関するIAM Roleを整理する【AWS】 - Qiita](https://qiita.com/tmiki/items/25473b8975f8a1095c0a) ## 開発手順 ### Agent - etl/agent/ - ファイル構成 - Dockerfile - `ecs_agent.py` を実行する - ecs_agent.py - `ECSAgent(…).start()` の記述 - 基本的に変更はないものと思う - デプロイ - 手動 1. Dockerイメージのビルド 2. ECRへのプッシュ 3. ECSのサービスへの登録 - CI/CD - 未定 ### Flow - etl/ - ファイル構成 - 各フロー毎にさらにフォルダを分ける - Flow(=ECSタスク)として使用するDockerイメージは必要十分なもので構成される - Dockerリポジトリの数がFlowの数だけ必要になってしまう - 20210407時点では、同じリポジトリにFlow毎のタグをつけることでタスク実行時に使用するイメージを取得させている - Flow毎にイメージのタグを分けるというのは、ECRの運用としてアンチパターンか? - もしくは、[Multi Flow Storage](https://docs.prefect.io/orchestration/recipes/multi_flow_storage.html) でいくつかのFlowはまとめる? - 20210407時点では未採用 - その場合の仕分けはどうするか? - flow_observer.py - 各Flowをつなげたdata-pipelineのメインとなるフロー - `flow_copy_raw_data` によってコピーされたファイルのリストに対して、Prefectの [Mapping](https://docs.prefect.io/orchestration/flow_config/run_configs.html)を利用することで、ファイルごとにパラレルに処理を実行していく - flow_copy_raw_data.py - S3の`raw-data-bucket` から、`ongoing-data-bucket` へデータをコピーするフロー - ongoingにコピー時に、~~ファイル名~~ UUIDを生成し、UUIDを使ったファイルのリネームとフォルダ生成をして格納する - flow_pptx_convert.py - PPT,PPTXファイルをPDFファイルに変換する - flow_pdf_convert.py - PDFファイルをPNGイメージに変換する - flow_ocr.py - イメージファイルをOCRにかけて、テキスト化する - (flow_transcribe.py) - 動画ファイルをテキスト化する - flow_nlp.py - テキストファイルに自然言語処理を行なう - (flow_upsert_to_db.py) - 検証1:[MySQL Tasks | Prefect Docs](https://docs.prefect.io/api/latest/tasks/mysql.html#mysqlexecute)からのAWS Auroraへのupsert処理を行なう(SQLでの直接操作) - 接続情報はAWS Secrets Managerに格納済 - 検証2:DjangoへGraphQLでリクエストをし、Djangoからupsertしてもらう - (flow_compless_and_output.py) - 指定したフォルダをzipにして、`output-data-bucket` へ格納する - 開発・デプロイの手順 - 手動 1. `flow_….py` で各タスクを記述 - `@task` でタスクの記述 - `with Flow(…) as flow` でフローの記述 - `storage=` : 実行時にコードを参照する場所の指定 - `run_config=` : 実行環境およびAgentの指定 - `flow.register(…)` でフローの登録 2. `flow_….py` の実行 - この時、`flow.register(…)` のみが実行されることになる - = タスク・フローを実行するのではなく、登録する - `flow.register(…)` を実行する環境で、Prefectのバージョンは最新のものにしておく - なぜなら、Agentの方がECSサービスで常に最新のものに保たれており、バージョンを合わせるため - ref. [Issue with upgrading to 0.14.15 using Docker storage · Issue #4385 · PrefectHQ/prefect](https://github.com/PrefectHQ/prefect/issues/4385) - storageにDockerを指定している場合、ECRへの登録も自動的に行なう - そのためのAWSクレデンシャルや、Dockerコマンドを実行できる環境が必要 - → CI/CD ではどうするか - CI/CD - 未定 ### ネットワーク環境 #### AWS VPC - securityGroups - inbound - SSH: 22 が必要 - Prefect Cloudとの通信 - ポートが空いてないと、Prefect CloudはAWS ECSからのレス待ちの状態のまま変化なしが続き、AWS ECSではTaskが起動すらしない - HTTPS: 443 が必要 - ECRからのイメージのPull - ポートが空いてないと以下になる > 停止理由 ResourceInitializationError: unable to pull secrets or registry auth: pull command failed: : signal: killed - outbount - すべてのトラフィック - endpoints - 下記、適切なsubnets, security groupと接続しておく - com.amazonaws.ap-northeast-1.s3 (type: interface) - com.amazonaws.ap-northeast-1.ecr.dkr (type: interface) - com.amazonaws.ap-northeast-1.ecr.api (type: interface) - com.amazonaws.ap-northeast-1.logs (type: interface) #### Agent - subnets - privateのもので動作確認OK - securityGroups - AWS VPCでinboundを限定したもので動作確認OK - assignPublicIp - ENABLED: OK - AWS VCP - サブネット - パブリックIPv4アドレスの自動割り当て - いいえ の設定を変更してしまうことはない - DISABLE: NG - Prefect Cloudとの通信ができなくなる #### refs. - [Fargate Agent (Deprecated) | Prefect Docs](https://docs.prefect.io/orchestration/agents/fargate.html) > Outbound Traffic If you encounter issues with Fargate raising errors in cases of client timeouts or inability to pull containers then you may need to adjust your networkConfiguration. Visit this discussion thread for more information on configuring AWS security groups. - [Fargate: CannotPullContainer located on ECS registry · Issue #1128 · aws/amazon-ecs-agent](https://github.com/aws/amazon-ecs-agent/issues/1128) ### Schedulesについて [Schedules | Prefect Docs](https://docs.prefect.io/core/concepts/schedules.html#overview) - Flowのコードに上記のマニュアル通りに記述し、`flow.register()`がなされることで、登録完了後即時反映されることを確認した - コード上での記載のほか、https://cloud.prefect.io/ 上でのWebUIからの設定でも設定反映できることを確認した ### Secret(クレデンシャル)情報の扱いについて - 本件では、AWS Secrets Managerでの運用によせる - 手動でのデプロイ時には、ローカル環境にAWSクレデンシャルが登録されていればそれで良い - CI/CD時には、CI/CDでの環境変数にAWSクレデンシャルとして`Access key ID`、`Secret access key`および`Default region`を設定しておく - ECRへのプッシュがあるので、Secrets Managerへのアクセス権だと不十分 ### 課題 #### フローが開始されたのち、途中で止める方法がない - フローの開始トリガーをどうするか - 途中で止める方法をどうするか(解決済: 20210408) - → 人力での確認作業が途中に入るので、Rawデータのアップロードから一度も止まることなしにタグのDB登録反映まで行くことはありえない - → つまり、flowがそのような設計になるはずなので、自ずと途中途中で処理は止まることなる - 上記を含めて、運用方法の検討が必要 - → スプリント8以降?