# Deep Deterministic Policy Gradient ###### tags: `Reinforcement Learning` `OpenAI` `DDPG` `Deep Reinforcement Learning` Google Deepmind 原始論文 : https://arxiv.org/pdf/1509.02971.pdf 分類 : * `Model-Free` * `Value Base` * `Off Policy` * `Continuous Action Space` * `Continuous State Space` 可以先參考 [DQN](https://hackmd.io/@bGCXESmGSgeAArScMaBxLA/rkWNLbwMxg) 的內容 ## Key Idea **DDPG** 結合 **Actor-Critic** 和 **DQN**,用於解決連續動作空間(**Continuous Action Space**),**Forward** 的部分 **Actor** 將直接輸出動作,但大部分還是會將輸出限制在 [-1 , 1] 之間,再根據環境的每個動作的最大最小值來決定動作。 > ex. **Actor** 輸出 0.4,而環境是要控制機械手臂,動作空間為 [-3.14 , 3.14],這時候就可以 $-3.14 + (0.4*6.28)=-0.62$,環境就會按照 **Actor** 給的輸出將馬達轉到 -0.62 Radian 而 **Critic** 的部分輸出單一數值,也就是 **Expected Reward**,**DDPG** 不像 **DQN** 是一個 **Action-Value Function**,而是單純的 **Value Function**,但他的輸入除了 **State** 還多了 **Action**,目的就是用來評估 **Actor** 在 $s_t$ 的情況下做出 $a_t$ 的 **Expected Reward**,從而更新參數。 ## Background ![image](https://hackmd.io/_uploads/HJ8-I__Gxx.png =70%x) 在 **DQN** 我們會訓練 **Neural Network** 給我們 **Expected Reward**,**Input** 為 **State** $s_t$,並輸出每種動作的 **Expected Reward**, **Target Function** 和 **Loss Function** 如下 : **TD-Error** : $$\large Q(s_t,a_t)=r_{t+1}+\gamma \max_{a}Q(s_{t+1},a)$$ **Loss Function** : $$e= [Q(s_t,a_t)-\max_a(r_{t+1}+\gamma Q(s_{t+1,a)}))]^2$$ $$MSE=\frac{1}{N}\sum_i^n e_i$$ 可以想像如果要將 **DQN** 用在連續動作空間,如兩軸馬達,他可以接收 -3.14 ~ 3.14 之間的 float 來控制馬達角度,**DQN** 要達成的話可以增加 **Neural Network** 的 **Output** 數量,切割成非常多種動作,讓 **Neural Network** 選擇 **Expected Reward** 最高的,但這種方法是治標不治本,且犧牲效能與精度,所以才有了 **DDPG**。 --- ## Architecture 在 **DDPG** 算法中,會使用 **Actor-Critic** 的架構 ![image](https://hackmd.io/_uploads/H1CWSZVrxl.png) **Actor** 的部分 **Input** 為 $s_t$ ,**Output** 為 $a_t$,並且需要與環境互動,**DDPG** 被設計來處理連續動作空間(**Continuous Action Space**) 的環境,所以 $a_t$ 是連續的,他的數值可能性是無限多的,不像 **DQN** 是固定數量的。 **Critic** 的部分 **Input** 為 $s_t$、$a_t$,**Output** 為 **Expected Reward** $Q(s_t,a_t)$,也可以稱為 **Value Function**,他不像 **DQN** 會針對每種 **Action** 輸出一個 **Value**,他是透過 **Input** $a_t$ 來決定 **Value**,所以他可以說是在評估 **Actor** 的好壞,所以這樣就能夠預測無限多種可能的 **Action** $a_t$ 和 $s_t$ 狀態下的 **Expected Reward**。 這樣就能跳脫 **DQN** 只能處理離散動作空間的問題。 ## Training **DDPG** 與 **DQN** 相似,有兩份 **Neural Network(Evaluate、Target)**,分為 **Evalute**、**Target** **Network**,為了防止訓練 **Network** 快速變動,**Evaluate** 變動較大,**Target** 變動較小,希望能夠讓 **Agent** 不要因為被雜訊干擾,訓練也能更穩定,**DDPG** 使用 **soft update** 來更新 **Target Network**。 ### Update Actor 訓練 **Actor** 的方式非常簡單 , 我們會使用 **Critic** 的輸出來評估 **Actor** 的動作好不好 , **Actor** 的 **Loss Function** : $$Loss = -\frac{1}{N}\sum_i^NQ^{eval}(s_i,\mu^{eval}(s_i))$$ * $Q^{eval}$ : **Evaluate** 的 **Value Function** * $\mu^{eval}$ : **Evaluate** 的 **Actor** **DDPG** 一樣會有 **Exporation** 策略,會在 **Action** 加入雜訊,我們在更新時會讓 **Evaluate** 的 **Actor** 重新給我們一個 **Action** ,讓 **Actor** 參與這個 **Forward** 的運算,這樣才能更新 **Actor** 的參數。 **Actor** 的更新比較簡單,只使用到 **Evaluate** 的 **Actor** 和 **Critic** ### Update Critic **Critic** 跟 **DQN** 一樣 ,可以使用 **TD Error** 來更新,也都會有 **Target Network** 參與,**Target Function**為 : $$\Large Q^{eval}(s_t,a_t)=r+\gamma Q^{target}(s_{t+1},\mu^{target}(s_{t+1}))$$ * $r$ : **Reward**,從 **Replay Buffer** 取出 * $s_{t+1}$ : **Next State** , 從 **Replay Buffer** 取出 * $s_t$ : **Current State** , 從 **Replay Buffer** 取出 * $a_t$ : **Action** , 從 **Replay Buffer** 取出,**Actor** 與環境互動的 **Action** , 可能會帶有雜訊(為了 **Exporation**) * $\mu^{target}(s_{t+1})$ : **Next State** 為 **Input**,**Target Actor** 的 **Output Action** 可以看到計算 **TD-Error** 的右半部分是用 **Target Network** (**Actor** 和 **Critic**) 和 **Next State** 計算出 **Target Value** 用來更新 **Evalute** 的 **Critic**。 而 **Loss Function** 跟 **DQN** 一樣使用 **MSE** 就可以了 $$\Large Loss =\frac{1}{N}(Q^{eval}(s_t,a_t) - [r+\gamma Q^{target}(s_{t+1},\mu^{target}(s_{t+1}))])^2$$ ## Soft Update 在 **DDPG** 中,為了穩定訊過程,我們並不會直接將 **Evaluate Network** 的參數整個複製到 **Target Network** (**Hard Update**),而是將 **Evaluate Network** 的參數以一個很小的比例融入到 **Target Network** 中。 $\theta^{\mu'} =\tau\theta^\mu+(1-\tau)\theta^{\mu'}$ $\theta^{Q'} =\tau\theta^Q+(1-\tau)\theta^{Q'}$ * $\tau$(tau) : 一個非常小的正數,通常為 $0.001$ ~ $0.01$ * 當 $\tau=1$ 時,就等同於 **Hard Update** (直接複製) * 隨著訓練次數增加,**Target Network** 將逐漸像 **Evaluate Network** 靠攏 --- ## Pendulum Example 這個環境屬於控制理論中的經典倒立擺擺動問題,擺錘的初始位置隨機,目標是透過對擺錘施加扭矩,使其擺動到直立的位置。 ![pendulum](https://hackmd.io/_uploads/Skj3zucLxe.png) ![1753279880812](https://hackmd.io/_uploads/ByEsTvCIee.gif =40%x) 黑色的部分是固定點,紅色(擺錘)則是根據固定點為旋轉的中心 ### Action Space 需要給與環境互動的 Action 為 -2.0 ~ 2.0 之間的數值,也就是連續動作空間(Continuous Action Space),對於 DQN 來說可以說是有無限多的動作可以選擇,但對於 DDPG 這種直接輸出數值的架構沒問題。 ![image](https://hackmd.io/_uploads/S1Y4NOqUee.png) ### Observation Space 1. $x=cos(\theta)$ 2. $y=sin(\theta)$ 3. 角速度 $\dot\theta$ -8.0 ~ 8.0 ![image](https://hackmd.io/_uploads/HJ8Jru9Ulx.png) ### Rewards 每個 Step 的 Reward 計算公式為 : $r=-(\theta^2+0.1 \dot\theta+0.001\tau^2)$ * $\theta$ : 擺錘角度,以$[-\pi,\pi]$ 正規畫,0 代表擺錘為直立狀態 * $\dot\theta$ : 角速度 * $\tau$ : 所施加的扭矩 * 最大值 : 0 (直立、無速度、無扭矩) * 最小值 : $-(\pi^2+0.1*8^2+0.001*2^2)\approx-16.2736$ ### Starting State Reset 時,擺錘的狀態隨機設定為 : * 角度 $\theta \in[-\pi,\pi]$ * 角速度 $\dot\theta\in[-1,1]$ ### Episode End 每個 Episode 最多執行 200 Step,達到後被中斷,無其他終止條件 ## Pendulum DDPG Github Code : https://github.com/jason19990305/DDPG ### Actor-Critic #### Actor ```python= class Actor(nn.Module): def __init__(self,args,hidden_layers=[64,64]): super(Actor, self).__init__() self.num_states = args.num_states self.num_actions = args.num_actions # add in list hidden_layers.insert(0,self.num_states) # first layer hidden_layers.append(self.num_actions) # last layer print(hidden_layers) # create layers layer_list = [] for i in range(len(hidden_layers)-1): input_num = hidden_layers[i] output_num = hidden_layers[i+1] layer = nn.Linear(input_num,output_num) layer_list.append(layer) orthogonal_init(layer_list[-1]) orthogonal_init(layer_list[-1], gain=0.01) # put in ModuleList self.layers = nn.ModuleList(layer_list) self.tanh = nn.Tanh() # when actor(s) will activate the function def forward(self,s): for layer in self.layers: s = self.tanh(layer(s)) return s * self.action_max ``` * `args` 是從主程式的 **Hyperparameter** 傳進來的, `num_states` 和 `num_actions` 為 **Environment** 的 **state** 、 **action** 大小,`num_actions` 為動作的種類,實際輸出的動作只有一個 float,有些環境要求輸出多個 **action**,如六軸機械手臂,要多注意。 * `hidden_layers` 為 list,主要用來定義 **Hidden layer** 的維度,也可以理解為 **Output shape**,定義這一層要輸出多大的 **Vector**,**Input/Output layer** 則由 `num_states` 和 `num_actions` 決定。 以 `Pendulum` 這個環境來說的話 `num_states` 就是 3,分別為 **x**、**y**、**Angular velocity**,`num_action` 為 1,也就是要施加的 **Torque**。 `forward` 的部分有將 Action 乘上 Max Action,也就是 Env 要的 Action 的最大值,以 **Pendulum** 來說的話就會從 -1~1 變成 -2~2。 >這是偷懶的寫法,要注意 Environment 的需求 --- ```python= layer_list = [] for i in range(len(hidden_layers)-1): input_num = hidden_layers[i] output_num = hidden_layers[i+1] layer = nn.Linear(input_num,output_num) layer_list.append(layer) orthogonal_init(layer_list[-1]) orthogonal_init(layer_list[-1], gain=0.01) # put in ModuleList self.layers = nn.ModuleList(layer_list) self.tanh = nn.Tanh() ``` 上面是建立全連接層並將它儲存在 `ModuleList`,因為適用在 **Continuous Action Space** ,所以將輸出限制在 -1~1 之間,**Activation Function** 用 `Tanh` 可以達到這個需求。 --- #### Critic ```python= class Critic(nn.Module): def __init__(self, args,hidden_layers=[64,64]): super(Critic, self).__init__() self.num_states = args.num_states self.num_actions = args.num_actions # add in list hidden_layers.insert(0,self.num_states+self.num_actions) hidden_layers.append(1) print(hidden_layers) # create layers layer_list = [] for i in range(len(hidden_layers)-1): input_num = hidden_layers[i] output_num = hidden_layers[i+1] layer = nn.Linear(input_num,output_num) layer_list.append(layer) orthogonal_init(layer_list[-1]) # put in ModuleList self.layers = nn.ModuleList(layer_list) self.tanh = nn.Tanh() def forward(self,s,a): input_data = torch.cat((s,a),dim=1) for i in range(len(self.layers)-1): input_data = self.tanh(self.layers[i](input_data)) # predicet value v_s = self.layers[-1](input_data) return v_s ``` **Critic** 要注意的是 **Input** 是 **State** 的大小和 **Action** 的大小,**DDPG** 的架構就是這樣設計的,輸出的話是固定為 1,也就是用 $s_t$ 和 $a_t$ 來預測 **Expected Reward**,用來 **Train Actor**。 --- ### Replay Buffer ```python= class ReplayBuffer: def __init__(self, args): self.max_length = args.buffer_size self.s = deque(maxlen = self.max_length) self.a = deque(maxlen = self.max_length) self.r = deque(maxlen = self.max_length) self.s_ = deque(maxlen = self.max_length) self.dw = deque(maxlen = self.max_length) self.done = deque(maxlen = self.max_length) self.count = 0 def store(self, s, a, r, s_, done): self.s.append(s) self.a.append(a) self.r.append(r) self.s_.append(s_) self.done.append([done]) if self.count <= self.max_length: self.count += 1 def numpy_to_tensor(self): s = torch.tensor(np.array(self.s), dtype=torch.float) a = torch.tensor(np.array(self.a), dtype=torch.float) r = torch.tensor(np.array(self.r), dtype=torch.float) s_ = torch.tensor(np.array(self.s_), dtype=torch.float) done = torch.tensor(np.array(self.done), dtype=torch.float) return s, a, r, s_, done ``` * **Replay Buffer** 是用來訓練時採樣數據用的,使用 `deque` 來實作,`deque` 可以在 Append 新資料時,把舊資料丟棄(如果超過儲存大小) * Initial 時定義最大長度,透過 `args` 由 **Hyperparameter** 傳遞 * `store()` 儲存 **state**、**action**、**reward**、**next_state**、**done**,這邊有順便儲存環境是否終止,不管是中途停止或是因長度而中止 * `numpy_to_tensor()` 將 `deque` 的資料轉換成 `tensor`,這樣才能讓 **Pytorch** 計算並訓練 --- ### Initialize ```python= def __init__(self,args,env,hidden_layer_num_list=[64,64]): # Hyperparameter self.evaluate_freq_steps = args.evaluate_freq_steps self.max_train_steps = args.max_train_steps self.num_actions = args.num_actions self.batch_size = args.batch_size self.num_states = args.num_states self.mem_min = args.mem_min self.gamma = args.gamma self.set_var = args.var self.var = self.set_var self.tau = args.tau self.lr = args.lr # Variable self.total_steps = 0 self.training_count = 0 self.evaluate_count = 0 # other self.env = env self.replay_buffer = ReplayBuffer(args) # Actor-Critic self.actor = Actor(args,hidden_layer_num_list.copy()) self.critic = Critic(args,hidden_layer_num_list.copy()) self.actor_target = Actor(args,hidden_layer_num_list.copy()) self.critic_target = Critic(args,hidden_layer_num_list.copy()) self.optimizer_critic = torch.optim.Adam(self.critic.parameters(), lr=self.lr, eps=1e-5) self.optimizer_actor = torch.optim.Adam(self.actor.parameters(), lr=self.lr, eps=1e-5) print(self.actor) print(self.critic) print("-----------") ``` 各參數初始化 : * `evaluate_freq_steps` : 當 **Training Loop** 的 **Step** 到達這個次數就會評估 **Actor**,讓 **Actor** 不加 **Noise** 執行測試 * `max_train_steps` : **Training Loop** 去 **Sample** 資料的最大次數 * `num_actions` : **Env** 要求的 **Action** 數量 * `batch_size` : 每次 **Update Actor-Critic** 的時候要取用多少資料 * `num_states` : **Env** 回傳的 **State** 數量(大小) * `mem_min` : **ReplayBuffer** 的大小要超過這個設定值,才可以去 **Update Actor-Critic**,為了防止資料太少導致訓練不穩 * `gamma` : 計算 **TD-Error** 的 $\gamma$ * `set_var` : 初始 **Variance**,用來加入 **Normal noise** * `tau` : 用於 **Soft Update** 的 $\tau$ * `lr` : **Learning rate** --- * `actor` : **Evaluate Actor**,實際與環境互動的 **Neural Network** * `critic` : **Evaluate Critic**,會一直被 **Update** 的 **Value Function Neural Network** * `actor_target` : **actor** 的參數(權重)會緩慢更新這個 **Target Actor** * `critic_target` : **critic** 的參數(權重)會緩慢更新這個 **Target Actor** * `optimizer_critic` : **Adam** 優化器,指定優化 **Critic** 的參數 * `optimizer_actor` : **Adam** 優化器,指定優化 **Actor** 的參數 ### Choose action ```python= def choose_action(self,state): state = torch.tensor(state, dtype=torch.float) s = torch.unsqueeze(state,0) with torch.no_grad(): a = self.actor(s) a = Normal(a,self.var).sample() a = torch.clamp(a,-1,1) return a.cpu().numpy().flatten() ``` `Normal()` 是 **Pytorch** 的常態分佈 **Function**,輸入 $\mu$ 和 $\sigma$ 控制分布機率,然後 **Call** `sample()` 就可以得到新的數值,這作法是用來提高 **Exporation**,$\mu$ 是機率最高點,而這邊的參數就是 **Actor** 輸出的 **Action** ### Evaluate action ```python= def evaluate_action(self,state): state = torch.tensor(state, dtype=torch.float) s = torch.unsqueeze(state,0) with torch.no_grad(): a = self.actor(s) return a.cpu().numpy().flatten() ``` 與 `choose_action()` 相似,但少了 Noise ### Var decay ```python= def var_decay(self, total_steps): new_var = self.set_var * (1 - total_steps / self.max_train_steps) self.var = new_var + 10e-10 ``` 前面提到加入 Noise 的常態分佈 `Normal` 使用的 Variance 會在這邊更新數值,隨著訓練次是緩慢下降,也就是 Exporation 會慢慢降低 ### Train ```python= def train(self): time_start = time.time() episode_reward_list = [] episode_count_list = [] episode_count = 0 # Training Loop while self.total_steps < self.max_train_steps: s = self.env.reset()[0] while True: a = self.choose_action(s) s_, r, done , truncated , _ = self.env.step(a) # storage data self.replay_buffer.store(s, a, [r], s_, done) # update state s = s_ if self.replay_buffer.count >= self.mem_min: self.training_count += 1 self.update() if self.total_steps % self.evaluate_freq_steps == 0: self.evaluate_count += 1 evaluate_reward = self.evaluate_policy(self.env) episode_reward_list.append(evaluate_reward) episode_count_list.append(episode_count) time_end = time.time() h = int((time_end - time_start) // 3600) m = int(((time_end - time_start) % 3600) // 60) second = int((time_end - time_start) % 60) print("---------") print("Time : %02d:%02d:%02d"%(h,m,second)) print("Training episode : %d\tStep : %d / %d"%(episode_count,self.total_steps,self.max_train_steps)) print("Evaluate count : %d\tEvaluate reward : %0.2f"%(self.evaluate_count,evaluate_reward)) self.total_steps += 1 if done or truncated: break episode_count += 1 # Plot the training curve plt.plot(episode_count_list, episode_reward_list) plt.xlabel("Episode") plt.ylabel("Reward") plt.title("Training Curve") plt.show() ``` 迴圈的部分最外層就是 **Episode**,內層就是 **Step**,跳出的條件是 `total_steps < max_train_steps`,內層就是一直與環境互動,並儲存資料到 **ReplayBuffer** , 只要資料夠就是每個 **Step** 後會更新一次 **Actor-Critic**,主要更新 **Actor-Critic** 的 Function 是 `self.update()`。 ### Update ```python= def update(self): s, a, r, s_, done = self.replay_buffer.numpy_to_tensor() # Get training data .type is tensor index = np.random.choice(len(r),self.batch_size,replace=False) # Get minibatch minibatch_s = s[index] minibatch_a = a[index] minibatch_r = r[index] minibatch_s_ = s_[index] minibatch_done = done[index] # update Actor action = self.actor(minibatch_s) value = self.critic(minibatch_s,action) actor_loss = -torch.mean(value) self.optimizer_actor.zero_grad() actor_loss.backward() torch.nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5) # Trick : Clip grad self.optimizer_actor.step() # Update Critic next_action = self.actor_target(minibatch_s_) next_value = self.critic_target(minibatch_s_,next_action) v_target = minibatch_r + self.gamma * next_value * (1 - minibatch_done) value = self.critic(minibatch_s,minibatch_a) critic_loss = F.mse_loss(value,v_target) self.optimizer_critic.zero_grad() critic_loss.backward() torch.nn.utils.clip_grad_norm_(self.critic.parameters(), 0.5) # Trick : Clip grad self.optimizer_critic.step() self.var_decay(total_steps=self.total_steps) # Update target networks self.soft_update(self.critic_target,self.critic, self.tau) self.soft_update(self.actor_target, self.actor, self.tau) ``` 1. 從 **ReplayBuffer** 取出 **minibatch** 2. 更新 **Actor** 3. 更新 **Critic** 4. 用於 `Normal()` 的 **Variance** 的 **decay** 5. **Soft Update** ### Soft update ```python= def soft_update(self, target, source, tau): for target_param, param in zip(target.parameters(), source.parameters()): target_param.data.copy_(target_param.data * (1.0 - tau) + param.data * tau) ``` $$\large \theta_{target} = (1-\tau)\theta_{target} + \tau\theta_{eval}$$ ### main ```python= import gymnasium as gym # openai gym import numpy as np import argparse from DDPG.Agent import Agent class main(): def __init__(self,args): env_name = 'Pendulum-v1' env = gym.make('Pendulum-v1') num_states = env.observation_space.shape[0] num_actions = env.action_space.shape[0] print(num_actions) print(num_states) # args args.num_actions = num_actions args.num_states = num_states # print args print("---------------") for arg in vars(args): print(arg,"=",getattr(args, arg)) print("---------------") # create agent hidden_layer_num_list = [256,256] agent = Agent(args , env , hidden_layer_num_list) # trainning agent.train() # evaluate render_env = gym.make(env_name,render_mode='human') for i in range(10000): evaluate_reward = agent.evaluate_policy(render_env) print(f"Evaluate Episode {i+1}: Average Reward = {evaluate_reward:.2f}") ``` 主程式 `Agent(args, env , [256,256])` 參數加入 `args` **Hyperparameter**,`[256,256]` 為每個 **hidden layer** 的 **Output** 大小 ### Result 使用的參數 : ``` lr = 0.001 var = 3 tau = 0.001 gamma = 0.99 mem_min = 100 batch_size = 32 buffer_size = 10000 max_train_steps = 30000 evaluate_freq_steps = 2000.0 num_actions = 1 num_states = 3 action_max = 2.0 --------------- [3, 256, 256, 1] [4, 256, 256, 1] [3, 256, 256, 1] [4, 256, 256, 1] Actor( (layers): ModuleList( (0): Linear(in_features=3, out_features=256, bias=True) (1): Linear(in_features=256, out_features=256, bias=True) (2): Linear(in_features=256, out_features=1, bias=True) ) (tanh): Tanh() ) Critic( (layers): ModuleList( (0): Linear(in_features=4, out_features=256, bias=True) (1): Linear(in_features=256, out_features=256, bias=True) (2): Linear(in_features=256, out_features=1, bias=True) ) (tanh): Tanh() ) ``` ![image](https://hackmd.io/_uploads/SyeshhJDxe.png) ![1753365726734](https://hackmd.io/_uploads/S1Bx6nkvgx.gif)