# **PyTorch Masterclass: Part 5 – Reinforcement Learning with PyTorch** **Duration: ~90 minutes** >#PyTorch #ReinforcementLearning #RL #DeepRL #Qlearning #DQN #PPO #DDPG #MarkovDecisionProcesses #AI #MachineLearning #DeepLearning #ReinforcementLearning #PyTorchRL --- ## **Table of Contents** 1. [Introduction to Reinforcement Learning](#introduction-to-reinforcement-learning) 2. [Markov Decision Processes](#markov-decision-processes) 3. [Q-Learning and Deep Q-Networks](#q-learning-and-deep-q-networks) 4. [Policy Gradient Methods](#policy-gradient-methods) 5. [Proximal Policy Optimization](#proximal-policy-optimization) 6. [Deep Deterministic Policy Gradient](#deep-deterministic-policy-gradient) 7. [Model-Based Reinforcement Learning](#model-based-reinforcement-learning) 8. [Multi-Agent Reinforcement Learning](#multi-agent-reinforcement-learning) 9. [Building a Complete RL Agent](#building-a-complete-rl-agent) 10. [Quiz 5: Test Your Understanding of Reinforcement Learning](#quiz-5-test-your-understanding-of-reinforcement-learning) 11. [Summary and What's Next](#summary-and-whats-next) --- ## **Introduction to Reinforcement Learning** Reinforcement Learning (RL) is a branch of machine learning where an agent learns to make decisions by interacting with an environment to maximize cumulative reward. ### **Why Reinforcement Learning Matters** RL has achieved remarkable successes: - **AlphaGo**: Defeated world champion in Go - **Dota 2**: OpenAI Five defeated professional teams - **Robotics**: Learning complex manipulation tasks - **Autonomous vehicles**: Decision-making in complex environments - **Resource management**: Optimizing energy, network, and computing resources According to a 2023 Gartner report, RL will be a key driver in **autonomous systems**, with the market expected to reach **$14.5 billion by 2026**. ### **Key Components of RL** - **Agent**: The learner/decision-maker - **Environment**: What the agent interacts with - **State ($s$)**: Representation of the current situation - **Action ($a$)**: What the agent can do - **Reward ($r$)**: Feedback signal indicating success - **Policy ($\pi$)**: Strategy mapping states to actions - **Value function ($V(s)$ or $Q(s,a)$)**: Expected cumulative reward - **Model (optional)**: Agent's representation of the environment ### **RL vs. Other Learning Paradigms** | **Supervised Learning** | **Unsupervised Learning** | **Reinforcement Learning** | |-------------------------|---------------------------|----------------------------| | Labeled dataset | Unlabeled data | Environment interaction | | Predict output | Find patterns | Maximize cumulative reward | | Immediate feedback | No explicit feedback | Delayed, sparse feedback | ### **Why PyTorch for RL?** PyTorch is ideal for RL because: - **Dynamic computation graphs**: Essential for variable-length trajectories - **GPU acceleration**: Critical for training deep RL models - **Seamless integration**: With deep learning models - **Rich ecosystem**: Libraries like `torchrl`, `stable-baselines3`, `ray[rllib]` --- ## **Markov Decision Processes** Markov Decision Processes (MDPs) provide the mathematical framework for RL. 
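Before formalizing MDPs, it helps to see the raw agent-environment loop in code. The sketch below runs one CartPole episode with a random policy; it assumes the newer `gym`/`gymnasium` API (where `reset()` returns `(observation, info)` and `step()` returns five values), which is the convention used throughout this part. Every algorithm that follows simply replaces `env.action_space.sample()` with a learned policy.

```python
import gym  # or: import gymnasium as gym

env = gym.make('CartPole-v1')
state, _ = env.reset()

total_reward = 0.0
done = False
while not done:
    action = env.action_space.sample()                          # random placeholder policy
    state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated                               # episode ends on failure or time limit
    total_reward += reward

print(f"Episode return: {total_reward}")
env.close()
```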
### **MDP Formal Definition** An MDP is defined by the tuple $(\mathcal{S}, \mathcal{A}, \mathcal{P}, \mathcal{R}, \gamma)$: - $\mathcal{S}$: Set of states - $\mathcal{A}$: Set of actions - $\mathcal{P}$: State transition probability, $\mathcal{P}_{ss'}^a = \mathbb{P}[S_{t+1}=s'|S_t=s, A_t=a]$ - $\mathcal{R}$: Reward function, $\mathcal{R}_s^a = \mathbb{E}[R_{t+1}|S_t=s, A_t=a]$ - $\gamma$: Discount factor, $\gamma \in [0,1]$ ### **The Markov Property** The Markov property states that the future depends only on the present state: $$\mathbb{P}[S_{t+1}|S_t,A_t] = \mathbb{P}[S_{t+1}|S_1,A_1,\dots,S_t,A_t]$$ ### **Return and Value Functions** The **return** $G_t$ is the total discounted reward from time $t$: $$G_t = R_{t+1} + \gamma R_{t+2} + \gamma^2 R_{t+3} + \dots = \sum_{k=0}^{\infty} \gamma^k R_{t+k+1}$$ The **state-value function** for policy $\pi$: $$V_{\pi}(s) = \mathbb{E}_{\pi}[G_t|S_t=s]$$ The **action-value function** (Q-function): $$Q_{\pi}(s,a) = \mathbb{E}_{\pi}[G_t|S_t=s, A_t=a]$$ ### **Bellman Equations** The Bellman equation for the value function: $$V_{\pi}(s) = \sum_{a \in \mathcal{A}} \pi(a|s) \sum_{s' \in \mathcal{S}} \mathcal{P}_{ss'}^a \left[\mathcal{R}_s^a + \gamma V_{\pi}(s')\right]$$ For the optimal value function $V_*(s)$: $$V_*(s) = \max_a \sum_{s' \in \mathcal{S}} \mathcal{P}_{ss'}^a \left[\mathcal{R}_s^a + \gamma V_*(s')\right]$$ ### **Implementing MDP in PyTorch** ```python import torch import numpy as np class MDP: def __init__(self, states, actions, transition_probs, rewards, gamma=0.99): """ Initialize Markov Decision Process Args: states: List of states actions: List of actions transition_probs: Dict[state][action] -> list of (next_state, prob) rewards: Dict[state][action][next_state] -> reward gamma: Discount factor """ self.states = states self.actions = actions self.transition_probs = transition_probs self.rewards = rewards self.gamma = gamma self.state_to_idx = {s: i for i, s in enumerate(states)} self.action_to_idx = {a: i for i, a in enumerate(actions)} def get_transition_prob(self, s, a, s_prime): """Get transition probability P(s'|s,a)""" s_idx = self.state_to_idx[s] a_idx = self.action_to_idx[a] for next_s, prob in self.transition_probs[s][a]: if next_s == s_prime: return prob return 0.0 def get_reward(self, s, a, s_prime): """Get reward R(s,a,s')""" return self.rewards.get(s, {}).get(a, {}).get(s_prime, 0.0) def bellman_update(self, V, s): """Perform Bellman update for state s""" v = 0 for a in self.actions: action_value = 0 for s_prime, prob in self.transition_probs[s][a]: r = self.get_reward(s, a, s_prime) action_value += prob * (r + self.gamma * V[self.state_to_idx[s_prime]]) v = max(v, action_value) return v # Example: Grid World MDP states = [(i, j) for i in range(4) for j in range(4)] actions = ['up', 'down', 'left', 'right'] # Define transition probabilities and rewards transition_probs = {} rewards = {} for s in states: transition_probs[s] = {} rewards[s] = {} for a in actions: transition_probs[s][a] = [] rewards[s][a] = {} # Calculate next state based on action i, j = s if a == 'up' and i > 0: s_prime = (i-1, j) elif a == 'down' and i < 3: s_prime = (i+1, j) elif a == 'left' and j > 0: s_prime = (i, j-1) elif a == 'right' and j < 3: s_prime = (i, j+1) else: s_prime = s # Stay in same state if action not possible # Set transition probability (assume deterministic for simplicity) transition_probs[s][a] = [(s_prime, 1.0)] # Set rewards (goal state at (3,3) has reward 1) if s_prime == (3, 3): rewards[s][a][s_prime] = 1.0 else: 
rewards[s][a][s_prime] = 0.0 # Create MDP mdp = MDP(states, actions, transition_probs, rewards) ``` ### **Policy Evaluation and Iteration** ```python def policy_evaluation(mdp, policy, theta=1e-6, max_iterations=1000): """Evaluate a policy using iterative policy evaluation""" V = torch.zeros(len(mdp.states)) for _ in range(max_iterations): delta = 0 for i, s in enumerate(mdp.states): v = 0 for a in mdp.actions: for s_prime, prob in mdp.transition_probs[s][a]: r = mdp.get_reward(s, a, s_prime) v += policy[i, mdp.action_to_idx[a]] * prob * (r + mdp.gamma * V[mdp.state_to_idx[s_prime]]) delta = max(delta, torch.abs(v - V[i])) V[i] = v if delta < theta: break return V def policy_iteration(mdp, gamma=0.99, theta=1e-6): """Perform policy iteration to find optimal policy""" # Initialize random policy policy = torch.ones(len(mdp.states), len(mdp.actions)) / len(mdp.actions) while True: # Policy evaluation V = policy_evaluation(mdp, policy, theta) # Policy improvement policy_stable = True for i, s in enumerate(mdp.states): old_action = torch.argmax(policy[i]).item() # Calculate action values action_values = torch.zeros(len(mdp.actions)) for a_idx, a in enumerate(mdp.actions): for s_prime, prob in mdp.transition_probs[s][a]: r = mdp.get_reward(s, a, s_prime) action_values[a_idx] += prob * (r + gamma * V[mdp.state_to_idx[s_prime]]) # Update policy best_action = torch.argmax(action_values).item() policy[i] = torch.zeros(len(mdp.actions)) policy[i, best_action] = 1.0 if old_action != best_action: policy_stable = False if policy_stable: break return policy, V ``` --- ## **Q-Learning and Deep Q-Networks** Q-Learning is a model-free RL algorithm that learns the optimal action-value function. ### **Q-Learning Algorithm** The Q-Learning update rule: $$Q(s_t,a_t) \leftarrow Q(s_t,a_t) + \alpha \left[r_{t+1} + \gamma \max_a Q(s_{t+1},a) - Q(s_t,a_t)\right]$$ Where: - $\alpha$ is the learning rate - $\gamma$ is the discount factor ### **Deep Q-Networks (DQN)** DQN uses a neural network to approximate the Q-function: $$Q(s,a;\theta) \approx Q^*(s,a)$$ Key innovations: - **Experience replay**: Store transitions $(s_t,a_t,r_{t+1},s_{t+1})$ in a replay buffer - **Target network**: Use a separate network for computing target values ### **DQN Loss Function** The DQN loss: $$\mathcal{L}(\theta) = \mathbb{E}_{(s,a,r,s') \sim \mathcal{U}(\mathcal{D})} \left[ \left( r + \gamma \max_{a'} Q(s',a';\theta^-) - Q(s,a;\theta) \right)^2 \right]$$ Where $\theta^-$ are the parameters of the target network. 
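Before moving to the neural-network implementation below, it helps to see the tabular update rule in isolation. The following is only a toy sketch: `FrozenLake-v1` and the hyperparameters are assumed choices, and any environment with small discrete state and action spaces would do. The Q-table is a plain NumPy array.

```python
import gym
import numpy as np

env = gym.make('FrozenLake-v1', is_slippery=False)   # small discrete environment (assumed choice)
Q = np.zeros((env.observation_space.n, env.action_space.n))
alpha, gamma, epsilon = 0.1, 0.99, 0.1

for episode in range(2000):
    state, _ = env.reset()
    done = False
    while not done:
        # Epsilon-greedy action selection over the Q-table
        if np.random.rand() < epsilon:
            action = env.action_space.sample()
        else:
            action = int(np.argmax(Q[state]))

        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        # Q-learning update: Q(s,a) += alpha * [r + gamma * max_a' Q(s',a') - Q(s,a)]
        target = reward + gamma * np.max(Q[next_state]) * (not terminated)
        Q[state, action] += alpha * (target - Q[state, action])
        state = next_state
```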
### **Implementing DQN in PyTorch** ```python import torch import torch.nn as nn import torch.optim as optim import random from collections import deque class DQN(nn.Module): """Deep Q-Network""" def __init__(self, input_dim, output_dim): super(DQN, self).__init__() self.network = nn.Sequential( nn.Linear(input_dim, 128), nn.ReLU(), nn.Linear(128, 128), nn.ReLU(), nn.Linear(128, output_dim) ) def forward(self, x): return self.network(x) class ReplayBuffer: """Experience replay buffer""" def __init__(self, capacity): self.buffer = deque(maxlen=capacity) def push(self, state, action, reward, next_state, done): self.buffer.append((state, action, reward, next_state, done)) def sample(self, batch_size): batch = random.sample(self.buffer, batch_size) states, actions, rewards, next_states, dones = zip(*batch) return ( torch.stack(states), torch.tensor(actions), torch.tensor(rewards, dtype=torch.float32), torch.stack(next_states), torch.tensor(dones, dtype=torch.float32) ) def __len__(self): return len(self.buffer) class DQNAgent: """DQN Agent implementation""" def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99, buffer_capacity=10000, batch_size=64, tau=0.005): self.state_dim = state_dim self.action_dim = action_dim self.gamma = gamma self.batch_size = batch_size self.tau = tau # Q-Network self.q_network = DQN(state_dim, action_dim) self.target_network = DQN(state_dim, action_dim) self.target_network.load_state_dict(self.q_network.state_dict()) # Optimizer self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr) # Replay buffer self.replay_buffer = ReplayBuffer(buffer_capacity) def select_action(self, state, epsilon=0.1): """Epsilon-greedy action selection""" if random.random() < epsilon: return random.randrange(self.action_dim) else: with torch.no_grad(): state = torch.FloatTensor(state).unsqueeze(0) q_values = self.q_network(state) return torch.argmax(q_values).item() def update(self): """Update Q-network using experience replay""" if len(self.replay_buffer) < self.batch_size: return # Sample batch from replay buffer states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size) # Compute current Q values current_q = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1) # Compute target Q values with torch.no_grad(): next_q = self.target_network(next_states).max(1)[0] target_q = rewards + (1 - dones) * self.gamma * next_q # Compute loss loss = nn.MSELoss()(current_q, target_q) # Optimize the model self.optimizer.zero_grad() loss.backward() self.optimizer.step() # Update target network self._soft_update_target_network() return loss.item() def _soft_update_target_network(self): """Soft update of the target network""" for target_param, param in zip(self.target_network.parameters(), self.q_network.parameters()): target_param.data.copy_(self.tau * param.data + (1.0 - self.tau) * target_param.data) def store_experience(self, state, action, reward, next_state, done): """Store experience in replay buffer""" self.replay_buffer.push( torch.FloatTensor(state), action, reward, torch.FloatTensor(next_state), done ) ``` ### **Training a DQN Agent** ```python import gym import numpy as np import matplotlib.pyplot as plt def train_dqn(env, agent, episodes=500, max_steps=200, epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.995, render=False): """Train DQN agent""" rewards = [] epsilons = [] epsilon = epsilon_start for episode in range(episodes): state = env.reset() if isinstance(state, tuple): state = state[0] # Handle new Gym API total_reward = 0 for 
step in range(max_steps): action = agent.select_action(state, epsilon) next_state, reward, done, truncated, _ = env.step(action) done = done or truncated agent.store_experience(state, action, reward, next_state, done) loss = agent.update() state = next_state total_reward += reward if render and episode % 50 == 0: env.render() if done: break # Decay epsilon epsilon = max(epsilon_end, epsilon * epsilon_decay) rewards.append(total_reward) epsilons.append(epsilon) if episode % 10 == 0: print(f"Episode {episode}, Reward: {total_reward:.2f}, Epsilon: {epsilon:.3f}") env.close() return rewards, epsilons # Train DQN on CartPole env = gym.make('CartPole-v1') state_dim = env.observation_space.shape[0] action_dim = env.action_space.n agent = DQNAgent(state_dim, action_dim) rewards, epsilons = train_dqn(env, agent, episodes=500) # Plot results plt.figure(figsize=(12, 5)) plt.subplot(1, 2, 1) plt.plot(rewards) plt.title('Rewards per Episode') plt.xlabel('Episode') plt.ylabel('Total Reward') plt.subplot(1, 2, 2) plt.plot(epsilons) plt.title('Epsilon Decay') plt.xlabel('Episode') plt.ylabel('Epsilon') plt.tight_layout() plt.show() ``` ### **DQN Improvements** #### **Double DQN** Reduces overestimation bias: $$Q_{\text{target}} = r + \gamma Q(s', \arg\max_a Q(s',a;\theta);\theta^-)$$ ```python def update_double_dqn(self, states, actions, rewards, next_states, dones): """Double DQN update""" # Compute current Q values current_q = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1) # Compute target Q values using Double DQN with torch.no_grad(): # Get best actions from online network online_actions = self.q_network(next_states).argmax(1) # Get Q values from target network for these actions next_q = self.target_network(next_states).gather(1, online_actions.unsqueeze(1)).squeeze(1) target_q = rewards + (1 - dones) * self.gamma * next_q # Compute loss and update loss = nn.MSELoss()(current_q, target_q) self.optimizer.zero_grad() loss.backward() self.optimizer.step() # Update target network self._soft_update_target_network() return loss.item() ``` #### **Dueling DQN** Separates state value and advantage: $$Q(s,a) = V(s) + A(s,a) - \frac{1}{|A|}\sum_{a'}A(s,a')$$ ```python class DuelingDQN(nn.Module): """Dueling DQN architecture""" def __init__(self, input_dim, output_dim): super(DuelingDQN, self).__init__() # Shared layers self.feature = nn.Sequential( nn.Linear(input_dim, 128), nn.ReLU() ) # Value stream self.value = nn.Sequential( nn.Linear(128, 64), nn.ReLU(), nn.Linear(64, 1) ) # Advantage stream self.advantage = nn.Sequential( nn.Linear(128, 64), nn.ReLU(), nn.Linear(64, output_dim) ) def forward(self, x): features = self.feature(x) value = self.value(features) advantage = self.advantage(features) # Combine value and advantage q_values = value + (advantage - advantage.mean(dim=1, keepdim=True)) return q_values ``` #### **Prioritized Experience Replay** Samples important transitions more frequently: ```python class PrioritizedReplayBuffer: """Prioritized experience replay buffer""" def __init__(self, capacity, alpha=0.6): self.buffer = [] self.priorities = np.zeros((capacity,), dtype=np.float32) self.capacity = capacity self.position = 0 self.alpha = alpha self.beta_start = 0.4 self.beta_frames = 1000 def push(self, state, action, reward, next_state, done): """Add experience to buffer with max priority""" max_priority = np.max(self.priorities) if len(self.buffer) > 0 else 1.0 if len(self.buffer) < self.capacity: self.buffer.append((state, action, reward, next_state, done)) else: 
self.buffer[self.position] = (state, action, reward, next_state, done) self.priorities[self.position] = max_priority self.position = (self.position + 1) % self.capacity def sample(self, batch_size, frame): """Sample batch with priorities""" if len(self.buffer) == self.capacity: priorities = self.priorities else: priorities = self.priorities[:len(self.buffer)] # Compute sampling probabilities probs = priorities ** self.alpha probs /= probs.sum() # Sample indices indices = np.random.choice(len(self.buffer), batch_size, p=probs) # Compute importance-sampling weights beta = self.beta_start + frame * (1.0 - self.beta_start) / self.beta_frames beta = min(beta, 1.0) weights = (len(self.buffer) * probs[indices]) ** (-beta) weights /= weights.max() # Get samples samples = [self.buffer[idx] for idx in indices] states, actions, rewards, next_states, dones = zip(*samples) return ( torch.stack(states), torch.tensor(actions), torch.tensor(rewards, dtype=torch.float32), torch.stack(next_states), torch.tensor(dones, dtype=torch.float32), indices, torch.tensor(weights, dtype=torch.float32) ) def update_priorities(self, indices, priorities): """Update priorities for sampled transitions""" for idx, priority in zip(indices, priorities): self.priorities[idx] = priority def __len__(self): return len(self.buffer) ``` --- ## **Policy Gradient Methods** Policy gradient methods directly optimize the policy rather than learning a value function. ### **Policy Gradient Theorem** The policy gradient theorem states: $$\nabla_{\theta}J(\pi_{\theta}) = \mathbb{E}_{\pi_{\theta}}\left[\nabla_{\theta}\log\pi_{\theta}(a|s)Q^{\pi_{\theta}}(s,a)\right]$$ Where $J(\pi_{\theta})$ is the expected return under policy $\pi_{\theta}$. ### **REINFORCE Algorithm** The simplest policy gradient algorithm: $$\theta \leftarrow \theta + \alpha \nabla_{\theta}\log\pi_{\theta}(a_t|s_t)G_t$$ Where $G_t$ is the return from time $t$. ### **Actor-Critic Methods** Combine policy gradient (actor) with value function estimation (critic): $$\theta \leftarrow \theta + \alpha \nabla_{\theta}\log\pi_{\theta}(a_t|s_t)\delta_t$$ Where $\delta_t = r_{t+1} + \gamma V(s_{t+1}) - V(s_t)$ is the TD error. 
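Both REINFORCE (which weights log-probabilities by the full return $G_t$) and actor-critic (which uses the TD error) reduce to simple loops over a trajectory. As a quick numeric check of the return definition, here is a minimal sketch that computes discounted returns backward over a toy reward sequence (the numbers are purely illustrative):

```python
import torch

rewards = [1.0, 0.0, 2.0]    # toy rewards r_1, r_2, r_3 from one episode
gamma = 0.9

returns = []
G = 0.0
for r in reversed(rewards):  # G_t = r_{t+1} + gamma * G_{t+1}, accumulated from the end
    G = r + gamma * G
    returns.insert(0, G)

print(torch.tensor(returns))  # tensor([2.6200, 1.8000, 2.0000])
```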
### **Implementing REINFORCE in PyTorch**

```python
class PolicyNetwork(nn.Module):
    """Policy network for REINFORCE"""
    def __init__(self, input_dim, output_dim):
        super(PolicyNetwork, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, output_dim),
            nn.Softmax(dim=-1)
        )

    def forward(self, x):
        return self.network(x)


class REINFORCE:
    """REINFORCE algorithm implementation"""
    def __init__(self, state_dim, action_dim, lr=2e-3, gamma=0.99):
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
        self.gamma = gamma
        self.log_probs = []
        self.rewards = []

    def select_action(self, state):
        """Select action using policy network"""
        state = torch.FloatTensor(state)
        probs = self.policy(state)
        m = torch.distributions.Categorical(probs)
        action = m.sample()
        self.log_probs.append(m.log_prob(action))
        return action.item()

    def store_reward(self, reward):
        """Store reward for later use"""
        self.rewards.append(reward)

    def update(self):
        """Update policy using REINFORCE"""
        R = 0
        returns = []

        # Calculate returns (backwards)
        for r in self.rewards[::-1]:
            R = r + self.gamma * R
            returns.insert(0, R)

        # Normalize returns
        returns = torch.tensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + 1e-9)

        # Calculate policy loss
        policy_loss = []
        for log_prob, R in zip(self.log_probs, returns):
            policy_loss.append(-log_prob * R)
        # The stored log_probs are 0-dim tensors, so stack (not cat) them before summing
        policy_loss = torch.stack(policy_loss).sum()

        # Update policy
        self.optimizer.zero_grad()
        policy_loss.backward()
        self.optimizer.step()

        # Clear buffers
        self.log_probs = []
        self.rewards = []
```

### **Implementing Actor-Critic in PyTorch**

```python
class ValueNetwork(nn.Module):
    """Value network for Actor-Critic"""
    def __init__(self, input_dim):
        super(ValueNetwork, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )

    def forward(self, x):
        return self.network(x)


class ActorCritic:
    """Actor-Critic implementation"""
    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99):
        self.actor = PolicyNetwork(state_dim, action_dim)
        self.critic = ValueNetwork(state_dim)
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr)
        self.gamma = gamma

    def select_action(self, state):
        """Select action using actor network"""
        state = torch.FloatTensor(state)
        probs = self.actor(state)
        m = torch.distributions.Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)

    def update(self, state, action_log_prob, reward, next_state, done):
        """Update actor and critic networks"""
        state = torch.FloatTensor(state)
        next_state = torch.FloatTensor(next_state)

        # Current state value (keeps gradients for the critic update)
        value = self.critic(state)

        # Bootstrapped target (no gradient through the target)
        with torch.no_grad():
            next_value = self.critic(next_state)
            target = reward + (1 - int(done)) * self.gamma * next_value

        # TD error (advantage), detached so the actor loss does not backprop into the critic
        advantage = (target - value).detach()

        # Update critic
        critic_loss = nn.MSELoss()(value, target)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Update actor (policy)
        actor_loss = -action_log_prob * advantage
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        return critic_loss.item(), actor_loss.item()
```

---

## **Proximal Policy Optimization**

Proximal Policy Optimization (PPO) is a state-of-the-art policy optimization algorithm.
### **PPO Objective Function**

The PPO objective with clipping:

$$L^{CLIP}(\theta) = \mathbb{E}_t\left[\min\left(r_t(\theta)\hat{A}_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon)\hat{A}_t\right)\right]$$

Where:
- $r_t(\theta) = \frac{\pi_{\theta}(a_t|s_t)}{\pi_{\theta_{\text{old}}}(a_t|s_t)}$ is the probability ratio
- $\hat{A}_t$ is the estimated advantage
- $\epsilon$ is the clipping parameter (typically 0.1-0.3)

### **Advantage Estimation**

Generalized Advantage Estimation (GAE) provides better estimates:

$$\hat{A}_t^{\text{GAE}(\gamma,\lambda)} = \sum_{l=0}^{\infty}(\gamma\lambda)^l\delta_{t+l}$$

Where $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$ is the TD residual.

### **Implementing PPO in PyTorch**

```python
class PPO:
    """Proximal Policy Optimization implementation"""
    def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99,
                 clip_epsilon=0.2, gae_lambda=0.95, ppo_epochs=4,
                 batch_size=64, value_coef=0.5, entropy_coef=0.01):
        self.actor = PolicyNetwork(state_dim, action_dim)
        self.critic = ValueNetwork(state_dim)
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr)

        self.gamma = gamma
        self.clip_epsilon = clip_epsilon
        self.gae_lambda = gae_lambda
        self.ppo_epochs = ppo_epochs
        self.batch_size = batch_size
        self.value_coef = value_coef
        self.entropy_coef = entropy_coef

        # Storage for experiences
        self.states = []
        self.actions = []
        self.log_probs = []
        self.values = []
        self.rewards = []
        self.dones = []

    def select_action(self, state):
        """Select action and store necessary information"""
        state = torch.FloatTensor(state)

        # Get policy distribution
        probs = self.actor(state)
        dist = torch.distributions.Categorical(probs)

        # Sample action
        action = dist.sample()

        # Store values for later update
        value = self.critic(state)
        log_prob = dist.log_prob(action)

        self.states.append(state)
        self.actions.append(action)
        self.log_probs.append(log_prob)
        self.values.append(value)

        return action.item()

    def store_reward(self, reward, done):
        """Store reward and done flag"""
        self.rewards.append(reward)
        self.dones.append(done)

    def finish_episode(self):
        """Process the end of an episode"""
        # Convert to tensors
        states = torch.stack(self.states)
        actions = torch.tensor(self.actions)
        old_log_probs = torch.stack(self.log_probs).detach()
        rewards = torch.tensor(self.rewards, dtype=torch.float32)
        dones = torch.tensor(self.dones, dtype=torch.float32)
        # Detach rollout values: returns and advantages are fixed targets, and keeping
        # the rollout graph would break the repeated backward passes in update()
        values = torch.stack(self.values).squeeze().detach()

        # Compute returns and advantages using GAE
        returns, advantages = self.compute_gae(rewards, values, dones)

        # Clear storage
        self.states = []
        self.actions = []
        self.log_probs = []
        self.values = []
        self.rewards = []
        self.dones = []

        return states, actions, old_log_probs, returns, advantages

    def compute_gae(self, rewards, values, dones):
        """Compute GAE advantages"""
        batch_size = len(rewards)
        advantages = torch.zeros_like(rewards)
        returns = torch.zeros_like(rewards)

        # Calculate advantages using GAE
        gae = 0
        for t in reversed(range(batch_size)):
            if t == batch_size - 1:
                next_value = 0  # Assume episode ends
            else:
                next_value = values[t + 1]

            # TD residual
            delta = rewards[t] + self.gamma * next_value * (1 - dones[t]) - values[t]

            # GAE
            gae = delta + self.gamma * self.gae_lambda * (1 - dones[t]) * gae
            advantages[t] = gae

        # Calculate returns
        returns = advantages + values

        return returns, advantages

    def update(self, states, actions, old_log_probs, returns, advantages):
        """Update policy and value networks using PPO"""
        # Normalize advantages
        advantages = (advantages -
advantages.mean()) / (advantages.std() + 1e-8) # Perform multiple PPO updates total_actor_loss = 0 total_critic_loss = 0 total_entropy = 0 for _ in range(self.ppo_epochs): # Create mini-batches indices = np.arange(len(states)) np.random.shuffle(indices) for start in range(0, len(states), self.batch_size): end = start + self.batch_size mb_indices = indices[start:end] # Get mini-batch data mb_states = states[mb_indices] mb_actions = actions[mb_indices] mb_old_log_probs = old_log_probs[mb_indices] mb_returns = returns[mb_indices] mb_advantages = advantages[mb_indices] # Get current policy and value probs = self.actor(mb_states) dist = torch.distributions.Categorical(probs) log_probs = dist.log_prob(mb_actions) entropy = dist.entropy().mean() values = self.critic(mb_states).squeeze() # Compute probability ratio ratio = (log_probs - mb_old_log_probs).exp() # Compute clipped surrogate objective surr1 = ratio * mb_advantages surr2 = torch.clamp(ratio, 1.0 - self.clip_epsilon, 1.0 + self.clip_epsilon) * mb_advantages actor_loss = -torch.min(surr1, surr2).mean() # Value loss critic_loss = nn.MSELoss()(values, mb_returns) # Total loss loss = actor_loss + self.value_coef * critic_loss - self.entropy_coef * entropy # Update networks self.actor_optimizer.zero_grad() self.critic_optimizer.zero_grad() loss.backward() self.actor_optimizer.step() self.critic_optimizer.step() total_actor_loss += actor_loss.item() total_critic_loss += critic_loss.item() total_entropy += entropy.item() # Average over updates n_updates = self.ppo_epochs * (len(states) // self.batch_size) return ( total_actor_loss / n_updates, total_critic_loss / n_updates, total_entropy / n_updates ) ``` ### **Training PPO on CartPole** ```python def train_ppo(env, agent, episodes=500, max_steps=200, render=False): """Train PPO agent""" rewards = [] for episode in range(episodes): state = env.reset() if isinstance(state, tuple): state = state[0] # Handle new Gym API total_reward = 0 for step in range(max_steps): action = agent.select_action(state) next_state, reward, done, truncated, _ = env.step(action) done = done or truncated agent.store_reward(reward, done) state = next_state total_reward += reward if render and episode % 50 == 0: env.render() if done: break # Process the end of episode states, actions, old_log_probs, returns, advantages = agent.finish_episode() # Update policy actor_loss, critic_loss, entropy = agent.update( states, actions, old_log_probs, returns, advantages ) rewards.append(total_reward) if episode % 10 == 0: print(f"Episode {episode}, Reward: {total_reward:.2f}, " f"Actor Loss: {actor_loss:.4f}, Critic Loss: {critic_loss:.4f}") env.close() return rewards # Train PPO on CartPole env = gym.make('CartPole-v1') state_dim = env.observation_space.shape[0] action_dim = env.action_space.n agent = PPO(state_dim, action_dim) rewards = train_ppo(env, agent, episodes=500) # Plot results plt.figure(figsize=(10, 5)) plt.plot(rewards) plt.title('PPO Training on CartPole') plt.xlabel('Episode') plt.ylabel('Total Reward') plt.show() ``` --- ## **Deep Deterministic Policy Gradient** Deep Deterministic Policy Gradient (DDPG) is an off-policy algorithm for continuous control. 
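"Continuous control" means the action is a real-valued vector rather than one of a few discrete choices, so the $\max_a Q(s,a)$ step from DQN has no cheap exact solution; DDPG instead trains an actor that outputs the (approximately) maximizing action directly. A quick way to see the difference is to inspect the action space of the Pendulum environment used in the training example later in this section:

```python
import gym

env = gym.make('Pendulum-v1')
print(env.action_space)           # Box(-2.0, 2.0, (1,), float32): a continuous torque in [-2, 2]
print(env.action_space.sample())  # e.g. array([0.37], dtype=float32) -- values vary
env.close()
```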
### **DDPG Algorithm** DDPG combines: - DQN ideas for continuous actions - Deterministic Policy Gradient theorem - Actor-Critic architecture The deterministic policy gradient theorem: $$\nabla_{\theta}J(\mu_{\theta}) = \mathbb{E}_{s \sim \rho^{\mu}}\left[\nabla_{\theta}\mu_{\theta}(s)\nabla_a Q^{\mu}(s,a)|_{a=\mu_{\theta}(s)}\right]$$ ### **DDPG Implementation** ```python class DDPGActor(nn.Module): """Actor network for DDPG (deterministic policy)""" def __init__(self, state_dim, action_dim, max_action=1.0): super(DDPGActor, self).__init__() self.max_action = max_action self.network = nn.Sequential( nn.Linear(state_dim, 400), nn.ReLU(), nn.Linear(400, 300), nn.ReLU(), nn.Linear(300, action_dim), nn.Tanh() ) def forward(self, state): return self.max_action * self.network(state) class DDPGCritic(nn.Module): """Critic network for DDPG""" def __init__(self, state_dim, action_dim): super(DDPGCritic, self).__init__() self.network = nn.Sequential( nn.Linear(state_dim + action_dim, 400), nn.ReLU(), nn.Linear(400, 300), nn.ReLU(), nn.Linear(300, 1) ) def forward(self, state, action): return self.network(torch.cat([state, action], 1)) class DDPG: """Deep Deterministic Policy Gradient implementation""" def __init__(self, state_dim, action_dim, max_action=1.0, lr_actor=1e-3, lr_critic=1e-3, gamma=0.99, tau=0.005, buffer_capacity=1000000, batch_size=100, noise_std=0.2): self.gamma = gamma self.tau = tau self.batch_size = batch_size self.noise_std = noise_std self.max_action = max_action # Actor networks self.actor = DDPGActor(state_dim, action_dim, max_action) self.actor_target = DDPGActor(state_dim, action_dim, max_action) self.actor_target.load_state_dict(self.actor.state_dict()) # Critic networks self.critic = DDPGCritic(state_dim, action_dim) self.critic_target = DDPGCritic(state_dim, action_dim) self.critic_target.load_state_dict(self.critic.state_dict()) # Optimizers self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr_actor) self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr_critic) # Replay buffer self.replay_buffer = ReplayBuffer(buffer_capacity) def select_action(self, state, add_noise=True): """Select action with optional noise for exploration""" state = torch.FloatTensor(state).unsqueeze(0) action = self.actor(state).detach().numpy()[0] if add_noise: action = action + np.random.normal(0, self.noise_std, size=action.shape) action = np.clip(action, -self.max_action, self.max_action) return action def store_experience(self, state, action, reward, next_state, done): """Store experience in replay buffer""" self.replay_buffer.push( torch.FloatTensor(state), torch.FloatTensor(action), torch.FloatTensor([reward]), torch.FloatTensor(next_state), torch.FloatTensor([done]) ) def update(self): """Update actor and critic networks""" if len(self.replay_buffer) < self.batch_size: return None, None # Sample batch from replay buffer states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size) # Compute target Q-value with torch.no_grad(): next_actions = self.actor_target(next_states) target_q = self.critic_target(next_states, next_actions) target_q = rewards + (1 - dones) * self.gamma * target_q # Update critic current_q = self.critic(states, actions) critic_loss = nn.MSELoss()(current_q, target_q) self.critic_optimizer.zero_grad() critic_loss.backward() self.critic_optimizer.step() # Update actor actor_loss = -self.critic(states, self.actor(states)).mean() self.actor_optimizer.zero_grad() actor_loss.backward() self.actor_optimizer.step() # Update 
target networks self._soft_update_target_networks() return actor_loss.item(), critic_loss.item() def _soft_update_target_networks(self): """Soft update of target networks""" # Update actor target for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()): target_param.data.copy_(self.tau * param.data + (1.0 - self.tau) * target_param.data) # Update critic target for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()): target_param.data.copy_(self.tau * param.data + (1.0 - self.tau) * target_param.data) ``` ### **Training DDPG on Pendulum** ```python import gym def train_ddpg(env, agent, episodes=200, max_steps=200, render=False): """Train DDPG agent""" rewards = [] for episode in range(episodes): state = env.reset() if isinstance(state, tuple): state = state[0] # Handle new Gym API total_reward = 0 for step in range(max_steps): action = agent.select_action(state) next_state, reward, done, truncated, _ = env.step(action) done = done or truncated agent.store_experience(state, action, reward, next_state, done) # Update agent actor_loss, critic_loss = agent.update() state = next_state total_reward += reward if render and episode % 10 == 0: env.render() if done: break rewards.append(total_reward) if episode % 10 == 0: print(f"Episode {episode}, Reward: {total_reward:.2f}, " f"Actor Loss: {actor_loss:.4f}, Critic Loss: {critic_loss:.4f}") env.close() return rewards # Train DDPG on Pendulum env = gym.make('Pendulum-v1') state_dim = env.observation_space.shape[0] action_dim = env.action_space.shape[0] max_action = float(env.action_space.high[0]) agent = DDPG(state_dim, action_dim, max_action) rewards = train_ddpg(env, agent, episodes=200) # Plot results plt.figure(figsize=(10, 5)) plt.plot(rewards) plt.title('DDPG Training on Pendulum') plt.xlabel('Episode') plt.ylabel('Total Reward') plt.show() ``` --- ## **Model-Based Reinforcement Learning** Model-based RL learns a model of the environment dynamics and uses it for planning. 
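To make "planning with a learned model" concrete before the comparison below, here is a minimal random-shooting planner: it samples a few candidate action sequences, rolls each one out through the model, and executes the first action of the best sequence. This is only a sketch under assumptions: `model(state, action)` is a stand-in callable returning a predicted next state and reward (shaped like the `DynamicsModel` implemented later in this section), and actions are assumed to be continuous in $[-1, 1]$.

```python
import torch

def plan_random_shooting(model, state, action_dim, horizon=5, num_candidates=64):
    """Choose an action by imagining rollouts through a learned dynamics model (illustrative)."""
    best_return, best_first_action = -float('inf'), None
    for _ in range(num_candidates):
        # Sample a random candidate action sequence
        actions = torch.empty(horizon, action_dim).uniform_(-1.0, 1.0)
        s, total = state.clone(), 0.0
        for a in actions:
            with torch.no_grad():
                s_pred, r_pred = model(s.unsqueeze(0), a.unsqueeze(0))  # predicted next state, reward
            s = s_pred.squeeze(0)
            total += float(r_pred)
        if total > best_return:
            best_return, best_first_action = total, actions[0]
    return best_first_action
```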
### **Model-Based vs Model-Free RL** | **Model-Free RL** | **Model-Based RL** | |-------------------|--------------------| | Learns directly from experience | Learns environment model first | | Simpler implementation | More complex but sample-efficient | | Less sample-efficient | More sample-efficient | | Common: DQN, PPO, DDPG | Common: PILCO, MB-MPO, Dreamer | ### **Dynamics Model** The dynamics model predicts next state and reward: $$s_{t+1}, r_t \sim \mathcal{P}_{\theta}(s_{t+1}, r_t|s_t, a_t)$$ ### **Planning with Models** Once we have a model, we can: - **Simulate experiences**: Generate synthetic data - **Plan trajectories**: Find optimal actions without interacting with real environment - **Improve sample efficiency**: Learn from both real and simulated experiences ### **Implementing Model-Based RL** ```python class DynamicsModel(nn.Module): """Environment dynamics model""" def __init__(self, state_dim, action_dim): super(DynamicsModel, self).__init__() self.network = nn.Sequential( nn.Linear(state_dim + action_dim, 256), nn.ReLU(), nn.Linear(256, 256), nn.ReLU(), nn.Linear(256, state_dim + 1) # state_dim for next state, 1 for reward ) def forward(self, state, action): x = torch.cat([state, action], dim=1) output = self.network(x) next_state_pred = output[:, :-1] reward_pred = output[:, -1] return next_state_pred, reward_pred class ModelBasedRL: """Model-Based Reinforcement Learning implementation""" def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99, model_lr=1e-3, imagination_horizon=5): # Policy network (actor) self.actor = PolicyNetwork(state_dim, action_dim) self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr) # Value network (critic) self.critic = ValueNetwork(state_dim) self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr) # Dynamics model self.dynamics = DynamicsModel(state_dim, action_dim) self.model_optimizer = optim.Adam(self.dynamics.parameters(), lr=model_lr) self.gamma = gamma self.imagination_horizon = imagination_horizon # Experience storage self.real_experiences = [] self.model_experiences = [] def select_action(self, state): """Select action using policy network""" state = torch.FloatTensor(state) probs = self.actor(state) dist = torch.distributions.Categorical(probs) action = dist.sample() return action.item(), dist.log_prob(action) def store_real_experience(self, state, action, reward, next_state, done): """Store real experience for model training""" self.real_experiences.append((state, action, reward, next_state, done)) def train_dynamics_model(self, batch_size=64): """Train dynamics model on real experiences""" if len(self.real_experiences) < batch_size: return 0.0 # Sample batch indices = np.random.choice(len(self.real_experiences), batch_size) batch = [self.real_experiences[i] for i in indices] # Prepare data states = torch.tensor([exp[0] for exp in batch], dtype=torch.float32) actions = torch.tensor([exp[1] for exp in batch], dtype=torch.long) rewards = torch.tensor([exp[2] for exp in batch], dtype=torch.float32) next_states = torch.tensor([exp[3] for exp in batch], dtype=torch.float32) # One-hot encode actions actions_onehot = torch.zeros(batch_size, self.actor.network[-1].out_features) actions_onehot[range(batch_size), actions] = 1 # Predict next state and reward next_state_pred, reward_pred = self.dynamics(states, actions_onehot) # Compute loss state_loss = nn.MSELoss()(next_state_pred, next_states) reward_loss = nn.MSELoss()(reward_pred, rewards) loss = state_loss + reward_loss # Update model 
self.model_optimizer.zero_grad() loss.backward() self.model_optimizer.step() return loss.item() def generate_model_experiences(self, num_trajectories=5, trajectory_length=10): """Generate experiences using the learned model""" self.model_experiences = [] for _ in range(num_trajectories): # Start from random real experience idx = np.random.randint(len(self.real_experiences)) state, _, _, _, _ = self.real_experiences[idx] state = torch.tensor(state, dtype=torch.float32) for _ in range(trajectory_length): # Select action using current policy with torch.no_grad(): probs = self.actor(state) dist = torch.distributions.Categorical(probs) action = dist.sample() # One-hot encode action action_onehot = torch.zeros(self.actor.network[-1].out_features) action_onehot[action] = 1 # Predict next state and reward using dynamics model with torch.no_grad(): next_state_pred, reward_pred = self.dynamics(state.unsqueeze(0), action_onehot.unsqueeze(0)) # Store model experience self.model_experiences.append(( state.numpy(), action.item(), reward_pred.item(), next_state_pred.squeeze().numpy(), False # Model doesn't know when episode ends )) # Continue with predicted state state = next_state_pred.squeeze() def update_policy(self, batch_size=64): """Update policy using both real and model experiences""" # Combine real and model experiences all_experiences = self.real_experiences + self.model_experiences if len(all_experiences) < batch_size: return 0.0, 0.0 # Sample batch indices = np.random.choice(len(all_experiences), batch_size) batch = [all_experiences[i] for i in indices] # Prepare data states = torch.tensor([exp[0] for exp in batch], dtype=torch.float32) actions = torch.tensor([exp[1] for exp in batch], dtype=torch.long) rewards = torch.tensor([exp[2] for exp in batch], dtype=torch.float32) next_states = torch.tensor([exp[3] for exp in batch], dtype=torch.float32) dones = torch.tensor([exp[4] for exp in batch], dtype=torch.float32) # Compute advantages using GAE with torch.no_grad(): values = self.critic(states).squeeze() next_values = self.critic(next_states).squeeze() td_errors = rewards + (1 - dones) * self.gamma * next_values - values advantages = td_errors # Simple TD error as advantage # Update critic critic_loss = nn.MSELoss()(self.critic(states).squeeze(), rewards + (1 - dones) * self.gamma * next_values) self.critic_optimizer.zero_grad() critic_loss.backward() self.critic_optimizer.step() # Update actor probs = self.actor(states) dist = torch.distributions.Categorical(probs) log_probs = dist.log_prob(actions) actor_loss = -(log_probs * advantages.detach()).mean() self.actor_optimizer.zero_grad() actor_loss.backward() self.actor_optimizer.step() return actor_loss.item(), critic_loss.item() ``` --- ## **Multi-Agent Reinforcement Learning** Multi-Agent RL (MARL) deals with multiple agents interacting in a shared environment. 
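A recurring idea in MARL, used by the MADDPG implementation later in this section, is centralized training with decentralized execution (CTDE): each agent's actor acts only on its own observation, while a centralized critic sees every agent's observations and actions during training. The sketch below shows how such a critic input is assembled from per-agent tensors (shapes are illustrative):

```python
import torch

num_agents, obs_dim, act_dim, batch = 3, 8, 2, 4

# Per-agent observations and actions (random placeholders)
observations = [torch.randn(batch, obs_dim) for _ in range(num_agents)]
actions = [torch.randn(batch, act_dim) for _ in range(num_agents)]

# Decentralized execution: actor i only sees observations[i]
# Centralized training: the critic sees everything, concatenated along the feature dimension
critic_input = torch.cat(observations + actions, dim=1)
print(critic_input.shape)  # torch.Size([4, 30]) == (batch, num_agents * (obs_dim + act_dim))
```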
### **MARL Challenges** - **Non-stationarity**: Environment changes as other agents learn - **Credit assignment**: Determining each agent's contribution to team reward - **Communication**: How agents share information - **Scalability**: Complexity increases with number of agents ### **MARL Approaches** - **Independent Q-Learning (IQL)**: Each agent learns independently - **Centralized Training with Decentralized Execution (CTDE)**: Train with global info, execute with local info - **Multi-Agent Actor-Critic (MAAC)**: Extension of actor-critic to multiple agents - **MADDPG**: Multi-Agent DDPG for continuous control ### **Implementing MADDPG** ```python class MADDPGAgent: """MADDPG agent for multi-agent environments""" def __init__(self, agent_id, state_dim, action_dim, num_agents, lr_actor=1e-4, lr_critic=1e-3, gamma=0.95, tau=0.01): self.agent_id = agent_id self.gamma = gamma self.tau = tau self.num_agents = num_agents # Actor network self.actor = DDPGActor(state_dim, action_dim) self.actor_target = DDPGActor(state_dim, action_dim) self.actor_target.load_state_dict(self.actor.state_dict()) self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr_actor) # Critic network (takes all states and actions) critic_input_dim = num_agents * state_dim + num_agents * action_dim self.critic = DDPGCritic(critic_input_dim, 1) self.critic_target = DDPGCritic(critic_input_dim, 1) self.critic_target.load_state_dict(self.critic.state_dict()) self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr_critic) def select_action(self, state, add_noise=True, noise_scale=0.1): """Select action for this agent""" state = torch.FloatTensor(state) action = self.actor(state).detach().numpy() if add_noise: action += noise_scale * np.random.randn(*action.shape) action = np.clip(action, -1, 1) return action def update(self, agents, experiences, batch_size=1024, device='cpu'): """ Update this agent's networks Args: agents: List of all agents experiences: List of (states, actions, rewards, next_states, dones) batch_size: Batch size for updates """ if len(experiences) < batch_size: return None, None # Sample batch indices = np.random.choice(len(experiences), batch_size) batch = [experiences[i] for i in indices] # Unpack batch states_batch = [torch.stack([exp[0][i] for exp in batch]) for i in range(self.num_agents)] actions_batch = [torch.stack([exp[1][i] for exp in batch]) for i in range(self.num_agents)] rewards_batch = torch.tensor([exp[2][self.agent_id] for exp in batch], dtype=torch.float32).to(device) next_states_batch = [torch.stack([exp[3][i] for exp in batch]) for i in range(self.num_agents)] dones_batch = torch.tensor([exp[4][self.agent_id] for exp in batch], dtype=torch.float32).to(device) # Concatenate all states and actions for critic input all_states = torch.cat(states_batch, dim=1) all_actions = torch.cat(actions_batch, dim=1) all_next_states = torch.cat(next_states_batch, dim=1) # Update critic with torch.no_grad(): # Get next actions from all target actors next_actions = [ agents[i].actor_target(next_states_batch[i]) for i in range(self.num_agents) ] next_actions = torch.cat(next_actions, dim=1) # Compute target Q-value target_q = self.critic_target(all_next_states, next_actions) target_q = rewards_batch + (1 - dones_batch) * self.gamma * target_q # Current Q-value current_q = self.critic(all_states, all_actions) # Critic loss critic_loss = nn.MSELoss()(current_q, target_q) # Update critic self.critic_optimizer.zero_grad() critic_loss.backward() 
torch.nn.utils.clip_grad_norm_(self.critic.parameters(), 0.5) self.critic_optimizer.step() # Update actor # Get current policy actions curr_actions = [ self.actor(states_batch[self.agent_id]) if i == self.agent_id else agents[i].actor(states_batch[i]).detach() for i in range(self.num_agents) ] curr_actions = torch.cat(curr_actions, dim=1) # Actor loss actor_loss = -self.critic(all_states, curr_actions).mean() # Update actor self.actor_optimizer.zero_grad() actor_loss.backward() torch.nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5) self.actor_optimizer.step() # Update target networks self._soft_update_target_networks() return actor_loss.item(), critic_loss.item() def _soft_update_target_networks(self): """Soft update of target networks""" # Update actor target for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()): target_param.data.copy_(self.tau * param.data + (1.0 - self.tau) * target_param.data) # Update critic target for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()): target_param.data.copy_(self.tau * param.data + (1.0 - self.tau) * target_param.data) ``` ### **Training MADDPG on Multi-Agent Environment** ```python def train_maddpg(env, agents, episodes=1000, max_steps=25, batch_size=1024): """Train MADDPG agents in a multi-agent environment""" rewards_history = [[] for _ in range(len(agents))] for episode in range(episodes): states = env.reset() if isinstance(states, tuple): states = states[0] # Handle new Gym API total_rewards = np.zeros(len(agents)) for step in range(max_steps): # Select actions for all agents actions = [ agents[i].select_action(states[i]) for i in range(len(agents)) ] # Take step in environment next_states, rewards, dones, truncated, _ = env.step(actions) dones = [d or t for d, t in zip(dones, truncated)] # Store experience experience = (states, actions, rewards, next_states, dones) for agent in agents: agent.experiences.append(experience) # Update all agents for i, agent in enumerate(agents): agent.update(agents, agent.experiences, batch_size) # Update state and track rewards states = next_states total_rewards += np.array(rewards) if any(dones): break # Store episode rewards for i in range(len(agents)): rewards_history[i].append(total_rewards[i]) # Print progress if episode % 100 == 0: avg_rewards = [np.mean(rewards[-100:]) for rewards in rewards_history] print(f"Episode {episode}, Avg Rewards: {avg_rewards}") return rewards_history ``` --- ## **Building a Complete RL Agent** Let's build a complete RL agent that can solve complex environments. 
### **Step 1: Environment Setup** ```python # Install required packages !pip install gym[classic_control,box2d] pybullet # Import libraries import gym import pybullet_envs import numpy as np import torch import torch.nn as nn import torch.optim as optim import torch.nn.functional as F from collections import deque, namedtuple import random import matplotlib.pyplot as plt import time ``` ### **Step 2: Advanced Replay Buffer** ```python class PrioritizedReplayBuffer: """Prioritized experience replay buffer with importance sampling""" def __init__(self, capacity, alpha=0.6): self.buffer = [] self.priorities = np.zeros((capacity,), dtype=np.float32) self.capacity = capacity self.position = 0 self.alpha = alpha self.beta_start = 0.4 self.beta_frames = 100000 def push(self, state, action, reward, next_state, done): """Add experience to buffer with max priority""" max_priority = np.max(self.priorities) if len(self.buffer) > 0 else 1.0 if len(self.buffer) < self.capacity: self.buffer.append((state, action, reward, next_state, done)) else: self.buffer[self.position] = (state, action, reward, next_state, done) self.priorities[self.position] = max_priority self.position = (self.position + 1) % self.capacity def sample(self, batch_size, frame): """Sample batch with priorities""" if len(self.buffer) == self.capacity: priorities = self.priorities else: priorities = self.priorities[:len(self.buffer)] # Compute sampling probabilities probs = priorities ** self.alpha probs /= probs.sum() # Sample indices indices = np.random.choice(len(self.buffer), batch_size, p=probs) # Compute importance-sampling weights beta = self.beta_start + frame * (1.0 - self.beta_start) / self.beta_frames beta = min(beta, 1.0) weights = (len(self.buffer) * probs[indices]) ** (-beta) weights /= weights.max() # Get samples samples = [self.buffer[idx] for idx in indices] states, actions, rewards, next_states, dones = zip(*samples) return ( torch.stack(states), torch.tensor(actions), torch.tensor(rewards, dtype=torch.float32), torch.stack(next_states), torch.tensor(dones, dtype=torch.float32), indices, torch.tensor(weights, dtype=torch.float32) ) def update_priorities(self, indices, priorities): """Update priorities for sampled transitions""" for idx, priority in zip(indices, priorities): self.priorities[idx] = priority def __len__(self): return len(self.buffer) ``` ### **Step 3: Advanced Network Architectures** ```python class NoisyLinear(nn.Module): """Noisy linear layer for exploration""" def __init__(self, in_features, out_features, std_init=0.4): super(NoisyLinear, self).__init__() self.in_features = in_features self.out_features = out_features self.std_init = std_init self.weight_mu = nn.Parameter(torch.empty(out_features, in_features)) self.weight_sigma = nn.Parameter(torch.empty(out_features, in_features)) self.register_buffer('weight_epsilon', torch.empty(out_features, in_features)) self.bias_mu = nn.Parameter(torch.empty(out_features)) self.bias_sigma = nn.Parameter(torch.empty(out_features)) self.register_buffer('bias_epsilon', torch.empty(out_features)) self.reset_parameters() self.reset_noise() def reset_parameters(self): """Initialize parameters""" mu_range = 1 / np.sqrt(self.in_features) self.weight_mu.data.uniform_(-mu_range, mu_range) self.weight_sigma.data.fill_(self.std_init / np.sqrt(self.in_features)) self.bias_mu.data.uniform_(-mu_range, mu_range) self.bias_sigma.data.fill_(self.std_init / np.sqrt(self.out_features)) def _scale_noise(self, size): """Generate scaled noise""" x = torch.randn(size) return 
x.sign().mul_(x.abs().sqrt_()) def reset_noise(self): """Reset noise""" epsilon_in = self._scale_noise(self.in_features) epsilon_out = self._scale_noise(self.out_features) self.weight_epsilon.copy_(epsilon_out.ger(epsilon_in)) self.bias_epsilon.copy_(epsilon_out) def forward(self, x): """Forward pass with noise""" if self.training: return F.linear(x, self.weight_mu + self.weight_sigma * self.weight_epsilon, self.bias_mu + self.bias_sigma * self.bias_epsilon) else: return F.linear(x, self.weight_mu, self.bias_mu) class RainbowDQN(nn.Module): """Rainbow DQN architecture combining multiple improvements""" def __init__(self, input_dim, output_dim, num_atoms=51, v_min=-10, v_max=10): super(RainbowDQN, self).__init__() self.input_dim = input_dim self.output_dim = output_dim self.num_atoms = num_atoms self.v_min = v_min self.v_max = v_max self.delta_z = (v_max - v_min) / (num_atoms - 1) # Feature extraction self.feature = nn.Sequential( nn.Linear(input_dim, 256), nn.ReLU() ) # Noisy networks for exploration self.noisy_value1 = NoisyLinear(256, 256) self.noisy_value2 = NoisyLinear(256, num_atoms) self.noisy_advantage1 = NoisyLinear(256, 256) self.noisy_advantage2 = NoisyLinear(256, output_dim * num_atoms) # Register support self.register_buffer('supports', torch.linspace(v_min, v_max, num_atoms)) def reset_noise(self): """Reset noise in noisy layers""" self.noisy_value1.reset_noise() self.noisy_value2.reset_noise() self.noisy_advantage1.reset_noise() self.noisy_advantage2.reset_noise() def forward(self, x, log=False): """Forward pass""" x = self.feature(x) # Value stream value = F.relu(self.noisy_value1(x)) value = self.noisy_value2(value) # Advantage stream advantage = F.relu(self.noisy_advantage1(x)) advantage = self.noisy_advantage2(advantage) # Reshape advantage advantage = advantage.view(-1, self.output_dim, self.num_atoms) # Combine value and advantage value = value.view(-1, 1, self.num_atoms) q_atoms = value + advantage - advantage.mean(1, keepdim=True) # Apply softmax to get probabilities if log: q_dist = F.log_softmax(q_atoms, dim=-1) else: q_dist = F.softmax(q_atoms, dim=-1) return q_dist def get_q_values(self, x): """Get Q-values from distribution""" with torch.no_grad(): q_dist = self.forward(x) q_values = torch.sum(q_dist * self.supports, dim=2) return q_values ``` ### **Step 4: Rainbow DQN Agent** ```python class RainbowDQNAgent: """Rainbow DQN agent implementation""" def __init__(self, state_dim, action_dim, lr=6.25e-5, gamma=0.99, buffer_capacity=1000000, batch_size=32, target_update_freq=8000, num_atoms=51, v_min=-10, v_max=10): self.state_dim = state_dim self.action_dim = action_dim self.gamma = gamma self.batch_size = batch_size self.target_update_freq = target_update_freq self.num_atoms = num_atoms self.v_min = v_min self.v_max = v_max self.delta_z = (v_max - v_min) / (num_atoms - 1) # Q-Networks self.q_network = RainbowDQN(state_dim, action_dim, num_atoms, v_min, v_max) self.target_network = RainbowDQN(state_dim, action_dim, num_atoms, v_min, v_max) self.target_network.load_state_dict(self.q_network.state_dict()) # Optimizer self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr) # Replay buffer self.replay_buffer = PrioritizedReplayBuffer(buffer_capacity) # Trackers self.frame_idx = 0 self.update_counter = 0 def select_action(self, state): """Select action using Q-network""" state = torch.FloatTensor(state).unsqueeze(0) q_values = self.q_network.get_q_values(state) return torch.argmax(q_values).item() def store_experience(self, state, action, reward, next_state, 
done): """Store experience in replay buffer""" self.replay_buffer.push( torch.FloatTensor(state), action, np.clip(reward, -1, 1), # Clip rewards torch.FloatTensor(next_state), done ) self.frame_idx += 1 def update(self): """Update Q-network using Rainbow DQN algorithm""" if len(self.replay_buffer) < self.batch_size: return 0.0 # Sample batch from replay buffer states, actions, rewards, next_states, dones, indices, weights = self.replay_buffer.sample( self.batch_size, self.frame_idx ) # Compute current Q distribution current_dist = self.q_network(states, log=True) current_dist = current_dist[range(self.batch_size), actions] # Compute target Q distribution with torch.no_grad(): # Get next state distribution next_dist = self.target_network(next_states) # Get next state Q-values for action selection next_q_values = torch.sum(next_dist * self.q_network.supports, dim=2) next_actions = torch.argmax(next_q_values, dim=1) # Get target distribution for selected actions target_dist = next_dist[range(self.batch_size), next_actions] # Project target distribution rewards = rewards.unsqueeze(1) dones = dones.unsqueeze(1) Tz = torch.clamp( rewards + (1 - dones) * self.gamma * self.q_network.supports, self.v_min, self.v_max ) b = (Tz - self.v_min) / self.delta_z l = b.floor().long() u = b.ceil().long() # Fix disappearing probability mass when l = b = u l[(u > 0) * (l == u)] -= 1 u[(u > 0) * (l == u)] += 1 # Distribute probability offset = torch.linspace(0, (self.batch_size - 1) * self.num_atoms, self.batch_size).long().unsqueeze(1).expand(self.batch_size, self.num_atoms).to(states.device) proj_dist = torch.zeros(self.batch_size * self.num_atoms, device=states.device) proj_dist.index_add_(0, (l + offset).view(-1), (target_dist * (u.float() - b)).view(-1)) proj_dist.index_add_(0, (u + offset).view(-1), (target_dist * (b - l.float())).view(-1)) proj_dist = proj_dist.view(self.batch_size, self.num_atoms) # Compute loss with importance sampling weights loss = -torch.sum(proj_dist * current_dist, -1) * weights prios = loss + 1e-5 loss = loss.mean() # Update networks self.optimizer.zero_grad() loss.backward() self.optimizer.step() # Update priorities self.replay_buffer.update_priorities(indices, prios.data.cpu().numpy()) # Reset noise self.q_network.reset_noise() # Update target network self.update_counter += 1 if self.update_counter % self.target_update_freq == 0: self.target_network.load_state_dict(self.q_network.state_dict()) return loss.item() ``` ### **Step 5: Training and Evaluation** ```python def train_rainbow(env, agent, episodes=1000, max_steps=500, evaluation_interval=50, evaluation_episodes=10): """Train Rainbow DQN agent""" rewards = [] losses = [] for episode in range(episodes): state = env.reset() if isinstance(state, tuple): state = state[0] # Handle new Gym API total_reward = 0 for step in range(max_steps): action = agent.select_action(state) next_state, reward, done, truncated, _ = env.step(action) done = done or truncated agent.store_experience(state, action, reward, next_state, done) loss = agent.update() state = next_state total_reward += reward if done: break rewards.append(total_reward) if loss is not None: losses.append(loss) # Evaluation if episode % evaluation_interval == 0: eval_reward = evaluate_agent(env, agent, evaluation_episodes) print(f"Episode {episode}, Reward: {total_reward:.2f}, " f"Eval Reward: {eval_reward:.2f}, Loss: {loss:.4f}") else: print(f"Episode {episode}, Reward: {total_reward:.2f}") env.close() return rewards, losses def evaluate_agent(env, agent, episodes=10): 
"""Evaluate agent performance""" total_rewards = [] for _ in range(episodes): state = env.reset() if isinstance(state, tuple): state = state[0] # Handle new Gym API total_reward = 0 done = False while not done: action = agent.select_action(state) next_state, reward, done, truncated, _ = env.step(action) done = done or truncated state = next_state total_reward += reward total_rewards.append(total_reward) return np.mean(total_rewards) # Train on LunarLander env = gym.make('LunarLander-v2') state_dim = env.observation_space.shape[0] action_dim = env.action_space.n agent = RainbowDQNAgent(state_dim, action_dim) rewards, losses = train_rainbow(env, agent, episodes=1000) # Plot results plt.figure(figsize=(12, 5)) plt.subplot(1, 2, 1) plt.plot(rewards) plt.title('Training Rewards') plt.xlabel('Episode') plt.ylabel('Total Reward') plt.subplot(1, 2, 2) plt.plot(losses) plt.title('Training Loss') plt.xlabel('Update') plt.ylabel('Loss') plt.tight_layout() plt.show() ``` --- ## **Quiz 5: Test Your Understanding of Reinforcement Learning** **1. What is the primary objective in Reinforcement Learning?** A) Minimize prediction error on a labeled dataset B) Maximize the cumulative reward over time C) Minimize the reconstruction error of input data D) Maximize the accuracy of a classification task **2. In the Bellman equation for the optimal value function, what does $V_*(s)$ represent?** A) The expected return starting from state $s$ and following policy $\pi$ B) The maximum expected return achievable from state $s$ C) The immediate reward for being in state $s$ D) The probability of transitioning to the best next state from $s$ **3. What is the main advantage of Deep Q-Networks (DQN) over traditional Q-Learning?** A) They can handle continuous action spaces B) They use experience replay and target networks for more stable training C) They directly optimize the policy instead of learning a value function D) They require less memory for storing the Q-table **4. In the REINFORCE algorithm, what is the update rule for the policy parameters $\theta$?** A) $\theta \leftarrow \theta + \alpha \nabla_{\theta}\log\pi_{\theta}(a|s)G_t$ B) $\theta \leftarrow \theta - \alpha \nabla_{\theta}Q(s,a)$ C) $\theta \leftarrow \theta + \alpha (r + \gamma \max_{a'}Q(s',a') - Q(s,a))\nabla_{\theta}Q(s,a)$ D) $\theta \leftarrow \theta + \alpha \nabla_{\theta}V(s)$ **5. What is the key innovation of Proximal Policy Optimization (PPO)?** A) Using a target network to stabilize training B) Clipping the probability ratio to prevent large policy updates C) Using a deterministic policy for continuous control D) Learning a model of the environment dynamics **6. In Deep Deterministic Policy Gradient (DDPG), why is the policy deterministic?** A) To reduce the variance of policy gradient estimates B) Because it's designed for discrete action spaces C) To enable the use of the deterministic policy gradient theorem D) To simplify the implementation of the algorithm **7. What is the main advantage of model-based reinforcement learning over model-free approaches?** A) Better asymptotic performance B) Greater sample efficiency C) Simpler implementation D) Better handling of non-stationary environments **8. In Multi-Agent Deep Deterministic Policy Gradient (MADDPG), what information is available to the critic during training?** A) Only the agent's own observations and actions B) Global state information and actions of all agents C) Only the observations of other agents D) The reward signals of all agents **9. 
What is the purpose of the replay buffer in DQN?** A) To store the policy network parameters for later use B) To store experiences for breaking correlation in training data C) To store the value function estimates for each state D) To store the environment model parameters **10. In the Rainbow DQN algorithm, what does "distributional" refer to?** A) The distribution of rewards over time B) Modeling the full distribution of returns rather than just the expected value C) The distribution of states visited during training D) The distribution of policy parameters --- **Answers:** 1. B - RL aims to maximize cumulative reward 2. B - $V_*(s)$ is the maximum expected return from state $s$ 3. B - DQN uses experience replay and target networks 4. A - REINFORCE updates using the return $G_t$ 5. B - PPO uses clipping to prevent large policy updates 6. C - DDPG uses deterministic policy gradient theorem 7. B - Model-based RL is more sample-efficient 8. B - MADDPG critic uses global information during training 9. B - Replay buffer breaks correlation in training data 10. B - Distributional RL models the full return distribution --- ## **Summary and What's Next** In this **comprehensive Part 5** of our PyTorch Masterclass, we've covered: - **Markov Decision Processes**: The theoretical foundation of RL - **Q-Learning and Deep Q-Networks**: Learning from experience with value functions - **Policy Gradient Methods**: Direct policy optimization - **Proximal Policy Optimization**: State-of-the-art policy optimization - **Deep Deterministic Policy Gradient**: For continuous control problems - **Model-Based Reinforcement Learning**: Learning environment models - **Multi-Agent Reinforcement Learning**: Multiple agents interacting - **Complete RL Agent**: Building a state-of-the-art Rainbow DQN agent You now have the skills to: - Implement and train various RL algorithms - Solve complex control problems with deep RL - Work with both discrete and continuous action spaces - Handle multi-agent scenarios - Build sample-efficient RL systems ### **What's Next?** This concludes our **5-part PyTorch Masterclass series**! Over these five parts, we've covered: - **Part 1**: PyTorch fundamentals, tensors, autograd, and basic neural networks - **Part 2**: Computer vision with CNNs, transfer learning, and object detection - **Part 3**: NLP with RNNs, attention, Transformers, and BERT - **Part 4**: Generative models including GANs, VAEs, and diffusion models - **Part 5**: Reinforcement learning with DQN, PPO, DDPG, and multi-agent systems You now have a comprehensive understanding of deep learning with PyTorch across multiple domains. The next step is to apply these skills to real-world problems and continue exploring the latest advancements in AI research. ### **Continuing Your Learning Journey** To further deepen your knowledge: - **Read research papers**: Follow NeurIPS, ICML, ICLR proceedings - **Contribute to open source**: Projects like PyTorch, Hugging Face, RLlib - **Participate in competitions**: Kaggle, AIcrowd, DeepHack - **Build your own projects**: Solve problems that interest you Remember, the field of AI is rapidly evolving. Stay curious, keep experimenting, and don't be afraid to dive into new areas! 
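Before we wrap up, one practical note: it's worth checkpointing the agent you just trained so you can reuse it in your own projects. Below is a minimal sketch (not part of the training code above) that saves the Rainbow network's weights, reloads them into a fresh `RainbowDQN`, and runs one greedy evaluation episode. It assumes `gym`, `state_dim`, `action_dim`, the trained `agent`, and the `RainbowDQN` class from the section above are still in scope; the checkpoint filename is arbitrary.

```python
import torch

# Save the trained network's parameters (weights and registered buffers)
torch.save(agent.q_network.state_dict(), "rainbow_lunarlander.pt")  # filename is arbitrary

# Reload into a fresh network for deployment
deployed = RainbowDQN(state_dim, action_dim)
deployed.load_state_dict(torch.load("rainbow_lunarlander.pt"))
deployed.eval()  # NoisyLinear layers fall back to their mean weights in eval mode

# Run a single greedy episode in a fresh environment
eval_env = gym.make('LunarLander-v2')
state = eval_env.reset()
if isinstance(state, tuple):
    state = state[0]  # Handle new Gym API, as in the training loop above

done = False
total_reward = 0.0
while not done:
    q_values = deployed.get_q_values(torch.FloatTensor(state).unsqueeze(0))
    action = torch.argmax(q_values).item()
    state, reward, done, truncated, _ = eval_env.step(action)
    done = done or truncated
    total_reward += reward

eval_env.close()
print(f"Greedy episode reward: {total_reward:.2f}")
```

Because `NoisyLinear` uses only its mean weights in `eval()` mode, the reloaded network acts greedily with no extra exploration logic needed.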
👉 **Thank you for joining this PyTorch Masterclass series!** --- **Hashtags:** #PyTorch #ReinforcementLearning #RL #DeepRL #Qlearning #DQN #PPO #DDPG #MarkovDecisionProcesses #AI #MachineLearning #DeepLearning #PyTorchRL #ActorCritic #PolicyGradient #ModelBasedRL #MultiAgentRL #RainbowDQN #ProximalPolicyOptimization #DeepDeterministicPolicyGradient