# 2024/7/26
[TOC]
## ML Pipeline
### Note

#### Current k8s architecture
* We currently use kinD (Kubernetes in Docker) to create the k8s cluster. kinD is a development tool that can simulate a multi-node k8s cluster on top of Docker, but note that it is meant for development only; a production cluster should be deployed on real multi-node servers with a tool such as kubeadm.
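As a rough sketch (the cluster name and node counts here are placeholders, not our actual setup), a multi-node kinD cluster is declared in a config file:
```yaml
# kind-config.yaml -- hypothetical example, not our actual cluster config
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
  - role: control-plane   # one control-plane node
  - role: worker          # two worker nodes
  - role: worker
```
and created with `kind create cluster --name airflow --config kind-config.yaml`.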
#### Pod & Container
* A Pod is the smallest unit in k8s. Each Pod has a corresponding image, while what actually runs inside are containers (a Pod is more like a wrapper around them). A Pod can hold one or more containers, but multiple containers are not for parallel computation: by default a single container runs the target task, and additional containers are [sidecar containers](https://kubernetes.io/docs/concepts/workloads/pods/sidecar-containers/), which typically add auxiliary features such as log aggregation. We will not need them for now.
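A minimal sketch of a single-container Pod manifest (the name is a placeholder; a sidecar would simply be a second entry under `containers`):
```yaml
# pod.yaml -- minimal sketch; name and command are placeholders
apiVersion: v1
kind: Pod
metadata:
  name: demo-pod
spec:
  containers:               # the Pod wraps these containers
    - name: main            # one container runs the actual task
      image: alpine:3.12
      command: ["echo", "hello"]
```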
#### Helm
* Helm is a package manager for k8s that makes it much easier to set up and run different applications on a cluster. In our case, manually deploying Airflow on k8s would require writing many extra Dockerfiles and configuring the environment by hand, whereas with Helm the whole setup is done with a single `helm install airflow` command.
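Concretely, installing the official Airflow chart looks roughly like this (the namespace and release name are our choices, not requirements):
```
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm install airflow apache-airflow/airflow --namespace airflow --create-namespace
```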
#### Current setup
* We deploy Airflow with Helm on the k8s cluster created by kinD. As the figure below shows, each Airflow component is deployed to a different Pod (each row is a separate Pod).

### Airflow Note
#### Operator
* Airflow hosts many DAGs, and each DAG is composed of tasks. Every task is bound to an Operator, i.e. **the environment that task executes in**: a PythonOperator runs a Python function, a BashOperator runs a bash command, and a KubernetesPodOperator runs the task in its own Pod. The task below, for example, spins up a Pod with the image alpine:3.12 and runs `echo Hello from the Alpine pod!` inside that Pod.
```python
# Import path for recent versions of the cncf-kubernetes provider;
# older versions use airflow.providers.cncf.kubernetes.operators.kubernetes_pod.
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

KubernetesPodOperator(
    task_id="task2",
    namespace="default",
    image="alpine:3.12",                      # image the spawned Pod runs
    cmds=["echo"],
    arguments=["Hello from the Alpine pod!"],
    name="task2-pod",                         # name of the spawned Pod
    dag=dag,
)
```
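For comparison, a minimal sketch of a PythonOperator task, which the conclusion below settles on (the DAG id and the function are placeholders):
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def my_task():
    # Placeholder for the actual Python function a task would run.
    print("Hello from a Python task")

with DAG(dag_id="demo", start_date=datetime(2024, 7, 26), schedule=None) as dag:
    PythonOperator(task_id="task1", python_callable=my_task)
```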
#### Executor
* The executor is a global Airflow config; an Airflow deployment can only use one executor at a time. The executor decides how tasks run, e.g. the SequentialExecutor runs all tasks one after another.
    - CeleryExecutor
        - Worker pods must be created in the k8s cluster up front and stay alive; incoming tasks run on these workers. The worker image therefore has to cover the environments of all tasks, so when task environments vary a lot the image becomes bloated. It fits the case where all tasks share a similar environment.
        - Another drawback is that we have to manage the resources ourselves (how many worker pods to run, how to build the worker image, etc.).
    - KubernetesExecutor
        - No long-lived worker pods: when a task needs to run, a Pod is spawned for it on the fly and killed once the task finishes.
        - Very flexible: each pod's environment can be customized, which provides good isolation.
        - The drawback is pod startup time; when tasks are short, startup becomes the dominant overhead, so the KubernetesExecutor fits cases with strong per-task isolation needs.
    - CeleryKubernetesExecutor
        - A hybrid executor that routes each task to either the Celery or the Kubernetes executor based on its task queue (see the sketch after this list).
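Under the CeleryKubernetesExecutor, the routing is a per-task choice. A minimal sketch, assuming an existing `dag` object and the default `kubernetes` queue name from the `[celery_kubernetes_executor] kubernetes_queue` config; task ids and the function are placeholders:
```python
from airflow.operators.python import PythonOperator

def work():
    print("doing work")

# Default queue: executed on a long-lived Celery worker.
PythonOperator(task_id="light_task", python_callable=work, dag=dag)

# Queue matches kubernetes_queue, so this task is handed to the KubernetesExecutor.
PythonOperator(task_id="isolated_task", python_callable=work, queue="kubernetes", dag=dag)
```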
#### Conclusion
* We will use the PythonOperator for every task; the KubernetesPodOperator is unnecessary because our tasks do not need strong isolation.
* For the executor, our tasks' environments differ very little (they are all just Python functions), so the CeleryExecutor fits best and the worker pod image is easy to set up. We will start with the CeleryExecutor and test along the way whether the CeleryKubernetesExecutor does better.
* We need to provide two images for the whole k8s cluster: one for the Airflow components running in k8s pods, and one for the worker pods, i.e. the environment our tasks run in.
* For managing DAGs we use **GitSync**: the Airflow server automatically tracks the DAGs in a GitHub repo, which makes development and management much easier (a config sketch follows).
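In the official Helm chart this is a few `values.yaml` keys; a sketch pointing at our DAG repo (the branch is assumed):
```yaml
# values.yaml -- sketch; the branch is assumed
dags:
  gitSync:
    enabled: true
    repo: https://github.com/kevin1010607/airflow-dags
    branch: main
```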
-----------------------------------
### Run airflow on K8s using helm
- (Priority) The model DAG does not run correctly yet, because all the Spark-related setup (VPN, Java runtime, etc.) has to be baked into the docker image.
    - We may need to split the airflow image and the pod_template image into separate images (see the sketch after this list).
    - Not sure yet whether the pod_template only applies to workers; not tested.
- (Future) The gitSync repo has a size limit; syncing `git@github.com:kevin1010607/MLOps-ASMPT.git` directly raises an error.
    - Presumably `dags.gitSync.resources` in `values.yaml` needs to be adjusted.
- (Future) We currently use the CeleryExecutor, and the official docs say a default PVC already exists, so for now we have not added a PV/PVC and have not enabled logs.persistence.
    - "If you are using CeleryExecutor, workers persist logs by default to a volume claim created with a volumeClaimTemplate."
    - Enabling logs.persistence raises an error.
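If the split turns out to be needed, it would presumably look like this in `values.yaml`, assuming the chart's `images.airflow` and `images.pod_template` keys (repositories and tags are placeholders):
```yaml
# values.yaml -- sketch; repositories and tags are placeholders
images:
  airflow:                      # image for the Airflow components
    repository: myrepo/airflow
    tag: custom
  pod_template:                 # image used by the worker pod template
    repository: myrepo/airflow-worker
    tag: custom
```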
---
- Git sync repo: https://github.com/kevin1010607/airflow-dags
- A script for one-click automated deployment.
### Operator & Executor
- CeleryExecutor
- Allows for horizontal scaling across multiple worker nodes.
```
# kubectl get pods -n airflow
# before
NAME               READY   STATUS    RESTARTS   AGE
airflow-worker-0   1/1     Running   0          10m
# after
NAME               READY   STATUS    RESTARTS   AGE
airflow-worker-0   1/1     Running   0          15m
airflow-worker-1   1/1     Running   0          2m
airflow-worker-2   1/1     Running   0          1m
```
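The scale-out above can be achieved, for instance, by raising the chart's worker replica count (a sketch, assuming the official chart's `workers.replicas` key):
```
helm upgrade airflow apache-airflow/airflow -n airflow --set workers.replicas=3
```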
- KubernetesExecutor
- Runs each task in a separate Kubernetes pod.
- Provides dynamic scaling based on workload.
```
# kubectl get pods -n airflow
# before
NAME                         READY   STATUS    RESTARTS   AGE
# after
NAME                         READY   STATUS    RESTARTS   AGE
airflow-task-abc123-xyz789   1/1     Running   0          2m
airflow-task-def456-uvw321   1/1     Running   0          1m
```
- CeleryKubernetesExecutor
- Hybrid approach combining aspects of both Celery and Kubernetes executors.
```
# kubectl get pods -n airflow
# before
NAME                         READY   STATUS    RESTARTS   AGE
airflow-worker-0             1/1     Running   0          10m
# after
NAME                         READY   STATUS    RESTARTS   AGE
airflow-worker-0             1/1     Running   0          1h
airflow-worker-1             1/1     Running   0          1h
airflow-task-abc123-xyz789   1/1     Running   0          2m
airflow-task-def456-uvw321   1/1     Running   0          1m
```
### Resource management
- Modify the config in `values.yaml` (see the sketch below).
- k8s then handles the actual resource management and scheduling.
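For example, per-component requests/limits can be sketched like this (the numbers are placeholders, assuming the chart's standard `resources` keys):
```yaml
# values.yaml -- sketch; the numbers are placeholders
workers:
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"
    limits:
      cpu: "1"
      memory: "2Gi"
```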
## Model Versioning
### Model Service
* Implemented: get metrics from multiple models or versions
* [md](https://github.com/kevin1010607/MLOps-ASMPT/blob/main/model-versioning/model_service.md#get-multi-model-metrics)
### Protecting source code
* Use Cython to compile Python files into .so files
* No need to modify any code (see the sketch after this list)
* [md](https://github.com/kevin1010607/MLOps-ASMPT/tree/main/hadoop-services/source_code_protection/setup.md)
* [code](https://github.com/kevin1010607/MLOps-ASMPT/tree/main/hadoop-services/source_code_protection/setup.py)
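A minimal sketch of such a `setup.py` (the module name here is a placeholder; the repo's actual script is linked above):
```python
# setup.py -- minimal sketch; the module name is a placeholder
from setuptools import setup
from Cython.Build import cythonize

setup(
    ext_modules=cythonize("my_module.py", compiler_directives={"language_level": "3"}),
)
```
Running `python setup.py build_ext --inplace` then yields a `.so` that can be imported like the original module.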
### Key sharing for data encryption
* Idea: turn the key into a Python module, then compile it
* Example:
  ```python
  # keyfile/key.py
  def get_key():
      return "my_key"
  ```
* Compile it with Cython, then use it like a normal module:
  ```python
  from keyfile.key import get_key
  key = get_key()
  ```
* ***pythonnet*** allows C# code to execute Python functions
### TODO
* Pack model service into image and deploy on k8s
* Reverse compilation of Cython
    * Doable but very difficult; it takes a lot of effort
    * There is currently no decompiler for Cython
* What is the minimum .NET version on which we can use pythonnet?
    * .NET Framework (netfx): at least version 4.7.2
    * .NET Core (coreclr): at least version 3.1
## ML monitoring
- Implement a new Hadoop monitor using Kafka
    - Currently uses a pseudo data generator (a sketch follows below)
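A minimal sketch of such a pseudo data generator pushing records into Kafka, assuming the `kafka-python` client; the topic, broker address, and payload fields are placeholders:
```python
# pseudo_generator.py -- sketch; topic, broker, and payload fields are placeholders
import json
import random
import time

from kafka import KafkaProducer  # kafka-python client

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

while True:
    # Emit one fake monitoring record per second.
    producer.send("hadoop-monitor", {"lot_id": "LOT001", "score": random.random()})
    time.sleep(1)
```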
- Refactor the old components to satisfy the new proposal
    - The alert server now only executes model_update, data analysis, and model rollback
    - Cancel the daily model training
    - A decrease in **model score** or an error in **model status** triggers the model rollback
    - The request payload now carries *lot_id* and *date* for the DAGs **data_analysis** and **model_update**
    - Updated the API document
- TODOs
    - Need to check the atomicity of the alert server; otherwise it may generate duplicate DAG ids
    - Integrate the monitor with the real Kafka
    - Pack all components into Docker images and write the corresponding YAML for k8s
## Data Analysis & Data ETL
### Data Analysis DAG
Refactored
```mermaid
graph LR;
CC[Collect Current] --> OD[Outlier Detection] --> GC[Garbage Collection]
CR[Collect Reference] --> DD[Drift Detection] --> GC[Garbage Collection]
CC --> DD
```
### Data ETL
* Converting data with spark pandas (pyspark.pandas) is fast,
* but it is slow when inserting into Hive;
* when inserting into Hive, build the DataFrame with spark.createDataFrame instead (see the sketch below).
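A minimal sketch of the faster path, assuming an existing `SparkSession` named `spark`, a pandas DataFrame `pdf`, and a placeholder Hive table name:
```python
# Enable Arrow-based columnar data transfer between pandas and Spark.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Build a Spark DataFrame directly instead of going through pyspark.pandas.
sdf = spark.createDataFrame(pdf)

# Append into an existing Hive table (the table name is a placeholder).
sdf.write.mode("append").insertInto("db.table")
```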
Optimization comparison (times before -> after)
* Before
    * Without pyarrow + convert the DataFrame with pyspark.pandas
* After
    * With pyarrow + convert the DataFrame with spark.createDataFrame
```yaml
time_record
  predict_anomaly 11.903 -> 8.775
  predict_qa 32.529 -> 13.813
  raw_motion 10.546 -> 8.081
  raw_qa 2.546 -> 2.367
run
  predict_anomaly 11.868 -> 8.737
  predict_qa 32.491 -> 13.779
  raw_motion 10.508 -> 8.049
  raw_qa 2.511 -> 2.334
run_process
  predict_anomaly 6.832 -> 3.736
  predict_qa 24.717 -> 6.975
  raw_motion 5.488 -> 2.898
  raw_qa 0.440 -> 0.292
run_insert
  predict_anomaly 4.859 -> 3.884
  predict_qa 7.594 -> 6.598
  raw_motion 4.909 -> 4.625
  raw_qa 1.965 -> 1.926
to_spark_df
  anomaly 1.103 -> 0.152
  anomaly_mf_bb_mf.wb.000001 3.887 -> 0.760
  motion_bb_mf.wb.000001 3.593 -> 0.832
  predict 4.765 -> 0.338
  predict_mf_bb_mf.wb.000001 14.683 -> 1.230
  qa 0.245 -> 0.102
to_numeric
  __run_predict_anomaly 0.538 -> 0.569
  __run_predict_qa 3.332 -> 3.331
  __run_raw_motion_v2 0.436 -> 0.428
```