Try   HackMD

Efficient plugin development with Flyte Agents

Outline

  • Why Plugins in Flyte?
  • How flyteplugins work?
  • How flyteplugins agent work?
  • How to setup Agent Service in Development Mode?
  • How to Develop your agent efficiently?

Why plugins in Flyte?

Flyte backend plugins enable a more streamlined worflow lifecycle where Tasks that invoke specific APIs/modules served by a plugin, won't spin up a new Pod but will execute the task in the external system, potentially reducing the overall workflow execution time and saving compute resources.

Without backend plugins, every task's lifecycle will be like this:
image.png

With the use of backend flyteplugins and depending on the Task type, the workflow lifecycle is reduced to this:
image.png

How flyteplugins work?

The backend flyteplugins are a part of flytepropeller, Flyte's K8s operator. In consequence, the overhead of executing Tasks using flyteplugins will be caused at the flytepropeller level.

Here is a picture about how it works:
image.png

Before the Agent Framework was added to Flyte, developers had to do the following to create flyteplugins:

  1. Create a plugin in the flytekit/plugins directory. This is written in Python.
  2. Specify the task config in (1), which will be passed to task metadata for flytepropeller.
  3. Write the plugin handler in the flyteplugins repo. This is written in Go.

flyteplugins is an amazing mechanism, which can create lots of posiibilities.
However, it's not friendly to most of data scientists and ML engineers, since they have to learn how to write Golang code, and even study the flytepropeller architecture, which is time-consuming and hard.

How Agents work?

Flyte recently introduced an Agent framework designed to simplify the development process and make it more friendly.
The Agent server is a stateless component, written in Python, bringing two main benefits:

  1. The overhead of executing plugin task is transfered to the Agent server. Now flytepropeller only has to monitor the status of the task, consuming less resources.
  2. Plugins become far easier to develop because now you can:
  • Develop them with Python
  • Test them locally

image.png

Also, for scalability, we can have multiple Agent servers to handle higher levels of concurrent requests:
image.png

Learn more about the Agent framework in the Flyte docs.

How to setup Agent Service in Development Mode

Let's develop a plugin with the powerful Agent service!

NOTE: In this section, I make use of a local development environment. Learn more about the sandbox cluster, instrumented by flytectl demo, and how to setup your own dev environment.

  1. Use the dev mode through flytectl:
flytectl demo start --dev
  1. Start the Agent gRPC server:
pyflyte serve
  1. Set the config in the yaml file (Bigquery for example)
cd flyte
vim ./flyte-single-binary-local-dev.yaml
tasks:
  task-plugins:
    enabled-plugins:
      - agent-service
      - container
      - sidecar
      - K8S-ARRAY
    default-for-task-types:
      - bigquery_query_job_task: agent-service
      - container: container
      - container_array: K8S-ARRAY
plugins:
  # Registered Task Types
  agent-service:
    supportedTaskTypes:
      - bigquery_query_job_task
    defaultAgent:
      endpoint: "dns:///localhost:8000" # your grpc agent server port
      insecure: true
      timeouts:
        GetTask: 100s
      defaultTimeout: 10s
  1. Start the Flyte server with the config YAML file:
flyte start --config ./flyte-single-binary-local-dev.yaml

How to Develop your agent efficiently?

Getting started with the development of an agent can be challenging, so I recommend splitting the process step by step.

Here's a recommended process:

  1. Study how task.py in the particular plugin, works to pass parameters to Agent
  2. Study how local Agent works
  3. Learn how to test it efficiently in remote environment

How task.py passes parameters to Agent

Take BigQuery plugin as an example again.

The get_custom function will create a dictionary object in task.py. Then, it will be passed to task_template.custom in agent.py.

Here's a more comprehensive diagram about how parameters pass to agent from task.py.
image.png

How local Agent works

The core of the base Agent logic is here

Let's take BigQuery as example again.

DogeCoinDataset = Annotated[StructuredDataset, kwtypes(hash=str, size=int, block_number=int)] bigquery_task_templatized_query = BigQueryTask( name="sql.bigquery.w_io", # Define inputs as well as their types that can be used to customize the query. inputs=kwtypes(version=int), output_structured_dataset_type=DogeCoinDataset, task_config=BigQueryConfig(ProjectID="flyte"), query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;", ) @task def convert_bq_table_to_pandas_dataframe(sd: DogeCoinDataset) -> pd.DataFrame: return sd.open(pd.DataFrame).all() @workflow def full_bigquery_wf(version: int) -> pd.DataFrame: sd = bigquery_task_templatized_query(version=version) return convert_bq_table_to_pandas_dataframe(sd=sd) if __name__ == "__main__": full_bigquery_wf(version=1)

To test your BigQuery code locally, execute:

python bigquery example.py

This will process the code through the local agent using the AsyncAgentExecutorMixin class, and use the code agent.py.

Before moving to a remote environment, I strongly advise testing your code locally.

How to test it efficiently in a remote environment

To execute your agent remotely, use:
pyflyte run --remote or pyflyte register

I recommend starting by registering and invoking your agent tasks in FlyteConsole (Flyte's UI).
This initial step doesn’t require building an image, as you’ll only be invoking the web API, without initiating a Pod for task execution.

Take ChatGPT plugin for example.

You can write the code like this to invoke the task.:

from flytekit import task, workflow, ImageSpec
from flytekitplugins.chatgpt import ChatGPTTask

chatgpt_job = ChatGPTTask(
    name="chatgpt",
    config={
        "openai_organization": "org-NayNG68kGnVXMJ8Ak4PMgQv7",
        "chatgpt_conf": {
            "model": "gpt-3.5-turbo",
            "temperature": 0.7,
        },
    },
)

First, let's register the workflow and task to the Flyte console.

pyflyte register chatgpt_example.py

Example output:

Loading packages ['chatgpt_example'] under source root /mnt/c/code/dev/example/plugins
Successfully serialized 1 flyte objects
[✔] Registration chatgpt type TASK successful with version iopF5N9M7ABb6gEe3Chpsg==
Successfully registered 1 entities

Now, you can invoke the task in the Flyte console!
image.png

Ultimately, you must test it with a simple workflow and define the image to mock the real world use case.
There are two methods for this: using ImageSpec or a Dockerfile.
This PR provides guidance on both approaches.
Here's a brief example:

@task()
def t1(s: str) -> str:
    s = "Repsonse: " + s
    return s

@workflow
def wf() -> str:
    message = chatgpt_job(message="hi")
    return t1(s=message)

if __name__ == "__main__":
    print(wf())

Additional Resources

If you want to learn more about flyteplugins and the Agent framework, refer to these resources:

  1. Flyte School: Enrich your AI pipelines - A deep dive into Flyte plugins
    https://www.youtube.com/watch?v=ah8Q5mSeikE&t=1145s&ab_channel=Union-ai
  2. Writing Agents in Python
    https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/development_lifecycle/agent_service.html
  3. Demystifying Flyte Agents
    https://youtu.be/nD98GQ-pyAE?si=mI_s7DG8LBMt92Zp

Special Thanks

  • Kevin Su, an amazing Flyte maintainer and a good mentor, he taught me lots of things about Flyte, helping me go through the process, from zero to hero in Flyte!
  • Yi Cheng Lu, spent times discussing flyteplugins, flytepropeller and agent with me, which helped me have a deeper understanding.
  • Da Yi Wu, asked me how flyteplugins and Agent works, which gave me a chance to practice and explain how it works.
  • David Espejo, the reviewer of this article, gave me tons of awesome advice, I added lots of diagram because of him.