# SCPL 使用指南
## SCPL 介紹
### 什麼是 SCPL (Supervised Contrastive Parallel Learning)?
SCPL 是一個免費且開放的 Python 套件 (暫定),用來將由 Pytorch 搭建的模型重構來實現更好的效能。
### SCPL 安裝
使用 Python 3.10.12 環境,使用前須先安裝 [Pytorch 2.4.0](https://pytorch.org/get-started/locally/)。
安裝完畢後再安裝 SCPL(暫定)。
``` cmd
pip install SCPL
```
## SCPL 核心概念
* SCPL 會將由 Pytorch 搭建好的模型進行重構,將其拆分為多個區塊 (Block),並分配到多個 GPU 上,透過截斷各個 Block 間的梯度傳遞來實現平行運算。
* 目前 SCPL 僅針對分類任務。
>下圖為 SCPL 運作示意圖,左邊為使用 Pytorch 搭建的模型,右側為使用 SCPL 重構後的模型。

* 各個 Block 都會分配一個 Optimizer,並且 loss 會獨立計算,除了最後一個 Block 的 Loss 為 CrossEntropy 之外,各個 Block 都是使用 CL Loss (Contrastive Loss)。
>最後一個 Block 的 loss 之後會修改成讓使用者自由傳入 [name=lyt0310603]
## SCPL 輸入輸出
下面會介紹 SCPL 的輸入輸出
### training 輸入/輸出
* 訓練時透過 model.train() 進入訓練模式
* 輸入部分為 X 和 Y,和可選的 mask 參數
* 輸出部分會回傳模型預測結果 (Y_hat) 和 loss
* SCPL 的 loss 是所有 Block 的 loss 總和
```python
model.train()
output, loss = model(X, Y)
```
### testing 輸入/輸出
* 測試時透過 model.eval() 進入測試模式
* 輸入部分為 X 和 Y,可選的 mask 參數
* 輸出部分會回傳模型預測結果 (Y_hat) 和 Y
```python
model.eval()
predicted, true_y = model(X, Y)
```
### Adaptive 推理模式
* 當啟用自適應推理時,測試會返回額外的信息
* 輸出包括預測結果、停止層索引、真實標籤
```python
model.eval()
classifier_output, stop_layer, true_y = model(X, Y)
```
## SCPL 參數介紹
本節會介紹建立 SCPL 模型時會需要傳入的參數
SCPL 有兩個必要參數: `custom_model`、`device_map`
以及多個非必要參數: `projector_type`、`loss_fn`、`num_classes`、`transform_funcs`、`optimizer_fn`、`optimizer_param`、`scheduler_fn`、`scheduler_param`、`multi_t`、`is_adaptive`、`patiencethreshold`、`cosinesimthreshold`、`classifier`
### custom_model
* 在建立 SCPL 模型之前都需要先建立一個 Pytorch 模型
* 建議使用 torch.nn.Sequential() 來建立模型
```python
import torch
import torch.nn as nn
# create a model
model = nn.Sequential(
nn.Embedding(),
nn.LSTM(),
nn.LSTM(),
nn.Linear(),
nn.tanh(),
nn.Linear()
)
```
* 可以在 Sequential 中使用自定義類別形式的模型
* 要注意自定義類別形式的模型其回傳值格式
* 如果回傳值需要經過特別處理才能用作 loss 計算的話需要額外回傳如範例
> 如 LSTM 的回傳值需經過取平均處理才能用於損失計算
* 如果不需要的話則可以直接回傳預測結果即可
```python
import torch
import torch.nn as nn
class PositionalEncoding(nn.Module):
def __init__(self, vocab_size, d_model: int, dropout: float = 0.1, max_len: int = 128):
super().__init__()
...
def forward(self, x):
...
return result, for_loss
model = nn.Sequential(
PositionalEncoding(vocab_size, embed_size),
nn.TransformerEncoderLayer(...),
nn.TransformerEncoderLayer(...),
nn.TransformerEncoderLayer(...),
torch.nn.Linear(hidden_size, 300),
torch.nn.Tanh(),
torch.nn.Linear(300, n_classes)
)
```
> 範例或許仍須調整
### device_map
* 透過一個字典來告訴 SCPL 你要怎麼分割及分配模型
* key 值為想要分配的核心,value 為想要分配的層數
* 要注意層數是累加的
```python
model = nn.Sequential(
nn.Embedding(),
nn.LSTM(),
nn.LSTM(),
nn.Linear(),
nn.tanh(),
nn.Linear()
)
device_map = {
"cuda:0": 1, # 分配 Embedding 至 GPU 0
"cuda:1": 1, # 分配第一個 LSTM 至 GPU 1
"cuda:2": 1, # 分配第二個 LSTM 至 GPU 2
"cuda:3": 3 # 分配剩餘線性層至 GPU 3
}
```
### projector_type
* 傳入一個字串決定 Block 至 loss 計算前需要經過怎麼樣的轉換
* 支援的類型:
* `'i'`: Identity (恆等映射)
* `'l'`: 單層 Linear
* `'mlp'`: 多層感知機
* `'DeInfo'`: DeInfo 專用的投影器
* 若不傳入預設會是 `'i'` 即 Identity
> DeInfo 預計刪除,並加入可以傳入客製投影頭功能
```python
projector_type = 'mlp' # 使用多層感知機投影器
```
### loss_fn
* 傳入損失函數類型或自定義損失函數
* 支援的字符串類型:
* `'CL'`: Contrastive Loss (對比學習損失)
* `'DeInfo'`: DeInfo Loss (解耦信息正則化損失)
```python
# 使用對比學習損失
loss_fn = "CL"
# 使用 DeInfo 損失
loss_fn = "DeInfo"
```
### num_classes
* 在 loss 函數使用 DeInfo 時需要傳入
* 傳入 label 數量
### trans_fuc
* 傳入一個矩陣,矩陣內元素代表 X 在 Block 之間需經過的轉換函數,如果 Block 中間不需用轉換則傳入 None
* 轉換函數需要使用者在模型外定義
* 如果不傳入矩陣會默認所有 Block 間傳遞不需要經過轉換
> 下圖為 trans_fuc 作用示意圖,會在 Block 間插入轉換函數。

```python
# LSTM 傳至線性層時須經過變換
# 因為 nn.LSTM 的回傳值為 x, (h, c),所以轉換函數的參數定義成 x, h, c
def LSTMtoLinear(x, h, c):
return x[:, -1, :]
# 不需要傳入變換函數則傳入 None 即可
trans_fucs = [None, None, None, LSTMtoLinear]
```
> 一般而言 LSTM 傳遞至 Linear 層時會取 x[:, -1, :] 來取得最後一個時間步的資料,下面程式碼為範例。
>
> ```python
> class CustomModel(nn.Module):
> def __init__(self, embedding_size, vocab_size, hidden_size, output_size
> super(CustomModel, self).__init__()
> self.embedding = nn.Embedding(vocab_size, embedding_size)
> self.lstm1 = nn.LSTM(embedding_size, hidden_size, batch_first=True)
> self.lstm2 = nn.LSTM(hidden_size, hidden_size, batch_first=True)
> self.fc1 = n.Linear(hidden_size, hidden_size)
> self.tanh = nn.Tanh()
> self.fc2 = nn.Linear(hidden_size, output_size)
>
> def forward(self, x):
> x = self.embedding(x)
> x, _ = self.lstm1(x)
> x, _ = self.lstm2(x)
> x = self.fc1(x[:, -1, :]) # use the last output of LSTM
> x = self.tanh(x)
> x = self.fc2(x)
> return x
> ```
### optimizer_fn
* 傳入你想使用的 Optimizer,如果不傳入的話會默認使用 torch.optim.Adam
```python
# 使用者不須建立優化器,只需傳入優化器的名字即可
optimizer_fn = torch.optim.SGD
```
### optimizer_param
* 傳入你想使用的 Optimizer 的參數,型別為字典 (dict),如果不傳入 optimizer_param 的話會使用 optimizer 本身預設的值
* dict 不用包含 model.parameters(),因為會在建立 SCPL 模型中自動賦予
* 需注意因如果沒有傳入 optimizer_fn 的話會使用 Adam 作為預設,所以傳入的參數要符合 Adam 的需求
```python
# 使用 SGD 作為優化器
optimizer_fn = torch.optim.SGD
# 透過字典格式傳入 SGD 的參數
optimizer_param = {
"lr":1e-4,
"momentum":0.9,
"weight_decay":1e-5
}
```
### scheduler_fn
* 傳入你想使用的 lr_scheduler,如果不傳入會默認不使用
```python
# 使用者不須建立排程器,只需傳入排程器的名字即可
scheduler_fn = torch.optim.lr_scheduler.StepLR
```
### scheduler_param
* 傳入你想使用的 lr_scheduler 的參數,型別為字典,需要注意傳入的內容要符合指定的 lr_scheduler
* dict 不需用包含 optimizer,因為會在建立 SCPL 模型過程中自動賦予
```python
# 使用 StepLR 作為學習率排程器
scheduler_fn = torch.optim.lr_scheduler.StepLR
# 透過字典格式傳入 StepLR 的參數
scheduler_param = {
"step_size":10,
"gamma":0.2
}
```
### multi_t
* multi_t 是一個布林值 (Boolean),決定是否啟用多線程 (multi-thread)
* 預設為 True
### is_adaptive
* is_adaptive 是一個布林值 (Boolean),決定是否開起自適應推理模式
* 開啟狀態下 testing 的返回會有所變化,預設為 False
### patiencethreshold
* 自適應推理的耐心閾值
* 預設為 1
* 僅在 is_adaptive=True 時有效
* 用於控制自適應推理的停止條件
```python
patiencethreshold = 2
```
### cosinesimthreshold
* 自適應推理的餘弦相似度閾值
* 預設為 0.8
* 僅在 is_adaptive=True 時有效
* 當連續層的預測結果相似度超過此閾值時,可能提前停止推理
```python
cosinesimthreshold = 0.9
```
### classifier
* 自適應推理使用的分類器
* 僅在 is_adaptive=True 時需要
* 如果不傳入會使用默認的線性分類器
* 每個 Block 都會配備一個分類器來進行早期預測
```python
# 自定義分類器
classifier = nn.Sequential(
nn.Linear(1024, 512),
nn.ReLU(),
nn.Linear(512, num_classes)
)
```
## SCPL 使用範例
### 範例一
本範例會簡單示範使用 SCPL 應用於簡單的 NLP 分類任務
1. 引入必要 package
> SCPL 的名字會再優化使用[name=lyt0310603]
```python=
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from Model import SCPL_model
import numpy as np
import random
```
2. 建立資料集
```python=
class SimpleNLPCategoryDataset(Dataset):
def __init__(self, num_samples=1000, vocab_size=50, seq_length=10):
self.num_samples = num_samples
self.vocab_size = vocab_size
self.seq_length = seq_length
self.data = []
self.labels = []
for _ in range(num_samples):
# Generate a random sentence (sequence) of integers
sentence = [torch.randint(1, vocab_size, (1,)).item() for _ in range(seq_length)]
# Assign a label: 0 if the sum of the sentence is even, 1 if odd
label = 0 if sum(sentence) % 2 == 0 else 1
self.data.append(sentence)
self.labels.append(label)
def __len__(self):
return self.num_samples
def __getitem__(self, idx):
return torch.tensor(self.data[idx], dtype=torch.long), torch.tensor(self.labels[idx], dtype=torch.long)
# create Dataloader
dataset = SimpleNLPCategoryDataset(num_samples=1000, vocab_size=50, seq_length=10)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
```
3. 建立 nn.Sequential 模型
```python=
model = torch.nn.Sequential(
nn.Embedding(50, 64),
nn.LSTM(64, 128, batch_first=True),
nn.LSTM(128, 256, batch_first=True),
nn.Linear(256, 128),
nn.Tanh(),
nn.Linear(128, 2)
)
```
4. 建立 LSTM 至 Linear 的轉換函數
```python=
def LSTMtoLinear(x, h, c):
return x[:, -1, :]
```
5. 設定 SCPL 所需參數
```python=
device_map = {
"cuda:0": 1, # 分配 Embedding 至 GPU 0
"cuda:1": 1, # 分配第一個 LSTM 至 GPU 1
"cuda:2": 1, # 分配第二個 LSTM 至 GPU 2
"cuda:3": 3 # 分配剩餘線性層至 GPU 3
}
trans_fuc = [None, None, None, LSTMtoLinear]
```
6. 建立 SCPL 模型
```python=
model = SCPL_model(model, device_map
trans_fucs=trans_fuc)
```
7. 開始訓練
```python=
SCPL_model.train()
for i in range(20):
for inputs, label in dataloader:
output, loss = scplmodel(inputs, label)
print("[epoch {}] loss:{:.3f}".format(i+1, loss))
```
8. 測試模型
```py=
SCPL_model.eval()
total_samples = 0
correct_predictions = 0
with torch.no_grad():
for inputs, label in test_dataloader:
outputs, true_y = scplmodel(inputs, label)
predictions = torch.argmax(outputs, dim=1)
correct_predictions += (predictions == true_y).sum().item()
total_samples += true_y.size(0)
accuracy = correct_predictions / total_samples
print(accuracy)
```
### 範例二
本範例會示範如何使用 SCPL 進行圖像分類任務,資料集部分使用 Mnist 資料集
1. 引入必要 package
```python=
import torch
import torch.nn as nn
import torch.optim as optim
from Model import SCPL_model
import torchvision
import torchvision.transforms as transforms
```
2. 載入資料集
```python=
# Define data transformations (convert image to Tensor and normalize)
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.5,), (0.5,)) # Mean and std deviation for grayscale images
])
# Load training and test datasets
train_dataset = torchvision.datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = torchvision.datasets.MNIST(root='./data', train=False, download=True, transform=transform)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)
```
3. 建立 nn.Sequential() 模型
```python=
model = nn.Sequential(
nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1), # Input is 1 channel (grayscale)
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2), # Output size: 32 x 14 x 14
nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1), # 32 -> 64 channels
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2), # Output size: 64 x 7 x 7
nn.Flatten(), # Flatten 2D feature maps into a 1D vector
nn.Linear(64 * 7 * 7, 128), # Fully connected layer
nn.ReLU(),
nn.Linear(128, 10), # 10 outputs for 10 classes (digits 0-9)
nn.LogSoftmax(dim=1) # Use LogSoftmax to calculate class probabilities
)
```
4. 設定 SCPL 所需參數
```python=
device_map = {
"cuda:0": 3,
"cuda:1": 3,
"cuda:2": 5
}
scheduler = torch.optim.lr_scheduler.StepLR
scheduler_param = {
"step_size":2,
"gamma":0.1
}
```
5. 建立 SCPL 模型
```python=
model = SCPL_model(model, device_map
proj_type="mlp",
scheduler_fn=scheduler,
scheduler_param=scheduler_param)
```
6. 開始訓練
```python=
model.train()
num_epochs = 5
for epoch in range(num_epochs):
running_loss = 0.0
for i, (images, labels) in enumerate(train_loader):
# forward propagation
outputs, loss = model(images, labels)
running_loss += loss
print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader)}')
```
7. 測試結果
```python=
model.eval()
correct = 0
total = 0
with torch.no_grad():
for images, labels in test_loader:
outputs, true_ys = model(images, labels)
_, predicted = torch.max(outputs.data, 1)
total += true_ys.size(0)
correct += (predicted == true_ys).sum().item()
print(f'Accuracy of the model on the 10000 test images: {100 * correct / total}%')
```