# Stochastic Shortest Path Problem
## Introduction
In the realm of modern navigation, finding the quickest route isn't always straightforward. Traffic congestion, accidents, and road closures can turn a seemingly optimal path into a frustrating delay. The Stochastic Shortest Path (SSP) problem tackles this challenge head-on. It's the mathematical framework that empowers navigation systems like Google Maps to guide us through the complexities of real-world traffic, aiming to minimize not just the distance, but the expected travel time.
Imagine you're designing the navigation brain for a self-driving car. It needs to not only know the map but also anticipate how traffic conditions might change. The SSP problem provides the tools to make these intelligent decisions, constantly adapting to the dynamic nature of the road network.
### Key Assumptions
To effectively address the SSP problem, we rely on a few key assumptions:
* **Dynamic Network:** Travel times on roads aren't fixed; they fluctuate due to traffic.
* **Data Availability:** We have historical data on average travel times and access to real-time traffic updates.
* **Sequential Decisions:** The route is determined step-by-step, considering the latest information at each intersection.
By incorporating these assumptions, the SSP problem moves beyond static maps and into the realm of real-time, adaptive navigation. In the following sections, we'll delve into the mathematical models and algorithms that bring this concept to life, exploring how to make optimal decisions even when the road ahead is uncertain.
### Setting the Stage
Before we dive into the specifics of different models, let's lay the groundwork by defining some common terminology and illustrating the core concepts.
#### Representing the Road Network
We'll visualize our road network as a directed graph, where:
* **Nodes (intersections):** $\mathcal{N}=\{1,2,\ldots,N\}$ is the set of all intersections in the network.
* **Links (roads):** $\mathcal{L}$ is the collection of one-way roads. If $(i,j) \in \mathcal{L}$, it means there's a road directly connecting intersection $i$ to intersection $j$.
* **Reachable Nodes:** $\mathcal{N}^+(i)=\{j\in\mathcal{N} | (i,j)\in\mathcal{L}\}$ denotes the set of intersections you can directly reach from intersection $i$.
#### Quantifying Travel Time
* **Actual Travel Time:** $c_{i, j}$ is a random variable representing the actual time it takes to travel from intersection $i$ to $j$. This is what we experience in reality, and it can vary due to traffic.
* **Estimated Travel Time:** $\tilde{c}_{i, j}$ is our best guess of the travel time from $i$ to $j$, based on real-time traffic data.
* **Average Travel Time:** $\bar{c}_{i, j}$ is the historical average travel time from $i$ to $j$. This gives us a baseline to work with.
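To make the notation concrete, here is a tiny hypothetical network built with `networkx` (the library used throughout this post). The node labels, costs, and the `travel_time` attribute name are illustrative choices, not data from the Karlsruhe example below.
```python
import networkx as nx

# A toy road network: three intersections, three one-way roads with average travel times
G = nx.DiGraph()
G.add_edge(1, 2, travel_time=10.0)   # average travel time from intersection 1 to 2
G.add_edge(1, 3, travel_time=10.4)
G.add_edge(2, 3, travel_time=4.0)

# N^+(1): the intersections directly reachable from intersection 1
print(list(G.successors(1)))   # -> [2, 3]
```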

### Grappling with Uncertainty
The heart of the SSP problem lies in dealing with the unpredictable nature of travel times. We'll explore two main strategies:
1. **Mathematical Modeling:** We'll create probability distributions to describe how travel times might vary.
2. **Data-Driven Approach:** We'll use historical and real-time data to estimate travel times and refine those estimates as we gather more information.
### Crafting the Policy
Our goal is to design a "policy" - a set of rules that tells us which road to take at each intersection. We want a policy that looks ahead, considering not just the immediate cost of taking a road, but also the expected cost of reaching the destination from the next intersection.
Mathematically, if $V(j)$ is the minimum expected cost to reach the destination from intersection $j$, our decision at intersection $i$ is:
$$
X^{\pi}(i)\in\mathop{\arg\min}_{j\in\mathcal{N}^+(i)} \ \left\{ \tilde{c}_{i, j} + V(j) \right\}
$$
In simpler terms, we pick the road that minimizes the sum of the estimated travel time to the next intersection plus the estimated cost to reach the destination from there.
The tricky part is getting a good estimate for $V(j)$, and that's where our different models come in. In the upcoming sections, we'll explore how to tackle this challenge in increasingly complex scenarios, starting with a deterministic model and progressing to dynamic stochastic models.
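As a preview, the decision rule above fits in a few lines of Python. The dictionaries `c_tilde` and `V` below hold purely hypothetical numbers standing in for the estimates discussed in the following sections.
```python
def greedy_decision(i, successors, c_tilde, V):
    """Pick the neighbor j of i minimizing estimated link cost plus cost-to-go."""
    return min(successors, key=lambda j: c_tilde[(i, j)] + V[j])

# Hypothetical estimates: from node 1 we can drive to node 2 or node 3
c_tilde = {(1, 2): 10.0, (1, 3): 10.4}
V = {2: 12.0, 3: 11.0}
print(greedy_decision(1, [2, 3], c_tilde, V))   # -> 3, since 10.4 + 11.0 < 10.0 + 12.0
```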
## Starting Simple: The Deterministic Model
In this initial model, we simplify the real world by assuming that travel times are predictable. This allows us to focus on the core problem of finding the shortest path without worrying about traffic fluctuations.
### Model Assumptions
* **Realization After Traversal:** We only learn the actual travel time $c_{ij}$ of a road after we've driven on it.
* **Unbiased Estimation:** The historical average $\bar{c}_{ij}$ is an unbiased prediction of the actual travel time, i.e. $\mathbb{E}[c_{ij}]=\bar{c}_{ij}$; there is no systematic over- or underestimation.
* **Static Estimation:** These predicted travel times $\bar{c}_{ij}$ don't change over time; we assume traffic conditions remain constant.
* **Independent Travel Times:** The travel times $\{c_{ij} \mid (i,j)\in\mathcal{L}\}$ are mutually independent: the time on one road doesn't affect the time on any other road.
### Why is this considered a deterministic model?
While there's technically some uncertainty (we don't know the actual travel times beforehand), our decisions are based solely on historical averages. This means our policy (the rules for choosing which road to take) is essentially fixed from the start. We can prove that any policy in this model is equivalent to a policy in a purely deterministic model where travel times are always equal to their averages. Hence, for simplicity, we'll treat the costs as their deterministic averages $\bar{c}_{ij}$ in the following discussion.
### Mathematical Formulation
Our goal is to find a sequence of intersections (nodes) $\{n_k\}_{k=1}^{K}$ that satisfies:
1. **Start and End:** The sequence starts at the origin $n_1$ and ends at the destination $n_K$.
2. **Valid Path:** Each node in the sequence is directly reachable from the previous one: $n_{k+1} \in \mathcal{N}^+(n_k)$ for all $k<K$.
3. **Shortest Travel Time:** The total travel time of this sequence is no greater than that of any other valid sequence $\{n'_k\}_{k=1}^{K'}$, i.e. $$\sum_{k=1}^{K'-1}\bar{c}_{n'_k n'_{k+1}}\geq\sum_{k=1}^{K-1}\bar{c}_{n_k n_{k+1}}$$
### Policy Design: Dynamic Programming
We'll use a classic technique called dynamic programming to solve this problem. The idea is to break down the problem into smaller subproblems and solve them recursively.
Let $V(i)$ represent the shortest possible travel time from intersection $i$ to the destination. To find the optimal path, we'll work backward from the destination, calculating $V(i)$ for each intersection.
At each intersection $n_{k-1}$ along the path, we make the following decision:
$$
n_k \in \mathop{\arg\min}_{j\in\mathcal{N}^+(n_{k-1})} \left\{\bar{c}_{n_{k-1} j}+V(j)\right\}
$$
This means we choose the next intersection $n_k$ that minimizes the sum of the travel time to that intersection plus the estimated shortest travel time from there to the destination.
To compute the value function $V(i)$, we'll use the Value Iteration Algorithm.
:::info
**Value Iteration Algorithm**
1. **Initialization**
* For each node `i` in `N` (set of all nodes):
* IF `i` is the destination node THEN
* `V[i] = 0`
* ELSE
* `V[i] = ∞`
2. **Repeat until convergence**
* For each node `i` in `N`:
* `V_new[i] = ∞`
* For each node `j` in `N^+(i)` (neighbors of `i`):
* `temp = c_ij + V[j]` // Cost to go from i to j + estimated cost from j to destination
* IF `temp < V_new[i]` THEN
* `V_new[i] = temp`
* For each node `i` in `N`:
* `V[i] = V_new[i]`
:::
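Below is a direct, runnable transcription of the pseudocode for a small hand-made graph; the four-node network and its costs are made up for illustration. Note that the destination's value is pinned to 0 in every sweep; the class-based implementation later achieves the same effect by adding a zero-cost self-loop at the destination.
```python
import math

def value_iteration(nodes, successors, cost, destination):
    """successors[i]: nodes reachable from i; cost[(i, j)]: average travel time."""
    V = {i: (0.0 if i == destination else math.inf) for i in nodes}
    while True:
        V_new = {}
        for i in nodes:
            if i == destination:
                V_new[i] = 0.0   # the destination always has zero cost-to-go
                continue
            V_new[i] = min((cost[(i, j)] + V[j] for j in successors[i]), default=math.inf)
        if V_new == V:           # converged: no value changed in this sweep
            return V
        V = V_new

nodes = [1, 2, 3, 4]
successors = {1: [2, 3], 2: [4], 3: [4], 4: []}
cost = {(1, 2): 10.0, (1, 3): 10.4, (2, 4): 12.0, (3, 4): 11.0}
print(value_iteration(nodes, successors, cost, destination=4))
# {1: 21.4, 2: 12.0, 3: 11.0, 4: 0.0}
```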
### Example: Navigating Karlsruhe
Now, let's apply our deterministic shortest path algorithm to a real-world scenario. We'll use the OSMnx library to load a road network around Karlsruhe, Germany, and find the quickest route between the central station (Karlsruhe Hbf) and the E-building of the Karlsruhe University of Applied Sciences (HKA).
#### Import Libraries
```python=
import networkx as nx
import osmnx as ox
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from copy import deepcopy
from typing import Tuple, List, Dict, Any, Callable
```
#### Define Functions
```python=
def deterministic_shortest_path(graph: nx.DiGraph, origin: int, destination: int) -> Tuple[List[int], float]:
"""
Compute the deterministic shortest path between origin and destination.
Args:
graph (nx.DiGraph): The road network graph.
origin (int): The starting node.
destination (int): The ending node.
Returns:
Tuple[List[int], float]: The shortest path and its total travel time.
"""
if origin not in graph or destination not in graph:
raise ValueError("Origin or destination not in graph.")
# Compute value function without uncertainty using Bellman-Ford algorithm
value_dict = {node: float('inf') for node in graph.nodes()}
value_dict[destination] = 0
for _ in range(len(graph) - 1):
for u, v, data in graph.edges(data=True):
if value_dict[u] > value_dict[v] + data['travel_time']:
value_dict[u] = value_dict[v] + data['travel_time']
# Compute shortest path by value function
shortest_path = [origin]
shortest_path_length = 0.0
while shortest_path[-1] != destination:
current = shortest_path[-1]
next_node = min(
graph.successors(current),
key=lambda x: graph[current][x]['travel_time'] + value_dict[x]
)
shortest_path.append(next_node)
shortest_path_length += graph[current][next_node]['travel_time']
return shortest_path, shortest_path_length
def visualize_path(graph: nx.DiGraph, path: List[int]) -> None:
"""
Visualize the shortest path on the graph.
Args:
graph (nx.DiGraph): The road network graph.
path (List[int]): The shortest path to visualize.
"""
fig, ax = ox.plot_graph_route(
nx.MultiDiGraph(graph),
path,
route_linewidth=2,
node_size=7,
bgcolor='w',
node_color='k',
edge_color='#696969'
)
plt.show()
```
#### Load and Prepare Map Data
```python=
# Load data
point = (49.005433, 8.394618)
map_multi = ox.graph_from_point(point, dist=2000, network_type='drive', simplify=True)
# Set speed limit, compute expected travel time
map_multi = ox.add_edge_speeds(map_multi)
map_multi = ox.add_edge_travel_times(map_multi)
# Convert into a DiGraph
map_di = nx.DiGraph(map_multi)
for u, v, k, data in map_multi.edges(keys=True, data=True):
if not map_di.has_edge(u, v) or data['travel_time'] < map_di[u][v]['travel_time']:
map_di.add_edge(u, v, **data)
# Set the origin and destination
origin = ox.distance.nearest_nodes(map_di, X=8.401867, Y=48.994666) # Karlsruhe Hbf
destination = ox.distance.nearest_nodes(map_di, X=8.390525, Y=49.015000) # HKA, E-Building
```
#### Visualizing the graph
```python
fig, ax = ox.plot_graph(nx.MultiDiGraph(map_di), bgcolor='w', node_color='k', edge_color='#696969')
plt.show()
print(f'The graph consists of {len(map_di.nodes)} nodes and {len(map_di.edges)} edges.')
```

```
The graph consists of 1141 nodes and 2586 edges.
```
#### Compute and Visualize the Shortest Path
```python
try:
shortest_path, travel_time = deterministic_shortest_path(map_di, origin, destination)
visualize_path(map_di, shortest_path)
print(f'The path consists of {len(shortest_path) - 1} edges.')
print(f'The travel time is {travel_time:.1f} seconds.')
except ValueError as e:
print(f"Error: {e}")
```

```
The path consists of 34 edges.
The travel time is 274.7 seconds.
```
## Embracing Uncertainty: The Stochastic Static Model
We now step into a more realistic scenario where travel times are no longer fixed but subject to random fluctuations. This is where the stochastic nature of the shortest path problem truly comes into play.
### Model Assumption
The key difference from the deterministic model is in when we learn the actual travel times:
* **Real-Time Observation:** Before deciding which road to take at an intersection, we get to observe the actual travel times $\{c_{ij}\big{|}j\in\mathcal{N}^+(i)\}$ for all the roads leading out of that intersection. This allows us to adapt our route based on the current traffic conditions.
### Why Stochastic Models Matter
At first glance, you might think the stochastic model is just a minor tweak to the deterministic one – we simply replace expected travel times $\bar{c}_{ij}$ with their actual, observed values $c_{ij}$. And indeed, the average travel time for any given path remains the same in both models.
However, the game-changer is that our decision-making policy, denoted by $X$, now depends not only on our current location but also on the real-time travel times we observe. This seemingly small change has a big impact.
#### An Illustrative Example
Let's illustrate this with a simple example:

Suppose we're at node 1 and want to reach node 7. The numbers on the edges represent the expected travel times. Let's say the actual travel times are a bit random, modeled as:
$$
c_{ij} = \bar{c}_{ij} \cdot \varepsilon_{ij}, \quad \varepsilon_{ij} \overset{\text{i.i.d.}}{\sim}\text{Uniform}(0.9,1.1)
$$
This means the actual travel time is the expected time multiplied by a random factor between 0.9 and 1.1.
Now, imagine we observe that the actual travel time from node 1 to node 2 is 10, and from node 1 to node 3 is 10.4.
* A deterministic approach would simply pick the shorter path (1 -> 2 -> 7), with an expected travel time of 22.
* But a stochastic approach would consider the potential variability in the remaining paths. If we go to node 3 first, we have three options to continue, and we can take advantage of the one with the shortest actual travel time. This leads to a slightly lower expected travel time overall.
This demonstrates the power of incorporating uncertainty into our decision-making. By considering the potential range of travel times, we can make more informed choices, especially when there's high variability in traffic conditions. It's like choosing a slightly longer route with multiple exit options over a shorter route that could get easily jammed with no escape!
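Since the figure isn't reproduced here, the Monte Carlo sketch below uses assumed edge costs (a single onward leg of mean 12 after node 2, and three alternative onward legs of mean 12 after node 3) purely to illustrate the effect described above.
```python
import numpy as np

rng = np.random.default_rng(0)
n = 200_000

# Option A: commit to 1 -> 2 -> 7 (observed cost 10, then one noisy leg of mean 12)
cost_a = 10 + 12 * rng.uniform(0.9, 1.1, n)

# Option B: go to node 3 (observed cost 10.4), then pick the cheapest of three noisy legs
legs = 12 * rng.uniform(0.9, 1.1, (n, 3))
cost_b = 10.4 + legs.min(axis=1)

print(f"mean via node 2: {cost_a.mean():.2f}")   # ~ 22.0
print(f"mean via node 3: {cost_b.mean():.2f}")   # ~ 21.8, slightly better thanks to the extra options
```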
### Mathematical Model
Let's formalize this stochastic model:
1. **State Variables:**
* $S_t = (n_t, \left\{c_{n_tj}|j\in\mathcal{N}^+(n_t)\right\})$
* This captures both our current location $n_t$ and the observed travel times for all roads leading out of it.
2. **Decision Variables:**
* $x_t\in\mathcal{N}^+(n_t)$
* This is the node we decide to move to in the current time step $t$
3. **Exogenous Information (New Observations):**
* $W_{t+1}=\{c_{x_tj}|j\in\mathcal{N}^+(x_t)\}$
* When we reach the next node, we'll observe the actual travel times for the roads leaving that node. We'll model these actual travel times using a triangular distribution to capture their variability.
* We model $c_{ij}$ as: $$c_{ij}=\bar{c}_{ij} \cdot \varepsilon_{ij},\quad \varepsilon_{ij}\overset{i.i.d.}{\sim}\text{Triangular}(0.5,1,1.5)$$
4. **Transition Function:**
* $S_{t+1}=(n_{t+1}, \left\{c_{n_{t+1}j}|j\in\mathcal{N}^+(n_{t+1}) \right\})$, where $n_{t+1}=x_t$
* This describes how the state evolves. Our new location is the node we chose $x_t$, and the new set of observed travel times comes from the exogenous information $W_{t+1}$.
5. **Objective Function:**
* Total travel time, with a penalty for not reaching the destination within a certain time frame: $$\sum_{t=0}^{T} c_t + k \mathbf{1}(x_T\neq\text{destination})$$
6. **Goal:**
* Find the best policy $X^{\pi}:S_t\mapsto x_t\in\mathcal{N}^+(n_t)$ (decision-making rules) that minimizes the expected total travel time: $$X^{\pi} \in \mathop{\arg\min}_{X} \ \mathbb{E}\left[\sum_{t=0}^{T} c_t + k\,\mathbf{1}(x_T\neq\text{destination})\Big{|}S_0,X\right]$$
#### Code Implementation
```python=
class SSPstatic:
def __init__(self, map: nx.DiGraph, origin: int, destination: int, T: int = 300, seed: int = 42):
"""
Initialize the Stochastic Shortest Path model.
Args:
map (nx.DiGraph): The road network graph.
origin (int): The starting node.
destination (int): The ending node.
T (int): Maximum number of time steps.
seed (int): Random seed for reproducibility.
"""
self.map = deepcopy(map)
self.origin = origin
self.destination = destination
self.prng = np.random.RandomState(seed)
self.t = 0
self.T = T
self.objective = 0.0
self.state = self._initialize_state()
def _initialize_state(self) -> Dict[str, Any]:
"""Initialize the state of the model."""
state = {'CurrentNode': self.origin}
state.update(self._exog_info_fn(self.origin))
return state
def _exog_info_fn(self, node: int) -> Dict[str, Dict[Tuple[int, int], float]]:
"""
Generate stochastic exogenous information for the current node.
Args:
node (int): The current node.
Returns:
Dict[str, Dict[Tuple[int, int], float]]: Dictionary of stochastic travel times for outgoing edges.
"""
cost_dict = {}
for edge in self.map.out_edges(node):
cost_dict[edge] = self.map.edges[edge]['travel_time'] * self.prng.triangular(left=0.5, mode=1, right=1.5)
return {'CurrentNodeLinkCosts': cost_dict}
def _transition_fn(self, decision: int, exog_info: Dict[str, Any]) -> Dict[str, Any]:
"""
Transition function to update the state based on the decision and exogenous information.
Args:
decision (int): The chosen next node.
exog_info (Dict[str, Any]): Exogenous information for the next state.
Returns:
Dict[str, Any]: The new state.
"""
new_state = {'CurrentNode': decision}
new_state.update(exog_info)
return new_state
def step(self, decision: int) -> None:
"""
Execute a single step in the model.
Args:
decision (int): The chosen next node.
"""
exog_info = self._exog_info_fn(decision)
self.objective += self.state['CurrentNodeLinkCosts'][(self.state['CurrentNode'], decision)]
self.state = self._transition_fn(decision, exog_info)
self.t += 1
def reset(self, seed: int = 1) -> None:
"""
Reset the model for repeating evaluation.
Args:
seed (int): New random seed.
"""
self.prng = np.random.RandomState(seed)
self.state = self._initialize_state()
self.objective = 0.0
self.t = 0
def is_finished(self) -> bool:
"""
Determine whether the termination condition is met.
Returns:
bool: True if the destination is reached or time limit is exceeded, False otherwise.
"""
if self.state['CurrentNode'] == self.destination:
return True
elif self.t >= self.T:
self.objective = np.nan
return True
else:
return False
def run_policy(model: SSPstatic, policy: 'Policy', iteration_num: int = 1) -> np.ndarray:
"""
Execute the policy and return average performance.
Args:
model (SSPstatic): The SSP model.
        policy (Policy): The policy to evaluate.
iteration_num (int): Number of iterations to run.
Returns:
np.ndarray: Array of objective values for each iteration.
"""
results = np.zeros(iteration_num)
for i in range(iteration_num):
model.reset(i)
while not model.is_finished():
model.step(policy.make_decision(model.state))
results[i] = model.objective
return results
```
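A quick way to get a feel for this interface is to instantiate the model and inspect its state; this sketch assumes `map_di`, `origin`, and `destination` from the deterministic example above are still in scope.
```python
model = SSPstatic(map=map_di, origin=origin, destination=destination, seed=0)
print(model.state['CurrentNode'])                  # we start at the origin
for edge, cost in list(model.state['CurrentNodeLinkCosts'].items())[:3]:
    print(edge, round(cost, 1))                    # sampled costs of the origin's outgoing roads
```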
### Revised Policy Design
In an ideal world, we'd have a magical value function $V(S)$ that tells us the exact expected travel time to the destination from any given state $S$. With this, a simple "greedy" strategy – always choosing the road that leads to the lowest immediate cost plus the expected future cost – would guarantee the optimal path.
Mathematically, this optimal policy would be:
$$
X^{\pi}(S_t) \in \mathop{\arg\min}_{j\in\mathcal{N}^+(i)} \ \left\{ \tilde{c}_{ij}+\mathbb{E}[V(S_{t+1})|S_t, x_t=j] \right\}
$$
However, in reality, we rarely have access to the true value function $V$. Instead, we have to work with an approximation, which we'll call $\bar{V}$. Our policy then becomes:
$$
X^{\pi}(S_t) \in \mathop{\arg\min}_{j\in\mathcal{N}^+(i)} \ \left\{ \tilde{c}_{ij} + \mathbb{E}[\bar{V}(S_{t+1})|S_t, x_t=j] \right\}
$$
So, the core challenge is to find a good approximation $\bar{V}$ that's as close as possible to the true $V$. We'll tackle this using techniques like Value Function Approximation (VFA).
#### The Curse of Dimensionality
Before we dive into VFA, let's understand why finding a good approximation is tricky. The Bellman equation, a fundamental principle in dynamic programming, gives us the theoretical definition of the true value function $V$:
$$
V(S_t) = \min_{j\in\mathcal{N}^+(i)} \left\{ \tilde{c}_{ij}+\mathbb{E}\left[V(S_{t+1})|S_t,x_t=j\right] \right\}
$$
Under our current assumptions (static stochastic model), this simplifies to:
$$
V(S_t) = \min_{j\in\mathcal{N}^+(i)} \left\{ \tilde{c}_{ij}+\mathbb{E}\left[V(S_{t+1})|x_t=j\right] \right\}
$$
The problem is that the state $S_t$ contains a lot of information:
1. Our current location (node $i$).
2. The observed travel times for all roads leading out of our current location.
If we try to compute $V(S_t)$ directly for every possible state, we run into the "curse of dimensionality." The number of possible states explodes as the network gets larger and the travel times become more variable.
To see this, imagine a node with just 3 outgoing roads. If we discretize (divide into intervals) the possible travel times for each road into $L$ values, we'd have to consider $L^3$ different combinations of travel times just for that single node!
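The arithmetic is easy to check; the out-degrees and discretization levels below are illustrative only.
```python
for out_degree in (3, 5, 8):
    for L in (5, 10, 20):   # number of discretized travel-time levels per outgoing road
        print(f"out-degree {out_degree}, {L} levels -> {L ** out_degree:,} combinations at one node")
```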
#### Taming the Complexity: Value Function Approximation
To make the problem computationally tractable, we'll introduce a simplified value function $V'(j)$ that depends only on the node $j$ we move to:
$$
V'(j)=\mathbb{E}\left[V(S_{t+1})\,\middle|\,x_t=j\right]
$$
This allows us to estimate just one value per node, dramatically reducing the complexity. Our policy now becomes:
$$
X^{\pi}(S_t) \in \mathop{\arg\min}_{j\in\mathcal{N}^+(i_t)}\ \{ \tilde{c}_{i_tj}+V'(j) \}
$$
We'll use algorithms like the Value Function Approximation Algorithm and Forward Approximate Dynamic Programming to iteratively compute an approximation $\bar{V}'$ of this simplified value function. These algorithms leverage sampling and updates to gradually refine our estimates, allowing us to make near-optimal decisions even in the face of uncertainty.
:::info
**Value Function Approximation Algorithm**
1. **Initialization:**
* For each node `i` in `N`:
* `V_bar'[i] =` Shortest travel time from `i` to destination (using deterministic costs)
2. **Repeat** for `iteration_num` times:
* For each node `i` in `N`:
* `v_n[i] = 0`
* **Repeat** `sample_size` times:
* For each neighbor `j` of `i` (j in N^+(i)):
* Sample `c_tilde_ij` from the travel time distribution for edge (i, j)
* `temp = min {j ∈ N^+(i)} (c_tilde_ij + V_bar'[j])`
* `v_n[i] = v_n[i] + temp`
* `v_n[i] = v_n[i] / sample_size`
* `alpha_n = theta / (theta + n)` // `n` is the current iteration index, `theta` a step-size hyperparameter
* `V_bar'[i] = (1 - alpha_n) * V_bar'[i] + alpha_n * v_n[i]`
:::
:::info
**Forward Approximate Dynamic Programming (FADP)**
1. **Initialization**
* For each node `i` in `N`:
* `V_bar'[i]` = Shortest travel time from `i` to destination (using deterministic costs)
* `visit_counts` = array of size N, initialized to 0 // To track how many times each node has been visited
2. **Repeat** until all nodes in `visit_counts` have a value greater than `iteration_num`
* `n_0` = node in `N` with the lowest value in `visit_counts`
* `path` = [n_0]
* `actual_costs` = []
* **While** the last node in `path` is not the destination node:
* For each neighbor `j` of the last node in `path`:
* Sample `c_tilde` from the travel time distribution for edge (last node in `path`, `j`)
* `costs[j] = c_tilde + V_bar'[j]`
* Append the node with the minimum cost in `costs` to `path`
* Append (`costs[chosen_node]` - `V_bar'[chosen_node]`) to `actual_costs`
* `v = 0`
* **For** each node `n_i` in `path` in reverse order (from destination to origin):
* `v = v + actual_costs[i]`
* `visit_counts[n_i] = visit_counts[n_i] + 1`
* `alpha =` Calculate learning rate based on `visit_counts[n_i]`
* `V_bar'[n_i] = (1 - alpha) * V_bar'[n_i] + alpha * v`
:::
#### Code Implementation
Let's translate our policy design strategies into Python code. We'll start by building a base class `Policy` to provide common functionalities for our value function-based policies.
```python=
# Parent class of policies based on value functions.
# This is itself a valid policy that ignores all uncertainty (the deterministic baseline).
class Policy:
"""Base class for policies based on value functions."""
def __init__(self, model: 'SSPstatic'):
"""
Initialize the policy.
Args:
model (SSPstatic): The stochastic shortest path model.
"""
self.map = deepcopy(model.map)
self.destination = model.destination
# Add a self loop at the destination
self.map.add_edge(self.destination, self.destination, travel_time=0)
# Index the nodes
self._index_nodes()
# Set initial value estimation from deterministic model
self._initialize_value_function()
def _index_nodes(self):
"""Assign indices to nodes for efficient access."""
nx.set_node_attributes(self.map, -1, 'index')
for i, node in enumerate(self.map.nodes):
self.map.nodes[node]['index'] = i
def _initialize_value_function(self):
"""Initialize the value function using the deterministic shortest path."""
nx.set_node_attributes(self.map, np.inf, 'V0')
nx.set_node_attributes(self.map, np.inf, 'Vbar')
self.map.nodes[self.destination]['V0'] = 0
for _ in range(self.map.number_of_nodes()):
for i, j in self.map.edges:
if self.map.edges[(i, j)]['travel_time'] + self.map.nodes[j]['V0'] < self.map.nodes[i]['V0']:
self.map.nodes[i]['V0'] = self.map.edges[(i, j)]['travel_time'] + self.map.nodes[j]['V0']
for node in self.map.nodes:
self.map.nodes[node]['Vbar'] = self.map.nodes[node]['V0']
def make_decision(self, state: Dict[str, Any]) -> Any:
"""
Make a decision given the current state.
Args:
state (Dict[str, Any]): The current state.
Returns:
Any: The next node to visit.
"""
costs = {
j: state['CurrentNodeLinkCosts'][(state['CurrentNode'], j)] + self.map.nodes[j]['Vbar']
for j in self.map.successors(state['CurrentNode'])
}
return min(costs, key=costs.get)
def reset(self):
"""Reset the policy to its initial state."""
for node in self.map.nodes:
self.map.nodes[node]['Vbar'] = self.map.nodes[node]['V0']
```
Next, we define the `VFA` and `FADP` classes, which inherit from the Policy base class and implement their respective value function approximation algorithms.
```python=
class VFA(Policy):
"""A policy that approximates value function through naive value iteration."""
def __init__(self, model: 'SSPstatic'):
"""
Initialize the VFA policy.
Args:
model (SSPstatic): The stochastic shortest path model.
"""
super().__init__(model)
self.step_count = 0
def train(self, iteration_num: int = 1, sample_size: int = 10, alpha: Callable[[int], float] = lambda x: 1 / (x + 2)):
"""
Fit the value function.
Args:
iteration_num (int): Number of iterations.
sample_size (int): Number of samples for each iteration.
alpha (Callable[[int], float]): Learning rate function.
"""
for k in range(iteration_num):
v = np.empty(self.map.number_of_nodes(), dtype=float)
for i in self.map.nodes:
temp = np.full(sample_size, np.inf)
for j in self.map.successors(i):
rand = np.random.triangular(0.5, 1, 1.5, sample_size)
rand = rand * self.map.edges[(i, j)]['travel_time'] + self.map.nodes[j]['Vbar']
temp = np.minimum(temp, rand)
v[self.map.nodes[i]['index']] = np.mean(temp)
# Update value function approximation
a = alpha(self.step_count)
for node, data in self.map.nodes(data=True):
data['Vbar'] = (1 - a) * data['Vbar'] + a * v[data['index']]
self.step_count += 1
def train_error(self, iteration_num: int = 1, sample_size: int = 10,
alpha: Callable[[int], float] = lambda x: 1 / (x + 2),
benchmark: nx.DiGraph = None) -> np.ndarray:
"""
Train the policy and record the value function error compared to a benchmark.
Args:
iteration_num (int): Number of iterations.
sample_size (int): Number of samples for each iteration.
alpha (Callable[[int], float]): Learning rate function.
benchmark (nx.DiGraph): Benchmark graph for comparison.
Returns:
np.ndarray: Array of errors for each iteration.
"""
error = np.zeros(iteration_num, dtype=float)
for k in range(iteration_num):
self.train(iteration_num=1, sample_size=sample_size, alpha=alpha)
for node in self.map.nodes:
if self.map.nodes[node]['Vbar'] != np.inf:
error[k] += (self.map.nodes[node]['Vbar'] - benchmark.nodes[node]['Vbar']) ** 2
return error
class FADP(Policy):
"""A policy that approximates value function through forward approximate dynamic programming."""
def __init__(self, model: 'SSPstatic'):
"""
Initialize the FADP policy.
Args:
model (SSPstatic): The stochastic shortest path model.
"""
super().__init__(model)
self.visit_time = np.zeros(self.map.number_of_nodes(), dtype=int)
self.minimal_visit_time = 0
def train(self, iteration_num: int = 1):
"""
Fit the value function.
Args:
            iteration_num (int): Number of iterations; each one raises the minimum visit count by 1.
"""
for l in range(iteration_num):
self.minimal_visit_time += 1
for node in self.map.nodes:
if self.visit_time[self.map.nodes[node]['index']] < self.minimal_visit_time and self.map.nodes[node]['Vbar'] != np.inf:
self._sample_and_update_path(node)
def _sample_and_update_path(self, start_node):
"""Sample a path and update the value function."""
path = [start_node]
actual_cost = []
while path[-1] != self.destination:
costs = {
j: self.map.edges[(path[-1], j)]['travel_time'] * np.random.triangular(0.5, 1, 1.5) + self.map.nodes[j]['Vbar']
for j in self.map.successors(path[-1])
}
path.append(min(costs, key=costs.get))
actual_cost.append(costs[path[-1]] - self.map.nodes[path[-1]]['Vbar'])
v = 0.0
for k in range(len(path) - 2, -1, -1):
v += actual_cost[k]
node = path[k]
self.visit_time[self.map.nodes[node]['index']] += 1
self.map.nodes[node]['Vbar'] += 2 / (202 + self.visit_time[self.map.nodes[node]['index']]) * (v - self.map.nodes[node]['Vbar'])
def train_error(self, iteration_num: int = 1, benchmark: nx.DiGraph = None) -> np.ndarray:
"""
Train the policy and record the value function error compared to a benchmark.
Args:
            iteration_num (int): Number of iterations.
benchmark (nx.DiGraph): Benchmark graph for comparison.
Returns:
np.ndarray: Array of errors for each iteration.
"""
error = np.zeros(iteration_num, dtype=float)
for l in range(iteration_num):
self.train(iteration_num=1)
for node in self.map.nodes:
if self.map.nodes[node]['Vbar'] != np.inf:
error[l] += (self.map.nodes[node]['Vbar'] - benchmark.nodes[node]['Vbar']) ** 2
return error
def reset(self):
"""Reset the policy to its initial state."""
super().reset()
self.visit_time.fill(0)
        self.minimal_visit_time = 0
```
### Revised Example: Evaluating Policies in Action
Let's put our VFA and FADP policies to the test on the Karlsruhe road network and see how they compare to the deterministic baseline.
#### Setting up the Model and Policies
```python=
# SSPstatic, VFA, FADP, and the base Policy class (our deterministic baseline) are defined above
Model = SSPstatic(map=map_di, origin=origin, destination=destination, T=300, seed=42)
# Initialize policies
VFAPolicy = VFA(Model)
FADPPolicy = FADP(Model)
DeterministicPolicy = Policy(Model)
```
#### Training the Policies
Our policies have hyperparameters that influence their learning. We'll set them to reasonable values to ensure good performance:
* **VFA Policy:**
* `theta = 15` (controls learning rate decay)
* `sample_size = 60`
* `iteration_num = 2000`
* **FADP Policy:**
* Learning rate: `alpha(n_i) = 2 / (n_i + 202)`, where `n_i` is the number of visits to node `i`
* `iteration_num = 3000`
**Note:** We initialize the value functions with the deterministic estimates to provide a good starting point and help avoid getting stuck in suboptimal regions early in the learning process.
```python=
# Train VFA Policy
VFAPolicy.train(iteration_num=2000, sample_size=60, alpha=lambda x: 15 / (x + 16))
# Train FADP Policy
FADPPolicy.train(iteration_num=3000)
```
#### Checking Convergence
Let's visualize how the estimated value functions converge during training.
```python=
def plot_convergence(error: np.ndarray, xlabel: str):
"""
Plot the convergence of a policy's error over iterations.
Args:
error (np.ndarray): Array of errors.
xlabel (str): Label for x-axis.
"""
plt.figure(figsize=(10, 5))
plt.plot(np.arange(len(error)), np.log10(error))
plt.xlabel(xlabel)
plt.ylabel('Log error to the benchmark')
plt.ylim(bottom=0)
plt.show()
# Check VFA convergence
VFAPolicy2 = VFA(Model)
vfa_error = VFAPolicy2.train_error(iteration_num=2000, sample_size=60, alpha=lambda x: 15 / (x + 16), benchmark=VFAPolicy.map)
plot_convergence(vfa_error, 'Number of iterations')
# Check FADP convergence
FADPPolicy2 = FADP(Model)
fadp_error = FADPPolicy2.train_error(iteration_num=3000, benchmark=FADPPolicy.map)
plot_convergence(fadp_error, 'Visit Time Minimum')
```


##### Observations
* The errors decrease, showing convergence.
* Some final error remains due to the randomness in travel times
* FADP might have a slightly higher error, potentially due to training differences.
#### Evaluating Policy Exploration
Let's see how many different routes each policy explores during 10,000 simulations:
```python=
def path_uses(policy: 'Policy', model: 'SSPstatic', trial_num: int = 10000) -> Tuple[List[List[int]], float]:
"""
Find the paths employed by a policy.
Args:
policy (Policy): The policy to evaluate.
model (SSPstatic): The stochastic shortest path model.
trial_num (int): Number of trials to run.
Returns:
Tuple[List[List[int]], float]: List of unique routes and average route length.
"""
routes = []
total_length = 0
for i in range(trial_num):
model.reset(i)
route = [model.origin]
while not model.is_finished():
decision = policy.make_decision(model.state)
route.append(decision)
model.step(decision)
if route not in routes and len(route) < model.T:
routes.append(route)
total_length += len(route)
avg_length = total_length / trial_num
return routes, avg_length
vfa_routes, vfa_avg_length = path_uses(VFAPolicy, Model)
fadp_routes, fadp_avg_length = path_uses(FADPPolicy, Model)
det_routes, det_avg_length = path_uses(DeterministicPolicy, Model)
print(f'VFA Policy explored {len(vfa_routes)} different routes. Average length: {vfa_avg_length:.2f}')
print(f'FADP Policy explored {len(fadp_routes)} different routes. Average length: {fadp_avg_length:.2f}')
print(f'Deterministic Policy explored {len(det_routes)} different routes. Average length: {det_avg_length:.2f}')
```
```
VFA Policy explored 751 different routes. Average length: 40.68
FADP Policy explored 732 different routes. Average length: 40.71
Deterministic Policy explored 211 different routes. Average length: 37.83
```
#### Running Policies and Collecting Results
Now, let's run each policy for 5000 trips and collect the travel times:
```python=
vfa_result = run_policy(Model, VFAPolicy, iteration_num = 5000)
fadp_result = run_policy(Model, FADPPolicy, iteration_num = 5000)
det_result = run_policy(Model, DeterministicPolicy, iteration_num = 5000)
```
#### Visualizing and Analyzing Results
Let's plot histograms and CDFs to visualize the travel time distributions:
```python=
def plot_travel_time_distribution(results: List[np.ndarray], labels: List[str]):
"""
Plot histograms of travel times for different policies.
Args:
results (List[np.ndarray]): List of travel time arrays for each policy.
labels (List[str]): Labels for each policy.
"""
plt.figure(figsize=(10, 5))
for result, label in zip(results, labels):
plt.hist(result, bins=50, alpha=0.5, label=label)
plt.xticks(np.arange(220, 320, 20))
plt.xlabel('Total Travel Time')
plt.ylabel('Occurrence in 5000 trials')
plt.legend()
plt.show()
def plot_cdf(results: List[np.ndarray], labels: List[str]):
"""
Plot the cumulative distribution function of travel times for different policies.
Args:
results (List[np.ndarray]): List of travel time arrays for each policy.
labels (List[str]): Labels for each policy.
"""
cdf = np.arange(1, len(results[0]) + 1) / len(results[0])
plt.figure(figsize=(10, 5))
for result, label in zip(results, labels):
plt.plot(np.sort(result), cdf, linestyle='-', alpha=0.6, label=label)
plt.legend()
plt.xlabel('Total Travel Time')
plt.ylabel('Cumulative Probability')
plt.grid(True)
plt.show()
# Plot travel time distributions
plot_travel_time_distribution([vfa_result, fadp_result, det_result], ['VFA policy', 'FADP policy', 'Deterministic policy'])
# Plot cumulative distribution functions
plot_cdf([vfa_result, fadp_result, det_result], ['VFA policy', 'FADP policy', 'Deterministic policy'])
print(f'VFA Policy: mean = {np.mean(vfa_result):.2f}, std = {np.std(vfa_result):.2f}')
print(f'FADP Policy: mean = {np.mean(fadp_result):.2f}, std = {np.std(fadp_result):.2f}')
print(f'Deterministic policy: mean = {np.mean(det_result):.2f}, std = {np.std(det_result):.2f}')
```


```
VFA Policy: mean = 268.47, std = 11.82
FADP Policy: mean = 268.49, std = 11.84
Deterministic policy: mean = 270.08, std = 11.48
```
#### Observations and Insights
* **Exploration vs. Exploitation:** The VFA and FADP policies explore a significantly larger number of routes compared to the deterministic policy. This suggests they're actively adapting to the changing travel times.
* **Why so many routes?** You might wonder why there are so many different routes explored, even though the actual distance is relatively short. This is partly due to how our model is set up: travel times are re-sampled every time we reach a node, even if we've been there before. So, if a route initially seems slow, the policy might "explore" other options in the hopes of finding a faster path later.
* **Performance Comparison**
* Both VFA and FADP achieve slightly lower average travel times than the deterministic policy
* The improvement might not be huge in this example, but it highlights the potential benefit of considering uncertainty, especially in networks with higher travel time variability
* The CDFs visually reinforce this, showing that the stochastic policies have a higher chance of finding faster routes
## Leveling Up: The Dynamic Model
In our final and most sophisticated model, we acknowledge that the world is not only uncertain but also constantly changing. Traffic patterns evolve, accidents happen, and road conditions shift. This dynamic model allows us to adapt our route in real-time as we receive new information about the network.
### Model Assumptions
* **Available Data:** We have historical average travel times and a measure of their volatility for all roads.
* **Real-Time Estimation:** We can estimate the current travel time for each road before making a decision.
* **Unbiased Estimation:** Our estimations are unbiased; on average, they're accurate.
* **Estimation Updates:** We receive new observations about actual travel times and update our estimates accordingly at each time step.
### Mathematical Model
Let's formalize the elements of this dynamic model:
1. **State Variables:**
* $S_t = (n_t, \tilde{c}_t)$
* $n_t$: The current node (intersection) at the beginning of time step $t$.
* $\tilde{c}_t$: A set containing our estimated travel times for all roads at time step $t$.
2. **Decision Variables:**
* $x_t\in\mathcal{N}^+(n_t)$
* The node we decide to move to in the current time step $t$.
3. **Exogenous Information (New Observations):**
* $W_{t+1}=(c_t, \tilde{c}_{t+1})$
* $c_t$: The actual travel time we experience in the current time step. It's based on our estimate but includes some random error.
* $c_t = \tilde{c}_{t,n_tx_t} \cdot(1 +\varepsilon_{t,n_tx_t})$: Realized travel time in period $t$, where the estimation error $\varepsilon_{t,ij}\sim\text{Uniform}(-s_{ij},s_{ij})$.
* $\tilde{c}_{t+1}$: Our updated estimates of travel times for all roads at the next time step, based on both our previous estimates and new observations.
* Updated estimation modeled as $$\tilde{c}_{t+1,ij} = 0.9 \cdot \tilde{c}_{t,ij} + 0.1 \cdot \bar{c}_{ij}(1+\tilde{\varepsilon}_{t,ij}),\quad \tilde{\varepsilon}_{t,ij}\overset{\text{i.i.d.}}{\sim} \text{Uniform}(-s_{ij},s_{ij})$$
4. **Transition Function:**
* $S_{t+1}=(n_{t+1},\tilde{c}_{t+1})$
* This describes how we move to the next state. Our new location is the node we chose, $n_{t+1}=x_t$, and our new estimates are given by $\tilde{c}_{t+1}$.
5. **Objective Function:** Total travel time, plus a penalty if we don't reach the destination by a specified deadline: $$\sum_{t=0}^{T} c_t + k\,\mathbf{1}(x_T\neq\text{destination})$$
6. **Goal:** Find a policy that minimizes the expected loss, which includes both travel time and the penalty for being late: $$X^{\pi} \in \mathop{\arg\min}_{X}\ \mathbb{E}\left[L\left(\sum_{t=0}^{T}c_t+k\,\mathbf{1}(x_T\neq\text{destination})\right)\Big{|}S_0,X\right]$$ where the loss function is a piecewise linear function $$L(t)=t+4\cdot\max\{0,t-\text{deadline}\}$$ that penalizes being late, and the deadline is a specified parameter.
#### Code Implementation
```python=
class SSPdynamic:
def __init__(self, map: nx.DiGraph, origin: int, destination: int, T: int = 300, seed: int = 42):
self.map = deepcopy(map)
self.origin = origin
self.destination = destination
self.seed = seed
self.prng = np.random.RandomState(seed)
self.t = 0
self.T = T
self.objective = 0.0
self._initialize_map()
self._index_nodes_and_edges()
self._initialize_arrays()
self._initialize_state()
def _initialize_map(self):
self.map.add_edge(self.destination, self.destination, cost=0, spread=0)
nx.set_edge_attributes(self.map, 0, 'c_bar')
def _index_nodes_and_edges(self):
for i, node in enumerate(self.map.nodes):
self.map.nodes[node]['index'] = i
for i, edge in enumerate(self.map.edges):
self.map.edges[edge]['index'] = i
def _initialize_arrays(self):
edge_count = self.map.number_of_edges()
self.cost = np.empty(edge_count, dtype=float)
self.spread = np.empty(edge_count, dtype=float)
self.c_bar = np.zeros(edge_count, dtype=float)
self.c_hat = np.zeros(edge_count, dtype=float)
for edge in self.map.edges:
idx = self.map.edges[edge]['index']
self.cost[idx] = self.map.edges[edge]['cost']
self.spread[idx] = self.map.edges[edge]['spread']
self.c_bar[:] = self.cost * (1 + self.spread * self.prng.uniform(low=-1, high=1, size=edge_count))
def _initialize_state(self):
self.state = {'CurrentNode': self.origin, 'c_bar': np.copy(self.c_bar)}
def exog_info_fn(self, decision: int) -> float:
edge_idx = self.map.edges[(self.state['CurrentNode'], decision)]['index']
realized_cost = self.c_bar[edge_idx] * (1 + self.map.edges[(self.state['CurrentNode'], decision)]['spread'] * self.prng.uniform(-1, 1))
self.c_bar[:] = 0.9 * self.c_bar + 0.1 * self.cost * (1 + self.spread * self.prng.uniform(-1, 1, size=len(self.c_bar)))
return realized_cost
def step(self, decision: int) -> None:
self.t += 1
self.objective += self.exog_info_fn(decision)
self.state['CurrentNode'] = decision
self.state['c_bar'][:] = self.c_bar
def reset(self, seed: int = 1) -> None:
self.prng = np.random.RandomState(seed)
self.t = 0
self.objective = 0.0
self.c_bar[:] = self.cost * (1 + self.spread * self.prng.uniform(-1, 1, size=len(self.c_bar)))
self.state['CurrentNode'] = self.origin
self.state['c_bar'][:] = self.c_bar
def is_finished(self) -> bool:
if self.state['CurrentNode'] == self.destination:
return True
elif self.t >= self.T:
self.objective = np.inf
return True
else:
return False
```
The class `DLA` serves as a parent class for our lookahead policies. The `run_policy` function executes a policy on a model and records performance metrics.
```python=
class DLA:
def __init__(self, model: SSPdynamic):
self.map = model.map
self.destination = model.destination
self.spread = model.spread
def make_decision(self, state: Dict[str, Any]) -> int:
raise NotImplementedError("Subclasses must implement make_decision method")
def run_policy(model: SSPdynamic, policy: DLA, trial_num: int = 1, deadline: float = 1000) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
cost = np.zeros(trial_num, dtype=float)
late = np.zeros(trial_num, dtype=bool)
loss = np.zeros(trial_num, dtype=float)
for i in range(trial_num):
model.reset(seed=i)
while not model.is_finished():
model.step(policy.make_decision(model.state))
cost[i] = model.objective
loss[i] = model.objective
if model.objective > deadline:
late[i] = True
loss[i] += (model.objective - deadline) * 4
return cost, late, loss
```
### Direct Lookahead Approximation
In the dynamic model, where travel times are constantly changing, finding the truly optimal policy becomes computationally challenging due to the vast state space. To tackle this, we introduce **Direct Lookahead Approximation (DLA)**.
These policies involve:
1. **Setting a Lookahead Horizon:** We decide how many steps into the future $T$ we want to consider when making decisions.
2. **Estimating Future States:** At each time step $t$, we predict what the possible states $\{S_{t+1}, \dots, S_{t+T} \}$ might be $T$ steps ahead
3. **Building a Simplified Model:** We create a simpler model that captures the essential dynamics of the next $T$ time steps
4. **Solving for a Temporary Policy:** We find the best policy within this simplified model
5. **Taking Action:** We execute the first step of this temporary policy, then repeat the entire process at the next time step with updated information.
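Conceptually, every decision epoch of a DLA policy runs these five steps. A minimal skeleton of that loop might look like the following; the helper names are hypothetical, and the two concrete lookahead models are implemented as the `StaticDLA` and `DeterministicDynamicDLA` classes below.
```python
def dla_decision(state, build_lookahead_model, solve_lookahead):
    lookahead = build_lookahead_model(state)   # steps 2-3: predict and simplify the next T steps
    plan = solve_lookahead(lookahead)          # step 4: best plan within the simplified model
    return plan[0]                             # step 5: execute only the first decision, then re-plan
```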
The key distinction between a Direct Lookahead Approximation (DLA) and a Value Function Approximation (VFA) policy is this:
* **DLA:** Rebuilds a simplified model and finds a new solution at every time step, using only the first step of that solution before discarding the rest
* **VFA:** Computes an approximate value function once during training, and then uses this value function to guide decisions throughout the entire navigation process
Let's explore two ways to construct a lookahead policy, each with different trade-offs in terms of accuracy and computational cost.
#### Static Lookahead Model
In this simplified model, we assume that our estimates of the travel times will remain relatively constant over the lookahead horizon.
To account for some uncertainty, we'll use a quantile estimator to make our cost estimates more robust:
$$
\tilde{c}_{tt',ij} = (1+(1-2\theta)s_{ij}) \tilde{c}_{t,ij} \quad \forall t'\geq t
$$
* $\tilde{c}_{tt',ij}$ is our estimate at time t of what the travel time on road $(i, j)$ will be at a future time $t'$.
* $\theta$ is a hyperparameter that controls how conservative our estimate is.
* $\theta = 0$ -> We use the upper bound of the estimated travel time interval (most conservative)
* $\theta = 0.5$ -> We use the mean of the estimated travel time interval
* $\theta = 1$ -> We use the lower bound of the estimated travel time interval (least conservative)
##### Code Implementation
```python=
class StaticDLA(DLA):
def __init__(self, model: 'SSPdynamic', percentile: float = 0.2):
super().__init__(model)
self.percentile = percentile
self.horizon = self.map.number_of_nodes() + 10
def make_decision(self, state: Dict[str, Any]) -> int:
V = np.full((self.map.number_of_nodes(), self.horizon + 1), np.inf)
V[self.map.nodes[self.destination]['index'], :] = 0
c_robust = state['c_bar'] * (1 + (1 - 2 * self.percentile) * self.spread)
for lookahead_time in range(self.horizon - 1, -1, -1):
for node_from in self.map.nodes:
for node_to in self.map.successors(node_from):
edge_index = self.map.edges[(node_from, node_to)]['index']
new_value = V[self.map.nodes[node_to]['index'], lookahead_time + 1] + c_robust[edge_index]
if new_value < V[self.map.nodes[node_from]['index'], lookahead_time]:
V[self.map.nodes[node_from]['index'], lookahead_time] = new_value
current_node_index = self.map.nodes[state['CurrentNode']]['index']
for next_node in self.map.successors(state['CurrentNode']):
next_node_index = self.map.nodes[next_node]['index']
edge_index = self.map.edges[(state['CurrentNode'], next_node)]['index']
if np.isclose(V[current_node_index, 0], V[next_node_index, 1] + c_robust[edge_index]):
return next_node
raise ValueError("No valid decision found")
```
#### Deterministic Dynamic Model
In this model, we take a step further by trying to predict how our estimates of the travel times will change over the lookahead horizon. We'll use the following formula to estimate the expected value of our estimate at a future time $t'$, given our current state at time $t$:
$$
\tilde{c}_{tt',ij} = \mathbb{E}\left[\tilde{c}_{t',ij}|S_t\right] = \bar{c}_{ij}+0.9^{t'-t}\cdot(\tilde{c}_{t,ij}-\bar{c}_{ij})
$$
This formula captures the idea that our estimates will gradually converge towards the historical averages $\bar{c}_{ij}$ as we gather more information.
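A quick simulation confirms this convergence in expectation; the historical mean, spread, and starting estimate below are assumed values for illustration only.
```python
import numpy as np

rng = np.random.default_rng(0)
c_bar, s = 100.0, 0.3        # assumed historical mean and spread
c_tilde_t = 130.0            # assumed current estimate at time t
n_sims = 100_000

estimates = np.full(n_sims, c_tilde_t)
for k in range(1, 6):
    eps = rng.uniform(-s, s, size=n_sims)
    estimates = 0.9 * estimates + 0.1 * c_bar * (1 + eps)   # the estimation-update model
    formula = c_bar + 0.9 ** k * (c_tilde_t - c_bar)
    print(f"t'-t={k}: simulated mean {estimates.mean():7.2f}   formula {formula:7.2f}")
```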
##### Code Implementation
In our implementation, we'll set the lookahead horizon $T$ to be the number of nodes in the network plus 10. We'll also assume that the value of reaching the destination within the lookahead horizon is 0, while the value of not reaching it is infinite. The simplified model will be solved using backward induction on the value function.
```python=
class DeterministicDynamicDLA(DLA):
def __init__(self, model: 'SSPdynamic'):
super().__init__(model)
self.cost = np.copy(model.cost)
self.horizon = self.map.number_of_nodes() + 10
def make_decision(self, state: Dict[str, Any]) -> int:
V = np.full((self.map.number_of_nodes(), self.horizon + 1), np.inf)
V[self.map.nodes[self.destination]['index'], :] = 0
c_hat = np.empty((self.horizon, self.map.number_of_edges()), dtype=float)
c_hat[0] = state['c_bar']
for t in range(1, self.horizon):
c_hat[t] = 0.9 * c_hat[t - 1] + 0.1 * self.cost
for lookahead_time in range(self.horizon - 1, -1, -1):
for node_from in self.map.nodes:
for node_to in self.map.successors(node_from):
edge_index = self.map.edges[(node_from, node_to)]['index']
new_value = V[self.map.nodes[node_to]['index'], lookahead_time + 1] + c_hat[lookahead_time, edge_index]
if new_value < V[self.map.nodes[node_from]['index'], lookahead_time]:
V[self.map.nodes[node_from]['index'], lookahead_time] = new_value
current_node_index = self.map.nodes[state['CurrentNode']]['index']
for next_node in self.map.successors(state['CurrentNode']):
next_node_index = self.map.nodes[next_node]['index']
edge_index = self.map.edges[(state['CurrentNode'], next_node)]['index']
if np.isclose(V[current_node_index, 0], V[next_node_index, 1] + c_hat[0, edge_index]):
return next_node
raise ValueError("No valid decision found")
```
### Example: Applying the Dynamic Model
Let's see our dynamic models and policies in action. We'll utilize a road network dataset from [GitHub](https://github.com/djanka2/stochastic-optimization/blob/master/StochasticShortestPath_Dynamic/Network_Steps.xlsx), which includes historical travel times and their volatility, to simulate a real-world navigation scenario.
#### Building the Road Network
```python=
def build_road_network(file_path: str) -> Tuple[nx.DiGraph, int, int]:
"""
Build a road network from an Excel file.
Args:
file_path (str): Path to the Excel file containing network data.
Returns:
Tuple[nx.DiGraph, int, int]: The road network, origin node, and destination node.
"""
df = pd.read_excel(file_path)
map_graph = nx.DiGraph()
nx.set_edge_attributes(map_graph, 0, 'cost')
nx.set_edge_attributes(map_graph, 0, 'spread')
for _, row in df.iterrows():
i, j = row['From'], row['To']
map_graph.add_edge(i, j, cost=row['Cost'], spread=row['Spread'])
origin, destination = 0, 24
return map_graph, origin, destination
def visualize_graph(graph: nx.DiGraph, origin: int, destination: int):
"""
Visualize the road network graph.
Args:
graph (nx.DiGraph): The road network graph.
origin (int): The origin node.
destination (int): The destination node.
"""
pos = nx.spring_layout(graph)
plt.figure(figsize=(12, 8))
labels = {node: int(node) if isinstance(node, (int, float)) else node for node in graph.nodes()}
nx.draw(graph, pos, labels=labels, with_labels=True, node_color='lightblue', node_size=500, font_size=10, font_weight='bold')
nx.draw_networkx_nodes(graph, pos, nodelist=[origin], node_color='green', node_size=700)
nx.draw_networkx_nodes(graph, pos, nodelist=[destination], node_color='red', node_size=700)
edge_labels = {(u, v): f"{d['cost']:.2f}" for u, v, d in graph.edges(data=True)}
nx.draw_networkx_edge_labels(graph, pos, edge_labels=edge_labels)
plt.title("Road Network")
plt.axis('off')
plt.tight_layout()
plt.show()
# Build the road network
map_graph, origin, destination = build_road_network('Network_Steps.xlsx')
visualize_graph(map_graph, origin, destination)
```

#### Building a simple benchmark policy
To establish a baseline for future performance comparisons, we've created a simple benchmark policy called `FixedPath`. This policy consistently chooses the route with the lowest historical average cost, irrespective of any new travel time estimates.
##### Code Implementation
```python=
class FixedPath(DLA):
def make_decision(self, state: Dict[str, Any]) -> int:
shortest_path = nx.shortest_path(self.map, source = state['CurrentNode'], target = self.destination, weight = 'cost')
return shortest_path[1]
```
#### Setting the Deadline
We need a reasonable deadline for reaching the destination. We'll base it on the shortest path using average travel times, plus a buffer accounting for the network's volatility.
```python=
def calculate_deadline(graph: nx.DiGraph, origin: int, destination: int) -> float:
"""
Calculate a reasonable deadline for reaching the destination.
Args:
graph (nx.DiGraph): The road network graph.
origin (int): The origin node.
destination (int): The destination node.
Returns:
float: The calculated deadline.
"""
travel_time = nx.shortest_path_length(graph, origin, destination, weight='cost')
volatility = sum(data['spread'] * data['cost'] for _, _, data in graph.edges(data=True))
weight_sum = sum(data['cost'] for _, _, data in graph.edges(data=True))
volatility /= weight_sum
return travel_time * (1 + volatility / 3)
deadline = calculate_deadline(map_graph, origin, destination)
print(f"Calculated deadline: {deadline:.2f}")
# Initialize the dynamic model and policies
Model = SSPdynamic(map_graph, origin, destination)
Policy_s = StaticDLA(Model)
Policy_dd = DeterministicDynamicDLA(Model)
Policy_fp = FixedPath(Model)
```
```
Calculated deadline: 730.75
```
#### Hyperparameter Tuning (for Static DLA)
The `StaticDLA` policy has a hyperparameter `theta` controlling its robustness. We'll find a suitable value using grid search.
```python=
def tune_static_dla(model: 'SSPdynamic', policy: 'StaticDLA', deadline: float):
theta_values = np.linspace(0, 1, 21)
results = np.zeros((21, 3)) # Cost, Late, Loss
for i, theta in enumerate(theta_values):
policy.percentile = theta
cost, late, loss = run_policy(model=model, policy=policy, trial_num=1000, deadline=deadline)
results[i] = [np.mean(cost), np.mean(late), np.mean(loss)]
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
plt.subplots_adjust(wspace=0.3)
for i, (title, ylabel) in enumerate([('Average Time Cost', 'Time'),
('Probability of Being Late', 'Probability'),
('Average Loss', 'Loss')]):
axes[i].plot(theta_values, results[:, i])
axes[i].set_title(title)
axes[i].set_xlabel('Percentile (theta)')
axes[i].set_ylabel(ylabel)
plt.tight_layout()
plt.show()
best_theta = theta_values[np.argmin(results[:, 2])] # Choose theta with minimum loss
policy.percentile = best_theta
print(f"Best theta value: {best_theta}")
# Tune Static DLA
tune_static_dla(Model, Policy_s, deadline)
```

```
Best theta value: 0.5
```
* Based on the plots, we'll choose `theta = 0.5` for our `StaticDLA` policy
#### Policy Evaluation
Let's compare our policies:
* `StaticDLA` (with the tuned theta)
* `DeterministicDynamicDLA`
* A baseline `FixedPath` that always takes the shortest path based on historical averages
We'll run 5000 simulations for each and analyze the results
```python=
def evaluate_policies(model: 'SSPdynamic', policies: list, policy_names: list, deadline: float, trial_num: int = 5000):
if len(policies) != len(policy_names):
raise ValueError("The lists have mismatched lengths.")
results = []
for policy in policies:
cost, late, loss = run_policy(model=model, policy=policy, trial_num=trial_num, deadline=deadline)
results.append((cost, late, loss))
print(f"{policy_names[len(results)-1]}: mean time cost = {np.mean(cost):.2f}, "
f"delay rate = {np.mean(late):.4f}, mean loss = {np.mean(loss):.2f}")
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
for i, (name, result) in enumerate(zip(policy_names, results)):
ax1.hist(result[0], bins=50, alpha=0.5, label=name)
ax2.hist(result[2], bins=50, alpha=0.5, label=name)
ax1.set_title('Distribution of Travel Times')
ax1.set_xlabel('Travel Time')
ax1.set_ylabel('Frequency')
ax1.legend()
ax2.set_title('Distribution of Losses')
ax2.set_xlabel('Loss')
ax2.set_ylabel('Frequency')
ax2.legend()
plt.tight_layout()
plt.show()
# Evaluate policies
evaluate_policies(Model, policies = [Policy_s, Policy_dd, Policy_fp], policy_names = ['Static DLA', 'Deterministic Dynamic DLA', 'Fixed Path'], deadline = deadline)
```
```
Static DLA: mean time cost = 605.38, delay rate = 0.0220, mean loss = 607.53
Deterministic Dynamic DLA: mean time cost = 604.94, delay rate = 0.0206, mean loss = 606.88
Fixed Path: mean time cost = 664.67, delay rate = 0.1908, mean loss = 700.40
```

#### Key Observations
* Both DLA policies outperform the fixed-path policy in terms of average travel time and loss.
* The dynamic DLA policy may show a slight edge over the static DLA in this specific scenario.
## Reference
* Chapters 5 and 6 of Warren B. Powell (2022), *Sequential Decision Analytics and Modeling: Modeling with Python*. [link](https://castle.princeton.edu/sdamodeling/)
* https://github.com/djanka2/stochastic-optimization/tree/master/StochasticShortestPath_Static
* https://github.com/djanka2/stochastic-optimization/tree/master/StochasticShortestPath_Dynamic