DATA Fusion Skill === ###### tags: `機器人學` # Kalman Filter ![](https://i.imgur.com/QnLJodZ.png) want to know: - [ ] multi input - [x] - [ ] how about drift # EKF need Jacobian 我的策略:直接用微小誤差去微分 算出值 # UKF ![](https://i.imgur.com/bcvflJQ.png) ``` python #UKF test from scipy.linalg import sqrtm import numpy as np import sys reload(sys) sys.setdefaultencoding('utf-8') import matplotlib.pyplot as plt class UKF(): def __init__(self): self.x = 0 self.n_state = 0 self.n_sig = 0 self.y_measure = 0 self.landa = 0 self.alpha = 0 self.beta = 0 self.Q = 0 self.R = 0 self.p_x = 0 self.p_yy = 0 self.p_xy = 0 self.W_m = 0 self.W_c = 0 self.n_sig = 0 def param_update(self,n_state,n_output,landa,alpha,beta,Q,R,p_x): self.landa = landa self.alpha = alpha self.beta = beta self.Q = Q self.R = R self.p_x = p_x self.n_state = n_state self.n_sig = n_state*2+1 self.n_output = n_output ########### Get value of Weight ########3 self.W_m = np.zeros(self.n_sig) for i in range (self.n_sig): if i == 0: self.W_m[i] = self.landa/(self.n_state+self.landa) else: self.W_m[i] = 1/(2*(self.n_state+self.landa)) self.W_c= np.zeros(self.n_sig) for i in range (self.n_sig): if i == 0: self.W_c[i] = self.landa/((self.n_state+self.landa)+(1-self.alpha**2+self.beta)) else: self.W_c[i] = 1/(2*(self.n_state+self.landa)) def F(self,x): return np.array([x[0,:]+1,x[1,:]+2]) def update(self,x,y_measure): ####################################### ### predict X ####### ####################################### ######### unscentrate transform ######### ######### Get sigma of X ######### r = sqrtm(self.n_state*self.p_x) x = x.T X = x for i in range(self.n_state): X = np.vstack((X,x+r[i])) X = np.vstack((X,x-r[i])) X = X.T #print X ####### Get prediction of X(k) by X(k-1) ######## X_pre=self.F(X) #print('X(k) is') #print(X_pre) ####### Get mean of sigma ######## X_pre_mean = np.zeros((self.n_state,1)) for i in range(self.n_state): for j in range(self.n_sig): X_pre_mean[i,0] += X_pre[i,j]*self.W_m[j] #print('X_pre_mean is') #print(X_pre_mean) ####### X sigma minus X mean ######## X_diff = np.zeros((self.n_state,self.n_sig)) for i in range(self.n_sig): X_diff.T[i] = X_pre.T[i]-X_pre_mean.T #print('X diff is') #print(X_diff) ####### Get covarienc of X ######## p_k = np.zeros((self.n_state,self.n_state)) for i, val in enumerate(np.array_split(X_diff, self.n_sig, 1)): p_k += val.dot(val.T)*self.W_c[i] p_k += self.Q #print('p_k is') #print(p_k) ####################################### ##### Correction ###### ####################################### ########## predict Y by X ########## Y_pre = np.zeros((self.n_output,self.n_sig)) Y_pre[0,:]=(X[0,:]**2+X_pre[1,:]**2)**0.5 Y_pre[1,:]= np.arctan(X_pre[0,:]/X_pre[1,:]) #print('Y_pre is') #print(Y_pre) ########## get Y mean ########## Y_pre_mean = np.zeros((self.n_output,1)) for i in range(self.n_output): for j in range(self.n_sig): Y_pre_mean[i,0] += self.W_m[j]*Y_pre[i,j] #print('Y_pre_mean is') #print(Y_pre_mean) ####### Y sigma minus y mean ######## Y_pre_diff = np.zeros((self.n_output,self.n_sig)) for i in range(self.n_sig): Y_pre_diff.T[i] = Y_pre.T[i]-Y_pre_mean.T #print('Y_pre diff is: \n'+str(Y_pre_diff)) ####### Get covarienc of y ######## p_yy = np.zeros((self.n_state,self.n_state)) for i, val in enumerate(np.array_split(Y_pre_diff, self.n_sig, 1)): self.p_yy += val.dot(val.T)*self.W_c[i] #print('self.p_yy is') #print(self.p_yy) ####### Get covarienc of xy ######## p_xy = np.zeros((self.n_state,self.n_state)) for i, val in enumerate(zip(np.array_split(Y_pre_diff,self.n_sig,1),np.array_split(Y_pre_diff,self.n_sig,1))): p_xy += self.W_m[i]*val[0].dot(val[1].T) ####### Get Gain ######## K_k = p_xy.dot(np.linalg.inv(p_yy+R)) #print('K_k is:') #print K_k ####### Result ######## x_final = X_pre_mean+K_k.dot(y_measure - Y_pre_mean) #print ('X final is') #print (x_final) self.p_x = p_k- K_k.dot(p_yy+R).dot(K_k.T) #print ('P final is:') #print (self.p_x) return x_final if __name__ == '__main__': np.random.seed(1234) s1 = np.random.normal(0, 2, 201) s2 = np.random.normal(0, 2, 201) ############################################### #### param ########### ############################################### ########### State and OUTPUT measurement x = np.array([[0.001],[0.001]]) n_state = np.size(x) for i in range (200): x = np.hstack((x,[[x[0,i]+1+s1[i]],[x[1,i]+2+s1[i]]])) print x x_shape = np.shape(x) n_sig = n_state*2+1 n_output = 2 landa = 0. alpha = 1e-3 beta = 2. y = np.zeros((n_output,x_shape[1])) y[0,:]=(x[0,:]**2+x[1,:]**2)**0.5 y[1,:]= np.arctan(x[0,:]/x[1,:]) #print y ######### Get preliminary covarience ######## Q = np.array([[0.01,0],[0,0.01]]) R = np.array([[1,0],[0,1]]) p_x = np.array([[1.44,0.], [0., 2.89]]) ukf1 = UKF() ukf1.param_update(n_state,n_output,landa,alpha,beta,Q,R,p_x) data_x1 = [0] data_x2 = [0] data_origin1 = [0] data_origin2 = [0] for i,var in enumerate(zip(np.array_split(x,201,1),np.array_split(y,201,1))): #print i if i > 0: #print var_last x_opt = ukf1.update(x_last,var[1]) print 'haha',var[0] ,x_opt data_x1= np.append(data_x1,x_opt[0,0]) data_x2= np.append(data_x2,x_opt[1,0]) data_origin1= np.append(data_origin1,var[0][0]) data_origin2= np.append(data_origin2,var[0][1]) x_last = x_opt else: x_last = var[0] plt.plot(range(201),data_origin1,label='x_origin1') plt.plot(range(201),data_x1,label='x1') plt.plot(range(201),data_origin2,label='x_origin2') plt.plot(range(201),data_x2,label='x2') #print data_x1 #print data_x2 plt.legend() plt.show() ``` Result ![](https://i.imgur.com/aLIniH2.png)