# Flyte Executors

## What is it

One of Flyte's strengths is the ability to statically define workflows. This allows compiler optimizations that can greatly reduce overall execution time and streamline resource allocation.

## UX

```python=
@task
def my_task(a: int, b: str) -> str:
    return f"{a} - {b}"


@workflow(
    executor_config=ContainerConfig(
        max_parallelism=10,
        requests=Resources(mem="500Mi"),
    )
)
def my_wf() -> str:
    x = my_task(a=1, b="a")
    y = my_task(a=2, b="b")
    return my_task(a=3, b=x)
```

``executor_config`` allows users to define a default config that will be applied to all tasks that do not otherwise define requests, limits, or a config of their own.

## Registration

When registering workflows with ``executor_config``, the backend compiler will traverse the workflow and attempt to optimize the graph as follows:

* Check whether there are any tasks to which the ``executor_config`` can be applied (tasks with no config or resource requirements).
* If there are, update the start node to include the executor's spec and the number of replicas needed.
* Change the task type to ``executor-container`` (a new plugin that will be able to handle executing and monitoring tasks against the new executor pool).

## IDL Changes

* Add an executor pool name to launch plans to allow reusing and propagating already-created executor pools through launch plans.
* Add an ``ExecutorConfig`` message as a property of the workflow.

## Runtime Changes

**Q:** If an ExecutorPool is passed to a launch plan, and the workflow behind the launch plan already defines an ExecutorConfig, how can we tell whether it uses the same (or a compatible) ExecutorConfig?

### Propeller

There are two new components needed in Propeller:

1. An ExecutorPool type with two methods:
    1. ``Initialize(ctx context.Context, name string, config ExecutorConfig)``
    2. ``Finalize(ctx context.Context, name string)``
2. An Executor-Container task plugin that will:
    1. Submit work to the executor pool.
    2. Monitor work on the pool.
    3. If the task succeeds, mark it as such; if it disappears (e.g. Redis goes down), mark it as failed.

Options for calling the ExecutorPool:

1. In the workflow executor: [Initialize here](https://github.com/flyteorg/flytepropeller/blob/master/pkg/controller/workflow/executor.go#L364), and finalize [here](https://github.com/flyteorg/flytepropeller/blob/master/pkg/controller/workflow/executor.go#L394) and [here](https://github.com/flyteorg/flytepropeller/blob/master/pkg/controller/workflow/executor.go#L402).

    Pros:
    * We know these places in the code are executed only once.

    Cons:
    * Allows only one ExecutorPool per workflow... **is that an acceptable limitation?**

2. Add separate/new nodes to handle initializing and finalizing the ExecutorPool.

    Pros:
    * Allows an arbitrary number of pools to be managed.

    Cons:
    * Explicit nodes that will show up in the UI and be retrieved through the API?
    * Need to add the notion of a FinalizeNode to the workflow to ensure it is called in both success and failure cases. Otherwise we would have to insert a regular node and override the failure node.

### Flytekit

Flytekit will need a new entry point/command that starts up as a worker for the executor pool, polling for work and executing it.

## ExecutorPool stack

### [Faktory](https://github.com/contribsys/faktory)

> At a high level, Faktory is a work server. It is the repository for background jobs within your application. Jobs have a type and a set of arguments and are placed into queues for workers to fetch and execute.
> You can use this server to distribute jobs to one or hundreds of machines. Jobs can be executed with any language by clients using the Faktory API to fetch a job from a queue.

How this would look:

* Deploy Faktory as part of the Flyte deployment ([Helm Chart](https://github.com/contribsys/faktory/wiki/Kubernetes-Deployment-Example)).
* Initialize/Finalize need to create/delete a K8s Job using the ExecutorConfig spec and pass Faktory's URL to the workers.
* The Executor-Container plugin submits work to Faktory with a deterministic [job-definition-name](https://github.com/contribsys/faktory_worker_go#faq). This could be the execution ID, or, if we want to support multiple job pools, it could be derived from the execution ID plus a hash of the executor config spec.
* Each worker (in Python) that starts up must call RegisterJob with the same job-definition-name, which guarantees it will only pull work for that job definition.

![](https://i.imgur.com/7977mpI.png)

Pros:
* An existing solution; no need to reinvent the wheel.
* Good observability into the work queue, etc.
* Workers are implemented in many other languages, so we can show examples of "raw containers" using the same pattern.

Cons:
* Redis is part of the [enterprise plan](https://contribsys.com/faktory/) (199 USD/mo for the first 100 connections, 99 USD/mo afterwards).

### [Celery Project](https://docs.celeryproject.org/en/stable/index.html)

> Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.
> It’s a task queue with focus on real-time processing, while also supporting task scheduling.

How this would look:

* Pretty much the same semantics as with Faktory, as far as I can tell.
* No server; clients and workers connect directly to Redis.
* [Go implementation](https://github.com/gocelery/gocelery).

![](https://i.imgur.com/DsrEDJj.png)

Pros:
* BSD-licensed, simple task queue.
* Redis support is first-class and part of the OSS project.

Cons:
* No built-in observability.
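To make the Celery option concrete, here is a minimal sketch of what the flytekit-side worker could look like. Everything in it is an assumption for illustration: the module name, task name, broker/backend URLs, and the idea that inputs arrive as a plain dict rather than Flyte literals.

```python
# executor_worker.py: hypothetical flytekit-side worker for the executor pool.
# App name, broker/backend URLs, and the task name are placeholders.
from celery import Celery

app = Celery(
    "flyte_executor_pool",
    broker="redis://redis:6379/0",
    backend="redis://redis:6379/0",
)


@app.task(name="flyte.execute_task")
def execute_task(task_id: str, inputs: dict) -> dict:
    # A real implementation would resolve the registered Flyte task from
    # task_id and run it with the supplied inputs; here we only echo them back.
    return {"task_id": task_id, "inputs": inputs}
```

A worker pod in the pool would then run something like ``celery -A executor_worker worker -Q <pool-queue>``, where the queue name plays the same role as Faktory's job-definition-name (e.g. derived from the execution ID and/or a hash of the executor config).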
## Decision

Celery is simpler to prototype with; we can revisit this decision later.
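For completeness, the submit side that the Executor-Container plugin would perform can also be prototyped from Python (in production it would presumably go through a Go client such as gocelery). The task name and queue below are the same illustrative placeholders as in the worker sketch above.

```python
# Hypothetical submit-side sketch; the queue and task names are placeholders
# that must match whatever the workers in the pool registered.
from celery import Celery

app = Celery(
    "flyte_executor_pool",
    broker="redis://redis:6379/0",
    backend="redis://redis:6379/0",
)

# send_task enqueues by name, so the submitter never imports the worker code;
# the plugin and the workers only have to agree on the task name and queue.
result = app.send_task(
    "flyte.execute_task",
    kwargs={"task_id": "my_task", "inputs": {"a": 3, "b": "1 - a"}},
    queue="executor-pool-<execution-id>",  # placeholder pool/queue name
)
print(result.get(timeout=60))  # blocks until a worker reports the outputs
```

This is roughly the hand-off the Executor-Container plugin would monitor: instead of blocking as in this sketch, the result id could be stored in the node status and polled on each evaluation round.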