# PyTorch to TensorFlow Lite Model Conversion Walkthrough
I wanted to run a PyTorch model on a mobile device, so I had no choice but to figure out how to convert it to TensorFlow Lite.
Thanks, capstone expo......
## Model Used
#### EchoNet-Dynamic
1. Website: [EchoNet-Dynamic: A Large New Cardiac Motion Video Data Resource for Medical Machine Learning](https://echonet.github.io/dynamic/)
2. Code: [GitHub](https://github.com/echonet/dynamic)
3. Paper: [Video-based AI for beat-to-beat assessment of cardiac function](https://www.nature.com/articles/s41586-020-2145-8)

This is Stanford University's research on automated measurement of left ventricular ejection fraction; see the links above for details.
What follows uses its PyTorch model for left-ventricle semantic segmentation,
i.e., the model produced mainly by ../echonet/utils/segmentation.py.
#### Model architecture:
```python=
# Set up model
model = torchvision.models.segmentation.__dict__["deeplabv3_resnet50"](pretrained=False, aux_loss=False)
# change number of outputs to 1
model.classifier[-1] = torch.nn.Conv2d(model.classifier[-1].in_channels, 1, kernel_size=model.classifier[-1].kernel_size)
```
#### Optimizer settings:
```python=
# Set up optimizer
optim = torch.optim.SGD(model.parameters(), lr=1e-5, momentum=0.9, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.StepLR(optim, None)
```
#### Saved checkpoint dictionary:
```python=
# Save checkpoint
save = {
    'epoch': epoch,
    'state_dict': model.state_dict(),  # the saved keys carry the 'module.' prefix
    'best_loss': bestLoss,
    'loss': loss,
    'opt_dict': optim.state_dict(),
    'scheduler_dict': scheduler.state_dict(),
}
```
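If you want to see for yourself what ends up in the checkpoint, loading it and printing a few keys makes the `module.` prefix discussed next easy to spot (a minimal sketch, assuming `best.pt` is the checkpoint saved above):
```python=
import torch

# Load the checkpoint on CPU and inspect what was saved
checkpoint = torch.load("best.pt", map_location=torch.device('cpu'))
print(checkpoint.keys())
# dict_keys(['epoch', 'state_dict', 'best_loss', 'loss', 'opt_dict', 'scheduler_dict'])

# The first few state_dict keys, e.g. 'module.backbone.conv1.weight'
for k in list(checkpoint['state_dict'].keys())[:3]:
    print(k)
```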
#### Adjusting for model = torch.nn.DataParallel(model)
Because the training code wraps the model for **multi-GPU parallel** computation when CUDA is available:
```python=
if device.type == "cuda":
    model = torch.nn.DataParallel(model)
model.to(device)
```
every key in the saved best-weights file best.pt therefore has the form `module.(key_name)`, and when converting the model on a single CPU or GPU the `module.` prefix has to be stripped from each key.
```python=
new_checkpoint = OrderedDict()
for k, v in checkpoint['state_dict'].items():
    name = k[7:]  # remove the `module.` prefix
    new_checkpoint[name] = v
```
The approach above works because, although the model was trained with `model = torch.nn.DataParallel(model)`, the machine only had one GPU, so training was effectively single-GPU anyway; the keys just gained a `module.` prefix, hence the ~~brute-force~~ loop removal.
If you are genuinely training on multiple GPUs, it is recommended to save the weights like this instead (though honestly I'm not sure whether it makes a difference):
```python=
# Save checkpoint
save = {
    # the saved keys have no 'module.' prefix
    'state_dict': model.module.state_dict()
}
torch.save(save, 'best.pt')
```
Keys saved this way have no `module.` prefix and can be used directly on a single GPU or CPU. If you need to load them back into a multi-GPU model for testing, use
```python=
checkpoint = torch.load("best.pt", map_location=torch.device('cuda'))
model.module.load_state_dict(checkpoint['state_dict'])
```
which puts the `module.` prefix back.
When segmentation.py runs testing after training, it still uses parallel GPUs, so keys without `module.` would raise an error; for that reason, whenever testing is involved, I save the weights with the `module.` prefix intact and use them as-is.
Of course, you could also run segmentation.py for training only, save with model.module.state_dict() to drop the prefix, skip testing, and take the model straight to the TensorFlow Lite conversion.
~~But retraining is such a hassle... skip~~
> Reference: [pytorch GPU和CPU模型相互加载](https://blog.csdn.net/ytusdc/article/details/122137188)
---
## Environment Setup
The virtual environment is built with Anaconda3:
* Python 3.9.18
* TensorFlow 2.8.0
* TensorFlow-addons 0.17.1
* ONNX 1.10.2
* ONNX Runtime 1.10.0
* ONNX-TensorFlow 1.10.0
* ONNX-Simplifier
* TensorFlow-Probability 0.16.0
* Pillow 8.1.2
* Numpy 1.26.4
~~I actually installed 1.20.1 with conda; no idea why it ended up as this version~~
* OpenCV-Python 4.5.1.48
* PyTorch 1.8.0
* Torchaudio 0.8.0
* Torchvision 0.9.0
```bash=
pip install tensorflow==2.8.0
pip install tensorflow-addons==0.17.0
pip install onnx==1.10.2
pip install onnxruntime==1.10.0
pip install onnx-tf==1.10.0
pip install onnx-simplifier
pip install tensorflow-probability==0.16.0
pip install protobuf==3.20.0
conda install Pillow==8.1.2
conda install numpy==1.26.4
conda install opencv-python==4.5.1.48
conda install pytorch==1.8.0 torchaudio==0.8.0 cudatoolkit=11.1 -c pytorch -c conda-forge
conda install torchvision==0.9.0
```
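After installing, a quick import check (a minimal sketch) confirms the pinned versions landed correctly, which matters because onnx-tf is picky about the TensorFlow version:
```python=
# Print the versions that the onnx-tf route depends on
import tensorflow as tf
import onnx
import onnxruntime
import torch

print("TensorFlow:", tf.__version__)            # expect 2.8.0
print("ONNX:", onnx.__version__)                # expect 1.10.2
print("ONNX Runtime:", onnxruntime.__version__) # expect 1.10.0
print("PyTorch:", torch.__version__)            # expect 1.8.0
```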
---
## Model Conversion
Both of the following methods converted the model successfully for me.
### PyTorch to TensorFlow Lite
> Reference: [pth转onnx,onnx转tflite,亲测有效](https://blog.csdn.net/Guoqi1911/article/details/127422902)
First export the model to .onnx, then convert it with the open-source tool [**onnx2tflite**](https://github.com/MPolaris/onnx2tflite): git clone the repo and follow its instructions; usage examples are included.
This route does not use onnx-tf, and the converted model's input and output both end up in the ==standard TensorFlow Lite data layout [n, h, w, c]==.
```python=
import os
import torch
import torchvision
import numpy as np
from collections import OrderedDict

test_arr = np.random.randn(20, 3, 112, 112).astype(np.float32)
dummy_input = torch.tensor(test_arr)

# Load the checkpoint dictionary
# (six key-value pairs: 'epoch', 'state_dict', 'best_loss', 'loss', 'opt_dict', 'scheduler_dict')
checkpoint = torch.load("best.pt", map_location=torch.device('cpu'))
new_checkpoint = OrderedDict()
for k, v in checkpoint['state_dict'].items():
    name = k[7:]  # remove the `module.` prefix
    new_checkpoint[name] = v

# Set up model
model = torchvision.models.segmentation.__dict__["deeplabv3_resnet50"](pretrained=False, aux_loss=False)
model.classifier[-1] = torch.nn.Conv2d(model.classifier[-1].in_channels, 1, kernel_size=model.classifier[-1].kernel_size)  # change number of outputs to 1

# Set up optimizer
optim = torch.optim.SGD(model.parameters(), lr=1e-5, momentum=0.9, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.StepLR(optim, None)

# Load best weights
model.load_state_dict(new_checkpoint)
optim.load_state_dict(checkpoint['opt_dict'])
scheduler.load_state_dict(checkpoint['scheduler_dict'])
epoch = checkpoint['epoch']
loss = checkpoint['loss']

# Freeze parameters and switch to inference mode
for parameter in model.parameters():
    parameter.requires_grad = False
model.eval()

input_names = ["input"]
output_names = ["output"]
dynamic_axes = {"input": {0: "batch_size"}}  # allow a variable batch size
torch.onnx.export(model,
                  dummy_input,
                  "segmentation_20_3_112_112.onnx",
                  opset_version=10,  # or 11, 13
                  verbose=False,
                  input_names=input_names,
                  output_names=output_names,
                  dynamic_axes=dynamic_axes)

# onnx_converter comes from converter.py in the onnx2tflite repo
from converter import onnx_converter
onnx_converter(
    onnx_model_path="C:/Users/bmilab/Pytorch_to_TFlite/onnx2tflite/segmentation_20_3_112_112.onnx",
    need_simplify=True,
    output_path="./",
    target_formats=['tflite'],  # or ['keras'], ['keras', 'tflite']
    weight_quant=False,
    int8_model=False,  # do quantization
    int8_mean=None,    # mean for image preprocessing
    int8_std=None,     # std for image preprocessing
    image_root=None    # folder of training images (for int8 calibration)
)
```
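To verify the layout claim above, you can open the converted model and print its tensor shapes (a minimal sketch; adjust the .tflite path to wherever onnx2tflite wrote its output):
```python=
import tensorflow as tf

# Inspect the converted model's input/output shapes
interpreter = tf.lite.Interpreter(model_path="segmentation_20_3_112_112.tflite")
interpreter.allocate_tensors()
print(interpreter.get_input_details()[0]['shape'])   # expect NHWC, e.g. [1, 112, 112, 3]
print(interpreter.get_output_details()[0]['shape'])  # expect NHWC, e.g. [1, 112, 112, 1]
```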
### In Three Stages
> References:
> 1. [模型转换:由Pytorch到TFlite](https://zhuanlan.zhihu.com/p/363317178)
> 2. [深度学习—权重文件格式.pt转.tflite](https://zhuanlan.zhihu.com/p/686330401)
> 3. [pytorch模型转tflite【以EfficientNet-BTS为例】](https://blog.csdn.net/qq_40600539/article/details/123142541)
Export to .onnx in the same way, then convert to TensorFlow with the **onnx-tf** tool, and finally convert to the Lite version with TensorFlow's built-in converter.
Because onnx-tf does the conversion, the converted model's input and output both stay ==in PyTorch's usual data layout [n, c, h, w]==.
onnx-tf does not seem to get updated often, and each release supports a rather strict range of TensorFlow versions, so environment setup is a hassle. The latest onnx-tf, 1.10.0 (released around 2022), requires TensorFlow 2.8.0, so every companion package has to be pinned to match; my original TensorFlow 2.16.0 setup raised errors.
#### PyTorch to ONNX
```python=
import os
import torch
import torchvision
import numpy as np
from collections import OrderedDict

test_arr = np.random.randn(20, 3, 112, 112).astype(np.float32)
dummy_input = torch.tensor(test_arr)

# Load the checkpoint dictionary
# (six key-value pairs: 'epoch', 'state_dict', 'best_loss', 'loss', 'opt_dict', 'scheduler_dict')
checkpoint = torch.load("best.pt", map_location=torch.device('cpu'))
new_checkpoint = OrderedDict()
for k, v in checkpoint['state_dict'].items():
    name = k[7:]  # remove the `module.` prefix
    new_checkpoint[name] = v

# Set up model
model = torchvision.models.segmentation.__dict__["deeplabv3_resnet50"](pretrained=False, aux_loss=False)
model.classifier[-1] = torch.nn.Conv2d(model.classifier[-1].in_channels, 1, kernel_size=model.classifier[-1].kernel_size)  # change number of outputs to 1

# Set up optimizer
optim = torch.optim.SGD(model.parameters(), lr=1e-5, momentum=0.9, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.StepLR(optim, None)

# Load best weights
model.load_state_dict(new_checkpoint)
optim.load_state_dict(checkpoint['opt_dict'])
scheduler.load_state_dict(checkpoint['scheduler_dict'])
epoch = checkpoint['epoch']
loss = checkpoint['loss']

# Freeze parameters and switch to inference mode
for parameter in model.parameters():
    parameter.requires_grad = False
model.eval()

input_names = ["input"]
output_names = ["output"]
dynamic_axes = {"input": {0: "batch_size"}}  # allow a variable batch size
torch.onnx.export(model,
                  dummy_input,
                  "segmentation_20_3_112_112.onnx",
                  opset_version=10,  # or 11, 13
                  verbose=False,
                  input_names=input_names,
                  output_names=output_names,
                  dynamic_axes=dynamic_axes)
```
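Before converting further, it's worth confirming that the exported ONNX model matches the PyTorch model on the same dummy input (a minimal sketch reusing `model`, `dummy_input`, and `test_arr` from the script above):
```python=
import numpy as np
import torch
import onnxruntime as ort

# PyTorch forward pass; DeepLabV3 returns a dict with the 'out' tensor
with torch.no_grad():
    torch_out = model(dummy_input)['out'].numpy()

# ONNX Runtime forward pass on the exported model
sess = ort.InferenceSession("segmentation_20_3_112_112.onnx")
onnx_out = sess.run(["output"], {"input": test_arr})[0]

# The difference should be tiny (numerical noise only)
print(np.abs(torch_out - onnx_out).max())
```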
#### ONNX to TensorFlow
```python=
import onnx
from onnxsim import simplify
from onnx_tf.backend import prepare

TF_PATH = "destination folder for the TensorFlow SavedModel"
ONNX_PATH = "path to the existing .onnx file"

# Load the ONNX model exported above
onnx_model = onnx.load(ONNX_PATH)

# Simplify the model
model_simp, check = simplify(onnx_model)
assert check, "Simplified ONNX model could not be validated"

tf_rep = prepare(model_simp)  # create a TensorflowRep object
tf_rep.export_graph(TF_PATH)
```
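To double-check the layout claim for this route, you can load the SavedModel and print its signature (a minimal sketch reusing TF_PATH from above, and assuming onnx-tf registered the default serving signature):
```python=
import tensorflow as tf

# Load the SavedModel exported by onnx-tf and inspect its serving signature
loaded = tf.saved_model.load(TF_PATH)
infer = loaded.signatures["serving_default"]
print(infer.structured_input_signature)  # expect an input spec shaped like [n, 3, 112, 112]
print(infer.structured_outputs)
```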
#### TensorFlow to TensorFlow Lite
```python=
import tensorflow as tf

TF_PATH = "destination folder for the TensorFlow SavedModel"
TFLITE_PATH = "output folder for the TFLite model/model_name.tflite"

converter = tf.lite.TFLiteConverter.from_saved_model(TF_PATH)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tf_lite_model = converter.convert()
with open(TFLITE_PATH, 'wb') as f:
    f.write(tf_lite_model)
```
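`tf.lite.Optimize.DEFAULT` above applies dynamic-range quantization to the weights. If you want an even smaller model, TFLite also supports post-training float16 quantization; a minimal sketch (the output file name is just an example):
```python=
import tensorflow as tf

TF_PATH = "destination folder for the TensorFlow SavedModel"

# Post-training float16 quantization: weights stored as fp16, compute stays fp32
converter = tf.lite.TFLiteConverter.from_saved_model(TF_PATH)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
tflite_fp16_model = converter.convert()
with open("segmentation_fp16.tflite", "wb") as f:
    f.write(tflite_fp16_model)
```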
---
## Using the Converted Model
The following runs the TensorFlow Lite model produced by the first method.
```python=
import os
import cv2
import time
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

# Load the model
interpreter = tf.lite.Interpreter(model_path="D:/Echonet/Pytorch_to_TFlite/segmentation_20_3_112_112.tflite")
interpreter.allocate_tensors()

# Get the input and output tensor details
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
print(input_details)
print(output_details)

# Prepare the input data
input_shape = input_details[0]['shape']
print(input_shape)

start = time.time()

# Folder containing the test videos
videos_folder = "D:/Echonet/Pytorch_to_TFlite/Videos_Test"

# Iterate over the videos in the folder
for filename in os.listdir(videos_folder):
    if filename.endswith(".avi"):
        print("Processing video:", filename)
        # Read the .avi file
        capture = cv2.VideoCapture(os.path.join(videos_folder, filename))
        # Video dimensions
        frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Read the video frames
        video_frames = np.zeros((frame_count, frame_height, frame_width, 3), np.uint8)
        for frame_index in range(frame_count):
            ret, frame = capture.read()
            if not ret:
                raise ValueError("Failed to load frame #{} of {}.".format(frame_index, filename))
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            video_frames[frame_index, :, :, :] = frame

        input_data = video_frames.astype(np.float32)
        # input_data: [frame_count, height, width, channel], np.float32
        print(input_data.shape)

        # Per-channel normalization: reshape the input to [channels, -1]
        reshaped_input = np.transpose(input_data, (3, 0, 1, 2)).reshape(3, -1)
        mean = np.mean(reshaped_input, axis=1).astype(np.float32)
        std = np.std(reshaped_input, axis=1).astype(np.float32)
        input_data = input_data - mean.reshape(1, 1, 1, 3)
        input_data = input_data / std.reshape(1, 1, 1, 3)
        # input_data: normalized [frame_count, height, width, channel], np.float32

        for frame_index in range(frame_count):
            # Feed one frame into the tflite model as a batch of 1
            # (assumes the converted model's batch dimension is 1; check input_details)
            interpreter.set_tensor(input_details[0]['index'],
                                   np.expand_dims(input_data[frame_index], axis=0))
            # Run inference
            interpreter.invoke()
            # Fetch the model output
            output_data = interpreter.get_tensor(output_details[0]['index'])
            # output: [batch, height, width, channel]
            output_image = output_data[0, :, :, 0]
            mask = (output_image > 0).astype(np.uint8)
            mask_img = Image.fromarray((mask * 255).astype(np.uint8))
            mask_img.save(f"D:/Echonet/Pytorch_to_TFlite/all_mask/{filename}_{frame_index}.png")

            # video_frames is already [height, width, channel]
            gray_frame = np.mean(video_frames[frame_index], axis=2)
            # Red overlay where the mask is 1 (prepared here; only the mask is shown below)
            red_overlay = np.zeros_like(gray_frame)
            red_overlay[mask == 1] = 255
            # Display the predicted mask
            plt.imshow(mask_img, cmap='gray')
            plt.axis('off')
            plt.show()

print("All videos processed.")
end = time.time()
print("Time: ", (end - start))
```
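As a final sanity check, you can compare one frame's output between the original PyTorch model and the converted TFLite model (a minimal sketch, reusing the loaded PyTorch `model` from the conversion script and assuming the TFLite model's batch dimension is 1):
```python=
import numpy as np
import torch
import tensorflow as tf

# A stand-in for one normalized frame in NHWC layout
frame_nhwc = np.random.randn(1, 112, 112, 3).astype(np.float32)

# PyTorch expects NCHW
with torch.no_grad():
    torch_out = model(torch.tensor(frame_nhwc.transpose(0, 3, 1, 2)))['out'].numpy()

# The method-1 TFLite model expects NHWC
interpreter = tf.lite.Interpreter(model_path="segmentation_20_3_112_112.tflite")
interpreter.allocate_tensors()
interpreter.set_tensor(interpreter.get_input_details()[0]['index'], frame_nhwc)
interpreter.invoke()
tflite_out = interpreter.get_tensor(interpreter.get_output_details()[0]['index'])

# Bring the PyTorch output to NHWC and compare
print(np.abs(torch_out.transpose(0, 2, 3, 1) - tflite_out).max())
```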