[Design Doc] Job Pipelining

We migrated all design docs to the Notion platform -> https://pl-strflt.notion.site/Bacalhau-Pipelines-4f4b46558148477db721b8b8f1f09766

Enrico, 2022-10-10

Kudos to everyone who worked on this epic before myself - I took the liberty of copy/pasting some of your thoughts here.

Context

Bacalhau (currently v0.3.x) can run workloads composed of a single job, but to execute a multi-step workload users have to manually submit and track each job. This (1) poses a significant burden on the user, (2) makes reproducing a pipeline difficult, and (3) means data origin can only be determined by manually logging each step and backtracking.

In an effort to offer broader support for modern multi-step workloads, this document aims to design a compelling pipelining feature that is user-friendly and allows for complex workloads.

In this context, a Pipeline is completely user-defined: the user writes a pipeline spec detailing what each step (i.e. a Bacalhau job) is and how the steps relate to one another.

Philippe This last point could be a useful restriction. Still, at some point, won't we have to decide which DAG format we want to use? This would tell us (1️⃣ ‒ inflow) which tools / SDKs data engineers may use to define / compute DAGs (ie their description) and (2️⃣ - outflow) what format orchestrators should be able to read.

This document is based on prior work:

Personas

  • User: a computational scientist, interested in consuming jobs/pipelines. Can be a data engineer as well. In short, they're the bacalhau docker run cli users.
  • Compute Provider (CP): a third-party willing to run a Bacalhau node on their infrastructure. In short, they're the bacalhau serve users. We consider this persona too because any new piece of infrastructure must be easily deployable.

Philippe Might it be useful to define sub-personae from the start, based on their main priorities? This distinction currently appears mostly in the user stories, but it could make more sense to differentiate at the persona level so we are not limited in the usage and number of user stories we may have.

  • e.g. Data integrator (will be interested in an automated lineage solution and the visualization of graphs of data dependencies).
  • e.g. Data scientist, focused on asynchronous collaboration (interested in the availability of the descriptions of DAGs of tasks, and in an automated lineage solution).
  • e.g. DevOps engineer, focused on performance (will be interested in caching strategies).

Philippe

  • Two personae (Data User and Compute Provider) make a lot of sense to start with. But one of the advantages of modern workflow automation platforms is their extensibility to infrastructures of very different sizes, since they are agnostic to the scale of the underlying architecture. One workflow that runs on a large global infrastructure in production may have been designed and prepared by a solo engineer running a local computation engine. In the early stages of workflow development, data users and compute providers are often the same (third?) persona, working on smaller static datasets. Workflow platforms and tooling often support and promote this way of developing and then deploying workflows, and it could be valuable for us to consider this persona quite early too.

This is mentioned in feature n°9.

Called the "local to multi-node cluster" journey in other Bacalhau material.

User stories

  • As a computational scientist, I want to branch pipelines with two or more parallel jobs and then reduce the results back into a single job. For example, see @NiklasTR's example use case
  • As a computational scientist, I want to reproduce an existing pipeline
  • As a data engineer, I want to be able to track down the origin of a data artifact generated by a pipeline
  • As a computational scientist, I need to list & inspect pipelines

Goals

  • Allow for multi-step job pipelines (i.e. chain consecutive/parallel jobs)
  • Achieve pipeline reproducibility
  • Achieve data lineage
  • Avoid any operational burden for users & Compute Providers
  • Aim at a smooth onboarding for existing multi-step workloads

Long term goals

In the long run we'd like to add support for running popular distributed compute frameworks such as Spark on Bacalhau.

Walid: This requires inter-job communication, or even better intra-CP communication, and co-locating compute nodes within the same data centres or geographical regions as much as possible to minimize the added latency of moving intermediary results across multiple hops and regions. This would also allow Bacalhau to run Spark-like jobs that require inter-job communication, since the communication would happen between compute nodes in a private network of a single SP.

The above would introduce networking changes as well as some security concerns. Let's keep this as a future goal.

Features

List of essential features:

  1. Simple multi-step pipelines - e.g. input -> job 1 -> job 2 -> output
  2. Fan in jobs - e.g. start with two separate jobs, fan into a single job
  3. Fan out jobs - e.g. start with a single job, fan out to two separate jobs
  4. Easy data passing - e.g. use the output from the previous job as the input to the next one
  5. Pipeline parameters - e.g. parameterized input data, some configuration
  6. Pipeline scheduling - e.g. run every day, see Cron as a service #70
  7. Failure control - e.g. retries, fail early, continue despite failures, etc.
  8. Pipeline status - e.g. track where we currently are in a pipeline run
  9. Development & testing - e.g. local environment to run pipelines or single jobs within a pipeline, test harness

Philippe

About Simple multi-step pipelines

I suppose there is still room for clarification in the kinds of pipelines we want to support or not: sequential? Branching/joining? Dynamic/conditional workflows? Non-deterministic workflows? Triggers (scheduling is mentioned in the list of essential features; triggering is loosely mentioned in the list of desirable features)?

NB: this is also conditioned by the DAG format (/ framework).

Philippe

About Pipeline parameters

We may want to clarify whether we are talking about configuration of the pipeline, of the compute infrastructure, or both.

Philippe We may add a feature about description of DAGs, their availability and (ideally) their discoverability.

List of desirable features:

  1. Job caching - e.g. don't re-run something we already have a result for
  2. Pipeline triggering - e.g. run this pipeline when something happens
  3. Asset typing - e.g. this job expects a json file, a stream, etc.
  4. Pipeline lifecycle - e.g. can inject or run certain things at certain points
  5. Pipeline hooks - e.g. can connect to a websocket to listen for lifecycle changes
  6. Logging - e.g. stream logs from all jobs in a pipeline back to user
  7. Monitoring - e.g. expose metrics about jobs in a pipeline
  8. Data-driven pipelines - e.g. pipelines react to new data

Philippe

About Job caching

What is our take on deterministic operations in the current design? Enforcement a priori? Validation a posteriori, possibly relying on Verifiers (as they are called in the current Bacalhau architecture)? The user's responsibility? A toggleable feature?

Philippe

About Asset typing

There might be something interesting to do here with multiformats, with information included in CIDs without having to fetch the underlying data.

Technical design

Be responsible or not?

The first point to discuss is whether or not Bacalhau takes responsibility over a Pipeline. This translates into different UX from the user/CP perspective.

Not Responsible

To alleviate Bacalhau from that responsibility, we could extend the current API to allow for pluggability into an Orchestrator managed externally, either by the CP or just running on the user's laptop. This shouldn't entail lots of engineering work: build a plug-in for our favourite Orchestrator and add some pipeline-related info to the jobspec.


For example, a hypothetical BacalhauOperator built as an Airflow operator would compose a DAG as shown in the snippet below. That's code a user can run on their laptop. Then, they'd connect to their Orchestrator dashboard to monitor the progress of the pipeline. At this point, bacalhau list will show two individual jobs.

# connect to localhost:8080 or a remote managed service
with DAG("my-dag") as dag:
    resize_job = BacalhauOperator(
        verb="docker run",
        inputs="QmeZRGhe4PmjctYVSVHuEiA9oSXnqmYa4kQubSHgWbjv72",
        image="dpokidov/imagemagick:7.1.0-47-ubuntu",
        cmd="magick mogrify -resize 100x100 -quality 100 -path /outputs '/input_images/*.jpg'",
    )
    blackwhite_job = BacalhauOperator(
        verb="run python",
        image="python3.10",
        cmd="python convert_bw.py",
    )
    resize_job >> blackwhite_job
Pros & Cons
  • Pros
    1. Relatively easy existing pipeline onboarding: just replace existing tasks with Bacalhau jobs
    2. Separation of concerns - Bacalhau stays tiny and doesn't get bloated with more code
    3. Writing a connector/operator is relatively easy
  • Cons
    1. Makes our value proposition weaker i.e. there's no Bacalhau Pipeline as such - @lewq long term goals?
    2. We don't really have solid datapoints on what orchestration systems our users prefer - where to start from?
    3. The full pipeline lifecycle is controlled outside of Bacalhau - it could be hard for us to get feedback/improve on this feature
    4. We'd still have to deploy an Orchestrator service somewhere for bootstrapping demos

Responsible

Alternatively, Bacalhau could take full responsibility over pipelines, meaning a Pipeline becomes a first-class citizen just like a Job. This means:

  1. The Orchestrator is shipped with Bacalhau and is part of the core codebase. Should it be self-built or not? Let's discuss this in the sections below
  2. The user submits and lists a Pipeline directly to Bacalhau using a to-be-defined DSL (a hypothetical sketch follows below)
  3. The Pipeline API supports Create and Read but not Update/Delete (immutable), similar to a normal Job

Although this approach requires a good amount of engineering work, I believe this is the way to go because it:

  • Allows us to control the pipeline lifecycle, test it and debug issues
  • Means the pipeline spec can be persisted and used for reproducibility and data lineage
  • Makes the value proposition of Bacalhau more compelling because (a) there's no set-up effort required of the user, and (b) CPs can deliver that functionality too "just by installing Bacalhau".
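
To make this option more concrete, here is a rough sketch of what a pipeline spec could look like. Everything below is hypothetical: the actual DSL is still to be defined, and the field names (steps, verb, inputs, the {{ steps.resize.outputs }} templating) are assumptions; the job parameters are simply lifted from the Airflow-style example earlier in this document.

# Hypothetical pipeline spec; the real DSL is to be defined.
pipeline_spec = {
    "name": "image-processing",
    "steps": [
        {
            "id": "resize",
            "verb": "docker run",
            "image": "dpokidov/imagemagick:7.1.0-47-ubuntu",
            "cmd": "magick mogrify -resize 100x100 -quality 100 -path /outputs '/input_images/*.jpg'",
            "inputs": ["QmeZRGhe4PmjctYVSVHuEiA9oSXnqmYa4kQubSHgWbjv72"],
        },
        {
            "id": "blackwhite",
            "verb": "run python",
            "image": "python3.10",
            "cmd": "python convert_bw.py",
            # hypothetical templating: consume the output CID of the previous step
            "inputs": ["{{ steps.resize.outputs }}"],
        },
    ],
}
# Create + Read only: once submitted to the requestor node the spec is immutable,
# which is what makes it usable for reproducibility and lineage.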

High level architecture

According to the docs:

The requestor node is responsible for handling requests from clients using JSON over HTTP and is the main “custodian” of jobs submitted to it.

It's therefore a good candidate for handling a pipeline request too. An orchestrator interface could use either an external orchestrator (e.g. one the CP already manages) or a self-built, internal orchestrator.

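As a rough illustration of that boundary (sketched in Python for brevity, even though the implementation would live in the Go codebase; all names below are made up), the requestor node would call into something like:

from abc import ABC, abstractmethod


class Orchestrator(ABC):
    """Hypothetical interface the requestor node talks to."""

    @abstractmethod
    def submit_pipeline(self, spec: dict) -> str:
        """Register a pipeline and return its id."""

    @abstractmethod
    def pipeline_status(self, pipeline_id: str) -> str:
        """Report where a pipeline run currently is."""


class ExternalOrchestrator(Orchestrator):
    """Delegates to an engine the CP (or the user) already runs, e.g. Airflow."""


class InternalOrchestrator(Orchestrator):
    """Self-built engine shipped with the Bacalhau node itself."""

Either implementation can sit behind the same API, so the choice between external and internal engines (discussed below) would not leak into the user-facing Pipeline spec.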

Outstanding questions

External engine

The idea is that an external engine is managed by a third party. It may be the case that CPs already have their own Orchestrators running, and naturally they'd like to use those. However, while we do have some understanding of what current users prefer as orchestrators (see List of open-source orchestrators), we lack knowledge about what managed services they use, if any.

Internal engine

A Bacalhau node is composed of the following services:

  • bacalhau-daemon: compute
  • ipfs-daemon: storage
  • openresty: check node health
  • prometheus-daemon (optional): for app metrics

The orchestrator engine could run in a dedicated internal service, next to those listed above, or live within bacalhau-daemon. Either way, we can include that as part of the installation process and make it CP friendly.

Orchestrator Engine: Self-built vs open-source?

Prior research on this topic highlighted two alternatives: should we build our own orchestrator engine, or just use an open-source solution? While Phil's thread leans towards a self-built solution, Polyphene's thread hints that a stable open-source engine would be better.

  1. Leverage an open-source orchestration platform like Airflow, and build an operator that is able to control Bacalhau jobs
  2. Build DAG orchestration into Bacalhau

Then hide the interaction with Airflow/Bacalhau behind an interface and expose the ability to create and manage DAGs in the API. As you can see, both options require hiding the implementation of the DAG and adding the ability to interact with the DAG through an API. The key difference is what has control over the DAG.

Open-source solution

Pros:

  • Handing off control to a well-established DAG solution.
  • Probably has more functionality than we can hope to implement in Bacalhau.

Cons:

  • Server operator burden. We'd have to ask CPs to run and operate Airflow, along with bacalhau.
  • Operational burden. What about HA? What about backups? What about updates? How do we orchestrate it?
  • Will have to implement a custom Airflow operator (or equivalent) to allow it to interact with the bacalhau network.
Self-built Into Bacalhau

Pros:

  • Full control, tight integration, can improve over time.
  • Likely to be more robust over the long term, due to fewer external dependencies
  • Less operational and server operator burden.

Cons:

  • Probably more engineering effort, but might not be much more. (Implementing an Airflow Operator vs. implementing control of a DAG)

For more details, check the Engineering Effort Discussion here.

Reproducibility

Philippe I certainly lack general knowledge of the common lexicon: are we talking about caching? Some enforcement of deterministic operations? Or something else?

TODO

Lineage

TODO - see https://openlineage.io/

What IPFS/IPLD/Estuary/web3.storage features can we use to persist lineage?

List of open-source orchestrators

Airflow POC

Concepts

Provider

Providers can contain operators, hooks, sensors, and transfer operators to communicate with a multitude of external systems (reference)

Operator

A reusable, pre-made Task template whose logic is all done for you and that just needs some arguments.

How to Create a custom operator
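
For reference, a custom operator is just a subclass of Airflow's BaseOperator with an execute() method. Below is a minimal, purely illustrative sketch of what a Bacalhau submit operator could look like; the class name and its arguments are made up, and the CLI flags are the --id-only/--wait ones used in the XCom example later in this doc.

import shlex
import subprocess

from airflow.models.baseoperator import BaseOperator


class BacalhauSubmitOperator(BaseOperator):
    """Hypothetical operator: submits a docker job and returns the job id."""

    def __init__(self, image: str, cmd: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.image = image
        self.cmd = cmd

    def execute(self, context):
        # The value returned by execute() is pushed to XCom automatically,
        # so a downstream task can pull the job id.
        result = subprocess.run(
            ["bacalhau", "docker", "run", "--id-only", "--wait", self.image]
            + shlex.split(self.cmd),
            check=True,
            capture_output=True,
            text=True,
        )
        return result.stdout.strip()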

Hook

A Hook is a high-level interface to an external platform that lets you quickly and easily talk to them without having to write low-level code that hits their API or uses special libraries

Passing data between tasks: XComs

XComs are one method of passing data between tasks, but they are only appropriate for small amounts of data. Large data sets require a method making use of intermediate storage and possibly utilizing an external processing framework.

https://big-data-demystified.ninja/2020/04/15/airflow-xcoms-example-airflow-demystified/

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# op1 pushes the Bacalhau job id to XCom; op2 pulls it to fetch the results
with DAG('first-bacalhau-dag', start_date=datetime(2021, 1, 1)) as dag:
    op1 = BashOperator(
        task_id='submit-a-job',
        bash_command='echo hello {{ ti.xcom_push(key="jobid", value="$(bacalhau docker run --id-only --wait ubuntu date)") }}',
    )

    op2 = BashOperator(
        task_id='get-a-job',
        bash_command='bacalhau get --download-timeout-secs 10 --output-dir /tmp/enrico/ {{ ti.xcom_pull(key="jobid") }}',
    )

    op1 >> op2

Lineage

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
Lineage support is very experimental and subject to change. (reference)

  • Inlet: upstream task ids, attr annotated object
  • Outlet: can only be attr annotated object
from airflow.lineage.entities import File

# FILE_CATEGORIES is assumed to be defined elsewhere, e.g. ["CAT1", "CAT2", "CAT3"]
f_in = File(url="/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
    f_out = File(url="/tmp/{}/{{{{ data_interval_start }}}}".format(file))
    outlets.append(f_out)

run_this = BashOperator(
    task_id="run_me_first",
    bash_command="echo 1",
    dag=dag,
    inlets=f_in,
    outlets=outlets,
)

Metadata is pushed into XCOM

What would a BacalhauOperator look like?

Docker operator

DockerOperator - Execute a command inside a docker container.

t_print = DockerOperator(
    api_version="1.19",
    docker_url="tcp://localhost:2375",
    image="centos:latest",
    mounts=[Mount(source="/your/host/output_dir/path", target="/your/output_dir/path", type="bind")],
    command=f"cat {t_move.output}",
    task_id="print",
    dag=dag,
)

https://airflow.apache.org/docs/apache-airflow-providers-docker/stable/index.html

K8s operator

KubernetesPodOperator - Execute a task in a Kubernetes Pod

write_xcom = KubernetesPodOperator(
    namespace='default',
    image='alpine',
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="write-xcom",
    do_xcom_push=True,
    is_delete_operator_pod=True,
    in_cluster=True,
    task_id="write-xcom",
    get_logs=True,
)

pod_task_xcom_result = BashOperator(
    bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
    task_id="pod_task_xcom_result",
)

https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/4.4.0/

Postgres

PostgresOperator - run SQL query

get_birth_date = PostgresOperator(
    task_id="get_birth_date",
    postgres_conn_id="postgres_default",
    sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
    parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
)

Dbt Cloud (!!!)

DbtCloudRunJobOperator - to trigger a run of a dbt Cloud job

DbtCloudJobRunSensor to periodically retrieve the status of a dbt Cloud job run and check whether the run has succeeded.

DbtCloudGetJobRunArtifactOperator to download dbt-generated artifacts for a dbt Cloud job run

DbtCloudListJobsOperator to list all jobs tied to a specified dbt Cloud account.

https://airflow.apache.org/docs/apache-airflow-providers-dbt-cloud/stable/operators.html

## Trigger job
trigger_job_run2 = DbtCloudRunJobOperator(
    task_id="trigger_job_run2",
    job_id=48617,
    wait_for_termination=False,
    additional_run_config={"threads_override": 8},
)
## Poll status
job_run_sensor = DbtCloudJobRunSensor(
    task_id="job_run_sensor", run_id=trigger_job_run2.output, timeout=20
)

get_run_results_artifact = DbtCloudGetJobRunArtifactOperator(
    task_id="get_run_results_artifact", run_id=trigger_job_run1.output, path="run_results.json"
)

https://airflow.apache.org/docs/apache-airflow-providers-dbt-cloud/stable/index.html

Single VS Multiple operators?

Single:

Multiple:

  • PRO: Bacalhau comes with multiple verbs and one is normally interested in doing more than just DockerRun

Bacalhau

  • BacalhauDockerRunOperator
  • BacalhauGetOperator
  • Bacalhau[...]Operator
  • BacalhauWasmOperator

Probably hooks too; look at how the dbt Cloud provider is made.

Use the REST API instead of the client?
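
If we go for multiple thin operators, they could share a common hook, mirroring how the dbt Cloud provider is laid out. The sketch below is hypothetical and simply wraps the CLI calls already used in the POC; the same hook could later target the REST API without changing the operators built on top of it.

import subprocess

from airflow.hooks.base import BaseHook


class BacalhauHook(BaseHook):
    """Hypothetical hook: the one place that knows how to talk to Bacalhau.

    Shells out to the CLI for now; could be swapped for the REST API later.
    """

    def submit_docker_job(self, image: str, cmd: str) -> str:
        out = subprocess.run(
            ["bacalhau", "docker", "run", "--id-only", "--wait", image, *cmd.split()],
            check=True, capture_output=True, text=True,
        )
        return out.stdout.strip()  # the job id

    def get_results(self, job_id: str, output_dir: str) -> None:
        subprocess.run(
            ["bacalhau", "get", "--download-timeout-secs", "10",
             "--output-dir", output_dir, job_id],
            check=True,
        )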

Meeting minutes

Will move this to another doc as soon as I wrap my head around hackmd


10-25-2022

  • Enrico
    • Described progress
  • Philippe
    • Unify pipelines, template level
      • 1 CID in, 1 CID OUT
      • it's actually 1+ CIDs in, 1 CID out
    • Fan in/out
      • remove string keys, use indices
      • We output an array containing indices
    • Flyte
      • built in scheduler
      • data caching
      • not best rep for post-Airflow era
  • Enrico
    • extensibility is key
  • Philippe
    • Simon's img processing examples
  • Demo:
    • Img processing - cool, must run on laptop
  • Philippe
    • Docker image could be CID so
      • deterministic task could be made out of a tuple: docker-CID, input-CID
      • use IPFS-backed Docker registry

10-19-2022

  • Part 1) Alternatives to Airflow
    • Philippe
      • Kedro and others may not come with a scheduler attached
      • Differentiators
        • Airflow still dominant - network effect!
        • post-Airflow mindset -> extra abstraction layer (e.g. Dagster)
      • Task scheduler is key
        • Prefect changed it recently
      • Popularity
        • Dagster (!) - focus on data integration
        • Prefect - the next airflow
        • ( Metaflow (by Netflix) )
      • Dagster less mature
      • Github stars
        • Prefect 10k stars (!) - possibly will grow
      • Will share examples offline
    • Luke
      • put research in writing
      • any front runner?
      • Philippe:
        • Prefect first, Argo 2nd
        • Python, YAML, (visual editor?)
  • Part 2) AIRFLOW research
    • Kai
      • CID as output is small - good news
      • Bacalhau could be bacalahu\
      • Airflow
        • XCom is cool for intermediate steps
        • How do you get your end result out of your pipeline
        • Philippe:
          • templating
        • Airflow should output a CID, input CID as Bacalhau normally does
    • Philippe: pipeline state could sit on IPFS
    • Kai: pipeline export format? Philippe: look for a common interface
    • Philippe: dbt operator
      • Figure out how Prefect manages task comms

10-13-2022

meeting goals

  • Spark or not?
  • Be responsible or not?
  • Enrico: Elaborate on Pros and Cons
  • opts
    • Operator
      • POC bac can talk to operator
    • embedded
      • core feature
  • Kay:
    • job should be aware it's part of a pipeline
    • visualize the whole thing
  • Philippe: integration is key to address Walid's concerns
  • Luke:
    • we can do both (not now)
      • INTEGRATION
        • engage with existing community - Airflow cool!
        • get feedback from them
        • Luke votes for this option
      • NATIVE DAG
  • Kay:
    • cost of NATIVE is expensive, INTEG. is cheap
  • Luke: figure out how to pass data across jobs in INTEGRATION
  • Philippe: Pros and Cons - see what Lison says
  • Luke:
    • Build Integration prototype
    • Free compute for you!
  • Kay:
    • Airflow: onboarding should be easy
  • Kay: would be cool to write a NATIVE DAG design
  • Philippe:
    • onboarding to Airflow could be tricky - surely need to optimize
    • Passing a CID is enough? Need more data?
    • How does caching and determinism impact Pipelines? Don't know yet
  • DECISION:
    • Enrico: Go ahead with prototype
    • Enrico: Expand on INTEGRATION
    • Philipp - Thomas:
      • pick a 2nd orch. other than Airflow
      • investigate caching in Airflow

10-06-2022

  • User research
    • Popular systems? HPC pipelines - ask Wes?
  • Goal: push data to filecoin
  • Compatibility with existing systems
  • Philippe: CoD summits are good for data research (Lisbon is coming up!)
    • Personas: DS vs Data engineering (latter probably more promising, more narrow scope)
    • data integrations - smaller set of tools
    • Reliability is needed
  • Luke: Data engineering space!
    • dbt
    • snowflake
    • docker is main interface! what's the equivalent for DAGs?
    • find a niche that works well with pvt dataset
  • Philippe
    • open dataset & open science
    • first users could be: scientists that collaborate
  • Kay
    • meta-dag idea
  • Philippe
    • dag: ipfs, IPLD fashion
  • Luke
    • lineage
    • provenance
    • reproducibility
  • Philippe
    • 4-5 orch. systems, pick 1
    • Airflow, Dagster, Prefect; maybe Argo and Luigi; Luke: Flyte
  • Kay
    • stress on the "reproducibility"
    • ideas
      • option 1 - connect to an external DAG system - have bacalhau talk to it
      • option 2 - DAG managed from within a Bacalhau job (maybe not good)
  • Luke
    • pitch to airflow user
      • here's free compute, (!) data is public
      • here's the connection to airflow
  • Luke
  • Check what opinions DAG systems have on data input/output
    • Airflow
    • others?

Next steps:

  1. Prepare a design doc
  2. Discovery work: explore the grounds for integrations (limitations, etc.)
  3. Build a Prototype & Future Plans