# Deep Q Network (DQN)
[TOC]
分類 :
* `Model-Free`
* `Value Base`
* `Off Policy`
* `Discrete Action Space`
* `Discrete/Continuous State Space`
可以先看過前一個 [Q Learning](https://hackmd.io/@bGCXESmGSgeAArScMaBxLA/HJdmus4-lx) 實作
## Key Idea

知道 **Q Learning** 如何訓練後,我們可以把 **Q Table** 換成 **Neural Network**,當成 **Linear Regression** 來 **Training**,而 **Linear Regression** 會使用 **MSE(Mean Square Error)** 來計算 **Loss** 並更新 **Neural Network**,而 **Expected Reward** 算法一樣使用 **TD-Error** , **Target Function** 如下 :
$$\large Q(s_t,a_t)=r_{t+1}+\gamma \max_{a}Q(s_{t+1},a)$$
計算 **MSE** 更新 **Neural Network** 的方法 :
$$ e= [Q(s_t,a_t)-\max_a(r_{t+1}+\gamma Q(s_{t+1,a)}))]^2$$
$$MSE=\frac{1}{N}\sum_i^n e_i$$
## Detail
### Experience Replay
**Q Learning** 每次與環境互動一次,就單筆更新一次 Q Table,而DQN 則可以用一個 Buffer 儲存過去的資料重複利用,還可以隨機採樣進行訓練,所以參數裡會定義 `buffer_size` 決定可以儲存幾筆資料,而 `batch_size` 為每次隨機採樣的數量。
### Target Network
DQN 在訓練時會有兩個 **Neural Network**,分別為 **Evaluate** 、 **Target Network**,為了防止訓練 **Network** 時的快速波動 :
* **Evaluate Network** : 負責與環境互動,並且會一直被更新、優化。
* **Target Network** : 在計算 TD-Error 時使用 Target Nertwork 來計算 $\large \max_aQ(s_{t+1},a)$
所以 MSE 寫得清楚一點的如下 :
$$\large e= \frac{1}{N}[Q^{eval}(s_t,a_t)-\max_a(r_{t+1}+\gamma Q^{target}(s_{t+1,a)}))]^2$$
> 因為我們使用的是 **TD-Error** ,期望獎勵(**Expected Reward**)有一部分是自己估計出來的,本來就會比較不穩或是不正確,而多一個 **target network** 則是在算 **TD-Error** 時,不會一直變動,而是慢慢變動,這樣同一個 **state** 和 **action** 下的 **Expected Reward** 會比較相近,用以正確的評估未來的 **Reward**。
## CartPole Example

這個 **Environment** 是一個不固定的木棍接在推車上,推車沿著無摩擦的軌道左右移動,目標是通過推車左右移動來保持平衡。
### Action Space
這個 **Environment** 是離散動作空間(**Discrete Action Space**),與環境互動需要的動作數量只要 1
* 0 : **Move Left**
* 1 : **Move Right**
### Observation Space
State 會回傳四個 float,分別為推車位置(**Cart Position**)、推車速度(**Cart Velocity**)、木棍角度(**Pole Angle**)、木棍角速度(**Pole Angular Velocity**)

---
### Rewards
這個環境的目標是盡可能長時間保持木棍直立,每一個 **Step** 都會給予 Reward 1,包括木棍失去平衡倒下而中止。
如果 `sutton_barto_reward=True` ,則每一個 **Step** 都會獲得 Reward 0,而中止時會獲得 -1,這個參數可以增加 **DQN** 訓練穩定性,原因如下 :
1. 他不像 **Frozen Lake** 有明顯的終點,某個狀態就代表結束
2. 如果只有 +1 **reward** 的話,**DQN** 就不會避免倒下
3. **TD-Error** $\large Q(s_t,a_t)=r_{t+1}+\gamma \max_{a}Q(s_{t+1},a)$ 右半邊是代表未來 **reward** 的加總,如果沒有停止條件它就會無限的加,這樣就會沒有鑑別度
Example :
這個 **Episode** 的實際 **Total reward** 為 9(最多500),代表他中途就倒下了,但是利用 **TD-Error** 算出來的 **Expected Reward** 卻一直往上升 :

使用 `sutton_barto_reward=True` 後的 TD-**Error**,**Expected Reward** :

這樣就可以讓 **Agent** 明確知道後面因倒下而中止的 **Expected Reward** 要比較低,也能知道甚麼動作木棍倒下。
但我們可以紀錄"是否中止"的狀態,來修改 **TD-Error** 的算法,這樣就可以不使用`sutton_barto_reward=True` 也能訓練 , 後續會說明實作細節。
---
### Starting State
起始的 State 會隨機將數值分配在 (-0.05,0.05) 的範圍。
---
### Episode End
Termination :
* 木棍角度大於 +-$12$
* 推車超過邊緣
**Truncation** (有使用 `TimeLimit` 時)
* **Episode** 的 Step 長度大於 500 (v0 為200)
## CartPole-v1 DQN
**Github Code** : https://github.com/jason19990305/DQN.git
### Network
```python=
class Network(nn.Module):
def __init__(self, args, hidden_layers=[64, 64]):
super(Network, self).__init__()
self.num_states = args.num_states
self.num_actions = args.num_actions
# Insert input and output sizes into hidden_layers
hidden_layers.insert(0, self.num_states)
hidden_layers.append(self.num_actions)
# Create fully connected layers
fc_list = []
for i in range(len(hidden_layers) - 1):
num_input = hidden_layers[i]
num_output = hidden_layers[i + 1]
layer = nn.Linear(num_input, num_output)
fc_list.append(layer)
# Convert list to ModuleList for proper registration
self.layers = nn.ModuleList(fc_list)
self.relu = nn.ReLU()
def forward(self, x):
# Pass input through all layers except the last, applying ReLU activation
for i in range(len(self.layers) - 1):
x = self.relu(self.layers[i](x))
# The final layer outputs the Q-value directly (no activation)
q_value = self.layers[-1](x)
return q_value
```
* 建立一個 **Neural Network** 的工具很多,這邊選擇用 **Pytorch**,`class Network(nn.Module)` 是繼承 `torch.nn.Module` 類別,建立起一個基本的 **Neural Network**
* `args` 是從主程式的 **Hyperparameter** 傳進來的, `num_states` 和 `num_actions` 為 **Environment** 的 **state** 、 **action** 大小,`num_actions` 為動作的種類,實際輸出的動作只有一個純量,有些環境要求輸出多個 action,如六軸機械手臂,要多注意。
* `hidden_layers` 為 list,主要用來定義 **Hidden layer** 的維度,也可以理解為 **Output shape**,定義這一層要輸出多大的 **Vector**,**Input/Output layer** 則由 `num_states` 和 `num_actions` 決定。
---
```python=
# Create fully connected layers
fc_list = []
for i in range(len(hidden_layers) - 1):
num_input = hidden_layers[i]
num_output = hidden_layers[i + 1]
layer = nn.Linear(num_input, num_output)
fc_list.append(layer)
# Convert list to ModuleList for proper registration
self.layers = nn.ModuleList(fc_list)
self.relu = nn.ReLU()
```
* 上面是建立 **Linear Layer** 的實體(**DNN**,全連接),並轉為 `ModuleList` 儲存。
* **Activation Function** 選擇用常見的 **ReLU**
```python=
def forward(self, x):
for i in range(len(self.layers) - 1):
x = self.relu(self.layers[i](x))
q_value = self.layers[-1](x)
return q_value
```
* Overwrite `forward()` 這個 Function,定義 DQN 的計算方式,`i` 代表的是當前層數
* `self.relu(self.layers[i](x))` 為經過全連接層後再經過 **ReLU Function**
* 最後一層不能經過 **Activation Function**,我們的目標是讓 **Neural Network** 學習 **Expected Reward**,所以要讓他的 **Output Range** 不受限制。
---
### Replay Buffer
```python=
class ReplayBuffer:
def __init__(self, args):
self.max_length = args.buffer_size
self.s = deque(maxlen = self.max_length)
self.a = deque(maxlen = self.max_length)
self.r = deque(maxlen = self.max_length)
self.s_ = deque(maxlen = self.max_length)
self.dw = deque(maxlen = self.max_length)
self.done = deque(maxlen = self.max_length)
self.count = 0
def store(self, s, a, r, s_, done):
self.s.append(s)
self.a.append(a)
self.r.append(r)
self.s_.append(s_)
self.done.append(done)
if self.count <= self.max_length:
self.count += 1
def numpy_to_tensor(self):
s = torch.tensor(np.array(self.s), dtype=torch.float)
a = torch.tensor(np.array(self.a), dtype=torch.int64)
r = torch.tensor(np.array(self.r), dtype=torch.float)
s_ = torch.tensor(np.array(self.s_), dtype=torch.float)
done = torch.tensor(np.array(self.done), dtype=torch.float)
return s, a, r, s_, done
```
* **Replay Buffer** 是用來訓練時採樣數據用的,使用 `deque` 來實作,`deque` 可以在 Append 新資料時,把舊資料丟棄(如果超過儲存大小)
* Initial 時定義最大長度,透過 `args` 由 **Hyperparameter** 傳遞
* `store()` 儲存 **state**、**action**、**reward**、**next_state**、**done**,這邊有順便儲存環境是否終止,不管是中途停止或是因長度而中止
* `numpy_to_tensor()` 將 `deque` 的資料轉換成 `tensor`,這樣才能讓 **Pytorch** 計算並訓練
### Initialize
```python=
def __init__(self , args , env , hidden_layer_list=[64,64]):
# Hyperparameter
self.buffer_size = args.buffer_size
self.min_epsilon = args.min_epsilon
self.batch_size = args.batch_size
self.decay_rate = args.decay_rate
self.epochs = args.epochs
self.gamma = args.gamma
self.tau = args.tau
self.lr = args.lr
# Variable
self.total_steps = 0
self.training_count = 0
self.episode_count = 0
self.epsilon = 0.9 # initial epsilon
# other
self.env = env
self.replay_buffer = ReplayBuffer(args)
# The model interacts with the environment and gets updated continuously
self.eval_Q_model = Network(args , hidden_layer_list.copy())
print(self.eval_Q_model)
# The model be replaced regularly
self.target_Q_model = Network(args , hidden_layer_list.copy())
print(self.target_Q_model)
# Copy the parameters of eval_Q_model to target_Q_model
self.target_Q_model.load_state_dict(self.eval_Q_model.state_dict())
self.optimizer_Q_model = torch.optim.Adam(self.eval_Q_model.parameters() , lr = self.lr , eps=1e-5)
self.Loss_function = nn.SmoothL1Loss()
```
上面的是 Class `Agent` 的初始化,各變數說明如下 :
* `buffer_size` : **Replay Buffer** 最大容量
* `min_epsilon` : 最小 **epsilon** 值,**Exporation** 策略是用 **epsilon-greedy**
* `decay_rate` : epsil****on 的衰減率,每個 **episode** 結束會衰減
* `gamma` : 計算 **TD-Error** 時的折扣因子
* `tau` : 用於 **soft update**
* `lr` : **Learning Rate**
---
* `eval_Q_model` : 實際與環境互動和被更新的 **Neural Network**
* `target_Q_model` : 計算 **TD-Error** 會使用的 **Neural Network**
* `optimizer_Q_model` : 使用 **Adam** 這個優化器,並指定優化 `eval_Q_model` 的參數
* `loss_function` : 定義 **Loss Function** 為 **SmoothL1Loss**,相較於 **MSE** 會比較平滑
### Choose action
```python=
def choose_action(self, state):
# Epsilon-greedy action selection
with torch.no_grad():
random_number = np.random.random() # Random float in [0, 1)
if random_number > self.epsilon:
# Exploitation: choose the action with the highest Q-value
state = torch.unsqueeze(torch.tensor(state), dim=0)
action = torch.argmax(self.eval_Q_model(state)).squeeze().numpy()
else:
# Exploration: choose a random action
action = self.env.action_space.sample()
return action
```
這個 **Function** 用於 **Training** 階段選擇動作,一樣和 **Q Learning** 使用 **epsilon-greedy Exporation**,要注意需要使用 `torch.no_grad()`,不然會納入計算 gradient,導致更新參數計算錯誤,`torch.unsqueeze()`是用來增加維度,**Pytorch Network Input** 第一維是資料數量,維度變化 Ex. [4] -> [1,4],`argmax` 則是用來取得最大值的 index,Ex.`argmax([2.0 , 3.0])` -> 1
---
### Evaluate action
```python=
def evaluate_action(self, state):
with torch.no_grad():
# choose the action that have max q value by current state
state = torch.unsqueeze(torch.tensor(state), dim=0)
action = torch.argmax(self.eval_Q_model(state)).squeeze().numpy()
return action
```
用於在 **Training** 後的 **Evaluate** 階段選擇 **Action**,會選擇最大 **Q Value** 的動作
---
### Epsilon decay
```python=
def epsilon_decay(self, epoch):
self.epsilon = self.epsilon * self.decay_rate
self.epsilon = max(self.epsilon, self.min_epsilon)
```
每局 **Epsoide** 都會執行 `epsilon_decay()`,`max()` 則是為了保持 `epsilon` 不低於 `min_epsilon`
---
### Train
```python=
def train(self):
episode_reward_list = []
episode_count_list = []
episode_count = 0
# Training loop
for epoch in range(self.epochs):
# reset environment
state, info = self.env.reset()
done = False
while not done:
# Choose action based on epsilon-greedy policy
action = self.choose_action(state)
# interact with environment
next_state , reward , terminated, truncated, _ = self.env.step(action)
done = terminated or truncated
self.replay_buffer.store(state, action, [reward], next_state, [done])
state = next_state
# Update Q-table
if self.replay_buffer.count > self.batch_size:
self.update()
# Decay epsilon
self.epsilon_decay(epoch)
if epoch % 100 == 0:
evaluate_reward = self.evaluate(self.env)
print("Epoch : %d / %d\t Reward : %0.2f"%(epoch,self.epochs , evaluate_reward))
episode_reward_list.append(evaluate_reward)
episode_count_list.append(episode_count)
episode_count += 1
# Plot the training curve
plt.plot(episode_count_list, episode_reward_list)
plt.xlabel("Episode")
plt.ylabel("Reward")
plt.title("Training Curve")
plt.show()
```
主要的 **Rollout** 流程,`epochs` 為 Training Episode 數
* `replay_buffer.store()` : 儲存與環境互動的資料
```python
if self.replay_buffer.count > self.batch_size:
self.update()
```
這邊是檢查 **Buffer** 大小超過 **batch size** 才可以訓練,因為 **batch size** 是訓練時要從 **Buffer Sample** 出的大小,所以要確保 **Buffer** 夠多資料。
### Update
```python=
def update(self):
#print("------------------")
s, a, r, s_, done = self.replay_buffer.numpy_to_tensor() # Get training data .type is tensor
index = np.random.choice(len(r), self.batch_size, replace=False)
minibatch_s = s[index]
minibatch_a = a[index]
minibatch_r = r[index]
minibatch_s_ = s_[index]
minibatch_done = done[index]
minibatch_a = minibatch_a.view(-1,1)
# Use target network to calculate the TD-error
with torch.no_grad():
next_value = self.target_Q_model(minibatch_s_).max(dim=1, keepdim=True).values
target_value = minibatch_r + self.gamma * next_value * (1 - minibatch_done)
# MSE
current_value = self.eval_Q_model(minibatch_s).gather(dim=1, index=minibatch_a)
loss = self.Loss_function(current_value, target_value)
self.optimizer_Q_model.zero_grad()
loss.backward()
self.optimizer_Q_model.step()
self.soft_update(self.target_Q_model, self.eval_Q_model, self.tau)
```
這個 **Function** 是更新 **Neural Network**
```python=
with torch.no_grad():
next_value = self.target_Q_model(minibatch_s_).max(dim=1, keepdim=True).values
target_value = minibatch_r + self.gamma * next_value * (1 - minibatch_done)
```
計算 `target_value`,這邊多使用 `done` 來代表下個 **State** 為終止,如果為 True 就不能加右半邊(未來的 **Reward**) :
$$\large Q(s_t,a_t)=r_{t+1}+\gamma \max_{a}Q(s_{t+1},a)(1-done)$$
---
### Soft update
```python=
def soft_update(self, target, eval, tau):
for target_param, eval_param in zip(target.parameters(), eval.parameters()):
target_param.data.copy_(target_param.data * (1.0 - tau) + eval_param.data * tau)
```
在訓練 DQN 時會有兩個 **Neural Network**,**Target Network** 為了穩定會更新的比較慢,更新的方式有分 **Hard update**、**Soft update**,這邊用 **Soft update**,每次更新 **Network** 之後都會緩慢地將 **target_model** 的參數以 `tau` 的比例接近 **eval_model**。
$$\large target = target(1-\tau) + (eval)\tau$$
### main
```python=
class main():
def __init__(self , args):
args.num_states = 4 # position , velocity , pole angle , pole angular velocity
args.num_actions = 2 # left or right
# Pring hyperparameters
print("---------------")
for arg in vars(args):
print(arg,"=",getattr(args, arg))
print("---------------")
# create FrozenLake environment
env = gym.make('CartPole-v1')#sutton_barto_reward=True
self.agent = Agent(args, env , [128,128]) # hidden layer size
self.agent.train()
render_env = gym.make('CartPole-v1', render_mode="human")
for i in range(1000):
self.agent.evaluate(render_env)
```
主程式 `Agent(args, env , [128,128])` 參數加入 `args` **Hyperparameter**,`[128,128]` 為每個 **hidden layer** 的 **Output** 大小
## Result
使用的參數 :
```
buffer_size = 10000
min_epsilon = 0.05
batch_size = 512
decay_rate = 0.995
epochs = 200
gamma = 0.99
tau = 0.005
lr = 0.0001
num_states = 4
num_actions = 2
---------------
Network(
(layers): ModuleList(
(0): Linear(in_features=4, out_features=128, bias=True)
(1): Linear(in_features=128, out_features=128, bias=True)
(2): Linear(in_features=128, out_features=2, bias=True)
)
(relu): ReLU()
)
```
結果


## Conclusion
* **DQN** 將 **Q Table** 使用 **Neural Network** 來近似 **TD-Error**,為了給予 **Pytorch** 進行訓練,所以要計算誤差,**Pytorch** 就會對 **Neural Network** 參數進行優化,使誤差變小。
* 基本的 **DQN** 只能用在離散的環境,如果要使用在 **Continuous Action Space** 的話就要用 **DDPG**、**TD3**、**SAC** 等
* **DQN** 容易受 **Noise** 影響,會有過度估計的問題,**Double DQN** 有針對這個問題改進。
## Double DQN
**DQN** **Target Value** 計算方式 :
$$\large Q^{eval}(s_t,a_t)=r_{t+1}+\gamma \max_{a}Q^{target}(s_{t+1},a)$$
**DQN** 會有過度估計(**Overestimate**)的問題,因為他總是選擇 **Value** 最高的動作,**DQN** 是一個 **Neural Network**,一定會有誤差,只要他預測的 $Q(s_t,a_t)$ 不小心太高了,就會被拿去訓練,並且因為 **TD-Error** 的關係,會一直往下傳遞,使用 **Double DQN** 就能夠改善這個問題,改的方式也很簡單,只要修改 **Target Value** 的計算方式就好 :
$$\large Q^{eval}(s_t,a_t)=r_{t+1}+\gamma Q^{target}(s_{t+1}, arg\max_{a}Q^{eval}(s_{t+1},a))$$
唯一修改的地方就是 **Target Q** 選擇 **Action** 的策略,變成由 **Evaluate Q** 來選擇最大 **Value** 的 **Action**,實際的 **Value** 還是從 **Target Q** 來的。
