Data Fusion Skills
===
###### tags: `機器人學`
# Kalman Filter

want to know:
- [ ] multi input
- [ ] how about drift?
# EKF
Needs the Jacobian of the process/measurement models.
My strategy: approximate it numerically — perturb each state by a small delta and differentiate (finite differences).
# UKF

``` python
#UKF test
from scipy.linalg import sqrtm
import numpy as np
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import matplotlib.pyplot as plt
class UKF():
    """Unscented Kalman filter for a hard-coded 2-state demo model.

    The process model is F() (x0 += 1, x1 += 2 per step) and the measurement
    model (range / bearing of the 2-D state) is inlined in update().
    Weights follow the Wan & van der Merwe sigma-point formulation.
    """

    def __init__(self):
        # All fields are populated by param_update(); zeros are placeholders.
        self.x = 0
        self.n_state = 0    # dimension of the state vector
        self.n_output = 0   # dimension of the measurement vector
        self.n_sig = 0      # number of sigma points: 2 * n_state + 1
        self.y_measure = 0
        self.landa = 0      # lambda: sigma-point spread parameter
        self.alpha = 0
        self.beta = 0
        self.Q = 0          # process-noise covariance
        self.R = 0          # measurement-noise covariance
        self.p_x = 0        # state covariance P
        self.p_yy = 0       # last innovation covariance (diagnostic)
        self.p_xy = 0       # last cross covariance (diagnostic)
        self.W_m = 0        # sigma-point mean weights
        self.W_c = 0        # sigma-point covariance weights

    def param_update(self, n_state, n_output, landa, alpha, beta, Q, R, p_x):
        """Store filter parameters and precompute the sigma-point weights."""
        self.landa = landa
        self.alpha = alpha
        self.beta = beta
        self.Q = Q
        self.R = R
        self.p_x = p_x
        self.n_state = n_state
        self.n_sig = n_state * 2 + 1
        self.n_output = n_output
        # --- sigma-point weights ---
        denom = self.n_state + self.landa
        self.W_m = np.zeros(self.n_sig)
        self.W_c = np.zeros(self.n_sig)
        self.W_m[0] = self.landa / denom
        # BUGFIX: the centre covariance weight is lambda/(n+lambda) PLUS
        # (1 - alpha^2 + beta); the original wrongly added that term to the
        # denominator.
        self.W_c[0] = self.landa / denom + (1.0 - self.alpha ** 2 + self.beta)
        for i in range(1, self.n_sig):
            self.W_m[i] = 1.0 / (2.0 * denom)
            self.W_c[i] = 1.0 / (2.0 * denom)

    def F(self, x):
        """Process model, applied column-wise to the sigma-point matrix."""
        return np.array([x[0, :] + 1, x[1, :] + 2])

    def update(self, x, y_measure):
        """Run one UKF predict/correct cycle.

        x         : (n_state, 1) previous state estimate
        y_measure : (n_output, 1) measurement [range, bearing]
        Returns the (n_state, 1) corrected state estimate; self.p_x is
        updated to the corrected covariance as a side effect.
        """
        # --- generate sigma points around x ---
        # BUGFIX: the spread scales by (n + lambda), not n (identical when
        # lambda == 0, as in the demo below).
        r = sqrtm((self.n_state + self.landa) * self.p_x)
        x = x.T
        X = x
        for i in range(self.n_state):
            X = np.vstack((X, x + r[i]))
            X = np.vstack((X, x - r[i]))
        X = X.T  # (n_state, n_sig): one sigma point per column

        # --- predict: propagate sigma points through the process model ---
        X_pre = self.F(X)
        # Weighted mean of the propagated sigma points.
        X_pre_mean = X_pre.dot(self.W_m).reshape(self.n_state, 1)
        # Deviation of each sigma point from the predicted mean.
        X_diff = X_pre - X_pre_mean
        # Predicted state covariance P_k = sum_i W_c[i] * d_i d_i^T + Q.
        p_k = np.zeros((self.n_state, self.n_state))
        for i in range(self.n_sig):
            col = X_diff[:, i:i + 1]
            p_k += self.W_c[i] * col.dot(col.T)
        p_k += self.Q

        # --- correct: map predicted sigma points through the measurement model ---
        Y_pre = np.zeros((self.n_output, self.n_sig))
        # BUGFIX: original used the prior sigma points X here instead of X_pre.
        Y_pre[0, :] = (X_pre[0, :] ** 2 + X_pre[1, :] ** 2) ** 0.5
        Y_pre[1, :] = np.arctan(X_pre[0, :] / X_pre[1, :])
        Y_pre_mean = Y_pre.dot(self.W_m).reshape(self.n_output, 1)
        Y_pre_diff = Y_pre - Y_pre_mean

        # Innovation covariance.  BUGFIX: the original accumulated into a
        # never-reset self.p_yy while the all-zero local p_yy was used for
        # the gain; accumulate locally and use it.
        p_yy = np.zeros((self.n_output, self.n_output))
        for i in range(self.n_sig):
            col = Y_pre_diff[:, i:i + 1]
            p_yy += self.W_c[i] * col.dot(col.T)
        self.p_yy = p_yy

        # Cross covariance.  BUGFIX: the original paired Y_pre_diff with
        # itself (instead of X_diff x Y_pre_diff) and used the mean weights.
        p_xy = np.zeros((self.n_state, self.n_output))
        for i in range(self.n_sig):
            p_xy += self.W_c[i] * X_diff[:, i:i + 1].dot(Y_pre_diff[:, i:i + 1].T)
        self.p_xy = p_xy

        # Kalman gain.  BUGFIX: use self.R, not the global R from __main__.
        S = p_yy + self.R
        K_k = p_xy.dot(np.linalg.inv(S))

        # Corrected state and covariance.
        x_final = X_pre_mean + K_k.dot(y_measure - Y_pre_mean)
        self.p_x = p_k - K_k.dot(S).dot(K_k.T)
        return x_final
if __name__ == '__main__':
    np.random.seed(1234)
    # Independent process-noise samples for each state component.
    s1 = np.random.normal(0, 2, 201)
    s2 = np.random.normal(0, 2, 201)

    # --- build ground-truth trajectory x (2 x 201) ---
    x = np.array([[0.001], [0.001]])
    n_state = np.size(x)
    for i in range(200):
        # BUGFIX: the second component originally reused s1[i]; use s2[i]
        # so each state gets its own noise sequence.
        x = np.hstack((x, [[x[0, i] + 1 + s1[i]], [x[1, i] + 2 + s2[i]]]))
    print(x)
    x_shape = np.shape(x)
    n_sig = n_state * 2 + 1
    n_output = 2
    landa = 0.
    alpha = 1e-3
    beta = 2.

    # Range / bearing measurements of the 2-D state.
    y = np.zeros((n_output, x_shape[1]))
    y[0, :] = (x[0, :] ** 2 + x[1, :] ** 2) ** 0.5
    y[1, :] = np.arctan(x[0, :] / x[1, :])

    # --- noise covariances and initial state covariance ---
    Q = np.array([[0.01, 0], [0, 0.01]])
    R = np.array([[1, 0], [0, 1]])
    p_x = np.array([[1.44, 0.], [0., 2.89]])

    ukf1 = UKF()
    ukf1.param_update(n_state, n_output, landa, alpha, beta, Q, R, p_x)

    # Filtered and ground-truth histories (seeded with the t=0 value 0).
    data_x1 = [0]
    data_x2 = [0]
    data_origin1 = [0]
    data_origin2 = [0]
    x_last = None
    # Walk the columns of x and y in lockstep, one (2,1) slice per step.
    for i, var in enumerate(zip(np.array_split(x, 201, 1),
                                np.array_split(y, 201, 1))):
        if i > 0:
            x_opt = ukf1.update(x_last, var[1])
            print('haha', var[0], x_opt)
            data_x1 = np.append(data_x1, x_opt[0, 0])
            data_x2 = np.append(data_x2, x_opt[1, 0])
            data_origin1 = np.append(data_origin1, var[0][0])
            data_origin2 = np.append(data_origin2, var[0][1])
            x_last = x_opt
        else:
            # First step: no correction yet, seed the filter with the truth.
            x_last = var[0]

    plt.plot(range(201), data_origin1, label='x_origin1')
    plt.plot(range(201), data_x1, label='x1')
    plt.plot(range(201), data_origin2, label='x_origin2')
    plt.plot(range(201), data_x2, label='x2')
    plt.legend()
    plt.show()
```
Result
