# 2024.04.19 Proposal

[TOC]

## ML Pipeline

### **Best practice** in dealing with large data in Airflow

* Push and pull data to external storage inside tasks. (**Not recommended**)
* Use a **[custom XCom backend](https://docs.astronomer.io/learn/xcom-backend-tutorial)**. XCom is the cross-communication mechanism that allows tasks to share data with each other.
    * By default, XCom uses the Airflow metadata database, which imposes a message **size limit** (at most about 2 GB, depending on the database).
    * With a custom backend (AWS S3, GCP Cloud Storage, MinIO, ...), the database size limit no longer applies.
    * Pros:
        * Task code does not have to be modified to connect to remote storage.

### Better way to write Airflow DAG code -- [Taskflow API](https://docs.astronomer.io/learn/airflow-decorators?tab=taskflow#how-to-use-airflow-decorators)

#### Comparison with traditional method:

* Pros: no need to write operator definitions or explicit `xcom_pull`/`xcom_push` calls, less modification of existing functions, easier to understand
* Old:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def preprocess_data():
    ...  # preprocessing logic elided
    return processed_data


def train_model(processed_data):
    ...  # training logic elided
    return model


def save_model(model):
    ...  # saving logic elided
    return report(model)  # report() is defined elsewhere (not shown)


# Example usage within a DAG
with DAG(
    "classic_dag",
    schedule="@daily",
    start_date=datetime(2021, 12, 1),
    catchup=False,
) as dag:
    preprocess_task = PythonOperator(
        task_id="preprocess_task",
        python_callable=preprocess_data,
    )
    # Note: templated values are rendered as strings unless the DAG sets
    # render_template_as_native_obj=True.
    train_task = PythonOperator(
        task_id="train_task",
        python_callable=train_model,
        op_kwargs={"processed_data": "{{ task_instance.xcom_pull(task_ids='preprocess_task') }}"},
    )
    save_task = PythonOperator(
        task_id="save_task",
        python_callable=save_model,
        op_kwargs={"model": "{{ task_instance.xcom_pull(task_ids='train_task') }}"},
    )

    preprocess_task >> train_task >> save_task
```

* New:

```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False)
def taskflow():
    @task(task_id="extract", retries=2)
    def preprocess_data():
        ...  # preprocessing logic elided
        return data

    @task
    def model_training(data):
        ...  # training logic elided
        return model

    @task
    def save_model(model):
        ...  # saving logic elided
        return report(model)  # report() is defined elsewhere (not shown)

    # Return values are passed between tasks through XCom automatically,
    # and the call order defines the task dependencies.
    data = preprocess_data()
    model = model_training(data)
    save_model(model)


taskflow()
```

#### If the data is too large (40 GB or more), it may not fit in memory

- Pass the storage path instead of the full data.
- Use a DataLoader to load data batch by batch.
- S3 example

```python
import io

import boto3
from PIL import Image
from torch.utils.data import DataLoader, Dataset


class S3Dataset(Dataset):
    def __init__(self, bucket, s3_folder):
        self.s3 = boto3.client("s3")
        self.bucket = bucket
        # List files in a specific folder of the bucket. The paginator
        # handles listings longer than the 1000 keys returned per call,
        # and .get() avoids a KeyError when the folder is empty.
        paginator = self.s3.get_paginator("list_objects_v2")
        self.files = [
            obj["Key"]
            for page in paginator.paginate(Bucket=bucket, Prefix=s3_folder)
            for obj in page.get("Contents", [])
        ]

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        obj = self.s3.get_object(Bucket=self.bucket, Key=self.files[idx])
        # Assuming the data is an image
        image_data = obj["Body"].read()
        image = Image.open(io.BytesIO(image_data))
        return image


# Create Dataset and DataLoader
bucket_name = "your-bucket-name"
s3_folder = "path/to/dataset"
dataset = S3Dataset(bucket=bucket_name, s3_folder=s3_folder)
# PIL images are not tensors, so the default collate function cannot stack
# them; collate_fn=list returns each batch as a plain list of images.
dataloader = DataLoader(dataset, batch_size=10, shuffle=True, collate_fn=list)

# Usage example
for images in dataloader:
    pass
```

### Reference

* https://robust-dinosaur-2ef.notion.site/XCom-Backends-96ae29e2cd50405eb605853f0df2f108
* https://docs.astronomer.io/learn/xcom-backend-tutorial
* [How to Pass Pandas Dataframes as XCom's in Airflow using a Custom XCom Backend](https://www.youtube.com/watch?v=WHalf8d-97Y&ab_channel=TheDataGuy)

## Analyzer and Optimization

### Optimization

Don't optimize Spark for now.

### Outliers and Drift

Outliers
- Definition
    - Short term
    - Small number of data points
- Remediation
    - Remove the row
    - Fill with median or mean

Drift
- Definition
    - Long term
    - Large number of data points, a change in distribution
- No algorithm for remediation

### Workflow

Separate from drift detection.

Outlier Removal
- pyod
    - Outlier detection library containing more than 40 different algorithms
    - Supports combining multiple algorithms

Simple Regressor
- catboost
    - Gradient-boosted decision trees
- Create a simple regressor to evaluate
    - Train with original data
    - Train with outlier-dropped data
    - Compare

Evaluate
- Calculate MSE and R² of the regressor's predictions

Next
- Save outlier-dropped data through XCom
- If needed in the future, it can also be saved into Hadoop

## Model Monitoring

* mlflow
    * [example code](https://github.com/kevin1010607/MLOps-ASMPT/tree/model-monitor)
* evidently
    ![image](https://hackmd.io/_uploads/SylusoJ-C.png)
* atlas
    * Installation steps
        * [notion](https://www.notion.so/dasbd72/2024-04-18-Install-Atlas-1ba7a1f9ec474a78a748b51d0c8f3813)

## Database Comparison

### For MLflow and Airflow, which types of data are stored in which DB?

* MLflow:
    ![image](https://hackmd.io/_uploads/rk3w-JJ-C.png)
    The figure above shows the data stored in MySQL/PostgreSQL. Models and any artifacts we choose to save (for example txt, json, report...) are stored in HDFS.
* Airflow: MySQL/PostgreSQL mainly stores metadata and shared variables. In Airflow we currently use HDFS as temporary storage for intermediate data.

### MySQL vs. PostgreSQL: storage and query performance comparison

* PostgreSQL outperforms MySQL, by roughly 30% on average (this varies with the operation and data size, and can even reach several times faster). For most operations it is faster than or on par with MySQL. The downsides are that PostgreSQL is stricter, more complex to operate, and has fewer reference materials available than MySQL.

### Sample code for how the two modules communicate with their DBs

* MLflow: The storage type and location can be specified when setting up the MLflow server. Once the server is running, the client code is no different. Set `--backend-store-uri` to the MySQL or PostgreSQL URI and `--default-artifact-root` to the HDFS path.

```bash
mlflow server --backend-store-uri mysql+pymysql://user:password@host:port/database --default-artifact-root /path/to/artifact/root --host 0.0.0.0
```

* Airflow:
    * When tasks communicate through Airflow's built-in XCom, the data is stored in the DB so that other tasks can use it.
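
To make the Airflow side concrete, here is a minimal conceptual sketch of the pattern described in this section: small values (here, a storage path) go through an XCom-like table in the DB, while the intermediate data itself lives in separate storage. This is not Airflow's actual API or schema. The `xcom` table layout, the `xcom_push`/`xcom_pull` helpers, and the `demo_dag` name are hypothetical stand-ins; an in-memory SQLite database plays the role of MySQL/PostgreSQL and a temporary directory plays the role of HDFS.

```python
import json
import sqlite3
import tempfile
from pathlib import Path

# Hypothetical stand-ins: in-memory SQLite for the metadata DB,
# a temporary directory for HDFS.
db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE xcom (dag_id TEXT, task_id TEXT, key TEXT, value TEXT, "
    "PRIMARY KEY (dag_id, task_id, key))"
)
storage_dir = Path(tempfile.mkdtemp())


def xcom_push(dag_id, task_id, value, key="return_value"):
    """Serialize a small value and store it as one row in the DB."""
    db.execute(
        "INSERT OR REPLACE INTO xcom VALUES (?, ?, ?, ?)",
        (dag_id, task_id, key, json.dumps(value)),
    )
    db.commit()


def xcom_pull(dag_id, task_id, key="return_value"):
    """Read back the value stored by an upstream task, or None if absent."""
    row = db.execute(
        "SELECT value FROM xcom WHERE dag_id = ? AND task_id = ? AND key = ?",
        (dag_id, task_id, key),
    ).fetchone()
    return json.loads(row[0]) if row else None


def preprocess_task():
    """Write the intermediate data to storage and push only its path."""
    data = [x * 2 for x in range(5)]
    path = storage_dir / "processed.json"
    path.write_text(json.dumps(data))
    xcom_push("demo_dag", "preprocess_task", str(path))


def train_task():
    """Pull the path from the DB, then load the data from storage."""
    path = xcom_pull("demo_dag", "preprocess_task")
    data = json.loads(Path(path).read_text())
    return sum(data)


preprocess_task()
result = train_task()
print(result)  # 20
```

Only the short path string passes through the DB, so the row stays small regardless of how large the intermediate data grows. In a real deployment, Airflow handles the push and pull itself, and a custom XCom backend can take over the storage step.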