# Efficient plugin development with Flyte Agents
## Outline
- Why Plugins in Flyte?
- How flyteplugins work?
- How flyteplugins agent work?
- How to setup Agent Service in Development Mode?
- How to Develop your agent efficiently?
## Why plugins in Flyte?
Flyte backend plugins enable a more streamlined worflow lifecycle where Tasks that invoke specific APIs/modules served by a plugin, won't spin up a new Pod but will execute the task in the external system, potentially reducing the overall workflow execution time and saving compute resources.
Without backend plugins, every task's lifecycle will be like this:
![image.png](https://hackmd.io/_uploads/B1XLiwUXa.png)
With the use of backend `flyteplugins` and depending on the Task type, the workflow lifecycle is reduced to this:
![image.png](https://hackmd.io/_uploads/SJGPoP8XT.png)
## How flyteplugins work?
The backend `flyteplugins` are a part of `flytepropeller`, Flyte's K8s operator. In consequence, the overhead of executing Tasks using `flyteplugins` will be caused at the `flytepropeller` level.
Here is a picture about how it works:
![image.png](https://hackmd.io/_uploads/HJWsZGImp.png)
Before the Agent Framework was added to Flyte, developers had to do the following to create flyteplugins:
1. Create a plugin in the `flytekit/plugins` directory. This is written in Python.
2. Specify the task config in (1), which will be passed to task metadata for `flytepropeller`.
3. Write the plugin handler in the `flyteplugins` repo. This is written in Go.
`flyteplugins` is an amazing mechanism, which can create lots of posiibilities.
However, it's not friendly to most of data scientists and ML engineers, since they have to learn how to write Golang code, and even study the `flytepropeller` architecture, which is time-consuming and hard.
## How Agents work?
Flyte recently introduced an Agent framework designed to simplify the development process and make it more friendly.
The Agent server is a stateless component, written in Python, bringing two main benefits:
1. The overhead of executing plugin task is transfered to the Agent server. Now `flytepropeller` only has to monitor the status of the task, consuming less resources.
2. Plugins become far easier to develop because now you can:
- Develop them with Python
- Test them locally
![image.png](https://hackmd.io/_uploads/By9RmzU7a.png)
Also, for scalability, we can have multiple Agent servers to handle higher levels of concurrent requests:
![image.png](https://hackmd.io/_uploads/r1vp7fLQT.png)
Learn more about the Agent framework in the [Flyte docs](https://docs.flyte.org/projects/cookbook/en/stable/auto_examples/development_lifecycle/agent_service.html).
## How to setup Agent Service in Development Mode
Let's develop a plugin with the powerful Agent service!
> NOTE: In this section, I make use of a local development environment. Learn more about the [sandbox cluster](https://docs.flyte.org/en/latest/deployment/deployment/sandbox.html), instrumented by `flytectl demo`, and how to [setup your own dev environment](https://docs.flyte.org/en/latest/community/contribute.html#development-environment-setup-guide).
1. Use the dev mode through `flytectl`:
```bash
flytectl demo start --dev
```
2. Start the Agent gRPC server:
```bash
pyflyte serve
```
3. Set the config in the yaml file (Bigquery for example)
```bash
cd flyte
vim ./flyte-single-binary-local-dev.yaml
```
```yaml
tasks:
task-plugins:
enabled-plugins:
- agent-service
- container
- sidecar
- K8S-ARRAY
default-for-task-types:
- bigquery_query_job_task: agent-service
- container: container
- container_array: K8S-ARRAY
```
```yaml
plugins:
# Registered Task Types
agent-service:
supportedTaskTypes:
- bigquery_query_job_task
defaultAgent:
endpoint: "dns:///localhost:8000" # your grpc agent server port
insecure: true
timeouts:
GetTask: 100s
defaultTimeout: 10s
```
4. Start the Flyte server with the config YAML file:
```bash
flyte start --config ./flyte-single-binary-local-dev.yaml
```
## How to Develop your agent efficiently?
Getting started with the development of an agent can be challenging, so I recommend splitting the process step by step.
Here's a recommended process:
1. Study how `task.py` in the particular plugin, works to pass parameters to Agent
2. Study how local Agent works
3. Learn how to test it efficiently in remote environment
### How task.py passes parameters to Agent
Take BigQuery plugin as an example again.
The [get_custom](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py#L70-L79) function will create a dictionary object in [task.py](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py). Then, it will be passed to [task_template.custom](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L70-L72) in [agent.py](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py).
Here's a more comprehensive diagram about how parameters pass to `agent` from `task.py`.
![image.png](https://hackmd.io/_uploads/rktJxu87a.png)
### How local Agent works
The core of the base Agent logic is [here](https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L168-L181)
Let's take BigQuery as example again.
```python=
DogeCoinDataset = Annotated[StructuredDataset, kwtypes(hash=str, size=int, block_number=int)]
bigquery_task_templatized_query = BigQueryTask(
name="sql.bigquery.w_io",
# Define inputs as well as their types that can be used to customize the query.
inputs=kwtypes(version=int),
output_structured_dataset_type=DogeCoinDataset,
task_config=BigQueryConfig(ProjectID="flyte"),
query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;",
)
@task
def convert_bq_table_to_pandas_dataframe(sd: DogeCoinDataset) -> pd.DataFrame:
return sd.open(pd.DataFrame).all()
@workflow
def full_bigquery_wf(version: int) -> pd.DataFrame:
sd = bigquery_task_templatized_query(version=version)
return convert_bq_table_to_pandas_dataframe(sd=sd)
if __name__ == "__main__":
full_bigquery_wf(version=1)
```
To test your BigQuery code locally, execute:
```bash
python bigquery example.py
```
This will process the code through the local agent using the [AsyncAgentExecutorMixin](https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L158-L167) class, and use the code [agent.py](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L48-L114).
Before moving to a remote environment, I strongly advise testing your code locally.
### How to test it efficiently in a remote environment
To execute your agent remotely, use:
`pyflyte run --remote` or `pyflyte register`
I recommend starting by registering and invoking your agent tasks in FlyteConsole (Flyte's UI).
This initial step doesn’t require building an image, as you’ll only be invoking the web API, without initiating a Pod for task execution.
Take ChatGPT plugin for example.
You can write the code like this to invoke the task.:
```python
from flytekit import task, workflow, ImageSpec
from flytekitplugins.chatgpt import ChatGPTTask
chatgpt_job = ChatGPTTask(
name="chatgpt",
config={
"openai_organization": "org-NayNG68kGnVXMJ8Ak4PMgQv7",
"chatgpt_conf": {
"model": "gpt-3.5-turbo",
"temperature": 0.7,
},
},
)
```
First, let's register the workflow and task to the Flyte console.
```bash
pyflyte register chatgpt_example.py
```
Example output:
```
Loading packages ['chatgpt_example'] under source root /mnt/c/code/dev/example/plugins
Successfully serialized 1 flyte objects
[✔] Registration chatgpt type TASK successful with version iopF5N9M7ABb6gEe3Chpsg==
Successfully registered 1 entities
```
Now, you can invoke the task in the Flyte console!
![image.png](https://hackmd.io/_uploads/SJgd0MIQp.png)
Ultimately, you must test it with a simple workflow and define the image to mock the real world use case.
There are two methods for this: using ImageSpec or a Dockerfile.
This [PR](https://github.com/flyteorg/flytekit/pull/1822) provides guidance on both approaches.
Here's a brief example:
```python
@task()
def t1(s: str) -> str:
s = "Repsonse: " + s
return s
@workflow
def wf() -> str:
message = chatgpt_job(message="hi")
return t1(s=message)
if __name__ == "__main__":
print(wf())
```
## Additional Resources
If you want to learn more about ``flyteplugins`` and the Agent framework, refer to these resources:
1. Flyte School: Enrich your AI pipelines - A deep dive into Flyte plugins
https://www.youtube.com/watch?v=ah8Q5mSeikE&t=1145s&ab_channel=Union-ai
2. Writing Agents in Python
https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/development_lifecycle/agent_service.html
3. Demystifying Flyte Agents
https://youtu.be/nD98GQ-pyAE?si=mI_s7DG8LBMt92Zp
## Special Thanks
- Kevin Su, an amazing Flyte maintainer and a good mentor, he taught me lots of things about Flyte, helping me go through the process, from zero to hero in Flyte!
- Yi Cheng Lu, spent times discussing flyteplugins, flytepropeller and agent with me, which helped me have a deeper understanding.
- Da Yi Wu, asked me how flyteplugins and Agent works, which gave me a chance to practice and explain how it works.
- David Espejo, the reviewer of this article, gave me tons of awesome advice, I added lots of diagram because of him.