###### tags: `reinforcement learning` # 深度強化學習 Ch3.3 : Q-Learning 實作 2 <br> ## 1. 災難型失憶 ### (1). 發生原因 在之前的訓練過程中,每次執行動作後都會更新 model 權重, 但如此就很有可能發生災難性失憶,如以下情況 : ``` 遊戲 A 遊戲 B [[' ','W',' ',' '], [[' ','W',' ',' '], [' ','+','P','-'], [' ','-','P','+'], [' ',' ',' ',' '], [' ',' ',' ',' '], [' ',' ',' ',' ']] [' ',' ',' ',' ']] ``` 當 model 訓練完 [ 遊戲A ] 情況後,學到只要 player 向左就會獲勝( 正回饋 ), 但接著訓練 [ 遊戲 B ] 時卻會在同樣情況學到向右走會輸 ( 負回饋 ), 就導致之前訓練的觀念被"顛覆",造成 model 混亂,即稱災難型失憶。 <br> ### (2). 解決方法 : 經驗回放 如果回憶常見的深度學習問題( ex : 監督式學習、非監督式學習 ), 其資料會以 batch 為單位來訓練,這種方式不會導致災難性失憶,所以這邊也用類似的技巧, 將訓練的資料儲存成 [ 記憶串列 ],再從中取出 Batch。 :::warning #### **訓練架構 :** 基本架構與之前相同,差別是會等資料存成 Batch 後才進行 model 的訓練更新 <br> ![](https://i.imgur.com/nDRAvXI.png) - **Batch 資料** Batch 資料的形式如下 ==( 現在狀態( s1 ), 現在動作( a ), Reward( r ), 未來狀態( s2 ), 遊戲是否結束( done ) )== 當要更新 model 時,會再將這些資料分別取出,來算 TD-target、error 等 - **儲存 Batch 方式** 會用 ==Deque== 的資料結構儲存 Batch,此資料結構要設定資料大小, 如儲存資料數量超過儲存大小,則會將最一開始的資料去除。 ::: :::spoiler :secret:訓練程式碼 ```python= from collections import deque # 記憶串列資料結構(先進先出) epochs = 5000 losses = [] # 紀錄 loss(用來印出) mem_size = 1000 # 記憶串列大小 batch_size = 200 # batch 大小 replay = deque(maxlen=mem_size) # 記憶串列 max_mov = 50 # 限制一場遊戲最多可用動作(步伐) for i in range(epochs): # 建立遊戲 game = Gridworld(size=4, mode='random') # 獲得遊戲狀態 State state_ = game.board.render_np().reshape(1,64) + np.random.rand(1,64)/10 # 將 shape(4*4*4) => 64,並加上雜訊 # 將當前狀態轉為 Tensor state1 = torch.from_numpy(state_).float() # 追蹤是否還繼續遊戲 status = 1 # 1:還在繼續 # 紀錄遊戲動作次數 mov = 0 while(status == 1): mov += 1 # ------------------------------ 當前預測 Q 值 ---------------------------------------- qval = model(state1) # 得到預測 Q qval_ = qval.data.numpy() # 將預測值轉為 numpy 陣列 # ------------------------------ 選擇執行動作 (使用 epsilon-貪婪策略) ---------------------------------------- if(random.random() < epsilon): action_ = np.random.randint(0,4) # 選擇隨機動作 else: action_ = np.argmax(qval_) # 選擇最大動作(數字) action = action_set[action_] # 數字轉換為對應動作 # ------------------------------ 執行動作、更新State、取得 Reward ----------------------------- # 執行動作 game.makeMove(action) # 取得新狀態 state2_ = game.board.render_np().reshape(1,64) + np.random.rand(1,64)/10.0 state2 = torch.from_numpy(state2_).float() # 取得 Reward reward = game.reward() # ------------------------------ 將資訊儲存到記憶串列 ----------------------------- done = True if reward != -1 else False # 查看是否遊戲已結束(已結束:True) exp = (state1, action_, reward, state2, done) # 儲存型式 (當前狀態, 當前動作(idx), 新狀態, reward, done) replay.append(exp) # 儲存到記憶串列 # 更新當前狀態 state1 = state2 # ------------------------------ 更新 model 參數 (資料數量達到 batch size ) ----------------------------- if len(replay) > batch_size: ######################################## miniBatch = random.sample( replay, batch_size ) # 隨機從記憶串列中取樣 batch # --- 分成各個元素的小 batch --- state1_batch = torch.cat([s1 for (s1,a,r,s2,d) in miniBatch]) # 當前 state batch (shape = 4*4*4*batch_size) state2_batch = torch.cat([s2 for (s1,a,r,s2,d) in miniBatch]) # 新 state batch action_batch = torch.Tensor([a for (s1,a,r,s2,d) in miniBatch]) reward_batch = torch.Tensor([r for (s1,a,r,s2,d) in miniBatch]) done_batch = torch.Tensor([d for (s1,a,r,s2,d) in miniBatch]) ######################################### # 計算[目前狀態 batch]的 Q (相當於 qval) Q1 = model(state1_batch) # 計算[新狀態 batch]的 Q ( newQ ) with torch.no_grad(): Q2 = model(state2_batch) ######################################## # --- 計算 TD-target --- Y = reward_batch + gamma * ( (1 - done_batch) * torch.max(Q2,dim=1)[0]) # 如是遊戲結束狀態: Y=reward X = Q1.gather(dim=1, index=action_batch.long().unsqueeze(dim=1)).squeeze() # 只取出 Q1 中執行的動作的 Q # --- 計算 TD-Error --- loss = loss_fn(X,Y.detach()) # detach: 分離 Y 的運算圖( 不更新 Y model ) print(i, loss.item()) clear_output(wait=True) # --- Update --- optimizer.zero_grad() loss.backward() optimizer.step() # 紀錄 loss losses.append(loss.item()) # 遊戲結束 if (abs(reward) == 10) or ( mov > max_mov ): status = 0 # 如遊戲結束 status 設為0 mov = 0 # 重製動作數 # 遞減 epsilon if(epsilon > 0.1): epsilon -= (1/epochs) ``` ::: <br> **PS.** 因為這次遊戲是使用 "random" 模式,情況較多,所以要訓練多次一點( ex : 5000 ) <br> ### (3). 訓練結果 - **Loss** : 可以明顯看到 Loss 下降,但卻有不少雜訊 ![](https://i.imgur.com/sxcx35g.png =500x300) - **Test model** 測試的函數用前一篇所寫的就好 ```python= max_game = 1000 # 測試次數 wins = 0 for i in range(max_game): win =test_model( model, 'random', display=False) if (win): wins += 1 win_percent = float(wins) / float(max_game) print(f"Game Played:{max_game}, # of wins:{wins}") print(f"Win percentage: {win_percent}%") ``` ``` # 結果有約 88% 遊戲獲勝 Game Played:1000, # of wins:886 Win percentage: 0.886% ``` --- <br> ## 2. 目標網路 ( Target Network ) 雖然已經解決災難性失憶,但收斂的過程中還是有許多的雜訊 這時可以用目標網路來加以優化訓練過程 ### (1). 發生原因 1. 因為每次我們做訓練時,**只有獲勝 or 輸才會獲得明顯 Reward(10,-10)**, 所以做其他動作時**無法區分出動作好壞**,導致 model 混亂 2. **每次在更新 Q 函數時,目標 Q ( TD-Target ) 都會不同**, 頻繁的更動更新的目標導致 **Q 在更新時產生偏誤,變得難以收斂**(可見下圖) ![](https://i.imgur.com/hu7BMSm.png ) <br> ### (2). 解決方法 : 目標網路 :::success 詞彙意義 - **目標 Q : ($\hat{Q}$)** 就是 **TD target** 中的 $\mathop{max}\limits_{a}Q^{*}(s_{t+1}, A)$ - **主要 Q : ($Q$)** 即為 $Q_{\pi}(s_{t}, a_t)$ ::: :::warning **目標網路的想法** : - 要將目標 Q ( TD-Target ) 先固定住,不讓他每次都隨著 Q 函數( 主要 Q ) 一起更新, 而是隨著我們設定的頻率更新 ( [目標Q] 同步 [主要Q] ) ![](https://i.imgur.com/M6gRmg9.png) <br> **實作方法** : - 要先複製一個 model 用來當作 [ 目標 Q ] 區隔出來 ```python= # 複製神經網路架構 model2 = copy.deepcopy(model) # 複製網路權重 model2.load_state_dict( model.state_dict()) ``` - 在訓練過程也要設定更新 [ 目標 Q ] 權重的頻率 j 則是紀錄動作次數,每做一個動作就要加 1 ```python= sync_freq = 500 # 設定 Q_hat (model2) 更新頻率 j = 0 # 訓練次數記錄 (查看是否達到更新頻率) ``` - 要將目標 Q 的 model 換成 model2 ```python=+ # 計算[新狀態 batch]的 Q ( newQ ) with torch.no_grad(): Q2 = model2(state2_batch) # 使用 model2 (Q_hat) 做預測 ``` - 最後在動作次數達到設定的頻率時,更新[目標 Q] model **這邊所說的"更新"是指同步 model 的參數** ```python=+ # --- 更新 model2 ( Q_hat ) 的權重 --- if ( j % sync_freq == 0 ): model2.load_state_dict(model.state_dict()) ``` ::: :::spoiler :secret: model 程式碼 ```python= import copy # 神經網路設定 layer1_size = 4*4*4 layer2_size = 150 layer3_size = 100 layer4_size = 4 model = nn.Sequential( nn.Linear(in_features=layer1_size, out_features=layer2_size), nn.ReLU(), nn.Linear(layer2_size, layer3_size), nn.ReLU(), nn.Linear(layer3_size, layer4_size) ) # 複製神經網路架構 model2 = copy.deepcopy(model) # 複製網路權重 model2.load_state_dict( model.state_dict()) # 其他設定 loss_fn = nn.MSELoss() lr = 0.001 optimizer = torch.optim.Adam(model.parameters(), lr=lr) gamma = 0.9 epsilon = 1 ``` ::: :::spoiler :secret: 訓練程式碼 ```python= epochs = 5000 losses = [] mem_size = 1000 batch_size = 200 replay = deque(maxlen=mem_size) max_mov = 50 sync_freq = 500 # 設定 Q_hat (model2) 更新頻率 j = 0 # 訓練次數記錄 (查看是否達到更新頻率) for i in range(epochs): game = Gridworld(size=4, mode='random') state_ = game.board.render_np().reshape(1,64) + np.random.rand(1,64)/10 # 將 shape(4*4*4) => 64,並加上雜訊 state1 = torch.from_numpy(state_).float() status = 1 mov = 0 while(status == 1): j += 1 # 每做一個動作 j + 1 mov += 1 # ------------------------------ 當前預測 Q 值 ---------------------------------------- qval = model(state1) qval_ = qval.data.numpy() # ------------------------------ 選擇執行動作 (使用 epsilon-貪婪策略) ---------------------------------------- if(random.random() < epsilon): action_ = np.random.randint(0,4) else: action_ = np.argmax(qval_) action = action_set[action_] # ------------------------------ 執行動作、更新State、取得 Reward ----------------------------- game.makeMove(action) state2_ = game.board.render_np().reshape(1,64) + np.random.rand(1,64)/10.0 state2 = torch.from_numpy(state2_).float() reward = game.reward() # ------------------------------ 將資訊儲存到記憶串列 ----------------------------- done = True if reward != -1 else False exp = (state1, action_, reward, state2, done) replay.append(exp) state1 = state2 # ------------------------------ 更新 model 參數 (資料數量達到 batch size ) ----------------------------- if len(replay) > batch_size: ######################################## miniBatch = random.sample( replay, batch_size ) # --- 分成各個元素的小 batch --- state1_batch = torch.cat([s1 for (s1,a,r,s2,d) in miniBatch]) state2_batch = torch.cat([s2 for (s1,a,r,s2,d) in miniBatch]) action_batch = torch.Tensor([a for (s1,a,r,s2,d) in miniBatch]) reward_batch = torch.Tensor([r for (s1,a,r,s2,d) in miniBatch]) done_batch = torch.Tensor([d for (s1,a,r,s2,d) in miniBatch]) ######################################### # 計算[目前狀態 batch]的 Q (相當於 qval) Q1 = model(state1_batch) # 計算[新狀態 batch]的 Q ( newQ ) with torch.no_grad(): Q2 = model2(state2_batch) # 使用 model2 (Q_hat) 做預測 ######################################## # --- 計算 TD-target --- Y = reward_batch + gamma * ( (1 - done_batch) * torch.max(Q2,dim=1)[0]) X = Q1.gather(dim=1, index=action_batch.long().unsqueeze(dim=1)).squeeze() # --- 計算 TD-Error --- loss = loss_fn(X,Y.detach()) if i % 100: print(i, loss.item()) clear_output(wait=True) # --- Update --- optimizer.zero_grad() loss.backward() optimizer.step() # --- 更新 model2 ( Q_hat ) 的權重 --- if ( j % sync_freq == 0 ): model2.load_state_dict(model.state_dict()) # 紀錄 loss losses.append(loss.item()) # 遊戲結束 if (abs(reward) == 10) or ( mov > max_mov ): status = 0 mov = 0 # 遞減 epsilon if(epsilon > 0.1): epsilon -= (1/epochs) ``` ::: <br> ### (3). 訓練結果 訓練出來的結果還不錯,理論上來說主要出現的雜訊會是在每次更新 [目標 Q] 時出現 所以雜訊較少。 ![](https://i.imgur.com/tsVe39y.png =600x300) 測試出來的獲勝率也提升了一些 ``` # 獲勝率約介於 91%-92% Game Played:2000, # of wins:1832 Win percentage: 0.916% ``` --- <br> ## 3. 增加目標 這裡我們嘗試設定 **[ 撞牆 ]** 也會拿到負回饋(-5)來更明確的給予動作的價值 將程式碼部分更改為以下 ```python= # 訓練迴圈外 # 定義 Action 的實際向量 move_pos = [(-1,0),(1,0),(0,-1),(0,1)] ``` ```python=+ # 訓練過程中 if(random.random() < epsilon): action_ = np.random.randint(0,4) else: action_ = np.argmax(qval_) # 如果 player 無法移動即是撞到牆 hit_wall = game.validateMove('Player', move_pos[action_]) == 1 action = action_set[action_] game.makeMove(action) # noise 改成除 100 結果較好 state2_ = game.board.render_np().reshape(1,64) + np.random.rand(1,64)/100.0 state2 = torch.from_numpy(state2_).float() # 撞到牆 reward = -5 reward = -5 if hit_wall else game.reward() done = True if reward != -1 else False ``` <br> **訓練及測試結果如下** :::info 會發現結果都很好,甚至不加目標網路也沒差。 - 不使用目標網路 ![](https://i.imgur.com/aQ4zEvq.png ) ``` Game Played:1000, # of wins:978 Win percentage: 0.978% ``` <br> - 加上目標網路 ![](https://i.imgur.com/yxLy8zO.png) ``` Game Played:1000, # of wins:975 Win percentage: 0.975% ``` :::