# Keras_FCN ## import module ```python= import matplotlib # Force matplotlib to not use any Xwindows backend. matplotlib.use('Agg') import matplotlib.pyplot as plt from keras.models import Sequential, model_from_json, load_model from keras.layers.core import Dense, Dropout, Flatten, Activation, SpatialDropout2D, Reshape, Lambda from keras.layers.normalization import BatchNormalization from keras.layers.advanced_activations import ELU, PReLU, LeakyReLU from keras.layers.convolutional import Conv1D from keras.layers.convolutional import Convolution1D from keras.optimizers import SGD from keras.callbacks import ModelCheckpoint from scipy.io import wavfile import pdb import scipy.io # import librosa import os from os.path import join as ojoin # os.environ["CUDA_VISIBLE_DEVICES"]="1" import time import numpy as np import numpy.matlib import argparse import random # import theano # import theano.tensor as T import tensorflow as tf from keras.callbacks import TensorBoard import keras.backend.tensorflow_backend as KTF ``` ## 檢查是否有GPU ```python= if tf.test.gpu_device_name(): print('GPU found') else: print("No GPU found") config = tf.ConfigProto(log_device_placement=True) config.gpu_options.allow_growth=True session = tf.Session(config=config) KTF.set_session(session) random.seed(999) ``` ## 定義讀檔路徑function ```python= def get_filepaths(directory): """ This function will generate the file names in a directory tree by walking the tree either top-down or bottom-up. For each directory in the tree rooted at directory top (including top itself), it yields a 3-tuple (dirpath, dirnames, filenames). """ file_paths = [] # List which will store all of the full filepaths. # Walk the tree. for root, directories, files in os.walk(directory): for filename in files: if filename.endswith('.wav'): # Join the two strings in order to form the full filepath. filepath = os.path.join(root, filename) file_paths.append(filepath) # Add it to the list. # pdb.set_trace() return file_paths # Self-explanatory. ``` ```python= mixed_file=get_filepaths('mixed_all_snr/') cleaned_file=get_filepaths('clean') ``` ```python= #整理檔案路徑排序 #確認乾淨的答案有對應到混音完的檔案 clean_files=[] for i in mixed_file: clean_file='_'.join(i.split('/')[1].split('_')[:3])+'.wav' for j in cleaned_file: # print(j) if clean_file == j.split('/')[-1]: clean_files.append(j) ``` ## 切分train-test data ```python= from sklearn.model_selection import train_test_split X_train, X_test, y_train, y_test = train_test_split(mixed_file, clean_files, test_size=0.33, random_state=42) X_train, X_validate, y_train, y_validate = train_test_split(X_train, y_train, test_size=0.33, random_state=42) ``` ```python= Train_Noisy_lists=X_train Train_Clean_paths= y_train Validate_Noisy_lists = X_validate Validate_Clean_paths = y_validate Test_Noisy_lists = X_test Test_Clean_paths = y_test Num_testdata=len(Validate_Noisy_lists) Num_traindata=len(Train_Noisy_lists) ``` ## 定義generator (for keras fit_generator) ```python= def train_data_generator(noisy_list, clean_path): index=0 while True: #noisy, rate = librosa.load(noisy_list[index],sr=16000) # print(noisy_list[index],index) # pdb.set_trace() rate, noisy = wavfile.read(noisy_list[index]) # while noisy.shape[0]/16000.>7: # Audio length <7s or OOM # index += 1 # rate, noisy = wavfile.read(noisy_list[index]) noisy=noisy.astype('float32') if len(noisy.shape)==2: noisy=(noisy[:,0]+noisy[:,1])/2 # noisy=noisy/np.max(abs(noisy)) noisy=np.reshape(noisy,(1,np.shape(noisy)[0],1)) #clean, rate =librosa.load(clean_list[clean_wav_list.index(noisy_wav_list[index])],sr=16000) rate, clean = wavfile.read(clean_path[index]) clean=clean.astype('float32') if len(clean.shape)==2: clean=(clean[:,0]+clean[:,1])/2 clean=clean/2**15 clean=np.reshape(clean,(1,np.shape(clean)[0],1)) index += 1 if index == len(noisy_list): index = 0 # permute = list(range(len(noisy_list))) # random.shuffle(permute) # noisy_list=shuffle_list(noisy_list,permute) yield noisy, clean def val_data_generator(noisy_list, clean_path): index=0 while True: #noisy, rate = librosa.load(noisy_list[index],sr=16000) rate, noisy = wavfile.read(noisy_list[index]) noisy=noisy.astype('float32') if len(noisy.shape)==2: noisy=(noisy[:,0]+noisy[:,1])/2 # noisy=noisy/np.max(abs(noisy)) noisy=np.reshape(noisy,(1,np.shape(noisy)[0],1)) rate, clean = wavfile.read(clean_path[index]) clean=clean.astype('float32') if len(clean.shape)==2: clean=(clean[:,0]+clean[:,1])/2 clean=clean/2**15 clean=np.reshape(clean,(1,np.shape(clean)[0],1)) index += 1 if index == len(noisy_list): index = 0 yield noisy, clean ``` ## 開始建模 ```python= start_time = time.time() print('model building...') model = Sequential() model.add(Conv1D(30, 55, padding='same', input_shape=(None,1))) model.add(BatchNormalization(axis=-1)) model.add(LeakyReLU()) #model.add(Dropout(0.06)) model.add(Conv1D(30, 55, padding='same')) model.add(BatchNormalization(axis=-1)) model.add(LeakyReLU()) #model.add(Dropout(0.06)) model.add(Conv1D(30, 55, padding='same')) model.add(BatchNormalization(axis=-1)) model.add(LeakyReLU()) #model.add(Dropout(0.06)) model.add(Conv1D(30, 55, padding='same')) model.add(BatchNormalization(axis=-1)) model.add(LeakyReLU()) #model.add(Dropout(0.06)) model.add(Conv1D(30, 55, padding='same')) model.add(BatchNormalization(axis=-1)) model.add(LeakyReLU()) #model.add(Dropout(0.06)) model.add(Conv1D(30, 55, padding='same')) model.add(BatchNormalization(axis=-1)) model.add(LeakyReLU()) #model.add(Dropout(0.06)) model.add(Conv1D(30, 55, padding='same')) model.add(BatchNormalization(axis=-1)) model.add(LeakyReLU()) #model.add(Dropout(0.06)) model.add(Conv1D(1, 55, padding='same')) model.add(Activation('tanh')) # model.load_weights('firsttry') #sgd = SGD(lr=0.001, decay=5*1e-8, momentum=0.9, nesterov=True) ``` ## 開始訓練 ```python= epoch=40 batch_size=1 model.compile(loss='mse', optimizer='adam') with open('{}.json'.format('firsttry'),'w') as f: # save the model f.write(model.to_json()) #checkpointer = ModelCheckpoint(filepath='{}.hdf5'.format('firsttry'), verbose=1, save_best_only=True, mode='min') print ('training...') g1 = train_data_generator(Train_Noisy_lists, Train_Clean_paths) g2 = val_data_generator(Validate_Noisy_lists, Validate_Clean_paths) tbCallBack = TensorBoard(log_dir='./logs', # log 目录 histogram_freq=0, # 按照何等频率(epoch)来计算直方图,0为不计算 # batch_size=32, # 用多大量的数据计算直方图 write_graph=True, # 是否存储网络结构图 write_grads=True, # 是否可视化梯度直方图 write_images=True,# 是否可视化参数 embeddings_freq=0, embeddings_layer_names=None, embeddings_metadata=None) hist=model.fit_generator(g1, samples_per_epoch=Num_traindata, # samples_per_epoch=50, nb_epoch=epoch, verbose=1, validation_data=g2, nb_val_samples=Num_testdata, max_q_size=1, nb_worker=1, pickle_safe=False, ) ``` ## 畫Loss圖 ```python= # # plotting the learning curve TrainERR=hist.history['loss'] ValidERR=hist.history['val_loss'] print ('@%f, Minimun error:%f, at iteration: %i' % (hist.history['val_loss'][epoch-1], np.min(np.asarray(ValidERR)),np.argmin(np.asarray(ValidERR))+1)) # print 'drawing the training process...' plt.figure(4) plt.plot(range(1,epoch+1),TrainERR,'b',label='TrainERR') plt.plot(range(1,epoch+1),ValidERR,'r',label='ValidERR') plt.xlim([1,epoch]) plt.legend() plt.xlabel('epoch') plt.ylabel('error') plt.grid(True) plt.show() plt.savefig('Learning_curve_{}.png'.format('FCN_firsttry'), dpi=150) end_time = time.time() print ('The code for this file ran for %.2fm' % ((end_time - start_time) / 60.)) ``` ## 開始predict囉 並轉成音檔 ```python= maxv = np.iinfo(np.int16).max for path in Test_Noisy_lists[:10]: # Ex: /mnt/hd-02/avse/testing/noisy/engine/1dB/1.wav S=path.split('/') # noise=S[-3] # dB=S[-2] wave_name=S[1] rate, noisy = wavfile.read(path) noisy=noisy.astype('float32') if len(noisy.shape)==2: noisy=(noisy[:,0]+noisy[:,1])/2 # noisy=noisy/np.max(abs(noisy)) noisy=np.reshape(noisy,(1,np.shape(noisy)[0],1)) enhanced=np.squeeze(model.predict(noisy, verbose=0, batch_size=batch_size)) # enhanced=enhanced/np.max(abs(enhanced)) enhanced=enhanced* maxv enhanced_2=enhanced.astype('int16') # enhanced=enhanced/2**15 wavfile.write(os.path.join("c", wave_name),16000,(enhanced).astype(np.int16)) # librosa.output.write_wav(os.path.join("/FCN", wave_name), (enhanced* maxv).astype(np.int16), 16000) # librosa.output.write_wav(os.path.join("FCN_enhanced_MSE",noise, dB, wave_name), (enhanced* maxv).astype(np.int16), 16000) ```