# Advantage Actor-Critic

[TOC]

Classification:

* `Model-Free`
* `Policy Based`
* `On-Policy`
* `Discrete Action Space`
* `Discrete State Space`

## Introduction

The following content builds on [Policy Gradient](https://hackmd.io/@bGCXESmGSgeAArScMaBxLA/Sy34FO3fel).

The **Loss Function** of **Policy Gradient**:

$Loss=-\frac{1}{N}\sum_{n=1}^N\sum_{t=1}^{T_n} R(\tau^n)\mathrm{log}\,p_\theta(a^n_t|s^n_t)$

![image](https://hackmd.io/_uploads/SkaSrUMDxe.png)

**Actor-Critic** uses a **DQN**-style network to predict the **Expected Reward** of each **Action** in a state and uses it in place of the **Total Reward** term. One advantage is that the rewards of an entire **Episode** are no longer needed for the computation. This time, however, we will use **Advantage Actor-Critic**.

![image](https://hackmd.io/_uploads/H1TAPG_Xle.png)

We will use the 6th formulation in the figure above.

## Advantage Function

* $\sum_{t'=t}^Tr_{t'}$ : the sum of all rewards from time $t$ to the end of the episode (the cumulative future reward), without a **Discount Factor** $\gamma$.
* $Q^\pi(s_t,a_t)$ : if the cumulative reward can serve as the **Expected Reward**, we can also replace it with DQN's state-action value function (**state-action value function**); this gives the plain **Actor-Critic**.
* $\sum_{t'=t}^Tr_{t'}-b(s_t)$ : subtract $b(s_t)$, the average **Expected Reward** of $s_t$, from the cumulative reward. This lets the current trajectory be compared with past experience (was this one better or worse?) and keeps the **Expected Reward** balanced between positive and negative values.
* $V^\pi(s_t)$ : the plain state value function (**state value function**). It can be seen as the average **Expected Reward** over all actions in state $s_t$, as illustrated below.

![image](https://hackmd.io/_uploads/HJjtKUGvel.png)

* $Q^\pi(s_t,a_t) - V^\pi(s_t)$ : uses two **Neural Networks** to obtain both the **Expected Reward** and the **Baseline**; the drawback is that two extra **Neural Networks** are required.
* $r_t+V^\pi(s_{t+1})-V^\pi(s_t)$ : here $Q^\pi(s_t,a_t)$ is simplified to $r_t+V^\pi(s_{t+1})$. This adds some randomness (a single sampled reward replaces an expectation), but compared with how the original **Policy Gradient** computes the **Total Reward**, it is clearly more stable.

This can be written as $A(s_t,a_t)=Q^\pi(s_t,a_t)-V^\pi(s_t)\approx r_t+V^\pi(s_{t+1})-V^\pi(s_t)$, which is the **Advantage Function** of **A2C**.

## Update model

To update the **Actor-Critic**, we again interact with the environment first and store the transitions in a **ReplayBuffer**. After a configurable number of **Steps**, we run a training update. Because complete **Episode** data is no longer required, the update can even happen before an **Episode** ends, which is one of the benefits of **Actor-Critic**.

**Update Actor** :

$Loss=-\frac{1}{N}\sum_{n=1}^N\sum_{t=1}^{T_n} A(s^n_t,a^n_t)\mathrm{log}\,p_\theta(a^n_t|s^n_t)$

$A(s_t,a_t)=r_t+\gamma V^\pi(s_{t+1})-V^\pi(s_t)$

**Update Critic** :

$Loss=MSE(\ V^\pi(s_t)\ ,\ r_t+\gamma V^\pi(s_{t+1})\ )$

The error is measured with the **Mean Square Error**.
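Before jumping to the full implementation below, here is a minimal sketch of how these two losses map to PyTorch. This is only an illustration, not the repository's code: `a2c_losses` is a hypothetical helper, and `actor`, `critic`, `s`, `a`, `r`, `s_`, `done` are placeholder names for the two networks and a sampled batch of transitions.

```python
import torch
import torch.nn.functional as F

# Minimal sketch of the A2C losses (assumed shapes: s is (N, num_states),
# a is an int64 tensor of shape (N, 1), r and done are (N, 1) float tensors).
def a2c_losses(actor, critic, s, a, r, s_, done, gamma=0.99):
    value = critic(s)                                    # V(s_t), keeps gradients for the critic
    with torch.no_grad():
        target = r + gamma * critic(s_) * (1 - done)     # TD target: r + γ V(s_{t+1})
        advantage = target - value                       # A(s_t, a_t), detached from the critic graph
    critic_loss = F.mse_loss(value, target)              # MSE( V(s_t), TD target )
    log_prob = torch.log(actor(s).gather(1, a) + 1e-10)  # log p_θ(a_t | s_t)
    actor_loss = (-advantage * log_prob).mean()          # policy gradient weighted by the advantage
    return actor_loss, critic_loss
```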
## CartPole Example

```
---------------
evaluate_freq_steps = 5000.0
max_train_steps = 1000000.0
batch_size = 16
gamma = 0.99
lr = 0.0001
num_states = 4
num_actions = 2
---------------
[4, 128, 128, 1]
Actor(
  (layers): ModuleList(
    (0): Linear(in_features=4, out_features=128, bias=True)
    (1): Linear(in_features=128, out_features=128, bias=True)
    (2): Linear(in_features=128, out_features=2, bias=True)
  )
  (relu): ReLU()
  (softmax): Softmax(dim=1)
)
Critic(
  (layers): ModuleList(
    (0): Linear(in_features=4, out_features=128, bias=True)
    (1): Linear(in_features=128, out_features=128, bias=True)
    (2): Linear(in_features=128, out_features=1, bias=True)
  )
  (tanh): Tanh()
)
```

![image](https://hackmd.io/_uploads/SyQdAA3Dxg.png =70%x)

![1754226307544](https://hackmd.io/_uploads/S1-hAAnvxx.gif)

## Acrobot

```
evaluate_freq_steps = 5000.0
max_train_steps = 1000000.0
batch_size = 8
gamma = 0.99
lr = 0.0001
num_states = 6
num_actions = 3
---------------
[6, 128, 128, 1]
Actor(
  (layers): ModuleList(
    (0): Linear(in_features=6, out_features=128, bias=True)
    (1): Linear(in_features=128, out_features=128, bias=True)
    (2): Linear(in_features=128, out_features=3, bias=True)
  )
  (relu): ReLU()
  (softmax): Softmax(dim=1)
)
Critic(
  (layers): ModuleList(
    (0): Linear(in_features=6, out_features=128, bias=True)
    (1): Linear(in_features=128, out_features=128, bias=True)
    (2): Linear(in_features=128, out_features=1, bias=True)
  )
  (tanh): Tanh()
)
```

![image](https://hackmd.io/_uploads/S1m0lk6wge.png =70%x)

![1754226981652](https://hackmd.io/_uploads/r1eSrbyawxl.gif)

## Code

**Github Code** : https://github.com/jason19990305/A2C.git

### Actor-Critic

#### Actor

```python=
import torch.nn as nn


class Actor(nn.Module):
    def __init__(self, args, hidden_layers=[64, 64]):
        super(Actor, self).__init__()
        self.num_states = args.num_states
        self.num_actions = args.num_actions

        # Insert input and output sizes into hidden_layers
        hidden_layers.insert(0, self.num_states)
        hidden_layers.append(self.num_actions)

        # Create fully connected layers
        fc_list = []
        for i in range(len(hidden_layers) - 1):
            num_input = hidden_layers[i]
            num_output = hidden_layers[i + 1]
            layer = nn.Linear(num_input, num_output)
            fc_list.append(layer)

        # Convert list to ModuleList for proper registration
        self.layers = nn.ModuleList(fc_list)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)  # Softmax for action probabilities

    def forward(self, x):
        # Pass input through all layers except the last, applying ReLU activation
        for i in range(len(self.layers) - 1):
            x = self.relu(self.layers[i](x))
        # The final layer outputs action probabilities
        action_probability = self.softmax(self.layers[-1](x))
        return action_probability
```

* `args` is passed in from the main program's **Hyperparameters**. `num_states` is the number of state inputs (the state dimension) required by the **Environment**, and `num_actions` is the number of actions the **Agent** can choose from.
* `hidden_layers` is a **list** that defines the dimensions of the **Hidden layers** (the output shape of each layer); the **Input** and **Output** layers are determined by `num_states` and `num_actions`.

#### Critic

```python=
class Critic(nn.Module):
    def __init__(self, args, hidden_layers=[64, 64]):
        super(Critic, self).__init__()
        self.num_states = args.num_states
        self.num_actions = args.num_actions

        # Insert the input size; the output is a single state value
        hidden_layers.insert(0, self.num_states)
        hidden_layers.append(1)
        print(hidden_layers)

        # Create fully connected layers
        layer_list = []
        for i in range(len(hidden_layers) - 1):
            input_num = hidden_layers[i]
            output_num = hidden_layers[i + 1]
            layer = nn.Linear(input_num, output_num)
            layer_list.append(layer)

        # Convert list to ModuleList for proper registration
        self.layers = nn.ModuleList(layer_list)
        self.tanh = nn.Tanh()

    def forward(self, x):
        # Pass input through all layers except the last, applying Tanh activation
        for i in range(len(self.layers) - 1):
            x = self.tanh(self.layers[i](x))
        # Predict the state value (no activation on the output layer)
        v_s = self.layers[-1](x)
        return v_s
```

The **Critic** in **A2C** is a state value function (**state value function**), so its input is only the **state** and its output is a single **value**; that is why the dimension of the last layer is fixed to 1. The **Critic**'s job is to predict the **Expected Reward** from the **state**, which is a **Regression** problem, so the final layer has no **Activation Function**.
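As a quick sanity check, the two networks can be instantiated with CartPole-sized inputs and probed with a dummy batch. This is only an illustrative sketch: `args` is mocked here with a `SimpleNamespace`, whereas the actual code passes in the argparse hyperparameter namespace.

```python
import torch
from types import SimpleNamespace

# Hypothetical stand-in for the argparse namespace used by the real code
args = SimpleNamespace(num_states=4, num_actions=2)  # CartPole sizes

actor = Actor(args, hidden_layers=[128, 128])
critic = Critic(args, hidden_layers=[128, 128])      # prints [4, 128, 128, 1]

s = torch.rand(16, 4)                                # dummy batch of 16 states
print(actor(s).shape)   # torch.Size([16, 2]) -> action probabilities, each row sums to 1
print(critic(s).shape)  # torch.Size([16, 1]) -> state values
```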
### Replay Buffer

```python=
import numpy as np
import torch


class ReplayBuffer:
    def __init__(self, args):
        self.clear_batch()

    def clear_batch(self):
        self.s = []
        self.a = []
        self.r = []
        self.s_ = []
        self.done = []
        self.count = 0

    def store(self, s, a, r, s_, done):
        self.s.append(s)
        self.a.append(a)
        self.r.append(r)
        self.s_.append(s_)
        self.done.append(done)
        self.count += 1

    def numpy_to_tensor(self):
        s = torch.tensor(np.array(self.s), dtype=torch.float)
        a = torch.tensor(np.array(self.a), dtype=torch.int64)
        r = torch.tensor(np.array(self.r), dtype=torch.float)
        s_ = torch.tensor(np.array(self.s_), dtype=torch.float)
        done = torch.tensor(np.array(self.done), dtype=torch.float)
        self.clear_batch()
        return s, a, r, s_, done
```

* `store()` : stores a transition of state, action, reward, next_state, done.
* `numpy_to_tensor()` : converts the stored data into tensors for training the neural networks. Because this is an on-policy algorithm, the data cannot be reused, so `clear_batch()` is called to empty the buffer afterwards.

### Initialize

```python=
def __init__(self, args, env, hidden_layer_list=[64, 64]):
    # Hyperparameter
    self.max_train_steps = args.max_train_steps
    self.evaluate_freq_steps = args.evaluate_freq_steps
    self.num_actions = args.num_actions
    self.num_states = args.num_states
    self.gamma = args.gamma
    self.batch_size = args.batch_size
    self.lr = args.lr

    # Variable
    self.episode_count = 0
    self.total_steps = 0
    self.evaluate_count = 0

    # Other
    self.env = env
    self.env_eval = copy.deepcopy(env)
    self.replay_buffer = ReplayBuffer(args)

    # The model interacts with the environment and gets updated continuously
    self.actor = Actor(args, hidden_layer_list.copy())
    self.critic = Critic(args, hidden_layer_list.copy())
    print(self.actor)
    print(self.critic)

    self.optimizer_actor = torch.optim.Adam(self.actor.parameters(), lr=self.lr, eps=1e-5)
    self.optimizer_critic = torch.optim.Adam(self.critic.parameters(), lr=self.lr, eps=1e-5)
```

* `max_train_steps` : the maximum number of environment **Steps** sampled by the **Training loop**.
* `evaluate_freq_steps` : how often (in steps) the Agent is evaluated.
* `gamma` : the **Discount Factor** $\gamma$ used in the **TD target**.
* `batch_size` : how many transitions to collect before running a parameter update.
* `lr` : the **Learning rate**.

---

* `actor` : the **Neural Network** that interacts with the environment.
* `critic` : the **Neural Network** that predicts the **Expected Reward** and is used to compute the **Advantage**.
* `optimizer_actor` : an Adam optimizer over the Actor's parameters.
* `optimizer_critic` : an Adam optimizer over the Critic's parameters.

### Choose action

```python=
def choose_action(self, state):
    state = torch.tensor(state, dtype=torch.float)
    with torch.no_grad():
        s = torch.unsqueeze(state, dim=0)
        action_probability = self.actor(s).numpy().flatten()
        action = np.random.choice(self.num_actions, p=action_probability)
    return action
```

The Actor outputs a **Probability** for each action, and `np.random.choice` then samples an action according to those probabilities.
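An equivalent way to sample, staying entirely in PyTorch, is `torch.distributions.Categorical`. The repository uses `np.random.choice` as shown above; the variant below is only an illustrative alternative, and `choose_action_categorical` is a hypothetical name.

```python
import torch
from torch.distributions import Categorical

def choose_action_categorical(actor, state):
    # Illustrative alternative to np.random.choice, not the repository's implementation
    state = torch.tensor(state, dtype=torch.float)
    with torch.no_grad():
        probs = actor(state.unsqueeze(0))           # shape (1, num_actions)
        action = Categorical(probs=probs).sample()  # sample according to the probabilities
    return action.item()
```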
### Evaluate action

```python=
def evaluate_action(self, state):
    state = torch.tensor(state, dtype=torch.float)
    with torch.no_grad():
        s = torch.unsqueeze(state, dim=0)
        action_probability = self.actor(s)
        action = torch.argmax(action_probability).item()
    return action
```

Similar to `choose_action`, but here the action with the highest probability is selected.

### Train

```python=
def train(self):
    time_start = time.time()
    epoch_reward_list = []
    epoch_count_list = []
    epoch_count = 0

    # Training loop
    while self.total_steps < self.max_train_steps:
        # Reset environment
        s, info = self.env.reset()
        while True:
            a = self.choose_action(s)

            # Interact with environment
            s_, r, done, truncated, _ = self.env.step(a)
            done = done or truncated

            # Store transition in replay buffer
            self.replay_buffer.store(s, a, [r], s_, [done])

            # Update state
            s = s_

            if self.replay_buffer.count >= self.batch_size:
                self.update()
                epoch_count += 1

            if self.total_steps % self.evaluate_freq_steps == 0:
                self.evaluate_count += 1
                evaluate_reward = self.evaluate(self.env_eval)
                epoch_reward_list.append(evaluate_reward)
                epoch_count_list.append(epoch_count)

                time_end = time.time()
                h = int((time_end - time_start) // 3600)
                m = int(((time_end - time_start) % 3600) // 60)
                second = int((time_end - time_start) % 60)
                print("---------")
                print("Time : %02d:%02d:%02d" % (h, m, second))
                print("Training epoch : %d\tStep : %d / %d" % (epoch_count, self.total_steps, self.max_train_steps))
                print("Evaluate count : %d\tEvaluate reward : %0.2f" % (self.evaluate_count, evaluate_reward))

            self.total_steps += 1
            if done or truncated:
                break
        epoch_count += 1

    # Plot the training curve
    plt.plot(epoch_count_list, epoch_reward_list)
    plt.xlabel("Epoch")
    plt.ylabel("Reward")
    plt.title("Training Curve")
    plt.show()
```

The outer loop runs over **Episodes** and the inner loop over **Steps**; training stops once `total_steps` reaches `max_train_steps`. The inner loop keeps interacting with the environment and storing transitions in the **ReplayBuffer**; as soon as the buffer holds `batch_size` transitions, a parameter update is triggered and the **Actor-Critic** is optimized in `self.update()`.

### Update

```python=
def update(self):
    s, a, r, s_, done = self.replay_buffer.numpy_to_tensor()
    a = a.view(-1, 1)  # Reshape action from (N) -> (N, 1) for gathering

    # Get target value and advantage
    value = self.critic(s)
    with torch.no_grad():
        next_value = self.critic(s_)
    target_value = r + self.gamma * next_value * (1 - done)  # TD target
    advantage = (target_value - value).detach()  # Advantage; detached so the actor update does not backpropagate into the critic

    # Update critic
    critic_loss = F.mse_loss(value, target_value)  # Mean Squared Error loss
    self.optimizer_critic.zero_grad()
    critic_loss.backward()  # Backpropagation
    self.optimizer_critic.step()

    # Update actor
    prob = self.actor(s).gather(dim=1, index=a)  # Probability of the taken actions
    log_prob = torch.log(prob + 1e-10)  # Add small value to avoid log(0)
    actor_loss = (-advantage * log_prob).mean()  # Policy gradient loss weighted by the advantage
    self.optimizer_actor.zero_grad()
    actor_loss.backward()  # Backpropagation
    self.optimizer_actor.step()
```

1. Take the transitions out of the ReplayBuffer.
2. Compute $V^\pi(s_t)$ and $V^\pi(s_{t+1})$ up front.
3. Compute the **TD target** $r+\gamma V^\pi(s_{t+1})$.
4. Compute the **Advantage** $r+\gamma V^\pi(s_{t+1})-V^\pi(s_t)$.
5. Update the **Critic** with **MSE**.
6. Update the **Actor** with the **Advantage**.

## Conclusion

The above is the training procedure of the **Advantage Actor-Critic (A2C)** algorithm for a discrete action space. **A2C** lets the **Actor** train without waiting for an entire **Episode** to finish, and the training data is more stable: with **Policy Gradient**, the length of each **Episode** is uncertain, which makes training unstable, whereas the **Critic** makes the estimate of the **Expected Reward** more stable.

**A2C** is an **on-policy** algorithm: once the **Actor-Critic** has been updated, the collected data can no longer be used, which is the gap between it and **off-policy** algorithms. If you want to further improve data usage, see **PPO** (**Proximal Policy Optimization**).