# PSO
###### tags: `實驗室課` `計算智慧與規劃`
### 雲端連結
https://drive.google.com/drive/u/2/folders/1CePJn6aaQqI29aa64gSEnhkqGJB-YXKs
### 回家作業
- 個人
- 試試其他的[test bench](http://www.sfu.ca/~ssurjano/optimization.html)
- 使用其他的回歸式進行PM2.5預測
- 個人或團體報告不拘
- 使用PSO預測其他資料集
- other data裡推薦使用"台灣電力消耗"或"world-happiness-report"(前置處理較少)
- 也可自行尋找其他資料集
### PSO
- 參考自身、群體
- 電梯從眾
- ?
- 群體智慧
- 歷史記憶
- Bottom-up
- Top-down
- 應用
- 多問題
- 優缺點

- 演算法

- 如何驗證演算法是否正確?
- Test Tech
- 開源驗證
- 設計

- 依問題設計 fitness function
- 2種



1. 初始化
2. 更新個體最佳、全體最佳
3. 達到更迭次數(次數自己決定)


# 程式碼
- solve_math.py
```python=
from model import fly
# 這裡是用來放參數和要算的公式的
# 參數設定fly (maxcycle, numParticle, dim, lowerbound, upperbound, data, fitnessFunc)
weight, fitness = fly(2000, 20, 2, -5.12, 5.12, None, "Drop_wave") # Drop_wave(basic pso)
print(weight, fitness)
# weight, fitness = fly() # Mccormick(basic pso)
# print(weight, fitness)
```
- model.py
```python=
import random
import math
import sys
import numpy as np
from matplotlib import pyplot as plt
def predict(pos, data, MAX, MIN, fitnessFunc):
# --- 參數 --- #
# @pos: 粒子的位置
# @data: 預測附加的data
# @MAX: normalize的最大值
# @MIN: normalize的最小值
# @fitnessFunc: 指定的fitness function
# print("pos", pos)
# print("datalen", len(data), len(data[1]))
predict_y = [0 for i in range(len(data))]
if (fitnessFunc == "線性回歸式") :
for i in range(len(data)) :
for dim in range(1, len(pos)) : # 從 1開始的原因是因為第一個資料是pm2.5
predict_y[i] += data[i][dim-1] * pos[dim]
return predict_y # 預測list
# 算分數
def fitness(pos, data, fitnessFunc):
# --- 參數 --- #
# @pos: 粒子的位置
# @data: 預測附加的data
# @fitnessFunc: 指定的fitness function
ans = 0
if (fitnessFunc == "Drop_wave") :
tmp = pos[0]**2 + pos[1]**2
ans = -(1+math.cos(12*math.sqrt(tmp))) / (0.5 * tmp + 2)
return ans # 計算出的fitness值
def fly(maxcycle, numParticle, dim, lowerbound, upperbound, data, fitnessFunc):
# --- 參數 --- #
# @maxcycle: 最大迭代次數
# @numParticle: 粒子數
# @dim: 調變數值之數量(有幾維)
# @lowerbound: 調變數值之下界
# @upperbound: 調變數值之上界
# @data: 預測附加的data
# @fitnessFunc: 指定的fitness function
# 初始化
particleX = []# 存放粒子位置
particleV = []# 存放粒子速度
for i in range(numParticle):
# 裡面的for 是為了維度(設定成)
particleX.append([lowerbound + random.random() * (upperbound - lowerbound) for j in range(dim)])
# 不要把範圍寫死,所以要在upperbound 和 lowerBound 之間 後面* 0.13 是取一個 0.1~0.2 之間的數
particleV.append([random.random() * (upperbound - lowerbound) * 0.13 for j in range(dim)])
# 粒子適應值
particleFitness = [0 for i in range(numParticle)]
# 個體最佳位置
pBest = np.zeros(shape=[numParticle, dim])
# 個體最佳適應值
pBest_fitness = [sys.float_info.max for i in range(numParticle)]
# 全域最佳位置
gBest = np.zeros(shape=[1, dim])
# 全域最佳適應值
gBest_fitness = sys.float_info.max
# 最大速度
vmax = (upperbound - lowerbound) * 0.12
# 運行次數
currentcycle = 0
generation = []
# 開始迭代
while currentcycle < maxcycle :
for i in range(numParticle) :
particleFitness[i] = fitness(particleX[i], data, fitnessFunc)
# 更新個體最優
if (particleFitness[i] < pBest_fitness[i]) :
pBest_fitness[i] = particleFitness[i] # 紀錄分數
pBest[i] = particleX[i] # 紀錄位置
# 更新全域最優(Basic or 三鄰居)
if (pBest_fitness[i] < gBest_fitness) :
gBest_fitness = pBest_fitness[i] # 紀錄分數
gBest = pBest[i] # 紀錄位置
# 更新粒子速度及位置
# for i in range(numParticle) :
# 每一維都要各自改
# print("particleV", particleV[0][0])
for j in range(dim) :
#print("particleV", particleV[0][0])
#print("pBest", pBest)
#print("gBest[0]", gBest[0])
particleV[i][j] = 0.7298 * particleV[i][j] + 1.49 * random.random() * (pBest[i][j]- particleX[i][j]) + 1.496 * random.random() * (gBest[j] - particleX[i][j])
if particleV[i][j] > vmax :
particleV[i][j] = vmax
if particleV[i][j] < -vmax :
particleV[i][j] = -vmax
particleX[i][j] = particleX[i][j] + particleV[i][j]
currentcycle += 1
# 每迭代100次,印出目前最佳
if currentcycle % 100 == 0 :
print("currentcycle : " , currentcycle, "gbest_fitness : ", gBest_fitness)
generation.append(gBest_fitness)
# 繪圖記錄收斂情形
plt.title("Fitness : " + str(gBest_fitness))
plt.plot(generation)
plt.savefig('result/PSO_gbest_for_generation.png')
plt.close('all')
return gBest, gBest_fitness # 最佳位置 # 最佳fitness
```
- analysis.py
```python=
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from matplotlib import pyplot as plt
from model import fly, predict
import math
def eval_point(actual, predict):
# --- 參數 --- #
# @actual: 真實值
# @predict: 預測值
RMSE = 0
MAE = 0
for i in range(len(actual)) :
# RMSE
RMSE += (actual[i] - predict[i])**2
# MAE
if (actual[i] - predict[i] < 0) :
RMSE += predict[i] - actual[i]
else :
RMSE += actual[i] - predict[i]
RMSE = RMSE / len(actual)
MAE = MAE / len(actual)
return RMSE, MAE # RMSE, MAE
def analysis_by_pso(train_data, test_data, predict_element, elements_index, test_dates, particle_num, lb, ub, maxcycle, fitnessFunc):
# --- 參數 --- #
# @train_data: 訓練集
# @test_data: 測試集
# @predict_element: 想預測的column Name
# @elements_index: 用來預測的column Names
# @test_dates: 測試的日期
# @particle_num: 粒子數
# @lb: 調變數值之下界
# @ub: 調變數值之上界
# @maxcycle: 最大迭代次數
# @fitnessFunc: 指定的fitness function
# --- prepare data --- #
# fill data
train_data.replace(-1, np.nan, inplace = True)
test_data.replace(-1, np.nan, inplace = True)
for e_index in elements_index :
train_data[e_index].interpolate(method='linear' , limit_area = "inside" , inplace = True)
test_data[e_index].interpolate(method='linear' , limit_area = "inside" , inplace = True)
# set test data
# normalized: max-min
min_max = MinMaxScaler()#[0,1]
train_col = [predict_element] + elements_index
normalized_train_data = pd.DataFrame(min_max.fit_transform(train_data[train_col]), columns = train_col)
normalized_testDates_data = pd.DataFrame(min_max.fit_transform(test_data[elements_index]), columns = elements_index) # testDates_data
y_train_max = train_data[predict_element].max()
y_train_min = train_data[predict_element].min()
# --- training --- #
# train weight (PSO)
dim = normalized_train_data.shape[1]
weight, fitness = fly(maxcycle, particle_num, dim, lb, ub, normalized_train_data, fitnessFunc)
# --- test --- #
# predict
predict_data = predict(weight, normalized_testDates_data.values, y_train_max, y_train_min, fitnessFunc)
# --- eval --- #
# set valid data
actual_data = list(test_data[predict_element]) # testDates_data
# calc RMSE, MAE
RMSE, MAE = eval_point(actual_data, predict_data)
# --- save result to .csv --- #
result = pd.DataFrame(test_data[['date', 'hour']]) # testDates_data
result["actual_values"] = actual_data
result["predict_values"] = predict_data
result.reset_index(drop=True, inplace=True)
result.to_csv("result/ans.csv", index=False)
# --- plot picture --- #
result.plot(y = ['actual_values','predict_values'], legend=True,rot=45, title="RMSE:"+str(RMSE)+" MAE:"+str(MAE))
plt.savefig('result/predict.png')
return RMSE, MAE, weight, fitness # RMSE, MAE, weight(PSO), fitness(PSO)
if __name__ == '__main__':
train_data = pd.read_csv('data/2018.csv', encoding = "big5")
test_data = pd.read_csv('data/2019.csv', encoding = "big5")
predict_element = 'PM2.5'
elements_index = ['pressure', 'temperature', 'humidity', 'wind_speed', 'precip']
test_dates = ["2019-1-2", "2019-1-20", "2019-2-1", "2019-2-10", "2019-3-10", "2019-3-20", "2019-4-4", "2019-4-23", "2019-5-11", "2019-5-29", "2019-6-6", "2019-6-27", "2019-7-10", "2019-7-18", "2019-8-1", "2019-8-9", "2019-9-13", "2019-9-24", "2019-10-8", "2019-10-17", "2019-11-5", "2019-11-19", "2019-12-2", "2019-12-18"]
particle_num = 25
lb = 0
ub = 1
maxcycle = 10000
RMSE, MAE, weight, fitness = analysis_by_pso(train_data, test_data, predict_element, elements_index, test_dates, particle_num, lb, ub, maxcycle, "線性回歸式")
```