Try   HackMD

Hidden Markov Model implement in python

Source code

如果你不想看那麼多字想直接看程式碼,可以直接點上面的連結,程式碼都有註解,應該不難懂。
不過我都寫了還是希望可以看一下,拜託啦XD

前言

雖說標題是HMM,但這篇會著重在HMM中的Baum-Welch演算法,也就是用來訓練HMM的演算法。這篇文章會用python來實作,並且會用到numpy,所以如果你不熟悉python或numpy,可以先看看這篇文章:Python Numpy Tutorial

HMM

所謂的HMM指的是在不同狀態(status)下,會產生的結果(或是序列),所以很常被使用在基因體序列的預測。例如:在基因體中,有A、T、C、G四種碱基,而在不同的狀態下,會產生不同的碱基,所以我們可以用HMM來預測基因體的碱基序列。

關於HMM的基本概念,可以參考一系列影片,我自己覺得說明的很清楚:Hidden Markov Model

這裡的重點是會使用到以下幾個HMM的參數:

  • 初始機率(initial probability): \(\pi\)
  • 轉換機率(transition probability): A
  • 發射機率(emission probability): B
  • 觀測到的序列(observation sequence): O

這裡的轉換機率和發射機率都是一個二維矩陣,依照狀態和序列種類來決定,而初始機率和觀測到的序列都是一個一維陣列。

  • \(\pi\)矩陣的長度就是狀態的種類
  • A矩陣的row和column都是狀態的種類
  • B矩陣的長寬都是序列的種類
  • O矩陣的長度就是觀測到的序列的長度。
  • EX:
    • \(\pi\) = [0.2, 0.4] => 有兩種狀態,初始機率分別為0.2和0.4
    • A = [[0.5, 0.2], [0.3, 0.5]] => 有兩種狀態,狀態0轉換到狀態0和1機率分別為0.5和0.2,狀態1轉換到狀態0和1機率分別為0.3和0.5
    • B = [[0.5, 0.5], [0.4, 0.6]] => 有兩種序列,狀態0產生序列0和1機率分別為0.5和0.5,狀態1產生序列0和1機率分別為0.4和0.6
    • O = [0, 1, 0] => 觀測到的序列為 010

Forward Algorithm

在介紹Baum-Welch演算法之前我必須先介紹Forward Algorithm,因為Baum-Welch演算法是基於Forward Algorithm來做的。Forward Algorithm是用來計算在某個時間點t,觀測到某個序列O的機率,也就是P(O|λ)。這裡的λ是HMM的參數,包含初始機率、轉換機率、發射機率。

Forward Algorithm的終極目標是求得\(\alpha\),也就是P(O|λ)。

  1. 要求\(\alpha\),首先要先求得\(\alpha_{statej}(i)\),也就是在時間點t=0[註1],觀測到的序列為i的機率,也就是\(P(O_1|λ)\)。寫成這樣好像有點難懂,我們畫個表格來看看(這裡假設觀測到的序列O長度為5,所以時間長度也是5,state只有兩種):

    t = 0 t = 1 t = 2 t = 3 t = 4
    state = 0 \(\alpha_0(state_0)\) \(\alpha_1(state_0)\) \(\alpha_2(state_0)\) \(\alpha_3(state_0)\) \(\alpha_4(state_0)\)
    state = 1 \(\alpha_0(state_1)\) \(\alpha_1(state_1)\) \(\alpha_2(state_1)\) \(\alpha_3(state_1)\) \(\alpha_4(state_1)\)

    所以要幹嘛就很清楚了,把這個表格填滿就好了。

  2. 接著我們要知道計算\(\alpha\)的公式

    1. 初始值:
      • \[\alpha_0(state_j) = \pi_j * b_j(O_1)\]
      • 中文解釋:一開始產生狀態\(j\)的機率\(\pi_j\)乘上狀態j產生序列\(O_1\)的機率\(b_j(O_1)\)
    2. 用迴圈計算剩下的\(\alpha\)值:
      • \[\alpha_t(state_j) = {\sum_{i=1}^n}\alpha_{t-1}(i)a_{ij}b_{j}(\vec{x_{t}})\]
      • 中文解釋: 假設前一個時間點是\(state_j\),他產生\(state_j\)的機率乘\(state_j\)產生\(O_t\)的機率總和。簡單來說綜合前面算的幾種可能的機率來算出這個時間點這個已知序列產生的機率(n是state的數量)。
    3. 把所有的\(\alpha\)加起來就是P(O|λ)了。

Underflow and Scaling Solution

在計算\(\alpha\)的時候,會有一個問題,就是數字會太小,所以會有underflow的問題(電腦存不到那麼小的數字)。為了解決這個問題,我們可以把\(\alpha\)乘上一個常數,這樣就不會underflow了,而這個常數稱為scaling coefficient

公式會調整成這樣:

  • \[\hat{\alpha}_t(j) = \frac{{\sum_{i=1}^n}\hat{\alpha}_{t-1}(i)a_{ij}b_{j}(\vec{x_{t}})}{\sum_{l=1}^{n}{\sum_{i=1}^n}\hat{\alpha}_{t-1}(i)a_{il}b_{l}(\vec{x_{t}})}\]

你可能會問說那個scaling coefficient在哪?其實就是\(\sum_{l=1}^{n}{\sum_{i=1}^n}\hat{\alpha}_{t-1}(i)a_{il}b_{l}(\vec{x_{t}})\)這個值,也就是\(\alpha\)的分母,我們用C來表示。

再來我們一個一個步驟來重新解析:

  1. 初始值:
    \[ \alpha_0(state_j) = \pi_j * b_j(O_1) \]
    \[ C_0 = \sum_{i=1}^n\alpha_{0}(i) \]
    \[ \hat{\alpha}_0(state_j) = \frac{\pi_j * b_j(O_1)}{C_0} \]
  2. 使用前一個狀態的\(\hat{\alpha}\)來計算現在這個狀態的\(\alpha\),再用\(\alpha\)來計算新的\(C\),最後用\(\alpha\)\(C\)來計算新的\(\hat{\alpha}\)
    \[ \alpha_t(state_j) = {\sum_{i=1}^n}\hat{\alpha}_{t-1}(i)a_{ij}b_{j}(\vec{x_{t}}) \]
    \[ C_t = \sum_{j=1}^n\alpha_{t}(j) \]
    \[ \hat{\alpha}_t(state_j) = \frac{\alpha_t(state_j)}{C_t} \]
  3. 把所有的\(C_t\)\(log\)加起來再乘上負號就是\(logP(O|λ)\)[註2]了。
    \[ logP(O|λ) = -\sum_{t=1}^Tlog(C_t) \]

implement in python

def Forward(O, a, b, pi, isBW=False): # init parameters # scaled alpha (hat alpha) scaled_alpha = np.zeros((O.shape[0], a.shape[0])) alpha = np.zeros(a.shape[0]) # alpha C = np.zeros(O.shape[0]) # scaling factor C[0] = np.sum(pi * b[:, O[0]]) scaled_alpha[0, :] = (pi * b[:, O[0]]) / C[0] # init scaled alpha P = 0 # probability of the observation # compute alpha, t: time t for t in range(1, O.shape[0]): for j in range(a.shape[0]): alpha[j] = scaled_alpha[t - 1] @ a[:, j] * b[j, O[t]] C[t] += alpha[j] C[t] = 1 / C[t] P += math.log2(C[t]) scaled_alpha[t, :] = alpha * C[t] # update alpha P *= -1 print("Oberservation Probability: ", P) ## if is called by Baum-Welch, return scaled_alpha ## if is called by other function, return P if isBW: return scaled_alpha else: return P

因為篇幅關係程式碼解說請點這裡

Backward Algorithm

Backward其實想法跟Forward很相似,不過他是從最後一個\(\alpha\)算回來,這也是他們分別被稱為Forward與Backward的原因。
為了與Forward的\(\alpha\)做區別,我們把Backward要求的稱為\(\beta\)

Backward的終極目標是求得\(\beta\)

  1. 我們把\(\alpha\)搬過來照用。

    t = 0 t = 1 t = 2 t = 3 t = 4
    state = 0 \(\beta_0(state_0 )\) \(\beta_1(state_0 )\) \(\beta_2(state_0)\) \(\beta_3(state_0)\) \(\beta_4(state_0)\)
    state = 1 \(\beta_0(state_1)\) \(\beta_1(state_1)\) \(\beta_2(state_1)\) \(\beta_3(state_1)\) \(\beta_4(state_1)\)
  2. \(\beta公式\):

    1. 初始值[註3]:
      • \[ \beta_T(state_j) = 1 \]
    2. 用迴圈計算剩下的\(\beta\)值:
      • \[ \beta_t(state_j) = {\sum_{i=1}^n}a_{ij}b_{j}(\vec{x_{t}})\beta_{t+1}(j) \]
      • 中文解釋: 假設現在的狀態是j,產生下一個狀態j的機率乘上狀態j產生序列\(O_t\)的機率總和再乘上下一個狀態發生的機率\(\beta_t\),簡單來說依照下一個發生的機率來計算現在的機率。
    3. 把所有\(\beta\)相加即得到答案

Underflow and Scaling Solution

既然\(\alpha\)會遇到underflow的問題,\(\beta\)自然也會,所以我們也要使用相同技巧來計算\(\beta\)

公式會調整成這樣:

  • \[\hat{\beta}_t(j) = \frac{{\sum_{i=1}^n}a_{ij}b_{j}(\vec{x_{t}})\hat{\beta}_{t+1}(j)}{\sum_{l=1}^{n}{\sum_{i=1}^n}a_{il}b_{l}(\vec{x_{t}})\hat{\beta}_{t+1}(l)}\]

\(\hat{\beta}\)的分母同樣是我們的scaling factor C。

拆成一個一個步驟來看:

  1. 初始值:
    \[ \hat{\beta}_T(state_j) = 1 \]
    跟scaling前一模一樣,因為我們是從最後一個開始算,\(\hat{\beta}\)直接設為1不用初始化C和\(\beta\)來求。
  2. 使用後一個狀態的\(\hat{\beta}\)來計算現在這個狀態的\(\beta\),再用\(\beta\)來計算新的C,最後用\(\beta\)\(C\)來計算新的\(\hat{\beta}\)
    \[ \beta_t(state_j) = {\sum_{i=1}^n}a_{ij}b_{j}(\vec{x_{t}})\hat{\beta}_{t+1}(j) \]
    \[ C_t = \sum_{j=1}^n\beta_{t+1}(j) \]
    \[ \hat{\beta}_t(state_j) = \frac{\beta_t(state_j)}{C_t} \]
  3. 把所有的\(C_t\)\(log\)加起來再乘上負號就是\(logP(O|λ)\)了[註4]。
    \[ logP(O|λ) = -\sum_{t=1}^Tlog(C_t) \]

implement in python

def Backward(O, a, b): # init beta scaled_beta = np.zeros((O.shape[0], a.shape[0])) # scaled beta beta = np.zeros(a.shape[0]) C = np.zeros(O.shape[0]) P = 0 # probability of the backward observation # setting beta(T) = 1 scaled_beta[O.shape[0] - 1] = np.ones((a.shape[0])) # compute beta (from time t-1 to 1) for t in range(O.shape[0] - 2, -1, -1): for j in range(a.shape[0]): beta[j] = (scaled_beta[t + 1] * b[:, O[t + 1]]) @ a[j, :] C[t + 1] += beta[j] C[t + 1] = 1 / C[t + 1] P += math.log2(C[t + 1]) scaled_beta[t, :] = beta * C[t + 1] P *= -1 # print("Oberservation Backward Probability: ", P) return scaled_beta

因為篇幅關係程式碼解說請點這裡

Baum-Welch Algorithm

講了這麼多終於要進入本篇的重頭戲-BaumWelch演算法了,BW演算法正是利用前面的Forward和Backward所計算出來的\(\alpha\)\(\beta\)來計算更新我們的transition probability和emission probability。

在HMM的概念中,我們根本不會知道最一開始的A和B是多少,所以才需要BW演算法來求得最有可能的A和B。

\(\xi\)\(\gamma\)

BW中有兩個很重要的參數,一個是\(\xi\)(發音為xi),另一個是\(\gamma\),我們正是利用這兩個參數來更新A和B

\(\xi\)

先來講解\(\xi\),它代表了當time = t時,在state i,⽽當time = t + 1時,在state j的機率(\(\xi\)會考慮下個時間點的狀態,所以當前時間點是狀態0的話會產生兩個\(\xi\),分別是狀態0和狀態1,所以在儲存\(\xi\)時會用到三維矩陣)

xi

公式:
\[ \xi_t(i, j) = \frac{a_t(i)a_{ij}b_j(O_{t+1})\beta_{t+1}(j)}{\sum_{i=1}^n\sum_{j=1}^na_t(i)a_{ij}b_j(O_{t+1})\beta_{t+1}(j)} \]

\(\gamma\)

接著是\(\gamma\),也就是當time = t時,在state i的機率(和\(\xi\)不同不考慮下一個時間點的機率)。

公式:
\[ \gamma_t(state_i) = \frac{\alpha_t(i)\beta_t(i)}{\sum_{state_j}\alpha_t(j)\beta_t(j)} \]

此外,\(\gamma\)\(\xi\)是有關係的: \(\gamma_t(state_i)=\sum_{j=1}^n\xi_t(statei, statej)\),意即把當前時間點同個狀態的\(\xi\)全加起來就是\(\gamma\)

更新A與B

  • A: 首先A代表前一個狀態產生下一個狀態的機率,所以用\(\xi\)除以\(\gamma\)就可以得到新的機率。
    • 公式:
      \[ a_{ij} = \frac{\sum_{t=1}^{T-1}\xi_t(i,j)}{\sum_{t=1}^{T-1}\gamma_t(i)} \]
  • B: B代表狀態i產生各種序列的機率,所以找出對應序列k出現的\(\gamma\)再除上\(\xi\)
    • 公式:
      \[ b_j(k) = \frac{\sum_{t=1}^T,s.t.o_t=v_k\gamma(j)}{\sum_{t=1}^T\gamma_t(j)} \]

恭喜,我們現在有了新的A和B了,我們只要不斷重複這步驟就可以找到更好的A和B,至於要跑幾次就交給你決定了。

implement in python

def BaumWelch(O, a, b, pi, n_iter=100): N = a.shape[0] T = len(O) # total time for i in range(n_iter): alpha = Forward(O, a, b, pi, True) # print("alpha:", alpha) beta = Backward(O, a, b) # print("beta:", beta) xi = np.zeros((N, N, T - 1)) # init xi # compute xi # scaled alpha and beta wont effect xi # xi: probability of being in state i at time t and state j at time t + 1 for t in range(T - 1): denominator = np.dot( np.dot(alpha[t, :].T, a) * b[:, O[t + 1]].T, beta[t + 1, :]) for i in range(N): numerator = alpha[t, i] * a[i, :] * \ b[:, O[t + 1]].T * beta[t + 1, :].T xi[i, :, t] = numerator / denominator # use xi to compute gamma # scaled alpha and beta wont effect xi # gamma : probability of being in state i at time t (fix i and t then sum all j) gamma = np.sum(xi, axis=1) # use xi and gamma to update a # sum xi and gamma over time t # new a = sum(xi) / sum(gamma) # because sum(xi, 2) is two demension and sum(gamma, axis=1) is one demension, gamma need to be expand to two demension # fix row, reshape column a = np.sum(xi, 2) / np.sum(gamma, axis=1).reshape((-1, 1)) # Add additional T'th element in gamma (because xi is T-1) # hsack: stack arrays in sequence horizontally (column wise) # use the origin gamma formula to compute the last gamma denominator = alpha[T - 1] @ beta[T - 1] gamma = np.hstack((gamma, np.expand_dims( (alpha[T - 1] * beta[T - 1]) / denominator, axis=1))) # use gamma to update b # K is the number of observation types (four in this case) K = b.shape[1] denominator = np.sum(gamma, axis=1) for l in range(K): b[:, l] = np.sum(gamma[:, O == l], axis=1) b = np.divide(b, denominator.reshape((-1, 1))) return a,

同樣因為篇幅關係我把程式碼解說放在這裡

參考資料

註釋

註1

一般在講解HMM的時候,都會從時間點t=1開始,
但python的陣列是從index=0開始,所以我們會從時間點t=0開始。

註2

這裡用\(log\)來表示機率也是因為underflow的關係。

註3

\(\alpha\)不同,\(\beta\)沒有初始機率,所以一律將最後的狀態設為1。

註4

程式碼在實作時不會用到這行,因為如果是計算BW演算法只需要\(\beta\)矩陣,而單純計算序列機率只要Forward即可。