# Deep Deterministic Policy Gradient
###### tags: `Reinforcement Learning` `OpenAI` `DDPG` `Deep Reinforcement Learning`
Google Deepmind 原始論文 : https://arxiv.org/pdf/1509.02971.pdf
分類 :
* `Model-Free`
* `Value Base`
* `Off Policy`
* `Continuous Action Space`
* `Continuous State Space`
可以先參考 [DQN](https://hackmd.io/@bGCXESmGSgeAArScMaBxLA/rkWNLbwMxg) 的內容
## Key Idea
**DDPG** 結合 **Actor-Critic** 和 **DQN**,用於解決連續動作空間(**Continuous Action Space**),**Forward** 的部分 **Actor** 將直接輸出動作,但大部分還是會將輸出限制在 [-1 , 1] 之間,再根據環境的每個動作的最大最小值來決定動作。
> ex. **Actor** 輸出 0.4,而環境是要控制機械手臂,動作空間為 [-3.14 , 3.14],這時候就可以 $-3.14 + (0.4*6.28)=-0.62$,環境就會按照 **Actor** 給的輸出將馬達轉到 -0.62 Radian
而 **Critic** 的部分輸出單一數值,也就是 **Expected Reward**,**DDPG** 不像 **DQN** 是一個 **Action-Value Function**,而是單純的 **Value Function**,但他的輸入除了 **State** 還多了 **Action**,目的就是用來評估 **Actor** 在 $s_t$ 的情況下做出 $a_t$ 的 **Expected Reward**,從而更新參數。
## Background

在 **DQN** 我們會訓練 **Neural Network** 給我們 **Expected Reward**,**Input** 為 **State** $s_t$,並輸出每種動作的 **Expected Reward**, **Target Function** 和 **Loss Function** 如下 :
**TD-Error** :
$$\large Q(s_t,a_t)=r_{t+1}+\gamma \max_{a}Q(s_{t+1},a)$$
**Loss Function** :
$$e= [Q(s_t,a_t)-\max_a(r_{t+1}+\gamma Q(s_{t+1,a)}))]^2$$
$$MSE=\frac{1}{N}\sum_i^n e_i$$
可以想像如果要將 **DQN** 用在連續動作空間,如兩軸馬達,他可以接收 -3.14 ~ 3.14 之間的 float 來控制馬達角度,**DQN** 要達成的話可以增加 **Neural Network** 的 **Output** 數量,切割成非常多種動作,讓 **Neural Network** 選擇 **Expected Reward** 最高的,但這種方法是治標不治本,且犧牲效能與精度,所以才有了 **DDPG**。
---
## Architecture
在 **DDPG** 算法中,會使用 **Actor-Critic** 的架構

**Actor** 的部分 **Input** 為 $s_t$ ,**Output** 為 $a_t$,並且需要與環境互動,**DDPG** 被設計來處理連續動作空間(**Continuous Action Space**) 的環境,所以 $a_t$ 是連續的,他的數值可能性是無限多的,不像 **DQN** 是固定數量的。
**Critic** 的部分 **Input** 為 $s_t$、$a_t$,**Output** 為 **Expected Reward** $Q(s_t,a_t)$,也可以稱為 **Value Function**,他不像 **DQN** 會針對每種 **Action** 輸出一個 **Value**,他是透過 **Input** $a_t$ 來決定 **Value**,所以他可以說是在評估 **Actor** 的好壞,所以這樣就能夠預測無限多種可能的 **Action** $a_t$ 和 $s_t$ 狀態下的 **Expected Reward**。
這樣就能跳脫 **DQN** 只能處理離散動作空間的問題。
## Training
**DDPG** 與 **DQN** 相似,有兩份 **Neural Network(Evaluate、Target)**,分為 **Evalute**、**Target** **Network**,為了防止訓練 **Network** 快速變動,**Evaluate** 變動較大,**Target** 變動較小,希望能夠讓 **Agent** 不要因為被雜訊干擾,訓練也能更穩定,**DDPG** 使用 **soft update** 來更新 **Target Network**。
### Update Actor
訓練 **Actor** 的方式非常簡單 , 我們會使用 **Critic** 的輸出來評估 **Actor** 的動作好不好 , **Actor** 的 **Loss Function** :
$$Loss = -\frac{1}{N}\sum_i^NQ^{eval}(s_i,\mu^{eval}(s_i))$$
* $Q^{eval}$ : **Evaluate** 的 **Value Function**
* $\mu^{eval}$ : **Evaluate** 的 **Actor**
**DDPG** 一樣會有 **Exporation** 策略,會在 **Action** 加入雜訊,我們在更新時會讓 **Evaluate** 的 **Actor** 重新給我們一個 **Action** ,讓 **Actor** 參與這個 **Forward** 的運算,這樣才能更新 **Actor** 的參數。
**Actor** 的更新比較簡單,只使用到 **Evaluate** 的 **Actor** 和 **Critic**
### Update Critic
**Critic** 跟 **DQN** 一樣 ,可以使用 **TD Error** 來更新,也都會有 **Target Network** 參與,**Target Function**為 :
$$\Large Q^{eval}(s_t,a_t)=r+\gamma Q^{target}(s_{t+1},\mu^{target}(s_{t+1}))$$
* $r$ : **Reward**,從 **Replay Buffer** 取出
* $s_{t+1}$ : **Next State** , 從 **Replay Buffer** 取出
* $s_t$ : **Current State** , 從 **Replay Buffer** 取出
* $a_t$ : **Action** , 從 **Replay Buffer** 取出,**Actor** 與環境互動的 **Action** , 可能會帶有雜訊(為了 **Exporation**)
* $\mu^{target}(s_{t+1})$ : **Next State** 為 **Input**,**Target Actor** 的 **Output Action**
可以看到計算 **TD-Error** 的右半部分是用 **Target Network** (**Actor** 和 **Critic**) 和 **Next State** 計算出 **Target Value** 用來更新 **Evalute** 的 **Critic**。
而 **Loss Function** 跟 **DQN** 一樣使用 **MSE** 就可以了
$$\Large Loss =\frac{1}{N}(Q^{eval}(s_t,a_t) - [r+\gamma Q^{target}(s_{t+1},\mu^{target}(s_{t+1}))])^2$$
## Soft Update
在 **DDPG** 中,為了穩定訊過程,我們並不會直接將 **Evaluate Network** 的參數整個複製到 **Target Network** (**Hard Update**),而是將 **Evaluate Network** 的參數以一個很小的比例融入到 **Target Network** 中。
$\theta^{\mu'} =\tau\theta^\mu+(1-\tau)\theta^{\mu'}$
$\theta^{Q'} =\tau\theta^Q+(1-\tau)\theta^{Q'}$
* $\tau$(tau) : 一個非常小的正數,通常為 $0.001$ ~ $0.01$
* 當 $\tau=1$ 時,就等同於 **Hard Update** (直接複製)
*
隨著訓練次數增加,**Target Network** 將逐漸像 **Evaluate Network** 靠攏
---
## Pendulum Example
這個環境屬於控制理論中的經典倒立擺擺動問題,擺錘的初始位置隨機,目標是透過對擺錘施加扭矩,使其擺動到直立的位置。


黑色的部分是固定點,紅色(擺錘)則是根據固定點為旋轉的中心
### Action Space
需要給與環境互動的 Action 為 -2.0 ~ 2.0 之間的數值,也就是連續動作空間(Continuous Action Space),對於 DQN 來說可以說是有無限多的動作可以選擇,但對於 DDPG 這種直接輸出數值的架構沒問題。

### Observation Space
1. $x=cos(\theta)$
2. $y=sin(\theta)$
3. 角速度 $\dot\theta$ -8.0 ~ 8.0

### Rewards
每個 Step 的 Reward 計算公式為 :
$r=-(\theta^2+0.1 \dot\theta+0.001\tau^2)$
* $\theta$ : 擺錘角度,以$[-\pi,\pi]$ 正規畫,0 代表擺錘為直立狀態
* $\dot\theta$ : 角速度
* $\tau$ : 所施加的扭矩
* 最大值 : 0 (直立、無速度、無扭矩)
* 最小值 : $-(\pi^2+0.1*8^2+0.001*2^2)\approx-16.2736$
### Starting State
Reset 時,擺錘的狀態隨機設定為 :
* 角度 $\theta \in[-\pi,\pi]$
* 角速度 $\dot\theta\in[-1,1]$
### Episode End
每個 Episode 最多執行 200 Step,達到後被中斷,無其他終止條件
## Pendulum DDPG
Github Code : https://github.com/jason19990305/DDPG
### Actor-Critic
#### Actor
```python=
class Actor(nn.Module):
def __init__(self,args,hidden_layers=[64,64]):
super(Actor, self).__init__()
self.num_states = args.num_states
self.num_actions = args.num_actions
# add in list
hidden_layers.insert(0,self.num_states) # first layer
hidden_layers.append(self.num_actions) # last layer
print(hidden_layers)
# create layers
layer_list = []
for i in range(len(hidden_layers)-1):
input_num = hidden_layers[i]
output_num = hidden_layers[i+1]
layer = nn.Linear(input_num,output_num)
layer_list.append(layer)
orthogonal_init(layer_list[-1])
orthogonal_init(layer_list[-1], gain=0.01)
# put in ModuleList
self.layers = nn.ModuleList(layer_list)
self.tanh = nn.Tanh()
# when actor(s) will activate the function
def forward(self,s):
for layer in self.layers:
s = self.tanh(layer(s))
return s * self.action_max
```
* `args` 是從主程式的 **Hyperparameter** 傳進來的, `num_states` 和 `num_actions` 為 **Environment** 的 **state** 、 **action** 大小,`num_actions` 為動作的種類,實際輸出的動作只有一個 float,有些環境要求輸出多個 **action**,如六軸機械手臂,要多注意。
* `hidden_layers` 為 list,主要用來定義 **Hidden layer** 的維度,也可以理解為 **Output shape**,定義這一層要輸出多大的 **Vector**,**Input/Output layer** 則由 `num_states` 和 `num_actions` 決定。
以 `Pendulum` 這個環境來說的話 `num_states` 就是 3,分別為 **x**、**y**、**Angular velocity**,`num_action` 為 1,也就是要施加的 **Torque**。
`forward` 的部分有將 Action 乘上 Max Action,也就是 Env 要的 Action 的最大值,以 **Pendulum** 來說的話就會從 -1~1 變成 -2~2。
>這是偷懶的寫法,要注意 Environment 的需求
---
```python=
layer_list = []
for i in range(len(hidden_layers)-1):
input_num = hidden_layers[i]
output_num = hidden_layers[i+1]
layer = nn.Linear(input_num,output_num)
layer_list.append(layer)
orthogonal_init(layer_list[-1])
orthogonal_init(layer_list[-1], gain=0.01)
# put in ModuleList
self.layers = nn.ModuleList(layer_list)
self.tanh = nn.Tanh()
```
上面是建立全連接層並將它儲存在 `ModuleList`,因為適用在 **Continuous Action Space** ,所以將輸出限制在 -1~1 之間,**Activation Function** 用 `Tanh` 可以達到這個需求。
---
#### Critic
```python=
class Critic(nn.Module):
def __init__(self, args,hidden_layers=[64,64]):
super(Critic, self).__init__()
self.num_states = args.num_states
self.num_actions = args.num_actions
# add in list
hidden_layers.insert(0,self.num_states+self.num_actions)
hidden_layers.append(1)
print(hidden_layers)
# create layers
layer_list = []
for i in range(len(hidden_layers)-1):
input_num = hidden_layers[i]
output_num = hidden_layers[i+1]
layer = nn.Linear(input_num,output_num)
layer_list.append(layer)
orthogonal_init(layer_list[-1])
# put in ModuleList
self.layers = nn.ModuleList(layer_list)
self.tanh = nn.Tanh()
def forward(self,s,a):
input_data = torch.cat((s,a),dim=1)
for i in range(len(self.layers)-1):
input_data = self.tanh(self.layers[i](input_data))
# predicet value
v_s = self.layers[-1](input_data)
return v_s
```
**Critic** 要注意的是 **Input** 是 **State** 的大小和 **Action** 的大小,**DDPG** 的架構就是這樣設計的,輸出的話是固定為 1,也就是用 $s_t$ 和 $a_t$ 來預測 **Expected Reward**,用來 **Train Actor**。
---
### Replay Buffer
```python=
class ReplayBuffer:
def __init__(self, args):
self.max_length = args.buffer_size
self.s = deque(maxlen = self.max_length)
self.a = deque(maxlen = self.max_length)
self.r = deque(maxlen = self.max_length)
self.s_ = deque(maxlen = self.max_length)
self.dw = deque(maxlen = self.max_length)
self.done = deque(maxlen = self.max_length)
self.count = 0
def store(self, s, a, r, s_, done):
self.s.append(s)
self.a.append(a)
self.r.append(r)
self.s_.append(s_)
self.done.append([done])
if self.count <= self.max_length:
self.count += 1
def numpy_to_tensor(self):
s = torch.tensor(np.array(self.s), dtype=torch.float)
a = torch.tensor(np.array(self.a), dtype=torch.float)
r = torch.tensor(np.array(self.r), dtype=torch.float)
s_ = torch.tensor(np.array(self.s_), dtype=torch.float)
done = torch.tensor(np.array(self.done), dtype=torch.float)
return s, a, r, s_, done
```
* **Replay Buffer** 是用來訓練時採樣數據用的,使用 `deque` 來實作,`deque` 可以在 Append 新資料時,把舊資料丟棄(如果超過儲存大小)
* Initial 時定義最大長度,透過 `args` 由 **Hyperparameter** 傳遞
* `store()` 儲存 **state**、**action**、**reward**、**next_state**、**done**,這邊有順便儲存環境是否終止,不管是中途停止或是因長度而中止
* `numpy_to_tensor()` 將 `deque` 的資料轉換成 `tensor`,這樣才能讓 **Pytorch** 計算並訓練
---
### Initialize
```python=
def __init__(self,args,env,hidden_layer_num_list=[64,64]):
# Hyperparameter
self.evaluate_freq_steps = args.evaluate_freq_steps
self.max_train_steps = args.max_train_steps
self.num_actions = args.num_actions
self.batch_size = args.batch_size
self.num_states = args.num_states
self.mem_min = args.mem_min
self.gamma = args.gamma
self.set_var = args.var
self.var = self.set_var
self.tau = args.tau
self.lr = args.lr
# Variable
self.total_steps = 0
self.training_count = 0
self.evaluate_count = 0
# other
self.env = env
self.replay_buffer = ReplayBuffer(args)
# Actor-Critic
self.actor = Actor(args,hidden_layer_num_list.copy())
self.critic = Critic(args,hidden_layer_num_list.copy())
self.actor_target = Actor(args,hidden_layer_num_list.copy())
self.critic_target = Critic(args,hidden_layer_num_list.copy())
self.optimizer_critic = torch.optim.Adam(self.critic.parameters(), lr=self.lr, eps=1e-5)
self.optimizer_actor = torch.optim.Adam(self.actor.parameters(), lr=self.lr, eps=1e-5)
print(self.actor)
print(self.critic)
print("-----------")
```
各參數初始化 :
* `evaluate_freq_steps` : 當 **Training Loop** 的 **Step** 到達這個次數就會評估 **Actor**,讓 **Actor** 不加 **Noise** 執行測試
* `max_train_steps` : **Training Loop** 去 **Sample** 資料的最大次數
* `num_actions` : **Env** 要求的 **Action** 數量
* `batch_size` : 每次 **Update Actor-Critic** 的時候要取用多少資料
* `num_states` : **Env** 回傳的 **State** 數量(大小)
* `mem_min` : **ReplayBuffer** 的大小要超過這個設定值,才可以去 **Update Actor-Critic**,為了防止資料太少導致訓練不穩
* `gamma` : 計算 **TD-Error** 的 $\gamma$
* `set_var` : 初始 **Variance**,用來加入 **Normal noise**
* `tau` : 用於 **Soft Update** 的 $\tau$
* `lr` : **Learning rate**
---
* `actor` : **Evaluate Actor**,實際與環境互動的 **Neural Network**
* `critic` : **Evaluate Critic**,會一直被 **Update** 的 **Value Function Neural Network**
* `actor_target` : **actor** 的參數(權重)會緩慢更新這個 **Target Actor**
* `critic_target` : **critic** 的參數(權重)會緩慢更新這個 **Target Actor**
* `optimizer_critic` : **Adam** 優化器,指定優化 **Critic** 的參數
* `optimizer_actor` : **Adam** 優化器,指定優化 **Actor** 的參數
### Choose action
```python=
def choose_action(self,state):
state = torch.tensor(state, dtype=torch.float)
s = torch.unsqueeze(state,0)
with torch.no_grad():
a = self.actor(s)
a = Normal(a,self.var).sample()
a = torch.clamp(a,-1,1)
return a.cpu().numpy().flatten()
```
`Normal()` 是 **Pytorch** 的常態分佈 **Function**,輸入 $\mu$ 和 $\sigma$ 控制分布機率,然後 **Call** `sample()` 就可以得到新的數值,這作法是用來提高 **Exporation**,$\mu$ 是機率最高點,而這邊的參數就是 **Actor** 輸出的 **Action**
### Evaluate action
```python=
def evaluate_action(self,state):
state = torch.tensor(state, dtype=torch.float)
s = torch.unsqueeze(state,0)
with torch.no_grad():
a = self.actor(s)
return a.cpu().numpy().flatten()
```
與 `choose_action()` 相似,但少了 Noise
### Var decay
```python=
def var_decay(self, total_steps):
new_var = self.set_var * (1 - total_steps / self.max_train_steps)
self.var = new_var + 10e-10
```
前面提到加入 Noise 的常態分佈 `Normal` 使用的 Variance 會在這邊更新數值,隨著訓練次是緩慢下降,也就是 Exporation 會慢慢降低
### Train
```python=
def train(self):
time_start = time.time()
episode_reward_list = []
episode_count_list = []
episode_count = 0
# Training Loop
while self.total_steps < self.max_train_steps:
s = self.env.reset()[0]
while True:
a = self.choose_action(s)
s_, r, done , truncated , _ = self.env.step(a)
# storage data
self.replay_buffer.store(s, a, [r], s_, done)
# update state
s = s_
if self.replay_buffer.count >= self.mem_min:
self.training_count += 1
self.update()
if self.total_steps % self.evaluate_freq_steps == 0:
self.evaluate_count += 1
evaluate_reward = self.evaluate_policy(self.env)
episode_reward_list.append(evaluate_reward)
episode_count_list.append(episode_count)
time_end = time.time()
h = int((time_end - time_start) // 3600)
m = int(((time_end - time_start) % 3600) // 60)
second = int((time_end - time_start) % 60)
print("---------")
print("Time : %02d:%02d:%02d"%(h,m,second))
print("Training episode : %d\tStep : %d / %d"%(episode_count,self.total_steps,self.max_train_steps))
print("Evaluate count : %d\tEvaluate reward : %0.2f"%(self.evaluate_count,evaluate_reward))
self.total_steps += 1
if done or truncated:
break
episode_count += 1
# Plot the training curve
plt.plot(episode_count_list, episode_reward_list)
plt.xlabel("Episode")
plt.ylabel("Reward")
plt.title("Training Curve")
plt.show()
```
迴圈的部分最外層就是 **Episode**,內層就是 **Step**,跳出的條件是 `total_steps < max_train_steps`,內層就是一直與環境互動,並儲存資料到 **ReplayBuffer** , 只要資料夠就是每個 **Step** 後會更新一次 **Actor-Critic**,主要更新 **Actor-Critic** 的 Function 是 `self.update()`。
### Update
```python=
def update(self):
s, a, r, s_, done = self.replay_buffer.numpy_to_tensor() # Get training data .type is tensor
index = np.random.choice(len(r),self.batch_size,replace=False)
# Get minibatch
minibatch_s = s[index]
minibatch_a = a[index]
minibatch_r = r[index]
minibatch_s_ = s_[index]
minibatch_done = done[index]
# update Actor
action = self.actor(minibatch_s)
value = self.critic(minibatch_s,action)
actor_loss = -torch.mean(value)
self.optimizer_actor.zero_grad()
actor_loss.backward()
torch.nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5) # Trick : Clip grad
self.optimizer_actor.step()
# Update Critic
next_action = self.actor_target(minibatch_s_)
next_value = self.critic_target(minibatch_s_,next_action)
v_target = minibatch_r + self.gamma * next_value * (1 - minibatch_done)
value = self.critic(minibatch_s,minibatch_a)
critic_loss = F.mse_loss(value,v_target)
self.optimizer_critic.zero_grad()
critic_loss.backward()
torch.nn.utils.clip_grad_norm_(self.critic.parameters(), 0.5) # Trick : Clip grad
self.optimizer_critic.step()
self.var_decay(total_steps=self.total_steps)
# Update target networks
self.soft_update(self.critic_target,self.critic, self.tau)
self.soft_update(self.actor_target, self.actor, self.tau)
```
1. 從 **ReplayBuffer** 取出 **minibatch**
2. 更新 **Actor**
3. 更新 **Critic**
4. 用於 `Normal()` 的 **Variance** 的 **decay**
5. **Soft Update**
### Soft update
```python=
def soft_update(self, target, source, tau):
for target_param, param in zip(target.parameters(), source.parameters()):
target_param.data.copy_(target_param.data * (1.0 - tau) + param.data * tau)
```
$$\large \theta_{target} = (1-\tau)\theta_{target} + \tau\theta_{eval}$$
### main
```python=
import gymnasium as gym # openai gym
import numpy as np
import argparse
from DDPG.Agent import Agent
class main():
def __init__(self,args):
env_name = 'Pendulum-v1'
env = gym.make('Pendulum-v1')
num_states = env.observation_space.shape[0]
num_actions = env.action_space.shape[0]
print(num_actions)
print(num_states)
# args
args.num_actions = num_actions
args.num_states = num_states
# print args
print("---------------")
for arg in vars(args):
print(arg,"=",getattr(args, arg))
print("---------------")
# create agent
hidden_layer_num_list = [256,256]
agent = Agent(args , env , hidden_layer_num_list)
# trainning
agent.train()
# evaluate
render_env = gym.make(env_name,render_mode='human')
for i in range(10000):
evaluate_reward = agent.evaluate_policy(render_env)
print(f"Evaluate Episode {i+1}: Average Reward = {evaluate_reward:.2f}")
```
主程式 `Agent(args, env , [256,256])` 參數加入 `args` **Hyperparameter**,`[256,256]` 為每個 **hidden layer** 的 **Output** 大小
### Result
使用的參數 :
```
lr = 0.001
var = 3
tau = 0.001
gamma = 0.99
mem_min = 100
batch_size = 32
buffer_size = 10000
max_train_steps = 30000
evaluate_freq_steps = 2000.0
num_actions = 1
num_states = 3
action_max = 2.0
---------------
[3, 256, 256, 1]
[4, 256, 256, 1]
[3, 256, 256, 1]
[4, 256, 256, 1]
Actor(
(layers): ModuleList(
(0): Linear(in_features=3, out_features=256, bias=True)
(1): Linear(in_features=256, out_features=256, bias=True)
(2): Linear(in_features=256, out_features=1, bias=True)
)
(tanh): Tanh()
)
Critic(
(layers): ModuleList(
(0): Linear(in_features=4, out_features=256, bias=True)
(1): Linear(in_features=256, out_features=256, bias=True)
(2): Linear(in_features=256, out_features=1, bias=True)
)
(tanh): Tanh()
)
```

