# Airflow Agent
- PR: https://github.com/flyteorg/flytekit/pull/1782
## Prerequisites
- Install Airflow and Flytekit
```bash=
pip install apache-airflow
pip install google-cloud-orchestration-airflow==1.9.1
pip install apache-airflow-providers-google
pip install jsonpickle
pip install git+https://github.com/flyteorg/flytekit.git@873c60e505bb3492b5e3fa09840836b4d6775f4b#subdirectory=plugins/flytekit-airflow
```
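To verify the install, a quick import check can help; a minimal sketch, assuming the plugin is importable as `flytekitplugins.airflow` (the standard flytekit plugin layout):
```python=
import airflow

# The flytekit-airflow plugin lives under the flytekitplugins namespace;
# importing it registers Airflow operators/sensors as Flyte tasks.
import flytekitplugins.airflow  # noqa: F401

print(airflow.__version__)
```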
## Authenticating to Google Cloud
Use [default credentials](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/connections/gcp.html#note-on-application-default-credentials) to connect to Google Cloud:
```bash=
export AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT='google-cloud-platform://'
export GOOGLE_APPLICATION_CREDENTIALS=/Users/kevin/.config/gcloud/application_default_credentials.json
```
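To sanity-check that Application Default Credentials resolve before running anything, you can load them directly; a minimal sketch, assuming `google-auth` (pulled in by the Google provider) is installed:
```python=
import google.auth

# Resolves Application Default Credentials the same way the Google
# provider does; raises DefaultCredentialsError if none are found.
credentials, project_id = google.auth.default()
print(f"Authenticated, default project: {project_id}")
```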
## Run the workflow locally
```python=
from datetime import timedelta

from airflow.utils import trigger_rule
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitSparkJobOperator,
)
from flytekit import workflow


@workflow
def wf():
    # Spin up a Dataproc cluster.
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        image_version="2.0.27-debian10",
        storage_bucket="opta-gcp-dogfood-gcp",
        master_machine_type="n1-highmem-32",
        master_disk_size=1024,
        num_workers=2,
        worker_machine_type="n1-highmem-64",
        worker_disk_size=1024,
        region="us-west1",
        cluster_name="flyte-dataproc",
        project_id="dogfood-gcp-dataplane",
    )
    # Submit the example Spark job to the cluster.
    run_spark = DataprocSubmitSparkJobOperator(
        job_name="spark_pi",
        task_id="run_spark",
        dataproc_jars=["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        main_class="org.apache.spark.examples.JavaWordCount",
        arguments=["gs://opta-gcp-dogfood-gcp/spark/file.txt"],
        cluster_name="flyte-dataproc",
        region="us-west1",
        project_id="dogfood-gcp-dataplane",
    )
    # Tear the cluster down even if the Spark job fails (ALL_DONE).
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        project_id="dogfood-gcp-dataplane",
        cluster_name="flyte-dataproc",
        region="us-west1",
        retries=3,
        retry_delay=timedelta(minutes=5),
        email_on_failure=True,
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )
    create_cluster >> run_spark >> delete_cluster


if __name__ == '__main__':
    wf()
```
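The same pattern works for Airflow sensors: the next example gates a regular Flyte task on a `FileSensor` and uses `ImageSpec` to build a container image that bundles Airflow and the flytekit-airflow plugin.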
```python=
from airflow.sensors.filesystem import FileSensor
from flytekit import task, workflow, ImageSpec

# Build an image that includes Airflow and the flytekit-airflow plugin.
airflow_plugin = "git+https://github.com/flyteorg/flytekit.git@487438ab59147879eded897674593a1eaee1c78b#subdirectory=plugins/flytekit-airflow"
image_spec = ImageSpec(
    base_image="pingsutw/flytekit:v1",
    packages=["apache-airflow", airflow_plugin],
    apt_packages=["git"],
    registry="pingsutw",
)


@task(container_image=image_spec)
def t1():
    print("flyte")


@workflow
def wf():
    # Wait for /tmp/1234 to exist before running the Flyte task.
    sensor = FileSensor(task_id="id", filepath="/tmp/1234")
    sensor >> t1()


if __name__ == '__main__':
    wf()
```
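When running this locally, create the file first (for example, `touch /tmp/1234`); otherwise the sensor will keep waiting until the file appears.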