# Kubeflow Pipeline Doc - 2 ###### tags: `Kubeflow` # Data Passing ref: [link](https://www.kubeflow.org/docs/components/pipelines/sdk/python-function-components/#understanding-how-data-is-passed-between-components) 總結來說, **Kubeflow pipeline (Argo)** 都是透過 file 來在不同的step傳參數跟結果(相較 Airflow 是用 DB XCom)。 因此如果 step 之間要傳遞結果,必須將結果寫到一個 file 裡,系統會幫你把這個 file 傳給下一個 step 來讀取。 只是如果是 lightweight version (func_to_container_op) 他會在 compile 時幫你做掉。 以下介紹時會分兩種製作 components 的方法給範例 * lightweight version: 寫 function 直接轉 components * components.yaml: build 好 image,已經寫成 component.yaml 的形式 ## small data ### lightweight version 如果你的 Components 會回傳多個 small outputs (short strings, numbers, or booleans), **則回傳值需要有 typing.NamedTuple hint,並且用 collections.namedtuple 作為回傳值** 注意: 如果是 func_to_container_op,import 要在 function 裡面才會被包進去 components 裡。 ```python= import kfp.dsl as dsl from kfp.components import func_to_container_op from typing import NamedTuple client = kfp.Client(host="https://kubeflow-appinst-densa.appier.us/") @func_to_container_op def sum_and_product(a: float, b: float) -> NamedTuple( 'ExampleOutputs', [('sum', float), ('product', float)] ): """Example function that demonstrates how to return multiple values.""" sum_value = a + b product_value = a * b from collections import namedtuple example_output = namedtuple('ExampleOutputs', ['sum', 'product']) return example_output(sum_value, product_value) @func_to_container_op def scaler(a: float) -> float: '''Calculates sum of two arguments''' return 10*a @dsl.pipeline( name='Addition pipeline', description='An example pipeline that performs addition calculations.' ) def my_pipeline(a = 4, b = 6): # default arguments value sum_and_product_task = sum_and_product(a, b) # 將剛剛步驟的 sum 跟 product 分別 * 10 scaler_sum_task = scaler(sum_and_product_task.outputs['sum']) scale_product_task = scaler(sum_and_product_task.outputs['product']) arguments = {'a': 4, 'b': 6} # Create a pipeline run, using the client you initialized in a prior step. client.create_run_from_pipeline_func( my_pipeline, arguments=arguments, # Specify argument values for your pipeline run. experiment_name="jack_experiments", run_name="tutorial_pipeline_2" ) ``` 舉例: ![](https://i.imgur.com/9efY28v.png) ![](https://i.imgur.com/colUZ4j.png) ### Component.yaml 你的 code 就必須自己處理把結果寫到 file 裡面,跟下面的 artifacts 一樣 參考: https://github.com/ChanYiLin/kfp_notebook_example/tree/master/add_divmod_expample ## medium data - artifacts (file) 如果你的 step 會產出一個 artifacts (e.g. json) 想讓後面的 step 可以使用,則可以透過指定 **inputPath, outputPath** * InputPath: tell the system that the function wants to consume the corresponding input data as a file * OutputPath: tell the system that the function wants to produce the corresponding output data as a file. You can specify the type argument to InputPath and OutputPath ( help users to do type match ) * OutputPath('TFModel') means that the function states that the data it has written to a file has type 'TFModel' * InputPath('TFModel') means that the function states that it expect the data it reads from a file to have type 'TFModel' type Basic types are String, Integer, Float, and Bool. See the full list of [types](https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/dsl/types.py) defined by the Kubeflow Pipelines SDK. ### lightweight version path 都不用給死,系統會幫你創建 path ,並將資料寫進去或是讀出來 如下面的例子 ```python= import kfp import kfp.gcp import kfp.dsl as dsl import kfp.compiler import kfp.components from kfp.components import InputPath, OutputPath from kfp.components import func_to_container_op # Writing many numbers @func_to_container_op def write_numbers(numbers_path: OutputPath(str), start: int = 0, count: int = 10): with open(numbers_path, 'w') as writer: for i in range(start, count): writer.write(str(i) + '\n') # Reading and summing many numbers @func_to_container_op def sum_numbers(numbers_path: InputPath(str)) -> int: sum = 0 with open(numbers_path, 'r') as reader: for line in reader: sum = sum + int(line) return sum # Reading bigger data @func_to_container_op def print_text(text_path: InputPath()): # The "text" input is untyped so that any data can be printed '''Print text''' with open(text_path, 'r') as reader: for line in reader: print(line, end = '') @dsl.pipeline( name='Sum numbers', description='A pipeline to sum 100000 numbers.' ) def sum_pipeline(count: 'Integer' = 100000): # Pipeline to sum 100000 numbers write_numbers_task = write_numbers(count=count) print_text_task = print_text(write_numbers_task.output) # one output, don't need to specify key sum_numbers_task = sum_numbers(write_numbers_task.outputs['numbers']) # or use key numbers_path, but _path can be removed print_text_task2 = print_text(sum_numbers_task.output) ``` 可以看到上面的部分都沒給 inputPath/outputPath 的真實路徑, 這是因為 SDK 在 compile 時,幫我們給了路徑 如果把他 compile 成 argo yaml 可以看到 ```python= kfp.compiler.Compiler().compile( pipeline_func=sum_pipeline, package_path='./sum_pipeline.yaml' ) ``` ```yaml= - name: repeat-line container: args: [--line, Hello, --count, '10', --output-text, /tmp/outputs/output_text/data] command: - sh - -ec - | program_path=$(mktemp) printf "%s" "$0" > "$program_path" python3 -u "$program_path" "$@" - | def _make_parent_dirs_and_return_path(file_path: str): # 幫我們創建路徑 import os os.makedirs(os.path.dirname(file_path), exist_ok=True) return file_path def repeat_line(line, output_text_path, count = 10): '''Repeat the line specified number of times''' with open(output_text_path, 'w') as writer: for i in range(count): writer.write(line + '\n') import argparse _parser = argparse.ArgumentParser(prog='Repeat line', description='Repeat the line specified number of times') _parser.add_argument("--line", dest="line", type=str, required=True, default=argparse.SUPPRESS) _parser.add_argument("--count", dest="count", type=int, required=False, default=argparse.SUPPRESS) # compile 的時候幫我們家上 ouput-text 的參數與創建路徑,讓資料可以寫入檔案 _parser.add_argument("--output-text", dest="output_text_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) _parsed_args = vars(_parser.parse_args()) _outputs = repeat_line(**_parsed_args) image: python:3.7 outputs: # 幫我們決定好路徑位置 artifacts: - {name: repeat-line-output_text, path: /tmp/outputs/output_text/data} ``` 至於 print-text 怎麼吃到這個 output path 與他的 file content 搭配 argo 的這個 yaml 範例就會懂了 https://github.com/argoproj/argo-workflows/blob/master/examples/artifact-passing.yaml 他幫你把 artifacts passing 在 compile 的時候做掉了 ### components.yaml 分兩個 Case Case1: add component 其實這個也是透過 artifacts 傳遞的,只是 lightweight 的會在 compile 期間幫你把 write file 做掉 但如果你想將 code 包成 image,不想 lightweight 直接 **func_to_container_op**,則就要自己處理將結果寫入 file 中。 #### add. py 重點: 1. 參數要多一個 output_path,並在最後將結果寫入 output_path(記得轉f.write 要是 string ) 2. 要記得 os 創建該 output_path 並將結果寫入。 ```python= import argparse import os def add(a: float, b: float) -> float: '''Calculates sum of two arguments''' return a + b if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--a', type=float) parser.add_argument('--b', type=float) # 要加這個參數,但是實際路徑不用我們給定,系統會幫忙給,定義在 component.yaml 中 parser.add_argument("--output_path", type=str) args = parser.parse_args() result = add(args.a, args.b) # 將結果寫到 file,才可以在 step 之間傳遞 try: os.makedirs(os.path.dirname(args.output_path)) with open(args.output_path, 'w') as f: f.write(str(result)) except OSError: pass ``` #### component.yaml 重點: 1. 記得定義 output 2. --output_path 給 {outputPath: sum} 名字跟剛剛定義的 output name 對應 ```yaml= name: Add description: Calculates sum of two arguments inputs: - {name: a, type: Float} - {name: b, type: Float} outputs: - {name: sum, type: Float} implementation: container: image: jackfantasy/kfp_add:v1 command: [python, add.py] args: - '--a' - {inputValue: a} - '--b' - {inputValue: b} - '--output_path' - {outputPath: sum} ``` ```yaml= name: My divmod description: Divides two numbers and calculate the quotient and remainder inputs: - {name: dividend, type: Float} - {name: divisor, type: Float} outputs: - {name: quotient, type: Float} - {name: remainder, type: Float} - {name: mlpipeline_ui_metadata, type: UI_metadata} - {name: mlpipeline_metrics, type: Metrics} implementation: container: image: jackfantasy/kfp_add:v1 command: [python, divmod.py] args: - '--dividend' - {inputValue: dividend} - '--divisor' - {inputValue: divisor} - '--output_paths' - {outputPath: quotient} - {outputPath: remainder} - {outputPath: mlpipeline_ui_metadata} - {outputPath: mlpipeline_metrics} ``` #### pipeline. py 重點: 1. output_path這個參數不用給,系統會幫你處理好 2. 要拿到裡面的值,add_task.output 直接這樣就可 ```python= @dsl.pipeline( name='Calculation pipeline', description='A toy pipeline that performs arithmetic calculations.' ) def add_div_pipeline( a='1', b='7', c='17', ): #Passing pipeline parameter and a constant value as operation arguments add_task = add_op(a, 4) #Returns a dsl.ContainerOp class instance. add_task.set_image_pull_policy('Always') #Passing a task output reference as operation arguments #For an operation with a single return value, the output reference can be accessed using `task.output` or `task.outputs['output_name']` syntax divmod_task = divmod_op(add_task.output, b) divmod_task.set_image_pull_policy('Always') ``` 最後你可以看到他會幫你將結果存在 `/tmp/outputs/sum/data` ,因此才要記得用 os.mkdirs ,否則會無法將結果寫入。 Case2: 如果你的 code 已經有一個既定的 path 寫入你的結果 如下舉例,這個 components 會將 data 寫在 `'./args_outputs.json'`,你可以用 fileOutputs 來強制宣告你的 output 位置,讓系統幫你將這個位置的 file 傳給後面的 step。 - output: 宣告這個 components 會有的 output 的名字與類別 (optional) - fileOutputs: 指定這個 component output file 位置 ```yaml= name: RTB Userlist - Train Active Campaign description: Retrieve the campaigns which need to train models inputs: - {name: Countries, type: String, default: 'jp tw', description: 'Country'} outputs: - {name: args_outputs, type: String} implementation: container: image: us.gcr.io/appier-ai/ai-rtb-appinstall:latest command: [python, -m, app.rtb_app_install_userlist.core.train_active_campaign] args: [ --countries, {inputValue: Countries}, --config_file, '/root/config/config.json', --dump_path, './args_outputs.json' ] fileOutputs: args_outputs: './args_outputs.json' ``` 接下來 pipeline 的部分 ```python= import kfp.dsl as dsl @dsl.pipeline( name='Addition pipeline', description='An example pipeline that performs addition calculations.' ) def rtb_userlist_pipeline( countries='jp tw' ): # 取的 jp, tw 的所有要 train model 的 campaigns # e.g. # [ # {"oid": "65AN3v8KTbW_hCSIN4QHlA", "site_id": "android--co.spoonme--afdsp", "country": "jp", "target": "rtNewInstall", "filter_rule": "os=android/osv=5.0"}, # {"oid": "IgB557WtSIyWylcl8LJEMQ", "site_id": "android--co.spoonme--afdsp", "country": "kr", "target": "rtPurchase", "filter_rule": "os=android/osv=5.0"} # ] train_active_campaign_task = train_active_campaign_op(countries=countries) # 這邊會將剛剛的 list 展開,每一個 item 會創造一個 step,並且將 item 傳進去 # 就是 argo 的 withParams with dsl.ParallelFor(train_active_campaign_task.output) as item: op1 = dsl.ContainerOp( name="echo", image="library/bash:4.4.23", command=["sh", "-c"], arguments=["echo do output train_active_campaign_task item: %s" % item], ) ``` ### 總結 Data Passing 的部分比較讓人疑惑,主要是因為有兩種方法做 components 1. lightweight 2. components.yaml 但是方法都是一樣的,將結果寫入 file,讓下一個 step 可以讀取。 而 lightweight 幫你把這部分做掉, components.yaml 則要自己處理,才會顯得麻煩。 總結就是,**要往後傳遞的結果記得存到 file 中**,指定 ouputPath,讓下一個步驟可以指定 inputPath 或是 inputValue 來取值。 # Types KFP 的 types 很讓人疑惑,似乎怎麼寫 (大小寫) 都可以 主要是因為他會幫你 parse https://github.com/kubeflow/pipelines/blob/d9c019641ef9ebd78db60cdb78ea29b0d9933008/sdk/python/kfp/components/_data_passing.py 節錄自上面的 code ```python Converter = NamedTuple('Converter', [ ('types', Sequence[str]), ('type_names', Sequence[str]), ('serializer', Callable[[Any], str]), ('deserializer_code', str), ('definitions', str), ]) _converters = [ Converter([str], ['String', 'str'], _serialize_str, 'str', None), Converter([int], ['Integer', 'int'], _serialize_int, 'int', None), Converter([float], ['Float', 'float'], _serialize_float, 'float', None), Converter([bool], ['Boolean', 'Bool', 'bool'], _serialize_bool, _bool_deserializer_code, _bool_deserializer_definitions), Converter([list], ['JsonArray', 'List', 'list'], _serialize_json, 'json.loads', 'import json'), # ! JSON map keys are always strings. Python converts all keys to strings without warnings Converter([dict], ['JsonObject', 'Dictionary', 'Dict', 'dict'], _serialize_json, 'json.loads', 'import json'), # ! JSON map keys are always strings. Python converts all keys to strings without warnings Converter([], ['Json'], _serialize_json, 'json.loads', 'import json'), Converter([], ['Base64Pickle'], _serialize_base64_pickle, _deserialize_base64_pickle_code, _deserialize_base64_pickle_definitions), ] ``` 所以舉例來說, 你的 yaml 裡面 inputValue type 寫 JsonArray 跟寫 List, list 都是一樣的 ```yaml name: Demo description: Demo input with different type but actually the same inputs: - {name: countries, type: List, default: 'jp tw', description: 'Country'} - {name: countries, type: list, default: 'jp tw', description: 'Country'} - {name: countries, type: JsonArray, default: 'jp tw', description: 'Country'} ``` # Steps 順序 Pipeline 到底怎麼決定執行順續? 答案: input/output 的 dependency 1. **沒有 input/output 相依關係** => 預設平行執行 2. **input/output 相依關係** => 自動幫你設定 `task1 -> task2` 3. 強制設定順序關係 ## after ```python= component_op = components.load_component_from_text( """ name: Print Text inputs: - {name: text, type: String} implementation: container: image: alpine command: - sh - -c - | set -e -x echo "$0" - {inputValue: text} """ ) @dsl.pipeline(name='pipeline-with-after') def my_pipeline(): task1 = component_op(text='1st task') task2 = component_op(text='2nd task').after(task1) task3 = component_op(text='3rd task').after(task1, task2) ``` ## ParallelFor Ex1: ```python= @func_to_container_op def add_op(a: float, b: float) -> float: '''Calculates sum of two arguments''' return a + b @dsl.pipeline(name='my-pipeline') def pipeline(my_pipe_param: int = 10): loop_args = [{'a': 1, 'b': 2}, {'a': 10, 'b': 20}] with dsl.ParallelFor(loop_args) as item: add_task = add_op(item['a'], item['b']) ``` Ex2: ```python= from kfp import components from kfp import dsl from typing import List @components.create_component_from_func def print_op(text: str) -> str: print(text) return text @components.create_component_from_func def sum_op(a: float, b: float) -> float: print(a + b) return a + b @components.create_component_from_func def generate_op() -> list: return [{'a': i, 'b': i * 10} for i in range(1, 5)] @dsl.pipeline(name='pipeline-with-loop-parameter') def my_pipeline(greeting='this is a test for looping through parameters'): print_task = print_op(text=greeting) generate_task = generate_op() with dsl.ParallelFor(generate_task.output) as item: sum_task = sum_op(a=item.a, b=item.b) sum_task.after(print_task) print_task_2 = print_op(sum_task.output.ignore_type()) arguments = {'greeting': "Hello"} client.create_run_from_pipeline_func(my_pipeline, arguments=arguments, experiment_name="test", run_name="parallel_for") ``` # Metrics & Metadata https://www.kubeflow.org/docs/components/pipelines/sdk/pipelines-metrics/ https://www.kubeflow.org/docs/components/pipelines/sdk/output-viewer/ # Condition - with dsl.Condition(): ```python= from kfp import components from kfp import dsl def flip_coin(force_flip_result: str = '') -> str: """Flip a coin and output heads or tails randomly.""" if force_flip_result: return force_flip_result import random result = 'heads' if random.randint(0, 1) == 0 else 'tails' return result def print_msg(msg: str): """Print a message.""" print(msg) flip_coin_op = components.create_component_from_func(flip_coin) print_op = components.create_component_from_func(print_msg) @dsl.pipeline(name='single-condition-pipeline') def my_pipeline(text: str = 'condition test', force_flip_result: str = ''): flip1 = flip_coin_op(force_flip_result) print_op(flip1.output) with dsl.Condition(flip1.output == 'heads'): flip2 = flip_coin_op() print_op(flip2.output) print_op(text) if __name__ == '__main__': kfp.compiler.Compiler().compile(my_pipeline, __file__ + '.yaml') ``` # Volume / Affinity / Tolerations / Resource 以下面為例,task 為一個 Component instantiate 的 task ```python= from kubernetes.client import V1Toleration, V1Affinity, V1NodeSelector, V1NodeSelectorRequirement, V1NodeSelectorTerm, \ V1NodeAffinity, V1PodDNSConfig, V1PodDNSConfigOption from kubernetes import client as k8s_client def appinstall_k8s_decorator(task, cpu='1', memory='125Gi'): # Affinity & Toleration affinity = V1Affinity( node_affinity=V1NodeAffinity( required_during_scheduling_ignored_during_execution=V1NodeSelector( node_selector_terms=[ V1NodeSelectorTerm( match_expressions=[ V1NodeSelectorRequirement( key='appier/node-pool', operator='In', values=['ai-rtb-appinstall-c32-m256-preem-v0'] ) ] ) ] ) ) ) toleration = V1Toleration( effect='NoSchedule', key='appier/node-role', operator='Equal', value='ai-rtb-appinstall' ) toleration1 = V1Toleration( effect='NoSchedule', key='appier/preemptible', operator='Equal', value='true' ) task.add_affinity(affinity) task.add_toleration(toleration) task.add_toleration(toleration1) # Volumes & ENV task.add_volume( k8s_client.V1Volume(name='config', secret=k8s_client.V1SecretVolumeSource(secret_name='config.json')) ) task.add_volume_mount( k8s_client.V1VolumeMount(mount_path='/root/config', name='config') ) task.add_volume( k8s_client.V1Volume(name='gcp-credentials', secret=k8s_client.V1SecretVolumeSource(secret_name='rtb-appinstall-gcp-credentials')) ) task.add_volume_mount( k8s_client.V1VolumeMount(mount_path='/root/gcp', name='gcp-credentials') ) task.add_env_variable( k8s_client.V1EnvVar( name='GOOGLE_APPLICATION_CREDENTIALS', value='/root/gcp/gcp-rtb-appinstall-cred.json' ) ) # Resources task.set_cpu_limit(cpu) task.set_cpu_request(cpu) task.set_memory_limit(memory) task.set_memory_request(memory) return task import kfp.dsl as dsl @dsl.pipeline( name='Addition pipeline', description='An example pipeline that performs addition calculations.' ) def rtb_userlist_pipeline( countries='jp tw' ): train_active_campaign_task = train_active_campaign_op(countries=countries) # 直接一個 functino 可以套所有設定,還可以客製化設定 train_active_campaign_task = appinstall_k8s_decorator(train_active_campaign_task, cpu='4', memory='240Gi') ``` 可以避免 argo 要重複寫這些設定的問題。 # CronJob ![](https://i.imgur.com/ArgBNSh.png) 將你的 pipeline 打包成 tar.gz 後,上傳至 kubeflow pipeline 即可設定 cronJob 1. 可以選定 Catchup 來補跑沒有跑到或是 delay 的 job 2. 在 pipeline parameters 的部分填寫一下幾個 key word 可以取得各種時間,包含他的 scheduledtime ,以利 backfill 時使用 (Argo 無法) 3. 這些 parameters 就是 pipeline parameters,所以可以被各 components 當作 input 。 ```python= [[ScheduledTime]] is substituted by the scheduled time of the workflow (default format) [[CurrentTime]] is substituted by the current time (default format) [[Index]] is substituted by the index of the workflow (e.g. 3 mins that it was the 3rd workflow created) [[ScheduledTime.15-04-05]] is substituted by the sheduled time (custom format specified as a Go time format: https://golang.org/pkg/time/#Parse) [[CurrentTime.15-04-05]] is substituted by the current time (custom format specified as a Go time format: https://golang.org/pkg/time/#Parse) ``` # Pipeline Version "Kubeflow Pipelines requires the all pipelines must have unique names. Otherwise you have to use update the version instead of upload the pipeline. Furthermore, you have to use unique name when uploading the new version of pipeline."" 當你將 pipelien 上傳時,他除了原本的 name,還會有一個 id,可以透過以下 function 來上傳一個新的 pipeline 在同一個 id 底下,但是給不同的 version。 做到 pipeline 的版控 - [client.upload_pipeline_version](https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.client.html#kfp.Client.upload_pipeline_version) ```python= client.upload_pipeline_version( './sum_number.tar.gz', pipeline_version_name='sum_number', pipeline_id='e1a168b9-56fd-4cd2-941e-0da0504e34c8' ) ``` ![](https://i.imgur.com/Fwlddb7.png) Kubeflow pipeline client 有很多功能都有 API,可以參考: https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.client.html#kfp.Client.upload_pipeline_version # Cache Output for Steps https://www.kubeflow.org/docs/components/pipelines/caching/#managing-caching-staleness