# Advantage Actor-Critic
[TOC]
Classification :
* `Model-Free`
* `Policy Based`
* `on Policy`
* `Discrete Action Space`
* `Continuous State Space`
## Introduction
The content below builds on the [Policy Gradient](https://hackmd.io/@bGCXESmGSgeAArScMaBxLA/Sy34FO3fel) notes.
The **Loss Function** of **Policy Gradient** :
$Loss=-\frac{1}{N}\sum_{n=1}^N\sum_{t=1}^{T_n} R(\tau^n)\mathrm{log}p_\theta(a^n_t|s^n_t)$

**Actor-Critic** uses a **DQN**-style network to predict the **Expected Reward** of each **Action** in a given state and uses it in place of the **Total Reward** term. One advantage is that the rewards of a whole **Episode** are no longer needed to compute the update. Here, however, we will use **Advantage Actor-Critic**.

We will use the sixth form from the figure above; the candidate estimators are listed one by one in the next section.
## Advantage Function
* $\sum_{t'=t}^Tr_{t'}$ : the sum of all **rewards** from time step $t$ to the end of the episode (the reward-to-go), here without a **Discount Factor** $\gamma$
* $Q^\pi(s_t,a_t)$ : if that return can serve as an estimate of the **Expected Reward**, we can just as well replace it with DQN's **state-action value function**; doing so gives us **Actor-Critic**
* $\sum_{t'=t}^Tr_{t'}-b(s_t)$ : subtract a baseline $b(s_t)$, the average **Expected Reward** in $s_t$, from the reward-to-go; this compares the current outcome against past experience in the same state (better or worse than usual) and also keeps the **Expected Reward** balanced between positive and negative values
* $V^\pi(s_t)$ : this is simply the **state value function**, which can be viewed as the average **Expected Reward** over the actions available in $s_t$, as sketched in the figure below.

* $Q^\pi(s_t,a_t) - V^\pi(s_t)$ : this computes the **Expected Reward** and the **Baseline** with two **Neural Networks**; the drawback is that two extra **Neural Networks** are needed
* $r_t+V^\pi(s_{t+1})-V^\pi(s_t)$ : here $Q^\pi(s_t,a_t)$ is approximated by $r_t+V^\pi(s_{t+1})$, which adds some randomness, but compared with how the original **Policy Gradient** computes the **Total Reward** this estimate is definitely more stable

This can be written as $A(s_t,a_t)=Q^\pi(s_t,a_t)-V^\pi(s_t)\approx r_t+V^\pi(s_{t+1})-V^\pi(s_t)$,
which is the **Advantage Function** of **A2C** (a small numeric sketch follows).
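As a quick sanity check of this formula, here is a minimal numeric sketch of the TD-style **Advantage** estimate; the reward and value numbers are made up purely for illustration and do not come from the experiments below.
```python=
# Minimal numeric sketch of A(s_t, a_t) ≈ r_t + γ·V(s_{t+1}) − V(s_t).
# The reward and value numbers are made up purely for illustration.
gamma = 0.99
r_t = 1.0         # reward received after taking a_t in s_t
v_s = 10.0        # critic estimate V(s_t)
v_s_next = 9.5    # critic estimate V(s_{t+1})

advantage = r_t + gamma * v_s_next - v_s
print(advantage)  # 0.405 > 0 -> a_t turned out better than the critic expected
```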
## Update model
To **update** the **Actor-Critic**, we again interact with the environment first and store the transitions in a **ReplayBuffer**; after a configurable number of **Steps** we run a training update. Because complete **Episode** data is no longer required, the update can even happen before an **Episode** ends, which is one of the benefits of **Actor-Critic**.
**Update Actor** :
$Loss=-\frac{1}{N}\sum_{n=1}^N\sum_{t=1}^{T_n} A(s_t,a_t)\mathrm{log}p_\theta(a^n_t|s^n_t)$
$A(s_t,a_t)=r_t+\gamma V^\pi(s_{t+1})-V^\pi(s_t)$
**Update Critic** :
$Loss=MSE(\ V^\pi(s_t)\ ,\ r_t+\gamma V^\pi(s_{t+1})\ )$
**Mean Square Error** is used as the loss for the Critic.
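Before the full implementation (shown in the Code section below), here is a self-contained sketch of how these two losses can be written in PyTorch. The tiny linear actor/critic and the random batch are throwaway placeholders, not the networks used later in this post; the key point is that the TD target $r_t+\gamma V^\pi(s_{t+1})$ is treated as a constant, so no gradient flows from the **Actor** loss into the **Critic**.
```python=
import torch
import torch.nn as nn
import torch.nn.functional as F

# Throwaway actor/critic and a random batch, purely to illustrate the two losses.
torch.manual_seed(0)
N, num_states, num_actions, gamma = 16, 4, 2, 0.99

actor = nn.Sequential(nn.Linear(num_states, num_actions), nn.Softmax(dim=1))
critic = nn.Linear(num_states, 1)

s = torch.randn(N, num_states)           # states s_t
a = torch.randint(num_actions, (N, 1))   # actions a_t
r = torch.randn(N, 1)                    # rewards r_t
s_ = torch.randn(N, num_states)          # next states s_{t+1}
done = torch.zeros(N, 1)                 # 1.0 where the episode ended

with torch.no_grad():                                 # the TD target is a constant
    target = r + gamma * critic(s_) * (1 - done)      # r_t + γ·V(s_{t+1})
value = critic(s)                                     # V(s_t)

critic_loss = F.mse_loss(value, target)               # Update Critic
advantage = (target - value).detach()                 # A(s_t, a_t), no gradient to the critic
log_prob = torch.log(actor(s).gather(1, a) + 1e-10)   # log p_θ(a_t|s_t)
actor_loss = (-advantage * log_prob).mean()           # Update Actor
print(critic_loss.item(), actor_loss.item())
```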
## CartPole Example
```
---------------
evaluate_freq_steps = 5000.0
max_train_steps = 1000000.0
batch_size = 16
gamma = 0.99
lr = 0.0001
num_states = 4
num_actions = 2
---------------
[4, 128, 128, 1]
Actor(
(layers): ModuleList(
(0): Linear(in_features=4, out_features=128, bias=True)
(1): Linear(in_features=128, out_features=128, bias=True)
(2): Linear(in_features=128, out_features=2, bias=True)
)
(relu): ReLU()
(softmax): Softmax(dim=1)
)
Critic(
(layers): ModuleList(
(0): Linear(in_features=4, out_features=128, bias=True)
(1): Linear(in_features=128, out_features=128, bias=True)
(2): Linear(in_features=128, out_features=1, bias=True)
)
(tanh): Tanh()
)
```


## Acrobot Example
```
evaluate_freq_steps = 5000.0
max_train_steps = 1000000.0
batch_size = 8
gamma = 0.99
lr = 0.0001
num_states = 6
num_actions = 3
---------------
[6, 128, 128, 1]
Actor(
(layers): ModuleList(
(0): Linear(in_features=6, out_features=128, bias=True)
(1): Linear(in_features=128, out_features=128, bias=True)
(2): Linear(in_features=128, out_features=3, bias=True)
)
(relu): ReLU()
(softmax): Softmax(dim=1)
)
Critic(
(layers): ModuleList(
(0): Linear(in_features=6, out_features=128, bias=True)
(1): Linear(in_features=128, out_features=128, bias=True)
(2): Linear(in_features=128, out_features=1, bias=True)
)
(tanh): Tanh()
)
```


## Code
**Github Code** : https://github.com/jason19990305/A2C.git
### Actor-Critic
#### Actor
```python=
import torch.nn as nn

class Actor(nn.Module):
    def __init__(self, args, hidden_layers=[64, 64]):
        super(Actor, self).__init__()
        self.num_states = args.num_states
        self.num_actions = args.num_actions
        # Insert input and output sizes into hidden_layers
        hidden_layers.insert(0, self.num_states)
        hidden_layers.append(self.num_actions)
        # Create fully connected layers
        fc_list = []
        for i in range(len(hidden_layers) - 1):
            num_input = hidden_layers[i]
            num_output = hidden_layers[i + 1]
            layer = nn.Linear(num_input, num_output)
            fc_list.append(layer)
        # Convert list to ModuleList for proper registration
        self.layers = nn.ModuleList(fc_list)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)  # Softmax for action probabilities

    def forward(self, x):
        # Pass input through all layers except the last, applying ReLU activation
        for i in range(len(self.layers) - 1):
            x = self.relu(self.layers[i](x))
        # The final layer outputs action probabilities
        action_probability = self.softmax(self.layers[-1](x))
        return action_probability
```
* `args` is passed in from the main program's **Hyperparameters**. `num_states` and `num_actions` are the **state** and **action** dimensions required by the **Environment**; `num_actions` is the number of action types the **Agent** can choose from (a short usage example follows this list).
* `hidden_layers` is a **list** that defines the dimensions of the **Hidden layers**, i.e. the **Output shape** of each layer; the **Input** and **Output** layers are determined by `num_states` and `num_actions`.
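A quick usage check of the Actor defined above. The `SimpleNamespace` here is only a stand-in for the `args` object that the main program would normally build:
```python=
import torch
from types import SimpleNamespace

# Stand-in for the args object built by the main program (assumption)
args = SimpleNamespace(num_states=4, num_actions=2)
actor = Actor(args, hidden_layers=[128, 128])

s = torch.rand(1, args.num_states)   # one CartPole-like state as a batch of size 1
probs = actor(s)                     # action probabilities, shape (1, 2)
print(probs, probs.sum())            # each row sums to 1 because of the Softmax
```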
#### Critic
```python=
class Critic(nn.Module):
    def __init__(self, args, hidden_layers=[64, 64]):
        super(Critic, self).__init__()
        self.num_states = args.num_states
        self.num_actions = args.num_actions
        # Insert the input size and the single-value output size into the list
        hidden_layers.insert(0, self.num_states)
        hidden_layers.append(1)
        print(hidden_layers)
        # Create fully connected layers
        layer_list = []
        for i in range(len(hidden_layers) - 1):
            input_num = hidden_layers[i]
            output_num = hidden_layers[i + 1]
            layer = nn.Linear(input_num, output_num)
            layer_list.append(layer)
        # Convert list to ModuleList for proper registration
        self.layers = nn.ModuleList(layer_list)
        self.tanh = nn.Tanh()

    def forward(self, x):
        # Hidden layers use Tanh activation
        for i in range(len(self.layers) - 1):
            x = self.tanh(self.layers[i](x))
        # The final layer predicts the state value V(s) with no activation
        v_s = self.layers[-1](x)
        return v_s
```
The **Critic** in **A2C** is a **state value function**, so its input is only the **state** and its output is a single **value**; that is why the last layer's output dimension is fixed to 1. The **Critic**'s job is to predict the **Expected Reward** from the **state**, which makes this a **Regression** problem, so the final layer has no **Activation Function**.
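As with the Actor, a quick shape check (again with a stand-in `args` object):
```python=
import torch
from types import SimpleNamespace

args = SimpleNamespace(num_states=4, num_actions=2)   # stand-in for the real args
critic = Critic(args, hidden_layers=[128, 128])       # prints [4, 128, 128, 1]

s = torch.rand(5, args.num_states)   # a batch of 5 states
print(critic(s).shape)               # torch.Size([5, 1]) -> one unbounded value per state
```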
### Replay Buffer
```python=
import numpy as np
import torch

class ReplayBuffer:
    def __init__(self, args):
        self.clear_batch()

    def clear_batch(self):
        # Reset all transition lists
        self.s = []
        self.a = []
        self.r = []
        self.s_ = []
        self.done = []
        self.count = 0

    def store(self, s, a, r, s_, done):
        # Append one transition
        self.s.append(s)
        self.a.append(a)
        self.r.append(r)
        self.s_.append(s_)
        self.done.append(done)
        self.count += 1

    def numpy_to_tensor(self):
        # Convert the stored lists to tensors and empty the buffer
        s = torch.tensor(np.array(self.s), dtype=torch.float)
        a = torch.tensor(np.array(self.a), dtype=torch.int64)
        r = torch.tensor(np.array(self.r), dtype=torch.float)
        s_ = torch.tensor(np.array(self.s_), dtype=torch.float)
        done = torch.tensor(np.array(self.done), dtype=torch.float)
        self.clear_batch()
        return s, a, r, s_, done
```
* `store()` : stores a transition of state, action, reward, next_state, done
* `numpy_to_tensor()` : converts the stored data into tensors for training the neural networks; because this is an on-policy algorithm the data cannot be reused, so `clear_batch()` is called to empty the buffer (a small usage sketch follows).
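A small usage sketch of the buffer with dummy transitions, showing that `numpy_to_tensor()` both returns the batch and empties the buffer:
```python=
import numpy as np

buffer = ReplayBuffer(args=None)   # args is not used inside the buffer

# Store a few dummy CartPole-sized transitions
for _ in range(3):
    s = np.zeros(4, dtype=np.float32)
    buffer.store(s, 1, [1.0], s, [False])

print(buffer.count)                   # 3
s, a, r, s_, done = buffer.numpy_to_tensor()
print(s.shape, a.shape, r.shape)      # torch.Size([3, 4]) torch.Size([3]) torch.Size([3, 1])
print(buffer.count)                   # 0 -> cleared, because A2C is on-policy
```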
### Initialize
```python=
def __init__(self, args, env, hidden_layer_list=[64, 64]):
    # Hyperparameter
    self.max_train_steps = args.max_train_steps
    self.evaluate_freq_steps = args.evaluate_freq_steps
    self.num_actions = args.num_actions
    self.num_states = args.num_states
    self.gamma = args.gamma
    self.batch_size = args.batch_size
    self.lr = args.lr
    # Variable
    self.episode_count = 0
    self.total_steps = 0
    self.evaluate_count = 0
    # other
    self.env = env
    self.env_eval = copy.deepcopy(env)
    self.replay_buffer = ReplayBuffer(args)
    # The model interacts with the environment and gets updated continuously
    self.actor = Actor(args, hidden_layer_list.copy())
    self.critic = Critic(args, hidden_layer_list.copy())
    print(self.actor)
    print(self.critic)
    self.optimizer_actor = torch.optim.Adam(self.actor.parameters(), lr=self.lr, eps=1e-5)
    self.optimizer_critic = torch.optim.Adam(self.critic.parameters(), lr=self.lr, eps=1e-5)
```
* `max_train_steps` : the maximum number of environment **Steps** the **Training loop** will sample
* `evaluate_freq_steps` : how often (in steps) the Agent is evaluated
* `gamma` : the **Discount Factor** $\gamma$ used in the **TD target**
* `batch_size` : how many transitions to collect before running a parameter update
* `lr` : **Learning rate** (a sketch of how these values can be packed into `args` follows this list)
---
* `actor` : the **Neural Network** that interacts with the environment
* `critic` : the **Neural Network** that predicts the **Expected Reward** and is used to compute the **Advantage**
* `optimizer_actor` : an Adam optimizer over the Actor's parameters
* `optimizer_critic` : an Adam optimizer over the Critic's parameters
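In the repo these values come from the command line; below is a minimal sketch of how the `args` object could be assembled with `argparse`. The default values are taken from the CartPole printout above, but the flag names and the way `num_states`/`num_actions` are attached are assumptions, not the repo's exact code:
```python=
import argparse

# Minimal sketch of building the args object; flag names are assumptions.
parser = argparse.ArgumentParser()
parser.add_argument("--max_train_steps", type=float, default=1e6)
parser.add_argument("--evaluate_freq_steps", type=float, default=5e3)
parser.add_argument("--batch_size", type=int, default=16)
parser.add_argument("--gamma", type=float, default=0.99)
parser.add_argument("--lr", type=float, default=1e-4)
args = parser.parse_args([])

# Filled in from the environment (CartPole-v1: 4 state dims, 2 actions)
args.num_states = 4
args.num_actions = 2
```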
### Choose action
```python=
def choose_action(self, state):
    state = torch.tensor(state, dtype=torch.float)
    with torch.no_grad():
        s = torch.unsqueeze(state, dim=0)
        action_probability = self.actor(s).numpy().flatten()
        action = np.random.choice(self.num_actions, p=action_probability)
    return action
```
The Actor outputs a **Probability** for each action, and `np.random.choice` samples an action according to those probabilities.
### Evaluate action
```python=
def evaluate_action(self, state):
    state = torch.tensor(state, dtype=torch.float)
    with torch.no_grad():
        s = torch.unsqueeze(state, dim=0)
        action_probability = self.actor(s)
        action = torch.argmax(action_probability).item()
    return action
```
Similar to `choose_action`, but here the action with the highest probability is always chosen (a small comparison of the two follows).
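The difference between the two selection rules in a tiny example (the probability vector is made up):
```python=
import numpy as np

action_probability = np.array([0.7, 0.3])   # made-up probabilities for 2 actions

sampled = np.random.choice(2, p=action_probability)   # choose_action: explores, picks action 0 ~70% of the time
greedy = int(np.argmax(action_probability))           # evaluate_action: always picks action 0
print(sampled, greedy)
```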
### Train
```python=
def train(self):
    time_start = time.time()
    epoch_reward_list = []
    epoch_count_list = []
    epoch_count = 0
    # Training loop
    while self.total_steps < self.max_train_steps:
        # reset environment
        s, info = self.env.reset()
        while True:
            a = self.choose_action(s)
            # interact with environment
            s_, r, done, truncated, _ = self.env.step(a)
            done = done or truncated
            # store transition in replay buffer
            self.replay_buffer.store(s, a, [r], s_, [done])
            # update state
            s = s_
            if self.replay_buffer.count >= self.batch_size:
                self.update()
                epoch_count += 1
            if self.total_steps % self.evaluate_freq_steps == 0:
                self.evaluate_count += 1
                evaluate_reward = self.evaluate(self.env_eval)
                epoch_reward_list.append(evaluate_reward)
                epoch_count_list.append(epoch_count)
                time_end = time.time()
                h = int((time_end - time_start) // 3600)
                m = int(((time_end - time_start) % 3600) // 60)
                second = int((time_end - time_start) % 60)
                print("---------")
                print("Time : %02d:%02d:%02d" % (h, m, second))
                print("Training epoch : %d\tStep : %d / %d" % (epoch_count, self.total_steps, self.max_train_steps))
                print("Evaluate count : %d\tEvaluate reward : %0.2f" % (self.evaluate_count, evaluate_reward))
            self.total_steps += 1
            if done or truncated:
                break
        epoch_count += 1
    # Plot the training curve
    plt.plot(epoch_count_list, epoch_reward_list)
    plt.xlabel("Epoch")
    plt.ylabel("Reward")
    plt.title("Training Curve")
    plt.show()
```
The outer loop iterates over **Episodes** and the inner loop over **Steps**; training continues while `total_steps < max_train_steps`. The inner loop keeps interacting with the environment and storing transitions in the **ReplayBuffer**; once the number of stored transitions reaches `batch_size`, a parameter update is performed and the **Actor-Critic** is optimized inside `self.update()`.
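Putting the pieces together, a hypothetical driver script using Gymnasium's CartPole-v1 and the `args` object sketched in the Initialize section. The agent class name `A2C` and its constructor arguments are assumptions based on the repo layout, not verified code; see the GitHub link above for the real entry point:
```python=
import gymnasium as gym

# Hypothetical driver script: the class name "A2C" and its constructor
# signature are assumptions, not verified against the repo.
env = gym.make("CartPole-v1")
args.num_states = env.observation_space.shape[0]   # 4
args.num_actions = env.action_space.n              # 2

agent = A2C(args, env, hidden_layer_list=[128, 128])
agent.train()
```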
### Update
```python=
def update(self):
    s, a, r, s_, done = self.replay_buffer.numpy_to_tensor()
    a = a.view(-1, 1)  # Reshape action from (N) -> (N, 1) for gathering
    # get target value and advantage
    value = self.critic(s)
    with torch.no_grad():
        next_value = self.critic(s_)
        target_value = r + self.gamma * next_value * (1 - done)
        advantage = target_value - value  # baseline advantage, computed without gradient so the actor update does not backprop into the critic
    # Update critic
    critic_loss = F.mse_loss(value, target_value)  # Mean Squared Error loss
    self.optimizer_critic.zero_grad()
    critic_loss.backward()  # Backpropagation
    self.optimizer_critic.step()
    # Update actor
    prob = self.actor(s).gather(dim=1, index=a)  # Get action probability from the model
    log_prob = torch.log(prob + 1e-10)  # Add small value to avoid log(0)
    actor_loss = (-advantage * log_prob).mean()  # Calculate loss
    self.optimizer_actor.zero_grad()
    actor_loss.backward()  # Backpropagation
    self.optimizer_actor.step()
```
1. Fetch the batch of transitions from the ReplayBuffer
2. Compute $V^\pi(s_t)$ and $V^\pi(s_{t+1})$ up front
3. Compute the **TD target** $r+\gamma V^\pi(s_{t+1})$
4. Compute the **Advantage** $r+\gamma V^\pi(s_{t+1})-V^\pi(s_t)$
5. Update the **Critic** with **MSE**
6. Update the **Actor** with the **Advantage** (a small numeric illustration follows)
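A tiny numeric illustration of step 6: the sign of the **Advantage** decides whether gradient descent pushes the taken action's probability up or down (the probability value is made up):
```python=
import torch

# Made-up number purely for illustration
prob = torch.tensor(0.4, requires_grad=True)   # p_θ(a_t|s_t) of the action that was taken

for advantage in (1.0, -1.0):
    loss = -advantage * torch.log(prob)
    grad, = torch.autograd.grad(loss, prob)
    # advantage > 0 -> negative gradient -> gradient descent raises the action's probability
    # advantage < 0 -> positive gradient -> gradient descent lowers it
    print(advantage, grad.item())
```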
## Conclusion
The above is the training procedure of the **Advantage Actor-Critic (A2C)** algorithm for a discrete action space. **A2C** lets the **Actor** train without waiting for an entire **Episode** to finish, and the data it trains on is stable, unlike **Policy Gradient**, where the uncertain length of each **Episode** makes training unstable; with a **Critic**, the estimate of the **Expected Reward** is also more stable.
**A2C** is an **on-policy** algorithm: once the **Actor-Critic** has been updated, the collected data can no longer be reused, which is the gap between it and **off-policy** algorithms. If you want to squeeze more use out of the data, have a look at **PPO** (**Proximal Policy Optimization**).