# Week12 RPN Penn
## bbox
```python=
def _as_numpy(value):
    """Return `value` as a NumPy array; accepts tensor-like objects or array-likes."""
    if hasattr(value, "detach"):
        # Tensor-like input: detach from autograd and copy to host memory first
        # so that CUDA / grad-tracking tensors convert without raising.
        return value.detach().cpu().numpy()
    return np.asarray(value)


def _make_anchors(X_FM, Y_FM, stride_x, stride_y, ratios, anchor_scales):
    """Build all anchors as (y1, x1, y2, x2), ordered by (x cell, y cell, ratio, scale)."""
    # Anchor centres sit in the middle of every feature-map cell.
    ctr_x = (np.arange(X_FM) + 0.5) * stride_x
    ctr_y = (np.arange(Y_FM) + 0.5) * stride_y
    # indexing="ij" keeps x as the slow axis and y as the fast axis.
    grid_x, grid_y = np.meshgrid(ctr_x, ctr_y, indexing="ij")
    grid_x = grid_x.reshape(-1, 1)
    grid_y = grid_y.reshape(-1, 1)
    # Half extents of every (ratio, scale) shape; each axis uses its own stride.
    half_h = np.array([stride_y * s * np.sqrt(r)
                       for r in ratios for s in anchor_scales]) / 2.
    half_w = np.array([stride_x * s * np.sqrt(1. / r)
                       for r in ratios for s in anchor_scales]) / 2.
    anchors = np.stack(
        (grid_y - half_h, grid_x - half_w, grid_y + half_h, grid_x + half_w),
        axis=-1,
    )
    return anchors.reshape(-1, 4)


def _pairwise_iou(anchors, boxes):
    """Return the (len(anchors), len(boxes)) IoU matrix as float32."""
    anchors = np.asarray(anchors, dtype=np.float64)
    boxes = np.asarray(boxes, dtype=np.float64)
    ya1, xa1, ya2, xa2 = (anchors[:, i, None] for i in range(4))  # (A, 1)
    yb1, xb1, yb2, xb2 = (boxes[None, :, i] for i in range(4))    # (1, B)
    anchor_area = (ya2 - ya1) * (xa2 - xa1)
    box_area = (yb2 - yb1) * (xb2 - xb1)
    inter_h = np.minimum(ya2, yb2) - np.maximum(ya1, yb1)
    inter_w = np.minimum(xa2, xb2) - np.maximum(xa1, xb1)
    # Only strictly overlapping pairs get a non-zero IoU.
    overlaps = (inter_h > 0) & (inter_w > 0)
    inter_area = np.where(overlaps, inter_h * inter_w, 0.)
    union = anchor_area + box_area - inter_area
    ious = np.zeros(overlaps.shape, dtype=np.float32)
    ious[overlaps] = inter_area[overlaps] / union[overlaps]
    return ious


def _subsample_labels(labels, n_sample, pos_ratio):
    """Randomly disable (set to -1) surplus positives/negatives in place."""
    n_pos = int(pos_ratio * n_sample)  # upper bound on positives
    pos_index = np.where(labels == 1)[0]
    if len(pos_index) > n_pos:
        disable_index = np.random.choice(
            pos_index, size=(len(pos_index) - n_pos), replace=False)
        labels[disable_index] = -1
    # Negatives fill whatever is left of the sample budget.
    n_neg = n_sample - np.sum(labels == 1)
    neg_index = np.where(labels == 0)[0]
    if len(neg_index) > n_neg:
        disable_index = np.random.choice(
            neg_index, size=(len(neg_index) - n_neg), replace=False)
        labels[disable_index] = -1


def bbox_generation(images, targets, X_FM, Y_FM):
    """Build RPN training targets (anchor labels and box offsets) for one batch.

    Reads the module-level `ratios` and `anchor_scales` sequences.

    Naming note: `X_*` refers to the first spatial dimension of the image
    (`images[0].shape[1]`) and `Y_*` to the second (`images[0].shape[2]`).
    Anchors and ground-truth boxes are both unpacked as (y1, x1, y2, x2) in
    that naming.
    NOTE(review): for (C, H, W) images this means columns 0/2 run along the
    width, i.e. the boxes would be the usual (x1, y1, x2, y2) -- confirm
    against the dataset.

    Args:
        images: sequence of images; only `len(images)` and
            `images[0].shape[1:]` are read.
        targets: indexable of dicts, one per image; `item['boxes']` is an
            (n_objects, 4) tensor or array.
        X_FM, Y_FM: feature-map size along the two spatial dimensions.

    Returns:
        anchor_locations_all_merge: (num_batch, n_anchors, 4) regression
            targets (dy, dx, dh, dw); rows of out-of-image anchors are 0.
        anchor_labels_all_merge: (num_batch, n_anchors) int32 labels
            (1: positive, 0: negative, -1: ignored).
        anchors: (n_anchors, 4) coordinates of every anchor, where
            n_anchors = X_FM * Y_FM * len(ratios) * len(anchor_scales).
    """
    num_batch = len(images)  # batch size
    X_IMG, Y_IMG = images[0].shape[1:]  # image size along the two spatial dims

    # Down-sampling factor between the image and the feature map
    # (e.g. 16 after four 2x2 max-poolings).
    sub_sampling_x = int(X_IMG / X_FM)
    sub_sampling_y = int(Y_IMG / Y_FM)

    # All anchors, then the subset that lies completely inside the image.
    anchors = _make_anchors(
        X_FM, Y_FM, sub_sampling_x, sub_sampling_y, ratios, anchor_scales)
    index_inside = np.where(
        (anchors[:, 0] >= 0) &
        (anchors[:, 1] >= 0) &
        (anchors[:, 2] <= Y_IMG) &
        (anchors[:, 3] <= X_IMG)
    )[0]
    valid_anchors = anchors[index_inside]

    # Anchor geometry (Ax, Ay, Aw, Ah); shared by every image in the batch.
    height = valid_anchors[:, 2] - valid_anchors[:, 0]
    width = valid_anchors[:, 3] - valid_anchors[:, 1]
    ctr_y = valid_anchors[:, 0] + 0.5 * height
    ctr_x = valid_anchors[:, 1] + 0.5 * width
    # Prevent division by zero.
    eps = np.finfo(height.dtype).eps
    height = np.maximum(height, eps)
    width = np.maximum(width, eps)

    # IoU thresholds for positive / negative samples.
    pos_iou_threshold = 0.7
    neg_iou_threshold = 0.3
    pos_ratio = 0.5  # share of positives in the sampled anchors
    n_sample = 256   # number of anchors sampled per image

    anchor_labels_all = []
    anchor_locations_all = []
    for n in range(num_batch):
        boxes = _as_numpy(targets[n]['boxes']).reshape(-1, 4)

        if boxes.shape[0] == 0:
            # No objects in this image: every valid anchor is background and
            # there is nothing to regress towards.
            labels = np.zeros((len(index_inside),), dtype=np.int32)
            locs = np.zeros((len(index_inside), 4), dtype=height.dtype)
        else:
            # ious[a, b]: IoU of valid anchor a with ground-truth box b.
            ious = _pairwise_iou(valid_anchors, boxes)
            # Best ground-truth box (index and IoU) for every valid anchor;
            # ties resolve to the first box.
            argmax_ious = ious.argmax(axis=1)
            max_ious = ious.max(axis=1)
            # Every anchor that attains the best IoU of some ground-truth box
            # (all tied anchors are kept, not just the first one).
            # NOTE(review): a box whose best IoU is 0 marks every anchor with
            # IoU 0 against it as positive -- behaviour kept from the original.
            gt_argmax_ious = np.where(ious == ious.max(axis=0))[0]

            # 1: positive, 0: negative, -1: ignored.
            labels = np.full((len(index_inside),), -1, dtype=np.int32)
            labels[max_ious < neg_iou_threshold] = 0
            labels[gt_argmax_ious] = 1
            labels[max_ious >= pos_iou_threshold] = 1

            # Ground-truth geometry (Gx, Gy, Gw, Gh) of each anchor's best box.
            matched = boxes[argmax_ious]
            base_height = matched[:, 2] - matched[:, 0]
            base_width = matched[:, 3] - matched[:, 1]
            base_ctr_y = matched[:, 0] + 0.5 * base_height
            base_ctr_x = matched[:, 1] + 0.5 * base_width
            # Offsets d(A) that move the anchor onto its ground-truth box.
            dy = (base_ctr_y - ctr_y) / height
            dx = (base_ctr_x - ctr_x) / width
            dh = np.log(base_height / height)
            dw = np.log(base_width / width)
            locs = np.stack((dy, dx, dh, dw), axis=1)

        # Keep at most n_sample anchors (positives capped by pos_ratio).
        _subsample_labels(labels, n_sample, pos_ratio)

        # Scatter back to the full anchor set: anchors outside the image get
        # label -1 and an all-zero offset.
        anchor_labels = np.full((len(anchors),), -1, dtype=labels.dtype)
        anchor_labels[index_inside] = labels
        anchor_labels_all.append(anchor_labels)

        anchor_locations = np.zeros((len(anchors), anchors.shape[1]), dtype=locs.dtype)
        anchor_locations[index_inside, :] = locs
        anchor_locations_all.append(anchor_locations)

    # Lists of per-image arrays -> one array with a leading batch dimension.
    anchor_labels_all_merge = np.stack(anchor_labels_all, 0)
    anchor_locations_all_merge = np.stack(anchor_locations_all, 0)
    return anchor_locations_all_merge, anchor_labels_all_merge, anchors
```
## Train epoch
```python=
def train_epocs(req_features, model, optimizer, train_dl, val_dl, epochs=10, rpn_lambda=10):
    """Train the RPN `model` on feature maps produced by `req_features`.

    Relies on the module-level `is_cuda` flag and on `bbox_generation`.

    Args:
        req_features: iterable of backbone modules applied in order to the
            image batch to obtain the feature map.
        model: RPN returning `(pred_anchor_locs, pred_cls_scores,
            objectness_score)` for a feature map.
        optimizer: optimizer stepped once per batch.
        train_dl: iterable yielding `(images, targets)` batches.
        val_dl: accepted for interface compatibility; not used here.
        epochs: number of passes over `train_dl`.
        rpn_lambda: weight of the box-regression loss in the total loss.

    Returns:
        The trained `model`. A checkpoint `./rpn_<epoch>.pth` is written
        every fifth epoch.
    """
    for epoch in range(epochs):
        model.train()
        total = 0
        sum_loss = 0
        sum_loss_cls = 0
        sum_loss_loc = 0
        for idx, (images, targets) in enumerate(train_dl, start=1):
            # images / targets: one entry per image in the batch.
            num_batch = len(images)
            print("######### %s epoch: %s" %(idx, epoch))

            # Stack the per-image tensors into one (batch, C, H, W) tensor.
            imgs_torch_all = torch.stack(list(images))
            if is_cuda:
                imgs_torch_all = imgs_torch_all.cuda()

            # Run the backbone to obtain the feature map.
            k = imgs_torch_all
            for m in req_features:
                k = m(k)
            # Feature-map size along its two spatial dimensions.
            X_FM, Y_FM = k.shape[2:]

            # Ground truth for every anchor of every image:
            #   anchor_locations_all_merge: target offsets d(A), (batch, n_anchors, 4)
            #   anchor_labels_all_merge: 1 positive / 0 negative / -1 ignored, (batch, n_anchors)
            #   anchors: anchor coordinates, (n_anchors, 4)
            # The RPN only decides object vs. no object, never the class.
            anchor_locations_all_merge, anchor_labels_all_merge, anchors = bbox_generation(images, targets, X_FM, Y_FM)

            # Flatten the batch dimension: (batch * n_anchors, 4) and (batch * n_anchors,).
            gt_rpn_loc_all = torch.from_numpy(anchor_locations_all_merge.astype(np.float32)).reshape(-1, 4)
            gt_rpn_score_all = torch.from_numpy(anchor_labels_all_merge.astype(np.int64)).reshape(-1)
            if is_cuda:
                gt_rpn_loc_all = gt_rpn_loc_all.cuda()
                gt_rpn_score_all = gt_rpn_score_all.cuda()

            # RPN predictions, flattened the same way as the ground truth.
            pred_anchor_locs, pred_cls_scores, objectness_score = model(k)
            rpn_loc_all = pred_anchor_locs.reshape(-1, 4)
            rpn_score_all = pred_cls_scores.reshape(-1, 2)

            # Objectness loss; ignored anchors (label -1) do not contribute.
            rpn_cls_loss_all = F.cross_entropy(rpn_score_all, gt_rpn_score_all, ignore_index=-1)

            # Box-regression loss (Smooth L1) over positive anchors only,
            # averaged over the number of positives in the batch.
            pos_all = gt_rpn_score_all > 0
            loc_loss_sum = F.smooth_l1_loss(
                rpn_loc_all[pos_all], gt_rpn_loc_all[pos_all], reduction='sum')
            # Clamp so that a batch without positives yields 0 instead of NaN.
            N_reg_all = pos_all.sum().clamp(min=1).float()
            rpn_loc_loss_all = loc_loss_sum / N_reg_all

            # Total loss = objectness loss + weighted regression loss.
            rpn_loss_all = rpn_cls_loss_all + (rpn_lambda * rpn_loc_loss_all)

            optimizer.zero_grad()
            rpn_loss_all.backward()
            optimizer.step()

            # Running statistics for the epoch.
            total += num_batch
            sum_loss += rpn_loss_all.item()
            sum_loss_cls += rpn_cls_loss_all.item()
            sum_loss_loc += (rpn_lambda * rpn_loc_loss_all).item()

        # NOTE(review): batch-mean losses are summed and divided by the number
        # of images, so these are not per-batch averages -- kept as in the
        # original reporting.
        train_loss = sum_loss/total
        train_loss_cls = sum_loss_cls/total
        train_loss_loc = sum_loss_loc/total
        if (epoch+1) % 5 == 0:
            torch.save(model.state_dict(), './rpn_%s.pth'%epoch)
        print("train_loss %.3f cls_loss %.3f loc_loss %.3f" % (train_loss, train_loss_cls, train_loss_loc))
    return model
```
## RPN
```python=
class RPN(nn.Module):
    """Region Proposal Network head: a 3x3 conv followed by two 1x1 conv branches.

    Args:
        in_channels: channels of the backbone feature map (512 for VGG-16).
        mid_channels: output channels of the first 3x3 conv layer.
        n_anchor: number of anchors per feature-map position.
    """

    def __init__(self, in_channels=512, mid_channels=512, n_anchor=9):
        super().__init__()
        self.mid_channels = mid_channels
        self.in_channels = in_channels
        self.n_anchor = n_anchor
        self.conv1 = nn.Conv2d(self.in_channels, self.mid_channels, 3, 1, 1)
        # Four box offsets per anchor.
        self.reg_layer = nn.Conv2d(mid_channels, n_anchor * 4, 1, 1, 0)
        # Two scores (no object / object) per anchor.
        self.cls_layer = nn.Conv2d(mid_channels, n_anchor * 2, 1, 1, 0)

        # Weights ~ N(0, 0.01), biases = 0 for all three layers.
        for layer in (self.conv1, self.reg_layer, self.cls_layer):
            nn.init.normal_(layer.weight, 0, 0.01)
            nn.init.zeros_(layer.bias)

    def forward(self, k):
        """Predict anchor offsets and objectness scores for feature map `k`.

        Args:
            k: feature map of shape (batch, in_channels, H, W).

        Returns:
            pred_anchor_locs: (batch, H * W * n_anchor, 4) box offsets.
            pred_cls_scores: (batch, H * W * n_anchor, 2) raw class scores.
            objectness_score: (batch, H * W * n_anchor), the index-1 score
                of `pred_cls_scores`.
        """
        bat_num = k.shape[0]  # batch size
        x = self.conv1(k)
        pred_anchor_locs = self.reg_layer(x)
        pred_cls_scores = self.cls_layer(x)

        # Move channels last, then fold feature-map position and anchor index
        # into one dimension; the last dimension holds the 4 offsets.
        pred_anchor_locs = pred_anchor_locs.permute(0, 2, 3, 1).contiguous().view(bat_num, -1, 4)
        # Same layout for the scores, with 2 values per anchor. No fixed
        # feature-map size is assumed here.
        pred_cls_scores = pred_cls_scores.permute(0, 2, 3, 1).contiguous().view(bat_num, -1, 2)
        # Keep only the "object" score of every anchor.
        objectness_score = pred_cls_scores[:, :, 1].contiguous()

        return pred_anchor_locs, pred_cls_scores, objectness_score
```
```python=
# IoU cut-offs used to label anchors as background / foreground
neg_iou_threshold = 0.3
pos_iou_threshold = 0.7

# Step 1: preliminary labelling from the IoU values.
# label_all[n] holds the class of every valid anchor of image n
# (1: positive, 0: negative, -1: ignored).
label_all = []
for n in range(num_batch):
    best_iou_per_anchor = max_ious_all[n]
    img_labels = label.copy()  # independent copy of the shared `label` template
    # Best IoU against any ground-truth box below 0.3 -> negative
    img_labels[best_iou_per_anchor < neg_iou_threshold] = 0
    # Anchor attains the highest IoU for some ground-truth box -> positive
    img_labels[gt_argmax_ious_all[n]] = 1
    # Best IoU against any ground-truth box at or above 0.7 -> positive
    img_labels[best_iou_per_anchor >= pos_iou_threshold] = 1
    label_all.append(img_labels)

n_sample = 256                     # anchors kept per image
pos_ratio = 0.5                    # share of positives among them
n_pos = int(pos_ratio * n_sample)  # upper bound on positives

# Step 2: keep at most n_sample labelled anchors per image.
for n in range(num_batch):
    img_labels = label_all[n]

    # Too many positives: randomly switch the surplus to -1 (ignored).
    pos_index = np.where(img_labels == 1)[0]
    surplus_pos = len(pos_index) - n_pos
    if surplus_pos > 0:
        dropped = np.random.choice(pos_index, size=surplus_pos, replace=False)
        img_labels[dropped] = -1

    # Negatives may use whatever the remaining positives leave of the budget.
    n_neg = n_sample - np.sum(img_labels == 1)

    # Too many negatives: randomly switch the surplus to -1 (ignored).
    neg_index = np.where(img_labels == 0)[0]
    surplus_neg = len(neg_index) - n_neg
    if surplus_neg > 0:
        dropped = np.random.choice(neg_index, size=surplus_neg, replace=False)
        img_labels[dropped] = -1
```
```python=
# Regression targets d(A) = (dy, dx, dh, dw).
# anchor_locs_all[n] holds, for every valid anchor of image n, the offsets
# that move the anchor onto its matched ground-truth box.
anchor_locs_all = []
for n in range(num_batch):
    gt_ctr_y = base_ctr_y_all[n].numpy()
    gt_ctr_x = base_ctr_x_all[n].numpy()
    gt_height = base_height_all[n].numpy()
    gt_width = base_width_all[n].numpy()

    # Centre shifts are expressed in units of the anchor size.
    dy = (gt_ctr_y - ctr_y) / height
    dx = (gt_ctr_x - ctr_x) / width
    # Size changes are log-ratios of ground-truth size to anchor size.
    dh = np.log(gt_height / height)
    dw = np.log(gt_width / width)

    # One row per anchor: (dy, dx, dh, dw).
    offsets = np.vstack((dy, dx, dh, dw)).T
    anchor_locs_all.append(offsets)
```