# processing pipeline: a config approach.

<!-- Put the link to this slide here so people can follow -->
slide: https://hackmd.io/@rig/Hy37BuqdY#/

---

# Why am I?

* laziness
* _don't want to code_ everything as a program
* _data processing directives_ should look more _like a specification_ than a piece of software

therefore I want:

---

# A pipeline

* **knows** some _environment_ values
* **has a context** holding _data_ and _methods_ (strategies)
* **consists of** several _jobs_, each implemented as a process
  * of several _steps_, each calling a _strategy method_
    * getting some _arguments_
    * to do its **magic**

----

<!-- https://www.tonyballantyne.com/graphs.html#orgheadline20 -->
## pipeline structure

```graphviz
digraph dfd2 {
    compound=true
    rankdir=LR
    size="5.5"
    #ratio="compress"
    graph [ fontname="Source Sans Pro", fontsize=20 ];
    node [ fontname="Source Sans Pro", fontsize=18 ];
    edge [ fontname="Source Sans Pro", fontsize=12 ];

    subgraph input {
        #inext [label="<f0> |<f1> Data store two" shape=record];
        inext [label="<f0> |<f1> Inputs\nexternal" shape=record];
    }
    #c -> sync [ltail=session lhead=session]
    inext -> env

    subgraph cluster1 {
        #label="{<f0> pipeline structure|<f1> foo}"
        label="Pipeline Context"
        shape=Mrecord
        concentrate=true
        #i1 [label="inputs"] [shape=box]
        env [label="environment" shape=plaintext ]
        j1 [label="{<f0> 1.0|<f1> Job\n}" shape=Mrecord];
        j2 [label="{<f0> 2.0|<f1> Job\n}" shape=Mrecord];
        jdots [label="{<f0> ...|<f1> Job\n}" shape=Mrecord];
        jn [label="{<f0> n.0|<f1> Job\n}" shape=Mrecord];
        #j1 -># {"step n", "...", "step 2", "step 1"}

        subgraph cluster2 {
            label="process"
            pdef [label="def" shape=plaintext ]
            #rankdir=LR
            #edge [label="out-> in"]
            #concentrate=false
            concentrate=true
            "..." -> "step n"
            "step 2" -> "..."
            "step 1" -> "step 2" #[label="out-> in"]
            {rank=same; "step n", "...", "step 2", "step 1"}
            #{rank=same; "step 2", "step 1"}
            shape=box
        }
        "step n" -> env
        j1 -> "pdef"
        j2 -> "pdef"
        jdots -> "pdef"
        jn -> "pdef"
    } [label="data"]

    "step 2" -> outext [style=dashed] # label="optional"]
    "step n" -> outext [style=dashed]

    subgraph output {
        outext [label="<f0> |<f1> Outputs\nexternal" shape=record];
    }
}
```

---

# configuration specs

what should a configuration look like?

----

## config abstract example

```yaml
# file: my_pipeline_config.yaml
environment:
  define: stuff
  that: "i/need.csv"
  example: /a/folder/somwhere/

pipeline:
  - "my_job1":  # (specified as process of steps)
      - step1:  # is standard method call
          arg1: 'thisfile.stuff'
          arg2: 'that_col'
      - my_step2:  # is my special step's strategy
          #input: ~  # default: previous step's output
          arg2: 5
          arg3: 'yes'
          arg_n: foo
      - step_s:
          #input: ~  # default: previous step's output
  - "my_job2":
      - read:
          some: data_file.csv
      - filter:
          on_what: "context:my_job1"  # from context store
```

---

# implementation details

how to do stuff that is _not_ part of the standard strategies

----

## define strategy method:

```python
def my_step2(*, input=None, arg2='foo_default', arg3=None):
    # my magic goes here; `input` defaults to the previous step's output
    my_result = input  # placeholder: replace with the actual processing
    return my_result
```

## register my strategy and use in pipeline:

```python
with PipelineContext("my_pipeline_config.yaml") as pc:
    pc.register(my_step2)
    pc.execute()
```

that's it.

----

## pass-through method arguments

replacement by the last step's result, decision tree:

```plantuml
@startuml
start
:use arg //n// //n_max = 1// (for now);
if (call-arg //n// is **None**) then (yes)
  :check if replacement is intended;
  if (declaration-arg //n// defaults to **None**) then (yes)
    :replace with last step result;
  else (leave it)
  endif
else (leave it)
endif
:process arg //n+1//;
stop
@enduml
```

---

# DEMO time

---

# The END

---
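
## appendix: pass-through sketch

The pass-through decision tree can be sketched in a few lines of Python. This is only an illustration, not the actual implementation behind `PipelineContext`: the helper names `resolve_args` and `run_steps`, the stand-in `read` strategy, and the choice to treat an argument missing from the config the same as `None` are all assumptions made for this example.

```python
import inspect


def resolve_args(strategy, call_args, last_result, n_max=1):
    """Hypothetical helper: apply the pass-through rule to the first n_max parameters."""
    params = list(inspect.signature(strategy).parameters.values())
    resolved = dict(call_args)
    for param in params[:n_max]:
        # Assumption: an argument absent from the config counts as None.
        if resolved.get(param.name) is None:
            # Replace only if the declaration itself defaults to None.
            # (Parameters without a default have `Parameter.empty`, not None.)
            if param.default is None:
                resolved[param.name] = last_result
            # otherwise: leave it
        # otherwise: leave it
    return resolved


def run_steps(steps, strategies):
    """Hypothetical runner: call each step's strategy, chaining results."""
    last_result = None
    for name, call_args in steps:
        strategy = strategies[name]
        kwargs = resolve_args(strategy, call_args or {}, last_result)
        last_result = strategy(**kwargs)
    return last_result


def read(*, some):
    # Stand-in for a real reader; ignores the file name and returns fixed data.
    return [1, 2, 3, 4, 5, 6]


def my_step2(*, input=None, arg2='foo_default', arg3=None):
    # Example "magic": keep values that are at least arg2.
    return [x for x in input if x >= arg2]


strategies = {"read": read, "my_step2": my_step2}
steps = [
    ("read", {"some": "data_file.csv"}),
    ("my_step2", {"arg2": 5, "arg3": "yes"}),  # no `input` given
]
result = run_steps(steps, strategies)
print(result)  # [5, 6]
```

With `n_max = 1` only the first declared parameter (`input`) is a candidate for replacement, so `arg3` stays untouched even though it also defaults to `None`.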