# Reinforcement Learning for Prediction
## Introduction
* **Model-Free Learning:** Reinforcement Learning (RL) excels in scenarios where a pre-defined model of the environment is unavailable or impractical to obtain. It enables agents to learn directly through interaction with the environment.
* **Learning from Experience:** In real-world situations, explicit probabilities governing state transitions and rewards are often unknown. RL agents adapt by learning from the experiences they gather during interaction.
* **Addressing Model Complexity:** Even when a model exists, its complexity can make updates computationally challenging. RL offers a more practical approach by learning directly from experience, bypassing the need for complex model manipulation.
* **Trial-and-Error Learning:** RL mirrors how humans and animals learn through a "trial-and-error" process. Agents explore the environment, take actions, and receive feedback in the form of rewards, gradually refining their understanding of optimal behavior.
* **Value Function Approximation:** RL algorithms iteratively update an estimate of a value function: the state-value function $V$ for prediction, or the action-value function $Q$ (the expected return for taking a particular action in a given state) for control. The accuracy of this approximation is crucial for the agent's performance.
* **Theoretical Foundation:** Most RL algorithms are built upon the Bellman Equations, a set of fundamental equations that describe the relationship between the value of a state and the values of its successor states.
* **Control Strategies:** Generalized Policy Iteration (GPI) is a common framework for RL control, where the agent iteratively improves its policy by evaluating its current policy and then greedily updating it based on the learned value function.
## RL for Prediction
### Prediction Problem
The core problem in RL prediction is to estimate the value function of a *Markov Decision Process (MDP)* under a fixed policy $\pi$. This value function, denoted as $V^{\pi}(s)$, represents the expected cumulative discounted reward an agent can achieve by starting in state s and following policy π. It can also be viewed as the value function of the *Markov Reward Process (MRP)* induced by the MDP and policy $\pi$.
### Experience Interface
In RL, agents learn from experiences obtained through interaction with the environment. Each experience is typically represented as a tuple $(s,r,s')$, where:
* $s$ is the current state
* $r$ is the reward received
* $s'$ is the next state
A sequence of these experiences forms a trace $S_0, R_1, S_1, R_2, S_2, \dots$, which can be used to update the agent's value function estimate.
### Value Function and Return
The value function $V(s)$ is defined as the expected return $G_t$ starting from state $s$ at time $t$ under policy $\pi$:
$$
V(s) = \mathbb{E}[G_t | S_t = s]
$$
where the return $G_t$ is the cumulative discounted reward:
$$
G_t = \sum_{i=t+1}^{\infty} \gamma^{i-t-1} R_i
$$
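As a quick illustration of this definition, the return of every state in a finite trace can be computed with a single backward pass over the rewards. The sketch below is purely illustrative (the helper name `discounted_returns` is ours, not the library's `rl.returns.returns` utility):
```python=
from typing import List, Sequence

def discounted_returns(rewards: Sequence[float], gamma: float) -> List[float]:
    # rewards[t] holds R_{t+1}; returns[t] will hold G_t
    returns: List[float] = []
    g = 0.0
    for r in reversed(rewards):   # backward recursion G_t = R_{t+1} + gamma * G_{t+1}
        g = r + gamma * g
        returns.append(g)
    returns.reverse()
    return returns

print(discounted_returns([1.0, 0.0, 2.0, 1.0], 0.9))  # approximately [3.349, 2.61, 2.9, 1.0]
```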
### Code Implementation
```python=
@dataclass(frozen=True)
class TransitionStep(Generic[S]):
    state: NonTerminal[S]
    next_state: State[S]
    reward: float
```
Streams of trace experiences are produced by the `reward_traces` method of `MarkovRewardProcess`, which repeatedly simulates episodes from a given start-state distribution:
```python=
    def reward_traces(
        self,
        start_state_distribution: Distribution[NonTerminal[S]]
    ) -> Iterable[Iterable[TransitionStep[S]]]:
        while True:
            yield self.simulate_reward(start_state_distribution)
```
## Monte-Carlo (MC) Prediction
### Overview
* **Supervised Learning Approach:** MC prediction leverages states and returns from trace experiences in a supervised learning manner.
* **Incremental Estimation:** Utilizes the `ValueFunctionApprox` update method for incremental estimation.
* **Inputs and Outputs:** Takes states $S_t$ as x-values and returns $G_t$ as y-values.
* **Update Timing:** Updates occur at the end of each trace experience.
* **Return Calculation:** Returns are computed through a backward walk: $$G_t = R_{t+1} + \gamma \cdot G_{t+1}$$
* **Loss Function:** $$\mathcal{L}_{(S_t, G_t)}(w) = \frac12 (V(S_t; w) - G_t)^2$$
```python=
def mc_prediction(
traces: Iterable[Iterable[mp.TransitionStep[S]]],
approx_0: ValueFunctionApprox[S],
γ: float,
episode_length_tolerance: float = 1e-6
) -> Iterator[ValueFunctionApprox[S]]:
episodes = (returns(trace, γ, episode_length_tolerance) for trace in traces)
f = approx_0
yield f
for episode in episodes:
        f = last(f.iterate_updates(
            # feed (state, return) pairs one at a time for incremental updates
            [(step.state, step.return_)] for step in episode
        ))
yield f
```
### Parameter Update Formula
$$
\Delta w = \alpha \cdot (G_t - V(S_t; w)) \cdot \nabla_w V(S_t; w)
$$
* $\alpha$: Learning rate
* $G_t - V(S_t; w)$: Return residual (observed vs. estimated return)
* $\nabla_w V(S_t; w)$: Gradient of the estimated value function with respect to the parameters $w$
### Tabular MC Prediction
:::success
### Algorithm: First-Visit Monte Carlo Prediction (Policy Evaluation)
**Input:**
* `pi`: Policy to be evaluated
* `episodes`: Set of episodes, each a sequence of (state, reward) pairs
* `gamma`: Discount factor ($0 < \gamma \leq 1$)
* `alpha`: Learning rate (optional, for incremental update)
* `V`: Initial value function (optional, for incremental update)
**Output:**
* `V`: Estimated value function
**Initialization:**
* `V(s) = 0` for all states `s` (or use the provided initial value function)
* `N(s) = 0` for all states `s` (state visit counts, optional for incremental update)
**Algorithm:**
1. **For each episode** in `episodes`:
* Initialize `G = 0` (return)
* Let `episode = [(S[0], R[1]), (S[1], R[2]), ..., (S[T-1], R[T])]` (sequence of (state, reward) pairs)
2. **For each step** `t` from `T-1` down to `0`:
* `G = gamma * G + R[t+1]` (compute the discounted return)
* **If** time `t` is the first visit to state `S[t]` within the episode:
* `N(S[t]) += 1` (increment the state visit count)
* **Option 1: Incremental Update**
* `V(S[t]) += alpha * (G - V(S[t]))`
* **Option 2: Average Update**
* `V(S[t]) += (G - V(S[t])) / N(S[t])`
3. **Return** `V` (the estimated value function)
:::
### Tabular MC as Linear Function Approximation
* **Features:** Indicator functions for each state.
* **Parameters:** Directly represent the estimated values for each state.
* **Count-to-weight:** State-dependent learning rate, determining the weight given to new returns.
* **Update:** $$w^{(n)}_i = w^{(n-1)}_i + \alpha_n \cdot (Y^{(n)}_i - w^{(n-1)}_i),$$ i.e., a gradient-descent step on the squared loss $\frac12 (Y^{(n)}_i - w_i)^2$ with a state-dependent learning rate $\alpha_n$.
* **Non-stationary Problems:** Constant learning rate $\alpha$ with time-decaying weights to prioritize recent experiences: $$V_n(s_i) = \sum_{j=1}^n \alpha \cdot (1-\alpha)^{n-j} \cdot Y^{(j)}_i$$
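To make the count-to-weight idea concrete, here is a small illustrative sketch (the function `tabular_updates` is hypothetical, not the library's `Tabular` class) contrasting the $\frac{1}{n}$ averaging rule with a constant learning rate:
```python=
from typing import List, Optional

def tabular_updates(observed_returns: List[float],
                    alpha: Optional[float] = None) -> float:
    # Sequentially apply w_i^(n) = w_i^(n-1) + lr * (Y_i^(n) - w_i^(n-1))
    # for a single state; lr is 1/n by default (plain averaging) or a
    # constant alpha (exponentially decaying weights on older returns).
    v = 0.0
    for n, y in enumerate(observed_returns, start=1):
        lr = alpha if alpha is not None else 1.0 / n
        v = v + lr * (y - v)
    return v

ys = [10.0, 12.0, 8.0, 11.0]
print(tabular_updates(ys))             # 1/n weighting -> sample mean 10.25
print(tabular_updates(ys, alpha=0.5))  # constant alpha -> recent returns count more
```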
### Each-Visit vs. First-Visit MC
Monte Carlo (MC) prediction includes two variants:
* **Each-Visit MC:** Updates the value estimate for a state every time it is encountered within a trace.
* **First-Visit MC:** Updates the value estimate only the first time a state is encountered within a trace.
#### Key Points
* Both variants are easy to implement; First-Visit MC yields unbiased estimates of the value function, while Each-Visit MC is biased but consistent, and both converge to the true value function.
* MC methods can be slower to converge than TD learning.
* Both require complete traces (episodes) before updates.
Each-Visit MC may be better suited for non-stationary environments, while First-Visit MC may be preferred for stationary environments.
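The toy sketch below (hypothetical code, not the library's implementation) shows the only difference between the two variants on a single episode of `(state, reward)` pairs, namely the first-visit check:
```python=
from typing import Dict, List, Tuple

def mc_episode_update(episode: List[Tuple[str, float]],
                      gamma: float,
                      first_visit: bool) -> Dict[str, float]:
    # backward pass: G_t = R_{t+1} + gamma * G_{t+1}
    g = 0.0
    step_returns: List[Tuple[str, float]] = []
    for s, r in reversed(episode):
        g = r + gamma * g
        step_returns.append((s, g))
    step_returns.reverse()
    v: Dict[str, float] = {}
    n: Dict[str, int] = {}
    seen = set()
    for s, g in step_returns:
        if first_visit and s in seen:
            continue                      # First-Visit: skip repeat occurrences
        seen.add(s)
        n[s] = n.get(s, 0) + 1
        v[s] = v.get(s, 0.0) + (g - v.get(s, 0.0)) / n[s]
    return v

episode = [('A', 2.0), ('B', 1.0), ('A', 3.0), ('B', 0.0)]
print(mc_episode_update(episode, gamma=1.0, first_visit=True))   # {'A': 6.0, 'B': 4.0}
print(mc_episode_update(episode, gamma=1.0, first_visit=False))  # {'A': 4.5, 'B': 2.0}
```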
### Example: Simple Inventory MRP Finite model
```python=
from rl.chapter2.simple_inventory_mrp import SimpleInventoryMRPFinite
user_capacity = 2
user_poisson_lambda = 1.0
user_holding_cost = 1.0
user_stockout_cost = 10.0
user_gamma = 0.9
si_mrp = SimpleInventoryMRPFinite(
capacity=user_capacity,
poisson_lambda=user_poisson_lambda,
holding_cost=user_holding_cost,
stockout_cost=user_stockout_cost
)
si_mrp.display_value_function(gamma=user_gamma)
```
Output:
```python
{
NonTerminal(state=InventoryState(on_hand=0, on_order=0)): -43.596,
NonTerminal(state=InventoryState(on_hand=0, on_order=1)): -37.971,
NonTerminal(state=InventoryState(on_hand=0, on_order=2)): -37.329,
NonTerminal(state=InventoryState(on_hand=1, on_order=0)): -38.971,
NonTerminal(state=InventoryState(on_hand=1, on_order=1)): -38.329,
NonTerminal(state=InventoryState(on_hand=2, on_order=0)): -39.329
}
```
```python=
from rl.chapter2.simple_inventory_mrp import InventoryState
from rl.function_approx import Tabular
from rl.approximate_dynamic_programming import ValueFunctionApprox
from rl.distribution import Choose
from rl.iterate import last
from rl.monte_carlo import mc_prediction
from itertools import islice
from pprint import pprint
traces = si_mrp.reward_traces(Choose(si_mrp.non_terminal_states))
it = mc_prediction(
traces=traces,
approx_0=Tabular(),
    γ=user_gamma,
episode_length_tolerance=1e-6
)
num_traces = 60000
last_func = last(islice(it, num_traces))
pprint({s: round(last_func.evaluate([s])[0], 3) for s in si_mrp.non_terminal_states})
```
Output:
```python
{
NonTerminal(state=InventoryState(on_hand=0, on_order=0)): -43.598,
NonTerminal(state=InventoryState(on_hand=0, on_order=1)): -37.983,
NonTerminal(state=InventoryState(on_hand=0, on_order=2)): -37.331,
NonTerminal(state=InventoryState(on_hand=1, on_order=0)): -38.991,
NonTerminal(state=InventoryState(on_hand=1, on_order=1)): -38.347,
NonTerminal(state=InventoryState(on_hand=2, on_order=0)): -39.343
}
```
## Temporal-Difference (TD) Prediction
### Understanding TD
* **Key Insight:** Leverages the recursive relationship between states in the Bellman equation for efficient value function updates.
* **Bootstrapping:** Updates the value function estimate for the current state $S_t$ using the immediate reward $R_{t+1}$ and the estimated value of the next state $V(S_{t+1})$, without waiting for the final outcome of the episode.
* **Modified Update Rule:** $$V(S_t) \leftarrow V(S_t) + \alpha \cdot (R_{t+1} + \gamma \cdot V(S_{t+1}) - V(S_t))$$
* **TD Target:** $R_{t+1} + \gamma \cdot V(S_{t+1})$, representing an estimate of the return based on the immediate reward and the current value function estimate.
* **TD Error:** $\delta_t = R_{t+1} + \gamma \cdot V(S_{t+1}) - V(S_t)$ quantifies the discrepancy between the estimated return and the current value estimate for the state $S_t$.
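For a quick numeric check of these definitions (the numbers are chosen purely for illustration): with $V(S_t) = 5$, $V(S_{t+1}) = 7$, $R_{t+1} = 1$, $\gamma = 0.9$ and $\alpha = 0.1$, the TD target is $1 + 0.9 \cdot 7 = 7.3$, the TD error is $\delta_t = 7.3 - 5 = 2.3$, and the update moves $V(S_t)$ from $5$ to $5 + 0.1 \cdot 2.3 = 5.23$.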
:::success
### Algorithm: TD(0) Prediction
**Input:**
* `pi`: Policy to be evaluated
* `episodes`: Set of episodes, each a sequence of (state, reward, next_state) triples
* `gamma`: Discount factor ($0 < \gamma \leq 1$)
* `alpha`: Learning rate
* `V`: Initial value function (optional)
**Output:**
* `V`: Estimated value function
**Initialization:**
* `V(s) = 0` for all states `s` (or use the provided initial value function)
**Algorithm:**
1. **For each episode** in `episodes`:
* **For each step** `t` in the episode, given (`S[t]`, `R[t+1]`, `S[t+1]`):
* Calculate TD error:
```delta = R[t+1] + gamma * V(S[t+1]) - V(S[t])```
* Update value function:
```V(S[t]) += alpha * delta```
2. **Return** `V` (the estimated value function)
:::
#### TD Updates and Advantages
* **Online Learning:** TD updates the value function after each step in the environment (atomic experience), allowing for continuous and efficient learning.
* **Flexibility:** It can be applied to incomplete episodes (non-episodic tasks) and can handle experiences in any order.
* **Efficiency:** TD often converges faster than Monte Carlo methods, particularly in tasks with long episodes.
### TD Prediction with Function Approximation
* **Parameter Update:** Each experience triggers an update to the parameters $w$ of the function approximator used to represent the value function $V(S_t; w)$.
* **Loss Function:** $$\mathcal{L}_{(S_{t}, S_{t+1}, R_{t+1})}(w) = \frac12 (V(S_{t}; w) - (R_{t+1} + \gamma \cdot V(S_{t+1}; w)))^2$$ The loss function quantifies the discrepancy between the current value estimate $V(S_t; w)$ and the TD target.
* **Semi-Gradient Update:** Instead of computing the full gradient of the loss function, TD uses a "semi-gradient" approach, where the dependency of the TD target on the current parameter $w$ is ignored.
* **Update Formula:** $$\Delta w = \alpha \cdot (R_{t+1} + \gamma \cdot V(S_{t+1};w) - V(S_t;w)) \cdot \nabla_{w} V(S_t; w)$$
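The following is a minimal sketch of a single semi-gradient TD(0) parameter update for a linear value function $V(s; w) = \phi(s)^\top w$ (the feature vectors and numbers are illustrative assumptions, not taken from the book's library):
```python=
import numpy as np

def semi_gradient_td0_update(w: np.ndarray,
                             phi_s: np.ndarray,       # features of S_t
                             phi_s_next: np.ndarray,  # features of S_{t+1} (zeros if terminal)
                             reward: float,
                             gamma: float,
                             alpha: float) -> np.ndarray:
    td_error = reward + gamma * (phi_s_next @ w) - (phi_s @ w)
    # the TD target is treated as a constant, so the gradient of V(S_t; w)
    # w.r.t. w is just phi(S_t) -- the "semi" in semi-gradient
    return w + alpha * td_error * phi_s

w = np.zeros(3)
w = semi_gradient_td0_update(w, np.array([1.0, 0.0, 1.0]),
                             np.array([0.0, 1.0, 0.0]),
                             reward=2.0, gamma=0.9, alpha=0.1)
print(w)  # [0.2 0.  0.2]
```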
#### Implementation
```python=
def td_prediction(
transitions: Iterable[mp.TransitionStep[S]],
approx_0: ValueFunctionApprox[S],
γ: float
) -> Iterator[ValueFunctionApprox[S]]:
'''Evaluate an MRP using TD(0) using the given sequence of
transitions.
Each value this function yields represents the approximated value
function for the MRP after an additional transition.
Arguments:
transitions -- a sequence of transitions from an MRP which don't
have to be in order or from the same simulation
approx_0 -- initial approximation of value function
γ -- discount rate (0 < γ ≤ 1)
'''
def step(
v: ValueFunctionApprox[S],
transition: mp.TransitionStep[S]
) -> ValueFunctionApprox[S]:
return v.update([(
transition.state,
transition.reward + γ * extended_vf(v, transition.next_state)
)])
return iterate.accumulate(transitions, step, initial=approx_0)
```
### TD's Benefits
* **Combines Strengths of DP and MC:** TD learning combines the bootstrapping aspect of dynamic programming (updating estimates based on other estimates) with the ability to learn directly from experience, similar to Monte Carlo methods.
* **Overcomes Limitations:** It can handle the curse of dimensionality (large state spaces) and the curse of modeling (unknown environment dynamics) effectively.
* **Continuous Learning:** It updates the value function after each step, allowing for continuous learning and adaptation in dynamic environments.
### Example: Simple Inventory MRP Finite model
```python=
import itertools
from rl.distribution import Distribution, Choose
from rl.approximate_dynamic_programming import NTStateDistribution
def mrp_episodes_stream(
mrp: MarkovRewardProcess[S],
start_state_distribution: NTStateDistribution[S]
) -> Iterable[Iterable[TransitionStep[S]]]:
return mrp.reward_traces(start_state_distribution)
def fmrp_episodes_stream(
fmrp: FiniteMarkovRewardProcess[S]
) -> Iterable[Iterable[TransitionStep[S]]]:
return mrp_episodes_stream(fmrp, Choose(fmrp.non_terminal_states))
def unit_experiences_from_episodes(
episodes: Iterable[Iterable[TransitionStep[S]]],
episode_length: int
) -> Iterable[TransitionStep[S]]:
return itertools.chain.from_iterable(
itertools.islice(episode, episode_length) for episode in episodes
)
```
```python=
def learning_rate_schedule(
initial_learning_rate: float,
half_life: float,
exponent: float
) -> Callable[[int], float]:
def lr_func(n: int) -> float:
return initial_learning_rate * (1 + (n - 1) / half_life) ** -exponent
return lr_func
```
```python=
import rl.iterate as iterate
import rl.td as td
import itertools
from pprint import pprint
from rl.chapter10.prediction_utils import fmrp_episodes_stream
from rl.chapter10.prediction_utils import unit_experiences_from_episodes
from rl.function_approx import learning_rate_schedule
episode_length: int = 100
initial_learning_rate: float = 0.03
half_life: float = 1000.0
exponent: float = 0.5
gamma: float = 0.9
episodes: Iterable[Iterable[TransitionStep[InventoryState]]] = \
fmrp_episodes_stream(si_mrp)
td_experiences: Iterable[TransitionStep[InventoryState]] = \
unit_experiences_from_episodes(
episodes,
episode_length
)
learning_rate_func: Callable[[int], float] = learning_rate_schedule(
initial_learning_rate=initial_learning_rate,
half_life=half_life,
exponent=exponent
)
td_vfs: Iterator[ValueFunctionApprox[InventoryState]] = td.td_prediction(
transitions=td_experiences,
approx_0=Tabular(count_to_weight_func=learning_rate_func),
    γ=gamma
)
num_episodes = 60000
final_td_vf: ValueFunctionApprox[InventoryState] = \
iterate.last(itertools.islice(td_vfs, episode_length * num_episodes))
pprint({s: round(final_td_vf(s), 3) for s in si_mrp.non_terminal_states})
```
Output
```python
{NonTerminal(state=InventoryState(on_hand=0, on_order=0)): -43.564,
NonTerminal(state=InventoryState(on_hand=0, on_order=1)): -38.117,
NonTerminal(state=InventoryState(on_hand=0, on_order=2)): -37.314,
NonTerminal(state=InventoryState(on_hand=1, on_order=0)): -39.12,
NonTerminal(state=InventoryState(on_hand=1, on_order=1)): -38.551,
NonTerminal(state=InventoryState(on_hand=2, on_order=0)): -39.38}
```
## TD versus MC
### Estimation Bias
* **MC:** Uses the actual return $G_t$ as an unbiased estimate of the value function, which generally aids convergence even with function approximation.
* **TD:** Uses the TD target $R_{t+1} + \gamma \cdot V(S_{t+1}; w)$ as a biased estimate of the value function, relying on the current value estimate for the next state.
### Convergence
* **Tabular TD:** Guaranteed to converge to the true value function under the Robbins-Monro learning rate schedule: $$\sum_{n=1}^{\infty} \alpha_n = \infty, \quad \sum_{n=1}^{\infty} \alpha^2_n < \infty$$ For example, $\alpha_n = \frac{1}{n}$ satisfies both conditions, whereas a constant learning rate violates the second.
* **TD with Function Approximation:** Convergence is not always guaranteed, especially with non-linear function approximation. Most theoretical guarantees are for tabular or linear function approximation.
### Variance
The TD target $R_{t+1} + \gamma \cdot V(S_{t+1}; w)$ typically has lower variance than the MC return $G_t$.
* The MC return depends on the entire trajectory, accumulating variance from multiple random rewards.
* The TD target depends only on the next reward and the next state's current value estimate, so it accumulates randomness from just one transition.
### Speed of Convergence
* **No Formal Proofs:** There's no definitive theoretical comparison of convergence speed or sample efficiency between TD and MC.
* **Empirical Results:** TD often converges faster than MC in practice, especially with constant learning rates.
* **Sensitivity to Initialization:** MC is generally less sensitive to the initial value function than TD.
### MC vs. TD: Learning Different Value Functions
While both Monte Carlo (MC) and Temporal Difference (TD) prediction learn from experience, they focus on different aspects:
* **MC Prediction:** Directly estimates the value function as the average return observed in the finite dataset. It mirrors the explicit calculation of average returns, making it intuitive and easy to interpret.
* **TD Prediction:** Converges to the value function of the Markov Reward Process (MRP) implied by the finite dataset. It essentially learns the maximum likelihood estimate (MLE) of the transition probabilities: $$\mathcal{P}_R(s,r,s') = \frac{\sum_{i=1}^N \mathbb{I}_{S_i=s, R_{i+1}=r, S_{i+1}=s'}}{\sum_{i=1}^N \mathbb{I}_{S_i=s}}$$ This means TD learns a model of the environment and estimates the value function based on this model.
### Advantage in Different Environments
* **TD:** Excels in Markov environments where the next state depends only on the current state. Its ability to model transitions makes it efficient in predicting future rewards.
* **MC:** Better suited for non-Markov environments where the next state depends on the entire history of states. It directly estimates returns without relying on a model, making it less sensitive to model inaccuracies.
### Bootstrapping and Experiencing
| Method | Bootstrapping (update using estimates) | Experiencing (update using environment interaction) |
| --- | --- | --- |
| MC | ❌ | ✅ |
| TD | ✅ | ✅ |
| DP | ✅ | ❌ |
* **Bootstrapping:** TD and DP use bootstrapping, updating value estimates based on other estimates. MC does not bootstrap.
* **Experiencing:** TD and MC update based on direct interaction with the environment. DP does not experience, relying solely on the model.



### Example: Random Walk
* **Setup:** Symmetric random walk with barrier $B = 10$ and no discounting $\gamma = 1$.
* **Evaluation:** Root Mean Square Error (RMSE) of the value function estimates, averaged over batches of 7 episodes, for 700 episodes in total.
```python=
class RandomWalkMRP(FiniteMarkovRewardProcess[int]):
'''
This MRP's states are {0, 1, 2,...,self.barrier}
with 0 and self.barrier as the terminal states.
At each time step, we go from state i to state
i+1 with probability self.p or to state i-1 with
probability 1-self.p, for all 0 < i < self.barrier.
The reward is 0 if we transition to a non-terminal
state or to terminal state 0, and the reward is 1
if we transition to terminal state self.barrier
'''
barrier: int
p: float
def __init__(
self,
barrier: int,
p: float
):
self.barrier = barrier
self.p = p
super().__init__(self.get_transition_map())
def get_transition_map(self) -> \
Mapping[int, Categorical[Tuple[int, float]]]:
d: Dict[int, Categorical[Tuple[int, float]]] = {
i: Categorical({
(i + 1, 0. if i < self.barrier - 1 else 1.): self.p,
(i - 1, 0.): 1 - self.p
}) for i in range(1, self.barrier)
}
return d
```
```python=
def compare_td_and_mc(
fmrp: FiniteMarkovRewardProcess[S],
gamma: float,
mc_episode_length_tol: float,
num_episodes: int,
learning_rates: Sequence[Tuple[float, float, float]],
initial_vf_dict: Mapping[NonTerminal[S], float],
plot_batch: int,
plot_start: int
) -> None:
true_vf: np.ndarray = fmrp.get_value_function_vec(gamma)
states: Sequence[NonTerminal[S]] = fmrp.non_terminal_states
colors: Sequence[str] = ['r', 'y', 'm', 'g', 'c', 'k', 'b']
import matplotlib.pyplot as plt
plt.figure(figsize=(11, 7))
for k, (init_lr, half_life, exponent) in enumerate(learning_rates):
mc_funcs_it: Iterator[ValueFunctionApprox[S]] = \
mc_finite_prediction_learning_rate(
fmrp=fmrp,
gamma=gamma,
episode_length_tolerance=mc_episode_length_tol,
initial_learning_rate=init_lr,
half_life=half_life,
exponent=exponent,
initial_vf_dict=initial_vf_dict
)
mc_errors = []
batch_mc_errs = []
for i, mc_f in enumerate(itertools.islice(mc_funcs_it, num_episodes)):
batch_mc_errs.append(sqrt(sum(
(mc_f(s) - true_vf[j]) ** 2 for j, s in enumerate(states)
) / len(states)))
if i % plot_batch == plot_batch - 1:
mc_errors.append(sum(batch_mc_errs) / plot_batch)
batch_mc_errs = []
mc_plot = mc_errors[plot_start:]
label = f"MC InitRate={init_lr:.3f},HalfLife" + \
f"={half_life:.0f},Exp={exponent:.1f}"
plt.plot(
range(len(mc_plot)),
mc_plot,
color=colors[k],
linestyle='-',
label=label
)
sample_episodes: int = 1000
td_episode_length: int = int(round(sum(
len(list(returns(
trace=fmrp.simulate_reward(Choose(states)),
γ=gamma,
tolerance=mc_episode_length_tol
))) for _ in range(sample_episodes)
) / sample_episodes))
for k, (init_lr, half_life, exponent) in enumerate(learning_rates):
td_funcs_it: Iterator[ValueFunctionApprox[S]] = \
td_finite_prediction_learning_rate(
fmrp=fmrp,
gamma=gamma,
episode_length=td_episode_length,
initial_learning_rate=init_lr,
half_life=half_life,
exponent=exponent,
initial_vf_dict=initial_vf_dict
)
td_errors = []
transitions_batch = plot_batch * td_episode_length
batch_td_errs = []
for i, td_f in enumerate(
itertools.islice(td_funcs_it, num_episodes * td_episode_length)
):
batch_td_errs.append(sqrt(sum(
(td_f(s) - true_vf[j]) ** 2 for j, s in enumerate(states)
) / len(states)))
if i % transitions_batch == transitions_batch - 1:
td_errors.append(sum(batch_td_errs) / transitions_batch)
batch_td_errs = []
td_plot = td_errors[plot_start:]
label = f"TD InitRate={init_lr:.3f},HalfLife" + \
f"={half_life:.0f},Exp={exponent:.1f}"
plt.plot(
range(len(td_plot)),
td_plot,
color=colors[k],
linestyle='--',
label=label
)
plt.xlabel("Episode Batches", fontsize=20)
plt.ylabel("Value Function RMSE", fontsize=20)
plt.title(
"RMSE of MC and TD as function of episode batches",
fontsize=25
)
plt.grid(True)
plt.legend(fontsize=10)
plt.show()
```

#### Observations:
* MC exhibits higher variance in RMSE.
* Small learning rate $\alpha$ leads to slower RMSE reduction for both methods.
* MC initially progresses faster but then stagnates.
* TD typically reaches lower RMSE faster than MC with a constant learning rate.
### Example: Fixed-Data Experience Replay
* **Experience Replay:** The agent repeatedly learns from a fixed set of experiences.
* **MC:** Estimates the value function as the average of the returns observed in the fixed dataset.
* **TD:** Converges to the value function of the Markov Reward Process (MRP) implied by the fixed dataset, essentially learning the maximum likelihood estimate (MLE) of the transition probabilities.
```python=
given_data: Sequence[Sequence[Tuple[str, float]]] = [
[('A', 2.), ('A', 6.), ('B', 1.), ('B', 2.)],
[('A', 3.), ('B', 2.), ('A', 4.), ('B', 2.), ('B', 0.)],
[('B', 3.), ('B', 6.), ('A', 1.), ('B', 1.)],
[('A', 0.), ('B', 2.), ('A', 4.), ('B', 4.), ('B', 2.), ('B', 3.)],
[('B', 8.), ('B', 2.)]
]
```
```python=
def get_fixed_episodes_from_sr_pairs_seq(
sr_pairs_seq: Sequence[Sequence[Tuple[S, float]]],
terminal_state: S
) -> Sequence[Sequence[TransitionStep[S]]]:
return [[TransitionStep(
state=NonTerminal(s),
reward=r,
next_state=NonTerminal(trace[i+1][0])
if i < len(trace) - 1 else Terminal(terminal_state)
) for i, (s, r) in enumerate(trace)] for trace in sr_pairs_seq]
def get_episodes_stream(
fixed_episodes: Sequence[Sequence[TransitionStep[S]]]
) -> Iterator[Sequence[TransitionStep[S]]]:
num_episodes: int = len(fixed_episodes)
while True:
yield fixed_episodes[np.random.randint(num_episodes)]
def fixed_experiences_from_fixed_episodes(
fixed_episodes: Sequence[Sequence[TransitionStep[S]]]
) -> Sequence[TransitionStep[S]]:
return list(itertools.chain.from_iterable(fixed_episodes))
```
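The TD experiment further below draws atomic experiences uniformly at random from this fixed set via a `get_experiences_stream` helper that is not shown in this excerpt; a minimal sketch consistent with that usage, mirroring `get_episodes_stream`, would be:
```python=
def get_experiences_stream(
    fixed_experiences: Sequence[TransitionStep[S]]
) -> Iterator[TransitionStep[S]]:
    num_experiences: int = len(fixed_experiences)
    while True:
        yield fixed_experiences[np.random.randint(num_experiences)]
```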
#### Mean Return
```python=
from rl.returns import returns
from rl.markov_process import ReturnStep
def get_return_steps_from_fixed_episodes(
fixed_episodes: Sequence[Sequence[TransitionStep[S]]],
gamma: float
) -> Sequence[ReturnStep[S]]:
return list(itertools.chain.from_iterable(returns(episode, gamma, 1e-8)
for episode in fixed_episodes))
def get_mean_returns_from_return_steps(
returns_seq: Sequence[ReturnStep[S]]
) -> Mapping[NonTerminal[S], float]:
def by_state(ret: ReturnStep[S]) -> S:
return ret.state.state
sorted_returns_seq: Sequence[ReturnStep[S]] = sorted(
returns_seq,
key=by_state
)
return {NonTerminal(s): np.mean([r.return_ for r in l])
for s, l in itertools.groupby(
sorted_returns_seq,
key=by_state
)}
```
```python=
from pprint import pprint
gamma: float = 0.9
fixed_episodes: Sequence[Sequence[TransitionStep[str]]] = \
get_fixed_episodes_from_sr_pairs_seq(
sr_pairs_seq=given_data,
        terminal_state='T'
)
returns_seq: Sequence[ReturnStep[str]] = \
get_return_steps_from_fixed_episodes(
fixed_episodes=fixed_episodes,
gamma=gamma
)
mean_returns: Mapping[NonTerminal[str], float] = \
get_mean_returns_from_return_steps(returns_seq)
pprint(mean_returns)
```
Output
```python
{NonTerminal(state='B'): 5.190378571428572,
NonTerminal(state='A'): 8.261809999999999}
```
#### MC
Run MC prediction on $100,000$ experience-replayed trace experiences, with equal weighting for each of the (state, return) pairs, i.e. a count-to-weight (learning rate) of $\alpha_n = \frac{1}{n}$.
```python=
import rl.monte_carlo as mc
import rl.iterate as iterate
def mc_prediction(
episodes_stream: Iterator[Sequence[TransitionStep[S]]],
gamma: float,
num_episodes: int
) -> Mapping[NonTerminal[S], float]:
return iterate.last(itertools.islice(
mc.mc_prediction(
traces=episodes_stream,
approx_0=Tabular(),
            γ=gamma,
episode_length_tolerance=1e-10
),
num_episodes
)).values_map
num_mc_episodes: int = 100000
episodes: Iterator[Sequence[TransitionStep[str]]] = \
get_episodes_stream(fixed_episodes)
mc_pred: Mapping[NonTerminal[str], float] = mc_prediction(
episodes_stream=episodes,
gamma=gamma,
num_episodes=num_mc_episodes
)
pprint(mc_pred)
```
Output
```python
{NonTerminal(state='A'): 8.262643843836214,
NonTerminal(state='B'): 5.191276907315868}
```
#### TD
Run TD Prediction on experience-replayed $1,000,000$ atomic experiences with a learning rate schedule having an initial learning rate of $0.01$, decaying with a half life of $10000$, and with an exponent of $0.5$.
```python=
import rl.td as td
from rl.function_approx import learning_rate_schedule, Tabular
def td_prediction(
experiences_stream: Iterator[TransitionStep[S]],
gamma: float,
num_experiences: int
) -> Mapping[NonTerminal[S], float]:
return iterate.last(itertools.islice(
td.td_prediction(
transitions=experiences_stream,
approx_0=Tabular(count_to_weight_func=learning_rate_schedule(
initial_learning_rate=0.01,
half_life=10000,
exponent=0.5
)),
            γ=gamma
),
num_experiences
)).values_map
num_td_experiences: int = 1000000
fixed_experiences: Sequence[TransitionStep[str]] = \
fixed_experiences_from_fixed_episodes(fixed_episodes)
experiences: Iterator[TransitionStep[str]] = \
get_experiences_stream(fixed_experiences)
td_pred: Mapping[NonTerminal[str], float] = td_prediction(
experiences_stream=experiences,
gamma=gamma,
num_experiences=num_td_experiences
)
pprint(td_pred)
```
Output
```python
{NonTerminal(state='A'): 9.899838136517303,
NonTerminal(state='B'): 7.444114569419306}
```
#### DP
```python=
from rl.distribution import Categorical
from rl.markov_process import FiniteMarkovRewardProcess
def finite_mrp(
fixed_experiences: Sequence[TransitionStep[S]]
) -> FiniteMarkovRewardProcess[S]:
def by_state(tr: TransitionStep[S]) -> S:
return tr.state.state
d: Mapping[S, Sequence[Tuple[S, float]]] = \
{s: [(t.next_state.state, t.reward) for t in l] for s, l in
itertools.groupby(
sorted(fixed_experiences, key=by_state),
key=by_state
)}
mrp: Dict[S, Categorical[Tuple[S, float]]] = \
{s: Categorical({x: y / len(l) for x, y in
collections.Counter(l).items()})
for s, l in d.items()}
return FiniteMarkovRewardProcess(mrp)
```
```python=
fmrp: FiniteMarkovRewardProcess[str] = finite_mrp(fixed_experiences)
fmrp.display_value_function(gamma)
```
Output
```python
{NonTerminal(state='A'): 9.958, NonTerminal(state='B'): 7.545}
```
## TD($\lambda$) Prediction
### Tabular n-Step Bootstrapping
* **Extension of TD:** Tabular TD prediction, which updates the value function $V(S_t)$ based on the next immediate reward and estimated value of the next state, can be extended to consider returns from multiple steps in the future.
* **n-Step Bootstrapped Return:** This approach uses the n-step return $$G_{t,n} = R_{t+1} + \gamma \cdot R_{t+2} + \gamma^2 \cdot R_{t+3} + \dots + \gamma^{n-1} \cdot R_{t+n} + \gamma^n \cdot V(S_{t+n})$$ as the target for updating $V(S_t)$.
* **Parameter Update (Function Approximation):** In function approximation settings, the parameters (w) are updated using the following rule: $$\Delta w = \alpha \cdot (G_{t,n} - V(S_t; w)) \cdot \nabla_w V(S_t; w)$$
* **Tuning Parameter $n$:** The parameter $n$ controls the degree of bootstrapping. When $n = 1$, we get the standard TD update, and for sufficiently large $n$, we approach Monte Carlo updates.
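To make the n-step target concrete, here is a small illustrative helper (hypothetical, not part of the `rl` library) that computes $G_{t,n}$ from a trace's rewards and a current value estimate, assuming $t + n$ is still within the trace:
```python=
from typing import Callable, Sequence

def n_step_return(rewards: Sequence[float],    # rewards[k] holds R_{k+1}
                  states: Sequence[str],       # states[k] holds S_k
                  v: Callable[[str], float],   # current value estimate
                  t: int, n: int, gamma: float) -> float:
    g = sum(gamma ** k * rewards[t + k] for k in range(n))
    return g + gamma ** n * v(states[t + n])   # bootstrap from V(S_{t+n})

# n = 1 recovers the TD(0) target; large n approaches the Monte-Carlo return.
```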
### $\lambda$-Return Prediction Algorithm
The $\lambda$-return is the geometrically weighted average of all n-step bootstrapped returns, $$G_t^{(\lambda)} = (1-\lambda) \cdot \sum_{n=1}^{\infty} \lambda^{n-1} \cdot G_{t,n},$$ where, for an episode terminating at time $T$, the remaining tail weight $\lambda^{T-t-1}$ falls on the full return $G_t$. The $\lambda$-return prediction algorithm uses $G_t^{(\lambda)}$ as the update target (an offline, end-of-episode update); TD($\lambda$) with eligibility traces, shown below, is its online counterpart.
:::success
### Algorithm: TD($\lambda$) Prediction
**Input:**
* `pi`: Policy to be evaluated
* `episodes`: Set of episodes, each a sequence of (state, reward) pairs
* `gamma`: Discount factor ($0 < \gamma \leq 1$)
* `lambda`: Trace decay parameter ($0 \leq \lambda \leq 1$)
* `alpha`: Learning rate
* `V`: Initial value function (optional)
**Output:**
* `V`: Estimated value function
**Initialization:**
* `V(s) = 0` for all states `s` (or use the provided initial value function)
**Algorithm:**
1. **For each episode** in `episodes`:
* Initialize `E(s) = 0` for all states `s` (eligibility trace)
* Let `episode = [(S[0], R[1]), (S[1], R[2]), ..., (S[T-1], R[T])]` (sequence of (state, reward) pairs)
2. **For each step** `t` in `range(len(episode) - 1)`:
* `state = S[t]`
* `reward = R[t+1]`
* `next_state = S[t+1]`
* Calculate TD error:
```delta = reward + gamma * V(next_state) - V(state)```
* Update eligibility trace for the current state:
```E(state) += 1```
3. **Eligibility trace and value function update** (performed within the same step `t`, immediately after computing that step's `delta`):
* **For each state** `s`:
* Update value function:
```V(s) += alpha * delta * E(s)```
* Decay eligibility trace:
```E(s) *= gamma * lambda```
4. **Return** `V` (the estimated value function)
:::
#### Online vs. Offline Updates
* $\lambda = 0$: Reduces to TD(0), where updates are made online after each time step.
* $\lambda = 1$: Reduces to Monte Carlo, where updates are made offline at the end of each episode.
* $0 < \lambda < 1$: Allows for a blend of TD and MC updates, balancing bias and variance.
#### Implementation
```python=
import rl.markov_process as mp
import numpy as np
from rl.approximate_dynamic_programming import ValueFunctionApprox
def lambda_return_prediction(
traces: Iterable[Iterable[mp.TransitionStep[S]]],
approx_0: ValueFunctionApprox[S],
γ: float,
lambd: float
) -> Iterator[ValueFunctionApprox[S]]:
'''Value Function Prediction using the lambda-return method given a
sequence of traces.
Each value this function yields represents the approximated value
function for the MRP after an additional episode
Arguments:
traces -- a sequence of traces
approx_0 -- initial approximation of value function
γ -- discount rate (0 < γ ≤ 1)
lambd -- lambda parameter (0 <= lambd <= 1)
'''
func_approx: ValueFunctionApprox[S] = approx_0
yield func_approx
for trace in traces:
gp: List[float] = [1.]
lp: List[float] = [1.]
predictors: List[NonTerminal[S]] = []
partials: List[List[float]] = []
weights: List[List[float]] = []
trace_seq: Sequence[mp.TransitionStep[S]] = list(trace)
for t, tr in enumerate(trace_seq):
for i, partial in enumerate(partials):
partial.append(
partial[-1] +
gp[t - i] * (tr.reward - func_approx(tr.state)) +
(gp[t - i] * γ * extended_vf(func_approx, tr.next_state)
if t < len(trace_seq) - 1 else 0.)
)
                weights[i].append(
                    weights[i][-1] * lambd if t < len(trace_seq) - 1
                    else lp[t - i]
                )
predictors.append(tr.state)
partials.append([tr.reward +
(γ * extended_vf(func_approx, tr.next_state)
if t < len(trace_seq) - 1 else 0.)])
            weights.append([1. - (lambd if t < len(trace_seq) - 1 else 0.)])
gp.append(gp[-1] * γ)
lp.append(lp[-1] * lambd)
responses: Sequence[float] = [np.dot(p, w) for p, w in
zip(partials, weights)]
for p, r in zip(predictors, responses):
func_approx = func_approx.update([(p, r)])
yield func_approx
```
```python=
import math
from typing import List, Sequence
import matplotlib.pyplot as plt

def plot_memory_function(theta: float, event_times: List[float]) -> None:
    # "Memory function" intuition behind eligibility traces: each event (a
    # visit to a state) adds 1 to the memory (frequency), and the memory
    # decays geometrically at rate theta between events (recency).
    step: float = 0.01
    x_vals: List[float] = [0.0]
    y_vals: List[float] = [0.0]
    for t in event_times:
        rng: Sequence[int] = range(1, int(math.floor((t - x_vals[-1]) / step)))
        x_vals += [x_vals[-1] + i * step for i in rng]
        y_vals += [y_vals[-1] * theta ** (i * step) for i in rng]
        x_vals.append(t)
        y_vals.append(y_vals[-1] * theta ** (t - x_vals[-1]) + 1.0)
    plt.plot(x_vals, y_vals)
    plt.grid()
    plt.xticks([0.0] + event_times)
    plt.xlabel("Event Timings", fontsize=15)
    plt.ylabel("Memory Function Values", fontsize=15)
    plt.title("Memory Function (Frequency and Recency)", fontsize=25)
    plt.show()

theta = 0.8
event_times = [2.0, 3.0, 4.0, 7.0, 9.0, 14.0, 15.0, 21.0]
plot_memory_function(theta, event_times)
```

### TD($\lambda$) vs. $\lambda$-Return
* TD($\lambda$) with eligibility traces is an online algorithm: at every time step it updates the values of all states, each in proportion to its current trace.
* Offline TD($\lambda$), which accumulates the updates and applies them at the end of the episode, is equivalent to the offline $\lambda$-Return prediction algorithm.
* For $\lambda = 0$, TD($\lambda$) reduces to the TD(0) algorithm presented earlier, since only the current state's trace is non-zero after decay.
* Offline TD(1) is equivalent to Each-Visit (Every-Visit) MC.
**Theorem** Offline TD($\lambda$) is equivalent to the offline $\lambda$-return algorithm: summed over a complete episode, their updates for each state coincide. That is,
$$
\sum_{t=0}^{T-1}\alpha\cdot\delta_t\cdot E_t(s)=\sum_{t=0}^{T-1}\alpha\cdot(G_t^{(\lambda)}-V(S_t))\cdot\mathbb{I}_{S_t=s}, \quad \forall s\in\mathcal{N}
$$
**Proof**
Let's start by expanding the $\lambda$-return $G_t^{(\lambda)}$:
$$
\begin{aligned}
G_t^{(\lambda)}
=\ & (1-\lambda)\cdot\lambda^0\cdot(R_{t+1}+\gamma\cdot V(S_{t+1})) \\
& + (1-\lambda)\cdot\lambda^1\cdot(R_{t+1}+\gamma\cdot R_{t+2}+\gamma^2\cdot V(S_{t+2})) \\
& + (1-\lambda)\cdot\lambda^2\cdot(R_{t+1}+\gamma\cdot R_{t+2}+\gamma^2\cdot R_{t+3}+\gamma^3\cdot V(S_{t+3})) + \dots\\
=\ & (\gamma\lambda)^0\cdot(R_{t+1}+\gamma\cdot V(S_{t+1})-\gamma\lambda\cdot V(S_{t+1})) \\
& + (\gamma\lambda)^1\cdot(R_{t+2}+\gamma\cdot V(S_{t+2})-\gamma\lambda\cdot V(S_{t+2})) \\
& + (\gamma\lambda)^2\cdot(R_{t+3}+\gamma\cdot V(S_{t+3})-\gamma\lambda\cdot V(S_{t+3}))+ \dots
\end{aligned}
$$
Subtracting $V(S_t)$ and regrouping the value-function terms across consecutive lines, the expression collapses into a discounted sum of TD errors:
$$
\begin{aligned}
G_t^{(\lambda)}-V(S_t) =\ &(\gamma\lambda)^0\cdot(R_{t+1}+\gamma\cdot V(S_{t+1})-V(S_t)) \\
&+(\gamma\lambda)^1\cdot(R_{t+2}+\gamma\cdot V(S_{t+2})-V(S_{t+1})) \\
&+(\gamma\lambda)^2\cdot(R_{t+3}+\gamma\cdot V(S_{t+3})-V(S_{t+2})) +\dots\\
=\ & \delta_t+\gamma\lambda\cdot\delta_{t+1}+(\gamma\lambda)^2\cdot\delta_{t+2}+\dots
\end{aligned}
$$
Now, assume a specific non-terminal state $s$ appears at time steps $t_1, t_2, \dots, t_n$. We can then write:
$$
\begin{aligned}
\sum_{t=0}^{T-1}\alpha\cdot(G_t^{(\lambda)}-V(S_t))\cdot\mathbb{I}_{S_t=s}&=\sum^n_{i=1}\alpha\cdot(G_{t_i}^{(\lambda)}-V(S_{t_i}))\\
&=\sum^n_{i=1}\alpha\cdot(\delta_{t_i}+\gamma\lambda\cdot\delta_{t_i+1}+(\gamma\lambda)^2\cdot\delta_{t_i+2}+\dots)\\
&=\sum_{t=0}^{T-1}\alpha\cdot\delta_t\cdot E_t(s) \quad \square
\end{aligned}
$$
### TD($\lambda$) Update with Function Approximation
* **Eligibility Trace Update:** $$E_t = \gamma \lambda \cdot E_{t-1} + \nabla_w V(S_t; w)$$
* **Value Function Update:** $$\Delta w = \alpha \cdot \delta_t \cdot E_t$$ where $\delta_t$ is the TD error.
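Before the library implementation below, a plain tabular sketch of these two updates may help (hypothetical code, for intuition only): in the tabular case $\nabla_w V(S_t; w)$ is the indicator vector of $S_t$, so the trace update reduces to incrementing $E(S_t)$ and decaying every trace by $\gamma \lambda$.
```python=
from typing import Dict, Optional

def td_lambda_step(v: Dict[str, float], e: Dict[str, float],
                   s: str, r: float, s_next: Optional[str],
                   gamma: float, lambd: float, alpha: float) -> None:
    # s_next is None for a terminal next state (value 0)
    v_next = v.get(s_next, 0.0) if s_next is not None else 0.0
    delta = r + gamma * v_next - v.get(s, 0.0)   # TD error
    e[s] = e.get(s, 0.0) + 1.0                   # accumulate trace at S_t
    for state in list(e):
        v[state] = v.get(state, 0.0) + alpha * delta * e[state]
        e[state] *= gamma * lambd                # decay all traces
```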
### Implementation
```python=
from rl.function_approx import Gradient
from rl.approximate_dynamic_programming import ValueFunctionApprox
def td_lambda_prediction(
traces: Iterable[Iterable[mp.TransitionStep[S]]],
approx_0: ValueFunctionApprox[S],
γ: float,
lambd: float
) -> Iterator[ValueFunctionApprox[S]]:
'''Evaluate an MRP using TD(lambda) using the given sequence of traces.
Each value this function yields represents the approximated value function
for the MRP after an additional transition within each trace
Arguments:
    traces -- an iterable of trace experiences, each an iterable of
      transition steps from the MRP
approx_0 -- initial approximation of value function
γ -- discount rate (0 < γ ≤ 1)
lambd -- lambda parameter (0 <= lambd <= 1)
'''
func_approx: ValueFunctionApprox[S] = approx_0
yield func_approx
for trace in traces:
el_tr: Gradient[ValueFunctionApprox[S]] = Gradient(func_approx).zero()
for step in trace:
x: NonTerminal[S] = step.state
y: float = step.reward + γ * \
extended_vf(func_approx, step.next_state)
el_tr = el_tr * (γ * lambd) + func_approx.objective_gradient(
xy_vals_seq=[(x, y)],
obj_deriv_out_fun=lambda x1, y1: np.ones(len(x1))
)
func_approx = func_approx.update_with_gradient(
el_tr * (func_approx(x) - y)
)
yield func_approx
```
### Example: Simple Inventory MRP Finite model
```python=
import rl.iterate as iterate
import rl.td_lambda as td_lambda
import itertools
from pprint import pprint
from rl.chapter10.prediction_utils import fmrp_episodes_stream
from rl.function_approx import learning_rate_schedule
gamma: float = 0.9
episode_length: int = 100
initial_learning_rate: float = 0.03
half_life: float = 1000.0
exponent: float = 0.5
lambda_param = 0.3
episodes: Iterable[Iterable[TransitionStep[InventoryState]]] = \
fmrp_episodes_stream(si_mrp)
curtailed_episodes: Iterable[Iterable[TransitionStep[InventoryState]]] = \
(itertools.islice(episode, episode_length) for episode in episodes)
learning_rate_func: Callable[[int], float] = learning_rate_schedule(
initial_learning_rate=initial_learning_rate,
half_life=half_life,
exponent=exponent
)
td_lambda_vfs: Iterator[ValueFunctionApprox[InventoryState]] = \
    td_lambda.td_lambda_prediction(
        traces=curtailed_episodes,
        approx_0=Tabular(count_to_weight_func=learning_rate_func),
        γ=gamma,
        lambd=lambda_param
    )
num_episodes = 60000
final_td_lambda_vf: ValueFunctionApprox[InventoryState] = \
iterate.last(itertools.islice(td_lambda_vfs, episode_length * num_episodes))
pprint({s: round(final_td_lambda_vf(s), 3) for s in si_mrp.non_terminal_states})
```
Output:
```python
{NonTerminal(state=InventoryState(on_hand=0, on_order=0)): -43.513,
NonTerminal(state=InventoryState(on_hand=0, on_order=1)): -37.878,
NonTerminal(state=InventoryState(on_hand=0, on_order=2)): -37.215,
NonTerminal(state=InventoryState(on_hand=1, on_order=0)): -38.948,
NonTerminal(state=InventoryState(on_hand=1, on_order=1)): -38.34,
NonTerminal(state=InventoryState(on_hand=2, on_order=0)): -39.45}
```
## References
- Chapter 11 of the [RLForFinanceBook](https://stanford.edu/~ashlearn/RLForFinanceBook/book.pdf)
- [RL for Prediction](https://github.com/coverdrive/technical-documents/blob/master/finance/cme241/Tour-RLPrediction.pdf) slides for CME 241: Foundations of Reinforcement Learning with Applications in Finance