# 智慧計算與規劃WEEK8 - CNN實作
###### tags: `計算智慧與規劃`
- [Tensorflow教學](https://hackmd.io/eo6cYw5JQpafUUrlpsDROg#Playground)
## 問題種類
### prediction 預測
- 不可以做預測
- value => 有大小關係
### classification 分類
- 可以做預測
- label => 沒有大小關係
- supervised, 有training data的
### clustering 分群
- 有特徵, 沒有 label
- 訓練集和測試集
- unsupervised
## 方法
### 回歸
- linear 線性回歸
- nonlinear 非線性回歸
### 類神經網路
- NN
> 現代神經網路是一種非線性統計性資料建模工具
> 基於數學統計學類型的學習方法(Learning Method)得以最佳化
## Google MLCC
https://developers.google.com/machine-learning/crash-course/ml-intro
- 
- 上圖左邊是特徵,右邊是分類情況
- layer 之間線條粗細不同, 代表連結強度
- 無法用線條分類,有可能是因為**問題太複雜**或**特徵不夠**, **特徵取的不好**
- 
- 改成非線性
- 
## 實作
### 目標: 預測PM2.5值
- 訓練集(8643筆)

- 測試集(569筆)

- 訓練目標: 由訓練集訓練model,預測測試集的PM2.5值
## 程式碼
### dataLoader
``` python=
import tensorflow as tf
import numpy as np
import pandas as pd


class DataLoader():
    """Load train.csv / test.csv and expose rows as TensorFlow tensors.

    Column layout (from the indexing below): column 2 holds the PM2.5
    label value; columns 3+ are the feature columns.
    """

    def __init__(self):
        trainData = pd.read_csv("train.csv", sep=",")
        testData = pd.read_csv("test.csv", sep=",")
        # Keep only the feature columns (index 3 onward) of each row.
        self.__trainData = [x[3:] for x in trainData.values]
        self.__testData = [x[3:] for x in testData.values]
        self.argsLength = len(self.__testData[0])
        self.num_train_data = len(self.__trainData)
        self.train_data = tf.constant(self.__trainData)
        self.test_data = tf.constant(self.__testData)
        # Column 2 is the label (PM2.5 value) for both splits.
        self.train_label = tf.constant([x[2] for x in trainData.values])
        self.test_label = tf.constant([x[2] for x in testData.values])

    def ignore(self, ignoreList):
        """Rebuild train/test tensors without the given feature indices.

        sorted() makes the kept-column order deterministic; the original
        iterated a raw set, whose order is an implementation detail.
        """
        need = sorted(set(range(self.argsLength)) - set(ignoreList))
        self.train_data = tf.constant([[x[i] for i in need] for x in self.__trainData])
        self.test_data = tf.constant([[x[i] for i in need] for x in self.__testData])

    def get_batch(self, batch_size):
        """Return a random training batch (sampled with replacement)."""
        index = np.random.randint(0, self.num_train_data, batch_size)
        # tf.gather selects all sampled rows in one vectorized op instead
        # of converting each row to numpy and back.
        train_data = tf.gather(self.train_data, index)
        train_label = tf.gather(self.train_label, index)
        return train_data, train_label
```
### linear
```python=
import tensorflow as tf
import dataLoader

# Load the PM2.5 dataset; keep every feature column.
loader = dataLoader.DataLoader()
loader.ignore([])
features, labels = loader.train_data, loader.train_label

# A single dense unit with a linear activation is plain linear regression.
model = tf.keras.Sequential(
    [tf.keras.layers.Dense(units=1, activation='linear')],
    name="Linertest",
)

# SGD with step size 0.01; MAE as the loss, RMSE reported as a metric.
model.compile(
    optimizer=tf.keras.optimizers.SGD(learning_rate=0.01),
    loss="mae",
    metrics=[tf.keras.metrics.RootMeanSquaredError()],
)

# Each epoch sweeps the whole training set in mini-batches of 1000 rows.
model.fit(features, labels, epochs=1000, batch_size=1000)
model.summary()

# Persist the trained model in TensorFlow SavedModel format.
model.save('linerModel', save_format="tf")
```
### CNN
```python=
from tensorflow.keras import Sequential, layers, metrics
from tensorflow.keras import regularizers, optimizers
import dataLoader_cnn as dataLoader
# Goal: predict the next day's (daily-averaged) PM2.5 value.
# Fixes: the import block was accidentally tripled, and a stray trailing
# backslash after the "hidden2" layer continued the line into a comment
# (a SyntaxError).

if __name__ == "__main__":
    data = dataLoader.DataLoader()
    # Drop no feature columns.
    data.ignore([])
    X, y = data.train_data, data.train_label

    model = Sequential(name="MyCNN")
    # Each sample is one day: 24 hourly rows x 6 feature columns.
    model.add(layers.InputLayer(input_shape=(24, 6)))
    # Convolution stage: 12 filters over windows of 3 time steps;
    # padding='same' keeps the sequence length at 24; ReLU adds the
    # non-linearity.
    model.add(layers.Conv1D(filters=12, kernel_size=3, padding='same', activation='relu'))
    model.add(layers.ReLU())
    # Max pooling halves the time dimension (24 -> 12): keep the larger
    # of each pair of values.
    model.add(layers.MaxPool1D(pool_size=2))
    model.add(layers.Conv1D(filters=6, kernel_size=3, padding='same', activation='relu'))
    model.add(layers.ReLU())
    model.add(layers.MaxPool1D(pool_size=2))
    # Flatten 2-D feature maps into a vector for the dense head.
    model.add(layers.Flatten())
    model.add(layers.Dense(units=12, activation='relu', name="hidden1",
                           kernel_regularizer=regularizers.L1(0.01),
                           activity_regularizer=regularizers.L2(0.01)))
    model.add(layers.Dense(units=4, activation='relu', name="hidden2",
                           kernel_regularizer=regularizers.L1(0.01),
                           activity_regularizer=regularizers.L2(0.01)))
    # Single linear output: the predicted PM2.5 value.
    model.add(layers.Dense(units=1, name="output"))
    model.summary()
    model.compile(optimizer=optimizers.SGD(learning_rate=0.01),
                  loss='mae',
                  metrics=[metrics.RootMeanSquaredError()])
    model.fit(X, y, epochs=100)
    model.save('cnnModel', save_format="tf")
```
### dataLoader_cnn
```python=
import tensorflow as tf
import numpy as np
import pandas as pd
from datetime import date


class DataLoader():
    """Group hourly CSV rows into one (24, n_features) sample per day.

    Column layout (from the indexing below): column 0 is an ISO date
    string, column 2 the PM2.5 value, columns 3+ the features. A day's
    label is the mean of its 24 hourly PM2.5 values; days with a row
    count other than 24 are discarded.
    """

    def __init__(self):
        trainData = pd.read_csv("train.csv", sep=",")
        testData = pd.read_csv("test.csv", sep=",")
        # The train/test parsing loops were duplicated in the original;
        # both now share _group_by_day.
        self.__train_data, train_labels = self._group_by_day(trainData.values)
        self.num_train_data = len(self.__train_data)
        self.argsLength = len(self.__train_data[0][0])
        self.train_data = tf.constant(self.__train_data)
        self.train_label = tf.constant(train_labels)
        self.__test_data, test_labels = self._group_by_day(testData.values)
        self.test_data = tf.constant(self.__test_data)
        self.test_label = tf.constant(test_labels)

    @staticmethod
    def _group_by_day(rows):
        """Collect rows into complete 24-hour days.

        Returns (samples, labels): each sample is a list of 24 feature
        rows, each label the day's mean PM2.5.
        """
        data, labels = [], []
        past = date.fromisoformat(rows[0][0])
        now_data = []
        now_label = 0
        for x in rows:
            now = date.fromisoformat(x[0])
            if now == past:
                now_data.append(x[3:])
                now_label += x[2]
            else:
                if len(now_data) == 24:
                    data.append(now_data)
                    labels.append(now_label / 24)
                now_data = [x[3:]]
                now_label = x[2]
                past = now
        # Bug fix: the original only flushed a group on a date change,
        # silently dropping the final day of the file.
        if len(now_data) == 24:
            data.append(now_data)
            labels.append(now_label / 24)
        return data, labels

    def ignore(self, ignoreList):
        """Rebuild the tensors keeping only non-ignored feature columns."""
        # sorted() keeps the column order deterministic; distinct loop
        # names avoid the original's shadowed `i`.
        need = sorted(set(range(self.argsLength)) - set(ignoreList))
        self.train_data = tf.constant(
            [[[row[col] for col in need] for row in day] for day in self.__train_data])
        self.test_data = tf.constant(
            [[[row[col] for col in need] for row in day] for day in self.__test_data])

    def get_batch(self, batch_size):
        """Return a random batch (with replacement) of training days."""
        index = np.random.randint(0, self.num_train_data, batch_size)
        # Vectorized row selection instead of per-element .numpy() copies.
        train_data = tf.gather(self.train_data, index)
        train_label = tf.gather(self.train_label, index)
        return train_data, train_label
```
### test
```python=
import tensorflow as tf
import dataLoader

# The stored PM2.5 column is normalized; scale errors back to real units.
REAL_PM = 319

if __name__ == "__main__":
    loader = dataLoader.DataLoader()
    loader.ignore([])
    X, y = loader.test_data, loader.test_label

    # Report the rescaled mean absolute error of each saved model
    # on the test set.
    for title, modelName in (("Liner:", "linerModel"), ("NN:", "myNNModel")):
        model = tf.saved_model.load(modelName)
        y_pred = model(X)
        print(title)
        print(tf.reduce_mean(tf.math.abs(y_pred - y) * REAL_PM).numpy())
```
### cnn過程

1. 原始資料(輸入)
- 圖片大小: 24 * 6

2. convolution 卷積
- filters=12: 留下12個特徵點
- kernel_size=3: 3個一組filter
- padding='same': 挑選的方法
- filter大小 = 1 * 3 (Conv'1D' * kernel_size)
- 空氣的參數是一維特徵,因為每項參數不會相互影響
- 圖片大小: 24 * 12

3. ReLU => 震動數字(微調FILTER數值)
- 圖片大小: 24 * 12
4. MaxPool池化 => 縮小圖片
- 將重要特徵取出來
- pool_size=2
- 2個一組取最大的('Max'pool1D)
- 圖片大小: 12 * 12
==...這邊的池化是水平合併==
5. 重複數次3. 和4.
- 圖片大小: 6 * 6

6. Flatten 扁平化
- 二維 => 一維
- 6 * 6 => 36

7. hidden layer
- units=12, 圖片大小: 12
- units=4, 圖片大小: 4
- units=1, 圖片大小: 1
=> 最後會輸出1張圖

原本輸入的參數為6個,因為要加上一個誤差值參數 -> 所以總共會有7個參數
> hidden1 Param:(原本6個參數+1個誤差值參數) * 12個層數 = 84
> hidden2 Param:(前面每一層輸出當作參數+1個誤差值參數)*4個層數
> dense param:(每一層參數)
- 線性參數不需要隱藏層,因為線性改變後還是線性


- model.add 增加mlcc左右的隱藏層數(下圖藍色圈起來的地方)
- unit 增加mlcc隱藏層的內部上下的層數(下圖橘色圈起來的地方)
###### 補充
###### NN 中的 Hidden Layer
