Airflow Agent

Prerequisites

  • Install Airflow and Flytekit
# Install Airflow, its Google providers, and the Flytekit Airflow plugin.
# (These are plain shell commands; the stray Dockerfile-style "RUN" prefixes
# from the original were inconsistent — the last command had none — and have
# been removed so the snippet can be pasted directly into a terminal.)
pip install apache-airflow
pip install google-cloud-orchestration-airflow==1.9.1
pip install apache-airflow-providers-google
pip install jsonpickle
# Pinned Flytekit Airflow plugin from a specific commit of the flytekit repo.
pip install "git+https://github.com/flyteorg/flytekit.git@873c60e505bb3492b5e3fa09840836b4d6775f4b#subdirectory=plugins/flytekit-airflow"

Authenticating to Google Cloud

Use application default credentials to connect to Google Cloud.

# Point Airflow's default Google Cloud connection at the local environment.
export AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT='google-cloud-platform://'
# Use the gcloud application default credentials file (adjust the path for
# your own user).
export GOOGLE_APPLICATION_CREDENTIALS=/Users/kevin/.config/gcloud/application_default_credentials.json

Run the workflow locally

from datetime import timedelta

from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitSparkJobOperator,
)
from airflow.utils import trigger_rule
from flytekit import workflow


@workflow
def wf():
    """Create a Dataproc cluster, submit a Spark job to it, then delete it.

    The three Airflow operators are chained with ``>>`` so they run in
    order: create -> run -> delete.
    """
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        image_version="2.0.27-debian10",
        storage_bucket="opta-gcp-dogfood-gcp",
        master_machine_type="n1-highmem-32",
        master_disk_size=1024,
        num_workers=2,
        worker_machine_type="n1-highmem-64",
        worker_disk_size=1024,
        region="us-west1",
        cluster_name="flyte-dataproc",
        project_id="dogfood-gcp-dataplane",
    )
    run_spark = DataprocSubmitSparkJobOperator(
        job_name="spark_pi",
        task_id="run_spark",
        dataproc_jars=["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        main_class="org.apache.spark.examples.JavaWordCount",
        arguments=["gs://opta-gcp-dogfood-gcp/spark/file.txt"],
        cluster_name="flyte-dataproc",
        region="us-west1",
        project_id="dogfood-gcp-dataplane",
    )
    delete_cluster = DataprocDeleteClusterOperator(
        # BUG FIX: was "create_dataproc_cluster", duplicating the create
        # task's id — Airflow requires task_ids to be unique.
        task_id="delete_dataproc_cluster",
        project_id="dogfood-gcp-dataplane",
        cluster_name="flyte-dataproc",
        region="us-west1",
        retries=3,
        retry_delay=timedelta(minutes=5),
        email_on_failure=True,
        # ALL_DONE so the cluster is torn down even if the Spark job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )
    create_cluster >> run_spark >> delete_cluster


if __name__ == "__main__":
    wf()
from airflow.sensors.filesystem import FileSensor
from flytekit import task, workflow, ImageSpec

# Flytekit Airflow plugin pinned to a specific commit.
airflow_plugin = "git+https://github.com/flyteorg/flytekit.git@487438ab59147879eded897674593a1eaee1c78b#subdirectory=plugins/flytekit-airflow"

# Container image that bundles Airflow plus the plugin above.
image_spec = ImageSpec(
    base_image="pingsutw/flytekit:v1",
    packages=["apache-airflow", airflow_plugin],
    apt_packages=["git"],
    registry="pingsutw",
)


@task(container_image=image_spec)
def t1():
    print("flyte")


@workflow
def wf():
    """Wait for /tmp/1234 to appear, then run the t1 task."""
    sensor = FileSensor(task_id="id", filepath="/tmp/1234")
    sensor >> t1()


if __name__ == "__main__":
    wf()