# Diffusion Policy
[TOC]
**Paper** : [Diffusion Policy: Visuomotor Policy Learning via Action Diffusion](https://arxiv.org/abs/2303.04137v5)
網頁 : https://diffusion-policy.cs.columbia.edu/
之前 **Transformer** 相關的文章 :
* [Self-Attention](https://hackmd.io/@bGCXESmGSgeAArScMaBxLA/rJRwYNRX-g)
* [Transformer Decoder](https://hackmd.io/@bGCXESmGSgeAArScMaBxLA/rJvmAClSbl)
**Diffusion Model** 相關的文章 :
* [Denoising Diffusion Probabilistic Model](https://hackmd.io/@bGCXESmGSgeAArScMaBxLA/HkcbC12zWe)
* [Latent Diffusion Transformer](https://hackmd.io/@bGCXESmGSgeAArScMaBxLA/B1vYJqVu-g)
## Introduction
由簡入深,從實作的層面與視覺化來理解 **Diffusion Policy**,**Diffusion Policy** 主要是從圖片的生成,變成 **Sequence Action** 的生成,因為 **Sequence Action** 有時間資訊(如$[a_{t-1},a_t,a_{t+1}]$),所以 **Diffusion Process** 的過 **Timestep** 就改用 $k$ 來描述
## Diffusion Policy vs. 傳統 Behavior Cloning
作者有清楚的針對這個差異舉例,假如想讓 **AI** 模仿開車,並且路中間有一棵樹,**Dataset** 內有兩條路徑,一個向左一個向右,為了擬和這兩條路徑,讓 **Predict** 出的路徑誤差做最低,他會選擇直走,最後就會撞到樹。
1. **Vanilla BC (MSE)** : 他預測的是一個確定的數值,也就是一個 **State** 只能對應一個 **Action**,這個數值通常是 **Dataset** 內平均誤差最低的,有可能會選擇兩種不同 **Action** 的中間值
2. **GMM-BC** : 我們知道 **Gaussian Distribution** 是可以組合的,變成一個多峰形式的 **Gaussian Mixture Model**,因為是多個分布所組成,難以擴展到高維動作空間

3. **IBC (Implicit Behavior Cloning)** : 使用能量模型 (**EBM**),效果好但訓練極度不穩,**Inference** 速度很慢

其中黑色線是期望的軌跡,藍色是低能量,白色是高能量,可以看到能很好的吻合期望軌跡
而 **Diffusion Policy** 則可以完美達成多峰的需求,速度也很快
## DDIM
關於 **Diffusion Model** 的方法最一開始是使用 **DDPM(Denoising Diffusion Probabilitic Models)**,訓練一個 **Noise Predictor**,將充滿雜訊的圖片,分成 **1000** 步來慢慢除去雜訊,但現在的 **Diffusion Process** 會改用 **DDIM**,可以將步驟縮減,但會從馬可夫鏈變成非馬可夫鏈,**Inference** 的過程不再隨機,是可預期的。
**Diffusion Policy** 則不是用來去除圖片的雜訊,而是 **Sequence Action**,
## Forward Process
添加雜訊的方式 :
$$
x_k=\sqrt{\bar{\alpha}_k} x_{0} + \sqrt{1-\bar\alpha_k}\epsilon
$$
1. $\alpha_k=1-\beta_k$
2. $\bar\alpha_k=\prod^k_{i=1}\alpha_i$
3. $\epsilon\sim\mathcal{N}(0,I)$
4. $x_0$ : 原圖,無雜訊
5. $x_k$ : $k$ 時刻後的雜訊圖
上面的是 **Reparameterization** 的表示方式,為 **Implement** 時會用到的方法,但他也可以寫成 **Distribution** 的形式 :
$$
q(x_k|x_0)=\mathcal{N}(x_k;\sqrt{\bar\alpha_k}x_0,(1-\bar\alpha_k)I)
$$
## Reverse Process
這個步驟只會在 **Inference** 時用到,**DDIM** 的 **Reverse Process** 如下 :
$$
x_{t-1}=\sqrt{\bar\alpha_{t-1}}\cdot(\frac{x_t-\sqrt{1-\bar\alpha_t}\epsilon_\theta(x_t,t)}{\sqrt{\bar\alpha}_t})+\sqrt{1-\bar\alpha_{t-1}}\cdot\epsilon_\theta(x_t,t)
$$
其中 $t-1$ 和 $t$ 是可以允許跳點的,如 `12 -> 8`,重複這個步驟,直到得到原圖
## Architecture
1. 準備 **Input**
* **Observation Conditioning** : 過去的 **State Sequence** $[s_{t-1},s_t]$(假設長度為2)
* **Noised Action Sequence** : $x_0$ 經過 **Diffusion Forward Process** 到 $x_k$ 的 **Action Sequence**
* **Diffusion Step** : 提供現在是 **Forward Process** 的第幾步 $k$
2. 對齊維度
* 利用 **Linear Layer** 將 $s_{t-1},s_t$ 從 `(2,state_dim)` 變成 `(2,embed_dim)`
* 利用 **Linear Layer** 將 $a_1...a_{10}$ 從 `(10,action_dim)` 變成 `(10,embed_dim)`
3. 加入位置資訊
* 處理 **Diffusion Step** ($k$) : 讓整數 $k$ 通過 **Sinusoidal Embedding** + **Linear Layer**,將 **Diffusion Step** 的位置資訊變成 **Vector** ,**Shape** 為 `(1,embed_dim)`
* **Sequence Position** : 我們的 **Noised Action Sequence** 和 **Observation Conditioning** 也是需要位置資訊的,通常使用 `nn.Embedding(13, embed_dim)` 來實現,它是可學習的 **Look up Table**
* 將 **Action Tokens** 和 **Observation Tokens** 都與 **Position Embedding** 相加
* 注意這邊跟 **Diffusion Model** 不一樣,不會加 **Sinusoidal Embedding** 的輸出,而是用拼接的
4. **Sequence** 拼接與 **Transformer**
* 將前面準備好的 **Tokens** 串接成 : `[Sinusoidal Embedding,Observation Conditioning,Action Sequence]`,以前面舉例的長度,這邊的 **Shape** 為 `(sequence length , embed_size)=(13, embed_size)`
* >也可以用 **Cross Attention** 將多個 **Input** 組合
* 接著送入 **Transformer Block** 計算,要注意的是這邊不會使用 **Causal Mask**,不需要刻意遮擋未來資訊,因為我們是在預測雜訊,不是在做文字接龍
5. 輸出擷取
* **Transformer** 的輸出最後依然是 `(13,embed_size)`,但我們只要最後面 10 個 **Token**
* 將最後 10 個 **Tokens** 通過 **Final Linear Layer**,將 `embed_size` 降維回 `action_dim`
6. 計算 **Loss**
* 拿這 **10** 個 **Predict** 出的 **Noise** 與當初加進去 **Noise** 計算 **MSE Loss**
## Neural Network
其中 **MLP** 的部分會使用 **Zero initialize**,這個方法在 **Diffusion Model** 很常使用,**MLP** 內部為 **SwiGLU** 架構,在 **Concatenate** 的時候順序很重要, **Noised Action Sequence** 請一定要放在最後面

## Trajectory Example
**Dataset** 產生兩個方向的 8 字型軌跡,一個順向,一個逆向,顏色越深代表越後面的 **Action**


在初始點(**Observation**) 加一點雜訊,就有機會看到正向和反向,這也展現了 **Diffsusion Policy** 的多峰性(**Multimodality**),如果有兩種路徑可以達到終點,他兩種都能擬和

### Dataset
因為 **Diffusion Model** 對 **Input** 資料的 **Distribution** 是很敏感的,所以我建立一個 **NumpyNormalizer** 的 **class**,要記住 **Training** 和 **Inference** 都要用 :
```python!
class NumpyNormalizer:
"""
Standard Min-Max Normalizer implemented in NumPy.
"""
def __init__(self, data: np.ndarray):
# data: [N, dim]
self.min = np.min(data, axis=0)
self.max = np.max(data, axis=0)
self.range = self.max - self.min
self.range[self.range == 0] = 1e-5
def normalize(self, x: np.ndarray) -> np.ndarray:
return 2.0 * (x - self.min) / self.range - 1.0
def unnormalize(self, x_norm: np.ndarray) -> np.ndarray:
return (x_norm + 1.0) / 2.0 * self.range + self.min
```
因為用普通的 $\mu$、$\sigma$ 來 **Normalize** 的話總會有數值超出 $[-1,1]$,所以 **Diffusion Policy** 通常當採用 **Min-Max Normalization** :
$$
x_{norm}=2\cdot\frac{x-x_{min}}{x_{mzx}-x_{min}}-1
$$
```python!
# ---------------------------------
# Trajectory Dataset Class
# ---------------------------------
from utils.normalization import NumpyNormalizer
class TrajectoryDataset(Dataset):
def __init__(self, pred_horizon, obs_horizon=8):
self.pred_horizon = pred_horizon
self.obs_horizon = obs_horizon
# 1. define dimension of action and state
self.action_dim = 2
self.state_dim = 2 # Here, state is just the (x, y) position
# 2. generate the two trajectories
self.traj_fwd_np = generate_trajectory_forward(num_steps=100) # shape: (100, 2)
self.traj_rev_np = generate_trajectory_reverse(num_steps=100) # shape: (100, 2)
# Initialize normalizer on the combined dataset to ensure scale is equal
combined_np = np.concatenate([self.traj_fwd_np, self.traj_rev_np], axis=0)
self.normalizer = NumpyNormalizer(combined_np)
# apply normalization
self.traj_fwd_np = self.normalizer.normalize(self.traj_fwd_np)
self.traj_rev_np = self.normalizer.normalize(self.traj_rev_np)
# convert to torch tensors
self.traj_fwd = torch.from_numpy(self.traj_fwd_np).float()
self.traj_rev = torch.from_numpy(self.traj_rev_np).float()
self.step_num = 100 # Steps per trajectory
# 3. Calculate dataset length (100 steps * 2 modalities)
self.length = self.step_num * 2
def __len__(self):
return self.length
def __getitem__(self, idx):
# is forward trajectory or reverse trajectory
is_forward = idx < self.step_num
t = idx % self.step_num
# select forward or reverse trajectory
trajectory = self.traj_fwd if is_forward else self.traj_rev
# 1. Observation sequence: length 'obs_horizon' ending at 't'
obs_indices = torch.arange(t - self.obs_horizon + 1, t + 1)
# Pad start of the episode by repeating the first frame
obs_indices = torch.clamp(obs_indices, min=0, max=self.step_num - 1)
obs_seq = trajectory[obs_indices] # shape: (obs_horizon, state_dim)
# 2. Action sequence: length 'pred_horizon' starting from 't + 1'
action_indices = torch.arange(t + 1, t + 1 + self.pred_horizon)
# Pad end of the episode by repeating the last frame
action_indices = torch.clamp(action_indices, min=0, max=self.step_num - 1)
action_seq = trajectory[action_indices] # shape: (pred_horizon, action_dim)
return {
"obs": obs_seq,
"action": action_seq
}
```
這個 **Class** 繼承了 **Pytorch** 的 **Dataset**,初始化包含 :
* 定義 **action、state** 的 **Shape**,因為是 **2D Trajectory**,**State** 也是過去的 **Trajectory**,所以都是 **2**
* 產生順向與逆向的兩種 **Trajectory**
* 對 **Trajectory Normalize**
這邊有兩個關鍵的參數 :
1. `obs_horizon` : 代表 **Observation** 的 **Window** 大小,也就是要給 **Noise Predictor** 的 **Observation steps**
2. `pred_horizon` : 代表要預測的 **Window** 大小,也就是 **Noise Predictor** 預測的 **Noise**,這個 **Noise** 要用來對 **Noised Trajectory** 去雜訊
---
* `self.length = self.step_num * 2` : 我們定義完整的 **Trajectory** 為 **100** 個 **step**,正反兩個 **Trajectory** 就為 **200**,並 **overwrite** `__len__` **return** **200**
* `def __getitem__(self, idx)` : **overwrite** **Dataset** 用來取一個 **Batch** 的 **Function**,`idx` 會隨機取 **0~199** 的整數
* `is_forward = idx < self.step_num` : 我們用這個整數是否超過 **100** 來判斷要給他正或反的 **Trajectory**
* `t` : 代表是 **Trajectory** 的第幾個 **Step**
* `obs_indices` : 利用 `torch.arange` 取得 $t-H_o+1\sim t$ 的序列整數,如我們 `obs_horizon`$H_o$ 設為 **8**,且 $t$ 為 **10**,那就是 **3~10**,但是因為有可能為負數,我們的處理方式就是將負數設定為 **0**,藉由 `torch.clamp` 裁切就 **Ok** 了
* `obs_seq` : 前面的為 **index** 序列,這邊就是從 **Trajectory** 中取得一段資料作為這次的 **Batch**
* `action_seq` : 也是一樣的方式,只是 **Range** 變成 $t+1\sim t+H_p$,也就是未來的軌跡
---
### Noise Predictor

```python!
# ---------------------------------
# SwiGLU Activation Class
# ---------------------------------
class SwiGLU(nn.Module):
def __init__(self, in_features, hidden_features, out_features):
super(SwiGLU, self).__init__()
self.w1 = nn.Linear(in_features, hidden_features)
self.w2 = nn.Linear(in_features, hidden_features)
self.w_out = nn.Linear(hidden_features, out_features)
nn.init.zeros_(self.w_out.weight)
nn.init.zeros_(self.w_out.bias)
def forward(self, x):
# 1. create gate
gate = F.silu(self.w1(x)) # shape: (batch_size, hidden_features)
# 2. apply gate to second linear transformation
x = self.w2(x) * gate
# 3. project to output features
x = self.w_out(x)
return x
```
**Transformer Block** 內的 **MLP** 我們採用 **SwiGLU**,他引入了 **Gate** 的機制,`w1` 的輸出作為 **Gate**,與數值 `w2` 相乘後輸出
$$ \text{SwiGLU}(x) = \left( \text{SiLU}(x W_1) \otimes (x W_2) \right) W_{out} $$
$$ \text{SiLU}(z) = z \cdot \sigma(z) = \frac{z}{1 + e^{-z}} $$
並且採用了 **Diffusion Model** 常用的 **Zero-Initialization**,在預測雜訊的這個任務中特別適合
---

```python!
# ---------------------------------
# Time Embedding Class for Diffusion Step Information
# ---------------------------------
class TimestepEmbedder(nn.Module):
"""
Standard sinusoidal timestep embedding followed by an MLP.
"""
def __init__(self, freq_dim: int, embed_dim=256):
super().__init__()
self.freq_dim = freq_dim
self.mlp = nn.Sequential(
nn.Linear(freq_dim, embed_dim),
nn.SiLU(),
nn.Linear(embed_dim, embed_dim)
)
self.embed_dim = embed_dim
@staticmethod
def sinusoidal(t: torch.Tensor, dim: int) -> torch.Tensor:
"""
Generates sinusoidal embeddings for the given timesteps.
This is a common technique in diffusion models to encode the timestep information.
"""
half = dim // 2
freqs = torch.exp(
-math.log(10000) * torch.arange(half, dtype=torch.float32, device=t.device) / half
)
args = t[:, None].float() * freqs[None] # (B, half)
return torch.cat([torch.cos(args), torch.sin(args)], dim=-1) # (B, dim)
def forward(self, t: torch.Tensor) -> torch.Tensor:
x = self.sinusoidal(t, self.freq_dim)
return self.mlp(x)
```
這個 **Class** 主要是用來做位置編碼,要注意這邊代表的是 **Diffusion Process** 的 **Step** 資訊,**Input** 為 $k$,最重要的核心是使用 **Sinusoidal Embedding**,是一個純數學的技巧,並搭配一個 **MLP** 然後再輸出,這個 **MLP** 用來讓 **Transformer** 自己學習解讀時間資訊

並且使用數值穩定的寫法 :
$$
\omega_i = 10000^{-\frac{2i}{d}} = \exp\left( \ln(10000^{-\frac{2i}{d}}) \right) = \exp\left( -\ln(10000) \cdot \frac{i}{d/2} \right)
$$
* `freq_dim` : **Sinusoidal Embedding** 編碼的 **Shape**,也就是 **Vector** 的長度,不管設多少最後都會用 **MLP** 去對齊其他 **Embedding Vector**(如 **Observation**、**Noised Action**)
*
---

```python!
# ---------------------------------
# Transformer Block Class
# ---------------------------------
class TransformerBlock(nn.Module):
def __init__(self, embed_dim=256, num_heads=8, mlp_ratio=4.0):
super().__init__()
self.norm1 = nn.LayerNorm(embed_dim)
self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
self.norm2 = nn.LayerNorm(embed_dim)
hidden_dim = int(embed_dim * mlp_ratio)
self.mlp = SwiGLU(embed_dim, hidden_dim, embed_dim)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# 1. residual connection for attention
x_res = x
# 2. layer norm
x = self.norm1(x)
# 3. self-attention
x, _ = self.attn(x, x, x)
# 4. add residual
x = x + x_res
# 5. residual connection for MLP
x_res = x
# 6. layer norm
x = self.norm2(x)
# 7. MLP
x = self.mlp(x)
# 8. add residual
x = x + x_res
return x
```
* `embed_dim` : 定義每一個 **Token** 的特徵維度
* `num_heads` : 定義 **Multi-Head Attention** 的 **head** 數量,用來尋找 **Token** 之間的多種不同的關聯性
* `mlp_ratio` : 確定 **MLP** 中間層要放大幾倍
* `LayerNorm` : **Transformer** 標配的,對 **Token Embedding** 特徵進行標準化
* `MultiheadAttention` : 當 `batch_first=True` 時就比較符合直覺,他的 **Input** 資料 **Shape** 應該要是 `(sequence, batch, embed_dim)`
$$
\text{Attention}(Q, K, V) = \text{Softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V
$$
**Forward** :
1. 存下 **Residual**,經過 **LayerNorm**
2. 通過 **attention**
3. **Residual Connection**
4. 存下 **Residual**,經過 **LayerNorm**
5. 通過 MLP,這邊用的是前面說過的 **SwiGLU**
6. **Residual Connection**
---
```python!
# ---------------------------------
# Diffusion Policy Class
# ---------------------------------
class DiffusionPolicy(nn.Module):
def __init__(self, action_dim=2, state_dim=0, embed_dim=256, num_heads=8, mlp_ratio=4.0, num_blocks=4):
super().__init__()
self.action_dim = action_dim
self.state_dim = state_dim
self.embed_dim = embed_dim
# 1. Observation embedding
self.obs_embedder = nn.Linear(state_dim, embed_dim)
# 2. Time embedding for diffusion step information
self.time_embedder = TimestepEmbedder(freq_dim=embed_dim, embed_dim=embed_dim)
# 3. Noised action embedding
self.action_embedder = nn.Linear(action_dim, embed_dim)
# 4. Transformer blocks for processing the combined embeddings
self.blocks = nn.ModuleList([
TransformerBlock(embed_dim, num_heads, mlp_ratio) for _ in range(num_blocks)
])
# 5. Final layer projection to action dimension
self.final_proj = nn.Linear(embed_dim, action_dim)
def forward(self, observations: torch.Tensor, diffusion_steps: torch.Tensor, noised_actions: torch.Tensor) -> torch.Tensor:
obs_emb = self.obs_embedder(observations) # (B, embed_dim)
# 2. Time embedding for diffusion steps
time_emb = self.time_embedder(diffusion_steps).reshape(-1,1, self.embed_dim) # (B, 1, embed_dim)
# 3. Embed noised actions
action_emb = self.action_embedder(noised_actions) # (B, embed_dim)
# 4. concatenate embeddings (if obs exists) or just use action and time embeddings
x = torch.cat((obs_emb, time_emb, action_emb), dim=1)
# 5. Add position encoding for transformer
seq_len = x.shape[1]
positions = torch.arange(seq_len, device=x.device)
pos_emb = TimestepEmbedder.sinusoidal(positions, self.embed_dim)
x = x + pos_emb
# 6. Pass through transformer blocks
for block in self.blocks:
x = block(x)
# 7. Final projection to action dimension
action_seq_len = noised_actions.shape[1]
action_out = x[:,-action_seq_len:,:]
output = self.final_proj(action_out)
return output # (Batch, action_seq_len , action_dim)
```
這邊是將所有組件結合,其中 **forward** **Input** 說明如下 :
* `observation` : `(Batch, obs_horizon, state_dim)`
* `diffusion_steps` ($k$) : `(Batch,)`,代表 **Diffusion Process** 的加噪是第幾步
* `noised_actions` ($x_k$) : `(Batch, pred_horizon, action_dim)`
步驟 :
1. 用 **Linear Layer** 將 **Observation**、**Action** 投影成 **Embedding** 維度
2. 輸入 $k$ 獲得 `time_emb` ,**Shape** 為 `(Batch,embed_dim)`,Reshape 成 `(Batch,1,embed_dim)` 變成一個 **Sequence** 長度為 1 的 **Token**
3. **Concatenation** 三組 **Token** 成一整組 **Token**,按照 `[obs,time,action]` 的順序
4. Transformer 對於我們給的 Token 是沒有順序性的,我們隨意調換順序的或對她來說都一樣,所以我們重複使用 `TimestepEmbedder.sinusoidal` 產生位置資訊,然後加在 **Token** 上
5. 通過 **N** 個 **Transformer Block** 輸出會保持 Input 時的 **Shape**
6. 我們前面特意將 **Action** 放在 **Sequence Token** 的尾巴,我們要預測的 **Noise** 會從最後 **16**(`action_seq_len`) 個 **Token** 切出來
7. 取出 Token 後我們還需要最後一個 Linear Layer 將 `(Batch,action_seq_len,embed_dim)` 投影並壓縮回 `(Batch,action_seq_len,action_dim)`
---
```python!
# ---------------------------------
# EMA (Exponential Moving Average) Class
# ---------------------------------
class EMA:
def __init__(self, model: nn.Module, beta: float = 0.995):
self.beta = beta
self.step = 0
# Create a copy of the model for EMA
self.ema_model = copy.deepcopy(model)
# Freeze the EMA model parameters
for param in self.ema_model.parameters():
param.requires_grad_(False)
def update(self, model: nn.Module):
"""
Update the EMA model parameters using the current model parameters.
This should be called after each training step.
"""
self.step += 1
for current_param , ema_param in zip(model.parameters(), self.ema_model.parameters()):
# Update EMA parameter
ema_param.data.mul_(self.beta)
ema_param.data.add_(current_param.data * (1.0 - self.beta))
def copy_to(self, model: nn.Module):
model.load_state_dict(self.ema_model.state_dict())
def save_pretrained(self, path: str):
torch.save(self.ema_model.state_dict(), path)
```
在 **Diffusion Model** 中很常使用 **Exponential Moving Average**,有助於減少生成圖片的割裂感,**Diffusion Policy** 使用的話就可以減少軌跡的抖動(**Jittery**),他會放在 **Training** 階段的每個 **Epoch** 後,他會將 **Model** 的 **Weight** 拿出來進行指數移動平均 :
$$
\theta_{EMA}^{(t)} = \beta \cdot \theta_{EMA}^{(t-1)} + (1 - \beta) \cdot \theta_{current}^{(t)}
$$
這個流程不影響 **Training**,主要是用在 **Inference** 階段
### Temporal Ensembler
在 **Receding Horizon Control** 中,模型每一步都會預測一個長度為 `pred_horizon` 的動作序列。然而,前一個時間步預測的未來軌跡,與當前時間步預測的未來軌跡在重疊部分(**Overlapping**)可能會有細微的差異。
**Temporal Ensembling** 的目的就是將這些重疊的預測值進行**加權平均**,從而獲得更平滑、更連續的控制指令,避免機器人關節出現劇烈抖動。
```python!
class NumpyTemporalEnsembler:
"""
Temporal Ensembling for action smoothing across overlapping prediction horizons.
"""
def __init__(self, pred_horizon: int, action_dim: int):
self.pred_horizon = pred_horizon
self.action_dim = action_dim
self.action_sum = np.zeros((pred_horizon, action_dim))
self.action_count = np.zeros((pred_horizon, 1))
def update(self, predicted_action_seq: np.ndarray):
"""Add a new predicted sequence to the ensemble buffer."""
length = min(len(predicted_action_seq), self.pred_horizon)
self.action_sum[:length] += predicted_action_seq[:length]
self.action_count[:length] += 1
def get_and_shift_actions(self, n_actions: int) -> np.ndarray:
"""Compute average actions and shift the buffer window forward."""
# Prevent division by zero
counts = np.clip(self.action_count[:n_actions], a_min=1, a_max=None)
avg_actions = self.action_sum[:n_actions] / counts
# Update and shift the buffer
new_sum = np.zeros_like(self.action_sum)
new_count = np.zeros_like(self.action_count)
if self.pred_horizon > n_actions:
new_sum[:-n_actions] = self.action_sum[n_actions:]
new_count[:-n_actions] = self.action_count[n_actions:]
self.action_sum = new_sum
self.action_count = new_count
return avg_actions
```
#### 運作範例 (Example)
假設 `pred_horizon = 4`,每步執行 `n_actions = 1`:
1. **Step 0**: 模型預測 $[a_0, a_1, a_2, a_3]$。
- `action_sum` 變為 $[a_0, a_1, a_2, a_3]$,`counts` 變為 $[1, 1, 1, 1]$。
- 執行 $a_0$,**Window** 位移後剩下 $[a_1, a_2, a_3, 0]$。
2. **Step 1**: 模型預測新的序列 $[b_1, b_2, b_3, b_4]$。
- 將新序列加入後,`action_sum` 變為 $[a_1+b_1, a_2+b_2, a_3+b_3, b_4]$。
- `counts` 變為 $[2, 2, 2, 1]$。
- 下一步執行的動作將會是 $(a_1+b_1)/2$,即兩次預測的平均值。
透過這種方式,軌跡會變得非常絲滑,因為每個動作都是多次預測共同決定的結果。

從這張圖可以看到剛開啟幾步是不穩的,但後面的軌跡與前面的軌跡透過算法更新變得更加平滑,並且可以更貼近期望的軌跡
### Training
**Initialize** :
```python!
if __name__ == "__main__":
# ==========================================
# 0. Setup & Hyperparameters
# ==========================================
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"[trajectory_train] Using device: {DEVICE}")
# Hyperparameters for Diffusion Policy
TIMESTEPS = 100 # Total diffusion steps
EMBED_DIM = 256 # Embedding dimension for the model
NUM_HEADS = 8 # Number of attention heads
MLP_RATIO = 4.0 # MLP expansion ratio
PRED_HORIZON = 16 # Prediction horizon for training samples
OBS_HORIZON = 8 # Observation horizon
BATCH_SIZE = 32 # Batch size for training
EPOCHS = 3000 # Number of epochs to train
DEPTH = 4 # Number of transformer blocks
LR = 1e-4 # Learning rate for optimizer
SAVE_DIR = "checkpoints"
os.makedirs(SAVE_DIR, exist_ok=True)
# ==========================================
# 1. Dataset & DataLoader
# ==========================================
print("[trajectory_train] Initializing Dataset and DataLoader...")
trajectory_dataset = TrajectoryDataset(pred_horizon=PRED_HORIZON, obs_horizon=OBS_HORIZON)
dataloader = DataLoader(trajectory_dataset, batch_size=BATCH_SIZE, shuffle=True)
print(f"[trajectory_train] Dataset size: {len(trajectory_dataset)} | Batches/epoch: {len(dataloader)}")
# ==========================================
# 2. Model & EMA Definition
# ==========================================
print("[trajectory_train] Initializing Model and EMA...")
model = DiffusionPolicy(
action_dim=trajectory_dataset.action_dim,
state_dim=trajectory_dataset.state_dim,
embed_dim=EMBED_DIM,
num_heads=NUM_HEADS,
mlp_ratio=MLP_RATIO,
num_blocks=DEPTH
).to(DEVICE)
model.train()
ema_model = EMA(model, beta=0.995)
# ==========================================
# 3. Optimizer, Scheduler & Diffusion Setup
# ==========================================
scheduler = DDIMScheduler(
num_train_timesteps=TIMESTEPS,
beta_schedule="squaredcos_cap_v2",
clip_sample=True,
set_alpha_to_one=True,
steps_offset=0,
prediction_type="epsilon"
)
optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=1e-4)
mse_loss = torch.nn.MSELoss()
```
1. **Hyperparameters**
* `DEVICE` : **GPU** 加速
* `OBS_HORIZON = 8` : 作為 **Observation**,過去 8 步
* `PRED_HORIZON = 16` : 預測未來 **16** 步,**Noised Action** 的 **Sequence Length**
* `EMBED_DIM = 256` : **Token** 的特徵維度
* `BATCH_SIZE = 200`、`EPOCHS = 2000`、`LR = 1e-4` : **Training** 相關設定
2. **Dataset & DataLoader**
* 建立 `TrajectoryDataset` 並透過 **DataLoader** 打包,`shuffle=True` 讓模型每個 **Epoch** 都能看到打亂的 **Batch** 資料(**Trajectory** 內沒有打亂)
3. **EMA** 初始化,$\beta=0.995$
4. **Diffusion Scheduler**
* `TIMESTEPS = 100` : **Diffusion Process** 的最大步數 $T=100$
* `beta_schedule="squaredcos_cap_v2"` : 對 $\beta$ 使用 **Cosine** **Schedule**,相對於 **Linear** 能更平滑
* `prediction_type="epsilon"` : 定義模型的任務是預測雜訊 $\epsilon$
5. **Optimizer & Loss** :
* **AdamW** : 目前 **Transformer** 標配的 **Optimizer**,自帶 `weight_decay` 能有效防止 **overfitting**
* **MSELoss** : 用來計算 "模型預測的雜訊" 與 "實際加入的雜訊" 之間的 **Mean Square Error**
**Training Loop** :
```python!
# Training loop
for epoch in range(1, EPOCHS + 1):
pbar = tqdm(dataloader, desc=f"Epoch {epoch}/{EPOCHS} Training")
for step, batch in enumerate(pbar):
# 1. Get the batch of action sequences
actions = batch['action'].to(DEVICE)
obs = batch['obs'].to(DEVICE)
# 2. Sample random diffusion steps for each sequence in the batch
k = torch.randint(0, TIMESTEPS, (actions.shape[0],), device=actions.device) # shape: (batch_size,)
# 3. Add noise to the actions according to the sampled diffusion steps
noise = torch.randn_like(actions)
noised_actions = scheduler.add_noise(actions, noise, k)
# 4. Predict the noise using the Diffusion Policy model
predicted_noise = model(observations=obs, diffusion_steps=k, noised_actions=noised_actions)
# 5. Compute the loss between the predicted noise and the true noise
loss = mse_loss(predicted_noise, noise)
# 6. Backpropagation and optimization step
optimizer.zero_grad()
loss.backward()
optimizer.step()
ema_model.update(model)
pbar.set_postfix({"Epoch": epoch, "Loss": loss.item()})
if epoch % 500 == 0 or epoch == EPOCHS:
checkpoint_path = os.path.join(SAVE_DIR, f"trajectory_diffusion_policy_{epoch}.pth")
ema_model.save_pretrained(checkpoint_path)
print(f"\nSaved checkpoint to {checkpoint_path}")
```
1. `actions` : 乾淨的未來軌跡
2. `obs` : 過去的軌跡
3. `k` : 隨機抽樣 `batch_size` 個 **Diffusion Step**
4. `noise` : $\epsilon\sim\mathcal{N}(0,I)$ **Sample** 雜訊,也是我們要猜的標準答案
5. `noised_actions` : 對乾淨的軌跡加上雜訊,並根據 $k$ 來調整雜訊的多寡,然後透過 **Scheduler** 幫我套 **Forward Process** 的公式 $x_k=\sqrt{\bar\alpha_k}x_0+\sqrt{1-\bar\alpha_k}\epsilon$
6. `predicted_noise` : 神經網路預測的 **Noise** $\epsilon_\theta$
7. **MSE** 計算損失函數
8. **Optimization & EMA**
9. **Checkpointing**,這邊注意,要存的是 **EMA Model**,到時候推論要用的是這個,如果是要接續訓練的話就是存原本的
### Inference
```python!
def main():
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# --- 1. Parameters (strictly matching trajectory_gif_eval.py) ---
TIMESTEPS = 100
ACTION_DIM = 2
STATE_DIM = 2
EMBED_DIM = 256
NUM_HEADS = 8
MLP_RATIO = 4.0
OBS_HORIZON = 8
PRED_HORIZON = 16
ACTION_HORIZON = 8
INFERENCE_STEPS = 50
MAX_EPISODE_STEPS = 100
CHECKPOINT_PATH = "checkpoints/trajectory_diffusion_policy_3000.pth"
# --- 2. Initialize Model and Scheduler ---
model = DiffusionPolicy(action_dim=ACTION_DIM, state_dim=STATE_DIM,
embed_dim=EMBED_DIM, num_heads=NUM_HEADS,
mlp_ratio=MLP_RATIO, num_blocks=4).to(DEVICE)
print(f"Loading checkpoint from: {CHECKPOINT_PATH}")
model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=DEVICE, weights_only=True))
model.eval()
scheduler = DDIMScheduler(num_train_timesteps=TIMESTEPS,
beta_schedule="squaredcos_cap_v2",
clip_sample=True, set_alpha_to_one=True,
prediction_type="epsilon")
scheduler.set_timesteps(INFERENCE_STEPS)
# --- 3. Setup Inference environment ---
reference_traj = generate_trajectory_forward(num_steps=100)
normalizer = NumpyNormalizer(reference_traj)
current_state = reference_traj[0].copy()
# Observation buffer (window of past states)
obs_buffer = collections.deque([current_state.copy()] * OBS_HORIZON, maxlen=OBS_HORIZON)
actual_trajectory = [current_state.copy()]
# Initialize Temporal Ensembling
temporal_ensembler = NumpyTemporalEnsembler(pred_horizon=PRED_HORIZON, action_dim=ACTION_DIM)
```
1. **Hyperprarmeter** :
* `INFERENCE_STEPS` : 這個代表是要用多少 **Step** 來進行 **Reverse Process**,**Scheduler** 會根據這個來決定跳點的 $k$ 序列
* 其他參數都跟 **Training** 一樣,要用來建立相同的 **model** 與 **Scheduler**
2. **Model and Scheduler** :
* 初始化相同架構的 **Model** 並載入 **Weight**,然後設定為 **eval** 模式
* 建立相同的 **Scheduler**,並設定 **DDIM** 要使用多少 **Step** 來進行 **Reverse Process**
3. **Setup Inference environment** :
* 產生 **Target** 的軌跡 `reference_traj`
* 建立 **Normalizer**、軌跡的第一個點作為起始點
* 建立一個 **state** 的 **Queue buffer (deque)** `obs_buffer`,並用第一個點來填滿,最大上限為 `OBS_HORIZON`, 之後填入新資料的話舊的會被捨棄
* `actual_trajecory` : 建立一個 **buffer** 來儲存完整軌跡
---
```python!
# --- 4. Closed-loop Inference ---
print(f"Starting closed-loop inference for {MAX_EPISODE_STEPS} steps...")
step_count = 0
while step_count < MAX_EPISODE_STEPS:
# Prepare Observation (shape: 1, OBS_HORIZON, STATE_DIM)
obs_np = np.stack(obs_buffer)
obs_norm = normalizer.normalize(obs_np)
obs_tensor = torch.from_numpy(obs_norm).float().unsqueeze(0).to(DEVICE)
# Action Prediction (Gaussian Noise -> Predicted Trajectory)
noised_actions = torch.randn((1, PRED_HORIZON, ACTION_DIM), device=DEVICE)
with torch.no_grad():
for k in scheduler.timesteps:
t_batch = torch.full((1,), k, device=DEVICE, dtype=torch.long)
predicted_noise = model(observations=obs_tensor,
diffusion_steps=t_batch,
noised_actions=noised_actions)
noised_actions = scheduler.step(model_output=predicted_noise,
timestep=k,
sample=noised_actions).prev_sample
# Post-process: Unnormalize
predicted_action_seq = normalizer.unnormalize(noised_actions.squeeze(0).cpu().numpy())
# Update Ensembler and get smoothed actions
temporal_ensembler.update(predicted_action_seq)
exec_actions = temporal_ensembler.get_and_shift_actions(ACTION_HORIZON)
# Execute Actions (Receding Horizon)
for i in range(ACTION_HORIZON):
if step_count >= MAX_EPISODE_STEPS:
break
# Use predicted point as next state (simple simulator)
current_state = exec_actions[i]
actual_trajectory.append(current_state.copy())
obs_buffer.append(current_state.copy())
step_count += 1
print(f"Progress: {step_count}/{MAX_EPISODE_STEPS}", end='\r')
```
1. 建立最外層迴圈 ,`while step_count < MAX_EPISODE_STEPS:` 其中 `MAX_EPISODE_STEPS` 為整個軌跡的長度,讓產生的軌跡不要超過沒訓練過的範圍
2. `obs_buffer` 會一直存當前 **Step** 的過去 8 個點,然後將這些點轉為 **numpy->normalize->tensor**
3. 初始化一個純雜訊 `noised_actions`,代表為 **Diffusion** 的起點 $x_T$,大小為我們 **Training** 設定的 `(1, PRED_HORIZON,ACTION_DIM)`
4. 根據 **scheduler** 產生的 **timesteps** (跳點的 **List**,**ex**.`[98, 96, 94, 92, 90, ...]`) 進行 **Reverse Process**
5. 將 `obs_tensor`、`t_batch`、`noised_actions` 帶入 **Noise Predictor**,並使用 **DDIM Scheduler** 和預測出的 **Noise**,把 `noised_actions` 內的雜訊去除,去的次數為 `INFERENCE_STEPS`
6. 當前的乾淨 **action**,也就是未來的軌跡還在 **Range** $[-1,1]$ 之間,透過 **unnormalize** 還原回原本的尺度
7. 我們預測了後面 `PRED_HORIZON` 個點,但為了穩定我們只會拿一半 (`ACTION_HORIZON`) 來用,然後這一半的點會填入整體軌跡和 **observation buffer**(舊資料會被丟棄),並用做下一次的軌跡生成
後面就是 **Plot** 的部分,就不提了
## Minari Point Maze Example
**Minari** 是一個標準的 **Offline Reinforcement Learning** 集合推託管的 **Interface**,且大多遵循 **Gymnasium API** 的 **Reinforcement Learning**,並提供資料蒐集、取樣等功能,簡化 **Dataset** 的處理,我想用 **D4RL** 的 **PointMaze** 來實作,但是 **Diffusion Policy** 算是一種 **Behavior Cloning**,其實不需要他提供的 **Reward**
使用固定的 **seed** 去跑 **10** 次,並記錄下軌跡 :

### Dataset
我們使用了 `minari` 庫提供的 **Offline Dataset**。**Minari** 是 **Gymnasium** 官方維護的一個用於離線強化學習的資料集接口,繼承了 **D4RL** 的精神並提供更現代化的支援。
在 `PointMazeDataset` 中,我們針對 `pointmaze-large-v2` 進行處理。這個資料集包含了一個代理器在迷宮中移動到隨機目標點的軌跡。
```python!
from point_maze_dataset import PointMazeDataset
# 初始化 Dataset
# observation 包含:[pos_x, pos_y, vel_x, vel_y] (4維)
# desired_goal 包含:[goal_x, goal_y] (2維)
dataset = PointMazeDataset(dataset_id="D4RL/pointmaze/large-v2", pred_horizon=16, obs_horizon=2)
```
1. **State Vector**: PointMaze 的 Observation 是一個字典。我們將 `observation` (位置與速度) 與 `desired_goal` (目標座標) 拼接在一起,形成一個 **6 維** 的狀態向量。
2. **Action Vector**: 代理人的輸出是 **2 維** 的加速度 (Force)。
3. **Aligned Chunking**: 在 `__getitem__` 中,我們實作了對齊的序列取樣。
- `t_start = t - obs_horizon + 1`
- 這確保了觀察序列 (Observation) 與動作序列 (Action) 在時間軸上是完美對齊的起點。
### Training
針對點位導航這類數值型、高動態的任務,我們引入了 **Mixed Precision (AMP)** 來加速訓練:
```python!
# 使用 torch.amp 進行混合精度訓練
from torch.cuda.amp import autocast, GradScaler
scaler = GradScaler()
with autocast(enabled=True):
predicted_noise = model(observations=obs, diffusion_steps=k, noised_actions=noised_actions)
loss = mse_loss(predicted_noise, noise)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
```
- **Step-based Loop**: 不同於 Trajectory 使用 Epoch,PointMaze 採用總步數 (`TOTAL_STEPS = 30000`) 控制,適合處理超大型的離線資料集。
- **Optimization**: 使用 `AdamW` 配合 `CosineAnnealing` 學習率排程,並在 V100/H100 等高階顯卡上開啟 `pin_memory` 提升傳輸效率。
### Inference
在 **Maze** 環境中,我們不只需要預測軌跡,還需要將其可視化。
1. **point_maze_plot_eval.py**: 繪製靜態的軌跡對比圖,觀察模型預測的 16 步動作如何趨向目標點。
2. **point_maze_render_eval.py**: 透過 PyGame 即時渲染代理人在迷宮中移動的過程。
推論流程同樣遵循 **Receding Horizon** 邏輯:
- 模型每步預測 32 步動作。
- 透過 `NumpyTemporalEnsembler` 進行平滑。
- 實際執行前 16 步後,重新獲取 Observation 進行下一次預測。

## Aloha Transfer Cube Example
Aloha 是一個雙臂機器人操作任務,我們使用的是 **LeRobot** (Hugging Face) 開源的 `lerobot/aloha_sim_transfer_cube_human` 資料集。這是一個比迷宮複雜得多的視覺導航與操縱任務。
### Vision Encoder
```python!
class VisionEncoder(nn.Module):
"""
Mini-ResNet + SpatialSoftmax. Optimized for ALOHA (224x224) but supports 128x128 (default).
"""
def __init__(self, in_channels: int = 3, image_size: int = 128, embed_dim: int = 256):
super().__init__()
# 1. Lightweight ResNet feature extractor (Downsamples by a total factor of 16)
self.cnn = nn.Sequential(
# Input: (B, 3, H, W) -> e.g. (B, 3, 224, 224) for ALOHA
nn.Conv2d(in_channels, 32, kernel_size=3, stride=2, padding=1, bias=False),
nn.GroupNorm(8, 32),
nn.ReLU(inplace=True),
# -> (B, 32, H/2, W/2)
ResBlock(32, 64, stride=2),
# -> (B, 64, 32, 32)
ResBlock(64, 128, stride=2),
# -> (B, 128, 16, 16)
ResBlock(128, 256, stride=2)
# -> (B, 256, 8, 8)
)
# 2. Spatial Softmax Module
# Image is downsampled by 2*2*2*2 = 16 times: 128/16=8 or 224/16=14 (ALOHA)
feat_size = image_size // 16
num_channels = 256
# (Assuming the SpatialSoftmax class defined earlier is available)
self.spatial_softmax = SpatialSoftmax(
height=feat_size,
width=feat_size,
num_channels=num_channels
)
# 3. Final projection: maps 256 coordinate pairs (512 values) to Transformer's embed_dim
self.proj = nn.Sequential(
nn.Linear(num_channels * 2, embed_dim),
nn.LayerNorm(embed_dim), # Add LayerNorm to stabilize tokens fed into the Transformer
nn.SiLU()
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# x: (B, 3, H, W) -> e.g. (B, 3, 224, 224) for ALOHA
features = self.cnn(x) # -> (B, 256, 8, 8)
keypoints = self.spatial_softmax(features) # -> (B, 512)
img_token = self.proj(keypoints) # -> (B, embed_dim)
return img_token
```
**Aloha Dataset** 設定的 **Image Size** 為 `3x224x224`,我們會將當前環境的俯視視角圖片,先經過 **Vision Encoder**,然後才會放進 **Transformer** :
* **Input** : $(B,3,224,224)$
* **Conv2d(S2)+GN+ReLU** : 因為使用 `stride=2`,所以 **feature map** 大小減半且數量變多 $(B,32,112,112)$
* **ResBlock** : $(B,32,112,112)$->$(64,56,56)$,大小減半,**feature map** 數量加倍,接著再經過兩次 **ResBlock** 變成 $(B,256,14,14)$ 的高維特徵
* **Spatial Softmax** : $(B,256,14,14)$ -> $(B,512)$,其中 **512** 為 **256** 組的座標 $(x,y)$,只是為 **Flatten** 的狀態
* **Linear + LN + SiLU** : $(B,512)$->$(B,\text{embed_dim})$,最後一層用來對齊 **Transformer** 的維度
---
#### Residual Block
```python!
class ResBlock(nn.Module):
def __init__(self, in_channels: int, out_channels: int, stride: int = 1, groups: int = 8):
super().__init__()
# Ensure out_channels is divisible by groups
# For channels [32, 64, 128, 256], groups=8 or 16 will work perfectly.
# First convolutional layer
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
self.gn1 = nn.GroupNorm(groups, out_channels) # Fixed: (num_groups, num_channels)
# Second convolutional layer
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
self.gn2 = nn.GroupNorm(groups, out_channels)
# Shortcut branch
self.shortcut = nn.Sequential()
if stride != 1 or in_channels != out_channels:
self.shortcut = nn.Sequential(
nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
nn.GroupNorm(groups, out_channels)
)
def forward(self, x):
# 1. First Conv + GN + ReLU
out = F.relu(self.gn1(self.conv1(x)), inplace=True)
# 2. Second Conv + GN
out = self.gn2(self.conv2(out))
# 3. Add shortcut connection
out += self.shortcut(x)
# 4. Final ReLU
out = F.relu(out, inplace=True)
return out
```
在 **Vision Encoder** 使用 **Residual Block** 可以有效避免梯度消失
* **Conv3x3 + GN + ReLU** : 經過一次標準的 **CNN** 組合
* **Conv3x3 + GN** : 再經過一層 **CNN** 和 **Group Normalize**
* **Shortcut(Conv3x3+GN)** : 因為這邊要 **Shortcut**,所以前一層才沒有 **Activation** **Function**,這邊的使用到的 **CNN Input** 是使用原始的 **Image**,經過兩層 **CNN** 的 **Feature Map** 和經過一層的 **Feature Map** 相加,最後經過 **ReLU**
---
#### Spatial Softmax
在 **Vision-Based Policy** 中,**Spatial Softmax** 是關鍵技術。它不像普通的 **Flatten** 會丟失空間資訊,而是將 **Feature Map** 轉換為「期望特徵點座標」。
```python!
# ---------------------------------
# Spatial Softmax Block
# ---------------------------------
class SpatialSoftmax(nn.Module):
"""
Extracts spatial expected coordinates (Keypoints) from feature maps.
"""
def __init__(self, height: int, width: int, num_channels: int, temperature: float = 1.0):
super().__init__()
self.height = height
self.width = width
self.num_channels = num_channels
# Create normalized coordinate grids [-1, 1]
pos_y, pos_x = torch.meshgrid(
torch.linspace(-1., 1., height),
torch.linspace(-1., 1., width),
indexing='ij'
)
pos_x = pos_x.reshape(-1)
pos_y = pos_y.reshape(-1)
self.register_buffer('pos_x', pos_x)
self.register_buffer('pos_y', pos_y)
# Learnable temperature parameter
self.temperature = nn.Parameter(torch.ones(1) * temperature)
def forward(self, feature: torch.Tensor) -> torch.Tensor:
# feature: (B, C, H, W)
B, C, H, W = feature.shape
assert H == self.height and W == self.width, f"Expected {self.height}x{self.width}, got {H}x{W}"
# Flatten spatial dimensions: (B, C, H*W)
feature_flat = feature.reshape(B, C, H * W)
# Apply temperature and softmax to get probabilities
weights = F.softmax(feature_flat / self.temperature, dim=-1)
# Calculate expected coordinates
expected_x = torch.sum(self.pos_x * weights, dim=-1) # (B, C)
expected_y = torch.sum(self.pos_y * weights, dim=-1) # (B, C)
# Stack coordinates: (B, C, 2) and flatten to (B, C * 2)
expected_xy = torch.stack([expected_x, expected_y], dim=-1)
return expected_xy.reshape(B, C * 2)
```
以 Aloha Dataset 來說,經過前面多個 **Residual Block** 後,**Feature Map** 的 **Shape** 為 $(B,256,14,14)$,Spatial Softmax 的運算流程如下 :
* **Flatten** : $(B,256,14,14)$ -> $(B,256,196)$
* **Softmax** : 對最後一個 **Dimension** 計算 **Softmax**,也就是說會有 **256** 組加總為 1 的 **Vector**
* **Coordinate Grid** : 初始化時建立一個從 -1 到 1 的 **Matrix**,$14\times 14$ 的 `pos_x` 會像這樣
```python!
[[-1.0, -0.85, ..., 1.0],
[-1.0, -0.85, ..., 1.0],
...
[-1.0, -0.85, ..., 1.0]]
```
`pos_y` 則方向相反,建立完成後也會 **Flatten** 成 $(196)$
* **Expected Value** : 計算座標的加權平均,類似物理上 **Center of Mass** 的計算,`expected_x = torch.sum(pos_x * weights, dim=-1)` 計算 X 座標的期望值,維度 $(B,256,196)$ -> $(B,256)$,`expected_y = torch.sum(pos_y * weights, dim=-1)` 計算 Y 座標的期望值
* 合併 : 利用 **Stack** 將兩個 $(B,256)$ 合併成 $(B,256,2)$,然後 **Reshape** 成 $(B,512)$,剛好對齊
**Spatial Softmax** 在計算 **Softmax** 的時候會有一個參數 **Tempture** $\tau$,會先將 **Flatten** 後的原始 **Input** 先除 $\tau$,當 $\tau$ 很低時,機率分布會很集中,反之當 $\tau$ 很高時,機率分布會很散,他是一個可訓練參數,以下示範了低和高的差異

* 熱力圖代表的是不同的 $\tau$ 計算出的 `weights`,類似 2D 的機率分布,是帶入 $\tau$ 計算 Softmax 後的結果
* 白點代表我們計算出的 **Expected Value**,他是一個 **XY** 座標點,可以視為熱力圖的 **Center of Mass**
* 紅點為機率最高的位置
可以發現 $\tau$ 較高時,可以讓 **Expected Value** 不會動的那麼劇烈,一個合適的 $\tau$ 可以給出穩定的座標點
---
### Dataset
`AlohaDataset` 與 `PointMazeDataset` 最大的不同在於其**數據規模**與**多模態輸入**。迷宮任務只需要處理簡單的向量,而 Aloha 需要同時處理高解析度影像與 14 維的關節狀態 (`qpos`)。
#### 1. 硬體加速快取機制 (Hardware Accelerated Caching)
由於直接從原始影片檔讀取影像非常緩慢,我們實作了快取機制。在第一次執行時,我們會利用 GPU 加速影像的預處理:
```python!
def _create_image_cache(self):
print(f"[AlohaDataset] Cache not found. Accelerating single-camera cache via DataLoader & GPU...")
n = len(self.lerobot_dataset)
self.cached_images = torch.zeros((n, 3, self.image_size, self.image_size), dtype=torch.float16)
# 1. Use DataLoader to multiprocess video decoding
from torch.utils.data import DataLoader
dl = DataLoader(self.lerobot_dataset, batch_size=64, num_workers=4, pin_memory=True)
# 2. Setup GPU-accelerated preprocessing
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
preprocess = T.Compose([
T.CenterCrop(480),
T.Resize((self.image_size, self.image_size), antialias=True)
])
idx = 0
for batch in tqdm(dl, desc="Hardware Accelerated Caching"):
img = batch[self.cam_key]
# ... (ndim handling)
img = img.to(device, non_blocking=True)
# Apply CenterCrop + Resize on GPU and cast to FP16
img_processed = preprocess(img.float() / 255.0).half().cpu()
bs = img_processed.shape[0]
self.cached_images[idx : idx + bs] = img_processed
idx += bs
torch.save(self.cached_images, self.cache_path)
```
- **為什麼要快取?**: Vision-based 訓練的瓶頸通常在於 **Disk IO**。預先處理並壓縮影像(例如從 640x480 縮小到 128x128),能讓模型在訓練時以極高的速度讀取 Batch,避免 GPU 空轉等待數據。
#### 2. TensorNormalizer (GPU-Native Normalization)
為了避免在 **Data Loading** 過程中不斷進行 **CPU-GPU** 或 **Numpy-Tensor** 的轉換造成延遲,我們實作了全 **PyTorch** 版本的 `TensorNormalizer`:
- **優點**: 所有數據保持在 `torch.Tensor` 格式,正規化運算直接在 **GPU** 上執行。這對於高頻率讀取影像數據的 **Task** 來說至關重要,能有效降低 **CPU** 的負擔。
#### 3. 數據增強 (Data Augmentation)
機器人任務對相機位置與光影極度敏感。我們實作了 `AlohaAugmentor`:
```python!
class AlohaAugmentor:
def __init__(self, image_size=128):
self.image_size = image_size
# 1. Random Shift (maintains original Edge Padding)
self.padding = 8 # Increased to 8px to expand the shift range
def __call__(self, img_seq: torch.Tensor) -> torch.Tensor:
"""
img_seq: (T, C, H, W)
"""
# --- Determine random parameters shared across this specific sequence ---
# 1. Random Crop parameters
top = random.randint(0, self.padding * 2)
left = random.randint(0, self.padding * 2)
# 2. Color Jitter parameters
brightness = random.uniform(0.8, 1.2)
contrast = random.uniform(0.8, 1.2)
saturation = random.uniform(0.8, 1.2)
# 3. Decide whether to apply certain probabilistic augmentations
apply_noise = random.random() > 0.5
# Apply padding first to prepare for cropping
# (T, C, H, W) -> Padding is applied to the last two dimensions (H and W)
img_seq = F.pad(img_seq, padding=[self.padding]*4, padding_mode='edge')
processed_frames = []
for i in range(img_seq.shape[0]):
img = img_seq[i]
# Apply the same crop across the sequence
img = F.crop(img, top, left, self.image_size, self.image_size)
# Apply the same color jitter across the sequence
img = F.adjust_brightness(img, brightness)
img = F.adjust_contrast(img, contrast)
img = F.adjust_saturation(img, saturation)
# Add slight Gaussian noise
# (Noise varies slightly per frame to improve robustness)
if apply_noise:
img = img + torch.randn_like(img) * 0.01
img = torch.clamp(img, 0.0, 1.0)
processed_frames.append(img)
return torch.stack(processed_frames)
```
- **時間一致性 (Temporal Consistency)**: 在一個 Observation Window 中,如果每幀的位移或亮度不同,模型會誤以為物體在抖動。因此我們在 `__call__` 開頭決定一組參數後,套用到整個 `T` 維度。
- **Random Shift (平移不變性)**: 這是 **Vision-based Policy** 的核心技巧。透過隨機位移,模擬相機安裝誤差,讓模型學會「空間平移不變性」,使其在現實部署時極具魯棒性。
### Training
由於加入了 **ResNet18** 與較深的 **Transformer**,**Aloha** 的顯存佔用遠高於 **Point Maze**。
| 技術名稱 | 目的 | 對比 **Point Maze** |
| -------- | -------- | -------- |
| **Gradient Checkpointing** | 顯存優化 | **Point Maze** 不需開啟;**Aloha** 開啟後顯存可從 **24G** 降至 **8G** 以下。 |
| **BF16 Autocast** | 訓練穩定性 | **Point Maze** 使用 **FP16** 即可;**Aloha** 推薦 **BF16** 以防 **Transformer** 梯度溢出。 |
| **Warmup & Cosine Decay** | 學習率排程 | 視覺模型在訓練初期需要 Warmup 來穩定 CNN 的特徵提取層。 |
### Inference
最終驗證於 `aloha_render_eval.py`:
- **Observation**: 包含 `top` 攝影機的影像序列 + **14** 維的關節狀態。影像會先通過 **Vision Encoder** (**CNN** + **Spatial Softmax**) 轉為 **Image Token**。
- **Output**: 預測未來的關節目標序列。
- **Result**: 推論結果會渲染成 **MP4** 影片儲存在 `assets/eval_aloha.mp4`。你可以看到模型如何精確地控制左右手,完成方塊的傳遞動作。

## Consistency Policy
**Consistency Policy** 主要是用來加速 **Diffusion Process** 的方法,用蒸餾模型的方式,縮減產生 **Action** 的步數,以下簡單說明如何訓練
1. Initialize
* 載入預訓練好的 Diffusion Policy
*