# 2024/7/26 [TOC] ## ML Pipeline ### Note ![image](https://hackmd.io/_uploads/SyaHESEF0.png) #### 目前的k8s架構 * 目前是使用kinD(k8s in Docker)來創建k8s cluster,kinD是開發k8s的工具,可以在docker上模擬multi-node k8s cluster的運行,但需要注意這是開發用的工具,實際production用的cluster要用kubeadm等真正部屬在multi-node server的工具。 #### Pod & Container * Pod是k8s中的最小單位,每個Pod都會有對應的Image,而pod內實際運行的會是container(pod比較像是一個更大範圍的標籤),一個Pod內的container可以是一到多個,但多個container不是用來平行運算的,default就會是一個去運行目標task,多個container的情況會是有額外安裝[sidecar container](https://kubernetes.io/docs/concepts/workloads/pods/sidecar-containers/),sidecar container通常是拿來增加功能,例如匯集log等,我們目前不會用到。 #### Helm * Helm是一個k8s套件管理工具,可以幫助我們更輕鬆的在k8s上設置、使用不同的application,以我們的case為例,要手動在k8s上deploy airflow可能需要額外寫很多dockerfile、設定環境,但用helm可以輕鬆的用helm install airflow的command就可以完成設置 #### 目前的架構 * 目前我們在kinD創建的K8s上,使用helm deploy airflow。可以看到下圖,airflow的各個component都被deploy到了不同的Pod上(每個row代表一個不同的Pod)。 ![image](https://hackmd.io/_uploads/HJ2nQHEF0.png) ### Airflow Note #### Operator * airflow裡面會有很多DAG,而每個DAG由tasks組成,每一個task都有對應到的Operator,也就是**這個task執行的環境**,像是pythonOperator代表這個task會執行一個python function、BashOperator會執行一個bash command、k8sPodOperator代表這個task會是透過Pod去執行一個task,以下面這個task為例,他會產生image為alpine:3.12的pod,並在該Pod中執行echo Hello from the Alpine pod!的指令 ```python KubernetesPodOperator( task_id="task2", namespace='default', image="alpine:3.12", cmds=["echo"], arguments=["Hello from the Alpine pod!"], name="task2-pod", dag=dag, ) ``` #### Executor * Executor是一個airflow的全局config,整個airflow server只能有一種executor,executor管理tasks該如何執行,例如sequential executor代表所有task必須sequential的執行。 - CeleryExecutor - 必須先在k8s cluster中創建worker pods,這些workers會一直活著,當有新的task時就會執行在這些worker上,因此worker的image就必須涵蓋所有task的所需的環境,也就是當task環境變化很大時,為了應付所有不同的task,worker的環境會很肥大,適合所有task環境相似的情況。 - 另一個缺點是要去管理resource(該開幾個worker pod, pod image怎麼設置等) - KubernetesExecutor - 沒有一直活著的worker pod,當有task要跑時,會馬上生出一個Pods來跑,跑完就會殺掉。 - 非常彈性,可以自定義每個pod的環境,提供很好的isolation。 - 缺點是每個pod都有生成時間,當task所需時間不大,生成時間會是overhead,故kubernetesExecutor比較適合每個task isolation需求大的情況。 - CeleryKubernetesExecutor - Hybrid的executor,會依據task queue去決定要使用celery or k8s executor #### 結論 * Operator固定都是用pythonOperator,不需要用k8sPodOperator因為task不需要強烈的isolation。 * Executor的部分,我們的tasks需要的環境差異不大,都是python function而已,比較適合用celery executor,worker pod的image也很好設置,我們會先以celery Executor為主,期間去測試Celeryk8sExecutor會不會比較好 * 整個k8s cluster我們需要提供2個image,一個是airflow的component跑在k8s pod上的image,另外一個是worker pod的image,也就是我們task會跑的環境 * 管理dags我們使用**GitSync**的方式,airflow server會自動去track一個github repo裡面的dags,開發和管理上方便很多 ----------------------------------- ### Run airflow on K8s using helm - (Priority) model dag 還不行正常運作,因為 spark 相關的所有設定像是 vpn, java runtime 全部都需要在 docker image 中設置好。 - 可能需要將 airflow image 和 pod_template image 分成不同的 image。 - 不確定 pod_template 是不是只會套用在 worker 上,還沒測試。 - (Future) gitSync 的 repo 會有大小限制,直接 sync `git@github.com:kevin1010607/MLOps-ASMPT.git` 會有 error。 - 應該是要修改 `values.yaml` 中的 dags.gitSync.resources。 - (Future) 目前使用的 CeleryExecutor,官方說 原本就有 default 的 pvc,所以暫時沒有加上 pv 和 pvc,也沒有 enable logs.persistence。 - If you are using CeleryExecutor, workers persist logs by default to a volume claim created with a volumeClaimTemplate. - 如果 enable logs.persistence 會有 error。 --- - Git sync repo: https://github.com/kevin1010607/airflow-dags - A script for one-click automated deployment. ### Operator & Executor - CeleryExecutor - Allows for horizontal scaling across multiple worker nodes. ``` # kubectl get pods -n airflow # before NAME READY STATUS RESTARTS AGE airflow-worker-0 1/1 Running 0 10m # after NAME READY STATUS RESTARTS AGE airflow-worker-0 1/1 Running 0 15m airflow-worker-1 1/1 Running 0 2m airflow-worker-2 1/1 Running 0 1m ``` - KubernetesExecutor - Runs each task in a separate Kubernetes pod. - Provides dynamic scaling based on workload. ``` # kubectl get pods -n airflow # before NAME READY STATUS RESTARTS AGE # after NAME READY STATUS RESTARTS AGE airflow-task-abc123-xyz789 1/1 Running 0 2m airflow-task-def456-uvw321 1/1 Running 0 1m ``` - CeleryKubernetesExecutor - Hybrid approach combining aspects of both Celery and Kubernetes executors. ``` # kubectl get pods -n airflow # before NAME READY STATUS RESTARTS AGE airflow-worker-0 1/1 Running 0 10m # after NAME READY STATUS RESTARTS AGE airflow-worker-0 1/1 Running 0 1h airflow-worker-1 1/1 Running 0 1h airflow-task-abc123-xyz789 1/1 Running 0 2m airflow-task-def456-uvw321 1/1 Running 0 1m ``` ### Resource management - Modify the config of values.yaml. - K8s resource management. ## Model Versioning ### Model Service * Implemented: get metrics from multiple models or versions * [md](https://github.com/kevin1010607/MLOps-ASMPT/blob/main/model-versioning/model_service.md#get-multi-model-metrics) ### Protecting source code * Using cython to compile python file into .so file * No need to modify any code * [md](https://github.com/kevin1010607/MLOps-ASMPT/tree/main/hadoop-services/source_code_protection/setup.md) * [code](https://github.com/kevin1010607/MLOps-ASMPT/tree/main/hadoop-services/source_code_protection/setup.py) ### Key sharing for data encryption * Idea: turn key into python module then compile * Example: * ```python def get_key(): return "my_key" ``` * Using cython to compile it * ```python from keyfile.key import get_key key = get_key() ``` * The ***pythonnet*** in C# could execute python function ### TODO * Pack model service into image and deploy on k8s * reverse compilation of cython * doable but very difficult, it needs lots of effort * currently no decompiler for cython * Until which version of .NET, we could use pythonnet? * .NET Framework (netfx): at least version 4.7.2 * .NET Core (coreclr): at least version 3.1 ## ML monitoring - implement a new hadoop monitor using kafka - Currently use pseudo data generator - ![image](https://hackmd.io/_uploads/S1RLypROC.png) - refactor old component to satisfy new proposal - alert server now will only execute model_update, data analysis and model rollback - cancel the daily model training - **model score** decrease and **model status** error will trigger the model rollback - reqeust payload add *lot_id* and *date* for dag **data_analysis** and **model_update** - update the api document - TODOs - need to check the atomic of alert server. otherwise it may generator same dag id. - integrate the monitor with real kafka - packing all components to docker and write corresponding yaml for k8s ## Data Analysis & Data ETL ### Data Analysis DAG Refactored ```mermaid graph LR; CC[Collect Current] --> OD[Outlier Detection] --> GC[Garbage Collection] CR[Collect Reference] --> DD[Drift Detection] --> GC[Garbage Collection] CC --> DD ``` ### Data ETL * convert using spark pandas is fast * but slow when inserting to Hive * when going to insert to Hive use spark.createDataFrame Optimization comparison * Before * Without pyarrow + convert dataframe with pyspark.pandas * After * With pyarrow + convert dataframe with spark.createDataframe ```yaml time_record predict_anomaly 11.903 -> 8.775 predict_qa 32.529 -> 13.813 raw_motion 10.546 -> 8.081 raw_qa 2.546 -> 2.367 run predict_anomaly 11.868 -> 8.737 predict_qa 32.491 -> 13.779 raw_motion 10.508 -> 8.049 raw_qa 2.511 -> 2.334 run_process predict_anomaly 6.832 -> 3.736 predict_qa 24.717 -> 6.975 raw_motion 5.488 -> 2.898 raw_qa 0.440 -> 0.292 run_insert predict_anomaly 4.859 -> 3.884 predict_qa 7.594 -> 6.598 raw_motion 4.909 -> 4.625 raw_qa 1.965 -> 1.926 to_spark_df anomaly 1.103 -> 0.152 anomaly_mf_bb_mf.wb.000001 3.887 -> 0.760 motion_bb_mf.wb.000001 3.593 -> 0.832 predict 4.765 -> 0.338 predict_mf_bb_mf.wb.000001 14.683 -> 1.230 qa 0.245 -> 0.102 to_numeric __run_predict_anomaly 0.538 -> 0.569 __run_predict_qa 3.332 -> 3.331 __run_raw_motion_v2 0.436 -> 0.428 ```