# Deep Q Network (DQN) [TOC] 分類 : * `Model-Free` * `Value Base` * `Off Policy` * `Discrete Action Space` * `Discrete/Continuous State Space` 可以先看過前一個 [Q Learning](https://hackmd.io/@bGCXESmGSgeAArScMaBxLA/HJdmus4-lx) 實作 ## Key Idea ![image](https://hackmd.io/_uploads/HJ8-I__Gxx.png =70%x) 知道 **Q Learning** 如何訓練後,我們可以把 **Q Table** 換成 **Neural Network**,當成 **Linear Regression** 來 **Training**,而 **Linear Regression** 會使用 **MSE(Mean Square Error)** 來計算 **Loss** 並更新 **Neural Network**,而 **Expected Reward** 算法一樣使用 **TD-Error** , **Target Function** 如下 : $$\large Q(s_t,a_t)=r_{t+1}+\gamma \max_{a}Q(s_{t+1},a)$$ 計算 **MSE** 更新 **Neural Network** 的方法 : $$ e= [Q(s_t,a_t)-\max_a(r_{t+1}+\gamma Q(s_{t+1,a)}))]^2$$ $$MSE=\frac{1}{N}\sum_i^n e_i$$ ## Detail ### Experience Replay **Q Learning** 每次與環境互動一次,就單筆更新一次 Q Table,而DQN 則可以用一個 Buffer 儲存過去的資料重複利用,還可以隨機採樣進行訓練,所以參數裡會定義 `buffer_size` 決定可以儲存幾筆資料,而 `batch_size` 為每次隨機採樣的數量。 ### Target Network DQN 在訓練時會有兩個 **Neural Network**,分別為 **Evaluate** 、 **Target Network**,為了防止訓練 **Network** 時的快速波動 : * **Evaluate Network** : 負責與環境互動,並且會一直被更新、優化。 * **Target Network** : 在計算 TD-Error 時使用 Target Nertwork 來計算 $\large \max_aQ(s_{t+1},a)$ 所以 MSE 寫得清楚一點的如下 : $$\large e= \frac{1}{N}[Q^{eval}(s_t,a_t)-\max_a(r_{t+1}+\gamma Q^{target}(s_{t+1,a)}))]^2$$ > 因為我們使用的是 **TD-Error** ,期望獎勵(**Expected Reward**)有一部分是自己估計出來的,本來就會比較不穩或是不正確,而多一個 **target network** 則是在算 **TD-Error** 時,不會一直變動,而是慢慢變動,這樣同一個 **state** 和 **action** 下的 **Expected Reward** 會比較相近,用以正確的評估未來的 **Reward**。 ## CartPole Example ![1748694012588](https://hackmd.io/_uploads/Hy2utddzxe.gif) 這個 **Environment** 是一個不固定的木棍接在推車上,推車沿著無摩擦的軌道左右移動,目標是通過推車左右移動來保持平衡。 ### Action Space 這個 **Environment** 是離散動作空間(**Discrete Action Space**),與環境互動需要的動作數量只要 1 * 0 : **Move Left** * 1 : **Move Right** ### Observation Space State 會回傳四個 float,分別為推車位置(**Cart Position**)、推車速度(**Cart Velocity**)、木棍角度(**Pole Angle**)、木棍角速度(**Pole Angular Velocity**) ![image](https://hackmd.io/_uploads/HkIEtjdzxx.png) --- ### Rewards 這個環境的目標是盡可能長時間保持木棍直立,每一個 **Step** 都會給予 Reward 1,包括木棍失去平衡倒下而中止。 如果 `sutton_barto_reward=True` ,則每一個 **Step** 都會獲得 Reward 0,而中止時會獲得 -1,這個參數可以增加 **DQN** 訓練穩定性,原因如下 : 1. 他不像 **Frozen Lake** 有明顯的終點,某個狀態就代表結束 2. 如果只有 +1 **reward** 的話,**DQN** 就不會避免倒下 3. **TD-Error** $\large Q(s_t,a_t)=r_{t+1}+\gamma \max_{a}Q(s_{t+1},a)$ 右半邊是代表未來 **reward** 的加總,如果沒有停止條件它就會無限的加,這樣就會沒有鑑別度 Example : 這個 **Episode** 的實際 **Total reward** 為 9(最多500),代表他中途就倒下了,但是利用 **TD-Error** 算出來的 **Expected Reward** 卻一直往上升 : ![image](https://hackmd.io/_uploads/BJLE-3OGxx.png) 使用 `sutton_barto_reward=True` 後的 TD-**Error**,**Expected Reward** : ![image](https://hackmd.io/_uploads/BJAWXnuMlx.png) 這樣就可以讓 **Agent** 明確知道後面因倒下而中止的 **Expected Reward** 要比較低,也能知道甚麼動作木棍倒下。 但我們可以紀錄"是否中止"的狀態,來修改 **TD-Error** 的算法,這樣就可以不使用`sutton_barto_reward=True` 也能訓練 , 後續會說明實作細節。 --- ### Starting State 起始的 State 會隨機將數值分配在 (-0.05,0.05) 的範圍。 --- ### Episode End Termination : * 木棍角度大於 +-$12$ * 推車超過邊緣 **Truncation** (有使用 `TimeLimit` 時) * **Episode** 的 Step 長度大於 500 (v0 為200) ## CartPole-v1 DQN **Github Code** : https://github.com/jason19990305/DQN.git ### Network ```python= class Network(nn.Module): def __init__(self, args, hidden_layers=[64, 64]): super(Network, self).__init__() self.num_states = args.num_states self.num_actions = args.num_actions # Insert input and output sizes into hidden_layers hidden_layers.insert(0, self.num_states) hidden_layers.append(self.num_actions) # Create fully connected layers fc_list = [] for i in range(len(hidden_layers) - 1): num_input = hidden_layers[i] num_output = hidden_layers[i + 1] layer = nn.Linear(num_input, num_output) fc_list.append(layer) # Convert list to ModuleList for proper registration self.layers = nn.ModuleList(fc_list) self.relu = nn.ReLU() def forward(self, x): # Pass input through all layers except the last, applying ReLU activation for i in range(len(self.layers) - 1): x = self.relu(self.layers[i](x)) # The final layer outputs the Q-value directly (no activation) q_value = self.layers[-1](x) return q_value ``` * 建立一個 **Neural Network** 的工具很多,這邊選擇用 **Pytorch**,`class Network(nn.Module)` 是繼承 `torch.nn.Module` 類別,建立起一個基本的 **Neural Network** * `args` 是從主程式的 **Hyperparameter** 傳進來的, `num_states` 和 `num_actions` 為 **Environment** 的 **state** 、 **action** 大小,`num_actions` 為動作的種類,實際輸出的動作只有一個純量,有些環境要求輸出多個 action,如六軸機械手臂,要多注意。 * `hidden_layers` 為 list,主要用來定義 **Hidden layer** 的維度,也可以理解為 **Output shape**,定義這一層要輸出多大的 **Vector**,**Input/Output layer** 則由 `num_states` 和 `num_actions` 決定。 --- ```python= # Create fully connected layers fc_list = [] for i in range(len(hidden_layers) - 1): num_input = hidden_layers[i] num_output = hidden_layers[i + 1] layer = nn.Linear(num_input, num_output) fc_list.append(layer) # Convert list to ModuleList for proper registration self.layers = nn.ModuleList(fc_list) self.relu = nn.ReLU() ``` * 上面是建立 **Linear Layer** 的實體(**DNN**,全連接),並轉為 `ModuleList` 儲存。 * **Activation Function** 選擇用常見的 **ReLU** ```python= def forward(self, x): for i in range(len(self.layers) - 1): x = self.relu(self.layers[i](x)) q_value = self.layers[-1](x) return q_value ``` * Overwrite `forward()` 這個 Function,定義 DQN 的計算方式,`i` 代表的是當前層數 * `self.relu(self.layers[i](x))` 為經過全連接層後再經過 **ReLU Function** * 最後一層不能經過 **Activation Function**,我們的目標是讓 **Neural Network** 學習 **Expected Reward**,所以要讓他的 **Output Range** 不受限制。 --- ### Replay Buffer ```python= class ReplayBuffer: def __init__(self, args): self.max_length = args.buffer_size self.s = deque(maxlen = self.max_length) self.a = deque(maxlen = self.max_length) self.r = deque(maxlen = self.max_length) self.s_ = deque(maxlen = self.max_length) self.dw = deque(maxlen = self.max_length) self.done = deque(maxlen = self.max_length) self.count = 0 def store(self, s, a, r, s_, done): self.s.append(s) self.a.append(a) self.r.append(r) self.s_.append(s_) self.done.append(done) if self.count <= self.max_length: self.count += 1 def numpy_to_tensor(self): s = torch.tensor(np.array(self.s), dtype=torch.float) a = torch.tensor(np.array(self.a), dtype=torch.int64) r = torch.tensor(np.array(self.r), dtype=torch.float) s_ = torch.tensor(np.array(self.s_), dtype=torch.float) done = torch.tensor(np.array(self.done), dtype=torch.float) return s, a, r, s_, done ``` * **Replay Buffer** 是用來訓練時採樣數據用的,使用 `deque` 來實作,`deque` 可以在 Append 新資料時,把舊資料丟棄(如果超過儲存大小) * Initial 時定義最大長度,透過 `args` 由 **Hyperparameter** 傳遞 * `store()` 儲存 **state**、**action**、**reward**、**next_state**、**done**,這邊有順便儲存環境是否終止,不管是中途停止或是因長度而中止 * `numpy_to_tensor()` 將 `deque` 的資料轉換成 `tensor`,這樣才能讓 **Pytorch** 計算並訓練 ### Initialize ```python= def __init__(self , args , env , hidden_layer_list=[64,64]): # Hyperparameter self.buffer_size = args.buffer_size self.min_epsilon = args.min_epsilon self.batch_size = args.batch_size self.decay_rate = args.decay_rate self.epochs = args.epochs self.gamma = args.gamma self.tau = args.tau self.lr = args.lr # Variable self.total_steps = 0 self.training_count = 0 self.episode_count = 0 self.epsilon = 0.9 # initial epsilon # other self.env = env self.replay_buffer = ReplayBuffer(args) # The model interacts with the environment and gets updated continuously self.eval_Q_model = Network(args , hidden_layer_list.copy()) print(self.eval_Q_model) # The model be replaced regularly self.target_Q_model = Network(args , hidden_layer_list.copy()) print(self.target_Q_model) # Copy the parameters of eval_Q_model to target_Q_model self.target_Q_model.load_state_dict(self.eval_Q_model.state_dict()) self.optimizer_Q_model = torch.optim.Adam(self.eval_Q_model.parameters() , lr = self.lr , eps=1e-5) self.Loss_function = nn.SmoothL1Loss() ``` 上面的是 Class `Agent` 的初始化,各變數說明如下 : * `buffer_size` : **Replay Buffer** 最大容量 * `min_epsilon` : 最小 **epsilon** 值,**Exporation** 策略是用 **epsilon-greedy** * `decay_rate` : epsil****on 的衰減率,每個 **episode** 結束會衰減 * `gamma` : 計算 **TD-Error** 時的折扣因子 * `tau` : 用於 **soft update** * `lr` : **Learning Rate** --- * `eval_Q_model` : 實際與環境互動和被更新的 **Neural Network** * `target_Q_model` : 計算 **TD-Error** 會使用的 **Neural Network** * `optimizer_Q_model` : 使用 **Adam** 這個優化器,並指定優化 `eval_Q_model` 的參數 * `loss_function` : 定義 **Loss Function** 為 **SmoothL1Loss**,相較於 **MSE** 會比較平滑 ### Choose action ```python= def choose_action(self, state): # Epsilon-greedy action selection with torch.no_grad(): random_number = np.random.random() # Random float in [0, 1) if random_number > self.epsilon: # Exploitation: choose the action with the highest Q-value state = torch.unsqueeze(torch.tensor(state), dim=0) action = torch.argmax(self.eval_Q_model(state)).squeeze().numpy() else: # Exploration: choose a random action action = self.env.action_space.sample() return action ``` 這個 **Function** 用於 **Training** 階段選擇動作,一樣和 **Q Learning** 使用 **epsilon-greedy Exporation**,要注意需要使用 `torch.no_grad()`,不然會納入計算 gradient,導致更新參數計算錯誤,`torch.unsqueeze()`是用來增加維度,**Pytorch Network Input** 第一維是資料數量,維度變化 Ex. [4] -> [1,4],`argmax` 則是用來取得最大值的 index,Ex.`argmax([2.0 , 3.0])` -> 1 --- ### Evaluate action ```python= def evaluate_action(self, state): with torch.no_grad(): # choose the action that have max q value by current state state = torch.unsqueeze(torch.tensor(state), dim=0) action = torch.argmax(self.eval_Q_model(state)).squeeze().numpy() return action ``` 用於在 **Training** 後的 **Evaluate** 階段選擇 **Action**,會選擇最大 **Q Value** 的動作 --- ### Epsilon decay ```python= def epsilon_decay(self, epoch): self.epsilon = self.epsilon * self.decay_rate self.epsilon = max(self.epsilon, self.min_epsilon) ``` 每局 **Epsoide** 都會執行 `epsilon_decay()`,`max()` 則是為了保持 `epsilon` 不低於 `min_epsilon` --- ### Train ```python= def train(self): episode_reward_list = [] episode_count_list = [] episode_count = 0 # Training loop for epoch in range(self.epochs): # reset environment state, info = self.env.reset() done = False while not done: # Choose action based on epsilon-greedy policy action = self.choose_action(state) # interact with environment next_state , reward , terminated, truncated, _ = self.env.step(action) done = terminated or truncated self.replay_buffer.store(state, action, [reward], next_state, [done]) state = next_state # Update Q-table if self.replay_buffer.count > self.batch_size: self.update() # Decay epsilon self.epsilon_decay(epoch) if epoch % 100 == 0: evaluate_reward = self.evaluate(self.env) print("Epoch : %d / %d\t Reward : %0.2f"%(epoch,self.epochs , evaluate_reward)) episode_reward_list.append(evaluate_reward) episode_count_list.append(episode_count) episode_count += 1 # Plot the training curve plt.plot(episode_count_list, episode_reward_list) plt.xlabel("Episode") plt.ylabel("Reward") plt.title("Training Curve") plt.show() ``` 主要的 **Rollout** 流程,`epochs` 為 Training Episode 數 * `replay_buffer.store()` : 儲存與環境互動的資料 ```python if self.replay_buffer.count > self.batch_size: self.update() ``` 這邊是檢查 **Buffer** 大小超過 **batch size** 才可以訓練,因為 **batch size** 是訓練時要從 **Buffer Sample** 出的大小,所以要確保 **Buffer** 夠多資料。 ### Update ```python= def update(self): #print("------------------") s, a, r, s_, done = self.replay_buffer.numpy_to_tensor() # Get training data .type is tensor index = np.random.choice(len(r), self.batch_size, replace=False) minibatch_s = s[index] minibatch_a = a[index] minibatch_r = r[index] minibatch_s_ = s_[index] minibatch_done = done[index] minibatch_a = minibatch_a.view(-1,1) # Use target network to calculate the TD-error with torch.no_grad(): next_value = self.target_Q_model(minibatch_s_).max(dim=1, keepdim=True).values target_value = minibatch_r + self.gamma * next_value * (1 - minibatch_done) # MSE current_value = self.eval_Q_model(minibatch_s).gather(dim=1, index=minibatch_a) loss = self.Loss_function(current_value, target_value) self.optimizer_Q_model.zero_grad() loss.backward() self.optimizer_Q_model.step() self.soft_update(self.target_Q_model, self.eval_Q_model, self.tau) ``` 這個 **Function** 是更新 **Neural Network** ```python= with torch.no_grad(): next_value = self.target_Q_model(minibatch_s_).max(dim=1, keepdim=True).values target_value = minibatch_r + self.gamma * next_value * (1 - minibatch_done) ``` 計算 `target_value`,這邊多使用 `done` 來代表下個 **State** 為終止,如果為 True 就不能加右半邊(未來的 **Reward**) : $$\large Q(s_t,a_t)=r_{t+1}+\gamma \max_{a}Q(s_{t+1},a)(1-done)$$ --- ### Soft update ```python= def soft_update(self, target, eval, tau): for target_param, eval_param in zip(target.parameters(), eval.parameters()): target_param.data.copy_(target_param.data * (1.0 - tau) + eval_param.data * tau) ``` 在訓練 DQN 時會有兩個 **Neural Network**,**Target Network** 為了穩定會更新的比較慢,更新的方式有分 **Hard update**、**Soft update**,這邊用 **Soft update**,每次更新 **Network** 之後都會緩慢地將 **target_model** 的參數以 `tau` 的比例接近 **eval_model**。 $$\large target = target(1-\tau) + (eval)\tau$$ ### main ```python= class main(): def __init__(self , args): args.num_states = 4 # position , velocity , pole angle , pole angular velocity args.num_actions = 2 # left or right # Pring hyperparameters print("---------------") for arg in vars(args): print(arg,"=",getattr(args, arg)) print("---------------") # create FrozenLake environment env = gym.make('CartPole-v1')#sutton_barto_reward=True self.agent = Agent(args, env , [128,128]) # hidden layer size self.agent.train() render_env = gym.make('CartPole-v1', render_mode="human") for i in range(1000): self.agent.evaluate(render_env) ``` 主程式 `Agent(args, env , [128,128])` 參數加入 `args` **Hyperparameter**,`[128,128]` 為每個 **hidden layer** 的 **Output** 大小 ## Result 使用的參數 : ``` buffer_size = 10000 min_epsilon = 0.05 batch_size = 512 decay_rate = 0.995 epochs = 200 gamma = 0.99 tau = 0.005 lr = 0.0001 num_states = 4 num_actions = 2 --------------- Network( (layers): ModuleList( (0): Linear(in_features=4, out_features=128, bias=True) (1): Linear(in_features=128, out_features=128, bias=True) (2): Linear(in_features=128, out_features=2, bias=True) ) (relu): ReLU() ) ``` 結果 ![image](https://hackmd.io/_uploads/SkLYM_Ofgx.png) ![1748694012588](https://hackmd.io/_uploads/Hy2utddzxe.gif) ## Conclusion * **DQN** 將 **Q Table** 使用 **Neural Network** 來近似 **TD-Error**,為了給予 **Pytorch** 進行訓練,所以要計算誤差,**Pytorch** 就會對 **Neural Network** 參數進行優化,使誤差變小。 * 基本的 **DQN** 只能用在離散的環境,如果要使用在 **Continuous Action Space** 的話就要用 **DDPG**、**TD3**、**SAC** 等 * **DQN** 容易受 **Noise** 影響,會有過度估計的問題,**Double DQN** 有針對這個問題改進。 ## Double DQN **DQN** **Target Value** 計算方式 : $$\large Q^{eval}(s_t,a_t)=r_{t+1}+\gamma \max_{a}Q^{target}(s_{t+1},a)$$ **DQN** 會有過度估計(**Overestimate**)的問題,因為他總是選擇 **Value** 最高的動作,**DQN** 是一個 **Neural Network**,一定會有誤差,只要他預測的 $Q(s_t,a_t)$ 不小心太高了,就會被拿去訓練,並且因為 **TD-Error** 的關係,會一直往下傳遞,使用 **Double DQN** 就能夠改善這個問題,改的方式也很簡單,只要修改 **Target Value** 的計算方式就好 : $$\large Q^{eval}(s_t,a_t)=r_{t+1}+\gamma Q^{target}(s_{t+1}, arg\max_{a}Q^{eval}(s_{t+1},a))$$ 唯一修改的地方就是 **Target Q** 選擇 **Action** 的策略,變成由 **Evaluate Q** 來選擇最大 **Value** 的 **Action**,實際的 **Value** 還是從 **Target Q** 來的。 ![image](https://hackmd.io/_uploads/S1XsYrPtxl.png)