# How To Develop Agent Plugin Service?
## Outline
- Why use a plugin service instead of writing the service within a task?
- Why an Agent Plugin Service?
- How does the Agent Plugin Service work?
- How to set up the Agent Service in development mode?
- How to develop your agent efficiently?
## Why use a plugin service instead of writing the service within a task?
Flyte can call external web APIs in two ways: by writing the service call within a task or by using FlytePlugins.
### Writing the service within a task
Suppose you have a workflow that needs to call a web API such as ChatGPT.
You might write Python code similar to the following:
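```python
import openai  # legacy (pre-1.0) OpenAI SDK, which provides ChatCompletion
from flytekit import task, workflow


@task
def t1(input: str) -> str:
    # This API call runs inside the task's pod.
    completion = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": input}],
    )
    # Return the reply text, not the whole message object, to match the str output type.
    return completion.choices[0].message.content


@workflow
def wf() -> str:
    return t1(input="Your Input Message!")
```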
Here is how the task's lifecycle unfolds:
1. FlytePropeller initiates a pod to execute the task.
2. The task, running within the pod, calls the ChatGPT API.
3. After the task is completed, FlytePropeller terminates the pod.
This process is resource-intensive and slow: starting and tearing down a pod for every execution consumes cluster resources and adds startup latency, even though the task itself only waits on a remote API.
### Utilizing FlytePlugins
Let's compare the example above with the BigQuery plugin example from the Flyte cookbook ([here](https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/bigquery_plugin/bigquery.html)).
```python
from flytekit import workflow
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask

bigquery_task_no_io = BigQueryTask(
    name="sql.bigquery.no_io",
    inputs={},
    query_template="SELECT 1",
    task_config=BigQueryConfig(ProjectID="flyte"),
)


@workflow
def no_io_wf():
    return bigquery_task_no_io()
```
In this example, the lifecycle of the BigQuery task is much shorter:
1. FlytePlugins invokes the BigQuery API directly, as seen [here](https://github.com/flyteorg/flyte/tree/master/flyteplugins/go/tasks/plugins/webapi/bigquery).

No pod is started for the task, so this approach is notably quicker and more resource-efficient.
## Why an Agent Plugin Service?
Previously, Flyte relied on FlytePropeller to interface with web APIs: FlytePropeller calls the [`invokePlugin`](https://github.com/flyteorg/flyte/blob/0ca2d22b5f3abdf491944b67a249108a6cb1e343/flytepropeller/pkg/controller/nodes/task/handler.go#L375) function to run a FlytePlugins WebAPI plugin.
To execute jobs on platforms like Google BigQuery, Databricks Spark, or AWS Batch this way, the integration had to be written in Go inside [FlytePlugins](https://github.com/flyteorg/flyte/tree/master/flyteplugins/go/tasks/plugins/webapi).
This was a hurdle for many Flyte contributors, most of whom are more familiar with Python than Go.
To address this, we introduced the agent service, which lets you write plugins in Python that interact with web APIs. This is a significant departure from the previous plugin mechanism.
Because agents are written purely in Python, they are quicker to implement and easier to work with for developers who are proficient in Python.
## How does the Agent Plugin Service work?
Compare this with the BigQuery plugin example above.
Here is how the BigQuery agent service works:
1. FlytePlugins sends a gRPC request to the agent server.
2. The agent server calls the BigQuery API and returns the job's status and outputs.

This is slightly slower than the native plugin because of the extra gRPC hop.
However, it is far easier to implement, and it is still faster than writing the service inside a task because no pod is started.
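The exchange can be pictured as a create-then-poll loop. The sketch below is a toy, in-process model under our own assumptions: `State`, `ResourceMeta`, `ToyQueryAgent` and `run_via_agent` are hypothetical names, not flytekit or FlytePlugins APIs, and the real calls travel over gRPC (the agent service exposes create, get and delete operations; the `GetTask` timeout in the configuration later in this note applies to the get call).

```python
import itertools
import time
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Tuple


class State(Enum):
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"


@dataclass
class ResourceMeta:
    """Handle returned by create; here it only carries a job id (hypothetical)."""
    job_id: str


class ToyQueryAgent:
    """Hypothetical in-process stand-in for an agent server, not a flytekit class."""

    def __init__(self, polls_until_done: int = 2) -> None:
        self._ids = itertools.count()
        self._polls_left: Dict[str, int] = {}
        self._polls_until_done = polls_until_done

    def create(self, query: str) -> ResourceMeta:
        # A real BigQuery agent would submit `query` as a BigQuery job here.
        job_id = f"job-{next(self._ids)}"
        self._polls_left[job_id] = self._polls_until_done
        return ResourceMeta(job_id)

    def get(self, meta: ResourceMeta) -> State:
        # A real agent would ask the external service for the job status here.
        self._polls_left[meta.job_id] -= 1
        if self._polls_left[meta.job_id] <= 0:
            return State.SUCCEEDED
        return State.RUNNING

    def delete(self, meta: ResourceMeta) -> None:
        # Cancel and forget the remote job.
        self._polls_left.pop(meta.job_id, None)


def run_via_agent(agent: ToyQueryAgent, query: str, max_polls: int = 10,
                  poll_interval_s: float = 0.0) -> Tuple[State, int]:
    """Plugin side of the exchange: one create call, then get until a terminal state."""
    meta = agent.create(query)
    for attempt in range(1, max_polls + 1):
        state = agent.get(meta)
        if state is not State.RUNNING:
            return state, attempt
        time.sleep(poll_interval_s)
    agent.delete(meta)  # give up and cancel the remote job
    raise TimeoutError(f"{meta.job_id} still running after {max_polls} polls")


if __name__ == "__main__":
    print(run_via_agent(ToyQueryAgent(), "SELECT 1"))
```

Here `run_via_agent` plays the FlytePlugins role and `ToyQueryAgent` plays the agent server; a real caller would wait between polls instead of using the zero-second default interval.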
## How to set up the Agent Service in development mode?
Let's set up the agent service for development.
1. Start the demo cluster in dev mode with flytectl.
```bash
flytectl demo start --dev
```
2. Start the agent gRPC server.
```bash
pyflyte serve agent
```
3. Configure the agent in the YAML file (using BigQuery as an example).
```bash
cd flyte
vim ./flyte-single-binary-local-dev.yaml
```
```yaml
tasks:
  task-plugins:
    enabled-plugins:
      - agent-service
      - container
      - sidecar
      - K8S-ARRAY
    default-for-task-types:
      - bigquery_query_job_task: agent-service
      - container: container
      - container_array: K8S-ARRAY
```
```yaml
plugins:
  # Registered Task Types
  agent-service:
    supportedTaskTypes:
      - bigquery_query_job_task
    defaultAgent:
      endpoint: "dns:///localhost:8000" # your grpc agent server port
      insecure: true
      timeouts:
        GetTask: 100s
      defaultTimeout: 10s
```
4. Start the Flyte server with the config file.
```bash
./flyte start --config ./flyte-single-binary-local-dev.yaml
```
## How to develop your agent efficiently?
Getting started with agent development can be challenging, so I recommend testing your agent locally first.
To understand how the local agent works, take a look at the code [here](https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L168-L181).
Let's take BigQuery as an example again.
```python
from typing import Annotated  # Python 3.9+; use typing_extensions on older versions

import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask

DogeCoinDataset = Annotated[StructuredDataset, kwtypes(hash=str, size=int, block_number=int)]

bigquery_task_templatized_query = BigQueryTask(
    name="sql.bigquery.w_io",
    # Define inputs as well as their types that can be used to customize the query.
    inputs=kwtypes(version=int),
    output_structured_dataset_type=DogeCoinDataset,
    task_config=BigQueryConfig(ProjectID="flyte"),  # replace with your GCP project ID
    query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;",
)


@task
def convert_bq_table_to_pandas_dataframe(sd: DogeCoinDataset) -> pd.DataFrame:
    return sd.open(pd.DataFrame).all()


@workflow
def full_bigquery_wf(version: int) -> pd.DataFrame:
    sd = bigquery_task_templatized_query(version=version)
    return convert_bq_table_to_pandas_dataframe(sd=sd)


if __name__ == "__main__":
    # Running the file directly executes the workflow locally.
    print(full_bigquery_wf(version=1))
```
To test your BigQuery code locally, save it as `bigquery_example.py` and run:
```bash
python bigquery_example.py
```
This runs the task through the local agent: the [AsyncAgentExecutorMixin](https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L158-L167) class changes how the BigQuery task executes, so it is handled in-process by the agent defined in [agent.py](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L48-L114) instead of being sent to a Flyte cluster.
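The mixin idea can be sketched in a few lines. All names here (`ToyAgent`, `ToyPythonTask`, `LocalAgentExecutorMixin`, `ToyQueryTask`) are hypothetical; the real `AsyncAgentExecutorMixin` also looks up the agent registered for the task type and handles Flyte literal conversion. The sketch only shows the pattern: a mixin listed first among the base classes overrides `execute`, so calling the task submits a job to the agent and polls it instead of running user code in a container.

```python
import time
from typing import Any, Dict, Optional, Tuple


class ToyAgent:
    """Hypothetical agent with a create/get shape, not flytekit's agent class."""

    def __init__(self, polls_until_done: int = 2) -> None:
        self._polls_until_done = polls_until_done
        self._jobs: Dict[str, Tuple[int, Dict[str, Any]]] = {}

    def create(self, inputs: Dict[str, Any]) -> str:
        job_id = f"job-{len(self._jobs)}"
        # Pretend the remote job will eventually produce this output.
        self._jobs[job_id] = (self._polls_until_done, {"rows": inputs["version"] * 10})
        return job_id

    def get(self, job_id: str) -> Tuple[str, Optional[Dict[str, Any]]]:
        polls_left, outputs = self._jobs[job_id]
        polls_left -= 1
        self._jobs[job_id] = (polls_left, outputs)
        return ("SUCCEEDED", outputs) if polls_left <= 0 else ("RUNNING", None)


class ToyPythonTask:
    """Hypothetical base task whose execute() would normally run in a pod."""

    def execute(self, **inputs: Any) -> Any:
        raise RuntimeError("this would run user code inside a container")

    def __call__(self, **inputs: Any) -> Any:
        return self.execute(**inputs)


class LocalAgentExecutorMixin:
    """Simplified illustration of the idea behind AsyncAgentExecutorMixin."""

    agent: ToyAgent
    poll_interval_s: float = 0.0

    def execute(self, **inputs: Any) -> Any:
        # Instead of running user code, submit the job and poll until it finishes.
        job_id = self.agent.create(inputs)
        while True:
            state, outputs = self.agent.get(job_id)
            if state != "RUNNING":
                return outputs
            time.sleep(self.poll_interval_s)


class ToyQueryTask(LocalAgentExecutorMixin, ToyPythonTask):
    # The mixin comes first in the MRO, so its execute() is the one that runs.
    def __init__(self, agent: ToyAgent) -> None:
        self.agent = agent


if __name__ == "__main__":
    print(ToyQueryTask(ToyAgent())(version=1))
```

Because only the base-class order changes, the same task class can keep its normal interface while its execution is delegated to the agent.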
Test locally before moving to a remote environment.
Once the task works locally, run your agent remotely with `pyflyte run --remote` or `pyflyte register`.
I recommend starting by registering your agent tasks and launching them from FlyteConsole.
This first step doesn't require building an image, because only the web API is invoked and no pod is started to execute the task.
Eventually, you will need to test your agent task together with other tasks, which do run in pods, so you must define an image.
There are two ways to do this: ImageSpec or a Dockerfile.
This [PR](https://github.com/flyteorg/flyte/pull/4168) provides guidance on both approaches.