# PPO Detail

[TOC]

Reference: [The 37 Implementation Details of Proximal Policy Optimization](https://iclr-blog-track.github.io/2022/03/25/ppo-implementation-details/)

**Implementation Code** : https://github.com/jason19990305/PPO.git

## Introduction

This section shows how to apply **PPO** to a **Continuous Action Space**, along with several tricks that improve **PPO**'s performance.

## Orthogonal Initialization

Orthogonal initialization is a weight-initialization scheme that helps prevent exploding or vanishing gradients at the start of training. We set the **Actor**'s **Output Layer** to `gain=0.01` and every other layer to `gain=1`.

```python=
import torch.nn as nn

def orthogonal_init(layer, gain=1.0):
    nn.init.orthogonal_(layer.weight, gain=gain)
    nn.init.constant_(layer.bias, 0)

class Actor(nn.Module):
    def __init__(self, args, hidden_layers=[64, 64]):
        super(Actor, self).__init__()
        self.num_states = args.num_states
        self.num_actions = args.num_actions

        # Insert input and output sizes into hidden_layers
        hidden_layers.insert(0, self.num_states)
        hidden_layers.append(self.num_actions)

        # Create fully connected layers
        fc_list = []
        for i in range(len(hidden_layers) - 1):
            num_input = hidden_layers[i]
            num_output = hidden_layers[i + 1]
            layer = nn.Linear(num_input, num_output)
            fc_list.append(layer)
            orthogonal_init(fc_list[-1])
        # Re-initialize the output layer with a smaller gain
        orthogonal_init(fc_list[-1], gain=0.01)
```

## Gradient Clip & Adam Optimizer

To prevent exploding gradients, the gradients can be clipped before the parameter update:

```python=
torch.nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5)
```

The same clipping can be applied to the **Critic**.

Set the **Adam** optimizer's **Epsilon** to `1e-5`:

```python=
self.optimizer_actor = torch.optim.Adam(self.actor.parameters(), lr=self.lr, eps=1e-5)
self.optimizer_critic = torch.optim.Adam(self.critic.parameters(), lr=self.lr, eps=1e-5)
```

## Learning rate decay

Linearly decay the learning rate of both the **Actor** and the **Critic** to zero over the course of training:

```python=
def lr_decay(self, total_steps):
    lr_a_now = self.lr * (1 - total_steps / self.max_train_steps)
    lr_c_now = self.lr * (1 - total_steps / self.max_train_steps)
    for opt in self.optimizer_actor.param_groups:
        opt['lr'] = lr_a_now
    for opt in self.optimizer_critic.param_groups:
        opt['lr'] = lr_c_now
```

## Policy entropy

$\large -\sum_{a_t}\pi_\theta(a_t|s_t)\log\pi_\theta(a_t|s_t)$

An entropy bonus is added to the loss to encourage exploration.

```python=
prob_entropy = dist.entropy().sum(dim=1, keepdim=True)
```

Computing the **Actor Loss function** :

```python=
actor_loss = torch.min(p1, p2) - prob_entropy * self.entropy_coef
# Mean actor loss and add entropy term
actor_loss = -actor_loss.mean()
```

The **entropy coefficient** is usually set to `0.01`.

## Value Loss Clipping

The PPO paper mentions that if the Actor and Critic share parameters, i.e. a single **Neural Network** outputs both the **Action** probabilities and the **Value**, then **Value Loss Clipping** works better.

$\large L^V=\max[(V_{\theta_t}-V_{targ})^2,(\text{clip}(V_{\theta_t},V_{\theta_{t-1}}-\epsilon,V_{\theta_{t-1}}+\epsilon)-V_{targ})^2]$
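A minimal sketch of how this clipped value loss could look in PyTorch, assuming `values` are the Critic's new predictions for the sampled states, `old_values` were recorded when the data was collected, and `v_target` is the target returned by GAE; these variable names are illustrative and not taken from the reference implementation:

```python=
# Sketch only: clip the new value prediction around the old one before computing the squared error
v_clipped = old_values + torch.clamp(values - old_values, -self.epsilon, self.epsilon)
loss_unclipped = (values - v_target) ** 2
loss_clipped = (v_clipped - v_target) ** 2
critic_loss = torch.max(loss_unclipped, loss_clipped).mean()
```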
## Continuous Action Space

Suppose the **Environment** expects continuous **Action** values between `-1` and `1`. **PPO** is a **Policy-based** algorithm, so computing its **Loss Function** requires the probability of every **Action** the **Actor** samples. The usual choice is to model the policy with a probability distribution; here we use a **Normal Distribution**, so the **Neural Network**'s **Output** becomes the $\mu$ that the **Normal Distribution** needs.

![2YRaKVS](https://hackmd.io/_uploads/S1jreHEtxe.png)

The **Output** has one $\mu$ for each **Action** the **Environment** requires.

Distributions produced by different combinations of $\mu$ and $\sigma^2$ :

![image](https://hackmd.io/_uploads/BJwpeSNYgl.png)

As shown above, the **Neural Network** only outputs $\mu$. The $\sigma^2$ can be fixed, decayed over time, or treated as a trainable parameter of the **PyTorch** **Actor**; some implementations instead have the **Actor** output an extra $\sigma^2$ for each **Action**.

```python=
import torch
import torch.nn as nn
from torch.distributions import Normal

class Actor(nn.Module):
    def __init__(self, args, hidden_layers=[64, 64]):
        super(Actor, self).__init__()
        self.num_states = args.num_states
        self.num_actions = args.num_actions

        # Insert input and output sizes into hidden_layers
        hidden_layers.insert(0, self.num_states)
        hidden_layers.append(self.num_actions)

        # Create fully connected layers
        fc_list = []
        for i in range(len(hidden_layers) - 1):
            num_input = hidden_layers[i]
            num_output = hidden_layers[i + 1]
            layer = nn.Linear(num_input, num_output)
            fc_list.append(layer)
            orthogonal_init(fc_list[-1])
        # Re-initialize the output layer with a smaller gain
        orthogonal_init(fc_list[-1], gain=0.01)

        # Convert list to ModuleList for proper registration
        self.layers = nn.ModuleList(fc_list)
        self.log_std = nn.Parameter(torch.zeros(1, self.num_actions) + 1.5)
        self.tanh = nn.Tanh()

    def forward(self, x):
        # Pass input through all layers, applying Tanh activation
        for i in range(len(self.layers)):
            x = self.tanh(self.layers[i](x))
        return x

    def get_dist(self, state):
        mean = self.forward(state)
        std = torch.exp(self.log_std.expand_as(mean))
        try:
            dist = Normal(mean, std)
        except Exception:
            # Debug aid: dump parameters if mean/std contain invalid values
            print("Actor output out of range, check the input state or model parameters.")
            for param in self.parameters():
                print("actor parameter:", param.data)
            raise
        return dist
```

Create the trainable parameter $\sigma$ (stored as a log-std): `self.log_std = nn.Parameter(torch.zeros(1, self.num_actions) + 1.5)`

The last layer's **Activation Function** used to be **Softmax**; here it is replaced with `Tanh`, whose output range is `-1~1`. A new function `get_dist()` is added: it runs a **forward** pass to get $\mu$, combines it with the `Parameter` $\sigma$, and builds a `torch.distributions.Normal`, which can then be used to get probabilities, **Sample Actions**, and so on.

**Choose Action** :

```python=
# choose action
def choose_action(self, state):
    state = torch.tensor(state, dtype=torch.float)
    with torch.no_grad():
        s = torch.unsqueeze(state, dim=0)
        dist = self.actor.get_dist(s)
        a = dist.sample()
        a = torch.clamp(a, -1, 1)  # Ensure action is within bounds
        log_prob = dist.log_prob(a)
    return a.numpy().flatten(), log_prob.numpy().flatten()
```

The **Action** is sampled from the **Normal Distribution**; `clamp` keeps it within bounds.

The **Actor** update:

```python=
# Update Actor
# Get action probability from the model
dist = self.actor.get_dist(s[index])
# Get entropy of actor distribution
prob_entropy = dist.entropy().sum(dim=1, keepdim=True)
# Get log probability
log_prob = dist.log_prob(a[index])
# Calculate the ratio of new and old probabilities
ratio = torch.exp(log_prob.sum(dim=1, keepdim=True) - old_log_prob[index].sum(dim=1, keepdim=True))

p1 = ratio * adv[index]
p2 = torch.clamp(ratio, 1 - self.epsilon, 1 + self.epsilon) * adv[index]

actor_loss = torch.min(p1, p2) - prob_entropy * self.entropy_coef
# Mean actor loss and add entropy term
actor_loss = -actor_loss.mean()

self.optimizer_actor.zero_grad()
actor_loss.backward()  # Backpropagation
# Gradient clipping
torch.nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5)
self.optimizer_actor.step()
```

## Gymnasium Vector

A **Vector environment** samples multiple **sub-environments** at the same time, which scales the steps-per-second roughly linearly. Normally we have to **Reset** manually after **done** or **truncated**; a **Vector environment** **Resets** each **sub-environment** automatically.

```python!
class gymnasium.vector.AsyncVectorEnv(
    env_fns: Sequence[callable],
    observation_space: Optional[Space] = None,
    action_space: Optional[Space] = None,
    shared_memory: bool = True,
    copy: bool = True,
    context: Optional[str] = None,
    daemon: bool = True,
    worker: Optional[callable] = None
)
```

**Example** :

```python!
num_envs = 8
env_name = 'Pendulum-v1'
env_fns = [lambda : gym.make(env_name) for _ in range(num_envs)]
```

In normal usage you just pass a `list` of functions that each build an `env` as the argument to **AsyncVectorEnv**. Each **call** to `step()` then produces as many **next states**, **rewards**, etc. as there are `env`s.

---

Because it **Resets** by itself: if in the previous `step()` one of the **sub-environments** **returned** `s_, r, done, truncated` with either **done** or **truncated** equal to `True`, that **sub-environment** is **Reset**. The **state** after the **Reset** overwrites the last **step**'s **state**, and that **sub-environment**'s final **state** is stored in `info` as the final observation (`final_obs` in the output below).

To avoid losing the **final observation**, we change the **AutoReset Mode** to **Same-Step Mode** :

![image](https://hackmd.io/_uploads/SJzF2lJtxl.png)

Example :

```python!
import numpy as np
import gymnasium as gym
from gymnasium.vector import AsyncVectorEnv

def main():
    num_envs = 8
    env_name = 'Acrobot-v1'
    env_fns = [lambda : gym.make(env_name) for _ in range(num_envs)]
    venv = AsyncVectorEnv(env_fns, autoreset_mode=gym.vector.AutoresetMode.SAME_STEP)
    total_steps = 501

    print("Observation Space : ", venv.single_observation_space.shape[0])
    print("Action Space : ", venv.single_action_space.n)

    s, infos = venv.reset()  # obs shape: (num_envs, obs_dim)
    for step in range(total_steps):
        # a = choose_action(s)  # Replace with your action selection logic
        actions = np.array([venv.single_action_space.sample() for _ in range(num_envs)], dtype=venv.single_action_space.dtype)
        s_, r, done, truncated, infos = venv.step(actions)

        print("------------")
        print("state : ", s[0])
        print("next state : ", s_[0])
        print("reward : ", r[0])
        print("truncated : ", truncated[0])
        print(infos.keys())
        s = s_

    print(venv.autoreset_mode)

if __name__ == "__main__":
    main()
```

Output :

```python!
state :  [ 0.955378   -0.295386   -0.32671106  0.94512427 -0.35943484 -1.5961629 ]
next state :  [ 0.9982749  -0.05871289  0.99571955  0.0924263   0.00807058  0.07217021]
reward :  -1.0
truncated :  True
dict_keys(['final_info', '_final_info', 'final_obs', '_final_obs'])
------------
state :  [ 0.9982749  -0.05871289  0.99571955  0.0924263   0.00807058  0.07217021]
next state :  [ 0.9981087  -0.06147393  0.99209154  0.1255163  -0.03400526  0.2543154 ]
reward :  -1.0
truncated :  False
dict_keys([])
AutoresetMode.SAME_STEP
```

## GAE

https://arxiv.org/abs/1506.02438

**Generalized Advantage Estimation** is a refinement of our original **Advantage Function**.

The original **Advantage** $A^\theta_t(s_t,a_t)$ is:

$\large \delta_t^V=r_t+\gamma V^\pi(s_{t+1})-V^\pi(s_t)$

i.e. the **TD-Error** with the value **Baseline**, as used in **A2C**.

**GAE** as the new `Advantage Function` :

$\large A^\theta_t(s_t,a_t) = \sum_{j=0}^{T-t}(\gamma\lambda)^j\delta^V_{t+j}$

* $\gamma$ : the same discount factor as in the **TD-Error**, usually `0.99`
* $\lambda$ : a parameter in **0~1**, usually `0.95`

To see how the **Advantage** unrolls from the last step $s_T,a_T$, plug in $t=T,\space T-1,\space T-2$ :

$A^\theta_T(s_T,a_T)=\sum^0_{j=0}(\gamma \lambda)^j\delta^V_{T+j}=\delta^V_{T}$

$A^\theta_{T-1}(s_{T-1},a_{T-1})=\sum^1_{j=0}(\gamma \lambda)^j\delta^V_{T-1+j}=\delta^V_{T-1}+(\gamma\lambda)\delta^V_{T}$

$A^\theta_{T-2}(s_{T-2},a_{T-2})=\sum^2_{j=0}(\gamma \lambda)^j\delta^V_{T-2+j}=\delta^V_{T-2}+(\gamma\lambda)\delta^V_{T-1}+(\gamma\lambda)^2\delta^V_{T}$

Substituting:

$G_T=A^\theta_T(s_T,a_T)$
$G_{T-1} = A^\theta_{T-1}(s_{T-1},a_{T-1})$
$G_{T-2} = A^\theta_{T-2}(s_{T-2},a_{T-2})$

---

$G_{T-1} = \delta^V_{T-1}+(\gamma\lambda)G_T$
$G_{T-2} = \delta^V_{T-2}+(\gamma\lambda)G_{T-1}$

So we can start from the very end and work backwards to compute the **GAE** of the whole **Episode**.

```python=
def GAE(self, vs, vs_, r, done, truncated):
    adv = []
    gae = 0
    with torch.no_grad():  # adv and v_target have no gradient
        deltas = r + self.gamma * (1.0 - done) * vs_ - vs
        for delta, d in zip(reversed(deltas.flatten().numpy()), reversed(truncated.flatten().numpy())):
            gae = delta + self.gamma * self.lamda * gae * (1.0 - d)
            adv.insert(0, gae)
        adv = torch.tensor(adv, dtype=torch.float).view(-1, 1)
        v_target = adv + vs
    return v_target, adv
```

This **GAE** function can accept multiple **Episodes**, but the data must be in time order; if it is shuffled the result is wrong. `truncated` is the variable that resets the running accumulation, which is what makes handling multiple **Episodes** possible.

## Advantage Normalization

The paper "**The Mirage of Action-Dependent Baselines in Reinforcement Learning**" proposes **normalizing** the **Advantage**, which can improve the performance of **Policy Gradient** methods.

Each time the **Advantage** has been computed with **GAE**, we can apply **Normalization** :

$\large A = \frac{A-\mathrm{mean}(A)}{\mathrm{std}(A)+10^{-8}}$
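A minimal sketch of that normalization, applied to the `adv` tensor returned by `GAE()` before the update epochs (the placement here is illustrative):

```python=
# Normalize advantages over the whole batch; 1e-8 avoids division by zero
adv = (adv - adv.mean()) / (adv.std() + 1e-8)
```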
## State Normalization

```python=
import numpy as np

class RunningMeanStd:
    # Dynamically calculate mean and std
    def __init__(self, shape):  # shape: the dimension of input data
        self.n = 0
        self.mean = np.zeros(shape)
        self.S = np.zeros(shape)
        self.std = np.sqrt(self.S)

    def update(self, x):
        x = np.array(x)
        self.n += 1
        if self.n == 1:
            self.mean = x
            self.std = x
        else:
            old_mean = self.mean.copy()
            self.mean = old_mean + (x - old_mean) / self.n
            self.S = self.S + (x - old_mean) * (x - self.mean)
            self.std = np.sqrt(self.S / self.n)
```

```python=
class Normalization:
    def __init__(self, shape):
        self.running_ms = RunningMeanStd(shape=shape)

    def __call__(self, x, update=True):
        # Whether to update the mean and std; during evaluation, update=False
        if update:
            self.running_ms.update(x)
        x = (x - self.running_ms.mean) / (self.running_ms.std + 1e-8)
        return x
```

The idea is a running (moving) average. In practice you create an **Object** of the `Normalization` **class** and simply **call** it whenever needed (see the usage sketch after the Reward Scaling code below). Note that once this **Normalization** is used, running **Inference** with the **Agent** requires keeping the **mean** and **std** of every **state** dimension. The `shape` parameter is the size (number of dimensions) of the **state**.

## Reward Scaling

**Reward Scaling** keeps a running discounted return and divides each reward by its standard deviation (only dividing, without subtracting the mean):

```python=
class RewardScaling:
    def __init__(self, shape, gamma):
        self.shape = shape  # reward shape = 1
        self.gamma = gamma  # discount factor
        self.running_ms = RunningMeanStd(shape=self.shape)
        self.R = np.zeros(self.shape)

    def __call__(self, x):
        self.R = self.gamma * self.R + x
        self.running_ms.update(self.R)
        x = x / (self.running_ms.std + 1e-8)  # Only divide by std
        return x

    def reset(self):  # When an episode is done, we should reset self.R
        self.R = np.zeros(self.shape)
```
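A minimal usage sketch for the two wrappers above, assuming a single (non-vectorized) environment loop; the names `env`, `agent`, and `num_states` are illustrative and not taken from the reference implementation:

```python=
# Hypothetical training-loop integration of State Normalization and Reward Scaling
state_norm = Normalization(shape=num_states)          # running mean/std per state dimension
reward_scaling = RewardScaling(shape=1, gamma=0.99)   # running discounted return, reward shape = 1

s, info = env.reset()
s = state_norm(s)                        # normalize the observation
done, truncated = False, False
while not (done or truncated):
    a, log_prob = agent.choose_action(s)
    s_, r, done, truncated, info = env.step(a)
    s_ = state_norm(s_)                  # normalize the next observation
    r = reward_scaling(r)                # scale the reward
    # store (s, a, log_prob, r, s_, done, truncated) in the rollout buffer here
    s = s_
reward_scaling.reset()                   # reset the running return at the end of the episode
# At evaluation time, call state_norm(s, update=False) so the statistics stay fixed
```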
## Result

`Continuous Action Space` **Pendulum-v1**

![image](https://hackmd.io/_uploads/SylnYeXtxx.png)
![1755519429206](https://hackmd.io/_uploads/HyYAYqlYxe.gif)

---

`Discrete Action Space` **LunarLander-v3**

![image](https://hackmd.io/_uploads/rkkiCpWYle.png)
![1755598549208](https://hackmd.io/_uploads/rkkg10-Kll.gif)

---

`Continuous Action Space` **BipedalWalker-v3**

![image](https://hackmd.io/_uploads/Hk50TlNYgg.png)
![1755674431168](https://hackmd.io/_uploads/Sk4LveQtxe.gif)

---

`Continuous Action Space` **Humanoid-v5**

![image](https://hackmd.io/_uploads/H1y4nVVKxg.png)
![1755774965993](https://hackmd.io/_uploads/SyPblFNtlg.gif)