# Airflow Agent

PR: https://github.com/flyteorg/flytekit/pull/1782

## Prerequisites

Install Airflow, the Google providers, and Flytekit with the Airflow plugin:

```bash=
pip install apache-airflow
pip install google-cloud-orchestration-airflow==1.9.1
pip install apache-airflow-providers-google
pip install jsonpickle
pip install git+https://github.com/flyteorg/flytekit.git@873c60e505bb3492b5e3fa09840836b4d6775f4b#subdirectory=plugins/flytekit-airflow
```

## Authenticating to Google Cloud

Use [default credentials](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/connections/gcp.html#note-on-application-default-credentials) to connect to Google Cloud:

```bash=
export AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT='google-cloud-platform://'
export GOOGLE_APPLICATION_CREDENTIALS=/Users/kevin/.config/gcloud/application_default_credentials.json
```

## Run the workflow locally

The workflow below creates a Dataproc cluster, submits a Spark job to it, and deletes the cluster afterwards:

```python=
from datetime import timedelta

from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitSparkJobOperator,
)
from airflow.utils import trigger_rule
from flytekit import workflow


@workflow
def wf():
    # Create a Dataproc cluster
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        image_version="2.0.27-debian10",
        storage_bucket="opta-gcp-dogfood-gcp",
        master_machine_type="n1-highmem-32",
        master_disk_size=1024,
        num_workers=2,
        worker_machine_type="n1-highmem-64",
        worker_disk_size=1024,
        region="us-west1",
        cluster_name="flyte-dataproc",
        project_id="dogfood-gcp-dataplane",
    )

    # Submit a Spark job to the cluster
    run_spark = DataprocSubmitSparkJobOperator(
        job_name="spark_pi",
        task_id="run_spark",
        dataproc_jars=["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        main_class="org.apache.spark.examples.JavaWordCount",
        arguments=["gs://opta-gcp-dogfood-gcp/spark/file.txt"],
        cluster_name="flyte-dataproc",
        region="us-west1",
        project_id="dogfood-gcp-dataplane",
    )

    # Tear down the cluster even if the Spark job fails
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        project_id="dogfood-gcp-dataplane",
        cluster_name="flyte-dataproc",
        region="us-west1",
        retries=3,
        retry_delay=timedelta(minutes=5),
        email_on_failure=True,
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    create_cluster >> run_spark >> delete_cluster


if __name__ == "__main__":
    wf()
```

Airflow sensors can be mixed with Flyte tasks as well. The example below waits for a file to appear before running a Flyte task, and uses `ImageSpec` to build an image with Airflow and the flytekit-airflow plugin installed:

```python=
from airflow.sensors.filesystem import FileSensor
from flytekit import ImageSpec, task, workflow

airflow_plugin = "git+https://github.com/flyteorg/flytekit.git@487438ab59147879eded897674593a1eaee1c78b#subdirectory=plugins/flytekit-airflow"

# Build an image that includes Airflow and the flytekit-airflow plugin
image_spec = ImageSpec(
    base_image="pingsutw/flytekit:v1",
    packages=["apache-airflow", airflow_plugin],
    apt_packages=["git"],
    registry="pingsutw",
)


@task(container_image=image_spec)
def t1():
    print("flyte")


@workflow
def wf():
    # Wait for /tmp/1234 to exist before running t1
    sensor = FileSensor(task_id="id", filepath="/tmp/1234")
    sensor >> t1()


if __name__ == "__main__":
    wf()
```
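Either workflow can also be launched with `pyflyte run` instead of invoking the script directly. A minimal sketch, assuming the sensor example above is saved as `wf.py` (the file name is a placeholder):

```bash=
# Execute the workflow locally
pyflyte run wf.py wf

# Execute on a Flyte cluster; ImageSpec builds and pushes the task image
pyflyte run --remote wf.py wf
```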