# processing pipeline:
a config approach.
<!-- Put the link to this slide here so people can follow -->
slide: https://hackmd.io/@rig/Hy37BuqdY#/
---
# Why am I doing this?
* laziness
* _don't want to code_ everything as a program
* _data processing directives_ should look more
  _like a specification_ than a piece of software
therefore I want:
---
# A pipeline
* **knows** some _environment_ values
* **has a context** holding _data_
  and _methods_ (strategies)
* **consists of** several _jobs_,
  each implemented as a process
* of several _steps_,
  each calling a _strategy method_
  * getting some _arguments_
  * to do its **magic**
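
The structure above can be sketched as plain data classes. This is only an illustration; the names `Step`, `Job`, and `Pipeline` are mine, not part of any existing implementation:

```python
from dataclasses import dataclass, field

@dataclass
class Step:
    """One call of a strategy method with its arguments."""
    strategy: str                          # name of the strategy method
    args: dict = field(default_factory=dict)

@dataclass
class Job:
    """A 'process': an ordered list of steps."""
    name: str
    steps: list

@dataclass
class Pipeline:
    """Knows environment values, holds a context data store, runs jobs."""
    environment: dict
    jobs: list
    context: dict = field(default_factory=dict)  # data shared between jobs

# example instance mirroring the bullet points above
pipe = Pipeline(
    environment={"example": "/a/folder/"},
    jobs=[Job("my_job1", [Step("step1", {"arg1": "thisfile.stuff"})])],
)
```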
----
<!-- https://www.tonyballantyne.com/graphs.html#orgheadline20 -->
## pipeline structure
```graphviz
digraph dfd2 {
  compound=true
  rankdir=LR
  size="5.5"
  #ratio="compress"
  graph [ fontname="Source Sans Pro", fontsize=20 ];
  node  [ fontname="Source Sans Pro", fontsize=18 ];
  edge  [ fontname="Source Sans Pro", fontsize=12 ];
  subgraph input {
    #inext [label="<f0> |<f1> Data store two" shape=record];
    inext [label="<f0> |<f1> Inputs\nexternal" shape=record];
  }
  #c -> sync [ltail=session lhead=session]
  inext -> env
  subgraph cluster1 {
    #label="{<f0> pipeline structure|<f1> foo}"
    label="Pipeline Context"
    shape=Mrecord
    concentrate=true
    #i1 [label="inputs"] [shape=box]
    env   [label="environment" shape=plaintext]
    j1    [label="{<f0> 1.0|<f1> Job\n}" shape=Mrecord];
    j2    [label="{<f0> 2.0|<f1> Job\n}" shape=Mrecord];
    jdots [label="{<f0> ...|<f1> Job\n}" shape=Mrecord];
    jn    [label="{<f0> n.0|<f1> Job\n}" shape=Mrecord];
    #j1 -> {"step n", "...", "step 2", "step 1"}
    subgraph cluster2 {
      label="process"
      pdef [label="def" shape=plaintext]
      #rankdir=LR
      #edge [label="out-> in"]
      #concentrate=false
      concentrate=true
      "..." -> "step n"
      "step 2" -> "..."
      "step 1" -> "step 2"  #[label="out-> in"]
      {rank=same; "step n", "...", "step 2", "step 1"}
      #{rank=same; "step 2", "step 1"}
      shape=box
    }
    "step n" -> env
    j1 -> "pdef"
    j2 -> "pdef"
    jdots -> "pdef"
    jn -> "pdef"
  }
  "step 2" -> outext [style=dashed]  # optional
  "step n" -> outext [style=dashed]
  subgraph output {
    outext [label="<f0> |<f1> Outputs\nexternal" shape=record];
  }
}
```
---
# configuration specs
what should a configuration look like?
----
## config abstract example
```yaml
# file: my_pipeline_config.yaml
environment:
  define: stuff
  that: "i/need.csv"
  example: /a/folder/somewhere/
pipeline:
  - "my_job1":            # specified as a process of steps
      - step1:            # a standard method call
          arg1: 'thisfile.stuff'
          arg2: 'that_col'
      - my_step2:         # my special step's strategy
          #input: ~       # default: previous step's output
          arg2: 5
          arg3: 'yes'
          arg_n: foo
      - step_s:
          #input: ~       # default: previous step's output
  - "my_job2":
      - read:
          some: data_file.csv
      - filter:
          on_what: "context:my_job1"   # from the context store
```
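
Loaded (e.g. with `yaml.safe_load`), such a file becomes nested one-key mappings: each job maps its name to a step list, each step maps a strategy name to an argument dict. A sketch of walking that structure; the `parsed` dict below is written out by hand so the example needs no PyYAML:

```python
# what a YAML loader would return for the "my_job2" part of the config above
parsed = {
    "pipeline": [
        {"my_job2": [
            {"read": {"some": "data_file.csv"}},
            {"filter": {"on_what": "context:my_job1"}},
        ]},
    ],
}

# collect the execution plan: each job is a one-key mapping (name -> steps),
# each step is a one-key mapping (strategy name -> argument dict)
plan = []
for job in parsed["pipeline"]:
    (job_name, steps), = job.items()
    for step in steps:
        (step_name, args), = step.items()
        plan.append((job_name, step_name, sorted(args)))
```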
---
# implementation details
how to do stuff that is _not_ part of the standard strategies
----
## define strategy method:
```python
def my_step2(*, input=None, arg2='foo_default', arg3=None):
    # my magic
    return my_result
```
## register my strategy and use in pipeline:
```python
with PipelineContext("my_pipeline_config.yaml") as pc:
    pc.register(my_step2)
    pc.execute()
```
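
The `PipelineContext` class itself is not shown in these slides; here is a minimal sketch of what `register` and `execute` might do. The class layout, and passing an already-parsed dict instead of a YAML path, are assumptions of mine, not the actual implementation:

```python
class PipelineContext:
    """Toy version: a registry of strategy methods plus an execute loop.
    The real class would parse the YAML file given to __init__."""

    def __init__(self, config):
        self.config = config      # here: an already-parsed dict
        self.strategies = {}      # strategy name -> callable
        self.store = {}           # per-job results (the "context store")

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        return False              # don't swallow exceptions

    def register(self, func):
        """Make a user-defined step callable from the config by its name."""
        self.strategies[func.__name__] = func

    def execute(self):
        """Run every job; a job's result is its last step's return value."""
        for job in self.config["pipeline"]:
            (name, steps), = job.items()
            result = None
            for step in steps:
                (step_name, args), = step.items()
                result = self.strategies[step_name](**(args or {}))
            self.store[name] = result

# usage mirroring the slide above, with an inline config
def my_step2(*, input=None, arg2='foo_default', arg3=None):
    return arg2

with PipelineContext({"pipeline": [{"j1": [{"my_step2": {"arg2": 5}}]}]}) as pc:
    pc.register(my_step2)
    pc.execute()
```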
that's it.
----
## pass-through method arguments
replacement by the last step's result, as a decision tree:
```plantuml
@startuml
start
:use arg //n//
//n_max = 1// (for now);
if (call-arg //n// is **None**) then (yes)
  :check whether replacement is intended;
  if (declaration-arg //n// defaults to **None**) then (yes)
    :replace with last step result;
  else (leave it)
  endif
else (leave it)
endif
:process arg //n+1//;
stop
@enduml
```
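
The same decision tree as a hedged Python sketch using `inspect.signature`. The function name `fill_passthrough` and restricting the pass-through slot to `input` are my assumptions; the slides only state //n_max = 1// for now:

```python
import inspect

def fill_passthrough(func, call_args, last_result):
    """Decide whether to substitute the previous step's result for 'input'."""
    args = dict(call_args)
    name = "input"                     # the single pass-through arg (n_max = 1)
    if args.get(name) is None:         # call-arg n is None?
        declared = inspect.signature(func).parameters.get(name)
        if declared is not None and declared.default is None:
            args[name] = last_result   # replace with last step result
    return args                        # otherwise: leave it

# usage with the strategy method from the earlier slide
def my_step2(*, input=None, arg2='foo_default', arg3=None):
    return input

filled = fill_passthrough(my_step2, {"arg2": 5}, last_result="prev")
kept = fill_passthrough(my_step2, {"input": 7}, last_result="prev")
```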
---
# DEMO time
---
# The END
---