# Backtesting stock prices using Python
# 利用三個策略,分析股市最大報酬率
### 主程式控制三個子檔案<!-- more -->
```
# main.py
from stock_1 import stock_1
from stock_2 import stock_2
from stock_3 import stock_3
from threading import Thread
import time
# Module-level list that LoadROI() appends every strategy's ROI values to.
ROI =[]
class MyThread(Thread):
    """Thread that keeps the return value of its target callable.

    Args:
        target: Callable executed in the new thread; ``None`` makes
            ``run`` a no-op.
        args: Positional arguments passed to ``target``.
        **kwargs: Keyword arguments passed to ``target``.
    """

    def __init__(self, target=None, args=(), **kwargs):
        super().__init__()
        self._target = target
        self._args = args
        self._kwargs = kwargs
        # Pre-set so that reading the result never raises AttributeError when
        # there is no target or the target raised before returning.
        self.__result__ = None

    def run(self):
        """Call the target and store its return value in ``__result__``."""
        if self._target is None:
            return
        self.__result__ = self._target(*self._args, **self._kwargs)

    def get_result(self):
        """Block until the thread has finished, then return the stored result.

        Returns:
            The target's return value, or ``None`` if no target was given or
            the target did not return normally.
        """
        self.join()
        return self.__result__
def LoadROI(Ans):
    """Append each value of ``Ans`` to the module-level ``ROI`` list and print it."""
    for roi_value in Ans:
        ROI.append(roi_value)
        print("ROI :", roi_value, "%")
def Main():
    """Run the three strategies in parallel threads and report the best ROIs.

    Every ROI value returned by the strategies is collected into the
    module-level ``ROI`` list, which is then sorted ascending. The (up to)
    three largest values and their sum are printed.
    """
    workers = [MyThread(target=strategy) for strategy in (stock_1, stock_2, stock_3)]

    # Start every strategy before waiting on any of them so they overlap.
    for worker in workers:
        worker.start()

    # get_result() joins the thread, so no separate join() pass is needed.
    answers = [worker.get_result() for worker in workers]
    for answer in answers:
        # A strategy that produced nothing contributes no values.
        if answer:
            LoadROI(answer)

    ROI.sort()
    # Slice instead of indexing ROI[-3] so fewer than three values cannot
    # raise IndexError; reverse to list the best value first.
    best = ROI[-3:][::-1]
    total = 0
    for rank, value in zip(("1st", "2nd", "3rd"), best):
        print(f"ROI of {rank} :", value, "%")
        total += value
    print("Sum of ROI :", total, "%")


# Only run the backtest when executed as a script, not on import.
if __name__ == "__main__":
    Main()
```
### 策略一(SMA & EMA )
```
# stock_1.py
#import the necessary libraries
import numpy as np
import pandas as pd
import datetime as datetime
import matplotlib.pyplot as plt
import os
import pandas as pd
def _long_only_totals(prices, bullish, bearish, buy_fee=1.001425, sell_fee=0.995575):
    """Accumulate cost and income of completed long round trips.

    A position is opened at the first bullish bar while flat and closed at the
    next bearish bar. A bearish bar while flat is ignored, so a sell can never
    be paired with a buy that happens after it. A position still open at the
    end of the data is not counted.

    Args:
        prices: Trade price for each bar.
        bullish: Per-bar truth values; true means "want to be long".
        bearish: Per-bar truth values; only consulted when ``bullish`` is false.
        buy_fee: Multiplier applied to the entry price (price plus fees).
        sell_fee: Multiplier applied to the exit price (price minus fees/tax).

    Returns:
        Tuple ``(cost, income)`` summed over all completed round trips.
    """
    cost = 0.0
    income = 0.0
    entry_price = None
    for price, is_bullish, is_bearish in zip(prices, bullish, bearish):
        if is_bullish:
            if entry_price is None:
                entry_price = price
        elif is_bearish and entry_price is not None:
            cost += entry_price * buy_fee
            income += price * sell_fee
            entry_price = None
    return cost, income


def stock_1():
    """Backtest the SMA(3) / EMA(16) crossover strategy on every stock file.

    Each file in ``<cwd>/stocks`` is read as CSV and must contain a ``Close``
    column. The strategy is long while the 3-bar simple moving average is
    above the 16-span exponential moving average and exits once it drops
    below it.

    Returns:
        List with one ROI percentage (rounded to 2 decimals) per file, in
        sorted file-name order. A file without a completed trade yields 0.0.
    """
    ROI = []
    # os.path.join avoids the invalid "\s" escape and works on every OS.
    stock_dir = os.path.join(os.getcwd(), "stocks")
    # Sorted so the order of the returned list is deterministic.
    for file in sorted(os.listdir(stock_dir)):
        df = pd.read_csv(os.path.join(stock_dir, file))
        close = df['Close'].astype(float)
        sma3 = close.rolling(3).mean()
        ema16 = close.ewm(span=16).mean()

        # Comparisons against the leading NaNs of the rolling mean are False,
        # so those bars are neither bullish nor bearish.
        bullish = (sma3 > ema16).tolist()
        bearish = (sma3 < ema16).tolist()
        cost, income = _long_only_totals(close.tolist(), bullish, bearish)

        # No completed trade means nothing was invested: report 0 % instead
        # of dividing by zero.
        roi = round(100 * (income - cost) / cost, 2) if cost else 0.0
        print("投資報酬率:", roi, "%")
        ROI.append(roi)
    if ROI:
        print("最大投資報酬率:", max(ROI), "%")
    return ROI
```
### 策略二(ML SimpleRNN)
#### 可能想法有錯誤,不符合題目,要注意!
```
# stock_2.py
#=======================Imports=====================================
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
from keras.models import Sequential
from keras.layers import Dense,SimpleRNN
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_squared_error, mean_absolute_error
import math
import os
def extract_data(data, time_step):
    """Cut a 1-D sequence into sliding windows and their next-step targets.

    Args:
        data: Sequence of values (list, array or pandas Series); it is read
            positionally.
        time_step: Number of consecutive values in each input window.

    Returns:
        Tuple ``(X, y)``: ``X`` is an array of shape
        ``(samples, time_step, 1)`` and ``y`` is a list with the value that
        follows each window. ``samples`` is ``len(data) - time_step``, or 0
        when the sequence is too short to form a window.
    """
    values = np.asarray(data)
    sample_count = max(len(values) - time_step, 0)
    windows = [values[start:start + time_step] for start in range(sample_count)]
    # Explicit target shape keeps the empty case valid: (0, time_step, 1).
    X = np.array(windows).reshape(sample_count, time_step, 1)
    y = [values[start + time_step] for start in range(sample_count)]
    return X, y
def _long_only_totals(prices, bullish, bearish, buy_fee=1.001425, sell_fee=0.995575):
    """Accumulate cost and income of completed long round trips.

    A position is opened at the first bullish bar while flat and closed at the
    next bearish bar. A bearish bar while flat is ignored, so a sell can never
    be paired with a buy that happens after it. A position still open at the
    end of the data is not counted.

    Args:
        prices: Trade price for each bar.
        bullish: Per-bar truth values; true means "want to be long".
        bearish: Per-bar truth values; only consulted when ``bullish`` is false.
        buy_fee: Multiplier applied to the entry price (price plus fees).
        sell_fee: Multiplier applied to the exit price (price minus fees/tax).

    Returns:
        Tuple ``(cost, income)`` summed over all completed round trips.
    """
    cost = 0.0
    income = 0.0
    entry_price = None
    for price, is_bullish, is_bearish in zip(prices, bullish, bearish):
        if is_bullish:
            if entry_price is None:
                entry_price = price
        elif is_bearish and entry_price is not None:
            cost += entry_price * buy_fee
            income += price * sell_fee
            entry_price = None
    return cost, income


def stock_2():
    """Backtest a SimpleRNN price-prediction strategy on every stock file.

    For each CSV in ``<cwd>/stocks`` (must contain a ``Close`` column) a small
    SimpleRNN is trained on the normalised closing prices and then used to
    predict the same series. The strategy buys when the prediction or the
    close is below 125 % of the lowest prediction and sells when either is
    above 75 % of the highest prediction.

    Returns:
        List with one ROI percentage (rounded to 2 decimals) per file, in
        sorted file-name order. A file without a completed trade yields 0.0.
    """
    ROI = []
    time_step = 8
    # os.path.join avoids the invalid "\s" escape and works on every OS.
    stock_dir = os.path.join(os.getcwd(), "stocks")
    # Sorted so the order of the returned list is deterministic.
    for file in sorted(os.listdir(stock_dir)):
        df = pd.read_csv(os.path.join(stock_dir, file))
        price = df.loc[:, 'Close']
        peak = price.max()
        # Scale prices into (0, 1] for training.
        price_norm = price / peak
        X, y = extract_data(price_norm, time_step)
        y = np.array(y)

        model = Sequential()
        # One recurrent layer with 5 units over windows of `time_step` scalars.
        model.add(SimpleRNN(units=5, input_shape=(time_step, 1), activation='relu'))
        # Single linear output unit for regression.
        model.add(Dense(units=1, activation='linear'))
        model.compile(optimizer='adam', loss='mean_squared_error')
        model.summary()
        # 200 epochs with mini-batches of 64 samples.
        model.fit(X, y, batch_size=64, epochs=200)

        # Predict on the training windows and undo the normalisation.
        predicted = np.asarray(model.predict(X)).reshape(-1) * peak
        maxpre = predicted.max() * 0.75
        minpre = predicted.min() * 1.25

        # NOTE(review): predicted[i] is the forecast for bar i + time_step but
        # is paired with the close of bar i, as in the original code; confirm
        # that this alignment is what the strategy intends.
        closes = price.iloc[:len(predicted)].to_numpy(dtype=float)
        bullish = (predicted < minpre) | (closes < minpre)
        bearish = (predicted > maxpre) | (closes > maxpre)
        cost, income = _long_only_totals(
            closes.tolist(), bullish.tolist(), bearish.tolist()
        )

        # Cost starts at 0 (not 1) so the ROI is not biased; no completed
        # trade means nothing was invested and is reported as 0 %.
        roi = round(100 * (income - cost) / cost, 2) if cost else 0.0
        print("投資報酬率:", roi, "%")
        ROI.append(roi)
    if ROI:
        print("最大投資報酬率:", max(ROI), "%")
    return ROI
```
### 策略三(RSI & MACD)
```
# stock_3.py
import numpy as np
import pandas as pd
import os
import talib
def _long_only_totals(prices, bullish, bearish, buy_fee=1.001425, sell_fee=0.995575):
    """Accumulate cost and income of completed long round trips.

    A position is opened at the first bullish bar while flat and closed at the
    next bearish bar. A bearish bar while flat is ignored, so a sell can never
    be paired with a buy that happens after it. A position still open at the
    end of the data is not counted.

    Args:
        prices: Trade price for each bar.
        bullish: Per-bar truth values; true means "want to be long".
        bearish: Per-bar truth values; only consulted when ``bullish`` is false.
        buy_fee: Multiplier applied to the entry price (price plus fees).
        sell_fee: Multiplier applied to the exit price (price minus fees/tax).

    Returns:
        Tuple ``(cost, income)`` summed over all completed round trips.
    """
    cost = 0.0
    income = 0.0
    entry_price = None
    for price, is_bullish, is_bearish in zip(prices, bullish, bearish):
        if is_bullish:
            if entry_price is None:
                entry_price = price
        elif is_bearish and entry_price is not None:
            cost += entry_price * buy_fee
            income += price * sell_fee
            entry_price = None
    return cost, income


def stock_3():
    """Backtest the RSI + MACD strategy on every stock file.

    For each CSV in ``<cwd>/stocks`` (must contain a ``Close`` column) the
    strategy is long when RSI is below 125 % of its minimum or the fast line
    (DIF) is above the slow line (MACD). Otherwise it exits when RSI is above
    75 % of its maximum or DIF is below MACD.

    Returns:
        List with one ROI percentage (rounded to 2 decimals) per file, in
        sorted file-name order. A file without a completed trade yields 0.0.
    """
    ROI = []
    # os.path.join avoids the invalid "\s" escape and works on every OS.
    stock_dir = os.path.join(os.getcwd(), "stocks")
    # Sorted so the order of the returned list is deterministic.
    for file in sorted(os.listdir(stock_dir)):
        df = pd.read_csv(os.path.join(stock_dir, file))
        close = df['Close'].astype(float)

        # NOTE(review): the original names these EMA12 / EMA26, but the spans
        # actually used are 15 and 24 (signal span 12); values kept as-is.
        fast_ema = close.ewm(span=15).mean()
        slow_ema = close.ewm(span=24).mean()
        dif = fast_ema - slow_ema          # fast line
        macd = dif.ewm(span=12).mean()     # slow line
        rsi = pd.Series(talib.RSI(close))

        # Series.max()/min() skip NaN. The builtin max()/min() return NaN when
        # the series starts with NaN, which silently disabled both RSI
        # thresholds.
        highRSI = rsi.max() * 75 / 100
        lowRSI = rsi.min() * 125 / 100

        bullish = ((rsi < lowRSI) | (dif > macd)).tolist()
        bearish = ((rsi > highRSI) | (dif < macd)).tolist()
        cost, income = _long_only_totals(close.tolist(), bullish, bearish)

        # No completed trade means nothing was invested: report 0 % rather
        # than substituting cost = 1, which yielded a bogus -100 %.
        roi = round(100 * (income - cost) / cost, 2) if cost else 0.0
        ROI.append(roi)
        print("成本:", cost, "元")
        print("收益 :", income, "元")
        print("投資報酬率:", roi, "%\n")
    if ROI:
        print("最大投資報酬率:", max(ROI), "%")
    return ROI
```
資源包連結:https://reurl.cc/3NZNYM
---
###### tags: 東華
###### tags: 資訊工程學系
###### tags: 股市分析與投資學
###### tags: 大三
---