# Kubeflow Pipeline 使用 ###### tags: `Kubeflow`, `Kubernetes` ## Component pipeline 由一系列 components 組成,每一個 components 代表 workflow 裡面的一個步驟,例如: 資料前處理、資料轉換或是模型訓練。你可以把一個 components 想像成一個 function ,他有參數、回傳值與實際執行的邏輯。 components 實際上是一個 yaml 格式的文件,包含以下三個重要的資訊 - Metadata: 名稱、使用者自定義描述等 - Interface: 輸入與輸出的定義(參數名稱、定義與預設值等) - Implementation: 描述該 components 如何執行,因為 pipeline 每一個步驟其實就是執行一個 `container`,最常見的作法即是運行一個單一使用者自定義的 container ,因此要提供使用者自行打包的 Docker image。除此之外,去部署或是運行一個 k8s 資源也是常見的 workflow 步驟,未來 pipeline 將會提供 `k8sResource` 作為 component ,讓使用者可以透過 `k8sResource` 在 k8s 中部署 k8s 資源或是 CRD 資源。目前也提供讓開發者自行開發,例如目前 `AWS`、`GCP`或是`TFX` 都有提供現成的 components 可以讓使用者選用,也就是提供打包好寫好邏輯的 Docker image,使用者只需填入 input/output 即可。 ([參考](https://github.com/kubeflow/pipelines/tree/master/components)) 以下是範例 - 使用 container 作為執行個體,提供 Docker image,並定義輸入輸出。注意 implementation 裡面的 container 即是 Container v1 spec 。 ```yaml name: Mnist training description: Train a mnist model and save to GCS inputs: - name: model_file description: 'Name of the model file.' type: String - name: bucket description: 'GCS bucket name.' type: String - name: training_data description: 'path for training data.' type: String outputs: - name: model_path description: 'Trained model path.' type: GCSPath implementation: container: image: ${GCR_IMAGE} command: [ python, /app/app.py, --model_file, {inputValue: model_file}, --training_data, {inputValue: training_data}, --bucket, {inputValue: bucket}, ] fileOutputs: model_path: /output.txt ``` ## How to build a Component and then Pipeline 首先需要 ```python= import kfp import kfp.gcp import kfp.dsl as dsl import kfp.compiler import kfp.components ``` ### 1. 直接將 python 函式轉成 component 這是最簡單的方法,我們將直接寫一個 python 函式,然後使用 kubeflow pipeline 的 `kfp.components.func_to_container_op(func)` 將它轉成一個 Component,並在 Pipeline 中使用這個 Component 執行該 Pipeline。 #### 首先定義一個很簡單的函式 ,將參數 a 與參數 b 相加後回傳。 ```python= #Define a Python function def add(a: float, b: float) -> float: '''Calculates sum of two arguments''' return a + b ``` #### 接著便可以直接將此函式轉成 Pipeline 中的 Component ```python= add_op = kfp.components.func_to_container_op(add, base_image="tensorflow/tensorflow:1.15.0-py3") ``` #### 定義 Pipeline 並在其中使用此 Component ```python= @dsl.pipeline( name='Calculation pipeline', description='A toy pipeline that performs arithmetic calculations.' ) def calc_pipeline( a='a', ): #Passing pipeline parameter and a constant value as operation arguments add_task = add_op(a, 4) #Returns a dsl.ContainerOp class instance. ``` #### 將 Pipeline 送到 Kubeflow pipeline 執行 ```python= pipeline_func = calc_pipeline experiment_name = 'python-functions' #Specify pipeline argument values arguments = {'a': '6'} run_name = pipeline_func.__name__ + ' run' # Submit pipeline directly from pipeline function run_result = client.create_run_from_pipeline_func(pipeline_func, experiment_name=experiment_name, run_name=run_name, arguments=arguments) ``` #### 更複雜的函式 一般函式不會那麼簡單,可能需要 1. 用到許多第三方套件 2. 會用到多個函式 因此對於這種直接將函式轉成components的用法,一來只能 import `base image` 裡面有的套件,因此要挑選包含所需套件的 image。二來是,只能有一個函式,因此多餘的函式都必須定義包含在該主函式內。 ```python= # Advanced function # Demonstrates imports, helper functions and multiple outputs from typing import NamedTuple def my_divmod(dividend: float, divisor: float, ) -> NamedTuple('MyDivmodOutput', [('quotient', float), ('remainder', float), ('mlpipeline_ui_metadata', 'UI_metadata'), ('mlpipeline_metrics', 'Metrics')]): '''Divides two numbers and calculate the quotient and remainder''' #Imports inside a component function: import numpy as np #This function demonstrates how to use nested functions inside a component function: def divmod_helper(dividend, divisor): return np.divmod(dividend, divisor) (quotient, remainder) = divmod_helper(dividend, divisor) import json # Exports a sample tensorboard: metadata = { 'outputs' : [{ 'type': 'tensorboard', 'source': 'gs://ml-pipeline-dataset/tensorboard-train', }] } # Exports two sample metrics: metrics = { 'metrics': [{ 'name': 'quotient', 'numberValue': float(quotient), },{ 'name': 'remainder', 'numberValue': float(remainder), }]} from collections import namedtuple divmod_output = namedtuple('MyDivmodOutput', ['quotient', 'remainder', 'mlpipeline_ui_metadata', 'mlpipeline_metrics']) return divmod_output(quotient, remainder, json.dumps(metadata), json.dumps(metrics)) ``` #### 再定義 pipeline 並執行 ```python3= import kfp.dsl as dsl @dsl.pipeline( name='Calculation pipeline', description='A toy pipeline that performs arithmetic calculations.' ) def add_div_pipeline( a='a', b='7', c='17', ): #Passing pipeline parameter and a constant value as operation arguments add_task = add_op(a, 4) #Returns a dsl.ContainerOp class instance. #Passing a task output reference as operation arguments #For an operation with a single return value, the output reference can be accessed using `task.output` or `task.outputs['output_name']` syntax divmod_task = divmod_op(add_task.output, b) #For an operation with a multiple return values, the output references can be accessed using `task.outputs['output_name']` syntax result_task = add_op(divmod_task.outputs['quotient'], c) ``` ```python3= pipeline_func = add_div_pipeline experiment_name = 'python-functions' # same as above #Specify pipeline argument values arguments = {'a': '6', 'b': '2', 'c': '5'} run_name = pipeline_func.__name__ + ' run' # Submit pipeline directly from pipeline function run_result = client.create_run_from_pipeline_func(pipeline_func, experiment_name=experiment_name, run_name=run_name, arguments=arguments) ``` ### 2. 使用你的 python app 與自定義 Dockerfile 轉成 component 因為單純從函式轉成 components 有許多的限制,除了受限只能有一個函式外,也受限於 base image 提供的套件。因此 Kubeflow Pipeline 也支援讓使用者提供一個 python module 與 Dockerfile 來打包成 image 同時轉成一個 component。 #### 定義一個 module 這邊定義一個簡單的 mnist 模型訓練程式碼。 接受兩個輸入參數,一個是 `--model_file` 指定模型名稱,將會用在儲存模型時的路徑名稱。 路徑在 `./tmp/components/mnist_training/app.py` ```python3= import argparse from datetime import datetime import tensorflow as tf parser = argparse.ArgumentParser() parser.add_argument( '--model_file', type=str, required=True, help='Name of the model file.') parser.add_argument( '--bucket', type=str, required=True, help='GCS bucket name.') args = parser.parse_args() bucket=args.bucket model_file=args.model_file model = tf.keras.models.Sequential([ tf.keras.layers.Flatten(input_shape=(28, 28)), tf.keras.layers.Dense(512, activation=tf.nn.relu), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(10, activation=tf.nn.softmax) ]) model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy']) print(model.summary()) mnist = tf.keras.datasets.mnist (x_train, y_train),(x_test, y_test) = mnist.load_data() x_train, x_test = x_train / 255.0, x_test / 255.0 callbacks = [ tf.keras.callbacks.TensorBoard(log_dir=bucket + '/logs/' + datetime.now().date().__str__()), # Interrupt training if val_loss stops improving for over 2 epochs tf.keras.callbacks.EarlyStopping(patience=2, monitor='val_loss'), ] model.fit(x_train, y_train, batch_size=32, epochs=5, callbacks=callbacks, validation_data=(x_test, y_test)) model.save(model_file) from tensorflow import gfile gcs_path = bucket + "/" + model_file if gfile.Exists(gcs_path): gfile.Remove(gcs_path) gfile.Copy(model_file, gcs_path) with open('/output.txt', 'w') as f: f.write(gcs_path) ``` ### 3. 使用你的 python app 與自定義 Dockerfile 打包成可重複使用 Component ### 4. 使用第三方打包好可重複使用的 Component ### arguments Each component in a pipeline executes independently. The components do not run in the same process and cannot directly share in-memory data. You must serialize (to strings or files) all the data pieces that you pass between the components so that the data can travel over the distributed network. You must then deserialize the data for use in the downstream component. 首先要知道的是,pipeline 中的每一個步驟都是獨立用 container 依序執行的,彼此並無法在記憶體中共享資料,因此資料 上面的例子中,training code 會自 `--training_data, {inputValue: training_data}` 取得訓練資料,而一般訓練資料會來自前一個步驟可能是 `資料轉換 ( data transfrom )` 而來,因此如何將前一個步驟的輸出作為 training components的輸入呢? - 在 components 的定義中可能這樣定義 ```shell command: [program.py, --train-set, {inputPath: training_data}] ``` - 在 pipeline 中,定義該參數來自前一個步驟的 output ```shell task1 = component1(......) task2 = component2(training_data=task1.outputs['some_data']) ``` - pipeline 會自動將它轉換成實際的路徑位置 ```shell program.py --train-set /inputs/train_data/data ``` ```shell= gcloud iam service-accounts create jacklin gcloud iam service-accounts add-iam-policy-binding \ user1-gcp@<project-id>.iam.gserviceaccount.com \ --member='serviceAccount:<cluster-name>-admin@<project-id>.iam.gserviceaccount.com' --role='roles/owner' gcloud iam service-accounts add-iam-policy-binding --role=roles/iam.workloadIdentityUser --member="serviceAccount:infuseai-dev.svc.id.goog[jack/default-editor]" jacklin@infuseai-dev.iam.gserviceaccount.com gcloud projects add-iam-policy-binding infuseai-dev --member serviceAccount:jacklin@infuseai-dev.iam.gserviceaccount.com --role roles/storage.admin ```