# Diffusion Policy [TOC] **Paper** : [Diffusion Policy: Visuomotor Policy Learning via Action Diffusion](https://arxiv.org/abs/2303.04137v5) 網頁 : https://diffusion-policy.cs.columbia.edu/ 之前 **Transformer** 相關的文章 : * [Self-Attention](https://hackmd.io/@bGCXESmGSgeAArScMaBxLA/rJRwYNRX-g) * [Transformer Decoder](https://hackmd.io/@bGCXESmGSgeAArScMaBxLA/rJvmAClSbl) **Diffusion Model** 相關的文章 : * [Denoising Diffusion Probabilistic Model](https://hackmd.io/@bGCXESmGSgeAArScMaBxLA/HkcbC12zWe) * [Latent Diffusion Transformer](https://hackmd.io/@bGCXESmGSgeAArScMaBxLA/B1vYJqVu-g) ## Introduction 由簡入深,從實作的層面與視覺化來理解 **Diffusion Policy**,**Diffusion Policy** 主要是從圖片的生成,變成 **Sequence Action** 的生成,因為 **Sequence Action** 有時間資訊(如$[a_{t-1},a_t,a_{t+1}]$),所以 **Diffusion Process** 的過 **Timestep** 就改用 $k$ 來描述 ## Diffusion Policy vs. 傳統 Behavior Cloning 作者有清楚的針對這個差異舉例,假如想讓 **AI** 模仿開車,並且路中間有一棵樹,**Dataset** 內有兩條路徑,一個向左一個向右,為了擬和這兩條路徑,讓 **Predict** 出的路徑誤差做最低,他會選擇直走,最後就會撞到樹。 1. **Vanilla BC (MSE)** : 他預測的是一個確定的數值,也就是一個 **State** 只能對應一個 **Action**,這個數值通常是 **Dataset** 內平均誤差最低的,有可能會選擇兩種不同 **Action** 的中間值 2. **GMM-BC** : 我們知道 **Gaussian Distribution** 是可以組合的,變成一個多峰形式的 **Gaussian Mixture Model**,因為是多個分布所組成,難以擴展到高維動作空間 ![image](https://hackmd.io/_uploads/H15NvqM5Zl.png) 3. **IBC (Implicit Behavior Cloning)** : 使用能量模型 (**EBM**),效果好但訓練極度不穩,**Inference** 速度很慢 ![image](https://hackmd.io/_uploads/rJ2uP9fqZe.png) 其中黑色線是期望的軌跡,藍色是低能量,白色是高能量,可以看到能很好的吻合期望軌跡 而 **Diffusion Policy** 則可以完美達成多峰的需求,速度也很快 ## DDIM 關於 **Diffusion Model** 的方法最一開始是使用 **DDPM(Denoising Diffusion Probabilitic Models)**,訓練一個 **Noise Predictor**,將充滿雜訊的圖片,分成 **1000** 步來慢慢除去雜訊,但現在的 **Diffusion Process** 會改用 **DDIM**,可以將步驟縮減,但會從馬可夫鏈變成非馬可夫鏈,**Inference** 的過程不再隨機,是可預期的。 **Diffusion Policy** 則不是用來去除圖片的雜訊,而是 **Sequence Action**, ## Forward Process 添加雜訊的方式 : $$ x_k=\sqrt{\bar{\alpha}_k} x_{0} + \sqrt{1-\bar\alpha_k}\epsilon $$ 1. $\alpha_k=1-\beta_k$ 2. $\bar\alpha_k=\prod^k_{i=1}\alpha_i$ 3. $\epsilon\sim\mathcal{N}(0,I)$ 4. $x_0$ : 原圖,無雜訊 5. $x_k$ : $k$ 時刻後的雜訊圖 上面的是 **Reparameterization** 的表示方式,為 **Implement** 時會用到的方法,但他也可以寫成 **Distribution** 的形式 : $$ q(x_k|x_0)=\mathcal{N}(x_k;\sqrt{\bar\alpha_k}x_0,(1-\bar\alpha_k)I) $$ ## Reverse Process 這個步驟只會在 **Inference** 時用到,**DDIM** 的 **Reverse Process** 如下 : $$ x_{t-1}=\sqrt{\bar\alpha_{t-1}}\cdot(\frac{x_t-\sqrt{1-\bar\alpha_t}\epsilon_\theta(x_t,t)}{\sqrt{\bar\alpha}_t})+\sqrt{1-\bar\alpha_{t-1}}\cdot\epsilon_\theta(x_t,t) $$ 其中 $t-1$ 和 $t$ 是可以允許跳點的,如 `12 -> 8`,重複這個步驟,直到得到原圖 ## Architecture 1. 準備 **Input** * **Observation Conditioning** : 過去的 **State Sequence** $[s_{t-1},s_t]$(假設長度為2) * **Noised Action Sequence** : $x_0$ 經過 **Diffusion Forward Process** 到 $x_k$ 的 **Action Sequence** * **Diffusion Step** : 提供現在是 **Forward Process** 的第幾步 $k$ 2. 對齊維度 * 利用 **Linear Layer** 將 $s_{t-1},s_t$ 從 `(2,state_dim)` 變成 `(2,embed_dim)` * 利用 **Linear Layer** 將 $a_1...a_{10}$ 從 `(10,action_dim)` 變成 `(10,embed_dim)` 3. 加入位置資訊 * 處理 **Diffusion Step** ($k$) : 讓整數 $k$ 通過 **Sinusoidal Embedding** + **Linear Layer**,將 **Diffusion Step** 的位置資訊變成 **Vector** ,**Shape** 為 `(1,embed_dim)` * **Sequence Position** : 我們的 **Noised Action Sequence** 和 **Observation Conditioning** 也是需要位置資訊的,通常使用 `nn.Embedding(13, embed_dim)` 來實現,它是可學習的 **Look up Table** * 將 **Action Tokens** 和 **Observation Tokens** 都與 **Position Embedding** 相加 * 注意這邊跟 **Diffusion Model** 不一樣,不會加 **Sinusoidal Embedding** 的輸出,而是用拼接的 4. **Sequence** 拼接與 **Transformer** * 將前面準備好的 **Tokens** 串接成 : `[Sinusoidal Embedding,Observation Conditioning,Action Sequence]`,以前面舉例的長度,這邊的 **Shape** 為 `(sequence length , embed_size)=(13, embed_size)` * >也可以用 **Cross Attention** 將多個 **Input** 組合 * 接著送入 **Transformer Block** 計算,要注意的是這邊不會使用 **Causal Mask**,不需要刻意遮擋未來資訊,因為我們是在預測雜訊,不是在做文字接龍 5. 輸出擷取 * **Transformer** 的輸出最後依然是 `(13,embed_size)`,但我們只要最後面 10 個 **Token** * 將最後 10 個 **Tokens** 通過 **Final Linear Layer**,將 `embed_size` 降維回 `action_dim` 6. 計算 **Loss** * 拿這 **10** 個 **Predict** 出的 **Noise** 與當初加進去 **Noise** 計算 **MSE Loss** ## Neural Network 其中 **MLP** 的部分會使用 **Zero initialize**,這個方法在 **Diffusion Model** 很常使用,**MLP** 內部為 **SwiGLU** 架構,在 **Concatenate** 的時候順序很重要, **Noised Action Sequence** 請一定要放在最後面 ![image](https://hackmd.io/_uploads/S1pC0ILsbe.png) ## Trajectory Example **Dataset** 產生兩個方向的 8 字型軌跡,一個順向,一個逆向,顏色越深代表越後面的 **Action** ![image](https://hackmd.io/_uploads/BJXchEV9be.png) ![diffusion_policy_trajectory_forward](https://hackmd.io/_uploads/H1C38HVq-g.gif) 在初始點(**Observation**) 加一點雜訊,就有機會看到正向和反向,這也展現了 **Diffsusion Policy** 的多峰性(**Multimodality**),如果有兩種路徑可以達到終點,他兩種都能擬和 ![diffusion_policy_trajectory_reverse](https://hackmd.io/_uploads/HkR3LSVcZl.gif) ### Dataset 因為 **Diffusion Model** 對 **Input** 資料的 **Distribution** 是很敏感的,所以我建立一個 **NumpyNormalizer** 的 **class**,要記住 **Training** 和 **Inference** 都要用 : ```python! class NumpyNormalizer: """ Standard Min-Max Normalizer implemented in NumPy. """ def __init__(self, data: np.ndarray): # data: [N, dim] self.min = np.min(data, axis=0) self.max = np.max(data, axis=0) self.range = self.max - self.min self.range[self.range == 0] = 1e-5 def normalize(self, x: np.ndarray) -> np.ndarray: return 2.0 * (x - self.min) / self.range - 1.0 def unnormalize(self, x_norm: np.ndarray) -> np.ndarray: return (x_norm + 1.0) / 2.0 * self.range + self.min ``` 因為用普通的 $\mu$、$\sigma$ 來 **Normalize** 的話總會有數值超出 $[-1,1]$,所以 **Diffusion Policy** 通常當採用 **Min-Max Normalization** : $$ x_{norm}=2\cdot\frac{x-x_{min}}{x_{mzx}-x_{min}}-1 $$ ```python! # --------------------------------- # Trajectory Dataset Class # --------------------------------- from utils.normalization import NumpyNormalizer class TrajectoryDataset(Dataset): def __init__(self, pred_horizon, obs_horizon=8): self.pred_horizon = pred_horizon self.obs_horizon = obs_horizon # 1. define dimension of action and state self.action_dim = 2 self.state_dim = 2 # Here, state is just the (x, y) position # 2. generate the two trajectories self.traj_fwd_np = generate_trajectory_forward(num_steps=100) # shape: (100, 2) self.traj_rev_np = generate_trajectory_reverse(num_steps=100) # shape: (100, 2) # Initialize normalizer on the combined dataset to ensure scale is equal combined_np = np.concatenate([self.traj_fwd_np, self.traj_rev_np], axis=0) self.normalizer = NumpyNormalizer(combined_np) # apply normalization self.traj_fwd_np = self.normalizer.normalize(self.traj_fwd_np) self.traj_rev_np = self.normalizer.normalize(self.traj_rev_np) # convert to torch tensors self.traj_fwd = torch.from_numpy(self.traj_fwd_np).float() self.traj_rev = torch.from_numpy(self.traj_rev_np).float() self.step_num = 100 # Steps per trajectory # 3. Calculate dataset length (100 steps * 2 modalities) self.length = self.step_num * 2 def __len__(self): return self.length def __getitem__(self, idx): # is forward trajectory or reverse trajectory is_forward = idx < self.step_num t = idx % self.step_num # select forward or reverse trajectory trajectory = self.traj_fwd if is_forward else self.traj_rev # 1. Observation sequence: length 'obs_horizon' ending at 't' obs_indices = torch.arange(t - self.obs_horizon + 1, t + 1) # Pad start of the episode by repeating the first frame obs_indices = torch.clamp(obs_indices, min=0, max=self.step_num - 1) obs_seq = trajectory[obs_indices] # shape: (obs_horizon, state_dim) # 2. Action sequence: length 'pred_horizon' starting from 't + 1' action_indices = torch.arange(t + 1, t + 1 + self.pred_horizon) # Pad end of the episode by repeating the last frame action_indices = torch.clamp(action_indices, min=0, max=self.step_num - 1) action_seq = trajectory[action_indices] # shape: (pred_horizon, action_dim) return { "obs": obs_seq, "action": action_seq } ``` 這個 **Class** 繼承了 **Pytorch** 的 **Dataset**,初始化包含 : * 定義 **action、state** 的 **Shape**,因為是 **2D Trajectory**,**State** 也是過去的 **Trajectory**,所以都是 **2** * 產生順向與逆向的兩種 **Trajectory** * 對 **Trajectory Normalize** 這邊有兩個關鍵的參數 : 1. `obs_horizon` : 代表 **Observation** 的 **Window** 大小,也就是要給 **Noise Predictor** 的 **Observation steps** 2. `pred_horizon` : 代表要預測的 **Window** 大小,也就是 **Noise Predictor** 預測的 **Noise**,這個 **Noise** 要用來對 **Noised Trajectory** 去雜訊 --- * `self.length = self.step_num * 2` : 我們定義完整的 **Trajectory** 為 **100** 個 **step**,正反兩個 **Trajectory** 就為 **200**,並 **overwrite** `__len__` **return** **200** * `def __getitem__(self, idx)` : **overwrite** **Dataset** 用來取一個 **Batch** 的 **Function**,`idx` 會隨機取 **0~199** 的整數 * `is_forward = idx < self.step_num` : 我們用這個整數是否超過 **100** 來判斷要給他正或反的 **Trajectory** * `t` : 代表是 **Trajectory** 的第幾個 **Step** * `obs_indices` : 利用 `torch.arange` 取得 $t-H_o+1\sim t$ 的序列整數,如我們 `obs_horizon`$H_o$ 設為 **8**,且 $t$ 為 **10**,那就是 **3~10**,但是因為有可能為負數,我們的處理方式就是將負數設定為 **0**,藉由 `torch.clamp` 裁切就 **Ok** 了 * `obs_seq` : 前面的為 **index** 序列,這邊就是從 **Trajectory** 中取得一段資料作為這次的 **Batch** * `action_seq` : 也是一樣的方式,只是 **Range** 變成 $t+1\sim t+H_p$,也就是未來的軌跡 --- ### Noise Predictor ![image](https://hackmd.io/_uploads/B1gyfvUiZx.png) ```python! # --------------------------------- # SwiGLU Activation Class # --------------------------------- class SwiGLU(nn.Module): def __init__(self, in_features, hidden_features, out_features): super(SwiGLU, self).__init__() self.w1 = nn.Linear(in_features, hidden_features) self.w2 = nn.Linear(in_features, hidden_features) self.w_out = nn.Linear(hidden_features, out_features) nn.init.zeros_(self.w_out.weight) nn.init.zeros_(self.w_out.bias) def forward(self, x): # 1. create gate gate = F.silu(self.w1(x)) # shape: (batch_size, hidden_features) # 2. apply gate to second linear transformation x = self.w2(x) * gate # 3. project to output features x = self.w_out(x) return x ``` **Transformer Block** 內的 **MLP** 我們採用 **SwiGLU**,他引入了 **Gate** 的機制,`w1` 的輸出作為 **Gate**,與數值 `w2` 相乘後輸出 $$ \text{SwiGLU}(x) = \left( \text{SiLU}(x W_1) \otimes (x W_2) \right) W_{out} $$ $$ \text{SiLU}(z) = z \cdot \sigma(z) = \frac{z}{1 + e^{-z}} $$ 並且採用了 **Diffusion Model** 常用的 **Zero-Initialization**,在預測雜訊的這個任務中特別適合 --- ![image](https://hackmd.io/_uploads/SyQeMPUj-l.png) ```python! # --------------------------------- # Time Embedding Class for Diffusion Step Information # --------------------------------- class TimestepEmbedder(nn.Module): """ Standard sinusoidal timestep embedding followed by an MLP. """ def __init__(self, freq_dim: int, embed_dim=256): super().__init__() self.freq_dim = freq_dim self.mlp = nn.Sequential( nn.Linear(freq_dim, embed_dim), nn.SiLU(), nn.Linear(embed_dim, embed_dim) ) self.embed_dim = embed_dim @staticmethod def sinusoidal(t: torch.Tensor, dim: int) -> torch.Tensor: """ Generates sinusoidal embeddings for the given timesteps. This is a common technique in diffusion models to encode the timestep information. """ half = dim // 2 freqs = torch.exp( -math.log(10000) * torch.arange(half, dtype=torch.float32, device=t.device) / half ) args = t[:, None].float() * freqs[None] # (B, half) return torch.cat([torch.cos(args), torch.sin(args)], dim=-1) # (B, dim) def forward(self, t: torch.Tensor) -> torch.Tensor: x = self.sinusoidal(t, self.freq_dim) return self.mlp(x) ``` 這個 **Class** 主要是用來做位置編碼,要注意這邊代表的是 **Diffusion Process** 的 **Step** 資訊,**Input** 為 $k$,最重要的核心是使用 **Sinusoidal Embedding**,是一個純數學的技巧,並搭配一個 **MLP** 然後再輸出,這個 **MLP** 用來讓 **Transformer** 自己學習解讀時間資訊 ![upload_71ba7b024e4f845d239818c7cdddbc92](https://hackmd.io/_uploads/H1Q_lw8jWx.gif) 並且使用數值穩定的寫法 : $$ \omega_i = 10000^{-\frac{2i}{d}} = \exp\left( \ln(10000^{-\frac{2i}{d}}) \right) = \exp\left( -\ln(10000) \cdot \frac{i}{d/2} \right) $$ * `freq_dim` : **Sinusoidal Embedding** 編碼的 **Shape**,也就是 **Vector** 的長度,不管設多少最後都會用 **MLP** 去對齊其他 **Embedding Vector**(如 **Observation**、**Noised Action**) * --- ![image](https://hackmd.io/_uploads/B1JfGwIoZe.png) ```python! # --------------------------------- # Transformer Block Class # --------------------------------- class TransformerBlock(nn.Module): def __init__(self, embed_dim=256, num_heads=8, mlp_ratio=4.0): super().__init__() self.norm1 = nn.LayerNorm(embed_dim) self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True) self.norm2 = nn.LayerNorm(embed_dim) hidden_dim = int(embed_dim * mlp_ratio) self.mlp = SwiGLU(embed_dim, hidden_dim, embed_dim) def forward(self, x: torch.Tensor) -> torch.Tensor: # 1. residual connection for attention x_res = x # 2. layer norm x = self.norm1(x) # 3. self-attention x, _ = self.attn(x, x, x) # 4. add residual x = x + x_res # 5. residual connection for MLP x_res = x # 6. layer norm x = self.norm2(x) # 7. MLP x = self.mlp(x) # 8. add residual x = x + x_res return x ``` * `embed_dim` : 定義每一個 **Token** 的特徵維度 * `num_heads` : 定義 **Multi-Head Attention** 的 **head** 數量,用來尋找 **Token** 之間的多種不同的關聯性 * `mlp_ratio` : 確定 **MLP** 中間層要放大幾倍 * `LayerNorm` : **Transformer** 標配的,對 **Token Embedding** 特徵進行標準化 * `MultiheadAttention` : 當 `batch_first=True` 時就比較符合直覺,他的 **Input** 資料 **Shape** 應該要是 `(sequence, batch, embed_dim)` $$ \text{Attention}(Q, K, V) = \text{Softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V $$ **Forward** : 1. 存下 **Residual**,經過 **LayerNorm** 2. 通過 **attention** 3. **Residual Connection** 4. 存下 **Residual**,經過 **LayerNorm** 5. 通過 MLP,這邊用的是前面說過的 **SwiGLU** 6. **Residual Connection** --- ```python! # --------------------------------- # Diffusion Policy Class # --------------------------------- class DiffusionPolicy(nn.Module): def __init__(self, action_dim=2, state_dim=0, embed_dim=256, num_heads=8, mlp_ratio=4.0, num_blocks=4): super().__init__() self.action_dim = action_dim self.state_dim = state_dim self.embed_dim = embed_dim # 1. Observation embedding self.obs_embedder = nn.Linear(state_dim, embed_dim) # 2. Time embedding for diffusion step information self.time_embedder = TimestepEmbedder(freq_dim=embed_dim, embed_dim=embed_dim) # 3. Noised action embedding self.action_embedder = nn.Linear(action_dim, embed_dim) # 4. Transformer blocks for processing the combined embeddings self.blocks = nn.ModuleList([ TransformerBlock(embed_dim, num_heads, mlp_ratio) for _ in range(num_blocks) ]) # 5. Final layer projection to action dimension self.final_proj = nn.Linear(embed_dim, action_dim) def forward(self, observations: torch.Tensor, diffusion_steps: torch.Tensor, noised_actions: torch.Tensor) -> torch.Tensor: obs_emb = self.obs_embedder(observations) # (B, embed_dim) # 2. Time embedding for diffusion steps time_emb = self.time_embedder(diffusion_steps).reshape(-1,1, self.embed_dim) # (B, 1, embed_dim) # 3. Embed noised actions action_emb = self.action_embedder(noised_actions) # (B, embed_dim) # 4. concatenate embeddings (if obs exists) or just use action and time embeddings x = torch.cat((obs_emb, time_emb, action_emb), dim=1) # 5. Add position encoding for transformer seq_len = x.shape[1] positions = torch.arange(seq_len, device=x.device) pos_emb = TimestepEmbedder.sinusoidal(positions, self.embed_dim) x = x + pos_emb # 6. Pass through transformer blocks for block in self.blocks: x = block(x) # 7. Final projection to action dimension action_seq_len = noised_actions.shape[1] action_out = x[:,-action_seq_len:,:] output = self.final_proj(action_out) return output # (Batch, action_seq_len , action_dim) ``` 這邊是將所有組件結合,其中 **forward** **Input** 說明如下 : * `observation` : `(Batch, obs_horizon, state_dim)` * `diffusion_steps` ($k$) : `(Batch,)`,代表 **Diffusion Process** 的加噪是第幾步 * `noised_actions` ($x_k$) : `(Batch, pred_horizon, action_dim)` 步驟 : 1. 用 **Linear Layer** 將 **Observation**、**Action** 投影成 **Embedding** 維度 2. 輸入 $k$ 獲得 `time_emb` ,**Shape** 為 `(Batch,embed_dim)`,Reshape 成 `(Batch,1,embed_dim)` 變成一個 **Sequence** 長度為 1 的 **Token** 3. **Concatenation** 三組 **Token** 成一整組 **Token**,按照 `[obs,time,action]` 的順序 4. Transformer 對於我們給的 Token 是沒有順序性的,我們隨意調換順序的或對她來說都一樣,所以我們重複使用 `TimestepEmbedder.sinusoidal` 產生位置資訊,然後加在 **Token** 上 5. 通過 **N** 個 **Transformer Block** 輸出會保持 Input 時的 **Shape** 6. 我們前面特意將 **Action** 放在 **Sequence Token** 的尾巴,我們要預測的 **Noise** 會從最後 **16**(`action_seq_len`) 個 **Token** 切出來 7. 取出 Token 後我們還需要最後一個 Linear Layer 將 `(Batch,action_seq_len,embed_dim)` 投影並壓縮回 `(Batch,action_seq_len,action_dim)` --- ```python! # --------------------------------- # EMA (Exponential Moving Average) Class # --------------------------------- class EMA: def __init__(self, model: nn.Module, beta: float = 0.995): self.beta = beta self.step = 0 # Create a copy of the model for EMA self.ema_model = copy.deepcopy(model) # Freeze the EMA model parameters for param in self.ema_model.parameters(): param.requires_grad_(False) def update(self, model: nn.Module): """ Update the EMA model parameters using the current model parameters. This should be called after each training step. """ self.step += 1 for current_param , ema_param in zip(model.parameters(), self.ema_model.parameters()): # Update EMA parameter ema_param.data.mul_(self.beta) ema_param.data.add_(current_param.data * (1.0 - self.beta)) def copy_to(self, model: nn.Module): model.load_state_dict(self.ema_model.state_dict()) def save_pretrained(self, path: str): torch.save(self.ema_model.state_dict(), path) ``` 在 **Diffusion Model** 中很常使用 **Exponential Moving Average**,有助於減少生成圖片的割裂感,**Diffusion Policy** 使用的話就可以減少軌跡的抖動(**Jittery**),他會放在 **Training** 階段的每個 **Epoch** 後,他會將 **Model** 的 **Weight** 拿出來進行指數移動平均 : $$ \theta_{EMA}^{(t)} = \beta \cdot \theta_{EMA}^{(t-1)} + (1 - \beta) \cdot \theta_{current}^{(t)} $$ 這個流程不影響 **Training**,主要是用在 **Inference** 階段 ### Temporal Ensembler 在 **Receding Horizon Control** 中,模型每一步都會預測一個長度為 `pred_horizon` 的動作序列。然而,前一個時間步預測的未來軌跡,與當前時間步預測的未來軌跡在重疊部分(**Overlapping**)可能會有細微的差異。 **Temporal Ensembling** 的目的就是將這些重疊的預測值進行**加權平均**,從而獲得更平滑、更連續的控制指令,避免機器人關節出現劇烈抖動。 ```python! class NumpyTemporalEnsembler: """ Temporal Ensembling for action smoothing across overlapping prediction horizons. """ def __init__(self, pred_horizon: int, action_dim: int): self.pred_horizon = pred_horizon self.action_dim = action_dim self.action_sum = np.zeros((pred_horizon, action_dim)) self.action_count = np.zeros((pred_horizon, 1)) def update(self, predicted_action_seq: np.ndarray): """Add a new predicted sequence to the ensemble buffer.""" length = min(len(predicted_action_seq), self.pred_horizon) self.action_sum[:length] += predicted_action_seq[:length] self.action_count[:length] += 1 def get_and_shift_actions(self, n_actions: int) -> np.ndarray: """Compute average actions and shift the buffer window forward.""" # Prevent division by zero counts = np.clip(self.action_count[:n_actions], a_min=1, a_max=None) avg_actions = self.action_sum[:n_actions] / counts # Update and shift the buffer new_sum = np.zeros_like(self.action_sum) new_count = np.zeros_like(self.action_count) if self.pred_horizon > n_actions: new_sum[:-n_actions] = self.action_sum[n_actions:] new_count[:-n_actions] = self.action_count[n_actions:] self.action_sum = new_sum self.action_count = new_count return avg_actions ``` #### 運作範例 (Example) 假設 `pred_horizon = 4`,每步執行 `n_actions = 1`: 1. **Step 0**: 模型預測 $[a_0, a_1, a_2, a_3]$。 - `action_sum` 變為 $[a_0, a_1, a_2, a_3]$,`counts` 變為 $[1, 1, 1, 1]$。 - 執行 $a_0$,**Window** 位移後剩下 $[a_1, a_2, a_3, 0]$。 2. **Step 1**: 模型預測新的序列 $[b_1, b_2, b_3, b_4]$。 - 將新序列加入後,`action_sum` 變為 $[a_1+b_1, a_2+b_2, a_3+b_3, b_4]$。 - `counts` 變為 $[2, 2, 2, 1]$。 - 下一步執行的動作將會是 $(a_1+b_1)/2$,即兩次預測的平均值。 透過這種方式,軌跡會變得非常絲滑,因為每個動作都是多次預測共同決定的結果。 ![image](https://hackmd.io/_uploads/HJWqMVw6-g.png) 從這張圖可以看到剛開啟幾步是不穩的,但後面的軌跡與前面的軌跡透過算法更新變得更加平滑,並且可以更貼近期望的軌跡 ### Training **Initialize** : ```python! if __name__ == "__main__": # ========================================== # 0. Setup & Hyperparameters # ========================================== DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") print(f"[trajectory_train] Using device: {DEVICE}") # Hyperparameters for Diffusion Policy TIMESTEPS = 100 # Total diffusion steps EMBED_DIM = 256 # Embedding dimension for the model NUM_HEADS = 8 # Number of attention heads MLP_RATIO = 4.0 # MLP expansion ratio PRED_HORIZON = 16 # Prediction horizon for training samples OBS_HORIZON = 8 # Observation horizon BATCH_SIZE = 32 # Batch size for training EPOCHS = 3000 # Number of epochs to train DEPTH = 4 # Number of transformer blocks LR = 1e-4 # Learning rate for optimizer SAVE_DIR = "checkpoints" os.makedirs(SAVE_DIR, exist_ok=True) # ========================================== # 1. Dataset & DataLoader # ========================================== print("[trajectory_train] Initializing Dataset and DataLoader...") trajectory_dataset = TrajectoryDataset(pred_horizon=PRED_HORIZON, obs_horizon=OBS_HORIZON) dataloader = DataLoader(trajectory_dataset, batch_size=BATCH_SIZE, shuffle=True) print(f"[trajectory_train] Dataset size: {len(trajectory_dataset)} | Batches/epoch: {len(dataloader)}") # ========================================== # 2. Model & EMA Definition # ========================================== print("[trajectory_train] Initializing Model and EMA...") model = DiffusionPolicy( action_dim=trajectory_dataset.action_dim, state_dim=trajectory_dataset.state_dim, embed_dim=EMBED_DIM, num_heads=NUM_HEADS, mlp_ratio=MLP_RATIO, num_blocks=DEPTH ).to(DEVICE) model.train() ema_model = EMA(model, beta=0.995) # ========================================== # 3. Optimizer, Scheduler & Diffusion Setup # ========================================== scheduler = DDIMScheduler( num_train_timesteps=TIMESTEPS, beta_schedule="squaredcos_cap_v2", clip_sample=True, set_alpha_to_one=True, steps_offset=0, prediction_type="epsilon" ) optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=1e-4) mse_loss = torch.nn.MSELoss() ``` 1. **Hyperparameters** * `DEVICE` : **GPU** 加速 * `OBS_HORIZON = 8` : 作為 **Observation**,過去 8 步 * `PRED_HORIZON = 16` : 預測未來 **16** 步,**Noised Action** 的 **Sequence Length** * `EMBED_DIM = 256` : **Token** 的特徵維度 * `BATCH_SIZE = 200`、`EPOCHS = 2000`、`LR = 1e-4` : **Training** 相關設定 2. **Dataset & DataLoader** * 建立 `TrajectoryDataset` 並透過 **DataLoader** 打包,`shuffle=True` 讓模型每個 **Epoch** 都能看到打亂的 **Batch** 資料(**Trajectory** 內沒有打亂) 3. **EMA** 初始化,$\beta=0.995$ 4. **Diffusion Scheduler** * `TIMESTEPS = 100` : **Diffusion Process** 的最大步數 $T=100$ * `beta_schedule="squaredcos_cap_v2"` : 對 $\beta$ 使用 **Cosine** **Schedule**,相對於 **Linear** 能更平滑 * `prediction_type="epsilon"` : 定義模型的任務是預測雜訊 $\epsilon$ 5. **Optimizer & Loss** : * **AdamW** : 目前 **Transformer** 標配的 **Optimizer**,自帶 `weight_decay` 能有效防止 **overfitting** * **MSELoss** : 用來計算 "模型預測的雜訊" 與 "實際加入的雜訊" 之間的 **Mean Square Error** **Training Loop** : ```python! # Training loop for epoch in range(1, EPOCHS + 1): pbar = tqdm(dataloader, desc=f"Epoch {epoch}/{EPOCHS} Training") for step, batch in enumerate(pbar): # 1. Get the batch of action sequences actions = batch['action'].to(DEVICE) obs = batch['obs'].to(DEVICE) # 2. Sample random diffusion steps for each sequence in the batch k = torch.randint(0, TIMESTEPS, (actions.shape[0],), device=actions.device) # shape: (batch_size,) # 3. Add noise to the actions according to the sampled diffusion steps noise = torch.randn_like(actions) noised_actions = scheduler.add_noise(actions, noise, k) # 4. Predict the noise using the Diffusion Policy model predicted_noise = model(observations=obs, diffusion_steps=k, noised_actions=noised_actions) # 5. Compute the loss between the predicted noise and the true noise loss = mse_loss(predicted_noise, noise) # 6. Backpropagation and optimization step optimizer.zero_grad() loss.backward() optimizer.step() ema_model.update(model) pbar.set_postfix({"Epoch": epoch, "Loss": loss.item()}) if epoch % 500 == 0 or epoch == EPOCHS: checkpoint_path = os.path.join(SAVE_DIR, f"trajectory_diffusion_policy_{epoch}.pth") ema_model.save_pretrained(checkpoint_path) print(f"\nSaved checkpoint to {checkpoint_path}") ``` 1. `actions` : 乾淨的未來軌跡 2. `obs` : 過去的軌跡 3. `k` : 隨機抽樣 `batch_size` 個 **Diffusion Step** 4. `noise` : $\epsilon\sim\mathcal{N}(0,I)$ **Sample** 雜訊,也是我們要猜的標準答案 5. `noised_actions` : 對乾淨的軌跡加上雜訊,並根據 $k$ 來調整雜訊的多寡,然後透過 **Scheduler** 幫我套 **Forward Process** 的公式 $x_k=\sqrt{\bar\alpha_k}x_0+\sqrt{1-\bar\alpha_k}\epsilon$ 6. `predicted_noise` : 神經網路預測的 **Noise** $\epsilon_\theta$ 7. **MSE** 計算損失函數 8. **Optimization & EMA** 9. **Checkpointing**,這邊注意,要存的是 **EMA Model**,到時候推論要用的是這個,如果是要接續訓練的話就是存原本的 ### Inference ```python! def main(): DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") # --- 1. Parameters (strictly matching trajectory_gif_eval.py) --- TIMESTEPS = 100 ACTION_DIM = 2 STATE_DIM = 2 EMBED_DIM = 256 NUM_HEADS = 8 MLP_RATIO = 4.0 OBS_HORIZON = 8 PRED_HORIZON = 16 ACTION_HORIZON = 8 INFERENCE_STEPS = 50 MAX_EPISODE_STEPS = 100 CHECKPOINT_PATH = "checkpoints/trajectory_diffusion_policy_3000.pth" # --- 2. Initialize Model and Scheduler --- model = DiffusionPolicy(action_dim=ACTION_DIM, state_dim=STATE_DIM, embed_dim=EMBED_DIM, num_heads=NUM_HEADS, mlp_ratio=MLP_RATIO, num_blocks=4).to(DEVICE) print(f"Loading checkpoint from: {CHECKPOINT_PATH}") model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=DEVICE, weights_only=True)) model.eval() scheduler = DDIMScheduler(num_train_timesteps=TIMESTEPS, beta_schedule="squaredcos_cap_v2", clip_sample=True, set_alpha_to_one=True, prediction_type="epsilon") scheduler.set_timesteps(INFERENCE_STEPS) # --- 3. Setup Inference environment --- reference_traj = generate_trajectory_forward(num_steps=100) normalizer = NumpyNormalizer(reference_traj) current_state = reference_traj[0].copy() # Observation buffer (window of past states) obs_buffer = collections.deque([current_state.copy()] * OBS_HORIZON, maxlen=OBS_HORIZON) actual_trajectory = [current_state.copy()] # Initialize Temporal Ensembling temporal_ensembler = NumpyTemporalEnsembler(pred_horizon=PRED_HORIZON, action_dim=ACTION_DIM) ``` 1. **Hyperprarmeter** : * `INFERENCE_STEPS` : 這個代表是要用多少 **Step** 來進行 **Reverse Process**,**Scheduler** 會根據這個來決定跳點的 $k$ 序列 * 其他參數都跟 **Training** 一樣,要用來建立相同的 **model** 與 **Scheduler** 2. **Model and Scheduler** : * 初始化相同架構的 **Model** 並載入 **Weight**,然後設定為 **eval** 模式 * 建立相同的 **Scheduler**,並設定 **DDIM** 要使用多少 **Step** 來進行 **Reverse Process** 3. **Setup Inference environment** : * 產生 **Target** 的軌跡 `reference_traj` * 建立 **Normalizer**、軌跡的第一個點作為起始點 * 建立一個 **state** 的 **Queue buffer (deque)** `obs_buffer`,並用第一個點來填滿,最大上限為 `OBS_HORIZON`, 之後填入新資料的話舊的會被捨棄 * `actual_trajecory` : 建立一個 **buffer** 來儲存完整軌跡 --- ```python! # --- 4. Closed-loop Inference --- print(f"Starting closed-loop inference for {MAX_EPISODE_STEPS} steps...") step_count = 0 while step_count < MAX_EPISODE_STEPS: # Prepare Observation (shape: 1, OBS_HORIZON, STATE_DIM) obs_np = np.stack(obs_buffer) obs_norm = normalizer.normalize(obs_np) obs_tensor = torch.from_numpy(obs_norm).float().unsqueeze(0).to(DEVICE) # Action Prediction (Gaussian Noise -> Predicted Trajectory) noised_actions = torch.randn((1, PRED_HORIZON, ACTION_DIM), device=DEVICE) with torch.no_grad(): for k in scheduler.timesteps: t_batch = torch.full((1,), k, device=DEVICE, dtype=torch.long) predicted_noise = model(observations=obs_tensor, diffusion_steps=t_batch, noised_actions=noised_actions) noised_actions = scheduler.step(model_output=predicted_noise, timestep=k, sample=noised_actions).prev_sample # Post-process: Unnormalize predicted_action_seq = normalizer.unnormalize(noised_actions.squeeze(0).cpu().numpy()) # Update Ensembler and get smoothed actions temporal_ensembler.update(predicted_action_seq) exec_actions = temporal_ensembler.get_and_shift_actions(ACTION_HORIZON) # Execute Actions (Receding Horizon) for i in range(ACTION_HORIZON): if step_count >= MAX_EPISODE_STEPS: break # Use predicted point as next state (simple simulator) current_state = exec_actions[i] actual_trajectory.append(current_state.copy()) obs_buffer.append(current_state.copy()) step_count += 1 print(f"Progress: {step_count}/{MAX_EPISODE_STEPS}", end='\r') ``` 1. 建立最外層迴圈 ,`while step_count < MAX_EPISODE_STEPS:` 其中 `MAX_EPISODE_STEPS` 為整個軌跡的長度,讓產生的軌跡不要超過沒訓練過的範圍 2. `obs_buffer` 會一直存當前 **Step** 的過去 8 個點,然後將這些點轉為 **numpy->normalize->tensor** 3. 初始化一個純雜訊 `noised_actions`,代表為 **Diffusion** 的起點 $x_T$,大小為我們 **Training** 設定的 `(1, PRED_HORIZON,ACTION_DIM)` 4. 根據 **scheduler** 產生的 **timesteps** (跳點的 **List**,**ex**.`[98, 96, 94, 92, 90, ...]`) 進行 **Reverse Process** 5. 將 `obs_tensor`、`t_batch`、`noised_actions` 帶入 **Noise Predictor**,並使用 **DDIM Scheduler** 和預測出的 **Noise**,把 `noised_actions` 內的雜訊去除,去的次數為 `INFERENCE_STEPS` 6. 當前的乾淨 **action**,也就是未來的軌跡還在 **Range** $[-1,1]$ 之間,透過 **unnormalize** 還原回原本的尺度 7. 我們預測了後面 `PRED_HORIZON` 個點,但為了穩定我們只會拿一半 (`ACTION_HORIZON`) 來用,然後這一半的點會填入整體軌跡和 **observation buffer**(舊資料會被丟棄),並用做下一次的軌跡生成 後面就是 **Plot** 的部分,就不提了 ## Minari Point Maze Example **Minari** 是一個標準的 **Offline Reinforcement Learning** 集合推託管的 **Interface**,且大多遵循 **Gymnasium API** 的 **Reinforcement Learning**,並提供資料蒐集、取樣等功能,簡化 **Dataset** 的處理,我想用 **D4RL** 的 **PointMaze** 來實作,但是 **Diffusion Policy** 算是一種 **Behavior Cloning**,其實不需要他提供的 **Reward** 使用固定的 **seed** 去跑 **10** 次,並記錄下軌跡 : ![image](https://hackmd.io/_uploads/B1E4bzthZg.png) ### Dataset 我們使用了 `minari` 庫提供的 **Offline Dataset**。**Minari** 是 **Gymnasium** 官方維護的一個用於離線強化學習的資料集接口,繼承了 **D4RL** 的精神並提供更現代化的支援。 在 `PointMazeDataset` 中,我們針對 `pointmaze-large-v2` 進行處理。這個資料集包含了一個代理器在迷宮中移動到隨機目標點的軌跡。 ```python! from point_maze_dataset import PointMazeDataset # 初始化 Dataset # observation 包含:[pos_x, pos_y, vel_x, vel_y] (4維) # desired_goal 包含:[goal_x, goal_y] (2維) dataset = PointMazeDataset(dataset_id="D4RL/pointmaze/large-v2", pred_horizon=16, obs_horizon=2) ``` 1. **State Vector**: PointMaze 的 Observation 是一個字典。我們將 `observation` (位置與速度) 與 `desired_goal` (目標座標) 拼接在一起,形成一個 **6 維** 的狀態向量。 2. **Action Vector**: 代理人的輸出是 **2 維** 的加速度 (Force)。 3. **Aligned Chunking**: 在 `__getitem__` 中,我們實作了對齊的序列取樣。 - `t_start = t - obs_horizon + 1` - 這確保了觀察序列 (Observation) 與動作序列 (Action) 在時間軸上是完美對齊的起點。 ### Training 針對點位導航這類數值型、高動態的任務,我們引入了 **Mixed Precision (AMP)** 來加速訓練: ```python! # 使用 torch.amp 進行混合精度訓練 from torch.cuda.amp import autocast, GradScaler scaler = GradScaler() with autocast(enabled=True): predicted_noise = model(observations=obs, diffusion_steps=k, noised_actions=noised_actions) loss = mse_loss(predicted_noise, noise) scaler.scale(loss).backward() scaler.step(optimizer) scaler.update() ``` - **Step-based Loop**: 不同於 Trajectory 使用 Epoch,PointMaze 採用總步數 (`TOTAL_STEPS = 30000`) 控制,適合處理超大型的離線資料集。 - **Optimization**: 使用 `AdamW` 配合 `CosineAnnealing` 學習率排程,並在 V100/H100 等高階顯卡上開啟 `pin_memory` 提升傳輸效率。 ### Inference 在 **Maze** 環境中,我們不只需要預測軌跡,還需要將其可視化。 1. **point_maze_plot_eval.py**: 繪製靜態的軌跡對比圖,觀察模型預測的 16 步動作如何趨向目標點。 2. **point_maze_render_eval.py**: 透過 PyGame 即時渲染代理人在迷宮中移動的過程。 推論流程同樣遵循 **Receding Horizon** 邏輯: - 模型每步預測 32 步動作。 - 透過 `NumpyTemporalEnsembler` 進行平滑。 - 實際執行前 16 步後,重新獲取 Observation 進行下一次預測。 ![1776522424070](https://hackmd.io/_uploads/H1yCEzZaWg.gif) ## Aloha Transfer Cube Example Aloha 是一個雙臂機器人操作任務,我們使用的是 **LeRobot** (Hugging Face) 開源的 `lerobot/aloha_sim_transfer_cube_human` 資料集。這是一個比迷宮複雜得多的視覺導航與操縱任務。 ### Vision Encoder ```python! class VisionEncoder(nn.Module): """ Mini-ResNet + SpatialSoftmax. Optimized for ALOHA (224x224) but supports 128x128 (default). """ def __init__(self, in_channels: int = 3, image_size: int = 128, embed_dim: int = 256): super().__init__() # 1. Lightweight ResNet feature extractor (Downsamples by a total factor of 16) self.cnn = nn.Sequential( # Input: (B, 3, H, W) -> e.g. (B, 3, 224, 224) for ALOHA nn.Conv2d(in_channels, 32, kernel_size=3, stride=2, padding=1, bias=False), nn.GroupNorm(8, 32), nn.ReLU(inplace=True), # -> (B, 32, H/2, W/2) ResBlock(32, 64, stride=2), # -> (B, 64, 32, 32) ResBlock(64, 128, stride=2), # -> (B, 128, 16, 16) ResBlock(128, 256, stride=2) # -> (B, 256, 8, 8) ) # 2. Spatial Softmax Module # Image is downsampled by 2*2*2*2 = 16 times: 128/16=8 or 224/16=14 (ALOHA) feat_size = image_size // 16 num_channels = 256 # (Assuming the SpatialSoftmax class defined earlier is available) self.spatial_softmax = SpatialSoftmax( height=feat_size, width=feat_size, num_channels=num_channels ) # 3. Final projection: maps 256 coordinate pairs (512 values) to Transformer's embed_dim self.proj = nn.Sequential( nn.Linear(num_channels * 2, embed_dim), nn.LayerNorm(embed_dim), # Add LayerNorm to stabilize tokens fed into the Transformer nn.SiLU() ) def forward(self, x: torch.Tensor) -> torch.Tensor: # x: (B, 3, H, W) -> e.g. (B, 3, 224, 224) for ALOHA features = self.cnn(x) # -> (B, 256, 8, 8) keypoints = self.spatial_softmax(features) # -> (B, 512) img_token = self.proj(keypoints) # -> (B, embed_dim) return img_token ``` **Aloha Dataset** 設定的 **Image Size** 為 `3x224x224`,我們會將當前環境的俯視視角圖片,先經過 **Vision Encoder**,然後才會放進 **Transformer** : * **Input** : $(B,3,224,224)$ * **Conv2d(S2)+GN+ReLU** : 因為使用 `stride=2`,所以 **feature map** 大小減半且數量變多 $(B,32,112,112)$ * **ResBlock** : $(B,32,112,112)$->$(64,56,56)$,大小減半,**feature map** 數量加倍,接著再經過兩次 **ResBlock** 變成 $(B,256,14,14)$ 的高維特徵 * **Spatial Softmax** : $(B,256,14,14)$ -> $(B,512)$,其中 **512** 為 **256** 組的座標 $(x,y)$,只是為 **Flatten** 的狀態 * **Linear + LN + SiLU** : $(B,512)$->$(B,\text{embed_dim})$,最後一層用來對齊 **Transformer** 的維度 --- #### Residual Block ```python! class ResBlock(nn.Module): def __init__(self, in_channels: int, out_channels: int, stride: int = 1, groups: int = 8): super().__init__() # Ensure out_channels is divisible by groups # For channels [32, 64, 128, 256], groups=8 or 16 will work perfectly. # First convolutional layer self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False) self.gn1 = nn.GroupNorm(groups, out_channels) # Fixed: (num_groups, num_channels) # Second convolutional layer self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False) self.gn2 = nn.GroupNorm(groups, out_channels) # Shortcut branch self.shortcut = nn.Sequential() if stride != 1 or in_channels != out_channels: self.shortcut = nn.Sequential( nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False), nn.GroupNorm(groups, out_channels) ) def forward(self, x): # 1. First Conv + GN + ReLU out = F.relu(self.gn1(self.conv1(x)), inplace=True) # 2. Second Conv + GN out = self.gn2(self.conv2(out)) # 3. Add shortcut connection out += self.shortcut(x) # 4. Final ReLU out = F.relu(out, inplace=True) return out ``` 在 **Vision Encoder** 使用 **Residual Block** 可以有效避免梯度消失 * **Conv3x3 + GN + ReLU** : 經過一次標準的 **CNN** 組合 * **Conv3x3 + GN** : 再經過一層 **CNN** 和 **Group Normalize** * **Shortcut(Conv3x3+GN)** : 因為這邊要 **Shortcut**,所以前一層才沒有 **Activation** **Function**,這邊的使用到的 **CNN Input** 是使用原始的 **Image**,經過兩層 **CNN** 的 **Feature Map** 和經過一層的 **Feature Map** 相加,最後經過 **ReLU** --- #### Spatial Softmax 在 **Vision-Based Policy** 中,**Spatial Softmax** 是關鍵技術。它不像普通的 **Flatten** 會丟失空間資訊,而是將 **Feature Map** 轉換為「期望特徵點座標」。 ```python! # --------------------------------- # Spatial Softmax Block # --------------------------------- class SpatialSoftmax(nn.Module): """ Extracts spatial expected coordinates (Keypoints) from feature maps. """ def __init__(self, height: int, width: int, num_channels: int, temperature: float = 1.0): super().__init__() self.height = height self.width = width self.num_channels = num_channels # Create normalized coordinate grids [-1, 1] pos_y, pos_x = torch.meshgrid( torch.linspace(-1., 1., height), torch.linspace(-1., 1., width), indexing='ij' ) pos_x = pos_x.reshape(-1) pos_y = pos_y.reshape(-1) self.register_buffer('pos_x', pos_x) self.register_buffer('pos_y', pos_y) # Learnable temperature parameter self.temperature = nn.Parameter(torch.ones(1) * temperature) def forward(self, feature: torch.Tensor) -> torch.Tensor: # feature: (B, C, H, W) B, C, H, W = feature.shape assert H == self.height and W == self.width, f"Expected {self.height}x{self.width}, got {H}x{W}" # Flatten spatial dimensions: (B, C, H*W) feature_flat = feature.reshape(B, C, H * W) # Apply temperature and softmax to get probabilities weights = F.softmax(feature_flat / self.temperature, dim=-1) # Calculate expected coordinates expected_x = torch.sum(self.pos_x * weights, dim=-1) # (B, C) expected_y = torch.sum(self.pos_y * weights, dim=-1) # (B, C) # Stack coordinates: (B, C, 2) and flatten to (B, C * 2) expected_xy = torch.stack([expected_x, expected_y], dim=-1) return expected_xy.reshape(B, C * 2) ``` 以 Aloha Dataset 來說,經過前面多個 **Residual Block** 後,**Feature Map** 的 **Shape** 為 $(B,256,14,14)$,Spatial Softmax 的運算流程如下 : * **Flatten** : $(B,256,14,14)$ -> $(B,256,196)$ * **Softmax** : 對最後一個 **Dimension** 計算 **Softmax**,也就是說會有 **256** 組加總為 1 的 **Vector** * **Coordinate Grid** : 初始化時建立一個從 -1 到 1 的 **Matrix**,$14\times 14$ 的 `pos_x` 會像這樣 ```python! [[-1.0, -0.85, ..., 1.0], [-1.0, -0.85, ..., 1.0], ... [-1.0, -0.85, ..., 1.0]] ``` `pos_y` 則方向相反,建立完成後也會 **Flatten** 成 $(196)$ * **Expected Value** : 計算座標的加權平均,類似物理上 **Center of Mass** 的計算,`expected_x = torch.sum(pos_x * weights, dim=-1)` 計算 X 座標的期望值,維度 $(B,256,196)$ -> $(B,256)$,`expected_y = torch.sum(pos_y * weights, dim=-1)` 計算 Y 座標的期望值 * 合併 : 利用 **Stack** 將兩個 $(B,256)$ 合併成 $(B,256,2)$,然後 **Reshape** 成 $(B,512)$,剛好對齊 **Spatial Softmax** 在計算 **Softmax** 的時候會有一個參數 **Tempture** $\tau$,會先將 **Flatten** 後的原始 **Input** 先除 $\tau$,當 $\tau$ 很低時,機率分布會很集中,反之當 $\tau$ 很高時,機率分布會很散,他是一個可訓練參數,以下示範了低和高的差異 ![spatial_softmax_demo](https://hackmd.io/_uploads/SyjAY_mCWe.gif) * 熱力圖代表的是不同的 $\tau$ 計算出的 `weights`,類似 2D 的機率分布,是帶入 $\tau$ 計算 Softmax 後的結果 * 白點代表我們計算出的 **Expected Value**,他是一個 **XY** 座標點,可以視為熱力圖的 **Center of Mass** * 紅點為機率最高的位置 可以發現 $\tau$ 較高時,可以讓 **Expected Value** 不會動的那麼劇烈,一個合適的 $\tau$ 可以給出穩定的座標點 --- ### Dataset `AlohaDataset` 與 `PointMazeDataset` 最大的不同在於其**數據規模**與**多模態輸入**。迷宮任務只需要處理簡單的向量,而 Aloha 需要同時處理高解析度影像與 14 維的關節狀態 (`qpos`)。 #### 1. 硬體加速快取機制 (Hardware Accelerated Caching) 由於直接從原始影片檔讀取影像非常緩慢,我們實作了快取機制。在第一次執行時,我們會利用 GPU 加速影像的預處理: ```python! def _create_image_cache(self): print(f"[AlohaDataset] Cache not found. Accelerating single-camera cache via DataLoader & GPU...") n = len(self.lerobot_dataset) self.cached_images = torch.zeros((n, 3, self.image_size, self.image_size), dtype=torch.float16) # 1. Use DataLoader to multiprocess video decoding from torch.utils.data import DataLoader dl = DataLoader(self.lerobot_dataset, batch_size=64, num_workers=4, pin_memory=True) # 2. Setup GPU-accelerated preprocessing device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') preprocess = T.Compose([ T.CenterCrop(480), T.Resize((self.image_size, self.image_size), antialias=True) ]) idx = 0 for batch in tqdm(dl, desc="Hardware Accelerated Caching"): img = batch[self.cam_key] # ... (ndim handling) img = img.to(device, non_blocking=True) # Apply CenterCrop + Resize on GPU and cast to FP16 img_processed = preprocess(img.float() / 255.0).half().cpu() bs = img_processed.shape[0] self.cached_images[idx : idx + bs] = img_processed idx += bs torch.save(self.cached_images, self.cache_path) ``` - **為什麼要快取?**: Vision-based 訓練的瓶頸通常在於 **Disk IO**。預先處理並壓縮影像(例如從 640x480 縮小到 128x128),能讓模型在訓練時以極高的速度讀取 Batch,避免 GPU 空轉等待數據。 #### 2. TensorNormalizer (GPU-Native Normalization) 為了避免在 **Data Loading** 過程中不斷進行 **CPU-GPU** 或 **Numpy-Tensor** 的轉換造成延遲,我們實作了全 **PyTorch** 版本的 `TensorNormalizer`: - **優點**: 所有數據保持在 `torch.Tensor` 格式,正規化運算直接在 **GPU** 上執行。這對於高頻率讀取影像數據的 **Task** 來說至關重要,能有效降低 **CPU** 的負擔。 #### 3. 數據增強 (Data Augmentation) 機器人任務對相機位置與光影極度敏感。我們實作了 `AlohaAugmentor`: ```python! class AlohaAugmentor: def __init__(self, image_size=128): self.image_size = image_size # 1. Random Shift (maintains original Edge Padding) self.padding = 8 # Increased to 8px to expand the shift range def __call__(self, img_seq: torch.Tensor) -> torch.Tensor: """ img_seq: (T, C, H, W) """ # --- Determine random parameters shared across this specific sequence --- # 1. Random Crop parameters top = random.randint(0, self.padding * 2) left = random.randint(0, self.padding * 2) # 2. Color Jitter parameters brightness = random.uniform(0.8, 1.2) contrast = random.uniform(0.8, 1.2) saturation = random.uniform(0.8, 1.2) # 3. Decide whether to apply certain probabilistic augmentations apply_noise = random.random() > 0.5 # Apply padding first to prepare for cropping # (T, C, H, W) -> Padding is applied to the last two dimensions (H and W) img_seq = F.pad(img_seq, padding=[self.padding]*4, padding_mode='edge') processed_frames = [] for i in range(img_seq.shape[0]): img = img_seq[i] # Apply the same crop across the sequence img = F.crop(img, top, left, self.image_size, self.image_size) # Apply the same color jitter across the sequence img = F.adjust_brightness(img, brightness) img = F.adjust_contrast(img, contrast) img = F.adjust_saturation(img, saturation) # Add slight Gaussian noise # (Noise varies slightly per frame to improve robustness) if apply_noise: img = img + torch.randn_like(img) * 0.01 img = torch.clamp(img, 0.0, 1.0) processed_frames.append(img) return torch.stack(processed_frames) ``` - **時間一致性 (Temporal Consistency)**: 在一個 Observation Window 中,如果每幀的位移或亮度不同,模型會誤以為物體在抖動。因此我們在 `__call__` 開頭決定一組參數後,套用到整個 `T` 維度。 - **Random Shift (平移不變性)**: 這是 **Vision-based Policy** 的核心技巧。透過隨機位移,模擬相機安裝誤差,讓模型學會「空間平移不變性」,使其在現實部署時極具魯棒性。 ### Training 由於加入了 **ResNet18** 與較深的 **Transformer**,**Aloha** 的顯存佔用遠高於 **Point Maze**。 | 技術名稱 | 目的 | 對比 **Point Maze** | | -------- | -------- | -------- | | **Gradient Checkpointing** | 顯存優化 | **Point Maze** 不需開啟;**Aloha** 開啟後顯存可從 **24G** 降至 **8G** 以下。 | | **BF16 Autocast** | 訓練穩定性 | **Point Maze** 使用 **FP16** 即可;**Aloha** 推薦 **BF16** 以防 **Transformer** 梯度溢出。 | | **Warmup & Cosine Decay** | 學習率排程 | 視覺模型在訓練初期需要 Warmup 來穩定 CNN 的特徵提取層。 | ### Inference 最終驗證於 `aloha_render_eval.py`: - **Observation**: 包含 `top` 攝影機的影像序列 + **14** 維的關節狀態。影像會先通過 **Vision Encoder** (**CNN** + **Spatial Softmax**) 轉為 **Image Token**。 - **Output**: 預測未來的關節目標序列。 - **Result**: 推論結果會渲染成 **MP4** 影片儲存在 `assets/eval_aloha.mp4`。你可以看到模型如何精確地控制左右手,完成方塊的傳遞動作。 ![eval_aloha_0-ezgif.com-video-to-gif-converter](https://hackmd.io/_uploads/BypmeTPT-g.gif) ## Consistency Policy **Consistency Policy** 主要是用來加速 **Diffusion Process** 的方法,用蒸餾模型的方式,縮減產生 **Action** 的步數,以下簡單說明如何訓練 1. Initialize * 載入預訓練好的 Diffusion Policy *