# Efficient plugin development with Flyte Agents

## Outline

- Why plugins in Flyte?
- How do flyteplugins work?
- How do Agents work?
- How to set up the Agent Service in Development Mode?
- How to develop your agent efficiently?

## Why plugins in Flyte?

Flyte backend plugins enable a more streamlined workflow lifecycle: Tasks that invoke specific APIs/modules served by a plugin won't spin up a new Pod but will execute in the external system, potentially reducing the overall workflow execution time and saving compute resources.

Without backend plugins, every task's lifecycle looks like this:

![image.png](https://hackmd.io/_uploads/B1XLiwUXa.png)

With backend `flyteplugins`, and depending on the Task type, the workflow lifecycle is reduced to this:

![image.png](https://hackmd.io/_uploads/SJGPoP8XT.png)

## How do flyteplugins work?

The backend `flyteplugins` are a part of `flytepropeller`, Flyte's K8s operator. As a consequence, the overhead of executing Tasks through `flyteplugins` is incurred at the `flytepropeller` level.

Here is a picture of how it works:

![image.png](https://hackmd.io/_uploads/HJWsZGImp.png)

Before the Agent Framework was added to Flyte, developers had to do the following to create flyteplugins:

1. Create a plugin in the `flytekit/plugins` directory. This is written in Python.
2. Specify the task config in (1), which will be passed to task metadata for `flytepropeller`.
3. Write the plugin handler in the `flyteplugins` repo. This is written in Go.

`flyteplugins` is an amazing mechanism that opens up lots of possibilities. However, it's not friendly to most data scientists and ML engineers, since they have to learn how to write Golang code and even study the `flytepropeller` architecture, which is time-consuming and hard.

## How do Agents work?

Flyte recently introduced an Agent framework designed to simplify the development process and make it more friendly.
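As a rough mental model only, the sketch below imitates the create/get/delete shape of an agent in plain Python. It does not import or reproduce flytekit's API: `ToyAgent`, `FakeExternalService`, `JobState`, and `poll_until_done` are hypothetical names made up for this illustration, and the "external system" is faked in memory.

```python
from enum import Enum


class JobState(Enum):
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"


class FakeExternalService:
    """Hypothetical stand-in for an external system such as BigQuery."""

    def __init__(self, polls_until_done: int = 2):
        self._polls_until_done = polls_until_done
        self._jobs = {}  # job_id -> remaining polls before the job finishes

    def submit(self, custom: dict) -> str:
        job_id = f"job-{len(self._jobs)}"
        self._jobs[job_id] = self._polls_until_done
        return job_id

    def status(self, job_id: str) -> JobState:
        # Each status check brings the fake job one step closer to completion.
        if self._jobs[job_id] > 0:
            self._jobs[job_id] -= 1
            return JobState.RUNNING
        return JobState.SUCCEEDED

    def cancel(self, job_id: str) -> None:
        self._jobs.pop(job_id, None)


class ToyAgent:
    """Keeps no per-job state of its own; the job lives in the external system."""

    def __init__(self, service: FakeExternalService):
        self._service = service

    def create(self, custom: dict) -> str:
        # Start the job externally and hand back an identifier for later calls.
        return self._service.submit(custom)

    def get(self, job_id: str) -> JobState:
        return self._service.status(job_id)

    def delete(self, job_id: str) -> None:
        self._service.cancel(job_id)


def poll_until_done(agent: ToyAgent, job_id: str, max_polls: int = 10) -> list:
    """Plays the role of the caller, which only monitors the job's status."""
    states = []
    for _ in range(max_polls):
        state = agent.get(job_id)
        states.append(state)
        if state is JobState.SUCCEEDED:
            break
    return states


agent = ToyAgent(FakeExternalService(polls_until_done=2))
job_id = agent.create({"ProjectID": "flyte"})
history = poll_until_done(agent, job_id)
print(job_id, [s.value for s in history])
```

In this toy model the work itself happens outside the caller, which does nothing but ask for the job's status until it finishes.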
The Agent server is a stateless component, written in Python, bringing two main benefits:

1. The overhead of executing plugin tasks is transferred to the Agent server. Now `flytepropeller` only has to monitor the status of the task, consuming fewer resources.
2. Plugins become far easier to develop because now you can:
   - Develop them with Python
   - Test them locally

![image.png](https://hackmd.io/_uploads/By9RmzU7a.png)

Also, for scalability, we can have multiple Agent servers to handle higher levels of concurrent requests:

![image.png](https://hackmd.io/_uploads/r1vp7fLQT.png)

Learn more about the Agent framework in the [Flyte docs](https://docs.flyte.org/projects/cookbook/en/stable/auto_examples/development_lifecycle/agent_service.html).

## How to set up the Agent Service in Development Mode

Let's develop a plugin with the powerful Agent service!

> NOTE: In this section, I make use of a local development environment. Learn more about the [sandbox cluster](https://docs.flyte.org/en/latest/deployment/deployment/sandbox.html), provided by `flytectl demo`, and how to [set up your own dev environment](https://docs.flyte.org/en/latest/community/contribute.html#development-environment-setup-guide).

1. Use the dev mode through `flytectl`:
   ```bash
   flytectl demo start --dev
   ```
2. Start the Agent gRPC server:
   ```bash
   pyflyte serve
   ```
3. Set the config in the YAML file (BigQuery, for example):
   ```bash
   cd flyte
   vim ./flyte-single-binary-local-dev.yaml
   ```
   ```yaml
   tasks:
     task-plugins:
       enabled-plugins:
         - agent-service
         - container
         - sidecar
         - K8S-ARRAY
       default-for-task-types:
         - bigquery_query_job_task: agent-service
         - container: container
         - container_array: K8S-ARRAY
   ```
   ```yaml
   plugins:
     # Registered Task Types
     agent-service:
       supportedTaskTypes:
         - bigquery_query_job_task
       defaultAgent:
         endpoint: "dns:///localhost:8000" # your grpc agent server port
         insecure: true
         timeouts:
           GetTask: 100s
         defaultTimeout: 10s
   ```
4. Start the Flyte server with the config YAML file:
   ```bash
   flyte start --config ./flyte-single-binary-local-dev.yaml
   ```

## How to develop your agent efficiently?

Getting started with agent development can be challenging, so I recommend breaking the process into steps. Here's a recommended process:

1. Study how `task.py` in the particular plugin passes parameters to the Agent
2. Study how the local Agent works
3. Learn how to test it efficiently in a remote environment

### How task.py passes parameters to the Agent

Take the BigQuery plugin as an example again. The [get_custom](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py#L70-L79) function creates a dictionary object in [task.py](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py). That dictionary is then passed to [task_template.custom](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L70-L72) in [agent.py](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py).

Here's a more comprehensive diagram of how parameters pass from `task.py` to the `agent`.

![image.png](https://hackmd.io/_uploads/rktJxu87a.png)

### How the local Agent works

The core of the base Agent logic is [here](https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L168-L181).

Let's take BigQuery as an example again.

```python
import pandas as pd
from typing_extensions import Annotated

from flytekit import StructuredDataset, kwtypes, task, workflow
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask

DogeCoinDataset = Annotated[StructuredDataset, kwtypes(hash=str, size=int, block_number=int)]

bigquery_task_templatized_query = BigQueryTask(
    name="sql.bigquery.w_io",
    # Define inputs as well as their types that can be used to customize the query.
    inputs=kwtypes(version=int),
    output_structured_dataset_type=DogeCoinDataset,
    task_config=BigQueryConfig(ProjectID="flyte"),
    query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;",
)


@task
def convert_bq_table_to_pandas_dataframe(sd: DogeCoinDataset) -> pd.DataFrame:
    # Load the query result into a pandas DataFrame.
    return sd.open(pd.DataFrame).all()


@workflow
def full_bigquery_wf(version: int) -> pd.DataFrame:
    sd = bigquery_task_templatized_query(version=version)
    return convert_bq_table_to_pandas_dataframe(sd=sd)


if __name__ == "__main__":
    full_bigquery_wf(version=1)
```

To test your BigQuery code locally, execute:

```bash
python bigquery_example.py
```

This runs the task through the local agent via the [AsyncAgentExecutorMixin](https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L158-L167) class, which in turn uses the code in [agent.py](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L48-L114).

Before moving to a remote environment, I strongly advise testing your code locally.

### How to test it efficiently in a remote environment

To execute your agent remotely, use `pyflyte run --remote` or `pyflyte register`.

I recommend starting by registering and invoking your agent tasks in FlyteConsole (Flyte's UI). This initial step doesn't require building an image, as you'll only be invoking the web API, without initiating a Pod for task execution.

Take the ChatGPT plugin as an example. You can write code like this to invoke the task:

```python
from flytekit import task, workflow, ImageSpec
from flytekitplugins.chatgpt import ChatGPTTask

chatgpt_job = ChatGPTTask(
    name="chatgpt",
    config={
        "openai_organization": "org-NayNG68kGnVXMJ8Ak4PMgQv7",
        "chatgpt_conf": {
            "model": "gpt-3.5-turbo",
            "temperature": 0.7,
        },
    },
)
```

First, let's register the workflow and task to the Flyte console.
```bash
pyflyte register chatgpt_example.py
```

Example output:

```
Loading packages ['chatgpt_example'] under source root /mnt/c/code/dev/example/plugins
Successfully serialized 1 flyte objects
[✔] Registration chatgpt type TASK successful with version iopF5N9M7ABb6gEe3Chpsg==
Successfully registered 1 entities
```

Now, you can invoke the task in the Flyte console!

![image.png](https://hackmd.io/_uploads/SJgd0MIQp.png)

Ultimately, you must test it with a simple workflow and define the image to mimic a real-world use case. There are two methods for this: using ImageSpec or a Dockerfile. This [PR](https://github.com/flyteorg/flytekit/pull/1822) provides guidance on both approaches. Here's a brief example:

```python
# Continues the ChatGPT snippet above, where `chatgpt_job`, `task`, and `workflow` are defined.
@task()
def t1(s: str) -> str:
    s = "Response: " + s
    return s


@workflow
def wf() -> str:
    message = chatgpt_job(message="hi")
    return t1(s=message)


if __name__ == "__main__":
    print(wf())
```

## Additional Resources

If you want to learn more about `flyteplugins` and the Agent framework, refer to these resources:

1. Flyte School: Enrich your AI pipelines - A deep dive into Flyte plugins
   https://www.youtube.com/watch?v=ah8Q5mSeikE&t=1145s&ab_channel=Union-ai
2. Writing Agents in Python
   https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/development_lifecycle/agent_service.html
3. Demystifying Flyte Agents
   https://youtu.be/nD98GQ-pyAE?si=mI_s7DG8LBMt92Zp

## Special Thanks

- Kevin Su, an amazing Flyte maintainer and a good mentor, taught me lots of things about Flyte and helped me go through the process, from zero to hero in Flyte!
- Yi Cheng Lu spent time discussing flyteplugins, flytepropeller, and agents with me, which helped me gain a deeper understanding.
- Da Yi Wu asked me how flyteplugins and Agents work, which gave me a chance to practice explaining them.
- David Espejo, the reviewer of this article, gave me tons of awesome advice; I added lots of diagrams because of him.