# Kubeflow Pipeline Doc ###### tags: `Kubeflow` # 基礎概念 本文以 Docker/Kubernetes 為主,Container 皆為 Docker Container - **Container**: 是一個在機器(裸機orVM)上獨立運行的 process , 他有自己的 filesystem/network/process namespace。同一台機器上的 Container 彼此互不影響,也如同 VM 般互不可見,卻又比 VM 輕量。 [more1](https://www.youtube.com/watch?v=EnJ7qX9fkcU&ab_channel=VMwareCloudNativeApps) [more2](https://www.youtube.com/watch?v=cjXI-yxqGTI&ab_channel=IBMCloud) - **Container image**: Container 可以透過撰寫 Dockerfile,將所要運行的 code 與其 dependency 打包成一個可以隨時部署到任一機器 (有Docker環境),直接啟動該 Container 的 image。 並且 image 可以打上 tag 做版本控制。 - **Kubernetes**: 是一個管理 Container 的系統,可以透過撰寫 yaml 描述你所想要部署的任意數量的 Container 和其運作模式,Kubernetes 會根據 yaml 來處理 failures 與各種 operation 問題,而不需要人為介入。(Kubernetes 叫做 K8s 因為中間有8個字母) [more](https://www.youtube.com/watch?v=daVUONZqn88&list=PLLasX02E8BPCrIhFrc_ZiINhbRkYMKdPT&index=19&ab_channel=MicrosoftAzure) - **Argo**: - K8s 無法處理 Container 的執行順序(預設一次全部啟動),Argo 是一個 K8s 插件可以讓使用者在 yaml 描述 Container 的運行順序(DAG),於是讓 Container 變成了 Steps ,組成 Pipeline。 - Argo 設計了一套方法讓 Container 的結果可以傳遞與暫存 ,達成了 Steps/Pipeline 之間的 Parameters 和 Arguments。 - **Kubeflow Pipeline**: - 為了讓 ML Scientist 更好的在 K8s 上面跑 Pipeline,又不須要寫yaml,因此實作了一套 SDK 與 Compiler,可以將 python 寫的 kubeflow pipeline 轉成 argo 可以跑的 yaml - 實作了更好的 UI 來可視化每一個 Step 的 Output 與 lineage tracking # Kubeflow Pipeline - Components Components - 是一個 reusable template,是組成 Pipeline 的 unit(step) - Components 定義了這個 Step 的 - input - output - implementation (Container 與其啟動的 command) - Components 可以存成 yaml,讓其他人隨時 load 進來做 reuse - load 進來的 Components 是一個 factor function(XXX_op),照著 input 給參數後, return 一個 Task。 - Tasks 組成 Pipeline 關聯: - `function/app.py` -> `Component(function: XXX_op / yaml: XXX.yaml)` -> `Task (instantiate)` - `multiple Tasks` -> `Pipeline` ## 製作 Components 1. 如果是單純一個 function -> 用 kfp API 來幫你轉成一個 Components 2. 直接 load 共享的 yaml 3. 如果你已經有 docker image -> 寫 yaml,定義 input/output/command ### 使用 KFP API 將 function 轉 Components & 共享的 yaml 有兩個API,但其實他們做一樣的事情 - **[create_component_from_func](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.components.html?highlight=create_component_from_func#kfp.components.create_component_from_func)** Converts a Python function to a component and returns a task factory (a function that accepts arguments and returns a task object). ```python= kfp.components.create_component_from_func( func: Callable, output_component_file: Optional[str] = None, base_image: Optional[str] = None, packages_to_install: List[str] = None, annotations: Optional[Mapping[str, str]] = None ) ``` - **[func_to_container_op](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.components.html?highlight=create_component_from_func#kfp.components.func_to_container_op)** Converts a Python function to a component and returns a task factory。 同時也是 decorator 可以直接用。 ```python= kfp.components.func_to_container_op( func: Callable, output_component_file: Optional[str] = None, base_image: Optional[str] = None, extra_code: Optional[str] = '', packages_to_install: List[str] = None, modules_to_capture: List[str] = None, use_code_pickling: bool = False, annotations: Optional[Mapping[str, str]] = None ) ``` - **[load_component](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.components.html?highlight=create_component_from_func#kfp.components.load_component)** Loads component from text, file or URL and creates a task factory function ```python= kfp.components.load_component( filename=None, url=None, text=None ) ``` 範例: ```python= import kfp.components def add(a: float, b: float) -> float: '''Calculates sum of two arguments''' return a + b # add_op 就是一個 components,也是一個 task factor functinon add_op = kfp.components.create_component_from_func( add, output_component_file='add.yaml' ) # 產出 add.yaml 做 reusable components,其他人不用再寫一次 code # 同上 add_op2 = kfp.components.func_to_container_op(add) # 將 add.yaml load 回來, add_op3 = kfp.components.load_component(filename='./add.yaml') # 對著 factor function 給參數,就會回傳一個 task # 多個 task 可以組成 pipeline add_task = add_op(1, 4) ``` 用 decorator 範例: ```python= from kfp.components import func_to_container_op @func_to_container_op def add(a: float, b: float) -> float: '''Calculates sum of two arguments''' return a + b # add() 經過 decorator 已經直接就是 add_op ,task factory 了。 add_task = add(1, 4) ``` 以下是 add.yaml 實際內容,這就是一個 reusable component,任何人都可以 load 進來用 他將 function 的參數定義成input,回傳值定義成 output code 直接變成 command 的一部分 ```yaml= name: Add description: Calculates sum of two arguments inputs: - {name: a, type: Float} - {name: b, type: Float} outputs: - {name: Output, type: Float} implementation: container: image: python:3.7 args: - --a - {inputValue: a} - --b - {inputValue: b} - '----output-paths' - {outputPath: Output} command: - sh - -ec - | program_path=$(mktemp) printf "%s" "$0" > "$program_path" python3 -u "$program_path" "$@" - | def add(a, b): '''Calculates sum of two arguments''' return a + b def _serialize_float(float_value: float) -> str: if isinstance(float_value, str): return float_value if not isinstance(float_value, (float, int)): raise TypeError('Value "{}" has type "{}" instead of float.'.format(str(float_value), str(type(float_value)))) return str(float_value) import argparse _parser = argparse.ArgumentParser(prog='Add', description='Calculates sum of two arguments') _parser.add_argument("--a", dest="a", type=float, required=True, default=argparse.SUPPRESS) _parser.add_argument("--b", dest="b", type=float, required=True, default=argparse.SUPPRESS) _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1) _parsed_args = vars(_parser.parse_args()) _output_files = _parsed_args.pop("_output_paths", []) _outputs = add(**_parsed_args) _outputs = [_outputs] _output_serializers = [ _serialize_float, ] import os for idx, output_file in enumerate(_output_files): try: os.makedirs(os.path.dirname(output_file)) except OSError: pass with open(output_file, 'w') as f: f.write(_output_serializers[idx](_outputs[idx])) ``` Note: Kubeflow Pipeline 概念上就是,先寫 components,components 給參數得到 task, tasks 組成 pipeline ### 已經有 docker image ref: [link](https://www.kubeflow.org/docs/components/pipelines/reference/component-spec/) 延續上面的概念,如果 code 已經包好 docker image,想做成一個 Components (yaml), 只需要定義好 1. input 2. output 3. image 4. command 拿上面的 add function 來說 `add.py` ```python= import argparse import os def add(a: float, b: float) -> float: '''Calculates sum of two arguments''' return a + b if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--a', type=float) parser.add_argument('--b', type=float) # 要加這個參數,但是實際路徑不用我們給定,系統會幫忙給,定義在 component.yaml 中 parser.add_argument("--output_path", type=str) args = parser.parse_args() result = add(args.a, args.b) # 將結果寫到 file,才可以在 step 之間傳遞 try: os.makedirs(os.path.dirname(args.output_path)) with open(args.output_path, 'w') as f: f.write(str(result)) except OSError: pass ``` `Dockerfile` ```dockerfile= FROM python:3.7 COPY ./add.py . CMD ["python", 'add.py'] ``` `add.yaml` ```yaml= name: Add description: Calculates sum of two arguments inputs: - {name: a, type: Float} - {name: b, type: Float} outputs: - {name: sum, type: Float} implementation: container: image: XXX/XXX:v1 command: [python, add.py] args: - '--a' - {inputValue: a} - '--b' - {inputValue: b} - '--output_path' - {outputPath: sum} ``` Component 的詳細欄位定義 https://www.kubeflow.org/docs/components/pipelines/reference/component-spec/#detailed-specification-componentspec Kubeflow pipeline 提供很多已經有的 Components 可以參考: [link](https://github.com/kubeflow/pipelines/tree/master/components) 關於 output 的部分,詳細請看 Data Passing ## Pipeline Pipeline 也分成兩種 1. 直接將 function submit 到 Kubeflow pipeline 執行 2. 將 functino compile 成 argo yaml,上傳到 Kubeflow pipeline 從 UI 執行。 ### 直接 Submit pipeline 我們已經知道怎麼寫 Component,並透過 Component 回傳實際執行的 Task。 將 Tasks 放在同一個 function 底下,透過下面這個 API,就可以將該 function 轉成 Pipeline。 - [kfp.dsl.pipeline](https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.dsl.html#kfp.dsl.pipeline) Decorator of pipeline functions. ```python= @pipeline( name='my awesome pipeline', description='Is it really awesome?' pipeline_root='gs://my-bucket/my-output-path' ) def my_pipeline(a: PipelineParam, b: PipelineParam): ... ``` 接著透過下面的 API 可以將 pipeline 直接 submit 執行 - [create_run_from_pipeline_func](https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.client.html#kfp.Client.create_run_from_pipeline_func) Runs pipeline on KFP-enabled Kubernetes cluster. This command compiles the pipeline function, creates or gets an experiment and submits the pipeline for execution. ```python client = kfp.Client(host="https://kubeflow-appinst-densa.appier.us/") client.create_run_from_pipeline_func( my_pipeline, arguments={'a': '7', 'b': '8'}, # Specify argument values for your pipeline run. experiment_name="jack_experiments", run_name="tutorial1" ) ``` 完整範例 ```python= import kfp.dsl as dsl from kfp.components import func_to_container_op client = kfp.Client(host="https://kubeflow-appinst-densa.appier.us/") @func_to_container_op def add(a: float, b: float) -> float: '''Calculates sum of two arguments''' return a + b @dsl.pipeline( name='Addition pipeline', description='An example pipeline that performs addition calculations.' ) def add_pipeline(a='1', b='7'): # default arguments value # Passes a pipeline parameter and a constant value to the `add_op` factory # function. first_add_task = add_op(a, 4) # Passes an output reference from `first_add_task` and a pipeline parameter # to the `add_op` factory function. For operations with a single return # value, the output reference can be accessed as `task.output` or # `task.outputs['output_name']`. second_add_task = add_op(first_add_task.output, b) # Create a pipeline run, using the client you initialized in a prior step. client.create_run_from_pipeline_func( add_pipeline, arguments={'a': '7', 'b': '8'}, # Specify argument values for your pipeline run. experiment_name="jack_experiments", run_name="tutorial_pipeline_1" ) ``` ![](https://i.imgur.com/0B84rEe.png) ![](https://i.imgur.com/oZPIZHs.png) ### Compile 成 argo yaml 上傳 使用以下 Function - [kfp.compiler.Compiler().compile](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.compiler.html) ```python= kfp.compiler.Compiler().compile( pipeline_func=add_pipeline, package_path='./add_pipeline.yaml' ) ``` 完整範例: ```python= import kfp.dsl as dsl from kfp.components import func_to_container_op client = kfp.Client(host="https://kubeflow-appinst-densa.appier.us/") @func_to_container_op def add(a: float, b: float) -> float: '''Calculates sum of two arguments''' return a + b @dsl.pipeline( name='Addition pipeline', description='An example pipeline that performs addition calculations.' ) def add_pipeline(a='1', b='7'): # default arguments value # Passes a pipeline parameter and a constant value to the `add_op` factory # function. first_add_task = add_op(a, 4) # Passes an output reference from `first_add_task` and a pipeline parameter # to the `add_op` factory function. For operations with a single return # value, the output reference can be accessed as `task.output` or # `task.outputs['output_name']`. second_add_task = add_op(first_add_task.output, b) # Create a pipeline package kfp.compiler.Compiler().compile( pipeline_func=add_pipeline, package_path='./add_pipeline.yaml' ) ``` step1 ![](https://i.imgur.com/pMcKJAg.png) step2 ![](https://i.imgur.com/gfJX6qx.png) step3 ![](https://i.imgur.com/Rsqoieu.png) ## Experiment & Run Experiment 是一個分類的概念,你可以將同性質的 pipeline 跑在同一個 Experiments 裡,以利查找比較。 Run 是 Pipeline 實際跑下去的實例。