# Keras_FCN
## import module
```python=
import matplotlib
# Force matplotlib to not use any Xwindows backend.
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from keras.models import Sequential, model_from_json, load_model
from keras.layers.core import Dense, Dropout, Flatten, Activation, SpatialDropout2D, Reshape, Lambda
from keras.layers.normalization import BatchNormalization
from keras.layers.advanced_activations import ELU, PReLU, LeakyReLU
from keras.layers.convolutional import Conv1D
from keras.layers.convolutional import Convolution1D
from keras.optimizers import SGD
from keras.callbacks import ModelCheckpoint
from scipy.io import wavfile
import pdb
import scipy.io
# import librosa
import os
from os.path import join as ojoin
# os.environ["CUDA_VISIBLE_DEVICES"]="1"
import time
import numpy as np
import numpy.matlib
import argparse
import random
# import theano
# import theano.tensor as T
import tensorflow as tf
from keras.callbacks import TensorBoard
import keras.backend.tensorflow_backend as KTF
```
## 檢查是否有GPU
```python=
# Report whether TensorFlow can see a GPU (informational only; the
# script proceeds either way).
if tf.test.gpu_device_name():
    print('GPU found')
else:
    print("No GPU found")

# TF1-style session config: log which device each op is placed on, and
# let GPU memory grow on demand instead of pre-allocating all of it.
config = tf.ConfigProto(log_device_placement=True)
config.gpu_options.allow_growth = True
session = tf.Session(config=config)
KTF.set_session(session)  # make this session Keras' default backend session

random.seed(999)  # fixed seed for reproducibility
```
## 定義讀檔路徑function
```python=
def get_filepaths(directory):
    """Recursively collect the paths of all .wav files under *directory*.

    Walks the directory tree rooted at *directory* top-down (os.walk)
    and returns a list of full file paths for every file whose name
    ends in '.wav'.  Non-wav files are skipped.

    Parameters
    ----------
    directory : str
        Root directory to search.

    Returns
    -------
    list of str
        Full paths (root-joined) of every .wav file found.
    """
    file_paths = []  # accumulates the full file paths
    for root, _directories, files in os.walk(directory):
        for filename in files:
            if filename.endswith('.wav'):
                # Join root + name to form the full path.
                file_paths.append(os.path.join(root, filename))
    return file_paths
```
```python=
# Collect every .wav under the noisy-mixture folder and the clean-speech
# folder; these two lists are paired up in the matching step below.
mixed_file=get_filepaths('mixed_all_snr/')
cleaned_file=get_filepaths('clean')
```
```python=
#整理檔案路徑排序
#確認乾淨的答案有對應到混音完的檔案
# Pair each noisy mixture with its clean reference.  The clean file's
# basename is recovered from the mixture's file name: the first three
# '_'-separated tokens, re-joined, plus '.wav'.  One entry is appended
# per match so clean_files stays index-aligned with mixed_file.
# NOTE(review): if a mixture has no matching clean file (or several),
# the two lists silently fall out of alignment — confirm every mixture
# has exactly one reference.
clean_files = []
for noisy_path in mixed_file:
    clean_file = '_'.join(noisy_path.split('/')[1].split('_')[:3]) + '.wav'
    for candidate in cleaned_file:
        if clean_file == candidate.split('/')[-1]:
            clean_files.append(candidate)
```
## 切分train-test data
```python=
from sklearn.model_selection import train_test_split
# First split off 33% of the pairs as the held-out test set, then carve
# a validation set (33% of the remainder) out of the training portion.
# Fixed random_state keeps the splits reproducible across runs.
X_train, X_test, y_train, y_test = train_test_split(mixed_file, clean_files, test_size=0.33, random_state=42)
X_train, X_validate, y_train, y_validate = train_test_split(X_train, y_train, test_size=0.33, random_state=42)
```
```python=
# Aliases for the split lists, renamed by role (noisy input vs. clean target).
Train_Noisy_lists=X_train
Train_Clean_paths= y_train
Validate_Noisy_lists = X_validate
Validate_Clean_paths = y_validate
Test_Noisy_lists = X_test
Test_Clean_paths = y_test
# NOTE(review): despite its name, Num_testdata counts the *validation*
# set — it is passed to fit_generator as nb_val_samples below.
Num_testdata=len(Validate_Noisy_lists)
Num_traindata=len(Train_Noisy_lists)
```
## 定義generator (for keras fit_generator)
```python=
def train_data_generator(noisy_list, clean_path):
    """Endlessly yield (noisy, clean) training pairs, one file per step.

    Both yielded arrays are float32 with shape (1, num_samples, 1) so
    they feed a Conv1D model with batch size 1.  Stereo files are
    downmixed to mono by averaging the two channels.  The clean target
    is scaled by 1/2**15 (int16 full scale) into roughly [-1, 1] to
    match the model's tanh output; the noisy input is left unscaled,
    matching the original training setup.

    Parameters
    ----------
    noisy_list : list of str
        Paths of the noisy input wav files.
    clean_path : list of str
        Paths of the clean target wav files, index-aligned with noisy_list.

    Yields
    ------
    (numpy.ndarray, numpy.ndarray)
        (noisy, clean) pair, each shaped (1, num_samples, 1), float32.
    """
    index = 0
    while True:
        # --- noisy input ------------------------------------------------
        rate, noisy = wavfile.read(noisy_list[index])
        noisy = noisy.astype('float32')
        if len(noisy.shape) == 2:  # stereo -> mono
            noisy = (noisy[:, 0] + noisy[:, 1]) / 2
        noisy = np.reshape(noisy, (1, np.shape(noisy)[0], 1))

        # --- clean target -----------------------------------------------
        rate, clean = wavfile.read(clean_path[index])
        clean = clean.astype('float32')
        if len(clean.shape) == 2:
            clean = (clean[:, 0] + clean[:, 1]) / 2
        clean = clean / 2**15  # int16 full scale -> ~[-1, 1]
        clean = np.reshape(clean, (1, np.shape(clean)[0], 1))

        index += 1
        if index == len(noisy_list):  # wrap around: restart for next epoch
            index = 0
        yield noisy, clean
def val_data_generator(noisy_list, clean_path):
    """Endlessly yield (noisy, clean) validation pairs, one file per step.

    Same preprocessing as train_data_generator: float32 arrays shaped
    (1, num_samples, 1); stereo downmixed to mono by channel average;
    clean target scaled by 1/2**15 (int16 full scale) to ~[-1, 1] to
    match the model's tanh output, noisy input left unscaled.

    Parameters
    ----------
    noisy_list : list of str
        Paths of the noisy input wav files.
    clean_path : list of str
        Paths of the clean target wav files, index-aligned with noisy_list.

    Yields
    ------
    (numpy.ndarray, numpy.ndarray)
        (noisy, clean) pair, each shaped (1, num_samples, 1), float32.
    """
    index = 0
    while True:
        rate, noisy = wavfile.read(noisy_list[index])
        noisy = noisy.astype('float32')
        if len(noisy.shape) == 2:  # stereo -> mono
            noisy = (noisy[:, 0] + noisy[:, 1]) / 2
        noisy = np.reshape(noisy, (1, np.shape(noisy)[0], 1))

        rate, clean = wavfile.read(clean_path[index])
        clean = clean.astype('float32')
        if len(clean.shape) == 2:
            clean = (clean[:, 0] + clean[:, 1]) / 2
        clean = clean / 2**15  # int16 full scale -> ~[-1, 1]
        clean = np.reshape(clean, (1, np.shape(clean)[0], 1))

        index += 1
        if index == len(noisy_list):  # wrap around and keep yielding
            index = 0
        yield noisy, clean
```
## 開始建模
```python=
start_time = time.time()
print('model building...')

# Fully-convolutional network: 7 identical Conv1D -> BatchNorm ->
# LeakyReLU stages (30 filters, kernel 55, 'same' padding), then a
# single-filter Conv1D with tanh.  input_shape=(None, 1) means the
# model maps a waveform of ANY length to an equally long waveform in
# [-1, 1] — which is why the generators can feed variable-length files.
model = Sequential()

# First stage carries the input_shape declaration.
model.add(Conv1D(30, 55, padding='same', input_shape=(None, 1)))
model.add(BatchNormalization(axis=-1))
model.add(LeakyReLU())

# Six more identical stages.
for _ in range(6):
    model.add(Conv1D(30, 55, padding='same'))
    model.add(BatchNormalization(axis=-1))
    model.add(LeakyReLU())

# Output stage: collapse to one channel, squash to [-1, 1].
model.add(Conv1D(1, 55, padding='same'))
model.add(Activation('tanh'))
```
## 開始訓練
```python=
epoch = 40
batch_size = 1

model.compile(loss='mse', optimizer='adam')

# Persist the architecture as JSON so it can be rebuilt without rerunning
# this script (weights are not saved here).
with open('{}.json'.format('firsttry'), 'w') as f:
    f.write(model.to_json())

print ('training...')
g1 = train_data_generator(Train_Noisy_lists, Train_Clean_paths)
g2 = val_data_generator(Validate_Noisy_lists, Validate_Clean_paths)

tbCallBack = TensorBoard(log_dir='./logs',  # directory for event files
                         histogram_freq=0,  # 0 = skip histogram computation
                         write_graph=True,  # store the network graph
                         write_grads=True,  # visualise gradient histograms
                         write_images=True, # visualise weights as images
                         embeddings_freq=0,
                         embeddings_layer_names=None,
                         embeddings_metadata=None)

# NOTE: samples_per_epoch / nb_epoch / nb_val_samples is the Keras 1
# fit_generator API (Keras 2 renamed them steps_per_epoch / epochs /
# validation_steps).  Since batch_size is 1, one sample == one step.
# Bug fix: tbCallBack was constructed but never registered — pass it in
# via callbacks= so TensorBoard logs are actually written.
hist = model.fit_generator(g1,
                           samples_per_epoch=Num_traindata,
                           nb_epoch=epoch,
                           verbose=1,
                           validation_data=g2,
                           nb_val_samples=Num_testdata,
                           callbacks=[tbCallBack],
                           max_q_size=1,
                           nb_worker=1,
                           pickle_safe=False,
                           )
```
## 畫Loss圖
```python=
# # plotting the learning curve
# Plot the learning curve: training vs. validation loss per epoch.
TrainERR = hist.history['loss']
ValidERR = hist.history['val_loss']
print ('@%f, Minimun error:%f, at iteration: %i' % (hist.history['val_loss'][epoch-1], np.min(np.asarray(ValidERR)),np.argmin(np.asarray(ValidERR))+1))

plt.figure(4)
plt.plot(range(1, epoch + 1), TrainERR, 'b', label='TrainERR')
plt.plot(range(1, epoch + 1), ValidERR, 'r', label='ValidERR')
plt.xlim([1, epoch])
plt.legend()
plt.xlabel('epoch')
plt.ylabel('error')
plt.grid(True)
# Bug fix: save BEFORE show.  show() can clear the current figure on
# interactive backends, which would leave an empty PNG.  (With the Agg
# backend forced at the top of the file, show() is a no-op anyway.)
plt.savefig('Learning_curve_{}.png'.format('FCN_firsttry'), dpi=150)
plt.show()

end_time = time.time()
print ('The code for this file ran for %.2fm' % ((end_time - start_time) / 60.))
```
## 開始predict囉 並轉成音檔
```python=
# Enhance the first 10 test mixtures and write them out as 16-bit wavs.
maxv = np.iinfo(np.int16).max  # int16 full scale; rescales the tanh output
for path in Test_Noisy_lists[:10]:
    # Path layout is 'mixed_all_snr/<name>.wav', so the file name is the
    # second '/'-separated component.
    wave_name = path.split('/')[1]

    # Same preprocessing as the generators: float32, stereo -> mono,
    # reshaped to (1, num_samples, 1).  Noisy input stays unscaled.
    rate, noisy = wavfile.read(path)
    noisy = noisy.astype('float32')
    if len(noisy.shape) == 2:
        noisy = (noisy[:, 0] + noisy[:, 1]) / 2
    noisy = np.reshape(noisy, (1, np.shape(noisy)[0], 1))

    # Model output is in [-1, 1] (tanh); scale back up to int16 range.
    enhanced = np.squeeze(model.predict(noisy, verbose=0, batch_size=batch_size))
    enhanced = enhanced * maxv
    # NOTE(review): output dir "c" must already exist — wavfile.write
    # does not create directories.
    wavfile.write(os.path.join("c", wave_name), 16000, (enhanced).astype(np.int16))
# librosa.output.write_wav(os.path.join("/FCN", wave_name), (enhanced* maxv).astype(np.int16), 16000)
# librosa.output.write_wav(os.path.join("FCN_enhanced_MSE",noise, dB, wave_name), (enhanced* maxv).astype(np.int16), 16000)
```