# SCPL 使用指南 ## SCPL 介紹 ### 什麼是 SCPL (Supervised Contrastive Parallel Learning)? SCPL 是一個免費且開放的 Python 套件 (暫定),用來將由 Pytorch 搭建的模型重構來實現更好的效能。 ### SCPL 安裝 使用 Python 3.10.12 環境,使用前須先安裝 [Pytorch 2.4.0](https://pytorch.org/get-started/locally/)。 安裝完畢後再安裝 SCPL(暫定)。 ``` cmd pip install SCPL ``` ## SCPL 核心概念 * SCPL 會將由 Pytorch 搭建好的模型進行重構,將其拆分為多個區塊 (Block),並分配到多個 GPU 上,透過截斷各個 Block 間的梯度傳遞來實現平行運算。 * 目前 SCPL 僅針對分類任務。 >下圖為 SCPL 運作示意圖,左邊為使用 Pytorch 搭建的模型,右側為使用 SCPL 重構後的模型。 ![SCPL_arch](https://hackmd.io/_uploads/BknVjnL6A.png) * 各個 Block 都會分配一個 Optimizer,並且 loss 會獨立計算,除了最後一個 Block 的 Loss 為 CrossEntropy 之外,各個 Block 都是使用 CL Loss (Contrastive Loss)。 >最後一個 Block 的 loss 之後會修改成讓使用者自由傳入 [name=lyt0310603] ## SCPL 輸入輸出 下面會介紹 SCPL 的輸入輸出 ### training 輸入/輸出 * 訓練時透過 model.train() 進入訓練模式 * 輸入部分為 X 和 Y,和可選的 mask 參數 * 輸出部分會回傳模型預測結果 (Y_hat) 和 loss * SCPL 的 loss 是所有 Block 的 loss 總和 ```python model.train() output, loss = model(X, Y) ``` ### testing 輸入/輸出 * 測試時透過 model.eval() 進入測試模式 * 輸入部分為 X 和 Y,可選的 mask 參數 * 輸出部分會回傳模型預測結果 (Y_hat) 和 Y ```python model.eval() predicted, true_y = model(X, Y) ``` ### Adaptive 推理模式 * 當啟用自適應推理時,測試會返回額外的信息 * 輸出包括預測結果、停止層索引、真實標籤 ```python model.eval() classifier_output, stop_layer, true_y = model(X, Y) ``` ## SCPL 參數介紹 本節會介紹建立 SCPL 模型時會需要傳入的參數 SCPL 有兩個必要參數: `custom_model`、`device_map` 以及多個非必要參數: `projector_type`、`loss_fn`、`num_classes`、`transform_funcs`、`optimizer_fn`、`optimizer_param`、`scheduler_fn`、`scheduler_param`、`multi_t`、`is_adaptive`、`patiencethreshold`、`cosinesimthreshold`、`classifier` ### custom_model * 在建立 SCPL 模型之前都需要先建立一個 Pytorch 模型 * 建議使用 torch.nn.Sequential() 來建立模型 ```python import torch import torch.nn as nn # create a model model = nn.Sequential( nn.Embedding(), nn.LSTM(), nn.LSTM(), nn.Linear(), nn.tanh(), nn.Linear() ) ``` * 可以在 Sequential 中使用自定義類別形式的模型 * 要注意自定義類別形式的模型其回傳值格式 * 如果回傳值需要經過特別處理才能用作 loss 計算的話需要額外回傳如範例 > 如 LSTM 的回傳值需經過取平均處理才能用於損失計算 * 如果不需要的話則可以直接回傳預測結果即可 ```python import torch import torch.nn as nn class PositionalEncoding(nn.Module): def __init__(self, vocab_size, d_model: int, dropout: float = 0.1, max_len: int = 128): super().__init__() ... def forward(self, x): ... return result, for_loss model = nn.Sequential( PositionalEncoding(vocab_size, embed_size), nn.TransformerEncoderLayer(...), nn.TransformerEncoderLayer(...), nn.TransformerEncoderLayer(...), torch.nn.Linear(hidden_size, 300), torch.nn.Tanh(), torch.nn.Linear(300, n_classes) ) ``` > 範例或許仍須調整 ### device_map * 透過一個字典來告訴 SCPL 你要怎麼分割及分配模型 * key 值為想要分配的核心,value 為想要分配的層數 * 要注意層數是累加的 ```python model = nn.Sequential( nn.Embedding(), nn.LSTM(), nn.LSTM(), nn.Linear(), nn.tanh(), nn.Linear() ) device_map = { "cuda:0": 1, # 分配 Embedding 至 GPU 0 "cuda:1": 1, # 分配第一個 LSTM 至 GPU 1 "cuda:2": 1, # 分配第二個 LSTM 至 GPU 2 "cuda:3": 3 # 分配剩餘線性層至 GPU 3 } ``` ### projector_type * 傳入一個字串決定 Block 至 loss 計算前需要經過怎麼樣的轉換 * 支援的類型: * `'i'`: Identity (恆等映射) * `'l'`: 單層 Linear * `'mlp'`: 多層感知機 * `'DeInfo'`: DeInfo 專用的投影器 * 若不傳入預設會是 `'i'` 即 Identity > DeInfo 預計刪除,並加入可以傳入客製投影頭功能 ```python projector_type = 'mlp' # 使用多層感知機投影器 ``` ### loss_fn * 傳入損失函數類型或自定義損失函數 * 支援的字符串類型: * `'CL'`: Contrastive Loss (對比學習損失) * `'DeInfo'`: DeInfo Loss (解耦信息正則化損失) ```python # 使用對比學習損失 loss_fn = "CL" # 使用 DeInfo 損失 loss_fn = "DeInfo" ``` ### num_classes * 在 loss 函數使用 DeInfo 時需要傳入 * 傳入 label 數量 ### trans_fuc * 傳入一個矩陣,矩陣內元素代表 X 在 Block 之間需經過的轉換函數,如果 Block 中間不需用轉換則傳入 None * 轉換函數需要使用者在模型外定義 * 如果不傳入矩陣會默認所有 Block 間傳遞不需要經過轉換 > 下圖為 trans_fuc 作用示意圖,會在 Block 間插入轉換函數。 ![SCPL_arch-trans](https://hackmd.io/_uploads/ByVPyhF6A.png) ```python # LSTM 傳至線性層時須經過變換 # 因為 nn.LSTM 的回傳值為 x, (h, c),所以轉換函數的參數定義成 x, h, c def LSTMtoLinear(x, h, c): return x[:, -1, :] # 不需要傳入變換函數則傳入 None 即可 trans_fucs = [None, None, None, LSTMtoLinear] ``` > 一般而言 LSTM 傳遞至 Linear 層時會取 x[:, -1, :] 來取得最後一個時間步的資料,下面程式碼為範例。 > > ```python > class CustomModel(nn.Module): > def __init__(self, embedding_size, vocab_size, hidden_size, output_size > super(CustomModel, self).__init__() > self.embedding = nn.Embedding(vocab_size, embedding_size) > self.lstm1 = nn.LSTM(embedding_size, hidden_size, batch_first=True) > self.lstm2 = nn.LSTM(hidden_size, hidden_size, batch_first=True) > self.fc1 = n.Linear(hidden_size, hidden_size) > self.tanh = nn.Tanh() > self.fc2 = nn.Linear(hidden_size, output_size) > > def forward(self, x): > x = self.embedding(x) > x, _ = self.lstm1(x) > x, _ = self.lstm2(x) > x = self.fc1(x[:, -1, :]) # use the last output of LSTM > x = self.tanh(x) > x = self.fc2(x) > return x > ``` ### optimizer_fn * 傳入你想使用的 Optimizer,如果不傳入的話會默認使用 torch.optim.Adam ```python # 使用者不須建立優化器,只需傳入優化器的名字即可 optimizer_fn = torch.optim.SGD ``` ### optimizer_param * 傳入你想使用的 Optimizer 的參數,型別為字典 (dict),如果不傳入 optimizer_param 的話會使用 optimizer 本身預設的值 * dict 不用包含 model.parameters(),因為會在建立 SCPL 模型中自動賦予 * 需注意因如果沒有傳入 optimizer_fn 的話會使用 Adam 作為預設,所以傳入的參數要符合 Adam 的需求 ```python # 使用 SGD 作為優化器 optimizer_fn = torch.optim.SGD # 透過字典格式傳入 SGD 的參數 optimizer_param = { "lr":1e-4, "momentum":0.9, "weight_decay":1e-5 } ``` ### scheduler_fn * 傳入你想使用的 lr_scheduler,如果不傳入會默認不使用 ```python # 使用者不須建立排程器,只需傳入排程器的名字即可 scheduler_fn = torch.optim.lr_scheduler.StepLR ``` ### scheduler_param * 傳入你想使用的 lr_scheduler 的參數,型別為字典,需要注意傳入的內容要符合指定的 lr_scheduler * dict 不需用包含 optimizer,因為會在建立 SCPL 模型過程中自動賦予 ```python # 使用 StepLR 作為學習率排程器 scheduler_fn = torch.optim.lr_scheduler.StepLR # 透過字典格式傳入 StepLR 的參數 scheduler_param = { "step_size":10, "gamma":0.2 } ``` ### multi_t * multi_t 是一個布林值 (Boolean),決定是否啟用多線程 (multi-thread) * 預設為 True ### is_adaptive * is_adaptive 是一個布林值 (Boolean),決定是否開起自適應推理模式 * 開啟狀態下 testing 的返回會有所變化,預設為 False ### patiencethreshold * 自適應推理的耐心閾值 * 預設為 1 * 僅在 is_adaptive=True 時有效 * 用於控制自適應推理的停止條件 ```python patiencethreshold = 2 ``` ### cosinesimthreshold * 自適應推理的餘弦相似度閾值 * 預設為 0.8 * 僅在 is_adaptive=True 時有效 * 當連續層的預測結果相似度超過此閾值時,可能提前停止推理 ```python cosinesimthreshold = 0.9 ``` ### classifier * 自適應推理使用的分類器 * 僅在 is_adaptive=True 時需要 * 如果不傳入會使用默認的線性分類器 * 每個 Block 都會配備一個分類器來進行早期預測 ```python # 自定義分類器 classifier = nn.Sequential( nn.Linear(1024, 512), nn.ReLU(), nn.Linear(512, num_classes) ) ``` ## SCPL 使用範例 ### 範例一 本範例會簡單示範使用 SCPL 應用於簡單的 NLP 分類任務 1. 引入必要 package > SCPL 的名字會再優化使用[name=lyt0310603] ```python= import torch import torch.nn as nn from torch.utils.data import Dataset, DataLoader from Model import SCPL_model import numpy as np import random ``` 2. 建立資料集 ```python= class SimpleNLPCategoryDataset(Dataset): def __init__(self, num_samples=1000, vocab_size=50, seq_length=10): self.num_samples = num_samples self.vocab_size = vocab_size self.seq_length = seq_length self.data = [] self.labels = [] for _ in range(num_samples): # Generate a random sentence (sequence) of integers sentence = [torch.randint(1, vocab_size, (1,)).item() for _ in range(seq_length)] # Assign a label: 0 if the sum of the sentence is even, 1 if odd label = 0 if sum(sentence) % 2 == 0 else 1 self.data.append(sentence) self.labels.append(label) def __len__(self): return self.num_samples def __getitem__(self, idx): return torch.tensor(self.data[idx], dtype=torch.long), torch.tensor(self.labels[idx], dtype=torch.long) # create Dataloader dataset = SimpleNLPCategoryDataset(num_samples=1000, vocab_size=50, seq_length=10) dataloader = DataLoader(dataset, batch_size=32, shuffle=True) ``` 3. 建立 nn.Sequential 模型 ```python= model = torch.nn.Sequential( nn.Embedding(50, 64), nn.LSTM(64, 128, batch_first=True), nn.LSTM(128, 256, batch_first=True), nn.Linear(256, 128), nn.Tanh(), nn.Linear(128, 2) ) ``` 4. 建立 LSTM 至 Linear 的轉換函數 ```python= def LSTMtoLinear(x, h, c): return x[:, -1, :] ``` 5. 設定 SCPL 所需參數 ```python= device_map = { "cuda:0": 1, # 分配 Embedding 至 GPU 0 "cuda:1": 1, # 分配第一個 LSTM 至 GPU 1 "cuda:2": 1, # 分配第二個 LSTM 至 GPU 2 "cuda:3": 3 # 分配剩餘線性層至 GPU 3 } trans_fuc = [None, None, None, LSTMtoLinear] ``` 6. 建立 SCPL 模型 ```python= model = SCPL_model(model, device_map trans_fucs=trans_fuc) ``` 7. 開始訓練 ```python= SCPL_model.train() for i in range(20): for inputs, label in dataloader: output, loss = scplmodel(inputs, label) print("[epoch {}] loss:{:.3f}".format(i+1, loss)) ``` 8. 測試模型 ```py= SCPL_model.eval() total_samples = 0 correct_predictions = 0 with torch.no_grad(): for inputs, label in test_dataloader: outputs, true_y = scplmodel(inputs, label) predictions = torch.argmax(outputs, dim=1) correct_predictions += (predictions == true_y).sum().item() total_samples += true_y.size(0) accuracy = correct_predictions / total_samples print(accuracy) ``` ### 範例二 本範例會示範如何使用 SCPL 進行圖像分類任務,資料集部分使用 Mnist 資料集 1. 引入必要 package ```python= import torch import torch.nn as nn import torch.optim as optim from Model import SCPL_model import torchvision import torchvision.transforms as transforms ``` 2. 載入資料集 ```python= # Define data transformations (convert image to Tensor and normalize) transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,)) # Mean and std deviation for grayscale images ]) # Load training and test datasets train_dataset = torchvision.datasets.MNIST(root='./data', train=True, download=True, transform=transform) test_dataset = torchvision.datasets.MNIST(root='./data', train=False, download=True, transform=transform) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=64, shuffle=True) test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=64, shuffle=False) ``` 3. 建立 nn.Sequential() 模型 ```python= model = nn.Sequential( nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1), # Input is 1 channel (grayscale) nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2), # Output size: 32 x 14 x 14 nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1), # 32 -> 64 channels nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2), # Output size: 64 x 7 x 7 nn.Flatten(), # Flatten 2D feature maps into a 1D vector nn.Linear(64 * 7 * 7, 128), # Fully connected layer nn.ReLU(), nn.Linear(128, 10), # 10 outputs for 10 classes (digits 0-9) nn.LogSoftmax(dim=1) # Use LogSoftmax to calculate class probabilities ) ``` 4. 設定 SCPL 所需參數 ```python= device_map = { "cuda:0": 3, "cuda:1": 3, "cuda:2": 5 } scheduler = torch.optim.lr_scheduler.StepLR scheduler_param = { "step_size":2, "gamma":0.1 } ``` 5. 建立 SCPL 模型 ```python= model = SCPL_model(model, device_map proj_type="mlp", scheduler_fn=scheduler, scheduler_param=scheduler_param) ``` 6. 開始訓練 ```python= model.train() num_epochs = 5 for epoch in range(num_epochs): running_loss = 0.0 for i, (images, labels) in enumerate(train_loader): # forward propagation outputs, loss = model(images, labels) running_loss += loss print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader)}') ``` 7. 測試結果 ```python= model.eval() correct = 0 total = 0 with torch.no_grad(): for images, labels in test_loader: outputs, true_ys = model(images, labels) _, predicted = torch.max(outputs.data, 1) total += true_ys.size(0) correct += (predicted == true_ys).sum().item() print(f'Accuracy of the model on the 10000 test images: {100 * correct / total}%') ```