PSO

tags: 實驗室課 計算智慧與規劃

雲端連結

https://drive.google.com/drive/u/2/folders/1CePJn6aaQqI29aa64gSEnhkqGJB-YXKs

回家作業

  • 個人
    • 試試其他的test bench
    • 使用其他的回歸式進行PM2.5預測
  • 個人或團體報告不拘
    • 使用PSO預測其他資料集
    • other data裡推薦使用"台灣電力消耗"或"world-happiness-report"(前置處理較少)
    • 也可自行尋找其他資料集

PSO

  • 參考自身、群體
    • 電梯從眾
  • ?
    • 群體智慧
    • 歷史記憶
    • Bottom-up
    • Top-down
  • 應用
    • 多問題
  • 優缺點
  • 演算法
  • 如何驗證演算法是否正確?
    • Test Tech
    • 開源驗證
  • 設計
    • 依問題設計 fitness function
    • 2種

  1. 初始化
  2. 更新個體最佳、全體最佳
  3. 達到更迭次數(次數自己決定)


程式碼

  • solve_math.py
from model import fly # 這裡是用來放參數和要算的公式的 # 參數設定fly (maxcycle, numParticle, dim, lowerbound, upperbound, data, fitnessFunc) weight, fitness = fly(2000, 20, 2, -5.12, 5.12, None, "Drop_wave") # Drop_wave(basic pso) print(weight, fitness) # weight, fitness = fly() # Mccormick(basic pso) # print(weight, fitness)
import random import math import sys import numpy as np from matplotlib import pyplot as plt def predict(pos, data, MAX, MIN, fitnessFunc): # --- 參數 --- # # @pos: 粒子的位置 # @data: 預測附加的data # @MAX: normalize的最大值 # @MIN: normalize的最小值 # @fitnessFunc: 指定的fitness function # print("pos", pos) # print("datalen", len(data), len(data[1])) predict_y = [0 for i in range(len(data))] if (fitnessFunc == "線性回歸式") : for i in range(len(data)) : for dim in range(1, len(pos)) : # 從 1開始的原因是因為第一個資料是pm2.5 predict_y[i] += data[i][dim-1] * pos[dim] return predict_y # 預測list # 算分數 def fitness(pos, data, fitnessFunc): # --- 參數 --- # # @pos: 粒子的位置 # @data: 預測附加的data # @fitnessFunc: 指定的fitness function ans = 0 if (fitnessFunc == "Drop_wave") : tmp = pos[0]**2 + pos[1]**2 ans = -(1+math.cos(12*math.sqrt(tmp))) / (0.5 * tmp + 2) return ans # 計算出的fitness值 def fly(maxcycle, numParticle, dim, lowerbound, upperbound, data, fitnessFunc): # --- 參數 --- # # @maxcycle: 最大迭代次數 # @numParticle: 粒子數 # @dim: 調變數值之數量(有幾維) # @lowerbound: 調變數值之下界 # @upperbound: 調變數值之上界 # @data: 預測附加的data # @fitnessFunc: 指定的fitness function # 初始化 particleX = []# 存放粒子位置 particleV = []# 存放粒子速度 for i in range(numParticle): # 裡面的for 是為了維度(設定成) particleX.append([lowerbound + random.random() * (upperbound - lowerbound) for j in range(dim)]) # 不要把範圍寫死,所以要在upperbound 和 lowerBound 之間 後面* 0.13 是取一個 0.1~0.2 之間的數 particleV.append([random.random() * (upperbound - lowerbound) * 0.13 for j in range(dim)]) # 粒子適應值 particleFitness = [0 for i in range(numParticle)] # 個體最佳位置 pBest = np.zeros(shape=[numParticle, dim]) # 個體最佳適應值 pBest_fitness = [sys.float_info.max for i in range(numParticle)] # 全域最佳位置 gBest = np.zeros(shape=[1, dim]) # 全域最佳適應值 gBest_fitness = sys.float_info.max #&nbsp;最大速度 vmax = (upperbound - lowerbound) * 0.12 # 運行次數 currentcycle = 0 generation = [] # 開始迭代 while currentcycle < maxcycle : for i in range(numParticle) : particleFitness[i] = fitness(particleX[i], data, fitnessFunc) # 更新個體最優 if (particleFitness[i] < pBest_fitness[i]) : pBest_fitness[i] = particleFitness[i] # 紀錄分數 pBest[i] = particleX[i] # 紀錄位置 # 更新全域最優(Basic or 三鄰居) if (pBest_fitness[i] < gBest_fitness) : gBest_fitness = pBest_fitness[i] # 紀錄分數 gBest = pBest[i] # 紀錄位置 # 更新粒子速度及位置 # for i in range(numParticle) : # 每一維都要各自改 # print("particleV", particleV[0][0]) for j in range(dim) : #print("particleV", particleV[0][0]) #print("pBest", pBest) #print("gBest[0]", gBest[0]) particleV[i][j] = 0.7298 * particleV[i][j] + 1.49 * random.random() * (pBest[i][j]- particleX[i][j]) + 1.496 * random.random() * (gBest[j] - particleX[i][j]) if particleV[i][j] > vmax : particleV[i][j] = vmax if particleV[i][j] < -vmax : particleV[i][j] = -vmax particleX[i][j] = particleX[i][j] + particleV[i][j] currentcycle += 1 # 每迭代100次,印出目前最佳 if currentcycle % 100 == 0 : print("currentcycle : " , currentcycle, "gbest_fitness : ", gBest_fitness) generation.append(gBest_fitness) # 繪圖記錄收斂情形 plt.title("Fitness : " + str(gBest_fitness)) plt.plot(generation) plt.savefig('result/PSO_gbest_for_generation.png') plt.close('all') return gBest, gBest_fitness # 最佳位置 # 最佳fitness
import pandas as pd import numpy as np from sklearn.preprocessing import MinMaxScaler from matplotlib import pyplot as plt from model import fly, predict import math def eval_point(actual, predict): # --- 參數 --- # # @actual: 真實值 # @predict: 預測值 RMSE = 0 MAE = 0 for i in range(len(actual)) : # RMSE RMSE += (actual[i] - predict[i])**2 # MAE if (actual[i] - predict[i] < 0) : RMSE += predict[i] - actual[i] else : RMSE += actual[i] - predict[i] RMSE = RMSE / len(actual) MAE = MAE / len(actual) return RMSE, MAE # RMSE, MAE def analysis_by_pso(train_data, test_data, predict_element, elements_index, test_dates, particle_num, lb, ub, maxcycle, fitnessFunc): # --- 參數 --- # # @train_data: 訓練集 # @test_data: 測試集 # @predict_element: 想預測的column Name # @elements_index: 用來預測的column Names # @test_dates: 測試的日期 # @particle_num: 粒子數 # @lb: 調變數值之下界 # @ub: 調變數值之上界 # @maxcycle: 最大迭代次數 # @fitnessFunc: 指定的fitness function # --- prepare data --- # # fill data train_data.replace(-1, np.nan, inplace = True) test_data.replace(-1, np.nan, inplace = True) for e_index in elements_index : train_data[e_index].interpolate(method='linear' , limit_area = "inside" , inplace = True) test_data[e_index].interpolate(method='linear' , limit_area = "inside" , inplace = True) # set test data # normalized: max-min min_max = MinMaxScaler()#[0,1] train_col = [predict_element] + elements_index normalized_train_data = pd.DataFrame(min_max.fit_transform(train_data[train_col]), columns = train_col) normalized_testDates_data = pd.DataFrame(min_max.fit_transform(test_data[elements_index]), columns = elements_index) # testDates_data y_train_max = train_data[predict_element].max() y_train_min = train_data[predict_element].min() # --- training --- # # train weight (PSO) dim = normalized_train_data.shape[1] weight, fitness = fly(maxcycle, particle_num, dim, lb, ub, normalized_train_data, fitnessFunc) # --- test --- # # predict predict_data = predict(weight, normalized_testDates_data.values, y_train_max, y_train_min, fitnessFunc) # --- eval --- # # set valid data actual_data = list(test_data[predict_element]) # testDates_data # calc RMSE, MAE RMSE, MAE = eval_point(actual_data, predict_data) # --- save result to .csv --- # result = pd.DataFrame(test_data[['date', 'hour']]) # testDates_data result["actual_values"] = actual_data result["predict_values"] = predict_data result.reset_index(drop=True, inplace=True) result.to_csv("result/ans.csv", index=False) # --- plot picture --- # result.plot(y = ['actual_values','predict_values'], legend=True,rot=45, title="RMSE:"+str(RMSE)+"&nbsp;MAE:"+str(MAE)) plt.savefig('result/predict.png') return RMSE, MAE, weight, fitness # RMSE, MAE, weight(PSO), fitness(PSO) if __name__ == '__main__': train_data = pd.read_csv('data/2018.csv', encoding = "big5") test_data = pd.read_csv('data/2019.csv', encoding = "big5") predict_element = 'PM2.5' elements_index = ['pressure', 'temperature', 'humidity', 'wind_speed', 'precip'] test_dates = ["2019-1-2", "2019-1-20", "2019-2-1", "2019-2-10", "2019-3-10", "2019-3-20", "2019-4-4", "2019-4-23", "2019-5-11", "2019-5-29", "2019-6-6", "2019-6-27", "2019-7-10", "2019-7-18", "2019-8-1", "2019-8-9", "2019-9-13", "2019-9-24", "2019-10-8", "2019-10-17", "2019-11-5", "2019-11-19", "2019-12-2", "2019-12-18"] particle_num = 25 lb = 0 ub = 1 maxcycle = 10000 RMSE, MAE, weight, fitness = analysis_by_pso(train_data, test_data, predict_element, elements_index, test_dates, particle_num, lb, ub, maxcycle, "線性回歸式")