# Dynamic Mode Decomposition
[toc]
---
## Problem Statement
Say we have a dynamic system $\dot{x} = f(x)$ with unknown $f$. We have collected states (or snapshots) $z_0, \ldots, z_n$ of this system. Each state $z_j$ represents an $m$-dimensional vector.
We seek to recover $f$.
---
## Motivation
Construct two matrices $X$ and $Y$ as follows: $$X=[z_0\ z_1\ ...\ z_{n-1}]$$ and $$Y=[z_1\ z_1\ ...\ z_n].$$
Both $X$ and $Y$ are $m\times n$ matrices.
We assume the dynamics of the system is characterized by $$AX=Y\tag{1}$$ for some unknown linear operator $A$. In vector forms, $(1)$ means that $$A z_j = z_{j+1}\quad \forall j\le n-1.$$
The goal is to find the linear operator $A$, and the story of Dynamic Mode Decomposition starts here.
Solving $A$ through $(1)$ is straightforward: $$A=YX^+\tag{2}$$ where $X^+$ denotes the pseudo-inverse of $X$.
In most scenarios, however, we encounter that the dimension of the states $z_j$ is much larger than the total amount of the dataset. The number of rows of $X$ is much larger than that of columns of $X$. This implies that the matrix $A$ is in fact a low-rank matrix. Direct computation of $A$ via $(2)$ should be avoided.
---
## Exact DMD
The dynamic mode decomposition (or DMD) for the pair $(X, Y)$ of matrices stated as above is defined to be the eigendecomposition of $A=YX^+$. In order to find the DMD for $(X, Y)$, we need to find two matrices $\Phi$ and $\Lambda$, the columns of $\Phi$ form the eigenvectors of $A$, and $\Lambda$ is diagonal with eigenvalues of $A$ as its entries, such that the following holds: $$A\Phi=\Phi\Lambda\tag{3}$$
---
## Application of DMD in Time Series Forecasting
After such $\Phi$ and $\Lambda$ as mentioned in $(3)$ are obtained, evaluation of the power of $A$ is nothing but: $$A^k=\Phi \Lambda^k \Phi^+\quad\forall k.\tag{4}$$ Now if our goal is to predict the state $z_{k+1}$ at time $k+1$, we know from $(1)$ that $$z_{k+1}=Az_k$$ and it follows immediately from $(4)$ that $$z_k = A^k z_0 = \Phi \Lambda^k \Phi^+z_0$$
---
## Dynamic Mode Decomposition Algorithm
To obtain the DMD of the pair $(X, Y)$, we need to tackle as follows:
### Step 1
Compute $X=U\Sigma V^*$, the rank-$r$ truncated SVD of $X$, which means that $U$ is $m\times r$, $\Sigma$ is $r\times r$, and $V^*$ is $r\times n.$
Since we assume that the dynamics of our system is governed by $(1)$, we immediately obtain another relations: $$Y=AX=AU\Sigma V^*\tag{5}$$ or $$YV\Sigma^{-1}=AU\tag{6}$$
### Step 2
Compute the matrix $\tilde{A}$: $$\tilde{A}:=U^*YV\Sigma^{-1}\tag{7}$$
Observe from the RHS of $(7)$ that $$U^*YV\Sigma^{-1}=U^*AU,\tag{by (6)}$$ hence $$\tilde{A}=U^*AU,$$ or equivalently, $$\tilde{A}U=AU.$$
The main reasons why we want to pursuit this matrix are the following two:
1. Now $\tilde{A}$ is an $r\times r$ matrix, which means computation cost of eigendecomposition is cheap.
2. $\tilde{A}$ and $A$ has the same eigenvalues.
### Step 3
Compute the eigenvalue decomposition of $\tilde{A}$: $$\tilde{A}W=W\Lambda\tag{8}$$ where $W$ is consisting of eigenvectors of $\tilde{A}$ and $\Lambda$ is a diagonal matrix with eigenvalues as its diagonal entries.
### Step 4
Compute the matrix $\Phi$ defined as $$\Phi:=YV\Sigma^{-1}W\tag{9}.$$ It can be shown that the following relation hold: $$A\Phi = \Phi\Lambda\tag{10}$$
#### Proof of $(10)$
The argument proceeds as follows: $$\begin{align}
A\Phi
&=(YV\Sigma^{-1}U^*)(YV\Sigma^{-1}W)\tag{by (6) and (9)}\\
&=YV\Sigma^{-1}(U^*YV\Sigma^{-1})W\\
&=YV\Sigma^{-1}(\tilde{A}W)\tag{by (7)}\\
&=YV\Sigma^{-1}(W\Lambda)\tag{by (8)}\\
&=(YV\Sigma^{-1}W)\Lambda\\
&=\Phi\Lambda\tag{by (9)}
\end{align}$$
---
## Python Implementation of DMD
Prepare the `requirements.txt` file
```bash=
numpy
pandas
matplotlib
```
```bash=
pip install -r requirements.txt
```
`dmd.py`:
```python=
class DMD:
"""
DMD v2
"""
def __init__(self, threshold: float = 0.95, topn: int | None = None) -> None:
self.threshold = threshold
self.topn = topn
self.X = None
self.Y = None
self.z0 = None
self.A = None
self.Atilde = None
self.U = None
self.S = None
self.Vt = None
self.Phi = None
self.pinv_phi = None
self.Lamb = None
self.W = None
def fit(self, X: np.ndarray, Y: np.ndarray):
self.X = X
self.Y = Y
self.z0 = X[:, 0]
self.svd_X(topn=self.topn)
self.compute_atilde()
self.compute_eig_atilde()
self.compute_dmd()
self.compute_pinv_phi()
def svd_X(self, topn: int | None = None):
if self.X is None:
raise RuntimeError("The first matrix of the DMD pair is None")
u, s, vt = np.linalg.svd(self.X, full_matrices=False)
if topn is None:
ascend_prop_s = np.cumsum(s) / np.sum(s)
auto_topn = np.where(ascend_prop_s > self.threshold)[0][0]
self.U, self.S, self.Vt = u[:, :auto_topn], s[:auto_topn], vt[:auto_topn, :]
return
self.U, self.S, self.Vt = u[:, :topn], s[:topn], vt[:topn, :]
def compute_atilde(self):
if self.U is None or self.Y is None or self.Vt is None or self.S is None:
raise RuntimeError()
self.Atilde = self.U.T @ self.Y @ self.Vt.T @ np.diag(1 / self.S)
def compute_eig_atilde(self):
if self.Atilde is None:
raise RuntimeError()
self.Lamb, self.W = np.linalg.eig(self.Atilde)
def compute_dmd(self):
if self.Y is None or self.Vt is None or self.S is None or self.W is None:
raise RuntimeError()
self.Phi = self.Y @ self.Vt.T @ np.diag(1 / self.S) @ self.W
def compute_pinv_phi(self):
if self.Phi is None:
raise RuntimeError()
self.pinv_phi = np.linalg.pinv(self.Phi)
def get_zk(self, timestep: int, real: bool = True):
zk = self.Phi @ np.diag(self.Lamb**timestep) @ self.pinv_phi @ self.z0
if real:
return np.real(zk)
return zk
```
```python=
def get_data(url: str) -> np.ndarray:
df = pd.read_csv(url)
return df[["Temp"]].to_numpy().ravel()
def transform_univariate_timeseries_to_matrix(
x: np.ndarray, window_shape: int
) -> np.ndarray:
"""
Given an univariate timeseries z_1, z_2, z_3, ..., z_m and window_shape (say `n`)
This function will generate the following 2-dimensional numpy array whose number of
rows equals n.
->
z_1 z_2 ...
z_2 z_3
z_3 z_4
. .
. .
. .
z_n z_{n+1} ...
"""
arr = np.lib.stride_tricks.sliding_window_view(x=x, window_shape=window_shape).T
return arr
```
```python=
url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/daily-min-temperatures.csv"
x = get_data(url=url)
x = x[:2000]
```
```python=
import matplotlib.pyplot as plt
plt.plot(x)
plt.plot(dmd.z0)
```
```python=
window_shape = 500
arr = transform_univariate_timeseries_to_matrix(x=x, window_shape=window_shape)
print(arr, arr.shape)
X = arr[:, :-1]
Y = arr[:, 1:]
dmd = DMD()
dmd.fit(X=X, Y=Y)
```
```python=
n = 700
# plt.plot(x)
lst = []
lst.append(dmd.z0)
for i in range(1, n+1):
zi = dmd.get_zk(timestep=i)
lst.append(zi)
# plt.plot(range(i, i+window_shape), zi)
```
```python=
k=700
plt.plot(lst[k])
plt.plot(dmd.X[:, k])
```
```python=
from typing import Optional
import numpy as np
import pandas as pd
from scipy import signal
class DMD:
def __init__(self, threshold: float = 0.7) -> None:
self.threshold = threshold
def get_dmd_pair(self, df: pd.DataFrame, tstep: int):
y = df.to_numpy().flatten()
self.n_samples = df.shape[0]
self.tstep = tstep
tensor = np.lib.stride_tricks.sliding_window_view(y, tstep).T
self.x1 = tensor[:, :-1]
self.x2 = tensor[:, 1:]
self.x0 = self.x1[:, 0]
print(f"Shapes of X and Y: {self.x1.shape}; tstep: {tstep}")
def fit(self, df: pd.DataFrame, tstep: int, topn: Optional[int] = None):
self.get_dmd_pair(df=df, tstep=tstep)
self.svd_x1(topn=topn)
self.get_atilde()
self.get_eig_atilde()
self.get_dmd()
def svd_x1(self, topn: Optional[int] = "auto"):
u, s, vt = np.linalg.svd(self.x1, full_matrices=False)
if isinstance(topn, int):
self.u, self.s, self.vt = u[:, :topn], s[:topn], vt[:topn, :]
print(np.sum(s[:topn]) / np.sum(s))
elif topn == "auto":
ascend_prop_s = np.cumsum(s) / np.sum(s)
auto_topn = np.where(ascend_prop_s > self.threshold)[0][0]
self.u, self.s, self.vt = u[:, :auto_topn], s[:auto_topn], vt[:auto_topn, :]
else:
self.u, self.s, self.vt = u, s, vt
def get_atilde(self):
self.atilde = self.u.T @ self.x2 @ self.vt.T @ np.diag(1 / self.s)
def get_eig_atilde(self):
self.lamb, self.w = np.linalg.eig(self.atilde)
def get_dmd(self):
self.phi = self.x2 @ self.vt.T @ np.diag(1 / self.s) @ self.w
def predict_future(self, t: int):
pinv_phi_x0 = np.linalg.pinv(self.phi) @ self.x0.reshape(-1, 1)
atphi = self.phi @ np.diag(self.lamb ** t)
xt = (atphi @ pinv_phi_x0).reshape(-1,)
return np.real(xt)
def _predict(self, start: int, col_name: str = "YHAT"):
pred_x = np.arange(start, start + self.tstep)
pred_y = self.predict_future(start)
return pd.DataFrame(pred_y, index=pred_x, columns=[col_name])
def predict(self, start: int, col_name: str = "YHAT", coeff: float = 5.0):
pred_df = self._predict(start=start, col_name=col_name)
pred_arr = pred_df.to_numpy().ravel()
last_pred_df = self._predict(start=self.n_samples - self.tstep)
err = np.abs(self.last_data_frame().to_numpy().ravel() - last_pred_df.to_numpy().ravel())
upper = signal.savgol_filter(pred_arr + coeff * err, 51, 3)
upper = pd.DataFrame(upper, columns=["YHAT_UPPER"])
upper.index = pred_df.index
lower = signal.savgol_filter(pred_arr - coeff * err, 51, 3)
lower = pd.DataFrame(lower, columns=["YHAT_LOWER"])
lower.index = pred_df.index
concat_df = pd.concat([pred_df, upper, lower], axis=1)
return concat_df
def last_data_frame(self):
data_frame = pd.DataFrame(self.x2[:, -1], columns=["LAST"])
data_frame.index = data_frame.index + self.n_samples - self.tstep
return data_frame
def last_data_frame_predict(self):
out = self._predict(start=self.n_samples - self.tstep, col_name="YHAT_LAST")
return out
def get_envelope(
data_frame: pd.DataFrame,
window: int,
func: str = "mean",
column_name: str = "envelope"
) -> pd.DataFrame:
old_column_name = data_frame.columns[0]
out = data_frame[[old_column_name]]
out = out.rolling(window=window).agg(func=func).bfill().ffill().reset_index(drop=True)
out = out.rename(columns={old_column_name: column_name})
return out
```
Example: forecast the time series
```python=
import numpy as np
import pandas as pd
import plotly.express as px
from utils import DMD
def main():
t = 0.1 * np.arange(700)
df = pd.DataFrame(np.sin(0.4 * t) + 5e-1 * np.random.randn(t.shape[0]), columns=["raw"])
# url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/daily-min-temperatures.csv"
# df = pd.read_csv(url)
# df = df[["Temp"]]
dmd = DMD()
dmd.fit(df, tstep=500)
pred_df = dmd.predict(start=50)
print(pred_df)
fig = px.line(data_frame=pred_df)
fig.write_image("example.png")
if __name__ == "__main__":
main()
```
In the terminal, run and we should see the `temperature-data.png` figure.
```bash=
python dmd_example.py
```
Example result:

Explanation:
- The red part is the raw data
- The blue part is the fitted result at the beginning.
- The green part is the future predicted result.
---
## References
- Tu, J. H. (2013). Dynamic mode decomposition: Theory and applications (Doctoral dissertation, Princeton University). [[PDF]](https://arxiv.org/pdf/1312.0041.pdf)
- Grosek, J., & Kutz, J. N. (2014). Dynamic mode decomposition for real-time background/foreground separation in video. arXiv preprint arXiv:1404.7592. [[PDF]](https://arxiv.org/pdf/1404.7592.pdf)
###### tags: `dynamic-mode-decomposition`