Flyte backend plugins enable a more streamlined workflow lifecycle: Tasks that invoke specific APIs/modules served by a plugin don't spin up a new Pod but instead execute in the external system, potentially reducing overall workflow execution time and saving compute resources.
Without backend plugins, every task's lifecycle looks like this:
With backend flyteplugins, and depending on the Task type, the workflow lifecycle is reduced to this:
The backend flyteplugins are part of flytepropeller, Flyte's K8s operator. Consequently, the overhead of executing Tasks through flyteplugins is incurred at the flytepropeller level.
Here is a diagram of how it works:
Before the Agent Framework was added to Flyte, developers had to do the following to create flyteplugins:
- Implement the task interface in the flytekit/plugins directory. This is written in Python.
- Implement the backend plugin, used by flytepropeller, in the flyteplugins repo. This is written in Go.
flyteplugins is an amazing mechanism that opens up lots of possibilities.
However, it's not friendly to most data scientists and ML engineers, since they have to learn how to write Go code and even study the flytepropeller architecture, which is time-consuming and hard.
Flyte recently introduced an Agent framework designed to simplify the development process and make it more friendly.
The Agent server is a stateless component, written in Python, bringing two main benefits: plugins can be developed in Python alone, and flytepropeller only has to monitor the status of the task, consuming fewer resources.
Also, for scalability, we can run multiple Agent servers to handle higher levels of concurrent requests:
Learn more about the Agent framework in the Flyte docs.
Let's develop a plugin with the powerful Agent service!
NOTE: In this section, I use a local development environment. Learn more about the sandbox cluster, instrumented by flytectl demo, and how to set up your own dev environment.
Start a local sandbox cluster with flytectl:
flytectl demo start --dev
Start the agent gRPC server:
pyflyte serve
Then enable the agent-service plugin in the Flyte configuration:
cd flyte
vim ./flyte-single-binary-local-dev.yaml
tasks:
  task-plugins:
    enabled-plugins:
      - agent-service
      - container
      - sidecar
      - k8s-array
    default-for-task-types:
      - bigquery_query_job_task: agent-service
      - container: container
      - container_array: k8s-array
plugins:
  # Registered Task Types
  agent-service:
    supportedTaskTypes:
      - bigquery_query_job_task
    defaultAgent:
      endpoint: "dns:///localhost:8000" # your grpc agent server port
      insecure: true
      timeouts:
        GetTask: 100s
      defaultTimeout: 10s
Finally, start the Flyte single binary with the updated config:
flyte start --config ./flyte-single-binary-local-dev.yaml
Getting started with agent development can be challenging, so I recommend breaking the process into steps.
Here's a recommended process:
task.py in each plugin is responsible for passing parameters to the Agent. Take the BigQuery plugin as an example again: the get_custom function creates a dictionary object in task.py, which is then passed to task_template.custom in agent.py.
Here's a more comprehensive diagram of how parameters pass from task.py to the agent.
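To make that handoff concrete, here is a minimal, dependency-free sketch of the get_custom pattern described above. The MockBigQueryTask class is hypothetical and deliberately avoids the real flytekit interfaces; only the ProjectID key mirrors the BigQueryConfig example in this post.

```python
# Hypothetical sketch of the get_custom pattern: the task serializes
# everything its agent needs into a dict, and Flyte stores that dict
# on task_template.custom for the agent to read back later.
class MockBigQueryTask:
    def __init__(self, project_id: str, location: str = "US"):
        self.project_id = project_id
        self.location = location

    def get_custom(self) -> dict:
        # Mirrors BigQueryConfig(ProjectID=...) from the example below.
        return {"ProjectID": self.project_id, "Location": self.location}

mock_task = MockBigQueryTask(project_id="flyte")
custom = mock_task.get_custom()  # what agent.py later reads back
```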
The core of the base Agent logic is here
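As a rough mental model of that base logic, the following plain-Python sketch shows the create/get/delete lifecycle an agent implements. The class and method names are illustrative, not the exact flytekit interface.

```python
import uuid

class MockAgent:
    """Illustrative agent: create a job in an external system,
    report its status, and delete it on abort."""

    def __init__(self):
        self._jobs = {}

    def create(self, custom: dict) -> str:
        # Submit the work to the external service; return an ID that
        # flytepropeller stores as the task's resource metadata.
        job_id = str(uuid.uuid4())
        self._jobs[job_id] = "SUCCEEDED"  # pretend it finished instantly
        return job_id

    def get(self, job_id: str) -> str:
        # flytepropeller polls this until a terminal state is reached.
        return self._jobs[job_id]

    def delete(self, job_id: str) -> None:
        # Called when the workflow is aborted.
        self._jobs.pop(job_id, None)

agent = MockAgent()
job = agent.create({"ProjectID": "flyte"})
status = agent.get(job)
```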
Let's take BigQuery as an example again.
import pandas as pd
from typing_extensions import Annotated

from flytekit import StructuredDataset, kwtypes, task, workflow
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask

DogeCoinDataset = Annotated[StructuredDataset, kwtypes(hash=str, size=int, block_number=int)]

bigquery_task_templatized_query = BigQueryTask(
    name="sql.bigquery.w_io",
    # Define inputs as well as their types that can be used to customize the query.
    inputs=kwtypes(version=int),
    output_structured_dataset_type=DogeCoinDataset,
    task_config=BigQueryConfig(ProjectID="flyte"),
    query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;",
)

@task
def convert_bq_table_to_pandas_dataframe(sd: DogeCoinDataset) -> pd.DataFrame:
    return sd.open(pd.DataFrame).all()

@workflow
def full_bigquery_wf(version: int) -> pd.DataFrame:
    sd = bigquery_task_templatized_query(version=version)
    return convert_bq_table_to_pandas_dataframe(sd=sd)

if __name__ == "__main__":
    full_bigquery_wf(version=1)
To test your BigQuery code locally, execute:
python bigquery_example.py
This runs the task through the local agent via the AsyncAgentExecutorMixin class, executing the logic in agent.py.
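Conceptually, that local execution path boils down to a create-then-poll loop. Here's a hedged, self-contained sketch of the idea (this is not the actual flytekit implementation; FakeAgent and run_locally are illustrative names):

```python
import itertools

class FakeAgent:
    """Stand-in for an agent whose job finishes after a few polls."""

    def __init__(self, polls_until_done: int = 3):
        self._poll_count = itertools.count()
        self._done_at = polls_until_done

    def create(self, custom: dict) -> str:
        return "job-1"

    def get(self, job_id: str) -> str:
        return "SUCCEEDED" if next(self._poll_count) >= self._done_at else "RUNNING"

def run_locally(agent, custom: dict) -> str:
    # Create the external job, then poll its status until it
    # reaches a terminal state, just like the local executor does.
    job_id = agent.create(custom)
    while True:
        state = agent.get(job_id)
        if state in ("SUCCEEDED", "FAILED"):
            return state

result = run_locally(FakeAgent(), {"ProjectID": "flyte"})
```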
Before moving to a remote environment, I strongly advise testing your code locally.
To execute your agent remotely, use:
pyflyte run --remote
or pyflyte register
I recommend starting by registering and invoking your agent tasks in FlyteConsole (Flyte's UI).
This initial step doesn’t require building an image, as you’ll only be invoking the web API, without initiating a Pod for task execution.
Take the ChatGPT plugin, for example.
You can write code like this to invoke the task:
from flytekit import task, workflow, ImageSpec
from flytekitplugins.chatgpt import ChatGPTTask

chatgpt_job = ChatGPTTask(
    name="chatgpt",
    config={
        "openai_organization": "org-NayNG68kGnVXMJ8Ak4PMgQv7",
        "chatgpt_conf": {
            "model": "gpt-3.5-turbo",
            "temperature": 0.7,
        },
    },
)
First, let's register the workflow and task to the Flyte console.
pyflyte register chatgpt_example.py
Example output:
Loading packages ['chatgpt_example'] under source root /mnt/c/code/dev/example/plugins
Successfully serialized 1 flyte objects
[✔] Registration chatgpt type TASK successful with version iopF5N9M7ABb6gEe3Chpsg==
Successfully registered 1 entities
Now, you can invoke the task in the Flyte console!
Ultimately, you should test it with a simple workflow and define the image to mimic a real-world use case.
There are two methods for this: using ImageSpec or a Dockerfile.
This PR provides guidance on both approaches.
Here's a brief example:
@task()
def t1(s: str) -> str:
    s = "Response: " + s
    return s

@workflow
def wf() -> str:
    message = chatgpt_job(message="hi")
    return t1(s=message)

if __name__ == "__main__":
    print(wf())
If you want to learn more about flyteplugins and the Agent framework, refer to these resources: