# Backtest of stocks price using Python
# 利用三個策略,分析股市最大報酬率
### 主程式控制三個子檔案<!-- more -->
```python
# main.py
from stock_1 import stock_1
from stock_2 import stock_2
from stock_3 import stock_3
from threading import Thread
import time

# Every ROI value (in percent) returned by the strategies, filled by LoadROI.
ROI = []


class MyThread(Thread):
    """Thread that keeps the return value of its target callable.

    Args:
        target: Callable executed in the thread; nothing runs when None.
        args: Positional arguments passed to ``target``.
        **kwargs: Keyword arguments passed to ``target``.
    """

    def __init__(self, target=None, args=(), **kwargs):
        super().__init__()
        self._target = target
        self._args = args
        self._kwargs = kwargs
        # Initialised up front so reading the result never raises
        # AttributeError when the target is missing or raised an exception.
        self.__result__ = None

    def run(self):
        """Call the target and store what it returns."""
        if self._target is None:
            return
        self.__result__ = self._target(*self._args, **self._kwargs)

    def get_result(self):
        """Block until the thread finishes, then return the stored result.

        Returns None when there was no target or the target raised.
        """
        self.join()
        return self.__result__


def LoadROI(Ans):
    """Append every value of ``Ans`` to the global ROI list and print it.

    Args:
        Ans: Sequence of ROI percentages, or None (ignored) when a strategy
            produced no result.
    """
    if Ans is None:
        return
    for value in Ans:
        ROI.append(value)
        print("ROI :", value, "%")


def Main():
    """Run the three strategies in parallel and report the three best ROIs."""
    threads = [
        MyThread(target=strategy) for strategy in (stock_1, stock_2, stock_3)
    ]
    for thread in threads:
        thread.start()
    # Wait for every strategy before printing so the summary is not
    # interleaved with output of strategies that are still running.
    for thread in threads:
        thread.join()
    for thread in threads:
        LoadROI(thread.get_result())

    ROI.sort()
    # Best first; holds fewer than three values when fewer were produced,
    # instead of failing with IndexError.
    best = ROI[-3:][::-1]
    for rank, value in zip(("1st", "2nd", "3rd"), best):
        print(f"ROI of {rank} :", value, "%")
    print("Sum of ROI :", sum(best), "%")


if __name__ == "__main__":
    Main()
```
### 策略一(SMA & EMA )
```python
# stock_1.py
# Import the necessary libraries
import numpy as np
import pandas as pd
import datetime as datetime
import matplotlib.pyplot as plt
import os
import pandas as pd

# Multipliers applied to every fill: a buy costs slightly more than the quoted
# price and a sell returns slightly less.
# NOTE(review): presumably brokerage fee and transaction tax -- confirm.
BUY_COST_FACTOR = 1.001425
SELL_INCOME_FACTOR = 0.995575


def _signal_prices(prices, buy_conditions, sell_conditions):
    """Turn per-row buy/sell conditions into alternating signal prices.

    Returns two lists as long as ``prices``: the price on rows where a buy
    (respectively sell) signal fires and NaN everywhere else.  A signal fires
    only when it differs from the previous one, so buys and sells alternate.
    When both conditions hold on the same row, buy takes priority.
    """
    buy_prices = []
    sell_prices = []
    last_signal = None
    for price, want_buy, want_sell in zip(prices, buy_conditions, sell_conditions):
        buy_price = sell_price = np.nan
        if want_buy:
            if last_signal != "buy":
                buy_price = price
                last_signal = "buy"
        elif want_sell:
            if last_signal != "sell":
                sell_price = price
                last_signal = "sell"
        buy_prices.append(buy_price)
        sell_prices.append(sell_price)
    return buy_prices, sell_prices


def _simulate_trades(buy_prices, sell_prices):
    """Return ``(cost, income)`` of pairing each buy with the next sell.

    A sell signal with no open buy before it is ignored; otherwise a leading
    sell would be paired with a later buy and every trade would be booked as
    "sell first, buy afterwards".
    """
    cost = 0.0
    income = 0.0
    open_buy = None
    for buy_price, sell_price in zip(buy_prices, sell_prices):
        if not np.isnan(buy_price):
            open_buy = buy_price
        if not np.isnan(sell_price) and open_buy is not None:
            cost += open_buy * BUY_COST_FACTOR
            income += sell_price * SELL_INCOME_FACTOR
            open_buy = None
    return cost, income


def stock_1():
    """Backtest an SMA(3) / EMA(16) crossover on every CSV in ``./stocks``.

    Buys when the 3-day simple moving average of ``Close`` is above the
    16-span exponential moving average and sells when it is below.

    Returns:
        list: One ROI percentage (rounded to 2 decimals) per file; 0.0 for a
        file on which no buy/sell round trip completed.
    """
    ROI = []
    stock_dir = os.path.join(os.getcwd(), "stocks")
    for filename in os.listdir(stock_dir):
        df = pd.read_csv(os.path.join(stock_dir, filename))
        close = df['Close']
        # 3-day simple moving average and 16-span exponential moving average.
        sma3 = close.rolling(3).mean()
        ema16 = close.ewm(span=16).mean()

        # Comparisons against the leading NaN values of the SMA are False,
        # so no signal can fire before the SMA is defined.
        buy_prices, sell_prices = _signal_prices(close, sma3 > ema16, sma3 < ema16)
        cost, income = _simulate_trades(buy_prices, sell_prices)

        # No completed round trip means nothing was invested: report 0 %
        # instead of dividing by zero.
        roi = round(100 * (income - cost) / cost, 2) if cost else 0.0
        print("投資報酬率:", roi, "%")
        ROI.append(roi)

    if ROI:
        print("最大投資報酬率:", max(ROI), "%")
    return ROI
```
### 策略二(ML SimpleRNN)
#### 可能想法有錯誤,不符合題目,要注意!
```python
# stock_2.py
# ======================= Imports =====================================
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
from keras.models import Sequential
from keras.layers import Dense,SimpleRNN
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_squared_error, mean_absolute_error
import math
import os

# Multipliers applied to every fill: a buy costs slightly more than the quoted
# price and a sell returns slightly less.
# NOTE(review): presumably brokerage fee and transaction tax -- confirm.
BUY_COST_FACTOR = 1.001425
SELL_INCOME_FACTOR = 0.995575


def _signal_prices(prices, buy_conditions, sell_conditions):
    """Turn per-row buy/sell conditions into alternating signal prices.

    Returns two lists as long as ``prices``: the price on rows where a buy
    (respectively sell) signal fires and NaN everywhere else.  A signal fires
    only when it differs from the previous one, so buys and sells alternate.
    When both conditions hold on the same row, buy takes priority.
    """
    buy_prices = []
    sell_prices = []
    last_signal = None
    for price, want_buy, want_sell in zip(prices, buy_conditions, sell_conditions):
        buy_price = sell_price = np.nan
        if want_buy:
            if last_signal != "buy":
                buy_price = price
                last_signal = "buy"
        elif want_sell:
            if last_signal != "sell":
                sell_price = price
                last_signal = "sell"
        buy_prices.append(buy_price)
        sell_prices.append(sell_price)
    return buy_prices, sell_prices


def _simulate_trades(buy_prices, sell_prices):
    """Return ``(cost, income)`` of pairing each buy with the next sell.

    A sell signal with no open buy before it is ignored; otherwise a leading
    sell would be paired with a later buy and every trade would be booked as
    "sell first, buy afterwards".
    """
    cost = 0.0
    income = 0.0
    open_buy = None
    for buy_price, sell_price in zip(buy_prices, sell_prices):
        if not np.isnan(buy_price):
            open_buy = buy_price
        if not np.isnan(sell_price) and open_buy is not None:
            cost += open_buy * BUY_COST_FACTOR
            income += sell_price * SELL_INCOME_FACTOR
            open_buy = None
    return cost, income


def extract_data(data, time_step):
    """Slice a 1-D series into sliding windows and their next values.

    Args:
        data: 1-D sequence of values (list, array or pandas Series).
        time_step: Number of consecutive values in each input window.

    Returns:
        tuple: ``(X, y)`` where ``X`` is an array of shape
        ``(len(data) - time_step, time_step, 1)`` and ``y`` is a list holding
        the value that follows each window.  Both are empty when ``data`` has
        no more than ``time_step`` values.
    """
    # Positional access on a plain array, independent of any Series index.
    values = np.asarray(data)
    sample_count = max(len(values) - time_step, 0)
    windows = [values[i:i + time_step] for i in range(sample_count)]
    y = [values[i + time_step] for i in range(sample_count)]
    # reshape(-1, ...) also yields a well-formed (0, time_step, 1) array
    # when there are no windows.
    X = np.array(windows).reshape(-1, time_step, 1)
    return X, y


def stock_2():
    """Backtest a SimpleRNN price-prediction strategy on every CSV in ``./stocks``.

    For each file a small RNN is fitted to the normalised ``Close`` prices and
    its predictions define a low and a high threshold; the strategy buys when
    the prediction or the price is below the low threshold and sells when
    either is above the high threshold.

    Returns:
        list: One ROI percentage (rounded to 2 decimals) per processed file;
        0.0 for a file on which no buy/sell round trip completed.  Files too
        short to form a single training window are skipped.
    """
    ROI = []
    stock_dir = os.path.join(os.getcwd(), "stocks")
    time_step = 8
    for filename in os.listdir(stock_dir):
        df = pd.read_csv(os.path.join(stock_dir, filename))
        price = df.loc[:, 'Close']
        price_max = price.max()
        price_norm = price / price_max

        X, y = extract_data(price_norm, time_step)
        y = np.array(y)
        if len(X) == 0:
            # Not enough rows for one window: nothing to train on.
            continue

        model = Sequential()
        # One recurrent layer with 5 units; each sample is a sequence of
        # `time_step` one-dimensional values.
        model.add(SimpleRNN(units=5, input_shape=(time_step, 1), activation='relu'))
        # Single linear output unit (regression).
        model.add(Dense(units=1, activation='linear'))
        # Adam optimizer with mean squared error loss.
        model.compile(optimizer='adam', loss='mean_squared_error')
        model.summary()
        # Train for 200 epochs with batches of 64 samples.
        model.fit(X, y, batch_size=64, epochs=200)

        # ============================== predict ==============================
        # NOTE(review): predictions are made on the training windows
        # themselves, so this is an in-sample backtest.
        # Flatten the (n, 1) output so the thresholds below are scalars.
        predict = model.predict(X).ravel() * price_max
        maxpre = predict.max() * 0.75
        minpre = predict.min() * 1.25

        # NOTE(review): predict[i] is the forecast for row i + time_step but
        # is paired with the close of row i, as in the original -- confirm
        # this alignment is intended.
        traded_close = df['Close'].to_numpy()[:-time_step]
        buy_prices, sell_prices = _signal_prices(
            traded_close,
            (predict < minpre) | (traded_close < minpre),
            (predict > maxpre) | (traded_close > maxpre),
        )
        cost, income = _simulate_trades(buy_prices, sell_prices)

        # Cost starts at 0 (not 1) so the ROI is not biased; no completed
        # round trip is reported as 0 %.
        roi = round(100 * (income - cost) / cost, 2) if cost else 0.0
        print("投資報酬率:", roi, "%")
        ROI.append(roi)

    if ROI:
        print("最大投資報酬率:", max(ROI), "%")
    return ROI
```
### 策略三(RSI & MACD)
```python
# stock_3.py
import numpy as np
import pandas as pd
import os
import talib

# Multipliers applied to every fill: a buy costs slightly more than the quoted
# price and a sell returns slightly less.
# NOTE(review): presumably brokerage fee and transaction tax -- confirm.
BUY_COST_FACTOR = 1.001425
SELL_INCOME_FACTOR = 0.995575


def _signal_prices(prices, buy_conditions, sell_conditions):
    """Turn per-row buy/sell conditions into alternating signal prices.

    Returns two lists as long as ``prices``: the price on rows where a buy
    (respectively sell) signal fires and NaN everywhere else.  A signal fires
    only when it differs from the previous one, so buys and sells alternate.
    When both conditions hold on the same row, buy takes priority.
    """
    buy_prices = []
    sell_prices = []
    last_signal = None
    for price, want_buy, want_sell in zip(prices, buy_conditions, sell_conditions):
        buy_price = sell_price = np.nan
        if want_buy:
            if last_signal != "buy":
                buy_price = price
                last_signal = "buy"
        elif want_sell:
            if last_signal != "sell":
                sell_price = price
                last_signal = "sell"
        buy_prices.append(buy_price)
        sell_prices.append(sell_price)
    return buy_prices, sell_prices


def _simulate_trades(buy_prices, sell_prices):
    """Return ``(cost, income)`` of pairing each buy with the next sell.

    A sell signal with no open buy before it is ignored; otherwise a leading
    sell would be paired with a later buy and every trade would be booked as
    "sell first, buy afterwards".
    """
    cost = 0.0
    income = 0.0
    open_buy = None
    for buy_price, sell_price in zip(buy_prices, sell_prices):
        if not np.isnan(buy_price):
            open_buy = buy_price
        if not np.isnan(sell_price) and open_buy is not None:
            cost += open_buy * BUY_COST_FACTOR
            income += sell_price * SELL_INCOME_FACTOR
            open_buy = None
    return cost, income


def stock_3():
    """Backtest an RSI + MACD strategy on every CSV in ``./stocks``.

    Buys when RSI is below the low threshold or the fast line (DIF) is above
    the slow line (MACD); sells when RSI is above the high threshold or DIF is
    below MACD.

    Returns:
        list: One ROI percentage (rounded to 2 decimals) per file; 0.0 for a
        file on which no buy/sell round trip completed.
    """
    ROI = []
    stock_dir = os.path.join(os.getcwd(), "stocks")
    for filename in os.listdir(stock_dir):
        df = pd.read_csv(os.path.join(stock_dir, filename))
        close = df['Close']

        # The original named these EMA12 / EMA26, but the spans actually
        # used are 15 and 24; the values are kept unchanged.
        ema_fast = close.ewm(span=15).mean()
        ema_slow = close.ewm(span=24).mean()
        dif = ema_fast - ema_slow          # fast line
        macd = dif.ewm(span=12).mean()     # slow line

        rsi = pd.Series(talib.RSI(close))
        # Series.max()/min() skip NaN.  The builtin max()/min() return NaN
        # when the first value is NaN, which turned both thresholds into NaN
        # and silently disabled the RSI conditions.
        # NOTE(review): assumes the RSI output starts with NaN values during
        # its warm-up period -- confirm against the TA-Lib documentation.
        highRSI = rsi.max() * 75 / 100
        lowRSI = rsi.min() * 125 / 100

        buy_prices, sell_prices = _signal_prices(
            close,
            (rsi < lowRSI) | (dif > macd),
            (rsi > highRSI) | (dif < macd),
        )
        cost, income = _simulate_trades(buy_prices, sell_prices)

        # No completed round trip is reported as 0 % rather than forcing
        # cost to 1, which produced a misleading -100 %.
        roi = round(100 * (income - cost) / cost, 2) if cost else 0.0
        ROI.append(roi)
        print("成本:", cost, "元")
        print("收益 :", income, "元")
        print("投資報酬率:", roi, "%\n")

    if ROI:
        print("最大投資報酬率:", max(ROI), "%")
    return ROI
```
資源包連結:https://reurl.cc/3NZNYM

---
###### tags: 東華
###### tags: 資訊工程學系
###### tags: 股市分析與投資學
###### tags: 大三
---