# Kubeflow Pipeline Doc
###### tags: `Kubeflow`
# 基礎概念
本文以 Docker/Kubernetes 為主,Container 皆為 Docker Container
- **Container**: 是一個在機器(裸機orVM)上獨立運行的 process , 他有自己的 filesystem/network/process namespace。同一台機器上的 Container 彼此互不影響,也如同 VM 般互不可見,卻又比 VM 輕量。 [more1](https://www.youtube.com/watch?v=EnJ7qX9fkcU&ab_channel=VMwareCloudNativeApps) [more2](https://www.youtube.com/watch?v=cjXI-yxqGTI&ab_channel=IBMCloud)
- **Container image**: Container 可以透過撰寫 Dockerfile,將所要運行的 code 與其 dependency 打包成一個可以隨時部署到任一機器 (有Docker環境),直接啟動該 Container 的 image。 並且 image 可以打上 tag 做版本控制。
- **Kubernetes**: 是一個管理 Container 的系統,可以透過撰寫 yaml 描述你所想要部署的任意數量的 Container 和其運作模式,Kubernetes 會根據 yaml 來處理 failures 與各種 operation 問題,而不需要人為介入。(Kubernetes 叫做 K8s 因為中間有8個字母) [more](https://www.youtube.com/watch?v=daVUONZqn88&list=PLLasX02E8BPCrIhFrc_ZiINhbRkYMKdPT&index=19&ab_channel=MicrosoftAzure)
- **Argo**:
- K8s 無法處理 Container 的執行順序(預設一次全部啟動),Argo 是一個 K8s 插件可以讓使用者在 yaml 描述 Container 的運行順序(DAG),於是讓 Container 變成了 Steps ,組成 Pipeline。
- Argo 設計了一套方法讓 Container 的結果可以傳遞與暫存 ,達成了 Steps/Pipeline 之間的 Parameters 和 Arguments。
- **Kubeflow Pipeline**:
- 為了讓 ML Scientist 更好的在 K8s 上面跑 Pipeline,又不須要寫yaml,因此實作了一套 SDK 與 Compiler,可以將 python 寫的 kubeflow pipeline 轉成 argo 可以跑的 yaml
- 實作了更好的 UI 來可視化每一個 Step 的 Output 與 lineage tracking
# Kubeflow Pipeline - Components
Components
- 是一個 reusable template,是組成 Pipeline 的 unit(step)
- Components 定義了這個 Step 的
- input
- output
- implementation (Container 與其啟動的 command)
- Components 可以存成 yaml,讓其他人隨時 load 進來做 reuse
- load 進來的 Components 是一個 factor function(XXX_op),照著 input 給參數後, return 一個 Task。
- Tasks 組成 Pipeline
關聯:
- `function/app.py` -> `Component(function: XXX_op / yaml: XXX.yaml)` -> `Task (instantiate)`
- `multiple Tasks` -> `Pipeline`
## 製作 Components
1. 如果是單純一個 function -> 用 kfp API 來幫你轉成一個 Components
2. 直接 load 共享的 yaml
3. 如果你已經有 docker image -> 寫 yaml,定義 input/output/command
### 使用 KFP API 將 function 轉 Components & 共享的 yaml
有兩個API,但其實他們做一樣的事情
- **[create_component_from_func](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.components.html?highlight=create_component_from_func#kfp.components.create_component_from_func)**
Converts a Python function to a component and returns a task factory (a function that accepts arguments and returns a task object).
```python=
kfp.components.create_component_from_func(
func: Callable,
output_component_file: Optional[str] = None,
base_image: Optional[str] = None,
packages_to_install: List[str] = None,
annotations: Optional[Mapping[str, str]] = None
)
```
- **[func_to_container_op](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.components.html?highlight=create_component_from_func#kfp.components.func_to_container_op)**
Converts a Python function to a component and returns a task factory。
同時也是 decorator 可以直接用。
```python=
kfp.components.func_to_container_op(
func: Callable,
output_component_file: Optional[str] = None,
base_image: Optional[str] = None,
extra_code: Optional[str] = '',
packages_to_install: List[str] = None,
modules_to_capture: List[str] = None,
use_code_pickling: bool = False,
annotations: Optional[Mapping[str, str]] = None
)
```
- **[load_component](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.components.html?highlight=create_component_from_func#kfp.components.load_component)**
Loads component from text, file or URL and creates a task factory function
```python=
kfp.components.load_component(
filename=None,
url=None,
text=None
)
```
範例:
```python=
import kfp.components
def add(a: float, b: float) -> float:
'''Calculates sum of two arguments'''
return a + b
# add_op 就是一個 components,也是一個 task factor functinon
add_op = kfp.components.create_component_from_func(
add, output_component_file='add.yaml'
) # 產出 add.yaml 做 reusable components,其他人不用再寫一次 code
# 同上
add_op2 = kfp.components.func_to_container_op(add)
# 將 add.yaml load 回來,
add_op3 = kfp.components.load_component(filename='./add.yaml')
# 對著 factor function 給參數,就會回傳一個 task
# 多個 task 可以組成 pipeline
add_task = add_op(1, 4)
```
用 decorator 範例:
```python=
from kfp.components import func_to_container_op
@func_to_container_op
def add(a: float, b: float) -> float:
'''Calculates sum of two arguments'''
return a + b
# add() 經過 decorator 已經直接就是 add_op ,task factory 了。
add_task = add(1, 4)
```
以下是 add.yaml 實際內容,這就是一個 reusable component,任何人都可以 load 進來用
他將 function 的參數定義成input,回傳值定義成 output
code 直接變成 command 的一部分
```yaml=
name: Add
description: Calculates sum of two arguments
inputs:
- {name: a, type: Float}
- {name: b, type: Float}
outputs:
- {name: Output, type: Float}
implementation:
container:
image: python:3.7
args:
- --a
- {inputValue: a}
- --b
- {inputValue: b}
- '----output-paths'
- {outputPath: Output}
command:
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def add(a, b):
'''Calculates sum of two arguments'''
return a + b
def _serialize_float(float_value: float) -> str:
if isinstance(float_value, str):
return float_value
if not isinstance(float_value, (float, int)):
raise TypeError('Value "{}" has type "{}" instead of float.'.format(str(float_value), str(type(float_value))))
return str(float_value)
import argparse
_parser = argparse.ArgumentParser(prog='Add', description='Calculates sum of two arguments')
_parser.add_argument("--a", dest="a", type=float, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--b", dest="b", type=float, required=True, default=argparse.SUPPRESS)
_parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
_parsed_args = vars(_parser.parse_args())
_output_files = _parsed_args.pop("_output_paths", [])
_outputs = add(**_parsed_args)
_outputs = [_outputs]
_output_serializers = [
_serialize_float,
]
import os
for idx, output_file in enumerate(_output_files):
try:
os.makedirs(os.path.dirname(output_file))
except OSError:
pass
with open(output_file, 'w') as f:
f.write(_output_serializers[idx](_outputs[idx]))
```
Note:
Kubeflow Pipeline 概念上就是,先寫 components,components 給參數得到 task, tasks 組成 pipeline
### 已經有 docker image
ref: [link](https://www.kubeflow.org/docs/components/pipelines/reference/component-spec/)
延續上面的概念,如果 code 已經包好 docker image,想做成一個 Components (yaml),
只需要定義好
1. input
2. output
3. image
4. command
拿上面的 add function 來說
`add.py`
```python=
import argparse
import os
def add(a: float, b: float) -> float:
'''Calculates sum of two arguments'''
return a + b
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--a', type=float)
parser.add_argument('--b', type=float)
# 要加這個參數,但是實際路徑不用我們給定,系統會幫忙給,定義在 component.yaml 中
parser.add_argument("--output_path", type=str)
args = parser.parse_args()
result = add(args.a, args.b)
# 將結果寫到 file,才可以在 step 之間傳遞
try:
os.makedirs(os.path.dirname(args.output_path))
with open(args.output_path, 'w') as f:
f.write(str(result))
except OSError:
pass
```
`Dockerfile`
```dockerfile=
FROM python:3.7
COPY ./add.py .
CMD ["python", 'add.py']
```
`add.yaml`
```yaml=
name: Add
description: Calculates sum of two arguments
inputs:
- {name: a, type: Float}
- {name: b, type: Float}
outputs:
- {name: sum, type: Float}
implementation:
container:
image: XXX/XXX:v1
command: [python, add.py]
args:
- '--a'
- {inputValue: a}
- '--b'
- {inputValue: b}
- '--output_path'
- {outputPath: sum}
```
Component 的詳細欄位定義
https://www.kubeflow.org/docs/components/pipelines/reference/component-spec/#detailed-specification-componentspec
Kubeflow pipeline 提供很多已經有的 Components 可以參考: [link](https://github.com/kubeflow/pipelines/tree/master/components)
關於 output 的部分,詳細請看 Data Passing
## Pipeline
Pipeline 也分成兩種
1. 直接將 function submit 到 Kubeflow pipeline 執行
2. 將 functino compile 成 argo yaml,上傳到 Kubeflow pipeline 從 UI 執行。
### 直接 Submit pipeline
我們已經知道怎麼寫 Component,並透過 Component 回傳實際執行的 Task。
將 Tasks 放在同一個 function 底下,透過下面這個 API,就可以將該 function 轉成 Pipeline。
- [kfp.dsl.pipeline](https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.dsl.html#kfp.dsl.pipeline)
Decorator of pipeline functions.
```python=
@pipeline(
name='my awesome pipeline',
description='Is it really awesome?'
pipeline_root='gs://my-bucket/my-output-path'
)
def my_pipeline(a: PipelineParam, b: PipelineParam):
...
```
接著透過下面的 API 可以將 pipeline 直接 submit 執行
- [create_run_from_pipeline_func](https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.client.html#kfp.Client.create_run_from_pipeline_func)
Runs pipeline on KFP-enabled Kubernetes cluster.
This command compiles the pipeline function, creates or gets an experiment and submits the pipeline for execution.
```python
client = kfp.Client(host="https://kubeflow-appinst-densa.appier.us/")
client.create_run_from_pipeline_func(
my_pipeline,
arguments={'a': '7', 'b': '8'}, # Specify argument values for your pipeline run.
experiment_name="jack_experiments",
run_name="tutorial1"
)
```
完整範例
```python=
import kfp.dsl as dsl
from kfp.components import func_to_container_op
client = kfp.Client(host="https://kubeflow-appinst-densa.appier.us/")
@func_to_container_op
def add(a: float, b: float) -> float:
'''Calculates sum of two arguments'''
return a + b
@dsl.pipeline(
name='Addition pipeline',
description='An example pipeline that performs addition calculations.'
)
def add_pipeline(a='1', b='7'): # default arguments value
# Passes a pipeline parameter and a constant value to the `add_op` factory
# function.
first_add_task = add_op(a, 4)
# Passes an output reference from `first_add_task` and a pipeline parameter
# to the `add_op` factory function. For operations with a single return
# value, the output reference can be accessed as `task.output` or
# `task.outputs['output_name']`.
second_add_task = add_op(first_add_task.output, b)
# Create a pipeline run, using the client you initialized in a prior step.
client.create_run_from_pipeline_func(
add_pipeline,
arguments={'a': '7', 'b': '8'}, # Specify argument values for your pipeline run.
experiment_name="jack_experiments",
run_name="tutorial_pipeline_1"
)
```


### Compile 成 argo yaml 上傳
使用以下 Function
- [kfp.compiler.Compiler().compile](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.compiler.html)
```python=
kfp.compiler.Compiler().compile(
pipeline_func=add_pipeline,
package_path='./add_pipeline.yaml'
)
```
完整範例:
```python=
import kfp.dsl as dsl
from kfp.components import func_to_container_op
client = kfp.Client(host="https://kubeflow-appinst-densa.appier.us/")
@func_to_container_op
def add(a: float, b: float) -> float:
'''Calculates sum of two arguments'''
return a + b
@dsl.pipeline(
name='Addition pipeline',
description='An example pipeline that performs addition calculations.'
)
def add_pipeline(a='1', b='7'): # default arguments value
# Passes a pipeline parameter and a constant value to the `add_op` factory
# function.
first_add_task = add_op(a, 4)
# Passes an output reference from `first_add_task` and a pipeline parameter
# to the `add_op` factory function. For operations with a single return
# value, the output reference can be accessed as `task.output` or
# `task.outputs['output_name']`.
second_add_task = add_op(first_add_task.output, b)
# Create a pipeline package
kfp.compiler.Compiler().compile(
pipeline_func=add_pipeline,
package_path='./add_pipeline.yaml'
)
```
step1

step2

step3

## Experiment & Run
Experiment 是一個分類的概念,你可以將同性質的 pipeline 跑在同一個 Experiments 裡,以利查找比較。
Run 是 Pipeline 實際跑下去的實例。