# **PyTorch Masterclass: Part 5 – Reinforcement Learning with PyTorch**
**Duration: ~90 minutes**
>#PyTorch #ReinforcementLearning #RL #DeepRL #Qlearning #DQN #PPO #DDPG #MarkovDecisionProcesses #AI #MachineLearning #DeepLearning #PyTorchRL
---
## **Table of Contents**
1. [Introduction to Reinforcement Learning](#introduction-to-reinforcement-learning)
2. [Markov Decision Processes](#markov-decision-processes)
3. [Q-Learning and Deep Q-Networks](#q-learning-and-deep-q-networks)
4. [Policy Gradient Methods](#policy-gradient-methods)
5. [Proximal Policy Optimization](#proximal-policy-optimization)
6. [Deep Deterministic Policy Gradient](#deep-deterministic-policy-gradient)
7. [Model-Based Reinforcement Learning](#model-based-reinforcement-learning)
8. [Multi-Agent Reinforcement Learning](#multi-agent-reinforcement-learning)
9. [Building a Complete RL Agent](#building-a-complete-rl-agent)
10. [Quiz 5: Test Your Understanding of Reinforcement Learning](#quiz-5-test-your-understanding-of-reinforcement-learning)
11. [Summary and What's Next](#summary-and-whats-next)
---
## **Introduction to Reinforcement Learning**
Reinforcement Learning (RL) is a branch of machine learning where an agent learns to make decisions by interacting with an environment to maximize cumulative reward.
### **Why Reinforcement Learning Matters**
RL has achieved remarkable successes:
- **AlphaGo**: Defeated world champion in Go
- **Dota 2**: OpenAI Five defeated professional teams
- **Robotics**: Learning complex manipulation tasks
- **Autonomous vehicles**: Decision-making in complex environments
- **Resource management**: Optimizing energy, network, and computing resources
According to a 2023 Gartner report, RL will be a key driver in **autonomous systems**, with the market expected to reach **$14.5 billion by 2026**.
### **Key Components of RL**
- **Agent**: The learner/decision-maker
- **Environment**: What the agent interacts with
- **State ($s$)**: Representation of the current situation
- **Action ($a$)**: What the agent can do
- **Reward ($r$)**: Feedback signal indicating success
- **Policy ($\pi$)**: Strategy mapping states to actions
- **Value function ($V(s)$ or $Q(s,a)$)**: Expected cumulative reward
- **Model (optional)**: Agent's representation of the environment
### **RL vs. Other Learning Paradigms**
| **Supervised Learning** | **Unsupervised Learning** | **Reinforcement Learning** |
|-------------------------|---------------------------|----------------------------|
| Labeled dataset | Unlabeled data | Environment interaction |
| Predict output | Find patterns | Maximize cumulative reward |
| Immediate feedback | No explicit feedback | Delayed, sparse feedback |
### **Why PyTorch for RL?**
PyTorch is ideal for RL because:
- **Dynamic computation graphs**: Essential for variable-length trajectories
- **GPU acceleration**: Critical for training deep RL models
- **Seamless integration**: With deep learning models
- **Rich ecosystem**: Libraries like `torchrl`, `stable-baselines3`, `ray[rllib]`
---
## **Markov Decision Processes**
Markov Decision Processes (MDPs) provide the mathematical framework for RL.
### **MDP Formal Definition**
An MDP is defined by the tuple $(\mathcal{S}, \mathcal{A}, \mathcal{P}, \mathcal{R}, \gamma)$:
- $\mathcal{S}$: Set of states
- $\mathcal{A}$: Set of actions
- $\mathcal{P}$: State transition probability, $\mathcal{P}_{ss'}^a = \mathbb{P}[S_{t+1}=s'|S_t=s, A_t=a]$
- $\mathcal{R}$: Reward function, $\mathcal{R}_s^a = \mathbb{E}[R_{t+1}|S_t=s, A_t=a]$
- $\gamma$: Discount factor, $\gamma \in [0,1]$
### **The Markov Property**
The Markov property states that the future depends only on the present state:
$$\mathbb{P}[S_{t+1}|S_t,A_t] = \mathbb{P}[S_{t+1}|S_1,A_1,\dots,S_t,A_t]$$
### **Return and Value Functions**
The **return** $G_t$ is the total discounted reward from time $t$:
$$G_t = R_{t+1} + \gamma R_{t+2} + \gamma^2 R_{t+3} + \dots = \sum_{k=0}^{\infty} \gamma^k R_{t+k+1}$$
The **state-value function** for policy $\pi$:
$$V_{\pi}(s) = \mathbb{E}_{\pi}[G_t|S_t=s]$$
The **action-value function** (Q-function):
$$Q_{\pi}(s,a) = \mathbb{E}_{\pi}[G_t|S_t=s, A_t=a]$$
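To make the return concrete, here is a small worked example (the reward sequence and $\gamma = 0.9$ are illustrative choices) that computes $G_t$ for every step of a short episode by iterating backwards:

```python
import torch

# Illustrative rewards R_1, ..., R_5 and discount factor
rewards = torch.tensor([1.0, 0.0, 0.0, 2.0, 5.0])
gamma = 0.9

# G_t = R_{t+1} + gamma * G_{t+1}, computed backwards from the end of the episode
returns = torch.zeros_like(rewards)
G = 0.0
for t in reversed(range(len(rewards))):
    G = rewards[t] + gamma * G
    returns[t] = G

print(returns)  # returns[0] is G_0, the discounted return from the first time step
```

This backward recursion is exactly how REINFORCE and PPO, later in this part, turn a list of per-step rewards into returns.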
### **Bellman Equations**
The Bellman equation for the value function:
$$V_{\pi}(s) = \sum_{a \in \mathcal{A}} \pi(a|s) \sum_{s' \in \mathcal{S}} \mathcal{P}_{ss'}^a \left[\mathcal{R}_s^a + \gamma V_{\pi}(s')\right]$$
For the optimal value function $V_*(s)$:
$$V_*(s) = \max_a \sum_{s' \in \mathcal{S}} \mathcal{P}_{ss'}^a \left[\mathcal{R}_s^a + \gamma V_*(s')\right]$$
### **Implementing MDP in PyTorch**
```python
import torch
import numpy as np
class MDP:
def __init__(self, states, actions, transition_probs, rewards, gamma=0.99):
"""
Initialize Markov Decision Process
Args:
states: List of states
actions: List of actions
transition_probs: Dict[state][action] -> list of (next_state, prob)
rewards: Dict[state][action][next_state] -> reward
gamma: Discount factor
"""
self.states = states
self.actions = actions
self.transition_probs = transition_probs
self.rewards = rewards
self.gamma = gamma
self.state_to_idx = {s: i for i, s in enumerate(states)}
self.action_to_idx = {a: i for i, a in enumerate(actions)}
def get_transition_prob(self, s, a, s_prime):
"""Get transition probability P(s'|s,a)"""
s_idx = self.state_to_idx[s]
a_idx = self.action_to_idx[a]
for next_s, prob in self.transition_probs[s][a]:
if next_s == s_prime:
return prob
return 0.0
def get_reward(self, s, a, s_prime):
"""Get reward R(s,a,s')"""
return self.rewards.get(s, {}).get(a, {}).get(s_prime, 0.0)
def bellman_update(self, V, s):
"""Perform Bellman update for state s"""
        v = float('-inf')  # Maximise over actions
for a in self.actions:
action_value = 0
for s_prime, prob in self.transition_probs[s][a]:
r = self.get_reward(s, a, s_prime)
action_value += prob * (r + self.gamma * V[self.state_to_idx[s_prime]])
v = max(v, action_value)
return v
# Example: Grid World MDP
states = [(i, j) for i in range(4) for j in range(4)]
actions = ['up', 'down', 'left', 'right']
# Define transition probabilities and rewards
transition_probs = {}
rewards = {}
for s in states:
transition_probs[s] = {}
rewards[s] = {}
for a in actions:
transition_probs[s][a] = []
rewards[s][a] = {}
# Calculate next state based on action
i, j = s
if a == 'up' and i > 0:
s_prime = (i-1, j)
elif a == 'down' and i < 3:
s_prime = (i+1, j)
elif a == 'left' and j > 0:
s_prime = (i, j-1)
elif a == 'right' and j < 3:
s_prime = (i, j+1)
else:
s_prime = s # Stay in same state if action not possible
# Set transition probability (assume deterministic for simplicity)
transition_probs[s][a] = [(s_prime, 1.0)]
# Set rewards (goal state at (3,3) has reward 1)
if s_prime == (3, 3):
rewards[s][a][s_prime] = 1.0
else:
rewards[s][a][s_prime] = 0.0
# Create MDP
mdp = MDP(states, actions, transition_probs, rewards)
```
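The `bellman_update` method above is the Bellman optimality backup, so value iteration on this grid world is just repeated sweeps over the states until the values stop changing. A minimal sketch (the sweep limit and threshold are arbitrary choices):

```python
# Value iteration on the grid-world MDP using mdp.bellman_update
V = torch.zeros(len(mdp.states))
for sweep in range(5000):
    delta = 0.0
    for i, s in enumerate(mdp.states):
        v_new = mdp.bellman_update(V, s)
        delta = max(delta, abs(float(v_new) - float(V[i])))
        V[i] = v_new
    if delta < 1e-6:
        break

print(f"Stopped after {sweep + 1} sweeps, V(0,0) = {V[0]:.2f}, V(3,3) = {V[-1]:.2f}")
```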
### **Policy Evaluation and Iteration**
```python
def policy_evaluation(mdp, policy, theta=1e-6, max_iterations=1000):
"""Evaluate a policy using iterative policy evaluation"""
V = torch.zeros(len(mdp.states))
for _ in range(max_iterations):
delta = 0
for i, s in enumerate(mdp.states):
v = 0
for a in mdp.actions:
for s_prime, prob in mdp.transition_probs[s][a]:
r = mdp.get_reward(s, a, s_prime)
v += policy[i, mdp.action_to_idx[a]] * prob * (r + mdp.gamma * V[mdp.state_to_idx[s_prime]])
delta = max(delta, torch.abs(v - V[i]))
V[i] = v
if delta < theta:
break
return V
def policy_iteration(mdp, gamma=0.99, theta=1e-6):
"""Perform policy iteration to find optimal policy"""
# Initialize random policy
policy = torch.ones(len(mdp.states), len(mdp.actions)) / len(mdp.actions)
while True:
# Policy evaluation
V = policy_evaluation(mdp, policy, theta)
# Policy improvement
policy_stable = True
for i, s in enumerate(mdp.states):
old_action = torch.argmax(policy[i]).item()
# Calculate action values
action_values = torch.zeros(len(mdp.actions))
for a_idx, a in enumerate(mdp.actions):
for s_prime, prob in mdp.transition_probs[s][a]:
r = mdp.get_reward(s, a, s_prime)
action_values[a_idx] += prob * (r + gamma * V[mdp.state_to_idx[s_prime]])
# Update policy
best_action = torch.argmax(action_values).item()
policy[i] = torch.zeros(len(mdp.actions))
policy[i, best_action] = 1.0
if old_action != best_action:
policy_stable = False
if policy_stable:
break
return policy, V
```
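A short usage sketch that runs `policy_iteration` on the same grid-world MDP and prints the greedy action and value for each state:

```python
# Solve the grid world and inspect the resulting deterministic policy
optimal_policy, optimal_V = policy_iteration(mdp)

for i, s in enumerate(mdp.states):
    best_action = mdp.actions[torch.argmax(optimal_policy[i]).item()]
    print(f"State {s}: action = {best_action:>5}, V = {optimal_V[i]:.2f}")
```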
---
## **Q-Learning and Deep Q-Networks**
Q-Learning is a model-free RL algorithm that learns the optimal action-value function.
### **Q-Learning Algorithm**
The Q-Learning update rule:
$$Q(s_t,a_t) \leftarrow Q(s_t,a_t) + \alpha \left[r_{t+1} + \gamma \max_a Q(s_{t+1},a) - Q(s_t,a_t)\right]$$
Where:
- $\alpha$ is the learning rate
- $\gamma$ is the discount factor
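Before approximating $Q$ with a neural network, it helps to see this update rule in tabular form. A minimal sketch, assuming a toy environment with integer states and a `reset()`/`step(action)` interface that returns `(next_state, reward, done)` (this interface is an assumption for illustration, not a standard API):

```python
import torch

def tabular_q_learning(env, num_states, num_actions, episodes=500,
                       alpha=0.1, gamma=0.99, epsilon=0.1):
    """Tabular Q-learning with epsilon-greedy exploration (sketch)."""
    Q = torch.zeros(num_states, num_actions)
    for _ in range(episodes):
        state = env.reset()
        done = False
        while not done:
            # Epsilon-greedy action selection
            if torch.rand(1).item() < epsilon:
                action = torch.randint(num_actions, (1,)).item()
            else:
                action = torch.argmax(Q[state]).item()
            next_state, reward, done = env.step(action)
            # Move Q(s,a) towards r + gamma * max_a' Q(s',a'); no bootstrap on terminal states
            bootstrap = 0.0 if done else torch.max(Q[next_state]).item()
            Q[state, action] += alpha * (reward + gamma * bootstrap - Q[state, action])
            state = next_state
    return Q
```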
### **Deep Q-Networks (DQN)**
DQN uses a neural network to approximate the Q-function:
$$Q(s,a;\theta) \approx Q^*(s,a)$$
Key innovations:
- **Experience replay**: Store transitions $(s_t,a_t,r_{t+1},s_{t+1})$ in a replay buffer
- **Target network**: Use a separate network for computing target values
### **DQN Loss Function**
The DQN loss:
$$\mathcal{L}(\theta) = \mathbb{E}_{(s,a,r,s') \sim \mathcal{U}(\mathcal{D})} \left[ \left( r + \gamma \max_{a'} Q(s',a';\theta^-) - Q(s,a;\theta) \right)^2 \right]$$
Where $\theta^-$ are the parameters of the target network.
### **Implementing DQN in PyTorch**
```python
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
class DQN(nn.Module):
"""Deep Q-Network"""
def __init__(self, input_dim, output_dim):
super(DQN, self).__init__()
self.network = nn.Sequential(
nn.Linear(input_dim, 128),
nn.ReLU(),
nn.Linear(128, 128),
nn.ReLU(),
nn.Linear(128, output_dim)
)
def forward(self, x):
return self.network(x)
class ReplayBuffer:
"""Experience replay buffer"""
def __init__(self, capacity):
self.buffer = deque(maxlen=capacity)
def push(self, state, action, reward, next_state, done):
self.buffer.append((state, action, reward, next_state, done))
def sample(self, batch_size):
batch = random.sample(self.buffer, batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
return (
torch.stack(states),
torch.tensor(actions),
torch.tensor(rewards, dtype=torch.float32),
torch.stack(next_states),
torch.tensor(dones, dtype=torch.float32)
)
def __len__(self):
return len(self.buffer)
class DQNAgent:
"""DQN Agent implementation"""
def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99,
buffer_capacity=10000, batch_size=64, tau=0.005):
self.state_dim = state_dim
self.action_dim = action_dim
self.gamma = gamma
self.batch_size = batch_size
self.tau = tau
# Q-Network
self.q_network = DQN(state_dim, action_dim)
self.target_network = DQN(state_dim, action_dim)
self.target_network.load_state_dict(self.q_network.state_dict())
# Optimizer
self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
# Replay buffer
self.replay_buffer = ReplayBuffer(buffer_capacity)
def select_action(self, state, epsilon=0.1):
"""Epsilon-greedy action selection"""
if random.random() < epsilon:
return random.randrange(self.action_dim)
else:
with torch.no_grad():
state = torch.FloatTensor(state).unsqueeze(0)
q_values = self.q_network(state)
return torch.argmax(q_values).item()
def update(self):
"""Update Q-network using experience replay"""
if len(self.replay_buffer) < self.batch_size:
return
# Sample batch from replay buffer
states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)
# Compute current Q values
current_q = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)
# Compute target Q values
with torch.no_grad():
next_q = self.target_network(next_states).max(1)[0]
target_q = rewards + (1 - dones) * self.gamma * next_q
# Compute loss
loss = nn.MSELoss()(current_q, target_q)
# Optimize the model
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# Update target network
self._soft_update_target_network()
return loss.item()
def _soft_update_target_network(self):
"""Soft update of the target network"""
for target_param, param in zip(self.target_network.parameters(), self.q_network.parameters()):
target_param.data.copy_(self.tau * param.data + (1.0 - self.tau) * target_param.data)
def store_experience(self, state, action, reward, next_state, done):
"""Store experience in replay buffer"""
self.replay_buffer.push(
torch.FloatTensor(state),
action,
reward,
torch.FloatTensor(next_state),
done
)
```
### **Training a DQN Agent**
```python
import gym
import numpy as np
import matplotlib.pyplot as plt
def train_dqn(env, agent, episodes=500, max_steps=200, epsilon_start=1.0,
epsilon_end=0.01, epsilon_decay=0.995, render=False):
"""Train DQN agent"""
rewards = []
epsilons = []
epsilon = epsilon_start
for episode in range(episodes):
state = env.reset()
if isinstance(state, tuple):
state = state[0] # Handle new Gym API
total_reward = 0
for step in range(max_steps):
action = agent.select_action(state, epsilon)
next_state, reward, done, truncated, _ = env.step(action)
done = done or truncated
agent.store_experience(state, action, reward, next_state, done)
loss = agent.update()
state = next_state
total_reward += reward
if render and episode % 50 == 0:
env.render()
if done:
break
# Decay epsilon
epsilon = max(epsilon_end, epsilon * epsilon_decay)
rewards.append(total_reward)
epsilons.append(epsilon)
if episode % 10 == 0:
print(f"Episode {episode}, Reward: {total_reward:.2f}, Epsilon: {epsilon:.3f}")
env.close()
return rewards, epsilons
# Train DQN on CartPole
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = DQNAgent(state_dim, action_dim)
rewards, epsilons = train_dqn(env, agent, episodes=500)
# Plot results
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(rewards)
plt.title('Rewards per Episode')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.subplot(1, 2, 2)
plt.plot(epsilons)
plt.title('Epsilon Decay')
plt.xlabel('Episode')
plt.ylabel('Epsilon')
plt.tight_layout()
plt.show()
```
### **DQN Improvements**
#### **Double DQN**
Reduces overestimation bias:
$$Q_{\text{target}} = r + \gamma Q(s', \arg\max_a Q(s',a;\theta);\theta^-)$$
```python
def update_double_dqn(self, states, actions, rewards, next_states, dones):
"""Double DQN update"""
# Compute current Q values
current_q = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)
# Compute target Q values using Double DQN
with torch.no_grad():
# Get best actions from online network
online_actions = self.q_network(next_states).argmax(1)
# Get Q values from target network for these actions
next_q = self.target_network(next_states).gather(1, online_actions.unsqueeze(1)).squeeze(1)
target_q = rewards + (1 - dones) * self.gamma * next_q
# Compute loss and update
loss = nn.MSELoss()(current_q, target_q)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# Update target network
self._soft_update_target_network()
return loss.item()
```
#### **Dueling DQN**
Separates state value and advantage:
$$Q(s,a) = V(s) + A(s,a) - \frac{1}{|\mathcal{A}|}\sum_{a'}A(s,a')$$
```python
class DuelingDQN(nn.Module):
"""Dueling DQN architecture"""
def __init__(self, input_dim, output_dim):
super(DuelingDQN, self).__init__()
# Shared layers
self.feature = nn.Sequential(
nn.Linear(input_dim, 128),
nn.ReLU()
)
# Value stream
self.value = nn.Sequential(
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, 1)
)
# Advantage stream
self.advantage = nn.Sequential(
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, output_dim)
)
def forward(self, x):
features = self.feature(x)
value = self.value(features)
advantage = self.advantage(features)
# Combine value and advantage
q_values = value + (advantage - advantage.mean(dim=1, keepdim=True))
return q_values
```
#### **Prioritized Experience Replay**
Samples important transitions more frequently:
```python
class PrioritizedReplayBuffer:
"""Prioritized experience replay buffer"""
def __init__(self, capacity, alpha=0.6):
self.buffer = []
self.priorities = np.zeros((capacity,), dtype=np.float32)
self.capacity = capacity
self.position = 0
self.alpha = alpha
self.beta_start = 0.4
self.beta_frames = 1000
def push(self, state, action, reward, next_state, done):
"""Add experience to buffer with max priority"""
max_priority = np.max(self.priorities) if len(self.buffer) > 0 else 1.0
if len(self.buffer) < self.capacity:
self.buffer.append((state, action, reward, next_state, done))
else:
self.buffer[self.position] = (state, action, reward, next_state, done)
self.priorities[self.position] = max_priority
self.position = (self.position + 1) % self.capacity
def sample(self, batch_size, frame):
"""Sample batch with priorities"""
if len(self.buffer) == self.capacity:
priorities = self.priorities
else:
priorities = self.priorities[:len(self.buffer)]
# Compute sampling probabilities
probs = priorities ** self.alpha
probs /= probs.sum()
# Sample indices
indices = np.random.choice(len(self.buffer), batch_size, p=probs)
# Compute importance-sampling weights
beta = self.beta_start + frame * (1.0 - self.beta_start) / self.beta_frames
beta = min(beta, 1.0)
weights = (len(self.buffer) * probs[indices]) ** (-beta)
weights /= weights.max()
# Get samples
samples = [self.buffer[idx] for idx in indices]
states, actions, rewards, next_states, dones = zip(*samples)
return (
torch.stack(states),
torch.tensor(actions),
torch.tensor(rewards, dtype=torch.float32),
torch.stack(next_states),
torch.tensor(dones, dtype=torch.float32),
indices,
torch.tensor(weights, dtype=torch.float32)
)
def update_priorities(self, indices, priorities):
"""Update priorities for sampled transitions"""
for idx, priority in zip(indices, priorities):
self.priorities[idx] = priority
def __len__(self):
return len(self.buffer)
```
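Because the buffer returns importance-sampling `weights` and the sampled `indices`, the update step changes slightly compared to plain DQN: the squared TD errors are weighted by `weights`, and their absolute values are written back as new priorities. A sketch of how the earlier `DQNAgent` could be adapted (the subclass name and the weighted-loss details are my own choices):

```python
class PERDQNAgent(DQNAgent):
    """DQN agent that uses the PrioritizedReplayBuffer defined above (sketch)."""
    def __init__(self, state_dim, action_dim, **kwargs):
        super().__init__(state_dim, action_dim, **kwargs)
        # Replace the uniform replay buffer from DQNAgent with the prioritized one
        self.replay_buffer = PrioritizedReplayBuffer(capacity=10000)

    def update(self, frame_idx):
        """Weighted DQN update; frame_idx drives the beta annealing in the buffer."""
        if len(self.replay_buffer) < self.batch_size:
            return
        states, actions, rewards, next_states, dones, indices, weights = \
            self.replay_buffer.sample(self.batch_size, frame_idx)
        # Current and target Q-values, exactly as in the standard DQN update
        current_q = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        with torch.no_grad():
            next_q = self.target_network(next_states).max(1)[0]
            target_q = rewards + (1 - dones) * self.gamma * next_q
        # Importance-sampling weighted squared TD error
        td_errors = target_q - current_q
        loss = (weights * td_errors.pow(2)).mean()
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        # New priorities are the absolute TD errors (plus a small constant so none become zero)
        self.replay_buffer.update_priorities(indices, td_errors.abs().detach().numpy() + 1e-5)
        self._soft_update_target_network()
        return loss.item()
```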
---
## **Policy Gradient Methods**
Policy gradient methods directly optimize the policy rather than learning a value function.
### **Policy Gradient Theorem**
The policy gradient theorem states:
$$\nabla_{\theta}J(\pi_{\theta}) = \mathbb{E}_{\pi_{\theta}}\left[\nabla_{\theta}\log\pi_{\theta}(a|s)Q^{\pi_{\theta}}(s,a)\right]$$
Where $J(\pi_{\theta})$ is the expected return under policy $\pi_{\theta}$.
### **REINFORCE Algorithm**
The simplest policy gradient algorithm:
$$\theta \leftarrow \theta + \alpha \nabla_{\theta}\log\pi_{\theta}(a_t|s_t)G_t$$
Where $G_t$ is the return from time $t$.
### **Actor-Critic Methods**
Combine policy gradient (actor) with value function estimation (critic):
$$\theta \leftarrow \theta + \alpha \nabla_{\theta}\log\pi_{\theta}(a_t|s_t)\delta_t$$
Where $\delta_t = r_{t+1} + \gamma V(s_{t+1}) - V(s_t)$ is the TD error.
### **Implementing REINFORCE in PyTorch**
```python
class PolicyNetwork(nn.Module):
"""Policy network for REINFORCE"""
def __init__(self, input_dim, output_dim):
super(PolicyNetwork, self).__init__()
self.network = nn.Sequential(
nn.Linear(input_dim, 128),
nn.ReLU(),
nn.Linear(128, output_dim),
nn.Softmax(dim=-1)
)
def forward(self, x):
return self.network(x)
class REINFORCE:
"""REINFORCE algorithm implementation"""
def __init__(self, state_dim, action_dim, lr=2e-3, gamma=0.99):
self.policy = PolicyNetwork(state_dim, action_dim)
self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
self.gamma = gamma
self.log_probs = []
self.rewards = []
def select_action(self, state):
"""Select action using policy network"""
state = torch.FloatTensor(state)
probs = self.policy(state)
m = torch.distributions.Categorical(probs)
action = m.sample()
self.log_probs.append(m.log_prob(action))
return action.item()
def store_reward(self, reward):
"""Store reward for later use"""
self.rewards.append(reward)
def update(self):
"""Update policy using REINFORCE"""
R = 0
returns = []
# Calculate returns (backwards)
for r in self.rewards[::-1]:
R = r + self.gamma * R
returns.insert(0, R)
# Normalize returns
returns = torch.tensor(returns)
returns = (returns - returns.mean()) / (returns.std() + 1e-9)
# Calculate policy loss
policy_loss = []
for log_prob, R in zip(self.log_probs, returns):
policy_loss.append(-log_prob * R)
        policy_loss = torch.stack(policy_loss).sum()
# Update policy
self.optimizer.zero_grad()
policy_loss.backward()
self.optimizer.step()
# Clear buffers
self.log_probs = []
self.rewards = []
```
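The class above only defines the agent, so here is a minimal training-loop sketch on CartPole. It mirrors the DQN training loop from earlier, including the handling of the newer Gym step API:

```python
import gym

# Minimal REINFORCE training loop on CartPole (sketch)
env = gym.make('CartPole-v1')
agent = REINFORCE(env.observation_space.shape[0], env.action_space.n)

for episode in range(500):
    state = env.reset()
    if isinstance(state, tuple):
        state = state[0]  # Handle new Gym API
    total_reward, done = 0, False
    while not done:
        action = agent.select_action(state)
        state, reward, done, truncated, _ = env.step(action)
        done = done or truncated
        agent.store_reward(reward)
        total_reward += reward
    agent.update()  # One policy-gradient step per episode, using the full-episode returns
    if episode % 50 == 0:
        print(f"Episode {episode}, Reward: {total_reward:.1f}")
env.close()
```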
### **Implementing Actor-Critic in PyTorch**
```python
class ValueNetwork(nn.Module):
"""Value network for Actor-Critic"""
def __init__(self, input_dim):
super(ValueNetwork, self).__init__()
self.network = nn.Sequential(
nn.Linear(input_dim, 128),
nn.ReLU(),
nn.Linear(128, 1)
)
def forward(self, x):
return self.network(x)
class ActorCritic:
"""Actor-Critic implementation"""
def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99):
self.actor = PolicyNetwork(state_dim, action_dim)
self.critic = ValueNetwork(state_dim)
self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr)
self.gamma = gamma
def select_action(self, state):
"""Select action using actor network"""
state = torch.FloatTensor(state)
probs = self.actor(state)
m = torch.distributions.Categorical(probs)
action = m.sample()
return action.item(), m.log_prob(action)
def update(self, state, action_log_prob, reward, next_state, done):
"""Update actor and critic networks"""
state = torch.FloatTensor(state)
next_state = torch.FloatTensor(next_state)
        # Critic estimate for the current state (keeps the graph for the critic update)
        value = self.critic(state)
        # Bootstrapped TD target (no gradient flows through the target)
        with torch.no_grad():
            next_value = self.critic(next_state)
            target = reward + (1 - int(done)) * self.gamma * next_value
        # TD error (advantage), detached so the actor update does not backprop into the critic
        advantage = (target - value).detach()
        # Update critic
        critic_loss = nn.MSELoss()(value, target)
self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()
# Update actor (policy)
actor_loss = -action_log_prob * advantage
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
return critic_loss.item(), actor_loss.item()
```
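In contrast to REINFORCE, which waits for the end of an episode, this actor-critic agent can learn from every transition. A minimal per-step training-loop sketch on CartPole:

```python
import gym

# Minimal actor-critic training loop on CartPole (sketch)
env = gym.make('CartPole-v1')
agent = ActorCritic(env.observation_space.shape[0], env.action_space.n)

for episode in range(500):
    state = env.reset()
    if isinstance(state, tuple):
        state = state[0]  # Handle new Gym API
    total_reward, done = 0, False
    while not done:
        action, log_prob = agent.select_action(state)
        next_state, reward, done, truncated, _ = env.step(action)
        done = done or truncated
        # One critic update and one actor update per transition
        agent.update(state, log_prob, reward, next_state, done)
        state = next_state
        total_reward += reward
    if episode % 50 == 0:
        print(f"Episode {episode}, Reward: {total_reward:.1f}")
env.close()
```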
---
## **Proximal Policy Optimization**
Proximal Policy Optimization (PPO) is a state-of-the-art policy optimization algorithm.
### **PPO Objective Function**
The PPO objective with clipping:
$$L^{CLIP}(\theta) = \mathbb{E}_t\left[\min\left(r_t(\theta)\hat{A}_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon)\hat{A}_t\right)\right]$$
Where:
- $r_t(\theta) = \frac{\pi_{\theta}(a_t|s_t)}{\pi_{\theta_{\text{old}}}(a_t|s_t)}$ is the probability ratio
- $\hat{A}_t$ is the estimated advantage
- $\epsilon$ is the clipping parameter (typically 0.1-0.3)
### **Advantage Estimation**
Generalized Advantage Estimation (GAE) provides better estimates:
$$\hat{A}_t^{\text{GAE}(\gamma,\lambda)} = \sum_{l=0}^{\infty}(\gamma\lambda)^l\delta_{t+l}$$
Where $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$ is the TD residual.
### **Implementing PPO in PyTorch**
```python
class PPO:
"""Proximal Policy Optimization implementation"""
def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99,
clip_epsilon=0.2, gae_lambda=0.95, ppo_epochs=4,
batch_size=64, value_coef=0.5, entropy_coef=0.01):
self.actor = PolicyNetwork(state_dim, action_dim)
self.critic = ValueNetwork(state_dim)
self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr)
self.gamma = gamma
self.clip_epsilon = clip_epsilon
self.gae_lambda = gae_lambda
self.ppo_epochs = ppo_epochs
self.batch_size = batch_size
self.value_coef = value_coef
self.entropy_coef = entropy_coef
# Storage for experiences
self.states = []
self.actions = []
self.log_probs = []
self.values = []
self.rewards = []
self.dones = []
def select_action(self, state):
"""Select action and store necessary information"""
state = torch.FloatTensor(state)
# Get policy distribution
probs = self.actor(state)
dist = torch.distributions.Categorical(probs)
# Sample action
action = dist.sample()
# Store values for later update
value = self.critic(state)
log_prob = dist.log_prob(action)
self.states.append(state)
self.actions.append(action)
self.log_probs.append(log_prob)
self.values.append(value)
return action.item()
def store_reward(self, reward, done):
"""Store reward and done flag"""
self.rewards.append(reward)
self.dones.append(done)
def finish_episode(self):
"""Process the end of an episode"""
# Convert to tensors
states = torch.stack(self.states)
actions = torch.tensor(self.actions)
old_log_probs = torch.stack(self.log_probs).detach()
rewards = torch.tensor(self.rewards, dtype=torch.float32)
dones = torch.tensor(self.dones, dtype=torch.float32)
        values = torch.stack(self.values).squeeze().detach()  # Detach: the stored critic outputs are only used as targets
# Compute returns and advantages using GAE
returns, advantages = self.compute_gae(rewards, values, dones)
# Clear storage
self.states = []
self.actions = []
self.log_probs = []
self.values = []
self.rewards = []
self.dones = []
return states, actions, old_log_probs, returns, advantages
def compute_gae(self, rewards, values, dones):
"""Compute GAE advantages"""
batch_size = len(rewards)
advantages = torch.zeros_like(rewards)
returns = torch.zeros_like(rewards)
# Calculate advantages using GAE
gae = 0
for t in reversed(range(batch_size)):
if t == batch_size - 1:
next_value = 0 # Assume episode ends
else:
next_value = values[t + 1]
# TD residual
delta = rewards[t] + self.gamma * next_value * (1 - dones[t]) - values[t]
# GAE
gae = delta + self.gamma * self.gae_lambda * (1 - dones[t]) * gae
advantages[t] = gae
# Calculate returns
returns = advantages + values
return returns, advantages
def update(self, states, actions, old_log_probs, returns, advantages):
"""Update policy and value networks using PPO"""
# Normalize advantages
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
# Perform multiple PPO updates
total_actor_loss = 0
total_critic_loss = 0
total_entropy = 0
for _ in range(self.ppo_epochs):
# Create mini-batches
indices = np.arange(len(states))
np.random.shuffle(indices)
for start in range(0, len(states), self.batch_size):
end = start + self.batch_size
mb_indices = indices[start:end]
# Get mini-batch data
mb_states = states[mb_indices]
mb_actions = actions[mb_indices]
mb_old_log_probs = old_log_probs[mb_indices]
mb_returns = returns[mb_indices]
mb_advantages = advantages[mb_indices]
# Get current policy and value
probs = self.actor(mb_states)
dist = torch.distributions.Categorical(probs)
log_probs = dist.log_prob(mb_actions)
entropy = dist.entropy().mean()
values = self.critic(mb_states).squeeze()
# Compute probability ratio
ratio = (log_probs - mb_old_log_probs).exp()
# Compute clipped surrogate objective
surr1 = ratio * mb_advantages
surr2 = torch.clamp(ratio, 1.0 - self.clip_epsilon, 1.0 + self.clip_epsilon) * mb_advantages
actor_loss = -torch.min(surr1, surr2).mean()
# Value loss
critic_loss = nn.MSELoss()(values, mb_returns)
# Total loss
loss = actor_loss + self.value_coef * critic_loss - self.entropy_coef * entropy
# Update networks
self.actor_optimizer.zero_grad()
self.critic_optimizer.zero_grad()
loss.backward()
self.actor_optimizer.step()
self.critic_optimizer.step()
total_actor_loss += actor_loss.item()
total_critic_loss += critic_loss.item()
total_entropy += entropy.item()
        # Average over the number of mini-batch updates actually performed
        n_updates = self.ppo_epochs * ((len(states) + self.batch_size - 1) // self.batch_size)
return (
total_actor_loss / n_updates,
total_critic_loss / n_updates,
total_entropy / n_updates
)
```
### **Training PPO on CartPole**
```python
def train_ppo(env, agent, episodes=500, max_steps=200, render=False):
"""Train PPO agent"""
rewards = []
for episode in range(episodes):
state = env.reset()
if isinstance(state, tuple):
state = state[0] # Handle new Gym API
total_reward = 0
for step in range(max_steps):
action = agent.select_action(state)
next_state, reward, done, truncated, _ = env.step(action)
done = done or truncated
agent.store_reward(reward, done)
state = next_state
total_reward += reward
if render and episode % 50 == 0:
env.render()
if done:
break
# Process the end of episode
states, actions, old_log_probs, returns, advantages = agent.finish_episode()
# Update policy
actor_loss, critic_loss, entropy = agent.update(
states, actions, old_log_probs, returns, advantages
)
rewards.append(total_reward)
if episode % 10 == 0:
print(f"Episode {episode}, Reward: {total_reward:.2f}, "
f"Actor Loss: {actor_loss:.4f}, Critic Loss: {critic_loss:.4f}")
env.close()
return rewards
# Train PPO on CartPole
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = PPO(state_dim, action_dim)
rewards = train_ppo(env, agent, episodes=500)
# Plot results
plt.figure(figsize=(10, 5))
plt.plot(rewards)
plt.title('PPO Training on CartPole')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.show()
```
---
## **Deep Deterministic Policy Gradient**
Deep Deterministic Policy Gradient (DDPG) is an off-policy algorithm for continuous control.
### **DDPG Algorithm**
DDPG combines:
- DQN ideas for continuous actions
- Deterministic Policy Gradient theorem
- Actor-Critic architecture
The deterministic policy gradient theorem:
$$\nabla_{\theta}J(\mu_{\theta}) = \mathbb{E}_{s \sim \rho^{\mu}}\left[\nabla_{\theta}\mu_{\theta}(s)\nabla_a Q^{\mu}(s,a)|_{a=\mu_{\theta}(s)}\right]$$
### **DDPG Implementation**
```python
class DDPGActor(nn.Module):
"""Actor network for DDPG (deterministic policy)"""
def __init__(self, state_dim, action_dim, max_action=1.0):
super(DDPGActor, self).__init__()
self.max_action = max_action
self.network = nn.Sequential(
nn.Linear(state_dim, 400),
nn.ReLU(),
nn.Linear(400, 300),
nn.ReLU(),
nn.Linear(300, action_dim),
nn.Tanh()
)
def forward(self, state):
return self.max_action * self.network(state)
class DDPGCritic(nn.Module):
"""Critic network for DDPG"""
def __init__(self, state_dim, action_dim):
super(DDPGCritic, self).__init__()
self.network = nn.Sequential(
nn.Linear(state_dim + action_dim, 400),
nn.ReLU(),
nn.Linear(400, 300),
nn.ReLU(),
nn.Linear(300, 1)
)
def forward(self, state, action):
return self.network(torch.cat([state, action], 1))
class DDPG:
"""Deep Deterministic Policy Gradient implementation"""
def __init__(self, state_dim, action_dim, max_action=1.0, lr_actor=1e-3,
lr_critic=1e-3, gamma=0.99, tau=0.005, buffer_capacity=1000000,
batch_size=100, noise_std=0.2):
self.gamma = gamma
self.tau = tau
self.batch_size = batch_size
self.noise_std = noise_std
self.max_action = max_action
# Actor networks
self.actor = DDPGActor(state_dim, action_dim, max_action)
self.actor_target = DDPGActor(state_dim, action_dim, max_action)
self.actor_target.load_state_dict(self.actor.state_dict())
# Critic networks
self.critic = DDPGCritic(state_dim, action_dim)
self.critic_target = DDPGCritic(state_dim, action_dim)
self.critic_target.load_state_dict(self.critic.state_dict())
# Optimizers
self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr_actor)
self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr_critic)
# Replay buffer
self.replay_buffer = ReplayBuffer(buffer_capacity)
def select_action(self, state, add_noise=True):
"""Select action with optional noise for exploration"""
state = torch.FloatTensor(state).unsqueeze(0)
action = self.actor(state).detach().numpy()[0]
if add_noise:
action = action + np.random.normal(0, self.noise_std, size=action.shape)
action = np.clip(action, -self.max_action, self.max_action)
return action
def store_experience(self, state, action, reward, next_state, done):
"""Store experience in replay buffer"""
self.replay_buffer.push(
torch.FloatTensor(state),
torch.FloatTensor(action),
torch.FloatTensor([reward]),
torch.FloatTensor(next_state),
torch.FloatTensor([done])
)
def update(self):
"""Update actor and critic networks"""
if len(self.replay_buffer) < self.batch_size:
return None, None
# Sample batch from replay buffer
states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)
# Compute target Q-value
with torch.no_grad():
next_actions = self.actor_target(next_states)
target_q = self.critic_target(next_states, next_actions)
target_q = rewards + (1 - dones) * self.gamma * target_q
# Update critic
current_q = self.critic(states, actions)
critic_loss = nn.MSELoss()(current_q, target_q)
self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()
# Update actor
actor_loss = -self.critic(states, self.actor(states)).mean()
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
# Update target networks
self._soft_update_target_networks()
return actor_loss.item(), critic_loss.item()
def _soft_update_target_networks(self):
"""Soft update of target networks"""
# Update actor target
for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
target_param.data.copy_(self.tau * param.data + (1.0 - self.tau) * target_param.data)
# Update critic target
for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
target_param.data.copy_(self.tau * param.data + (1.0 - self.tau) * target_param.data)
```
### **Training DDPG on Pendulum**
```python
import gym
def train_ddpg(env, agent, episodes=200, max_steps=200, render=False):
"""Train DDPG agent"""
rewards = []
for episode in range(episodes):
state = env.reset()
if isinstance(state, tuple):
state = state[0] # Handle new Gym API
total_reward = 0
for step in range(max_steps):
action = agent.select_action(state)
next_state, reward, done, truncated, _ = env.step(action)
done = done or truncated
agent.store_experience(state, action, reward, next_state, done)
# Update agent
actor_loss, critic_loss = agent.update()
state = next_state
total_reward += reward
if render and episode % 10 == 0:
env.render()
if done:
break
rewards.append(total_reward)
if episode % 10 == 0:
print(f"Episode {episode}, Reward: {total_reward:.2f}, "
f"Actor Loss: {actor_loss:.4f}, Critic Loss: {critic_loss:.4f}")
env.close()
return rewards
# Train DDPG on Pendulum
env = gym.make('Pendulum-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])
agent = DDPG(state_dim, action_dim, max_action)
rewards = train_ddpg(env, agent, episodes=200)
# Plot results
plt.figure(figsize=(10, 5))
plt.plot(rewards)
plt.title('DDPG Training on Pendulum')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.show()
```
---
## **Model-Based Reinforcement Learning**
Model-based RL learns a model of the environment dynamics and uses it for planning.
### **Model-Based vs Model-Free RL**
| **Model-Free RL** | **Model-Based RL** |
|-------------------|--------------------|
| Learns directly from experience | Learns environment model first |
| Simpler implementation | More complex to implement |
| Less sample-efficient | More sample-efficient |
| Common: DQN, PPO, DDPG | Common: PILCO, MB-MPO, Dreamer |
### **Dynamics Model**
The dynamics model predicts next state and reward:
$$s_{t+1}, r_t \sim \mathcal{P}_{\theta}(s_{t+1}, r_t|s_t, a_t)$$
### **Planning with Models**
Once we have a model, we can:
- **Simulate experiences**: Generate synthetic data
- **Plan trajectories**: Find optimal actions without interacting with real environment
- **Improve sample efficiency**: Learn from both real and simulated experiences
### **Implementing Model-Based RL**
```python
class DynamicsModel(nn.Module):
"""Environment dynamics model"""
def __init__(self, state_dim, action_dim):
super(DynamicsModel, self).__init__()
self.network = nn.Sequential(
nn.Linear(state_dim + action_dim, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, state_dim + 1) # state_dim for next state, 1 for reward
)
def forward(self, state, action):
x = torch.cat([state, action], dim=1)
output = self.network(x)
next_state_pred = output[:, :-1]
reward_pred = output[:, -1]
return next_state_pred, reward_pred
class ModelBasedRL:
"""Model-Based Reinforcement Learning implementation"""
def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99,
model_lr=1e-3, imagination_horizon=5):
# Policy network (actor)
self.actor = PolicyNetwork(state_dim, action_dim)
self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
# Value network (critic)
self.critic = ValueNetwork(state_dim)
self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr)
# Dynamics model
self.dynamics = DynamicsModel(state_dim, action_dim)
self.model_optimizer = optim.Adam(self.dynamics.parameters(), lr=model_lr)
self.gamma = gamma
self.imagination_horizon = imagination_horizon
# Experience storage
self.real_experiences = []
self.model_experiences = []
def select_action(self, state):
"""Select action using policy network"""
state = torch.FloatTensor(state)
probs = self.actor(state)
dist = torch.distributions.Categorical(probs)
action = dist.sample()
return action.item(), dist.log_prob(action)
def store_real_experience(self, state, action, reward, next_state, done):
"""Store real experience for model training"""
self.real_experiences.append((state, action, reward, next_state, done))
def train_dynamics_model(self, batch_size=64):
"""Train dynamics model on real experiences"""
if len(self.real_experiences) < batch_size:
return 0.0
# Sample batch
indices = np.random.choice(len(self.real_experiences), batch_size)
batch = [self.real_experiences[i] for i in indices]
# Prepare data
states = torch.tensor([exp[0] for exp in batch], dtype=torch.float32)
actions = torch.tensor([exp[1] for exp in batch], dtype=torch.long)
rewards = torch.tensor([exp[2] for exp in batch], dtype=torch.float32)
next_states = torch.tensor([exp[3] for exp in batch], dtype=torch.float32)
# One-hot encode actions
        actions_onehot = torch.zeros(batch_size, self.actor.network[-2].out_features)  # last Linear layer holds action_dim
actions_onehot[range(batch_size), actions] = 1
# Predict next state and reward
next_state_pred, reward_pred = self.dynamics(states, actions_onehot)
# Compute loss
state_loss = nn.MSELoss()(next_state_pred, next_states)
reward_loss = nn.MSELoss()(reward_pred, rewards)
loss = state_loss + reward_loss
# Update model
self.model_optimizer.zero_grad()
loss.backward()
self.model_optimizer.step()
return loss.item()
def generate_model_experiences(self, num_trajectories=5, trajectory_length=10):
"""Generate experiences using the learned model"""
self.model_experiences = []
for _ in range(num_trajectories):
# Start from random real experience
idx = np.random.randint(len(self.real_experiences))
state, _, _, _, _ = self.real_experiences[idx]
state = torch.tensor(state, dtype=torch.float32)
for _ in range(trajectory_length):
# Select action using current policy
with torch.no_grad():
probs = self.actor(state)
dist = torch.distributions.Categorical(probs)
action = dist.sample()
# One-hot encode action
                action_onehot = torch.zeros(self.actor.network[-2].out_features)  # last Linear layer holds action_dim
action_onehot[action] = 1
# Predict next state and reward using dynamics model
with torch.no_grad():
next_state_pred, reward_pred = self.dynamics(state.unsqueeze(0), action_onehot.unsqueeze(0))
# Store model experience
self.model_experiences.append((
state.numpy(),
action.item(),
reward_pred.item(),
next_state_pred.squeeze().numpy(),
False # Model doesn't know when episode ends
))
# Continue with predicted state
state = next_state_pred.squeeze()
def update_policy(self, batch_size=64):
"""Update policy using both real and model experiences"""
# Combine real and model experiences
all_experiences = self.real_experiences + self.model_experiences
if len(all_experiences) < batch_size:
return 0.0, 0.0
# Sample batch
indices = np.random.choice(len(all_experiences), batch_size)
batch = [all_experiences[i] for i in indices]
# Prepare data
states = torch.tensor([exp[0] for exp in batch], dtype=torch.float32)
actions = torch.tensor([exp[1] for exp in batch], dtype=torch.long)
rewards = torch.tensor([exp[2] for exp in batch], dtype=torch.float32)
next_states = torch.tensor([exp[3] for exp in batch], dtype=torch.float32)
dones = torch.tensor([exp[4] for exp in batch], dtype=torch.float32)
# Compute advantages using GAE
with torch.no_grad():
values = self.critic(states).squeeze()
next_values = self.critic(next_states).squeeze()
td_errors = rewards + (1 - dones) * self.gamma * next_values - values
advantages = td_errors # Simple TD error as advantage
# Update critic
critic_loss = nn.MSELoss()(self.critic(states).squeeze(), rewards + (1 - dones) * self.gamma * next_values)
self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()
# Update actor
probs = self.actor(states)
dist = torch.distributions.Categorical(probs)
log_probs = dist.log_prob(actions)
actor_loss = -(log_probs * advantages.detach()).mean()
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
return actor_loss.item(), critic_loss.item()
```
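A sketch of how these pieces fit together in a Dyna-style loop: collect real transitions, fit the dynamics model, imagine extra rollouts with the current policy, and update the actor and critic on the combined data. The schedule (when to train the model, how many imagined trajectories) and the use of CartPole as a stand-in environment are illustrative choices:

```python
import gym

# Dyna-style training loop for the ModelBasedRL agent above (sketch)
env = gym.make('CartPole-v1')
agent = ModelBasedRL(env.observation_space.shape[0], env.action_space.n)

for episode in range(300):
    state = env.reset()
    if isinstance(state, tuple):
        state = state[0]  # Handle new Gym API
    total_reward, done = 0, False
    while not done:
        action, _ = agent.select_action(state)
        next_state, reward, done, truncated, _ = env.step(action)
        done = done or truncated
        agent.store_real_experience(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward
    # Fit the dynamics model on real data, then generate imagined rollouts
    model_loss = agent.train_dynamics_model()
    if len(agent.real_experiences) >= 64:
        agent.generate_model_experiences(num_trajectories=5, trajectory_length=10)
    # Update actor and critic on real + imagined transitions
    actor_loss, critic_loss = agent.update_policy()
    if episode % 20 == 0:
        print(f"Episode {episode}, Reward: {total_reward:.1f}, "
              f"Model loss: {model_loss:.4f}, Actor loss: {actor_loss:.4f}")
```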
---
## **Multi-Agent Reinforcement Learning**
Multi-Agent RL (MARL) deals with multiple agents interacting in a shared environment.
### **MARL Challenges**
- **Non-stationarity**: Environment changes as other agents learn
- **Credit assignment**: Determining each agent's contribution to team reward
- **Communication**: How agents share information
- **Scalability**: Complexity increases with number of agents
### **MARL Approaches**
- **Independent Q-Learning (IQL)**: Each agent learns independently
- **Centralized Training with Decentralized Execution (CTDE)**: Train with global info, execute with local info
- **Multi-Agent Actor-Critic (MAAC)**: Extension of actor-critic to multiple agents
- **MADDPG**: Multi-Agent DDPG for continuous control
### **Implementing MADDPG**
```python
class MADDPGAgent:
"""MADDPG agent for multi-agent environments"""
def __init__(self, agent_id, state_dim, action_dim, num_agents,
lr_actor=1e-4, lr_critic=1e-3, gamma=0.95, tau=0.01):
self.agent_id = agent_id
self.gamma = gamma
self.tau = tau
self.num_agents = num_agents
# Actor network
self.actor = DDPGActor(state_dim, action_dim)
self.actor_target = DDPGActor(state_dim, action_dim)
self.actor_target.load_state_dict(self.actor.state_dict())
self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr_actor)
        # Critic network (takes the concatenated states and actions of all agents)
        self.critic = DDPGCritic(num_agents * state_dim, num_agents * action_dim)
        self.critic_target = DDPGCritic(num_agents * state_dim, num_agents * action_dim)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr_critic)
        # Joint experience list (each entry holds all agents' states, actions, rewards), filled in train_maddpg below
        self.experiences = []
def select_action(self, state, add_noise=True, noise_scale=0.1):
"""Select action for this agent"""
state = torch.FloatTensor(state)
action = self.actor(state).detach().numpy()
if add_noise:
action += noise_scale * np.random.randn(*action.shape)
action = np.clip(action, -1, 1)
return action
def update(self, agents, experiences, batch_size=1024, device='cpu'):
"""
Update this agent's networks
Args:
agents: List of all agents
experiences: List of (states, actions, rewards, next_states, dones)
batch_size: Batch size for updates
"""
if len(experiences) < batch_size:
return None, None
# Sample batch
indices = np.random.choice(len(experiences), batch_size)
batch = [experiences[i] for i in indices]
# Unpack batch
        states_batch = [torch.stack([torch.as_tensor(exp[0][i], dtype=torch.float32) for exp in batch]) for i in range(self.num_agents)]
        actions_batch = [torch.stack([torch.as_tensor(exp[1][i], dtype=torch.float32) for exp in batch]) for i in range(self.num_agents)]
        rewards_batch = torch.tensor([exp[2][self.agent_id] for exp in batch], dtype=torch.float32).unsqueeze(1).to(device)
        next_states_batch = [torch.stack([torch.as_tensor(exp[3][i], dtype=torch.float32) for exp in batch]) for i in range(self.num_agents)]
        dones_batch = torch.tensor([float(exp[4][self.agent_id]) for exp in batch], dtype=torch.float32).unsqueeze(1).to(device)
# Concatenate all states and actions for critic input
all_states = torch.cat(states_batch, dim=1)
all_actions = torch.cat(actions_batch, dim=1)
all_next_states = torch.cat(next_states_batch, dim=1)
# Update critic
with torch.no_grad():
# Get next actions from all target actors
next_actions = [
agents[i].actor_target(next_states_batch[i])
for i in range(self.num_agents)
]
next_actions = torch.cat(next_actions, dim=1)
# Compute target Q-value
target_q = self.critic_target(all_next_states, next_actions)
target_q = rewards_batch + (1 - dones_batch) * self.gamma * target_q
# Current Q-value
current_q = self.critic(all_states, all_actions)
# Critic loss
critic_loss = nn.MSELoss()(current_q, target_q)
# Update critic
self.critic_optimizer.zero_grad()
critic_loss.backward()
torch.nn.utils.clip_grad_norm_(self.critic.parameters(), 0.5)
self.critic_optimizer.step()
# Update actor
# Get current policy actions
curr_actions = [
self.actor(states_batch[self.agent_id]) if i == self.agent_id
else agents[i].actor(states_batch[i]).detach()
for i in range(self.num_agents)
]
curr_actions = torch.cat(curr_actions, dim=1)
# Actor loss
actor_loss = -self.critic(all_states, curr_actions).mean()
# Update actor
self.actor_optimizer.zero_grad()
actor_loss.backward()
torch.nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5)
self.actor_optimizer.step()
# Update target networks
self._soft_update_target_networks()
return actor_loss.item(), critic_loss.item()
def _soft_update_target_networks(self):
"""Soft update of target networks"""
# Update actor target
for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
target_param.data.copy_(self.tau * param.data + (1.0 - self.tau) * target_param.data)
# Update critic target
for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
target_param.data.copy_(self.tau * param.data + (1.0 - self.tau) * target_param.data)
```
### **Training MADDPG on Multi-Agent Environment**
```python
def train_maddpg(env, agents, episodes=1000, max_steps=25, batch_size=1024):
"""Train MADDPG agents in a multi-agent environment"""
rewards_history = [[] for _ in range(len(agents))]
for episode in range(episodes):
states = env.reset()
if isinstance(states, tuple):
states = states[0] # Handle new Gym API
total_rewards = np.zeros(len(agents))
for step in range(max_steps):
# Select actions for all agents
actions = [
agents[i].select_action(states[i])
for i in range(len(agents))
]
# Take step in environment
next_states, rewards, dones, truncated, _ = env.step(actions)
dones = [d or t for d, t in zip(dones, truncated)]
# Store experience
experience = (states, actions, rewards, next_states, dones)
for agent in agents:
agent.experiences.append(experience)
# Update all agents
for i, agent in enumerate(agents):
agent.update(agents, agent.experiences, batch_size)
# Update state and track rewards
states = next_states
total_rewards += np.array(rewards)
if any(dones):
break
# Store episode rewards
for i in range(len(agents)):
rewards_history[i].append(total_rewards[i])
# Print progress
if episode % 100 == 0:
avg_rewards = [np.mean(rewards[-100:]) for rewards in rewards_history]
print(f"Episode {episode}, Avg Rewards: {avg_rewards}")
return rewards_history
```
---
## **Building a Complete RL Agent**
Let's build a complete RL agent that can solve complex environments.
### **Step 1: Environment Setup**
```python
# Install required packages
!pip install gym[classic_control,box2d] pybullet
# Import libraries
import gym
import pybullet_envs
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque, namedtuple
import random
import matplotlib.pyplot as plt
import time
```
### **Step 2: Advanced Replay Buffer**
```python
class PrioritizedReplayBuffer:
"""Prioritized experience replay buffer with importance sampling"""
def __init__(self, capacity, alpha=0.6):
self.buffer = []
self.priorities = np.zeros((capacity,), dtype=np.float32)
self.capacity = capacity
self.position = 0
self.alpha = alpha
self.beta_start = 0.4
self.beta_frames = 100000
def push(self, state, action, reward, next_state, done):
"""Add experience to buffer with max priority"""
max_priority = np.max(self.priorities) if len(self.buffer) > 0 else 1.0
if len(self.buffer) < self.capacity:
self.buffer.append((state, action, reward, next_state, done))
else:
self.buffer[self.position] = (state, action, reward, next_state, done)
self.priorities[self.position] = max_priority
self.position = (self.position + 1) % self.capacity
def sample(self, batch_size, frame):
"""Sample batch with priorities"""
if len(self.buffer) == self.capacity:
priorities = self.priorities
else:
priorities = self.priorities[:len(self.buffer)]
# Compute sampling probabilities
probs = priorities ** self.alpha
probs /= probs.sum()
# Sample indices
indices = np.random.choice(len(self.buffer), batch_size, p=probs)
# Compute importance-sampling weights
beta = self.beta_start + frame * (1.0 - self.beta_start) / self.beta_frames
beta = min(beta, 1.0)
weights = (len(self.buffer) * probs[indices]) ** (-beta)
weights /= weights.max()
# Get samples
samples = [self.buffer[idx] for idx in indices]
states, actions, rewards, next_states, dones = zip(*samples)
return (
torch.stack(states),
torch.tensor(actions),
torch.tensor(rewards, dtype=torch.float32),
torch.stack(next_states),
torch.tensor(dones, dtype=torch.float32),
indices,
torch.tensor(weights, dtype=torch.float32)
)
def update_priorities(self, indices, priorities):
"""Update priorities for sampled transitions"""
for idx, priority in zip(indices, priorities):
self.priorities[idx] = priority
def __len__(self):
return len(self.buffer)
```
### **Step 3: Advanced Network Architectures**
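The network below combines three Rainbow ingredients in one module: noisy linear layers for exploration (replacing epsilon-greedy), a dueling split into value and advantage streams, and a categorical (C51-style) head that outputs a distribution over `num_atoms` return values spaced between `v_min` and `v_max`.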
```python
class NoisyLinear(nn.Module):
"""Noisy linear layer for exploration"""
def __init__(self, in_features, out_features, std_init=0.4):
super(NoisyLinear, self).__init__()
self.in_features = in_features
self.out_features = out_features
self.std_init = std_init
self.weight_mu = nn.Parameter(torch.empty(out_features, in_features))
self.weight_sigma = nn.Parameter(torch.empty(out_features, in_features))
self.register_buffer('weight_epsilon', torch.empty(out_features, in_features))
self.bias_mu = nn.Parameter(torch.empty(out_features))
self.bias_sigma = nn.Parameter(torch.empty(out_features))
self.register_buffer('bias_epsilon', torch.empty(out_features))
self.reset_parameters()
self.reset_noise()
def reset_parameters(self):
"""Initialize parameters"""
mu_range = 1 / np.sqrt(self.in_features)
self.weight_mu.data.uniform_(-mu_range, mu_range)
self.weight_sigma.data.fill_(self.std_init / np.sqrt(self.in_features))
self.bias_mu.data.uniform_(-mu_range, mu_range)
self.bias_sigma.data.fill_(self.std_init / np.sqrt(self.out_features))
def _scale_noise(self, size):
"""Generate scaled noise"""
x = torch.randn(size)
return x.sign().mul_(x.abs().sqrt_())
def reset_noise(self):
"""Reset noise"""
epsilon_in = self._scale_noise(self.in_features)
epsilon_out = self._scale_noise(self.out_features)
self.weight_epsilon.copy_(epsilon_out.ger(epsilon_in))
self.bias_epsilon.copy_(epsilon_out)
def forward(self, x):
"""Forward pass with noise"""
if self.training:
return F.linear(x,
self.weight_mu + self.weight_sigma * self.weight_epsilon,
self.bias_mu + self.bias_sigma * self.bias_epsilon)
else:
return F.linear(x, self.weight_mu, self.bias_mu)
class RainbowDQN(nn.Module):
"""Rainbow DQN architecture combining multiple improvements"""
def __init__(self, input_dim, output_dim, num_atoms=51, v_min=-10, v_max=10):
super(RainbowDQN, self).__init__()
self.input_dim = input_dim
self.output_dim = output_dim
self.num_atoms = num_atoms
self.v_min = v_min
self.v_max = v_max
self.delta_z = (v_max - v_min) / (num_atoms - 1)
# Feature extraction
self.feature = nn.Sequential(
nn.Linear(input_dim, 256),
nn.ReLU()
)
# Noisy networks for exploration
self.noisy_value1 = NoisyLinear(256, 256)
self.noisy_value2 = NoisyLinear(256, num_atoms)
self.noisy_advantage1 = NoisyLinear(256, 256)
self.noisy_advantage2 = NoisyLinear(256, output_dim * num_atoms)
# Register support
self.register_buffer('supports', torch.linspace(v_min, v_max, num_atoms))
def reset_noise(self):
"""Reset noise in noisy layers"""
self.noisy_value1.reset_noise()
self.noisy_value2.reset_noise()
self.noisy_advantage1.reset_noise()
self.noisy_advantage2.reset_noise()
def forward(self, x, log=False):
"""Forward pass"""
x = self.feature(x)
# Value stream
value = F.relu(self.noisy_value1(x))
value = self.noisy_value2(value)
# Advantage stream
advantage = F.relu(self.noisy_advantage1(x))
advantage = self.noisy_advantage2(advantage)
# Reshape advantage
advantage = advantage.view(-1, self.output_dim, self.num_atoms)
# Combine value and advantage
value = value.view(-1, 1, self.num_atoms)
q_atoms = value + advantage - advantage.mean(1, keepdim=True)
# Apply softmax to get probabilities
if log:
q_dist = F.log_softmax(q_atoms, dim=-1)
else:
q_dist = F.softmax(q_atoms, dim=-1)
return q_dist
def get_q_values(self, x):
"""Get Q-values from distribution"""
with torch.no_grad():
q_dist = self.forward(x)
q_values = torch.sum(q_dist * self.supports, dim=2)
return q_values
```
### **Step 4: Rainbow DQN Agent**
```python
class RainbowDQNAgent:
"""Rainbow DQN agent implementation"""
def __init__(self, state_dim, action_dim, lr=6.25e-5, gamma=0.99,
buffer_capacity=1000000, batch_size=32,
target_update_freq=8000, num_atoms=51, v_min=-10, v_max=10):
self.state_dim = state_dim
self.action_dim = action_dim
self.gamma = gamma
self.batch_size = batch_size
self.target_update_freq = target_update_freq
self.num_atoms = num_atoms
self.v_min = v_min
self.v_max = v_max
self.delta_z = (v_max - v_min) / (num_atoms - 1)
# Q-Networks
self.q_network = RainbowDQN(state_dim, action_dim, num_atoms, v_min, v_max)
self.target_network = RainbowDQN(state_dim, action_dim, num_atoms, v_min, v_max)
self.target_network.load_state_dict(self.q_network.state_dict())
# Optimizer
self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
# Replay buffer
self.replay_buffer = PrioritizedReplayBuffer(buffer_capacity)
# Trackers
self.frame_idx = 0
self.update_counter = 0
def select_action(self, state):
"""Select action using Q-network"""
state = torch.FloatTensor(state).unsqueeze(0)
q_values = self.q_network.get_q_values(state)
return torch.argmax(q_values).item()
def store_experience(self, state, action, reward, next_state, done):
"""Store experience in replay buffer"""
self.replay_buffer.push(
torch.FloatTensor(state),
action,
np.clip(reward, -1, 1), # Clip rewards
torch.FloatTensor(next_state),
done
)
self.frame_idx += 1
def update(self):
"""Update Q-network using Rainbow DQN algorithm"""
if len(self.replay_buffer) < self.batch_size:
return 0.0
# Sample batch from replay buffer
states, actions, rewards, next_states, dones, indices, weights = self.replay_buffer.sample(
self.batch_size, self.frame_idx
)
# Compute current Q distribution
current_dist = self.q_network(states, log=True)
current_dist = current_dist[range(self.batch_size), actions]
# Compute target Q distribution
with torch.no_grad():
            # Double DQN: select next actions with the online network's expected Q-values...
            online_next_dist = self.q_network(next_states)
            next_q_values = torch.sum(online_next_dist * self.q_network.supports, dim=2)
            next_actions = torch.argmax(next_q_values, dim=1)
            # ...and evaluate them with the target network's distribution
            next_dist = self.target_network(next_states)
            target_dist = next_dist[range(self.batch_size), next_actions]
# Project target distribution
rewards = rewards.unsqueeze(1)
dones = dones.unsqueeze(1)
Tz = torch.clamp(
rewards + (1 - dones) * self.gamma * self.q_network.supports,
self.v_min, self.v_max
)
b = (Tz - self.v_min) / self.delta_z
l = b.floor().long()
u = b.ceil().long()
# Fix disappearing probability mass when l = b = u
l[(u > 0) * (l == u)] -= 1
u[(u > 0) * (l == u)] += 1
# Distribute probability
offset = torch.linspace(0, (self.batch_size - 1) * self.num_atoms, self.batch_size).long().unsqueeze(1).expand(self.batch_size, self.num_atoms).to(states.device)
proj_dist = torch.zeros(self.batch_size * self.num_atoms, device=states.device)
proj_dist.index_add_(0, (l + offset).view(-1), (target_dist * (u.float() - b)).view(-1))
proj_dist.index_add_(0, (u + offset).view(-1), (target_dist * (b - l.float())).view(-1))
proj_dist = proj_dist.view(self.batch_size, self.num_atoms)
# Compute loss with importance sampling weights
loss = -torch.sum(proj_dist * current_dist, -1) * weights
prios = loss + 1e-5
loss = loss.mean()
# Update networks
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# Update priorities
self.replay_buffer.update_priorities(indices, prios.data.cpu().numpy())
# Reset noise
self.q_network.reset_noise()
# Update target network
self.update_counter += 1
if self.update_counter % self.target_update_freq == 0:
self.target_network.load_state_dict(self.q_network.state_dict())
return loss.item()
```
### **Step 5: Training and Evaluation**
```python
def train_rainbow(env, agent, episodes=1000, max_steps=500,
evaluation_interval=50, evaluation_episodes=10):
"""Train Rainbow DQN agent"""
rewards = []
losses = []
for episode in range(episodes):
state = env.reset()
if isinstance(state, tuple):
state = state[0] # Handle new Gym API
total_reward = 0
for step in range(max_steps):
action = agent.select_action(state)
next_state, reward, done, truncated, _ = env.step(action)
done = done or truncated
agent.store_experience(state, action, reward, next_state, done)
loss = agent.update()
state = next_state
total_reward += reward
if done:
break
rewards.append(total_reward)
        if loss:  # update() returns 0.0 until the replay buffer holds a full batch
            losses.append(loss)
# Evaluation
if episode % evaluation_interval == 0:
eval_reward = evaluate_agent(env, agent, evaluation_episodes)
print(f"Episode {episode}, Reward: {total_reward:.2f}, "
f"Eval Reward: {eval_reward:.2f}, Loss: {loss:.4f}")
else:
print(f"Episode {episode}, Reward: {total_reward:.2f}")
env.close()
return rewards, losses
def evaluate_agent(env, agent, episodes=10):
"""Evaluate agent performance"""
total_rewards = []
for _ in range(episodes):
state = env.reset()
if isinstance(state, tuple):
state = state[0] # Handle new Gym API
total_reward = 0
done = False
while not done:
action = agent.select_action(state)
next_state, reward, done, truncated, _ = env.step(action)
done = done or truncated
state = next_state
total_reward += reward
total_rewards.append(total_reward)
return np.mean(total_rewards)
# Train on LunarLander (requires the Box2D extra: pip install "gym[box2d]")
env = gym.make('LunarLander-v2')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = RainbowDQNAgent(state_dim, action_dim)
rewards, losses = train_rainbow(env, agent, episodes=1000)
# Plot results
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(rewards)
plt.title('Training Rewards')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.subplot(1, 2, 2)
plt.plot(losses)
plt.title('Training Loss')
plt.xlabel('Update')
plt.ylabel('Loss')
plt.tight_layout()
plt.show()
```
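The script above trains and plots but never persists the learned weights. Here is a minimal sketch for saving and restoring them; it reuses `agent`, `RainbowDQNAgent`, `state_dim`, `action_dim`, and `evaluate_agent` from the code above, and the file name is just a placeholder:

```python
# Persist only the online Q-network; the target network and replay buffer can be rebuilt.
torch.save(agent.q_network.state_dict(), "rainbow_lunarlander.pt")  # placeholder path

# Restore into a fresh agent with the same state/action dimensions
loaded_agent = RainbowDQNAgent(state_dim, action_dim)
loaded_agent.q_network.load_state_dict(torch.load("rainbow_lunarlander.pt"))
loaded_agent.target_network.load_state_dict(loaded_agent.q_network.state_dict())

# Sanity check: the restored agent should roughly match the trained agent's evaluation score
print(evaluate_agent(gym.make('LunarLander-v2'), loaded_agent))
```

Note that `state_dict()` does not capture the replay buffer or optimizer state, so this is enough for inference but not for resuming training exactly where it stopped.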
---
## **Quiz 5: Test Your Understanding of Reinforcement Learning**
**1. What is the primary objective in Reinforcement Learning?**
A) Minimize prediction error on a labeled dataset
B) Maximize the cumulative reward over time
C) Minimize the reconstruction error of input data
D) Maximize the accuracy of a classification task
**2. In the Bellman equation for the optimal value function, what does $V_*(s)$ represent?**
A) The expected return starting from state $s$ and following policy $\pi$
B) The maximum expected return achievable from state $s$
C) The immediate reward for being in state $s$
D) The probability of transitioning to the best next state from $s$
**3. What is the main advantage of Deep Q-Networks (DQN) over traditional Q-Learning?**
A) They can handle continuous action spaces
B) They use experience replay and target networks for more stable training
C) They directly optimize the policy instead of learning a value function
D) They require less memory for storing the Q-table
**4. In the REINFORCE algorithm, what is the update rule for the policy parameters $\theta$?**
A) $\theta \leftarrow \theta + \alpha \nabla_{\theta}\log\pi_{\theta}(a|s)G_t$
B) $\theta \leftarrow \theta - \alpha \nabla_{\theta}Q(s,a)$
C) $\theta \leftarrow \theta + \alpha (r + \gamma \max_{a'}Q(s',a') - Q(s,a))\nabla_{\theta}Q(s,a)$
D) $\theta \leftarrow \theta + \alpha \nabla_{\theta}V(s)$
**5. What is the key innovation of Proximal Policy Optimization (PPO)?**
A) Using a target network to stabilize training
B) Clipping the probability ratio to prevent large policy updates
C) Using a deterministic policy for continuous control
D) Learning a model of the environment dynamics
**6. In Deep Deterministic Policy Gradient (DDPG), why is the policy deterministic?**
A) To reduce the variance of policy gradient estimates
B) Because it's designed for discrete action spaces
C) To enable the use of the deterministic policy gradient theorem
D) To simplify the implementation of the algorithm
**7. What is the main advantage of model-based reinforcement learning over model-free approaches?**
A) Better asymptotic performance
B) Greater sample efficiency
C) Simpler implementation
D) Better handling of non-stationary environments
**8. In Multi-Agent Deep Deterministic Policy Gradient (MADDPG), what information is available to the critic during training?**
A) Only the agent's own observations and actions
B) Global state information and actions of all agents
C) Only the observations of other agents
D) The reward signals of all agents
**9. What is the purpose of the replay buffer in DQN?**
A) To store the policy network parameters for later use
B) To store experiences for breaking correlation in training data
C) To store the value function estimates for each state
D) To store the environment model parameters
**10. In the Rainbow DQN algorithm, what does "distributional" refer to?**
A) The distribution of rewards over time
B) Modeling the full distribution of returns rather than just the expected value
C) The distribution of states visited during training
D) The distribution of policy parameters
---
**Answers:**
1. B - RL aims to maximize cumulative reward
2. B - $V_*(s)$ is the maximum expected return from state $s$
3. B - DQN uses experience replay and target networks
4. A - REINFORCE updates using the return $G_t$
5. B - PPO uses clipping to prevent large policy updates
6. C - DDPG uses deterministic policy gradient theorem
7. B - Model-based RL is more sample-efficient
8. B - MADDPG critic uses global information during training
9. B - Replay buffer breaks correlation in training data
10. B - Distributional RL models the full return distribution
---
## **Summary and What's Next**
In this **comprehensive Part 5** of our PyTorch Masterclass, we've covered:
- **Markov Decision Processes**: The theoretical foundation of RL
- **Q-Learning and Deep Q-Networks**: Learning from experience with value functions
- **Policy Gradient Methods**: Direct policy optimization
- **Proximal Policy Optimization**: State-of-the-art policy optimization
- **Deep Deterministic Policy Gradient**: For continuous control problems
- **Model-Based Reinforcement Learning**: Learning environment models
- **Multi-Agent Reinforcement Learning**: Multiple agents interacting
- **Complete RL Agent**: Building a state-of-the-art Rainbow DQN agent
You now have the skills to:
- Implement and train various RL algorithms
- Solve complex control problems with deep RL
- Work with both discrete and continuous action spaces
- Handle multi-agent scenarios
- Build sample-efficient RL systems
### **What's Next?**
This concludes our **5-part PyTorch Masterclass series**! Over these five parts, we've covered:
- **Part 1**: PyTorch fundamentals, tensors, autograd, and basic neural networks
- **Part 2**: Computer vision with CNNs, transfer learning, and object detection
- **Part 3**: NLP with RNNs, attention, Transformers, and BERT
- **Part 4**: Generative models including GANs, VAEs, and diffusion models
- **Part 5**: Reinforcement learning with DQN, PPO, DDPG, and multi-agent systems
You now have a comprehensive understanding of deep learning with PyTorch across multiple domains. The next step is to apply these skills to real-world problems and continue exploring the latest advancements in AI research.
### **Continuing Your Learning Journey**
To further deepen your knowledge:
- **Read research papers**: Follow NeurIPS, ICML, ICLR proceedings
- **Contribute to open source**: Projects like PyTorch, Hugging Face, RLlib
- **Participate in competitions**: Kaggle, AIcrowd, DeepHack
- **Build your own projects**: Solve problems that interest you
Remember, the field of AI is rapidly evolving. Stay curious, keep experimenting, and don't be afraid to dive into new areas!
👉 **Thank you for joining this PyTorch Masterclass series!**
---
**Hashtags:** #PyTorch #ReinforcementLearning #RL #DeepRL #Qlearning #DQN #PPO #DDPG #MarkovDecisionProcesses #AI #MachineLearning #DeepLearning #PyTorchRL #ActorCritic #PolicyGradient #ModelBasedRL #MultiAgentRL #RainbowDQN #ProximalPolicyOptimization #DeepDeterministicPolicyGradient