# 2024.04.19 Proposal
[TOC]
## ML Pipeline
### **Best practices** for dealing with large data in Airflow
* Push and pull data to/from external storage inside each task (**not recommended**).
* Use a **[custom XCom backend](https://docs.astronomer.io/learn/xcom-backend-tutorial)**. XCom is the cross-communication mechanism that allows tasks to share data with each other (a minimal backend sketch follows this list).
    * By default, XCom uses the local metadata database, which has a message **size limit** of at most 2 GB.
    * By setting up a custom backend (AWS S3, Google Cloud Storage, MinIO, ...), there is effectively no size limit.
    * Pros:
        * No need to modify task code to connect to the remote storage.
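
As an illustration of the custom-backend approach, below is a minimal sketch following the pattern of the linked Astronomer tutorial: large pandas DataFrames are uploaded to S3 inside `serialize_value`, and only a short reference string is stored in the metadata DB. The bucket name and key prefix are placeholders, and an AWS connection is assumed to be configured in Airflow.
```python=
import io
import uuid

import pandas as pd
from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3XComBackend(BaseXCom):
    PREFIX = "xcom_s3://"           # marker for values that live in S3
    BUCKET_NAME = "my-xcom-bucket"  # placeholder bucket name

    @staticmethod
    def serialize_value(value, **kwargs):
        # Offload large DataFrames to S3; only the reference string goes into the DB.
        if isinstance(value, pd.DataFrame):
            key = f"xcom/{uuid.uuid4()}.csv"
            S3Hook().load_string(
                string_data=value.to_csv(index=False),
                key=key,
                bucket_name=S3XComBackend.BUCKET_NAME,
                replace=True,
            )
            value = S3XComBackend.PREFIX + key
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result):
        # Transparently download the DataFrame again when a task pulls the XCom.
        value = BaseXCom.deserialize_value(result)
        if isinstance(value, str) and value.startswith(S3XComBackend.PREFIX):
            key = value.replace(S3XComBackend.PREFIX, "", 1)
            csv_data = S3Hook().read_key(key=key, bucket_name=S3XComBackend.BUCKET_NAME)
            value = pd.read_csv(io.StringIO(csv_data))
        return value
```
The backend is then enabled by setting the `xcom_backend` option in the `[core]` section of `airflow.cfg` (or the `AIRFLOW__CORE__XCOM_BACKEND` environment variable) to the import path of this class.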
### A better way to write Airflow DAG code -- [TaskFlow API](https://docs.astronomer.io/learn/airflow-decorators?tab=taskflow#how-to-use-airflow-decorators)
#### Comparison with the traditional method
* Pros: no explicit operator definitions or `xcom_pull`/`xcom_push` calls, less boilerplate, and easier-to-read DAGs.
* Old:
```python=
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def preprocess_data():
    ...
    return processed_data


def train_model(processed_data):
    ...
    return model


def save_model(model):
    ...
    return report(model)


# Example usage within a DAG
with DAG(
    "classic_dag", schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False
) as dag:
    preprocess_task = PythonOperator(
        task_id="preprocess_task",
        python_callable=preprocess_data,
    )
    train_task = PythonOperator(
        task_id="train_task",
        python_callable=train_model,
        op_kwargs={"processed_data": "{{ task_instance.xcom_pull(task_ids='preprocess_task') }}"},
    )
    save_task = PythonOperator(
        task_id="save_task",
        python_callable=save_model,
        op_kwargs={"model": "{{ task_instance.xcom_pull(task_ids='train_task') }}"},
    )
    preprocess_task >> train_task >> save_task
```
* New
```python=
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False)
def taskflow():
    @task(task_id="preprocess", retries=2)
    def preprocess_data():
        ...
        return data

    @task
    def model_training(data):
        ...
        return model

    @task
    def save_model(model):
        ...
        return report(model)

    data = preprocess_data()
    model = model_training(data)
    save_model(model)


taskflow()
```
#### If the data is too large (e.g., 40 GB), memory may overflow
- Pass the storage path instead of the full data.
- Use a `DataLoader` to load the data batch by batch.
- S3 example
```python=
import io

import boto3
from PIL import Image
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms.functional import to_tensor


class S3Dataset(Dataset):
    def __init__(self, bucket, s3_folder):
        self.s3 = boto3.client('s3')
        self.bucket = bucket
        # List files in a specific folder of the bucket
        # (note: list_objects_v2 returns at most 1000 keys per call)
        self.files = [
            obj['Key']
            for obj in self.s3.list_objects_v2(Bucket=bucket, Prefix=s3_folder)['Contents']
        ]

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        # Download one object per sample instead of holding the whole dataset in memory
        obj = self.s3.get_object(Bucket=self.bucket, Key=self.files[idx])
        # Assuming the data is an image
        image_data = obj['Body'].read()
        image = Image.open(io.BytesIO(image_data))
        # Convert to a tensor so the default collate can batch it
        # (assumes all images share the same size)
        return to_tensor(image)


# Create Dataset and DataLoader
bucket_name = 'your-bucket-name'
s3_folder = 'path/to/dataset'
dataset = S3Dataset(bucket=bucket_name, s3_folder=s3_folder)
dataloader = DataLoader(dataset, batch_size=10, shuffle=True)

# Usage example
for images in dataloader:
    pass
```
### References
* [XCom Backends (Notion)](https://robust-dinosaur-2ef.notion.site/XCom-Backends-96ae29e2cd50405eb605853f0df2f108)
* [Astronomer: Custom XCom backends tutorial](https://docs.astronomer.io/learn/xcom-backend-tutorial)
* [How to Pass Pandas Dataframes as XComs in Airflow using a Custom XCom Backend (YouTube)](https://www.youtube.com/watch?v=WHalf8d-97Y&ab_channel=TheDataGuy)
## Analyzer and Optimization
### Optimization
Don't optimize Spark for now.
### Outliers and Drift
Outliers
- Definition
    - Short-term
    - Small in number
- Remediation (a pandas sketch follows below)
    - Remove the affected rows
    - Fill with the median or mean
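
A minimal pandas sketch of the median-fill remediation; the IQR rule used here to flag outliers is an assumed criterion for illustration, not one fixed in these notes:
```python=
import pandas as pd


def fill_outliers_with_median(df: pd.DataFrame, column: str, k: float = 1.5) -> pd.DataFrame:
    """Flag IQR outliers in `column` and replace them with the median of the inliers."""
    q1, q3 = df[column].quantile([0.25, 0.75])
    iqr = q3 - q1
    is_outlier = (df[column] < q1 - k * iqr) | (df[column] > q3 + k * iqr)
    # Alternative remediation: drop the rows entirely with df[~is_outlier]
    df.loc[is_outlier, column] = df.loc[~is_outlier, column].median()
    return df


# Example: an extreme value gets replaced by the inlier median
df = pd.DataFrame({"sensor": [1.0, 2.0, 3.0, 4.0, 5.0, 100.0]})
print(fill_outliers_with_median(df, "sensor"))
```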
Drift
- Definition
    - Long-term
    - Affects a large amount of data; a change in the distribution
- No general algorithm for remediation
### Workflow
Kept separate from drift detection.
Outlier Removal
- PyOD
    - An outlier-detection library containing more than 40 different algorithms
    - Supports combining multiple algorithms
Simple Regressor
- CatBoost
    - Gradient-boosted decision trees
- Create a simple regressor for evaluation
    - Train with the original data
    - Train with the outlier-dropped data
    - Compare
Evaluate
- Calculate the MSE and R² of the regressor's predictions (a sketch follows below)
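
A minimal sketch of this comparison, using Isolation Forest as one of PyOD's detectors and a CatBoost regressor. The synthetic dataset and the 5% contamination rate are placeholder assumptions for illustration:
```python=
from catboost import CatBoostRegressor
from pyod.models.iforest import IForest
from sklearn.datasets import make_regression
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split


def evaluate(X, y):
    """Train a simple CatBoost regressor and return MSE and R2 on a held-out split."""
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    model = CatBoostRegressor(verbose=0)
    model.fit(X_train, y_train)
    pred = model.predict(X_test)
    return mean_squared_error(y_test, pred), r2_score(y_test, pred)


# Placeholder synthetic data standing in for the real feature table.
X, y = make_regression(n_samples=1000, n_features=10, noise=5.0, random_state=0)

# Baseline: train on the original data.
mse_orig, r2_orig = evaluate(X, y)

# Drop outliers with one of PyOD's detectors (Isolation Forest here), then retrain.
detector = IForest(contamination=0.05)
detector.fit(X)
inliers = detector.labels_ == 0  # PyOD convention: 0 = inlier, 1 = outlier
mse_clean, r2_clean = evaluate(X[inliers], y[inliers])

print(f"original:         MSE={mse_orig:.3f}, R2={r2_orig:.3f}")
print(f"outliers dropped: MSE={mse_clean:.3f}, R2={r2_clean:.3f}")
```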
Next
- Save the outlier-dropped data through XCom
- If needed in the future, it can also be saved into Hadoop
## Model Monitoring
* mlflow
[example code](https://github.com/kevin1010607/MLOps-ASMPT/tree/model-monitor)
* evidently

* atlas
* Installation steps
* [notion](https://www.notion.so/dasbd72/2024-04-18-Install-Atlas-1ba7a1f9ec474a78a748b51d0c8f3813)
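
For the Evidently item above, a minimal data-drift report sketch. This assumes the `Report`/`DataDriftPreset` API of the Evidently 0.4.x releases (newer versions changed the interface), and the two CSV files are placeholders for a reference window and a current window of the same feature table:
```python=
import pandas as pd
from evidently.metric_preset import DataDriftPreset
from evidently.report import Report

# Placeholder data: a historical reference window and the most recent window.
reference = pd.read_csv("reference_window.csv")
current = pd.read_csv("current_window.csv")

report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=reference, current_data=current)
report.save_html("data_drift_report.html")  # inspect manually or attach to an MLflow run
```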
## Database Comparison
### For MLflow and Airflow, which types of data go into which DB?
* MLflow: the data shown in the figure above is stored in MySQL/PostgreSQL, while the model and any artifacts we want to keep ourselves (e.g., txt, json, reports, ...) are stored in HDFS.
* Airflow: MySQL/PostgreSQL mainly stores metadata and shared variables; in Airflow we currently use HDFS as temporary storage for intermediate data.
### MySQL vs. PostgreSQL storage and query performance
* PostgreSQL outperforms MySQL, on average by roughly 30% (it varies with the operation and data size, and can be several times faster); most operations are faster than or on par with MySQL. The downsides are that PostgreSQL is stricter, its operations are more complex, and there is less reference material available than for MySQL.
### Sample code for how each module talks to its DB
* MLflow: when setting up the MLflow server, you can specify the storage type and location; once the server is up, nothing changes in the code. Pass the MySQL or PostgreSQL URI to `--backend-store-uri` and the HDFS path to `--default-artifact-root`.
```bash
mlflow server --backend-store-uri mysql+pymysql://user:password@host:port/database --default-artifact-root /path/to/artifact/root --host 0.0.0.0
```
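Once the server is running, client code only needs to point at it. A minimal sketch (the port, experiment name, and `report.json` file are placeholder assumptions): parameters and metrics go to the backend store (MySQL/PostgreSQL), while artifacts go to the artifact root (HDFS).
```python=
import mlflow

# Point the client at the server started above (host/port are placeholders).
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("demo-experiment")

with mlflow.start_run():
    mlflow.log_param("learning_rate", 0.01)  # backend store (MySQL/PostgreSQL)
    mlflow.log_metric("rmse", 0.42)          # backend store (MySQL/PostgreSQL)
    mlflow.log_artifact("report.json")       # artifact root (HDFS)
```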
* Airflow:
    * When tasks communicate through Airflow's built-in XCom, the data is stored in the DB so that other tasks can use it, as in the sketch below.
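
A minimal TaskFlow sketch of that behaviour (the DAG id and values are placeholders): the value returned by `produce` is written to the metadata DB as an XCom and read back when `consume` pulls it.
```python=
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def xcom_db_demo():
    @task
    def produce():
        # The returned value is serialized and written to the metadata DB as an XCom.
        return {"rows": 123}

    @task
    def consume(stats: dict):
        # Pulling the value reads the XCom row back from the metadata DB.
        print(stats["rows"])

    consume(produce())


xcom_db_demo()
```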