Enrico, 2022-10-10
Kudos to everyone who worked on this epic before me - I took the liberty of copy/pasting some of your thoughts here.
Bacalhau (currently v0.3.x) can take workloads composed of a single job, but to execute a multi-step workload users have to manually submit and track each job. This (1) places a significant burden on the user, (2) makes reproducing a pipeline difficult, and (3) means data origin can only be determined by manually logging each step and backtracking.
In an effort to offer broader support for modern multi-step workloads, this document aims to design a compelling pipelining feature that is user-friendly and allows for complex workloads.
In this context, a Pipeline is completely user-defined: the user writes a pipeline spec detailing what each step (i.e. a Bacalhau job) does and how the steps relate to one another.
Philippe: This last point could be a useful restriction. Still, at some point, won't we have to decide which DAG format we want to use? This would tell us (1️⃣ ‒ inflow) which tools / SDKs data engineers may use to define / compute DAGs (i.e. their description) and (2️⃣ ‒ outflow) what format orchestrators should be able to read.
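Purely for illustration (and not a proposal for the final DAG format), a user-defined pipeline spec could be as small as a mapping from step names to job definitions plus an explicit dependency list. Every field name below is an assumption:

# Hypothetical pipeline spec: all field names are placeholders, not a format proposal.
pipeline_spec = {
    "name": "resize-then-blackwhite",
    "steps": {
        "resize": {
            "engine": "docker",
            "image": "dpokidov/imagemagick:7.1.0-47-ubuntu",
            "cmd": "magick mogrify -resize 100x100 -quality 100 -path /outputs '/input_images/*.jpg'",
            "inputs": ["QmeZRGhe4PmjctYVSVHuEiA9oSXnqmYa4kQubSHgWbjv72"],
        },
        "blackwhite": {
            "engine": "docker",
            "image": "python:3.10",
            "cmd": "python convert_bw.py",
            "inputs": ["{{ steps.resize.outputs }}"],  # placeholder reference syntax
        },
    },
    # Explicit edges make the DAG unambiguous: blackwhite depends on resize.
    "dependencies": [("resize", "blackwhite")],
}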
This document is based on prior work:
- bacalhau docker run CLI users
- bacalhau serve users. We consider this persona too because any new piece of infrastructure must be easily deployable.
Philippe: Might it be useful to define sub-personae from the start, based on their main priorities? This distinction appears mostly in the current user stories, but it could make more sense to draw it at the persona level so that we are not limited in the usage and number of user stories we may have.
- e.g. Data integrator (will be interested in an automated lineage solution and in the visualization of graphs of data dependencies).
- e.g. Data scientist, focused on asynchronous collaboration (interested in the availability of descriptions of DAGs of tasks, and in an automated lineage solution).
- e.g. DevOps engineer, focused on performance (will be interested in caching strategies).
Philippe
- Two personae (Data User and Compute Provider) make a lot of sense to start with. But one of the advantages of modern workflow automation platforms is their extensibility to infrastructures of very different sizes, given their agnosticism to the scale of the underlying architecture. A workflow that runs on a large global infrastructure in production may have been designed and prepared by a solo engineer running a computation engine locally. In the early stages of workflow development, data users and compute providers are often the same (third?) persona, working on smaller static datasets. Workflow platforms and tooling often support and promote this way of developing and then deploying workflows, and it could be valuable for us to consider this persona quite early too.
This is mentioned in feature n°9.
Called the "local to multi-node cluster" journey in other Bacalhau material.
In the long run we'd like to add support for running popular distributed compute frameworks such as Spark on Bacalhau.
Walid: This requires inter-job communication, or even better intra-CP communication, and co-locating compute nodes within the same data centres or geographical regions as much as possible to minimize the added latency of moving intermediary results across multiple hops and regions. This would also allow Bacalhau to run Spark-like jobs that require inter-job communication, since the communication would happen between compute nodes in the private network of a single SP.
The above would introduce networking changes as well as some security concerns. Let's keep this as a future goal.
List of essential features:
Philippe
About Simple multi-step pipelines
I suppose there is still room for clarification on the kinds of pipelines we want to support or not support: sequential? Branching / joining? Dynamic / conditional workflows? Non-deterministic workflows? Triggers? (Scheduling is mentioned in the list of essential features; triggering is loosely mentioned in the list of desirable features.)
NB: this is also conditioned by the DAG format (or framework).
Philippe
About Pipeline parameters
We may want to clarify whether we are talking about configuration of the pipeline, of the compute infrastructure, or both.
Philippe: We may add a feature about the description of DAGs, their availability and (ideally) their discoverability.
List of desirable features:
Philippe
About Job caching
What is our take on deterministic operations in the current design? Enforcement a priori? Validation a posteriori, possibly relying on Verifiers (as they are called in the current Bacalhau architecture)? The user's responsibility? A toggleable feature?
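To make the question concrete, here is a minimal sketch (an assumption, not current behaviour) of what an a-priori approach could look like: if the user declares a job deterministic, a cache key is derived from everything that determines its output, and an identical key means a previous result can be reused.

import hashlib
import json

def job_cache_key(image, cmd, input_cids):
    """Hypothetical cache key for a job declared deterministic:
    same (image, cmd, inputs) => same key => the cached result can be reused."""
    payload = json.dumps(
        {"image": image, "cmd": cmd, "inputs": sorted(input_cids)},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()

# Example: resubmitting the resize step with identical inputs yields the same key.
key = job_cache_key(
    image="dpokidov/imagemagick:7.1.0-47-ubuntu",
    cmd="magick mogrify -resize 100x100 -quality 100 -path /outputs '/input_images/*.jpg'",
    input_cids=["QmeZRGhe4PmjctYVSVHuEiA9oSXnqmYa4kQubSHgWbjv72"],
)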
Philippe
About Asset typing: there might be something interesting to do here with multiformats, with information included in CIDs, without having to fetch the underlying data.
The first point to discuss is whether or not Bacalhau takes responsibility for a Pipeline. This translates into different UX from the user/CP perspective.
To relieve Bacalhau of that responsibility, we could extend the current API to allow for pluggability into an Orchestrator managed externally, either by a CP or simply running on a user's laptop. This shouldn't entail a lot of engineering work: build a plug-in for our favourite Orchestrator and add some information about the pipeline to the job spec.
For example, a hypothetical BacalhauOperator built as an Airflow operator would compose a DAG as shown in the snippet below; a sketch of how such an operator could be implemented follows it. That's code a user can run on their laptop. Then, they'd connect to their Orchestrator dashboard to monitor the progress of the pipeline. At this point, bacalhau list will show two individual jobs.
# connect to localhost:8080 or a remote managed service
with DAG("my-dag") as dag:
    resize_job = BacalhauOperator(
        task_id="resize",
        verb="docker run",
        inputs="QmeZRGhe4PmjctYVSVHuEiA9oSXnqmYa4kQubSHgWbjv72",
        image="dpokidov/imagemagick:7.1.0-47-ubuntu",
        cmd="magick mogrify -resize 100x100 -quality 100 -path /outputs '/input_images/*.jpg'",
    )
    blackwhite_job = BacalhauOperator(
        task_id="blackwhite",
        verb="run python",
        image="python3.10",
        cmd="python convert_bw.py",
    )
    resize_job >> blackwhite_job
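As mentioned above, here is a rough sketch of how such an operator could wrap the existing CLI. BacalhauOperator does not exist today, and the mapping from verb/inputs onto CLI flags is an assumption (only --id-only and --wait are taken from real usage elsewhere in this doc):

import shlex
import subprocess

from airflow.models.baseoperator import BaseOperator


class BacalhauOperator(BaseOperator):
    """Hypothetical Airflow operator that shells out to the bacalhau client."""

    def __init__(self, verb, image, cmd, inputs=None, **kwargs):
        super().__init__(**kwargs)
        self.verb = verb
        self.image = image
        self.cmd = cmd
        self.inputs = inputs

    def execute(self, context):
        # Assumed CLI shape, e.g. "bacalhau docker run --id-only --wait [-i <cid>] <image> -- <cmd>";
        # the exact flags would need to be checked against the v0.3.x client.
        args = ["bacalhau", *self.verb.split(), "--id-only", "--wait"]
        if self.inputs:
            args += ["-i", self.inputs]
        args += [self.image, "--", *shlex.split(self.cmd)]
        job_id = subprocess.run(args, check=True, capture_output=True, text=True).stdout.strip()
        # The returned job id is pushed to XCom so a downstream task can `bacalhau get` it;
        # this is also the natural place to attach pipeline info to the job spec.
        return job_id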
Alternatively, Bacalhau could take full responsibility for pipelines, meaning a Pipeline is a first-class citizen just as a Job is. This means:
Although this approach requires a good amount of engineering work, I believe this is the way to go because it:
According to the docs:
The requestor node is responsible for handling requests from clients using JSON over HTTP and is the main “custodian” of jobs submitted to it.
It's therefore a good candidate for handling a pipeline request too. An orchestrator interface could use either an external orchestrator (e.g. one a CP already manages) or a self-built, internal orchestrator.
The idea here is that the external engine is managed by a third party. It may well be that CPs already have their own Orchestrators running and would naturally like to use those. However, while we do have some understanding of what current users prefer as orchestrators (see List of open-source orchestrators), we lack knowledge of what managed services they use, if any.
A Bacalhau node is composed of the following services:
The orchestrator engine could run in a dedicated internal service, next to those listed above, or live within bacalhau-daemon. Either way, we can include it as part of the installation process and make it CP-friendly.
Prior research on this topic highlighted two alternatives: build our own orchestrator engine, or use an open-source solution. While Phil's thread leans towards a self-built solution, Polyphene's thread hints that a stable open-source engine would be better.
Then hide the interaction with Airflow/Bacalhau behind an interface and expose the ability to create and manage DAGs in the API. As you can see, both options require hiding the implementation of the DAG and adding the ability to interact with the DAG through an API. The key difference is what has control over the DAG.
Pros:
Cons:
Pros:
Cons:
For more details, check the Engineering Effort Discussion here.
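Whichever alternative wins, the seam between the requestor node and the engine could be as narrow as the sketch below; the PipelineOrchestrator name and its methods are assumptions, not an API proposal:

from typing import Protocol


class PipelineOrchestrator(Protocol):
    """Hypothetical interface hiding whichever engine owns the DAG."""

    def submit_pipeline(self, pipeline_spec: dict) -> str:
        """Register a user-defined pipeline and return a pipeline id."""
        ...

    def pipeline_status(self, pipeline_id: str) -> dict:
        """Return per-step state; each step maps onto a Bacalhau job id."""
        ...

    def cancel_pipeline(self, pipeline_id: str) -> None:
        """Stop the pipeline and any jobs still running."""
        ...

# An Airflow-backed implementation (external engine) and a self-built internal
# engine would both satisfy this Protocol; the requestor node only talks to the interface.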
Philippe: I certainly lack general knowledge of the common lexicon: are we talking about caching? Some enforcement of deterministic operations? Or something else?
TODO
TODO - see https://openlineage.io/
What IPFS/IPLD/Estuary/web3.storage features can we use to persist lineage?
Provider
Providers can contain operators, hooks, sensor, and transfer operators to communicate with a multitude of external systems… (reference)
Operator
A reusable, pre-made Task template whose logic is all done for you and that just needs some arguments.
…How to Create a custom operator
Hook
A Hook is a high-level interface to an external platform that lets you quickly and easily talk to them without having to write low-level code that hits their API or uses special libraries
Passing data between tasks: XComs
XComs are one method of passing data between tasks, but they are only appropriate for small amounts of data. Large data sets require a method making use of intermediate storage and possibly utilizing an external processing framework.
https://big-data-demystified.ninja/2020/04/15/airflow-xcoms-example-airflow-demystified/
with DAG('first-bacalhau-dag', start_date=datetime(2021, 1, 1)) as dag:
    op1 = BashOperator(
        task_id='submit-a-job',
        bash_command='echo hello {{ ti.xcom_push(key="jobid", value="$(bacalhau docker run --id-only --wait ubuntu date)") }}',
    )
    op2 = BashOperator(
        task_id='get-a-job',
        bash_command='bacalhau get --download-timeout-secs 10 --output-dir /tmp/enrico/ {{ ti.xcom_pull(key="jobid") }}',
    )
    op1 >> op2
Lineage
Lineage support is very experimental and subject to change. (reference)
f_in = File(url="/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
    f_out = File(url="/tmp/{}/{{{{ data_interval_start }}}}".format(file))
    outlets.append(f_out)

run_this = BashOperator(
    task_id="run_me_first",
    bash_command="echo 1",
    dag=dag,
    inlets=f_in,
    outlets=outlets,
)
Metadata is pushed into XCOM
DockerOperator - Execute a command inside a docker container.
t_print = DockerOperator(
    api_version="1.19",
    docker_url="tcp://localhost:2375",
    image="centos:latest",
    mounts=[Mount(source="/your/host/output_dir/path", target="/your/output_dir/path", type="bind")],
    command=f"cat {t_move.output}",
    task_id="print",
    dag=dag,
)
https://airflow.apache.org/docs/apache-airflow-providers-docker/stable/index.html
KubernetesPodOperator - Execute a task in a Kubernetes Pod
write_xcom = KubernetesPodOperator(
    namespace='default',
    image='alpine',
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="write-xcom",
    do_xcom_push=True,
    is_delete_operator_pod=True,
    in_cluster=True,
    task_id="write-xcom",
    get_logs=True,
)

pod_task_xcom_result = BashOperator(
    bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
    task_id="pod_task_xcom_result",
)
https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/4.4.0/
PostgresOperator - run a SQL query
get_birth_date = PostgresOperator(
    task_id="get_birth_date",
    postgres_conn_id="postgres_default",
    sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
    parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
)
DbtCloudRunJobOperator - to trigger a run of a dbt Cloud job
DbtCloudJobRunSensor - to periodically retrieve the status of a dbt Cloud job run and check whether the run has succeeded
DbtCloudGetJobRunArtifactOperator - to download dbt-generated artifacts for a dbt Cloud job run
DbtCloudListJobsOperator - to list all jobs tied to a specified dbt Cloud account
https://airflow.apache.org/docs/apache-airflow-providers-dbt-cloud/stable/operators.html
## Trigger job
trigger_job_run2 = DbtCloudRunJobOperator(
    task_id="trigger_job_run2",
    job_id=48617,
    wait_for_termination=False,
    additional_run_config={"threads_override": 8},
)
## Poll status
job_run_sensor = DbtCloudJobRunSensor(
    task_id="job_run_sensor", run_id=trigger_job_run2.output, timeout=20
)

get_run_results_artifact = DbtCloudGetJobRunArtifactOperator(
    task_id="get_run_results_artifact", run_id=trigger_job_run1.output, path="run_results.json"
)
https://airflow.apache.org/docs/apache-airflow-providers-dbt-cloud/stable/index.html
Single:
Multiple:
DockerRun
BacalhauDockerRunOperator
BacalhauGetOperator
Bacalhau[...]Operator
BacalhauWasmOperator
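If we went for multiple operators, each one could stay a thin wrapper over the single generic BacalhauOperator sketched earlier, so both options would share one implementation; the class names mirror the list above and remain hypothetical:

class BacalhauDockerRunOperator(BacalhauOperator):
    """Thin wrapper: fixes verb="docker run" and exposes only docker-run arguments."""

    def __init__(self, image, cmd, inputs=None, **kwargs):
        super().__init__(verb="docker run", image=image, cmd=cmd, inputs=inputs, **kwargs)


class BacalhauWasmOperator(BacalhauOperator):
    """Same idea for WASM jobs; the "wasm run" verb and arguments are placeholders."""

    def __init__(self, image, cmd, **kwargs):
        super().__init__(verb="wasm run", image=image, cmd=cmd, **kwargs)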
Probably hooks too… look at how the dbt Cloud provider is made.
Use the REST API instead of the client?
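A hook would centralise exactly that choice: the sketch below shells out to the client today, and swapping its internals for calls to the requestor node's HTTP API would not touch any operator code. BacalhauHook is hypothetical, and the CLI flags and default host are assumptions based on the v0.3.x client:

import shlex
import subprocess

from airflow.hooks.base import BaseHook


class BacalhauHook(BaseHook):
    """Hypothetical hook wrapping access to a Bacalhau requestor node.

    It currently shells out to the client; the same methods could later call
    the requestor's HTTP API directly without changing any operator.
    """

    def __init__(self, api_host="bootstrap.production.bacalhau.org"):
        super().__init__()
        self.api_host = api_host  # which requestor node to talk to (assumed flag below)

    def submit_docker_job(self, image, cmd):
        # e.g. submit_docker_job("ubuntu", "date") -> job id string
        out = subprocess.run(
            ["bacalhau", "--api-host", self.api_host, "docker", "run",
             "--id-only", "--wait", image, *shlex.split(cmd)],
            check=True, capture_output=True, text=True,
        )
        return out.stdout.strip()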
Will move this to another doc as soon as I wrap my head around hackmd
meeting goals