# Kubeflow Overview
###### tags: `ETL pipeline` `ML pipeline`

## Introduction to Kubeflow
> [ref](https://www.kubeflow.org/docs/about/kubeflow/)

### What is Kubeflow?
The Kubeflow project is dedicated to making deployments of machine learning (ML) workflows on Kubernetes simple, portable and scalable. Our goal is not to recreate other services, but to provide a straightforward way to deploy best-of-breed open-source systems for ML to diverse infrastructures. Anywhere you are running Kubernetes, you should be able to run Kubeflow.

### The Kubeflow mission
Our goal is to make scaling machine learning (ML) models and deploying them to production as simple as possible, by letting Kubernetes do what it’s great at:

* **Easy, repeatable, portable deployments on a diverse infrastructure (for example, experimenting on a laptop, then moving to an on-premises cluster or to the cloud)**
* **Deploying and managing loosely-coupled microservices**
* **Scaling based on demand**

Ultimately, we want to have a set of simple manifests that give you an easy to use ML stack anywhere Kubernetes is already running, and that can self configure based on the cluster it deploys into.

### History
Kubeflow started as an open sourcing of the way Google ran TensorFlow internally, based on a pipeline called TensorFlow Extended. It began as just a simpler way to run TensorFlow jobs on Kubernetes, but has since expanded to be a multi-architecture, multi-cloud framework for running end-to-end machine learning workflows.

<!--
**The following components also have roadmaps:**
* Kubeflow Pipelines
* KF Serving
* Katib
* Training Operator
* Kubeflow Notebooks
-->

## Conceptual overview
> [ref](https://www.kubeflow.org/docs/started/kubeflow-overview/)

Kubeflow is the ML toolkit for Kubernetes.
The following diagram shows Kubeflow as a platform for arranging the components of your ML system on top of Kubernetes:

![](https://i.imgur.com/jTmTgat.png)

Kubeflow builds on Kubernetes as a system for deploying, scaling, and managing complex systems.

Using the Kubeflow configuration interfaces (see below) you can specify the ML tools required for your workflow. Then you can deploy the workflow to various clouds, local, and on-premises platforms for experimentation and for production use.

## Introducing the ML workflow
When you develop and deploy an ML system, the ML workflow typically consists of several stages. Developing an ML system is an iterative process. You need to evaluate the output of various stages of the ML workflow, and apply changes to the model and parameters when necessary to ensure the model keeps producing the results you need.

![](https://i.imgur.com/pHJJTmQ.png)

Looking at the stages in more detail:

* In the experimental phase, you develop your model based on initial assumptions, and test and update the model iteratively to produce the results you’re looking for:
  - Identify the problem you want the ML system to solve.
  - Collect and analyze the data you need to train your ML model.
  - Choose an ML framework and algorithm, and code the initial version of your model.
  - Experiment with the data and with training your model.
  - Tune the model hyperparameters to ensure the most efficient processing and the most accurate results possible.
* In the production phase, you deploy a system that performs the following processes:
  - Transform the data into the format that your training system needs. To ensure that your model behaves consistently during training and prediction, the transformation process must be the same in the experimental and production phases.
  - Train the ML model.
  - Serve the model for online prediction or for running in batch mode.
  - Monitor the model’s performance, and feed the results into your processes for tuning or retraining the model.

![](https://i.imgur.com/Lqcd2R9.png)
![](https://i.imgur.com/TO378nL.png)

## Kubeflow user interface (UI)
The Kubeflow UI looks like this:

![](https://i.imgur.com/BdAAPTo.png)

:::info
Server login:
Account: group0@gmail.com (replace group0 with your group, e.g. group1, group2, etc.)
Password: the group leader's student ID (ask your group leader, e.g. 110522001)

**[ Login web site ](https://pdc2.csie.ncu.edu.tw:31207)**
![](https://i.imgur.com/uJwDGiK.png)

Use the VS Code SFTP extension to connect to the machine.
Account: s000000000 (replace with your own student ID)
Password: s000000000team0 (as in lab1, change both the account and the group number; if you do not know your group, check the [website](https://pdc2.csie.ncu.edu.tw/))
Remember to change the port to 10023.
![](https://i.imgur.com/U2C96n1.png)
:::

:::success
## Machine learning pipeline
The data set used in this lab is MNIST. The MNIST database is a large database of handwritten digits that is commonly used for training various image processing systems.
![](https://i.imgur.com/EUFePkF.png)
#### Experimental purpose:
1. Train a model that recognizes handwritten digits.
2. Select the number of epochs (10 or 15) for the model.
:::

:::success
### Mnist pipeline
![](https://i.imgur.com/bcasKI4.png)

### Machine learning code
```python
from keras.datasets import mnist
from keras import models
from keras import layers
from keras.utils import to_categorical

current_opt = 'rmsprop'

# load data
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

# data processing: flatten each 28x28 image and scale pixels to [0, 1]
fix_train_images = train_images.reshape((60000, 28 * 28)).astype('float32') / 255
fix_test_images = test_images.reshape((10000, 28 * 28)).astype('float32') / 255

# one-hot encode the digit labels
fix_train_labels = to_categorical(train_labels)
fix_test_labels = to_categorical(test_labels)

# create model
network = models.Sequential()
network.add(layers.Dense(512, activation='relu', input_shape=(28 * 28,)))
network.add(layers.Dense(10, activation='softmax'))
network.compile(optimizer=current_opt, loss='categorical_crossentropy', metrics=['accuracy'])

# train model
result = network.fit(
    fix_train_images,
    fix_train_labels,
    epochs=20,
    batch_size=128,
    validation_data=(fix_test_images, fix_test_labels))

# evaluate model
test_loss, test_acc = network.evaluate(fix_test_images, fix_test_labels)
print('test_loss:', test_loss)
print('test_acc:', test_acc)
```
:::

### Mnist pipeline
Each module corresponds to a component.
![](https://i.imgur.com/Br229QE.png)

Install the kfp package for Python 3. The pipeline code below uses the KFP SDK v1 API (for example `create_component_from_func` and `dsl.graph_component`), so if the newest release rejects these calls, install a 1.x release instead, for example `pip3 install "kfp<2"`.
```bash
pip3 install kfp --upgrade
```

Create a Python file named after your group number:
```bash
# Ex: nano group_0.py
nano group_<Group Number>.py
```
Copy the **Mnist pipeline code** below, paste it into your `group_<Group Number>.py` file, then save and exit (Ctrl + X in nano, then confirm the save).

**Mnist pipeline code**
```python
from typing import NamedTuple

import kfp
import kfp.compiler
import kfp.components as comp
from kfp import dsl


def load_data_and_data_processing_op(
        ftrain_images_path: comp.OutputPath(str),
        ftrain_labels_path: comp.OutputPath(str),
        ftest_images_path: comp.OutputPath(str),
        ftest_labels_path: comp.OutputPath(str),
) -> NamedTuple('Outputs', [('loop', int), ('state', bool)]):
    # Imports live inside the function because each component runs on its own.
    from keras.datasets import mnist
    from tensorflow.keras.utils import to_categorical
    import numpy as np

    # load data
    (train_images, train_labels), (test_images, test_labels) = mnist.load_data()

    # processing data
    fix_train_images = train_images.reshape((60000, 28 * 28)).astype('float32') / 255
    fix_test_images = test_images.reshape((10000, 28 * 28)).astype('float32') / 255
    fix_train_labels = to_categorical(train_labels)
    fix_test_labels = to_categorical(test_labels)

    # write each array to the output path supplied by Kubeflow
    with open(ftrain_images_path, 'wb') as f:
        np.save(f, fix_train_images)
    with open(ftrain_labels_path, 'wb') as f:
        np.save(f, fix_train_labels)
    with open(ftest_images_path, 'wb') as f:
        np.save(f, fix_test_images)
    with open(ftest_labels_path, 'wb') as f:
        np.save(f, fix_test_labels)

    # the epoch loop starts at index 0
    return (0, True)


def train_model_op(
        epochList,
        cur_index,
        ftrain_images_path: comp.InputPath(str),
        ftrain_labels_path: comp.InputPath(str),
        train_model_path: comp.OutputPath('KerasModelHdf5'),
) -> NamedTuple('Outputs', [('cur_loop', int), ('loop_len', int)]):
    from keras import models
    from keras import layers
    import numpy as np

    # pick the epoch count for this round from the comma-separated list
    cindex = int(cur_index)
    epochs = epochList.split(',')

    with open(ftrain_images_path, 'rb') as f:
        fix_train_images = np.load(f)
    with open(ftrain_labels_path, 'rb') as f:
        fix_train_labels = np.load(f)

    network = models.Sequential()
    network.add(layers.Dense(512, activation='relu', input_shape=(28 * 28,)))
    network.add(layers.Dense(10, activation='softmax'))
    network.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    network.fit(
        fix_train_images,
        fix_train_labels,
        epochs=int(epochs[cindex]),
        batch_size=128,
    )
    network.save(train_model_path)
    return (cindex + 1, len(epochs))


def evaluate_op(
        # Parameter order matches the positional call in train_epoch_loop.
        epochList,
        cur_index,
        ftest_images_path: comp.InputPath(str),
        ftest_labels_path: comp.InputPath(str),
        model_path: comp.InputPath('KerasModelHdf5'),
) -> NamedTuple('Outputs', [('score', float), ('cur_state', bool),
                            ('cur_loop', int), ('loop_len', int)]):
    import keras
    import numpy as np

    passed = False
    # cast before adding, as train_model_op does
    next_index = int(cur_index) + 1
    epochs = epochList.split(',')

    with open(ftest_images_path, 'rb') as f:
        fix_test_images = np.load(f)
    with open(ftest_labels_path, 'rb') as f:
        fix_test_labels = np.load(f)

    network = keras.models.load_model(model_path)
    test_loss, test_acc = network.evaluate(fix_test_images, fix_test_labels)
    print('test_loss:', test_loss)
    print('test_acc:', test_acc)

    if test_acc > 0.93:
        passed = True
    return (test_acc, passed, next_index, len(epochs))


# Each component lists the packages it needs to install at run time.
load_data_and_data_processing_op_comp = comp.create_component_from_func(
    func=load_data_and_data_processing_op,
    packages_to_install=['tensorflow', 'numpy', 'keras'])
train_model_op_comp = comp.create_component_from_func(
    func=train_model_op,
    packages_to_install=['tensorflow', 'numpy', 'keras'])
evaluate_op_comp = comp.create_component_from_func(
    func=evaluate_op,
    packages_to_install=['tensorflow', 'numpy', 'keras'])


@dsl.graph_component
def train_epoch_loop(epochList, curloop, ftrain_images, ftest_images, ftrain_labels, ftest_labels):
    train_model_out = train_model_op_comp(
        epochList,
        curloop,
        ftrain_images,
        ftrain_labels,
    )
    evaluate_out = evaluate_op_comp(
        epochList,
        curloop,
        ftest_images,
        ftest_labels,
        train_model_out.outputs['train_model'],
    )
    # Recurse until every epoch setting in epochList has been trained.
    with dsl.Condition(evaluate_out.outputs['cur_loop'] < evaluate_out.outputs['loop_len']):
        train_epoch_loop(epochList, evaluate_out.outputs['cur_loop'],
                         ftrain_images, ftest_images, ftrain_labels, ftest_labels)


@dsl.pipeline(
    name='deep learning pipeline',
    description='mnist train pipeline processing.'
)
def mnist_pipeline(
        epochList="10,15",
):
    load_data_and_data_processing_out = load_data_and_data_processing_op_comp()
    train_epoch_loop(
        epochList=epochList,
        curloop=load_data_and_data_processing_out.outputs['loop'],
        ftrain_images=load_data_and_data_processing_out.outputs['ftrain_images'],
        ftest_images=load_data_and_data_processing_out.outputs['ftest_images'],
        ftrain_labels=load_data_and_data_processing_out.outputs['ftrain_labels'],
        ftest_labels=load_data_and_data_processing_out.outputs['ftest_labels'],
    )


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(mnist_pipeline, __file__ + '.yaml')
```

:::danger
### Notice 1
Each component runs independently, so the packages it uses must be imported inside the component function itself and listed in `packages_to_install`.
![](https://i.imgur.com/cbji7Rr.png)
:::

:::success
## Upload the pipeline file
### First: compile the file
The Python file needs to be compiled into a YAML file.
```bash
# Ex: dsl-compile --py group_0.py --output group_0.yaml
dsl-compile --py group_<Group Number>.py --output group_<Group Number>.yaml
```
### Second: upload the YAML file
![](https://i.imgur.com/VMR4xeP.png)
![](https://i.imgur.com/DrNWrfW.png)
#### 1. Enter your pipeline name (e.g. group_0)
![](https://i.imgur.com/mQQtPTe.png)

### Third: create the experiment
Think of an Experiment as a space that contains the history of all pipelines and their associated runs.
![](https://i.imgur.com/ApBlReG.png)
![](https://i.imgur.com/Xth806F.png)
![](https://i.imgur.com/1lSL7oF.png)

### Fourth: run the pipeline
![](https://i.imgur.com/tyfNek8.png)
![](https://i.imgur.com/xTtQzU2.png)
![](https://i.imgur.com/26xtQJz.png)
![](https://i.imgur.com/CpAYPfW.png)

### Fifth: observe the pipeline
![](https://i.imgur.com/5Z3H5Wl.png)
![](https://i.imgur.com/VKfE2Ec.png)
:::

:::info
## Practice
In the code provided, data loading and data processing are combined in a single component, and the batch size is fixed.
1. Split data loading and data processing into two components.
2. Make the batch size a parameter supplied by the pipeline input.

![](https://i.imgur.com/3mCxDOF.png)

### OUTPUT
![](https://i.imgur.com/tOKzQJF.png)
:::
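
Before changing the pipeline for the practice, it may help to see the recursive `train_epoch_loop` logic outside Kubeflow. The following is only a plain-Python sketch of the control flow and does not use the kfp API. `fake_train` and `run_pipeline` are hypothetical names, no model is trained, and `batch_size` is threaded through as an assumption about how an extra pipeline input could travel alongside `epochList`.

```python
from typing import List, Tuple


def fake_train(epoch_list: str, cur_index: int, batch_size: int) -> Tuple[int, int, int]:
    """Hypothetical stand-in for the train component: selects the epoch count only."""
    epochs = epoch_list.split(',')
    chosen = int(epochs[cur_index])
    # Return the chosen epochs plus the same bookkeeping the pipeline passes on:
    # the next loop index and the total number of rounds.
    return chosen, cur_index + 1, len(epochs)


def train_epoch_loop(epoch_list: str, cur_index: int, batch_size: int,
                     history: List[Tuple[int, int]]) -> None:
    """Run one round, then recurse while rounds remain."""
    chosen, next_index, loop_len = fake_train(epoch_list, cur_index, batch_size)
    history.append((chosen, batch_size))
    # Same test as the pipeline's condition: cur_loop < loop_len.
    if next_index < loop_len:
        train_epoch_loop(epoch_list, next_index, batch_size, history)


def run_pipeline(epoch_list: str = "10,15", batch_size: int = 128) -> List[Tuple[int, int]]:
    """Collect one (epochs, batch_size) pair per training round."""
    history: List[Tuple[int, int]] = []
    train_epoch_loop(epoch_list, 0, batch_size, history)
    return history


if __name__ == "__main__":
    print(run_pipeline())
```

With the default `"10,15"` input the loop runs twice, once per entry in the list, which matches the two training rounds visible in the pipeline graph.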