# Dynamic Mode Decomposition [toc] --- ## Problem Statement Say we have a dynamic system $\dot{x} = f(x)$ with unknown $f$. We have collected states (or snapshots) $z_0, \ldots, z_n$ of this system. Each state $z_j$ represents an $m$-dimensional vector. We seek to recover $f$. --- ## Motivation Construct two matrices $X$ and $Y$ as follows: $$X=[z_0\ z_1\ ...\ z_{n-1}]$$ and $$Y=[z_1\ z_1\ ...\ z_n].$$ Both $X$ and $Y$ are $m\times n$ matrices. We assume the dynamics of the system is characterized by $$AX=Y\tag{1}$$ for some unknown linear operator $A$. In vector forms, $(1)$ means that $$A z_j = z_{j+1}\quad \forall j\le n-1.$$ The goal is to find the linear operator $A$, and the story of Dynamic Mode Decomposition starts here. Solving $A$ through $(1)$ is straightforward: $$A=YX^+\tag{2}$$ where $X^+$ denotes the pseudo-inverse of $X$. In most scenarios, however, we encounter that the dimension of the states $z_j$ is much larger than the total amount of the dataset. The number of rows of $X$ is much larger than that of columns of $X$. This implies that the matrix $A$ is in fact a low-rank matrix. Direct computation of $A$ via $(2)$ should be avoided. --- ## Exact DMD The dynamic mode decomposition (or DMD) for the pair $(X, Y)$ of matrices stated as above is defined to be the eigendecomposition of $A=YX^+$. In order to find the DMD for $(X, Y)$, we need to find two matrices $\Phi$ and $\Lambda$, the columns of $\Phi$ form the eigenvectors of $A$, and $\Lambda$ is diagonal with eigenvalues of $A$ as its entries, such that the following holds: $$A\Phi=\Phi\Lambda\tag{3}$$ --- ## Application of DMD in Time Series Forecasting After such $\Phi$ and $\Lambda$ as mentioned in $(3)$ are obtained, evaluation of the power of $A$ is nothing but: $$A^k=\Phi \Lambda^k \Phi^+\quad\forall k.\tag{4}$$ Now if our goal is to predict the state $z_{k+1}$ at time $k+1$, we know from $(1)$ that $$z_{k+1}=Az_k$$ and it follows immediately from $(4)$ that $$z_k = A^k z_0 = \Phi \Lambda^k \Phi^+z_0$$ --- ## Dynamic Mode Decomposition Algorithm To obtain the DMD of the pair $(X, Y)$, we need to tackle as follows: ### Step 1 Compute $X=U\Sigma V^*$, the rank-$r$ truncated SVD of $X$, which means that $U$ is $m\times r$, $\Sigma$ is $r\times r$, and $V^*$ is $r\times n.$ Since we assume that the dynamics of our system is governed by $(1)$, we immediately obtain another relations: $$Y=AX=AU\Sigma V^*\tag{5}$$ or $$YV\Sigma^{-1}=AU\tag{6}$$ ### Step 2 Compute the matrix $\tilde{A}$: $$\tilde{A}:=U^*YV\Sigma^{-1}\tag{7}$$ Observe from the RHS of $(7)$ that $$U^*YV\Sigma^{-1}=U^*AU,\tag{by (6)}$$ hence $$\tilde{A}=U^*AU,$$ or equivalently, $$\tilde{A}U=AU.$$ The main reasons why we want to pursuit this matrix are the following two: 1. Now $\tilde{A}$ is an $r\times r$ matrix, which means computation cost of eigendecomposition is cheap. 2. $\tilde{A}$ and $A$ has the same eigenvalues. ### Step 3 Compute the eigenvalue decomposition of $\tilde{A}$: $$\tilde{A}W=W\Lambda\tag{8}$$ where $W$ is consisting of eigenvectors of $\tilde{A}$ and $\Lambda$ is a diagonal matrix with eigenvalues as its diagonal entries. ### Step 4 Compute the matrix $\Phi$ defined as $$\Phi:=YV\Sigma^{-1}W\tag{9}.$$ It can be shown that the following relation hold: $$A\Phi = \Phi\Lambda\tag{10}$$ #### Proof of $(10)$ The argument proceeds as follows: $$\begin{align} A\Phi &=(YV\Sigma^{-1}U^*)(YV\Sigma^{-1}W)\tag{by (6) and (9)}\\ &=YV\Sigma^{-1}(U^*YV\Sigma^{-1})W\\ &=YV\Sigma^{-1}(\tilde{A}W)\tag{by (7)}\\ &=YV\Sigma^{-1}(W\Lambda)\tag{by (8)}\\ &=(YV\Sigma^{-1}W)\Lambda\\ &=\Phi\Lambda\tag{by (9)} \end{align}$$ --- ## Python Implementation of DMD Prepare the `requirements.txt` file ```bash= numpy pandas matplotlib ``` ```bash= pip install -r requirements.txt ``` `dmd.py`: ```python= class DMD: """ DMD v2 """ def __init__(self, threshold: float = 0.95, topn: int | None = None) -> None: self.threshold = threshold self.topn = topn self.X = None self.Y = None self.z0 = None self.A = None self.Atilde = None self.U = None self.S = None self.Vt = None self.Phi = None self.pinv_phi = None self.Lamb = None self.W = None def fit(self, X: np.ndarray, Y: np.ndarray): self.X = X self.Y = Y self.z0 = X[:, 0] self.svd_X(topn=self.topn) self.compute_atilde() self.compute_eig_atilde() self.compute_dmd() self.compute_pinv_phi() def svd_X(self, topn: int | None = None): if self.X is None: raise RuntimeError("The first matrix of the DMD pair is None") u, s, vt = np.linalg.svd(self.X, full_matrices=False) if topn is None: ascend_prop_s = np.cumsum(s) / np.sum(s) auto_topn = np.where(ascend_prop_s > self.threshold)[0][0] self.U, self.S, self.Vt = u[:, :auto_topn], s[:auto_topn], vt[:auto_topn, :] return self.U, self.S, self.Vt = u[:, :topn], s[:topn], vt[:topn, :] def compute_atilde(self): if self.U is None or self.Y is None or self.Vt is None or self.S is None: raise RuntimeError() self.Atilde = self.U.T @ self.Y @ self.Vt.T @ np.diag(1 / self.S) def compute_eig_atilde(self): if self.Atilde is None: raise RuntimeError() self.Lamb, self.W = np.linalg.eig(self.Atilde) def compute_dmd(self): if self.Y is None or self.Vt is None or self.S is None or self.W is None: raise RuntimeError() self.Phi = self.Y @ self.Vt.T @ np.diag(1 / self.S) @ self.W def compute_pinv_phi(self): if self.Phi is None: raise RuntimeError() self.pinv_phi = np.linalg.pinv(self.Phi) def get_zk(self, timestep: int, real: bool = True): zk = self.Phi @ np.diag(self.Lamb**timestep) @ self.pinv_phi @ self.z0 if real: return np.real(zk) return zk ``` ```python= def get_data(url: str) -> np.ndarray: df = pd.read_csv(url) return df[["Temp"]].to_numpy().ravel() def transform_univariate_timeseries_to_matrix( x: np.ndarray, window_shape: int ) -> np.ndarray: """ Given an univariate timeseries z_1, z_2, z_3, ..., z_m and window_shape (say `n`) This function will generate the following 2-dimensional numpy array whose number of rows equals n. -> z_1 z_2 ... z_2 z_3 z_3 z_4 . . . . . . z_n z_{n+1} ... """ arr = np.lib.stride_tricks.sliding_window_view(x=x, window_shape=window_shape).T return arr ``` ```python= url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/daily-min-temperatures.csv" x = get_data(url=url) x = x[:2000] ``` ```python= import matplotlib.pyplot as plt plt.plot(x) plt.plot(dmd.z0) ``` ```python= window_shape = 500 arr = transform_univariate_timeseries_to_matrix(x=x, window_shape=window_shape) print(arr, arr.shape) X = arr[:, :-1] Y = arr[:, 1:] dmd = DMD() dmd.fit(X=X, Y=Y) ``` ```python= n = 700 # plt.plot(x) lst = [] lst.append(dmd.z0) for i in range(1, n+1): zi = dmd.get_zk(timestep=i) lst.append(zi) # plt.plot(range(i, i+window_shape), zi) ``` ```python= k=700 plt.plot(lst[k]) plt.plot(dmd.X[:, k]) ``` ```python= from typing import Optional import numpy as np import pandas as pd from scipy import signal class DMD: def __init__(self, threshold: float = 0.7) -> None: self.threshold = threshold def get_dmd_pair(self, df: pd.DataFrame, tstep: int): y = df.to_numpy().flatten() self.n_samples = df.shape[0] self.tstep = tstep tensor = np.lib.stride_tricks.sliding_window_view(y, tstep).T self.x1 = tensor[:, :-1] self.x2 = tensor[:, 1:] self.x0 = self.x1[:, 0] print(f"Shapes of X and Y: {self.x1.shape}; tstep: {tstep}") def fit(self, df: pd.DataFrame, tstep: int, topn: Optional[int] = None): self.get_dmd_pair(df=df, tstep=tstep) self.svd_x1(topn=topn) self.get_atilde() self.get_eig_atilde() self.get_dmd() def svd_x1(self, topn: Optional[int] = "auto"): u, s, vt = np.linalg.svd(self.x1, full_matrices=False) if isinstance(topn, int): self.u, self.s, self.vt = u[:, :topn], s[:topn], vt[:topn, :] print(np.sum(s[:topn]) / np.sum(s)) elif topn == "auto": ascend_prop_s = np.cumsum(s) / np.sum(s) auto_topn = np.where(ascend_prop_s > self.threshold)[0][0] self.u, self.s, self.vt = u[:, :auto_topn], s[:auto_topn], vt[:auto_topn, :] else: self.u, self.s, self.vt = u, s, vt def get_atilde(self): self.atilde = self.u.T @ self.x2 @ self.vt.T @ np.diag(1 / self.s) def get_eig_atilde(self): self.lamb, self.w = np.linalg.eig(self.atilde) def get_dmd(self): self.phi = self.x2 @ self.vt.T @ np.diag(1 / self.s) @ self.w def predict_future(self, t: int): pinv_phi_x0 = np.linalg.pinv(self.phi) @ self.x0.reshape(-1, 1) atphi = self.phi @ np.diag(self.lamb ** t) xt = (atphi @ pinv_phi_x0).reshape(-1,) return np.real(xt) def _predict(self, start: int, col_name: str = "YHAT"): pred_x = np.arange(start, start + self.tstep) pred_y = self.predict_future(start) return pd.DataFrame(pred_y, index=pred_x, columns=[col_name]) def predict(self, start: int, col_name: str = "YHAT", coeff: float = 5.0): pred_df = self._predict(start=start, col_name=col_name) pred_arr = pred_df.to_numpy().ravel() last_pred_df = self._predict(start=self.n_samples - self.tstep) err = np.abs(self.last_data_frame().to_numpy().ravel() - last_pred_df.to_numpy().ravel()) upper = signal.savgol_filter(pred_arr + coeff * err, 51, 3) upper = pd.DataFrame(upper, columns=["YHAT_UPPER"]) upper.index = pred_df.index lower = signal.savgol_filter(pred_arr - coeff * err, 51, 3) lower = pd.DataFrame(lower, columns=["YHAT_LOWER"]) lower.index = pred_df.index concat_df = pd.concat([pred_df, upper, lower], axis=1) return concat_df def last_data_frame(self): data_frame = pd.DataFrame(self.x2[:, -1], columns=["LAST"]) data_frame.index = data_frame.index + self.n_samples - self.tstep return data_frame def last_data_frame_predict(self): out = self._predict(start=self.n_samples - self.tstep, col_name="YHAT_LAST") return out def get_envelope( data_frame: pd.DataFrame, window: int, func: str = "mean", column_name: str = "envelope" ) -> pd.DataFrame: old_column_name = data_frame.columns[0] out = data_frame[[old_column_name]] out = out.rolling(window=window).agg(func=func).bfill().ffill().reset_index(drop=True) out = out.rename(columns={old_column_name: column_name}) return out ``` Example: forecast the time series ```python= import numpy as np import pandas as pd import plotly.express as px from utils import DMD def main(): t = 0.1 * np.arange(700) df = pd.DataFrame(np.sin(0.4 * t) + 5e-1 * np.random.randn(t.shape[0]), columns=["raw"]) # url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/daily-min-temperatures.csv" # df = pd.read_csv(url) # df = df[["Temp"]] dmd = DMD() dmd.fit(df, tstep=500) pred_df = dmd.predict(start=50) print(pred_df) fig = px.line(data_frame=pred_df) fig.write_image("example.png") if __name__ == "__main__": main() ``` In the terminal, run and we should see the `temperature-data.png` figure. ```bash= python dmd_example.py ``` Example result: ![](https://i.imgur.com/hmgW8iN.png) Explanation: - The red part is the raw data - The blue part is the fitted result at the beginning. - The green part is the future predicted result. --- ## References - Tu, J. H. (2013). Dynamic mode decomposition: Theory and applications (Doctoral dissertation, Princeton University). [[PDF]](https://arxiv.org/pdf/1312.0041.pdf) - Grosek, J., & Kutz, J. N. (2014). Dynamic mode decomposition for real-time background/foreground separation in video. arXiv preprint arXiv:1404.7592. [[PDF]](https://arxiv.org/pdf/1404.7592.pdf) ###### tags: `dynamic-mode-decomposition`