# Dynamic Programming Algorithms

**Dynamic Programming (DP)** provides a powerful set of techniques for solving complex problems by strategically breaking them down into simpler, overlapping subproblems. Within *Reinforcement Learning (RL)*, DP plays a central role in solving prediction and control problems in environments modeled as *Markov Decision Processes (MDPs)* and *Markov Reward Processes (MRPs)*.

## Key Concepts in DP for RL

* **Prediction (Policy Evaluation):** Calculating the value function (expected discounted future rewards) for a given policy within an MRP. This tells us how well the policy is expected to perform from different states.
* **Control (Policy Optimization):** Discovering an optimal policy for an MDP – a policy that maximizes the expected return from every possible state.
* **Planning vs. Learning:** DP algorithms assume access to a perfect model of the environment (transition probabilities and rewards), so they are planning methods. Learning-based RL methods, in contrast, work from sampled experience and do not require such a model.

### The Bellman Equation: The Heart of DP

The Bellman Equation expresses a fundamental recursive relationship between the value of a state and the values of its subsequent states. This equation lies at the core of many DP algorithms in RL.

### The Principle of Optimality

An optimal policy has a special property: regardless of how we got to a particular state, the remaining decisions must form an optimal policy with respect to that state. This principle is what enables DP's subproblem decomposition.

## Fixed-Point Theory

### Fixed Points and Iterative Solutions

* **Fixed Point:** A fixed point of a function $f$ is a value $x$ such that $f(x) = x$. Many problems can be formulated as finding fixed points.
* **Iterative Methods:** If we can express our problem as a fixed-point equation, we can often find the solution by repeated application of the function: $x_{i+1} = f(x_i)$. Convergence to the true fixed point depends on the properties of $f$.

#### Example $f(x) = \cos(x)$

* **Fixed point:** the value $x^*$ satisfying $x^* = \cos(x^*)$ (numerically, $x^* \approx 0.739$).
* **Convergence:** for any starting point $x_0$, the iterates $\cos(\cos(\dots \cos(x_0) \dots))$ converge to this fixed point $x^*$.

```python=
import numpy as np
import matplotlib.pyplot as plt


# Repeatedly apply func to x0 and record the trajectory of iterates
def fixed_point_iteration(func, x0, iterations=10):
    history = [x0]
    x = x0
    for _ in range(iterations):
        x = func(x)
        history.append(x)
    return history


initial_guess = 1.0
history = fixed_point_iteration(np.cos, initial_guess, iterations=100)

# Plot the iterates converging to the fixed point x* = cos(x*) ~ 0.739
plt.figure(figsize=(10, 6))
plt.plot(range(len(history)), history, marker='o', linestyle='-')
plt.xlabel('Iteration')
plt.ylabel('Value')
plt.title('Fixed Point Iteration Process')
plt.grid(True)
plt.show()
```

![image](https://hackmd.io/_uploads/Bk7C4GECp.png)

### The Banach Fixed-Point Theorem

**Theorem** Let $\mathcal{X}$ be a non-empty set equipped with a complete metric $d : \mathcal{X} \times \mathcal{X} \rightarrow \mathbb{R}$. Let $f : \mathcal{X} \rightarrow \mathcal{X}$ be such that there exists an $L \in [0, 1)$ with $d(f(x_1), f(x_2)) \leq L \cdot d(x_1, x_2)$ for all $x_1, x_2 \in \mathcal{X}$. Then,
1. There exists a **unique** fixed point $x^* \in \mathcal{X}$, i.e. $x^* = f(x^*)$.
2. For any $x_0 \in \mathcal{X}$ and the sequence $\{x_i\}_{i=0}^\infty$ defined by $x_{i+1} = f(x_i)$ for all $i = 0, 1, 2, \ldots$,
$$\lim_{i \rightarrow \infty} x_i = x^*.$$
3. The convergence is geometric:
$$d(x^*, x_{i+1}) \leq \frac{L^{i+1}}{1-L} \cdot d(x_1, x_0) \quad \forall i = 0, 1, 2, \ldots$$
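A useful consequence of the error bound in part 3 is an a priori estimate of how many iterations are needed for a prescribed accuracy $\epsilon > 0$: the bound guarantees $d(x^*, x_{i+1}) \leq \epsilon$ as soon as
$$
\frac{L^{i+1}}{1-L} \cdot d(x_1, x_0) \leq \epsilon
\quad\Longleftrightarrow\quad
i + 1 \geq \frac{\log\big(\epsilon (1-L)\big) - \log d(x_1, x_0)}{\log L},
$$
so the number of iterations grows only logarithmically in $1/\epsilon$. For the Bellman operators introduced below, the contraction constant is $L = \gamma$, which is why a smaller discount factor yields faster convergence.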
## Policy Evaluation

**Policy Evaluation** is a fundamental algorithm in RL that computes the value function (expected discounted future rewards) for a given policy $\pi$ in a finite MDP. It tells us how well we expect the policy to perform when starting from each state.

### Key Concepts

* **MDP Terminology:**
    * A finite MDP has a set of states $\mathcal{S} = \{s_1, s_2, \ldots, s_n\}$ and a subset of non-terminal states $\mathcal{N} = \{s_1, s_2, \ldots, s_m\}$.
    * A policy determines how the agent behaves at each state; applying a fixed policy $\pi$ to the MDP yields an MRP.
* **Transition Probabilities with policy $\pi$:** The $\pi$-implied MRP has a transition-reward probability function
$$\mathcal{P}^{\pi}_R: \mathcal{N} \times \mathcal{D} \times \mathcal{S} \rightarrow [0,1]$$
(where $\mathcal{D}$ is the set of possible rewards), with induced state-transition probabilities $\mathcal{P}^{\pi}: \mathcal{N} \times \mathcal{S} \rightarrow [0,1]$.
* **Reward with policy $\pi$:** The expected immediate reward when in state $s$ under policy $\pi$:
$$\mathcal{R}^{\pi}: \mathcal{N} \rightarrow \mathbb{R}.$$
* **Value Function with policy $\pi$:** The value function maps states to their expected returns when following policy $\pi$:
$$V^{\pi}: \mathcal{N} \rightarrow \mathbb{R}.$$
* **Discount Factor $\gamma$:** Controls the importance of future rewards, with $0 < \gamma < 1$.

### The Bellman Equation for Policy Evaluation

The core of policy evaluation is the Bellman Expectation Equation for a given policy:
$$
V^{\pi}(s) = \mathcal{R}^{\pi}(s) + \gamma \sum_{s' \in \mathcal{N}} \mathcal{P}^{\pi}(s, s') \cdot V^{\pi}(s').
$$
* **Direct Solution (Small MDPs):** Treating $\mathcal{P}^\pi$ as an $m \times m$ matrix and $\mathcal{R}^\pi$, $V^\pi$ as $m$-vectors over the non-terminal states,
$$V^{\pi} = (I_m - \gamma \mathcal{P}^{\pi})^{-1} \cdot \mathcal{R}^{\pi}.$$
* **Iterative Solution (More Common):** We'll focus on this approach.

### The Bellman Policy Operator

* The Bellman Policy Operator $B^\pi : \mathbb{R}^m \rightarrow \mathbb{R}^m$ is defined as:
$$B^\pi(V) = \mathcal{R}^\pi + \gamma \mathcal{P}^\pi \cdot V.$$
A value function $V$ is a fixed point of $B^\pi$ if and only if it satisfies the Bellman Equation.
* **Contraction Property:** Under the $L^\infty$ metric $d(X,Y) := \max_{s \in \mathcal{N}} |(X - Y)(s)|$, the operator $B^\pi$ is a contraction mapping:
\begin{split}
d(B^\pi(X), B^\pi(Y)) =& \max_{s \in \mathcal{N}} |(B^\pi(X) - B^\pi(Y))(s)| \\
=& \gamma \cdot \max_{s \in \mathcal{N}} |(\mathcal{P}^\pi \cdot (X - Y))(s)| \\
\leq& \gamma \cdot \max_{s \in \mathcal{N}} |(X - Y)(s)| = \gamma \cdot d(X,Y).
\end{split}

#### Convergence Theorem

The Banach Fixed-Point Theorem combined with the contraction property gives us:

**Theorem** (Policy Evaluation Convergence Theorem) For any finite MDP, if $V^\pi \in \mathbb{R}^m$ is the value function of the MDP evaluated with a fixed policy $\pi$, then $V^\pi$ is the *unique* fixed point of $B^\pi$, and
$$
\lim_{i \rightarrow \infty} (B^\pi)^i(V_0) = V^\pi \quad \forall V_0 \in \mathbb{R}^m.
$$

### Policy Evaluation Algorithm

:::success
<font color=blue>Algorithm: Policy Evaluation </font>

1. Choose an initial value function $V_0 \in \mathbb{R}^m$ (often all zeros).
2. Update the value function via:
$$V_{i+1} = B^\pi(V_i) = \mathcal{R}^\pi + \gamma \mathcal{P}^\pi V_i.$$
3. Stop when the change in the value function, $d(V_i, V_{i+1})$, becomes sufficiently small.
:::
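Before the library-based implementation below, a minimal self-contained NumPy sketch illustrates both solution methods on a hypothetical two-state MRP (the transition matrix, rewards, and discount factor here are made up purely for illustration):

```python=
import numpy as np

# Hypothetical 2-state MRP implied by some fixed policy pi (numbers are illustrative)
P_pi = np.array([[0.7, 0.3],
                 [0.4, 0.6]])      # P^pi[s, s']: state-transition probabilities
R_pi = np.array([1.0, 2.0])        # R^pi[s]: expected immediate reward in state s
gamma = 0.9

# Direct solution: V^pi = (I - gamma * P^pi)^(-1) R^pi
v_direct = np.linalg.solve(np.eye(2) - gamma * P_pi, R_pi)

# Iterative solution: repeatedly apply the Bellman policy operator B^pi
v = np.zeros(2)
while True:
    v_next = R_pi + gamma * P_pi @ v          # B^pi(V) = R^pi + gamma * P^pi * V
    if np.max(np.abs(v_next - v)) < 1e-10:    # stop when the sup-norm change is tiny
        break
    v = v_next

print(v_direct, v_next)   # the two solutions agree up to the tolerance
```

The iterative solution reproduces the matrix solution because $B^\pi$ is a $\gamma$-contraction, exactly as the convergence theorem above guarantees.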
#### Python Implementation

```python=
# Policy Evaluation Simple code
from typing import Mapping, Iterator, TypeVar
from markov_process import FiniteMarkovRewardProcess, NonTerminal
from distribution import Categorical, Constant
# iterate (repeatedly apply a function) and converged (consume an iterator until a
# `done` condition holds) are helpers from the book's accompanying iterate module
from iterate import iterate, converged
import numpy as np

DEFAULT_TOLERANCE = 1e-5

S = TypeVar('S')

# A value function maps each non-terminal state to its expected return
V = Mapping[NonTerminal[S], float]


def evaluate_mrp(
    mrp: FiniteMarkovRewardProcess[S],
    gamma: float
) -> Iterator[np.ndarray]:

    def update(v: np.ndarray) -> np.ndarray:
        return mrp.reward_function_vec + gamma * \
            mrp.get_transition_matrix().dot(v)

    v_0: np.ndarray = np.zeros(len(mrp.non_terminal_states))

    return iterate(update, v_0)


def almost_equal_np_arrays(
    v1: np.ndarray,
    v2: np.ndarray,
    tolerance: float = DEFAULT_TOLERANCE
) -> bool:
    return max(abs(v1 - v2)) < tolerance


def evaluate_mrp_result(
    mrp: FiniteMarkovRewardProcess[S],
    gamma: float
) -> V[S]:
    v_star: np.ndarray = converged(
        evaluate_mrp(mrp, gamma=gamma),
        done=almost_equal_np_arrays
    )
    return {s: v_star[i] for i, s in enumerate(mrp.non_terminal_states)}
```

## Policy Iteration

**Policy iteration** is a powerful algorithm for finding optimal policies in finite MDPs. It works by alternating between two phases:

* **Policy Evaluation:** Compute the value function $V^\pi$ for the current policy $\pi$.
* **Policy Improvement:** Create a new, improved policy $G(V^\pi)$ by acting greedily with respect to the value function.

### Greedy Policy

A greedy policy with respect to a value function $V \in \mathbb{R}^m$ selects, at each state, the action that maximizes the expected sum of the immediate reward and the discounted value of the next state:
$$
G(V)(s) = \pi'_{D}(s) = \arg\max_{a \in \mathcal{A}} \left\{\mathcal{R}(s, a) + \gamma \sum_{s' \in \mathcal{N}} \mathcal{P}(s,a,s') \cdot V(s') \right\} \quad \forall s \in \mathcal{N}. \quad (*)
$$

```python=
# Greedy Policy Simple Code
import operator
from typing import Dict, Iterator, Tuple, TypeVar
from markov_process import NonTerminal, State
from markov_decision_process import FiniteMarkovDecisionProcess
from policy import FiniteDeterministicPolicy

A = TypeVar('A')


def extended_vf(v: Dict[S, float], s: State[S]) -> float:
    def non_terminal_vf(st: NonTerminal[S], v=v) -> float:
        return v[st]
    # Terminal states have value 0; non-terminal states are looked up in v
    return s.on_non_terminal(non_terminal_vf, 0.0)


def greedy_policy_from_vf(
    mdp: FiniteMarkovDecisionProcess[S, A],
    vf: Dict[S, float],
    gamma: float
) -> FiniteDeterministicPolicy[S, A]:
    greedy_policy_dict: Dict[S, A] = {}

    for s in mdp.non_terminal_states:
        q_values: Iterator[Tuple[A, float]] = (
            (a, mdp.mapping[s][a].expectation(
                lambda s_r: s_r[1] + gamma * extended_vf(vf, s_r[0])
            ))
            for a in mdp.actions(s)
        )
        greedy_policy_dict[s.state] = \
            max(q_values, key=operator.itemgetter(1))[0]

    return FiniteDeterministicPolicy(greedy_policy_dict)
```

### Policy Improvement Theorem

For value functions $X, Y : \mathcal{N} \rightarrow \mathbb{R}$, we define $X \geq Y$ if $X(s) \geq Y(s)$ for all $s \in \mathcal{N}$. A policy $\pi_1$ is said to be an improvement over another policy $\pi_2$ if $V^{\pi_1} \geq V^{\pi_2}$.

**Theorem** For any policy $\pi$ in a finite MDP, the greedy policy $\pi_D^\prime = G(V^\pi)$ satisfies
$$
V^{\pi_D^\prime} = V^{G(V^\pi)} \geq V^\pi.
$$

**Proof** By the Policy Evaluation Convergence Theorem applied to ${\pi_D^\prime} = {G(V^\pi)}$, we have:
$$
\lim_{i \to \infty} (B^{\pi_D^\prime})^i(V^{\pi}) = V^{\pi_D^\prime}.
$$
To establish the theorem, it suffices to show that the iterates form a non-decreasing sequence starting from $V^\pi$,
$$
(B^{\pi_D^\prime})^{i+1}(V^{\pi}) \geq (B^{\pi_D^\prime})^i(V^{\pi}) \quad \forall i = 0, 1, 2, \ldots,
$$
since the limit $V^{\pi_D^\prime}$ then dominates the starting point $V^{\pi}$. Let's prove this by induction:

* **Base Case (i=0):** We aim to show $B^{\pi_D^\prime}(V^{\pi})(s) \geq V^{\pi}(s)$ for all states $s \in \mathcal{N}$.
    * From the definition of ${\pi_D^\prime}$ and $V^{\pi}$,
    $$B^{\pi_D^\prime}(V^{\pi})(s) = \mathcal{R}(s, {\pi_D^\prime}(s)) + \gamma \sum_{s' \in \mathcal{N}} \mathcal{P}(s, {\pi_D^\prime}(s), s') \cdot V^{\pi}(s') \quad \forall s \in \mathcal{N}.$$
    * For each $s \in \mathcal{N}$, $\pi_D^\prime(s) = G(V^{\pi})(s)$ is the action that maximizes
    $$\mathcal{R}(s, a) + \gamma \sum_{s'\in \mathcal{N}} \mathcal{P}(s, a, s') \cdot V^{\pi}(s').$$
    * Therefore,
    \begin{split}
    B^{\pi_D^\prime}(V^{\pi})(s) &= \max_{a \in \mathcal{A}} \{ \mathcal{R}(s,a) + \gamma \sum_{s' \in \mathcal{N}} \mathcal{P}(s, a, s') V^{\pi}(s') \} = \max_{a \in \mathcal{A}} Q^{\pi}(s, a) \\
    &\geq \sum_{a \in \mathcal{A}} \pi(s, a) \cdot Q^{\pi}(s, a) = V^{\pi}(s) \quad \forall s \in \mathcal{N}.
    \end{split}
* **Inductive Step:** Assuming $(B^{\pi_D^\prime})^{i+1}(V^{\pi}) \geq (B^{\pi_D^\prime})^i(V^{\pi})$, we must show $(B^{\pi_D^\prime})^{i+2}(V^{\pi}) \geq (B^{\pi_D^\prime})^{i+1}(V^{\pi})$.
    * Using the relations
    \begin{split}
    (B^{\pi'_D})^{i+2}(V^{\pi})(s) &= \mathcal{R}(s, \pi'_D(s)) + \gamma \sum_{s' \in \mathcal{N}} \mathcal{P}(s, \pi'_D(s), s') \cdot (B^{\pi'_D})^{i+1}(V^{\pi})(s') \\
    (B^{\pi'_D})^{i+1}(V^{\pi})(s) &= \mathcal{R}(s, \pi'_D(s)) + \gamma \sum_{s' \in \mathcal{N}} \mathcal{P}(s, \pi'_D(s), s') \cdot (B^{\pi'_D})^i(V^{\pi})(s') \quad \forall s \in \mathcal{N},
    \end{split}
    * we obtain
    \begin{split}
    &(B^{\pi'_D})^{i+2}(V^{\pi})(s) - (B^{\pi'_D})^{i+1}(V^{\pi})(s) \\
    =& \gamma \sum_{s' \in \mathcal{N}} \mathcal{P}(s, \pi'_D(s), s') \cdot \left((B^{\pi'_D})^{i+1}(V^{\pi})(s') - (B^{\pi'_D})^i(V^{\pi})(s')\right) \geq 0 \quad \forall s \in \mathcal{N}
    \end{split}
    by the induction hypothesis. $\Box$

### Policy Iteration Algorithm

:::success
<font color=blue>Algorithm: Policy Iteration </font>

1. Choose an arbitrary initial value function $V_0 \in \mathbb{R}^m$.
2. For each iteration $j = 0, 1, 2, \dots$:
    * Update the policy by acting greedily with respect to the current value function: $\pi_{j+1} = G(V_j)$.
    * Compute $V_{j+1}$ for the policy $\pi_{j+1}$ via policy evaluation:
    $$V_{j+1} = \lim_{{i \to \infty}} (B^{\pi_{j+1}})^i(V_j).$$
3. Stop when the change in the value function, $d(V_j, V_{j+1})$, becomes sufficiently small.
:::

#### Convergence Guarantee

**Theorem** (Policy Iteration Convergence Theorem) Policy iteration converges to the optimal policy $\pi^*$ and the optimal value function $V^*$ for any finite MDP.
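To complement the library-based implementation below, here is a minimal self-contained NumPy sketch of policy iteration on a hypothetical two-state, two-action MDP (the transition probabilities and rewards are made up purely for illustration, and policy evaluation is done with the direct matrix solution):

```python=
import numpy as np

# Hypothetical MDP: 2 states, 2 actions (numbers are illustrative)
# P[a, s, s'] = transition probability, R[s, a] = expected immediate reward
P = np.array([[[0.8, 0.2],
               [0.1, 0.9]],     # action 0
              [[0.5, 0.5],
               [0.3, 0.7]]])    # action 1
R = np.array([[1.0, 0.0],
              [2.0, -1.0]])
gamma = 0.9
n_states = 2

pi = np.zeros(n_states, dtype=int)           # arbitrary initial deterministic policy
while True:
    # Policy evaluation: solve V^pi = (I - gamma * P^pi)^(-1) R^pi exactly
    P_pi = P[pi, np.arange(n_states), :]     # P^pi[s, s'] under the current policy
    R_pi = R[np.arange(n_states), pi]        # R^pi[s]
    v = np.linalg.solve(np.eye(n_states) - gamma * P_pi, R_pi)

    # Policy improvement: act greedily with respect to Q(s, a)
    q = R + gamma * (P @ v).T                # Q[s, a]
    new_pi = np.argmax(q, axis=1)
    if np.array_equal(new_pi, pi):           # greedy policy unchanged => optimal
        break
    pi = new_pi

print("optimal policy:", pi)
print("optimal value function:", v)
```

Each round weakly improves the value function (by the Policy Improvement Theorem), and since a finite MDP has only finitely many deterministic policies, the loop must terminate with the greedy policy unchanged, i.e. with an optimal policy.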
#### Python Implementation

```python=
# Policy Iteration Simple code
# (FinitePolicy, FiniteDeterministicPolicy, Choose, iterate and converged come from
#  the book's accompanying modules; evaluate_mrp_result and greedy_policy_from_vf
#  are defined in the snippets above)
DEFAULT_TOLERANCE = 1e-5


def policy_iteration(
    mdp: FiniteMarkovDecisionProcess[S, A],
    gamma: float,
    matrix_method_for_mrp_eval: bool = False
) -> Iterator[Tuple[V[S], FinitePolicy[S, A]]]:
    def update(vf_policy: Tuple[V[S], FinitePolicy[S, A]]) \
            -> Tuple[V[S], FiniteDeterministicPolicy[S, A]]:
        vf, pi = vf_policy
        # Evaluate the current policy (by matrix inversion or iteratively) ...
        mrp: FiniteMarkovRewardProcess[S] = mdp.apply_finite_policy(pi)
        policy_vf: V[S] = {
            mrp.non_terminal_states[i]: v
            for i, v in enumerate(mrp.get_value_function_vec(gamma))
        } if matrix_method_for_mrp_eval else evaluate_mrp_result(mrp, gamma)
        # ... then improve it by acting greedily with respect to its value function
        improved_pi: FiniteDeterministicPolicy[S, A] = greedy_policy_from_vf(
            mdp,
            policy_vf,
            gamma
        )
        return policy_vf, improved_pi

    v_0: V[S] = {s: 0.0 for s in mdp.non_terminal_states}
    pi_0: FinitePolicy[S, A] = FinitePolicy(
        {s.state: Choose(mdp.actions(s)) for s in mdp.non_terminal_states}
    )
    return iterate(update, (v_0, pi_0))


def almost_equal_vf_pis(
    x1: Tuple[V[S], FinitePolicy[S, A]],
    x2: Tuple[V[S], FinitePolicy[S, A]]
) -> bool:
    return max(
        abs(x1[0][s] - x2[0][s]) for s in x1[0]
    ) < DEFAULT_TOLERANCE


def policy_iteration_result(
    mdp: FiniteMarkovDecisionProcess[S, A],
    gamma: float,
) -> Tuple[V[S], FiniteDeterministicPolicy[S, A]]:
    return converged(policy_iteration(mdp, gamma), done=almost_equal_vf_pis)
```

## Value Iteration

**Value iteration** is a core dynamic programming algorithm in RL used to discover the optimal value function $V^*$ and, consequently, the optimal policy $\pi^*$ for a finite MDP. It works by repeatedly applying the *Bellman Optimality Operator* to refine value function estimates.

### Key Ideas

* **The Bellman Optimality Equation:** The optimal value function satisfies:
$$
V^*(s) = \max_{a \in \mathcal{A}} \left\{ \mathcal{R}(s, a) + \gamma \sum_{s' \in \mathcal{N}} \mathcal{P}(s, a, s') \cdot V^*(s') \right\}.
$$
* **The Bellman Optimality Operator $B^*$:** We can concisely express the Bellman Optimality Equation using this operator:
$$
B^*(V)(s) = \max_{a \in \mathcal{A}} \left\{ \mathcal{R}(s, a) + \gamma \sum_{s' \in \mathcal{N}} \mathcal{P}(s, a, s') \cdot V(s') \right\}.
$$
* The optimal value function $V^*$ is a fixed point of $B^*$:
$$V^* = B^*(V^*).$$

#### Properties

- **Monotonicity**: If $X \geq Y$, then $B^*(X) \ge B^*(Y)$.
- **Constant Shift**: For any constant $c$, $B^*(X + c) = B^*(X) + \gamma c$.
- **Contraction:** $d(B^*(X), B^*(Y)) \leq \gamma \cdot d(X,Y)$.

#### Convergence

The contraction property of $B^*$ guarantees that value iteration converges to the unique optimal value function $V^*$, regardless of the initial $V_0 \in \mathbb{R}^m$.

**Theorem** (Value Iteration Convergence Theorem) For a finite MDP, if $V^* \in \mathbb{R}^m$ is the optimal value function, then $V^*$ is the *unique* fixed point of $B^*$, and
$$
\lim_{i \rightarrow \infty} (B^*)^i(V_0) = V^* \quad \forall V_0 \in \mathbb{R}^m.
$$

### Value Iteration Algorithm

:::success
<font color=blue>Algorithm: Value Iteration </font>

1. Start with an arbitrary value function $V_0 \in \mathbb{R}^m$ (e.g., all zeros).
2. For each iteration $i = 0, 1, 2, \ldots$:
    * Update the value function via:
    $$ V_{i+1}(s) = B^*(V_i)(s) = \max_{a \in \mathcal{A}} \left\{ \mathcal{R}(s, a) + \gamma \sum_{s' \in \mathcal{N}} \mathcal{P}(s,a,s') V_i(s') \right\}.$$
3. Stop when $d(V_i, V_{i+1})$ is small enough.
::: ### Getting the Optimal Policy Once $V^*$ is found, the optimal policy $\pi^*$ is the greedy policy with respect to $V$: $$ \pi^*(s) = G(V^*) = \arg\max_{a \in \mathcal{A}} \left\{ R(s, a) + \gamma \sum_{s' \in S} P(s,a,s') V^*(s') \right\}. $$ #### Python Implementation ```python= # Value Iteration Simple Code DEFAULT_TOLERANCE = 1e-5 def value_iteration( mdp: FiniteMarkovDecisionProcess[S, A], gamma: float ) -> Iterator[V[S]]: def update(v: V[S]) -> V[S]: return { s: max( mdp.mapping[s][a].expectation( lambda s_r: s_r[1] + gamma * extended_vf(v, s_r[0]) ) for a in mdp.actions(s) ) for s in v } v_0: V[S] = {s: 0.0 for s in mdp.non_terminal_states} return iterate(update, v_0) def almost_equal_vfs( v1: V[S], v2: V[S], tolerance: float = DEFAULT_TOLERANCE ) -> bool: return max(abs(v1[s] - v2[s]) for s in v1) < tolerance def value_iteration_result( mdp: FiniteMarkovDecisionProcess[S, A], gamma: float ) -> Tuple[V[S], FiniteDeterministicPolicy[S, A]]: opt_vf: V[S] = converged( value_iteration(mdp, gamma), done=almost_equal_vfs ) opt_policy: FiniteDeterministicPolicy[S, A] = greedy_policy_from_vf( mdp, opt_vf, gamma ) return opt_vf, opt_policy ``` ## Example: Simple Inventory We explore a simple inventory scenario from the MDP notes, solving for the value function using the contrasting methods of policy iteration and value iteration. ```python= from dataclasses import dataclass from typing import Tuple, Dict, Mapping from markov_decision_process import FiniteMarkovDecisionProcess from policy import FiniteDeterministicPolicy from markov_process import FiniteMarkovProcess, FiniteMarkovRewardProcess from distribution import Categorical from scipy.stats import poisson @dataclass(frozen=True) class InventoryState: on_hand: int on_order: int def inventory_position(self) -> int: return self.on_hand + self.on_order InvOrderMapping = Mapping[ InventoryState, Mapping[int, Categorical[Tuple[InventoryState, float]]] ] class SimpleInventoryMDPCap(FiniteMarkovDecisionProcess[InventoryState, int]): def __init__( self, capacity: int, poisson_lambda: float, holding_cost: float, stockout_cost: float ): self.capacity: int = capacity self.poisson_lambda: float = poisson_lambda self.holding_cost: float = holding_cost self.stockout_cost: float = stockout_cost self.poisson_distr = poisson(poisson_lambda) super().__init__(self.get_action_transition_reward_map()) def get_action_transition_reward_map(self) -> InvOrderMapping: d: Dict[InventoryState, Dict[int, Categorical[Tuple[InventoryState, float]]]] = {} for alpha in range(self.capacity + 1): for beta in range(self.capacity + 1 - alpha): state: InventoryState = InventoryState(alpha, beta) ip: int = state.inventory_position() base_reward: float = - self.holding_cost * alpha d1: Dict[int, Categorical[Tuple[InventoryState, float]]] = {} for order in range(self.capacity - ip + 1): sr_probs_dict: Dict[Tuple[InventoryState, float], float] =\ {(InventoryState(ip - i, order), base_reward): self.poisson_distr.pmf(i) for i in range(ip)} probability: float = 1 - self.poisson_distr.cdf(ip - 1) reward: float = base_reward - self.stockout_cost * \ (self.poisson_lambda - ip * (1 - self.poisson_distr.pmf(ip) / probability)) sr_probs_dict[(InventoryState(0, order), reward)] = \ probability d1[order] = Categorical(sr_probs_dict) d[state] = d1 return d if __name__ == '__main__': from pprint import pprint user_capacity = 2 user_poisson_lambda = 1.0 user_holding_cost = 1.0 user_stockout_cost = 10.0 user_gamma = 0.9 si_mdp: FiniteMarkovDecisionProcess[InventoryState, int] =\ 
SimpleInventoryMDPCap( capacity=user_capacity, poisson_lambda=user_poisson_lambda, holding_cost=user_holding_cost, stockout_cost=user_stockout_cost ) print("MDP Transition Map") print("------------------") print(si_mdp) fdp: FiniteDeterministicPolicy[InventoryState, int] = \ FiniteDeterministicPolicy( {InventoryState(alpha, beta): user_capacity - (alpha + beta) for alpha in range(user_capacity + 1) for beta in range(user_capacity + 1 - alpha)} ) print("Deterministic Policy Map") print("------------------------") print(fdp) implied_mrp: FiniteMarkovRewardProcess[InventoryState] =\ si_mdp.apply_finite_policy(fdp) print("Implied MP Transition Map") print("--------------") print(FiniteMarkovProcess( {s.state: Categorical({s1.state: p for s1, p in v.table().items()}) for s, v in implied_mrp.transition_map.items()} )) print("Implied MRP Transition Reward Map") print("---------------------") print(implied_mrp) print("Implied MP Stationary Distribution") print("-----------------------") implied_mrp.display_stationary_distribution() print() print("Implied MRP Reward Function") print("---------------") implied_mrp.display_reward_function() print() print("Implied MRP Value Function") print("--------------") implied_mrp.display_value_function(gamma=user_gamma) print() from dynamic_programming import evaluate_mrp_result from dynamic_programming import policy_iteration_result from dynamic_programming import value_iteration_result print("Implied MRP Policy Evaluation Value Function") print("--------------") pprint(evaluate_mrp_result(implied_mrp, gamma=user_gamma)) print() print("MDP Policy Iteration Optimal Value Function and Optimal Policy") print("--------------") opt_vf_pi, opt_policy_pi = policy_iteration_result( si_mdp, gamma=user_gamma ) pprint(opt_vf_pi) print(opt_policy_pi) print() print("MDP Value Iteration Optimal Value Function and Optimal Policy") print("--------------") opt_vf_vi, opt_policy_vi = value_iteration_result(si_mdp, gamma=user_gamma) pprint(opt_vf_vi) print(opt_policy_vi) print() ``` **Some important result:** ```python MDP Policy Iteration Optimal Value Function and Optimal Policy -------------- {NonTerminal(state=InventoryState(on_hand=0, on_order=0)): -43.59563313047815, NonTerminal(state=InventoryState(on_hand=0, on_order=1)): -37.97111179441265, NonTerminal(state=InventoryState(on_hand=0, on_order=2)): -37.3284904356655, NonTerminal(state=InventoryState(on_hand=1, on_order=0)): -38.97111179441265, NonTerminal(state=InventoryState(on_hand=1, on_order=1)): -38.3284904356655, NonTerminal(state=InventoryState(on_hand=2, on_order=0)): -39.3284904356655} For State InventoryState(on_hand=0, on_order=0): Do Action 2 For State InventoryState(on_hand=0, on_order=1): Do Action 1 For State InventoryState(on_hand=0, on_order=2): Do Action 0 For State InventoryState(on_hand=1, on_order=0): Do Action 1 For State InventoryState(on_hand=1, on_order=1): Do Action 0 For State InventoryState(on_hand=2, on_order=0): Do Action 0 MDP Value Iteration Optimal Value Function and Optimal Policy -------------- {NonTerminal(state=InventoryState(on_hand=0, on_order=0)): -43.59563313047815, NonTerminal(state=InventoryState(on_hand=0, on_order=1)): -37.97111179441265, NonTerminal(state=InventoryState(on_hand=0, on_order=2)): -37.3284904356655, NonTerminal(state=InventoryState(on_hand=1, on_order=0)): -38.97111179441265, NonTerminal(state=InventoryState(on_hand=1, on_order=1)): -38.3284904356655, NonTerminal(state=InventoryState(on_hand=2, on_order=0)): -39.3284904356655} For State InventoryState(on_hand=0, 
on_order=0): Do Action 2 For State InventoryState(on_hand=0, on_order=1): Do Action 1 For State InventoryState(on_hand=0, on_order=2): Do Action 0 For State InventoryState(on_hand=1, on_order=0): Do Action 1 For State InventoryState(on_hand=1, on_order=1): Do Action 0 For State InventoryState(on_hand=2, on_order=0): Do Action 0 ``` ## Generalized Policy Iteration **Generalized Policy Iteration (GPI)** is a powerful conceptual framework that underpins many reinforcement learning algorithms. It describes the interplay between two core processes: * **Policy Evaluation:** Computing the value function ($V^\pi$ or $Q^\pi$) for a given policy $\pi$. * **Policy Improvement:** Generating a new, improved policy by acting greedily with respect to the current value function. ### GPI as a Spectrum GPI encompasses a range of methods, from classic policy iteration to more flexible and practical algorithms: * **Classic Policy Iteration:** * Fully evaluates the value function of each policy to convergence. * Performs complete policy improvement steps (making the policy fully greedy). \begin{split} \pi_1 &= G(V_0) : V_0 \rightarrow B^{\pi_1}(V_0) \rightarrow &&\ldots \rightarrow (B^{\pi_1})^j(V_0) \rightarrow \ldots \rightarrow V^{\pi_1} = V_1 \\ \pi_2 &= G(V_1) : V_1 \rightarrow B^{\pi_2}(V_1) \rightarrow &&\ldots \rightarrow (B^{\pi_2})^j(V_1) \rightarrow \ldots \rightarrow V^{\pi_2} = V_2 \\ & &&\ldots \\ \pi_{j+1} &= G(V_j) : V_j \rightarrow B^{\pi_{j+1}}(V_j) \rightarrow &&\ldots \rightarrow (B^{\pi_{j+1}})^j(V_j) \rightarrow \ldots \rightarrow V^{\pi_{j+1}} = V^* \end{split} ![Screenshot 2024-03-19 at 12.13.23 PM](https://hackmd.io/_uploads/Hky1M580a.png) * **Flexible GPI:** * Updates values and policies incrementally, potentially focusing on subsets of states or using function approximation. * Seeks to balance theoretical guarantees with the computational realities of large-scale problems. \begin{split} \pi_1 = G(V_0) : V_0 &\rightarrow B^{\pi_1}(V_0) = V_1 \\ \pi_2 = G(V_1) : V_1 &\rightarrow B^{\pi_2}(V_1) = V_2 \\ &\ldots \\ \pi_{j+1} = G(V_j) : V_j &\rightarrow B^{\pi_{j+1}}(V_j) = V^* \end{split} ![Screenshot 2024-03-19 at 12.23.59 PM](https://hackmd.io/_uploads/B1hu7cUCa.png) #### Key Ideas * **Interleaving:** Policy evaluation and improvement steps in GPI can be done with varying granularity (from full sweeps to single-state updates). * **Approximation:** When state spaces are very large, function approximators (e.g., neural networks) are often used to represent value functions or policies. * **Convergence:** GPI methods offer varying convergence guarantees depending on the specific update scheme and approximations used. ## Finite-Horizon Markov Decision Processes **Finite-horizon MDPs** are a type of MDP specifically designed for problems where decisions have a clear end-point or a fixed time horizon. This makes them well-suited for: * **Planning under time constraints:** Finding optimal actions when there's a set deadline. * **Episodic Tasks:** Modeling problems with natural breaks or resets (e.g., games with multiple rounds) * **Goal-Oriented Scenarios:** When the objective is to reach a specific target state within a limited number of steps. ### Key Characteristics * **Time-Dependent States:** States may change over time (e.g., $\mathcal{S}_0, \mathcal{S}_1, \dots, \mathcal{S}_{T-1}$). * **Time-Dependent Actions and Rewards:** The available actions $\mathcal{A}_t$ and their potential rewards $\mathcal{D}_t$ can also vary depending on the current time step. 
* **Terminating States:** A set of terminal states $\mathcal{T}_t$ where the episode ends, with no subsequent transitions. ### Value Functions and Bellman Equations The goal in a finite-horizon MDP is to find the optimal policy that maximizes expected rewards over the fixed horizon. This involves: * **Time-dependent Value Function $V^\pi_t$:** Represents the expected total rewards from time t onwards, given policy $\pi$. * **Bellman Equation:** $$V^{\pi}_t(s_t) = \sum_{s_{t+1} \in \mathcal{S}_{t+1}} \sum_{r_{t+1} \in \mathcal{D}_{t+1}} (\mathcal{P}^{\pi_{t}}_{R})_t(s_t, r_{t+1}, s_{t+1}) \cdot (r_{t+1}+ \gamma V^\pi_{t+1}(s_{t+1})).$$ * **Bellman Optimality Equation:** $$V^*_t(s_t) = \max_{a_t\in\mathcal{A}_t} \{ \sum_{s_{t+1} \in \mathcal{S}_{t+1}} \sum_{r_{t+1} \in \mathcal{D}_{t+1}} (\mathcal{P}_{R})_t(s_t, a_t, r_{t+1}, s_{t+1}) \cdot (r_{t+1} + \gamma V^*_{t+1}(s_{t+1})) \}. \quad (\star)$$ ``` python= # finite_horizon_MDP Partial Code from __future__ import annotations from itertools import groupby import dataclasses from dataclasses import dataclass from operator import itemgetter from typing import Dict, List, Generic, Sequence, Tuple, TypeVar, Iterator from distribution import FiniteDistribution from dynamic_programming import V, extended_vf from markov_process import (FiniteMarkovRewardProcess, RewardTransition, StateReward, NonTerminal, Terminal, State) from markov_decision_process import ( ActionMapping, FiniteMarkovDecisionProcess, StateActionMapping) from policy import FiniteDeterministicPolicy S = TypeVar('S') @dataclass(frozen=True) class WithTime(Generic[S]): ''' A wrapper that augments a state of type S with a time field. ''' state: S time: int = 0 def step_time(self) -> WithTime[S]: return dataclasses.replace(self, time=self.time + 1) RewardOutcome = FiniteDistribution[Tuple[WithTime[S], float]] # Finite-horizon Markov reward processes def finite_horizon_MRP( process: FiniteMarkovRewardProcess[S], limit: int ) -> FiniteMarkovRewardProcess[WithTime[S]]: '''Turn a normal FiniteMarkovRewardProcess into one with a finite horizon that stops after 'limit' steps. Note that this makes the data representation of the process larger, since we end up having distinct sets and transitions for every single time step up to the limit. ''' transition_map: Dict[WithTime[S], RewardOutcome] = {} # Non-terminal states for time in range(limit): for s in process.non_terminal_states: result: StateReward[S] = process.transition_reward(s) s_time = WithTime(state=s.state, time=time) transition_map[s_time] = result.map( lambda sr: (WithTime(state=sr[0].state, time=time + 1), sr[1]) ) return FiniteMarkovRewardProcess(transition_map) # TODO: Better name... def unwrap_finite_horizon_MRP( process: FiniteMarkovRewardProcess[WithTime[S]] ) -> Sequence[RewardTransition[S]]: '''Given a finite-horizon process, break the transition between each time step (starting with 0) into its own data structure. This representation makes it easier to implement backwards induction. 
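    Element t of the returned sequence maps each non-terminal state at time t
    to its one-step reward-transition distribution, with the time component
    stripped from the states.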
''' def time(x: WithTime[S]) -> int: return x.time def single_without_time( s_r: Tuple[State[WithTime[S]], float] ) -> Tuple[State[S], float]: if isinstance(s_r[0], NonTerminal): ret: Tuple[State[S], float] = ( NonTerminal(s_r[0].state.state), s_r[1] ) else: ret = (Terminal(s_r[0].state.state), s_r[1]) return ret def without_time(arg: StateReward[WithTime[S]]) -> StateReward[S]: return arg.map(single_without_time) return [{NonTerminal(s.state): without_time( process.transition_reward(NonTerminal(s)) ) for s in states} for _, states in groupby( sorted( (nt.state for nt in process.non_terminal_states), key=time ), key=time )] def evaluate( steps: Sequence[RewardTransition[S]], gamma: float ) -> Iterator[V[S]]: '''Evaluate the given finite Markov reward process using backwards induction, given that the process stops after limit time steps. ''' v: List[V[S]] = [] for step in reversed(steps): v.append({s: res.expectation( lambda s_r: s_r[1] + gamma * ( extended_vf(v[-1], s_r[0]) if len(v) > 0 else 0. ) ) for s, res in step.items()}) return reversed(v) # Finite-horizon Markov decision processes A = TypeVar('A') def finite_horizon_MDP( process: FiniteMarkovDecisionProcess[S, A], limit: int ) -> FiniteMarkovDecisionProcess[WithTime[S], A]: '''Turn a normal FiniteMarkovDecisionProcess into one with a finite horizon that stops after 'limit' steps. Note that this makes the data representation of the process larger, since we end up having distinct sets and transitions for every single time step up to the limit. ''' mapping: Dict[WithTime[S], Dict[A, FiniteDistribution[ Tuple[WithTime[S], float]]]] = {} # Non-terminal states for time in range(0, limit): for s in process.non_terminal_states: s_time = WithTime(state=s.state, time=time) mapping[s_time] = {a: result.map( lambda sr: (WithTime(state=sr[0].state, time=time + 1), sr[1]) ) for a, result in process.mapping[s].items()} return FiniteMarkovDecisionProcess(mapping) def unwrap_finite_horizon_MDP( process: FiniteMarkovDecisionProcess[WithTime[S], A] ) -> Sequence[StateActionMapping[S, A]]: '''Unwrap a finite Markov decision process into a sequence of transitions between each time step (starting with 0). This representation makes it easier to implement backwards induction. ''' def time(x: WithTime[S]) -> int: return x.time def single_without_time( s_r: Tuple[State[WithTime[S]], float] ) -> Tuple[State[S], float]: if isinstance(s_r[0], NonTerminal): ret: Tuple[State[S], float] = ( NonTerminal(s_r[0].state.state), s_r[1] ) else: ret = (Terminal(s_r[0].state.state), s_r[1]) return ret def without_time(arg: ActionMapping[A, WithTime[S]]) -> \ ActionMapping[A, S]: return {a: sr_distr.map(single_without_time) for a, sr_distr in arg.items()} return [{NonTerminal(s.state): without_time( process.mapping[NonTerminal(s)] ) for s in states} for _, states in groupby( sorted( (nt.state for nt in process.non_terminal_states), key=time ), key=time )] def optimal_vf_and_policy( steps: Sequence[StateActionMapping[S, A]], gamma: float ) -> Iterator[Tuple[V[S], FiniteDeterministicPolicy[S, A]]]: '''Use backwards induction to find the optimal value function and optimal policy at each time step ''' v_p: List[Tuple[V[S], FiniteDeterministicPolicy[S, A]]] = [] for step in reversed(steps): this_v: Dict[NonTerminal[S], float] = {} this_a: Dict[S, A] = {} for s, actions_map in step.items(): action_values = ((res.expectation( lambda s_r: s_r[1] + gamma * ( extended_vf(v_p[-1][0], s_r[0]) if len(v_p) > 0 else 0. 
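                    # Q_t(s, a): expected immediate reward plus gamma times the
                    # time-(t+1) value taken from the most recently computed entry
                    # v_p[-1] (zero when processing the final time step)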
) ), a) for a, res in actions_map.items()) v_star, a_star = max(action_values, key=itemgetter(0)) this_v[s] = v_star this_a[s.state] = a_star v_p.append((this_v, FiniteDeterministicPolicy(this_a))) return reversed(v_p) ``` ### Solving Finite-Horizon MDPs * Backward Induction: * Start at the final time step $T$. * Calculate the optimal value function $V_T$ using the Bellman Optimality Equation. * Work backwards, iteratively computing $V_{T-1}, V_{T-2}, \dots$ until $V_0$. * At each step, derive the optimal actions based on $V_{t+1}$. ``` python= # Backward Induction Partial Code def evaluate( steps: Sequence[RewardTransition[S]], gamma: float ) -> Iterator[V[S]]: '''Evaluate the given finite Markov reward process using backwards induction, given that the process stops after limit time steps. ''' v: List[V[S]] = [] for step in reversed(steps): v.append({ s: res.expectation( lambda s_r: s_r[1] + gamma * ( extended_vf(v[-1], s_r[0]) if len(v) > 0 else 0. ) ) for s, res in step.items() }) return reversed(v) def optimal_vf_and_policy( steps: Sequence[StateActionMapping[S, A]], gamma: float ) -> Iterator[Tuple[V[S], FiniteDeterministicPolicy[S, A]]]: '''Use backwards induction to find the optimal value function and optimal policy at each time step ''' v_p: List[Tuple[V[S], FiniteDeterministicPolicy[S, A]]] = [] for step in reversed(steps): this_v: Dict[NonTerminal[S], float] = {} this_a: Dict[S, A] = {} for s, actions_map in step.items(): action_values = ( (res.expectation( lambda s_r: s_r[1] + gamma * ( extended_vf(v_p[-1][0], s_r[0]) if len(v_p) > 0 else 0. ) ), a) for a, res in actions_map.items() ) v_star, a_star = max(action_values, key=itemgetter(0)) this_v[s] = v_star this_a[s.state] = a_star v_p.append((this_v, FiniteDeterministicPolicy(this_a))) return reversed(v_p) ``` ## Dynamic Pricing for Perishables and End-of-Season Goods **Dynamic pricing** is crucial for maximizing revenue when selling items with limited shelf-life or facing rapidly changing demand trends. Here's how to address this problem using finite-horizon MDPs. ### Problem Setup * **Time Frame**: We have $T$ days left to sell our product. * **Objective:** Maximize total expected revenue. * **Pricing Options:** Each day, choose a price from $\{P_1, P_2, \ldots, P_N \}$. * **Demand Model:** Customer demand follows a Poisson distribution. The average demand rate depends on the chosen price (denoted $\lambda_i$ for price $P_i$). * Inventory: We start with $M$ units. We can't sell more than we have. ### Modeling with a Finite-Horizon MDP This problem fits this framework naturally: * **States:** Remaining inventory $l$ at the start of each day. * **Actions:** Price index (which price to choose). * **Transitions:** * If inventory is $l$, we set price $P_i$, and demand $d$ follows $\text{Poisson}(\lambda_i)$: * Next day's inventory: $l' = \max(0, l - d)$ * **Rewards:** * Revenue for the day: $r = \min(l, d) \cdot P_i$. #### The Poisson Demand Twist The Poisson distribution is key to capturing the uncertainty and the 'limited sales' aspect: * The probability of selling $k$ units at price $P_i$, given inventory $l$, is: $$ (\mathcal{P}_R)_t(l, i, r, l - k) = \begin{cases} \frac{e^{-\lambda_i}\lambda_i^k}{k!} & \text{if } k < l \text{ and } r = k \cdot P_i, \\ \sum_{j=k}^\infty \frac{e^{-\lambda_i}\lambda_i^j}{j!} & \text{if } k = l \text{ and } r = k \cdot P_i, \\ 0 & \text{otherwise}. 
\end{cases}
$$

### Solving the MDP

```python=
from scipy.stats import poisson
# Note: unwrap_finite_horizon_MDP and unwrap_finite_horizon_MRP are defined in
# rl.finite_horizon (see the finite-horizon code above), not in rl.markov_process
from rl.finite_horizon import (finite_horizon_MDP, evaluate, optimal_vf_and_policy,
                               unwrap_finite_horizon_MDP, unwrap_finite_horizon_MRP)
from rl.markov_decision_process import FiniteMarkovDecisionProcess, FinitePolicy
from rl.markov_process import FiniteMarkovRewardProcess
from rl.distribution import Categorical
from typing import Sequence, Tuple, Iterator


class ClearancePricingMDP:

    def __init__(
        self,
        initial_inventory: int,
        time_steps: int,
        price_lambda_pairs: Sequence[Tuple[float, float]]
    ):
        self.initial_inventory = initial_inventory
        self.time_steps = time_steps
        self.price_lambda_pairs = price_lambda_pairs
        distrs = [poisson(l) for _, l in price_lambda_pairs]
        prices = [p for p, _ in price_lambda_pairs]

        # Single-step MDP: states are inventory levels, actions are price indices,
        # and the outcome (next inventory, revenue) follows the price-dependent
        # Poisson demand, with the residual probability mass on selling out
        self.single_step_mdp = FiniteMarkovDecisionProcess({
            s: {
                i: Categorical({
                    (s - k, prices[i] * k):
                    (distrs[i].pmf(k) if k < s else 1 - distrs[i].cdf(s - 1))
                    for k in range(s + 1)
                })
                for i in range(len(prices))
            } for s in range(initial_inventory + 1)
        })
        self.mdp = finite_horizon_MDP(self.single_step_mdp, time_steps)

    def get_vf_for_policy(self, policy: FinitePolicy) -> Iterator:
        mrp = self.mdp.apply_finite_policy(policy)
        return evaluate(unwrap_finite_horizon_MRP(mrp), 1.)

    def get_optimal_vf_and_policy(self) -> Iterator:
        return optimal_vf_and_policy(unwrap_finite_horizon_MDP(self.mdp), 1.)


# Example usage
ii = 12
steps = 8
pairs = [(1.0, 0.5), (0.7, 1.0), (0.5, 1.5), (0.3, 2.5)]
cp = ClearancePricingMDP(
    initial_inventory=ii,
    time_steps=steps,
    price_lambda_pairs=pairs
)


def policy_func(x: int) -> int:
    return 0 if x < 2 else (1 if x < 5 else (2 if x < 8 else 3))


stationary_policy = FinitePolicy({s: policy_func(s) for s in range(ii + 1)})

# Visualization part
import matplotlib.pyplot as plt
import numpy as np

prices = [[pairs[policy.act(s).value][0] for s in range(ii + 1)]
          for _, policy in cp.get_optimal_vf_and_policy()]

heatmap = plt.imshow(np.array(prices).T, origin='lower')
plt.colorbar(heatmap, shrink=0.5, aspect=5)
plt.xlabel("Time Steps")
plt.ylabel("Inventory")
plt.show()
```

![latexImage_330a10f465313ecfd05d45606a5bf149](https://hackmd.io/_uploads/HyC-9NBAa.png)

## From Tables to Function Approximation: Why ADP?

Tabular dynamic programming algorithms are powerful for small MDPs. However, they face severe limitations in most real-world problems:

* **The Curse of Dimensionality:** In environments with large state spaces or continuous states, storing a value for every possible state becomes impossible.
* **Impractical Updates:** Explicitly updating the value of each state in every iteration is computationally expensive, and often unnecessary.

### Approximate Dynamic Programming (ADP) to the Rescue

ADP addresses these challenges by making two key changes:

* **Function Approximation:** Instead of a giant lookup table, ADP uses flexible models (like linear combinations of features, or neural networks) to represent value functions. These models can generalize and make predictions for states never seen before.
* **Experience-Based Sampling:** ADP learns incrementally from sampled experiences (state, action, reward, next state) drawn from the environment or a simulation model. It doesn't try to perform full sweeps of the entire state space.

#### The Essence Remains

Despite these adaptations, ADP retains the core principles of dynamic programming:

* **Bellman Equations:** They still describe the relationship between values of related states.
* **Iterative Refinement:** ADP algorithms iteratively improve their value function approximation, often in a way that resembles the update steps in tabular methods.

## References

- Chapter 5 of the [RLForFinanceBook](https://stanford.edu/~ashlearn/RLForFinanceBook/book.pdf)
- [Dynamic Programming Algorithms](https://github.com/coverdrive/technical-documents/blob/master/finance/cme241/Tour-DP.pdf) slides for CME 241: Foundations of Reinforcement Learning with Applications in Finance