###### tags: `reinforcement learning`
# 深度強化學習 Ch3.3 : Q-Learning 實作 2
<br>
## 1. 災難型失憶
### (1). 發生原因
在之前的訓練過程中,每次執行動作後都會更新 model 權重,
但如此就很有可能發生災難性失憶,如以下情況 :
```
遊戲 A 遊戲 B
[[' ','W',' ',' '], [[' ','W',' ',' '],
[' ','+','P','-'], [' ','-','P','+'],
[' ',' ',' ',' '], [' ',' ',' ',' '],
[' ',' ',' ',' ']] [' ',' ',' ',' ']]
```
當 model 訓練完 [ 遊戲A ] 情況後,學到只要 player 向左就會獲勝( 正回饋 ),
但接著訓練 [ 遊戲 B ] 時卻會在同樣情況學到向右走會輸 ( 負回饋 ),
就導致之前訓練的觀念被"顛覆",造成 model 混亂,即稱災難型失憶。
<br>
### (2). 解決方法 : 經驗回放
如果回憶常見的深度學習問題( ex : 監督式學習、非監督式學習 ),
其資料會以 batch 為單位來訓練,這種方式不會導致災難性失憶,所以這邊也用類似的技巧,
將訓練的資料儲存成 [ 記憶串列 ],再從中取出 Batch。
:::warning
#### **訓練架構 :**
基本架構與之前相同,差別是會等資料存成 Batch 後才進行 model 的訓練更新
<br>
![](https://i.imgur.com/nDRAvXI.png)
- **Batch 資料**
Batch 資料的形式如下
==( 現在狀態( s1 ), 現在動作( a ), Reward( r ), 未來狀態( s2 ), 遊戲是否結束( done ) )==
當要更新 model 時,會再將這些資料分別取出,來算 TD-target、error 等
- **儲存 Batch 方式**
會用 ==Deque== 的資料結構儲存 Batch,此資料結構要設定資料大小,
如儲存資料數量超過儲存大小,則會將最一開始的資料去除。
:::
:::spoiler :secret:訓練程式碼
```python=
from collections import deque # 記憶串列資料結構(先進先出)
epochs = 5000
losses = [] # 紀錄 loss(用來印出)
mem_size = 1000 # 記憶串列大小
batch_size = 200 # batch 大小
replay = deque(maxlen=mem_size) # 記憶串列
max_mov = 50 # 限制一場遊戲最多可用動作(步伐)
for i in range(epochs):
# 建立遊戲
game = Gridworld(size=4, mode='random')
# 獲得遊戲狀態 State
state_ = game.board.render_np().reshape(1,64) + np.random.rand(1,64)/10 # 將 shape(4*4*4) => 64,並加上雜訊
# 將當前狀態轉為 Tensor
state1 = torch.from_numpy(state_).float()
# 追蹤是否還繼續遊戲
status = 1 # 1:還在繼續
# 紀錄遊戲動作次數
mov = 0
while(status == 1):
mov += 1
# ------------------------------ 當前預測 Q 值 ----------------------------------------
qval = model(state1) # 得到預測 Q
qval_ = qval.data.numpy() # 將預測值轉為 numpy 陣列
# ------------------------------ 選擇執行動作 (使用 epsilon-貪婪策略) ----------------------------------------
if(random.random() < epsilon):
action_ = np.random.randint(0,4) # 選擇隨機動作
else:
action_ = np.argmax(qval_) # 選擇最大動作(數字)
action = action_set[action_] # 數字轉換為對應動作
# ------------------------------ 執行動作、更新State、取得 Reward -----------------------------
# 執行動作
game.makeMove(action)
# 取得新狀態
state2_ = game.board.render_np().reshape(1,64) + np.random.rand(1,64)/10.0
state2 = torch.from_numpy(state2_).float()
# 取得 Reward
reward = game.reward()
# ------------------------------ 將資訊儲存到記憶串列 -----------------------------
done = True if reward != -1 else False # 查看是否遊戲已結束(已結束:True)
exp = (state1, action_, reward, state2, done) # 儲存型式 (當前狀態, 當前動作(idx), 新狀態, reward, done)
replay.append(exp) # 儲存到記憶串列
# 更新當前狀態
state1 = state2
# ------------------------------ 更新 model 參數 (資料數量達到 batch size ) -----------------------------
if len(replay) > batch_size:
########################################
miniBatch = random.sample( replay, batch_size ) # 隨機從記憶串列中取樣 batch
# --- 分成各個元素的小 batch ---
state1_batch = torch.cat([s1 for (s1,a,r,s2,d) in miniBatch]) # 當前 state batch (shape = 4*4*4*batch_size)
state2_batch = torch.cat([s2 for (s1,a,r,s2,d) in miniBatch]) # 新 state batch
action_batch = torch.Tensor([a for (s1,a,r,s2,d) in miniBatch])
reward_batch = torch.Tensor([r for (s1,a,r,s2,d) in miniBatch])
done_batch = torch.Tensor([d for (s1,a,r,s2,d) in miniBatch])
#########################################
# 計算[目前狀態 batch]的 Q (相當於 qval)
Q1 = model(state1_batch)
# 計算[新狀態 batch]的 Q ( newQ )
with torch.no_grad():
Q2 = model(state2_batch)
########################################
# --- 計算 TD-target ---
Y = reward_batch + gamma * ( (1 - done_batch) * torch.max(Q2,dim=1)[0]) # 如是遊戲結束狀態: Y=reward
X = Q1.gather(dim=1, index=action_batch.long().unsqueeze(dim=1)).squeeze() # 只取出 Q1 中執行的動作的 Q
# --- 計算 TD-Error ---
loss = loss_fn(X,Y.detach()) # detach: 分離 Y 的運算圖( 不更新 Y model )
print(i, loss.item())
clear_output(wait=True)
# --- Update ---
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 紀錄 loss
losses.append(loss.item())
# 遊戲結束
if (abs(reward) == 10) or ( mov > max_mov ):
status = 0 # 如遊戲結束 status 設為0
mov = 0 # 重製動作數
# 遞減 epsilon
if(epsilon > 0.1):
epsilon -= (1/epochs)
```
:::
<br>
**PS.** 因為這次遊戲是使用 "random" 模式,情況較多,所以要訓練多次一點( ex : 5000 )
<br>
### (3). 訓練結果
- **Loss** : 可以明顯看到 Loss 下降,但卻有不少雜訊
![](https://i.imgur.com/sxcx35g.png =500x300)
- **Test model**
測試的函數用前一篇所寫的就好
```python=
max_game = 1000 # 測試次數
wins = 0
for i in range(max_game):
win =test_model( model, 'random', display=False)
if (win):
wins += 1
win_percent = float(wins) / float(max_game)
print(f"Game Played:{max_game}, # of wins:{wins}")
print(f"Win percentage: {win_percent}%")
```
```
# 結果有約 88% 遊戲獲勝
Game Played:1000, # of wins:886
Win percentage: 0.886%
```
---
<br>
## 2. 目標網路 ( Target Network )
雖然已經解決災難性失憶,但收斂的過程中還是有許多的雜訊
這時可以用目標網路來加以優化訓練過程
### (1). 發生原因
1. 因為每次我們做訓練時,**只有獲勝 or 輸才會獲得明顯 Reward(10,-10)**,
所以做其他動作時**無法區分出動作好壞**,導致 model 混亂
2. **每次在更新 Q 函數時,目標 Q ( TD-Target ) 都會不同**,
頻繁的更動更新的目標導致 **Q 在更新時產生偏誤,變得難以收斂**(可見下圖)
![](https://i.imgur.com/hu7BMSm.png )
<br>
### (2). 解決方法 : 目標網路
:::success
詞彙意義
- **目標 Q : ($\hat{Q}$)**
就是 **TD target** 中的 $\mathop{max}\limits_{a}Q^{*}(s_{t+1}, A)$
- **主要 Q : ($Q$)**
即為 $Q_{\pi}(s_{t}, a_t)$
:::
:::warning
**目標網路的想法** :
- 要將目標 Q ( TD-Target ) 先固定住,不讓他每次都隨著 Q 函數( 主要 Q ) 一起更新,
而是隨著我們設定的頻率更新 ( [目標Q] 同步 [主要Q] )
![](https://i.imgur.com/M6gRmg9.png)
<br>
**實作方法** :
- 要先複製一個 model 用來當作 [ 目標 Q ] 區隔出來
```python=
# 複製神經網路架構
model2 = copy.deepcopy(model)
# 複製網路權重
model2.load_state_dict( model.state_dict())
```
- 在訓練過程也要設定更新 [ 目標 Q ] 權重的頻率
j 則是紀錄動作次數,每做一個動作就要加 1
```python=
sync_freq = 500 # 設定 Q_hat (model2) 更新頻率
j = 0 # 訓練次數記錄 (查看是否達到更新頻率)
```
- 要將目標 Q 的 model 換成 model2
```python=+
# 計算[新狀態 batch]的 Q ( newQ )
with torch.no_grad():
Q2 = model2(state2_batch) # 使用 model2 (Q_hat) 做預測
```
- 最後在動作次數達到設定的頻率時,更新[目標 Q] model
**這邊所說的"更新"是指同步 model 的參數**
```python=+
# --- 更新 model2 ( Q_hat ) 的權重 ---
if ( j % sync_freq == 0 ):
model2.load_state_dict(model.state_dict())
```
:::
:::spoiler :secret: model 程式碼
```python=
import copy
# 神經網路設定
layer1_size = 4*4*4
layer2_size = 150
layer3_size = 100
layer4_size = 4
model = nn.Sequential(
nn.Linear(in_features=layer1_size, out_features=layer2_size),
nn.ReLU(),
nn.Linear(layer2_size, layer3_size),
nn.ReLU(),
nn.Linear(layer3_size, layer4_size)
)
# 複製神經網路架構
model2 = copy.deepcopy(model)
# 複製網路權重
model2.load_state_dict( model.state_dict())
# 其他設定
loss_fn = nn.MSELoss()
lr = 0.001
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
gamma = 0.9
epsilon = 1
```
:::
:::spoiler :secret: 訓練程式碼
```python=
epochs = 5000
losses = []
mem_size = 1000
batch_size = 200
replay = deque(maxlen=mem_size)
max_mov = 50
sync_freq = 500 # 設定 Q_hat (model2) 更新頻率
j = 0 # 訓練次數記錄 (查看是否達到更新頻率)
for i in range(epochs):
game = Gridworld(size=4, mode='random')
state_ = game.board.render_np().reshape(1,64) + np.random.rand(1,64)/10 # 將 shape(4*4*4) => 64,並加上雜訊
state1 = torch.from_numpy(state_).float()
status = 1
mov = 0
while(status == 1):
j += 1 # 每做一個動作 j + 1
mov += 1
# ------------------------------ 當前預測 Q 值 ----------------------------------------
qval = model(state1)
qval_ = qval.data.numpy()
# ------------------------------ 選擇執行動作 (使用 epsilon-貪婪策略) ----------------------------------------
if(random.random() < epsilon):
action_ = np.random.randint(0,4)
else:
action_ = np.argmax(qval_)
action = action_set[action_]
# ------------------------------ 執行動作、更新State、取得 Reward -----------------------------
game.makeMove(action)
state2_ = game.board.render_np().reshape(1,64) + np.random.rand(1,64)/10.0
state2 = torch.from_numpy(state2_).float()
reward = game.reward()
# ------------------------------ 將資訊儲存到記憶串列 -----------------------------
done = True if reward != -1 else False
exp = (state1, action_, reward, state2, done)
replay.append(exp)
state1 = state2
# ------------------------------ 更新 model 參數 (資料數量達到 batch size ) -----------------------------
if len(replay) > batch_size:
########################################
miniBatch = random.sample( replay, batch_size )
# --- 分成各個元素的小 batch ---
state1_batch = torch.cat([s1 for (s1,a,r,s2,d) in miniBatch])
state2_batch = torch.cat([s2 for (s1,a,r,s2,d) in miniBatch])
action_batch = torch.Tensor([a for (s1,a,r,s2,d) in miniBatch])
reward_batch = torch.Tensor([r for (s1,a,r,s2,d) in miniBatch])
done_batch = torch.Tensor([d for (s1,a,r,s2,d) in miniBatch])
#########################################
# 計算[目前狀態 batch]的 Q (相當於 qval)
Q1 = model(state1_batch)
# 計算[新狀態 batch]的 Q ( newQ )
with torch.no_grad():
Q2 = model2(state2_batch) # 使用 model2 (Q_hat) 做預測
########################################
# --- 計算 TD-target ---
Y = reward_batch + gamma * ( (1 - done_batch) * torch.max(Q2,dim=1)[0])
X = Q1.gather(dim=1, index=action_batch.long().unsqueeze(dim=1)).squeeze()
# --- 計算 TD-Error ---
loss = loss_fn(X,Y.detach())
if i % 100:
print(i, loss.item())
clear_output(wait=True)
# --- Update ---
optimizer.zero_grad()
loss.backward()
optimizer.step()
# --- 更新 model2 ( Q_hat ) 的權重 ---
if ( j % sync_freq == 0 ):
model2.load_state_dict(model.state_dict())
# 紀錄 loss
losses.append(loss.item())
# 遊戲結束
if (abs(reward) == 10) or ( mov > max_mov ):
status = 0
mov = 0
# 遞減 epsilon
if(epsilon > 0.1):
epsilon -= (1/epochs)
```
:::
<br>
### (3). 訓練結果
訓練出來的結果還不錯,理論上來說主要出現的雜訊會是在每次更新 [目標 Q] 時出現
所以雜訊較少。
![](https://i.imgur.com/tsVe39y.png =600x300)
測試出來的獲勝率也提升了一些
```
# 獲勝率約介於 91%-92%
Game Played:2000, # of wins:1832
Win percentage: 0.916%
```
---
<br>
## 3. 增加目標
這裡我們嘗試設定 **[ 撞牆 ]** 也會拿到負回饋(-5)來更明確的給予動作的價值
將程式碼部分更改為以下
```python=
# 訓練迴圈外
# 定義 Action 的實際向量
move_pos = [(-1,0),(1,0),(0,-1),(0,1)]
```
```python=+
# 訓練過程中
if(random.random() < epsilon):
action_ = np.random.randint(0,4)
else:
action_ = np.argmax(qval_)
# 如果 player 無法移動即是撞到牆
hit_wall = game.validateMove('Player', move_pos[action_]) == 1
action = action_set[action_]
game.makeMove(action)
# noise 改成除 100 結果較好
state2_ = game.board.render_np().reshape(1,64) + np.random.rand(1,64)/100.0
state2 = torch.from_numpy(state2_).float()
# 撞到牆 reward = -5
reward = -5 if hit_wall else game.reward()
done = True if reward != -1 else False
```
<br>
**訓練及測試結果如下**
:::info
會發現結果都很好,甚至不加目標網路也沒差。
- 不使用目標網路
![](https://i.imgur.com/aQ4zEvq.png )
```
Game Played:1000, # of wins:978
Win percentage: 0.978%
```
<br>
- 加上目標網路
![](https://i.imgur.com/yxLy8zO.png)
```
Game Played:1000, # of wins:975
Win percentage: 0.975%
```
:::