# 機器學習期末報告 AI股票
## 介紹
此為訓練單一股票之預測模型
由於教室的網路速度有點慢(比烏龜慢),因此本次的報告使用==google colab==進行
>[!Important]
>**請注意,以下所獲取之預測資料僅供參考,本組只進行技術教授與說明,不負任何投資損失責任**
## 參照程式碼
https://github.com/lex-hue/Stock-Predictor-V4
https://hackmd.io/@s02260441/Hki9NN5jL
## 預測Model的建置順序
### install.py
本次會用到的套件我們這次透過function來直接將其包含起來,也能知道我們在這過程中下載了哪些的套件與工具。
```python=
def install_dependencies():
packages = [
"pandas",
"ta",
"yfinance",
"numpy",
"scikit-learn",
"matplotlib",
"tensorflow",
"statsmodels",
"pdfplumber"
"mplfinance"
]
total_packages = len(packages)
progress = 0
for package in packages:
progress += 1
print(f"Installing {package}... ({progress}/{total_packages})")
!pip install {package}
install_dependencies()
```
### 1-1.取得台灣股市資訊
本報告在網路上取得一份PDF檔,裡面包含台灣大部分的股市的名稱與代號,本程式將會去爬該PDF的內容,將檔案內所有的股市資訊給整理成一份字典,並且製作成下拉選單,方便選取想要的股市資訊。
PDF 下載連結:
https://drive.google.com/file/d/1G54CplA-WuDdJttQ6W5UuU_JjwR2myA8/view?usp=sharing
```python=
from IPython.display import display
import ipywidgets as widgets
import pdfplumber
options_mapping = {}
pdf = pdfplumber.open('SE1026.pdf')
for i, page in enumerate(pdf.pages):
tables = page.extract_table()
if tables:
if tables[0][0] == "股票代號" and tables[0][1] == "股票名稱":
tables = tables[1:]
for table in tables:
for index in range(0, len(table), 2):
options_mapping[table[index+1]] = table[index]
pdf.close()
stock_name = None
stock_symbol = None
# 下拉選單
combobox = widgets.Combobox(
options=list(options_mapping.keys()), # 使用顯示值作為選項
description='請選擇:'
)
# 創建處理函數
def handle_combobox_change(change):
global stock_name, stock_symbol
stock_name = change.new
stock_symbol = options_mapping.get(stock_name)
# 將處理函數綁定到下拉選單的變化事件
combobox.observe(handle_combobox_change, names='value')
# 顯示下拉選單
display(combobox)
```
> stock_name 和 stock_symbol是全域變數
> 只要在下拉方塊選擇了新的值,那麼這兩個變數就會及時更新
> 後面的程式若需要輸入股票代號就會採用這些資訊。
>
### 1-2.檢查下拉選單的選擇值
```python=
if stock_name:
print("你選擇了:", stock_name)
print("股票代號為:", stock_symbol)
else:
print("尚未選擇.")
```
### 2.抓取所選股市的資訊
根據下拉選單所選的股票代號以及所輸入的日期範圍,以下程式碼會抓該股票代號的相關資訊
匯出成csv,並且整理、運算出其他常見的技術分析指標
```python=
import os
import pandas as pd
import numpy as np
import ta
import matplotlib.pyplot as plt
import yfinance as yf
from statsmodels.tsa.seasonal import seasonal_decompose
def download_and_prepare_data(ticker_symbol,start_date, end_date):
data_dir = 'data'
if not os.path.exists(data_dir):
os.makedirs(data_dir)
data = yf.download(ticker_symbol, period="max", interval="1d", start=start_date, end=end_date)
df = pd.DataFrame(data)
data_file = f"./data/{ticker_symbol}.csv"
df.to_csv(data_file)
print("資料已下載並儲存至", data_file)
def preprocess_data(ticker_symbol):
data_file = f"./data/{ticker_symbol}.csv"
df = pd.read_csv(data_file)
# 使用 ta 庫計算技術指標(均線、相對強弱指標、指數平滑異同移動平均線)
df["SMA"] = ta.trend.sma_indicator(df["Close"], window=14)
df["RSI"] = ta.momentum.rsi(df["Close"], window=14)
df["MACD"] = ta.trend.macd_diff(df["Close"], window_slow=26, window_fast=12, window_sign=9)
df_bollinger = ta.volatility.BollingerBands(df["Close"], window=20)
df["upper_band"] = df_bollinger.bollinger_hband()
df["middle_band"] = df_bollinger.bollinger_mavg()
df["lower_band"] = df_bollinger.bollinger_lband()
aroon_indicator = ta.trend.AroonIndicator(high=df["High"], low=df["Low"], window=25)
df["aroon_up"] = aroon_indicator.aroon_up()
df["aroon_down"] = aroon_indicator.aroon_down()
open_prices = df["Open"]
close_prices = df["Close"]
kicking_pattern = np.zeros_like(open_prices)
for i in range(1, len(open_prices)):
if open_prices[i] < open_prices[i-1] and \
open_prices[i] > close_prices[i-1] and \
close_prices[i] > open_prices[i-1] and \
close_prices[i] < close_prices[i-1] and \
open_prices[i] - close_prices[i] > open_prices[i-1] - close_prices[i-1]:
kicking_pattern[i] = 100 # 一些正值來表示這個模式
# 將 kicking_pattern 作為新的欄位添加到 DataFrame 中
df["kicking"] = kicking_pattern
# 計算 ATR 和 SuperTrend
def calculate_atr(high, low, close, window=14):
true_ranges = np.maximum.reduce([high - low, np.abs(high - close.shift()), np.abs(low - close.shift())])
atr = np.zeros_like(high)
atr[window - 1] = np.mean(true_ranges[:window])
for i in range(window, len(high)):
atr[i] = (atr[i - 1] * (window - 1) + true_ranges[i]) / window
return atr
df["ATR"] = calculate_atr(df["High"], df["Low"], df["Close"], window=14)
# 使用減少的敏感度和所有指標計算 Supertrend 信號
df["upper_band_supertrend"] = df["High"] - (df["ATR"])
df["lower_band_supertrend"] = df["Low"] + (df["ATR"])
# 根據對指標的敏感度定義上升趨勢和下降趨勢條件
uptrend_conditions = [
(df["Close"] > df["lower_band_supertrend"]),
(df["Close"] > df["SMA"]),
(df["Close"] > df["middle_band"]),
(df["Close"] > df["MACD"]),
(df["RSI"] > 50),
(df["aroon_up"] > df["aroon_down"]),
(df["kicking"] == 1), # 假設 "kicking" 是指標,其中 1 表示上升趨勢。
(df["Close"] > df["upper_band_supertrend"])
]
downtrend_conditions = [
(df["Close"] < df["upper_band_supertrend"]),
(df["Close"] < df["SMA"]),
(df["Close"] < df["middle_band"]),
(df["Close"] < df["MACD"]),
(df["RSI"] < 50),
(df["aroon_up"] < df["aroon_down"]),
(df["kicking"] == -1), # 假設 "kicking" 是指標,其中 -1 表示下降趨勢。
(df["Close"] < df["lower_band_supertrend"])
]
# 將初始信號值設置為 0
df["supertrend_signal"] = 0
# 根據對指標的敏感度更新信號
df.loc[np.any(uptrend_conditions, axis=0), "supertrend_signal"] = 1
df.loc[np.any(downtrend_conditions, axis=0), "supertrend_signal"] = -1
# 分解時間序列數據
result = seasonal_decompose(df["Close"], model="additive", period=365)
# 將分解的組件添加到 DataFrame 中
df["trend"] = result.trend
df["seasonal"] = result.seasonal
df["residual"] = result.resid
# 按您希望的順序串聯欄位
df2 = pd.concat(
[
df["Date"],
df["Close"],
df["Open"],
df["Adj Close"],
df["Volume"],
df["High"],
df["Low"],
df["SMA"],
df["MACD"],
df["upper_band"],
df["middle_band"],
df["lower_band"],
df["supertrend_signal"],
df["RSI"],
df["aroon_up"],
df["aroon_down"],
df["kicking"],
df["upper_band_supertrend"],
df["lower_band_supertrend"],
df["trend"],
df["seasonal"],
df["residual"]
],
axis=1,
)
# 用 0 填充缺失值
df2.fillna(0, inplace=True)
# 將 DataFrame 保存到新 CSV 文件,包含指標和分解的組件
df2.to_csv("data.csv", index=False)
# 去除相同方向的連續信號(減少敏感度)
signal_changes = df["supertrend_signal"].diff().fillna(0)
consecutive_mask = (signal_changes == 0) & (signal_changes.shift(-1) == 0)
df.loc[consecutive_mask, "supertrend_signal"] = 0
# 繪製數據
fig, (ax1, ax2, ax3, ax4, ax5) = plt.subplots(5, 1, figsize=(12, 8), sharex=True)
ax1.plot(df["Open"], label="Open")
ax1.plot(df["Close"], label="Close")
ax1.plot(df["trend"], label="Trend")
ax1.plot(df["SMA"], label="SMA")
ax1.fill_between(
df.index, df["upper_band"], df["lower_band"], alpha=0.2, color="gray"
)
ax1.plot(df["upper_band"], linestyle="dashed", color="gray")
ax1.plot(df["middle_band"], linestyle="dashed", color="gray")
ax1.plot(df["lower_band"], linestyle="dashed", color="gray")
ax1.scatter(
df.index[df["supertrend_signal"] == 1],
df["Close"][df["supertrend_signal"] == 1],
marker="^",
color="green",
s=100,
)
ax1.scatter(
df.index[df["supertrend_signal"] == -1],
df["Close"][df["supertrend_signal"] == -1],
marker="v",
color="red",
s=100,
)
ax1.legend()
ax2.plot(df["aroon_up"], label="Aroon Up")
ax2.plot(df["aroon_down"], label="Aroon Down")
ax2.legend()
ax3.plot(df["RSI"], label="RSI")
ax3.legend()
ax4.plot(df["seasonal"], label="Seasonal")
ax4.legend()
ax5.plot(df["residual"], label="Residual")
ax5.legend()
plt.xlim(df.index[0], df.index[-1])
plt.show()
if __name__ == "__main__":
ticker_symbol = stock_symbol + ".TW"
tic = yf.Ticker(ticker_symbol)
info = tic.get_info()
start_date = input("請輸入開始日期(yyyy-mm-dd):")
end_date = input("請輸入結束日期(yyyy-mm-dd):")
print(f"{stock_name}({ticker_symbol}) 的信息:\n")
print(info["shortName"],"\n")
indicator = "未知"
if "recommendationKey" in info:
if info["recommendationKey"] == "buy":
indicator = "買入"
elif info["recommendationKey"] == "sell":
indicator = "賣出"
elif info["recommendationKey"] == "strong buy":
indicator = "強烈買入"
elif info["recommendationKey"] == "hold":
indicator = "持有"
elif info["recommendationKey"] == "underperform":
indicator = "表現不佳"
print(f"推薦趨勢為 {indicator}")
download = input("您是否要下載此股票代碼(y/n)?:").lower()
if download == "y":
download_and_prepare_data(ticker_symbol, start_date, end_date)
preprocess_data(ticker_symbol)
else:
print("退出腳本..")
```
#### 函數 download_and_prepare_data
* 參數
ticker_symbol: 股票代碼
start_date: 開始日期
end_date: 結束日期
* 流程
檢查資料夾data是否存在,不存在則創建。
使用yfinance庫下載股票數據。
將數據保存到本地CSV文件中。
#### 函數 preprocess_data
這個函數從CSV文件讀取數據,計算各種技術指標,並將結果保存到新的CSV文件中。
* 流程:
先從CSV文件中讀取數據。
1. 使用ta庫計算技術指標,包括均線(SMA)、相對強弱指標(RSI)、指數平滑異同移動平均線(MACD)、布林帶(Bollinger Bands)和Aroon指標。
2. 定義和計算“踢腿形態”(Kicking Pattern)。
3. 計算平均真實波幅(ATR)和SuperTrend指標
4. 分解時間序列數據成趨勢、季節性和殘差。
5. 將所有計算結果和原始數據合併,保存到新的CSV文件中。
6. 繪製數據圖表,包括開盤價、收盤價、SMA、布林帶、SuperTrend信號、Aroon指標、RSI、季節性成分和殘差成分。
* 簡單移動平均線(Simple Moving Average, SMA)
SMA 是一種常見的技術指標,用於平滑價格數據,消除短期波動,從而更容易識別價格趨勢。計算方法是將一定時間範圍內的收盤價格平均化。
* 布林帶(Bollinger Bands)
布林帶由三條線組成:中軌線(SMA)、上軌線和下軌線。它們用於衡量市場的波動性和相對價格水平。
* SuperTrend 信號
SuperTrend 指標基於平均真實波幅(ATR)計算,用於識別趨勢方向。SuperTrend 信號分為上升趨勢和下降趨勢。
* Aroon 指標
Aroon 指標用於測量趨勢的強度和變化,包含 Aroon Up 和 Aroon Down。
* 相對強弱指標(Relative Strength Index, RSI)
RSI 是一種動量指標,用於測量價格變動的速度和變動幅度。RSI 的值介於0和100之間。
* 季節性成分(Seasonal Component)
季節性成分來自於時間序列分解,用於揭示數據中的週期性變動。
* 殘差成分(Residual Component)
殘差成分是時間序列分解後,扣除趨勢和季節性成分後剩餘的部分,代表數據中的隨機波動。
#### 主程序
主程序首先要求用戶輸入股票代碼、開始日期和結束日期。
流程:
1. 使用yfinance庫獲取股票信息。
2. 顯示股票簡稱和推薦趨勢(例如,買入、賣出、持有等)。
3. 詢問用戶是否下載股票數據。
4. 如果用戶選擇下載,則調用download_and_prepare_data函數下載數據,然後調用preprocess_data函數進行數據處理和可視化。
### 3.訓練model
根據(2)所獲取的csv檔,開始訓練模型
```python=
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import BatchNormalization, PReLU, LSTM, Dense
from sklearn.metrics import r2_score, mean_absolute_percentage_error
from sklearn.preprocessing import MaxAbsScaler
print("TensorFlow 版本:", tf.__version__)
# 定義一個函數來加載數據 (將 'data.csv' 替換為你的數據文件)
def load_data(file_path: str) -> pd.DataFrame:
data = pd.read_csv(file_path)
return data[['Close']] # 只加載 'Close' 列
# 定義獎勵函數
def get_reward(y_true, y_pred):
mape = mean_absolute_percentage_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)
reward = (((1 - mape) * 0.1) + ((r2) * 1.9)) / 2
return reward
# 定義一個函數來創建 LSTM 模型
def create_LSTM_model() -> Sequential:
model = Sequential()
model.add(LSTM(units=150, return_sequences=True))
model.add(PReLU())
model.add(PReLU())
model.add(PReLU())
model.add(LSTM(units=150))
model.add(PReLU())
model.add(PReLU())
model.add(PReLU())
# 添加最終輸出層
model.add(Dense(units=1, activation='linear'))
return model
# 加載數據
data = load_data("data.csv")
# 將數據分為訓練集和測試集
train_data = data.iloc[:int(0.8*len(data))]
test_data = data.iloc[int(0.8*len(data)):]
# 標準化數據 (只使用 'Close' 列)
scaler = MaxAbsScaler()
train_data_norm = scaler.fit_transform(train_data)
test_data_norm = scaler.transform(test_data)
# 定義時間步長
timesteps = 100
# 創建時間步長的序列 (只使用 'Close' 值)
def create_sequences(data, timesteps):
X = []
y = []
for i in range(timesteps, len(data)):
X.append(data[i-timesteps:i])
y.append(data[i, 0])
return np.array(X), np.array(y)
X_train, y_train = create_sequences(train_data_norm, timesteps)
X_test, y_test = create_sequences(test_data_norm, timesteps)
# 構建並編譯 LSTM 模型
model = create_LSTM_model()
# 編譯模型
model.compile(optimizer='adam', loss="huber")
# 訓練
epochs = 25
batch_size = 32
for i in range(epochs):
print(f"第 {i+1} 個訓練周期 / 共 {epochs} 個訓練周期")
history = model.fit(X_train, y_train, batch_size=batch_size, validation_data=(X_test, y_test), epochs=1)
# 在測試集上評估模型
y_pred_test = model.predict(X_test)
test_reward = get_reward(y_test, y_pred_test)
print("測試集獎勵:", test_reward)
if i == 0:
best_reward = test_reward
if test_reward >= best_reward:
best_reward = test_reward
print("模型已保存!")
model.save("model.keras")
# 加載最佳模型並評估
model = tf.keras.models.load_model("model.keras")
y_pred_test = model.predict(X_test)
test_reward = get_reward(y_test, y_pred_test)
test_loss = model.evaluate(X_test, y_test)
print("最終測試集獎勵:", test_reward)
print("最終測試集損失:", test_loss)
```
這段程式碼通過LSTM神經網絡模型,對股票收盤價進行時間序列預測,並使用獎勵函數來評估模型的預測性能。
### 4.評估模型的可用性
利用剛剛整理出的指標歷史數據來評估我們模型的可用性
```python=
import os
import sys
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.preprocessing import MaxAbsScaler
from tensorflow.keras.models import load_model
from sklearn.metrics import (
mean_squared_error,
r2_score,
mean_absolute_percentage_error,
)
import matplotlib.pyplot as plt
print("TensorFlow 版本:", tf.__version__)
# 定義獎勵函數
def get_reward(y_true, y_pred):
mape = mean_absolute_percentage_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)
reward = (((1 - mape) * 0.1) + ((r2) * 1.9)) / 2
return reward
# 加載數據
data = pd.read_csv("data.csv")
# 將數據分為訓練集和測試集
test_data = data.iloc[int(0.8 * len(data)) :]
# 標準化數據
scaler = MaxAbsScaler()
test_data_norm = scaler.fit_transform(
test_data[
[
"Close"
]
]
)
# 定義時間步長
timesteps = 100
# 創建時間步長的序列
def create_sequences(data, timesteps):
X = []
y = []
for i in range(timesteps, len(data)):
X.append(data[i - timesteps : i])
y.append(data[i, 0])
return np.array(X), np.array(y)
X_test, y_test = create_sequences(test_data_norm, timesteps)
# 加載預訓練模型
model = load_model("model.keras")
print("\n評估模型")
# 評估模型
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
mape = mean_absolute_percentage_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
# 計算獎勵
reward = get_reward(y_test, y_pred)
# 打印評估指標
# MAPE 衡量的是預測值與實際值之間的百分比誤差,這個指標的值越低,表示預測結果越精確。
print("MAPE:", mape)
# R² 衡量的是模型解釋數據變異的程度,R² 的值範圍在 0 到 1 之間,值越接近 1,表示模型的解釋能力越強。
print("MSE:", mse)
# 獎勵的計算是將 (1 - MAPE) 與 R² 進行加權平均這裡將 (1 - MAPE) 和 R² 各自乘以不同的權重,然後取平均。選擇這些權重是為了更好地平衡兩個指標在獎勵值中的影響。
print("R2:", r2)
print("Reward:", reward)
# 繪製預測值與實際值的比較圖
plt.plot(y_test, label="Actual")
plt.plot(y_pred, label="Predicted")
plt.legend()
plt.show()
```
通過標準化、創建序列、加載模型和評估模型的方式,對股票數據進行了有效的預測和評估。可視化部分有助於直觀地理解模型的預測效果。
Q:使用歷史數據來驗證模型的準確性?
* 避免過擬合:將數據集劃分為訓練集和測試集,確保模型在訓練過程中不會看到測試數據,這樣可以避免過擬合,即模型在訓練數據上表現良好但在未知數據上表現不佳。
* 模型評估:使用測試集來評估模型,可以幫助我們了解模型在未見過的數據上的表現,即模型的泛化能力
### 5.微調模型
設定一個你期望達到的模型品質的值(0~1) 1大概率是不可能達到,建議0.9即可
加載預訓練的LSTM模型,對其進行評估並根據用戶定義的獎勵閾值(reward threshold)進行微調。程式碼結合了模型評估、重新訓練和保存最佳模型的過程,並提供了處理中斷信號的功能。
```python=
import os
import signal
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.preprocessing import MaxAbsScaler
from tensorflow.keras.models import load_model
from sklearn.metrics import (
mean_squared_error,
r2_score,
mean_absolute_percentage_error,
)
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
# 定義獎勵函數
def get_reward(y_true, y_pred):
mape = mean_absolute_percentage_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)
reward = (((1 - mape) * 0.1) + ((r2) * 1.9)) / 2
return reward
# 加載數據
data = pd.read_csv("data.csv")
# 分割數據為訓練集和測試集
train_data = data.iloc[: int(0.8 * len(data))]
test_data = data.iloc[int(0.8 * len(data)) :]
# 正規化數據
scaler = MaxAbsScaler()
train_data_norm = scaler.fit_transform(
train_data[
[
"Close"
]
]
)
test_data_norm = scaler.fit_transform(
test_data[
[
"Close"
]
]
)
# 定義時間步長
timesteps = 100
# 創建時間步長的序列
def create_sequences(data, timesteps):
X = []
y = []
for i in range(timesteps, len(data)):
X.append(data[i - timesteps : i])
y.append(data[i, 0])
return np.array(X), np.array(y)
X_train, y_train = create_sequences(train_data_norm, timesteps)
X_test, y_test = create_sequences(test_data_norm, timesteps)
# 定義獎勵閾值
reward_threshold = float(
input("輸入獎勵閾值 (0 - 1, 建議 0.9): ")
)
# 初始化獎勵和其他指標
rewards = []
mses = []
mapes = []
r2s = []
count = 0
# 處理 SIGINT 信號 (CTRL + C)
def handle_interrupt(signal, frame):
print("\n收到中斷信號.")
# 詢問用戶是否確認結束程序
user_input = input(
f"你確定要結束程序嗎? (yes/no): "
)
if user_input.lower() == "yes":
exit(0)
else:
print("繼續進行微調過程")
# 註冊信號處理器
signal.signal(signal.SIGINT, handle_interrupt)
while True:
# 加載模型
model = load_model("model.keras")
print("\n評估模型")
# 評估模型
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
mape = mean_absolute_percentage_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
# 添加獎勵
reward = get_reward(y_test, y_pred)
best_reward1 = reward
rewards.append(reward)
mses.append(mse)
mapes.append(mape)
r2s.append(r2)
# 打印當前獎勵
print("獎勵:", rewards)
print("MAPE:", mape)
print("MSE:", mse)
print("R2:", r2)
count += 1
print("迴圈次數:", count)
# 檢查是否達到獎勵閾值
if len(rewards) >= 1 and sum(rewards[-1:]) >= reward_threshold:
print("達到獎勵閾值!")
model.save("model.keras")
break
else:
epochs = 10
print(f"訓練模型 {epochs} 次")
batch_size = 32
for i in range(epochs):
print(f"第 {i+1} / {epochs} 輪")
history = model.fit(X_train, y_train, batch_size=batch_size, validation_data=(X_test, y_test), epochs=1)
# 在測試集上評估模型
y_pred_test = model.predict(X_test)
test_reward = get_reward(y_test, y_pred_test)
print("測試獎勵:", test_reward)
if test_reward >= best_reward1:
print("模型已保存!")
best_reward1 = test_reward
model.save("model.keras")
if test_reward >= reward_threshold:
print("模型已達到獎勵閾值", test_reward, "。保存並停止訓練!")
model.save("model.keras")
break
```
這個函式實際上是一種優化算法,被稱為 "強化學習"。這種方法的核心思想是通過不斷地試驗和調整,以找到最優解決方案。
這種方法的有效性在於以下幾點:
>[!Tip]搜索空間:
>儘管看起來是一種 "重複" 的行為,但實際上是在搜索一個巨大的參數空間,以找到最佳的模型結構和參數設置。
>[!Tip]反饋機制:
> 通過不斷評估模型的性能,系統可以根據反饋信息調整策略,從而逐步提高模型的性能。
### 6.主動預測
主動去預測未來指定天數內的數據,並儲存下來欄位(Date, Close)。
```python=
import numpy as np
import numpy as np
import pandas as pd
from sklearn.preprocessing import MaxAbsScaler
import tensorflow as tf
# 載入模型
model = tf.keras.models.load_model('model.keras')
# 載入和準備數據
data = pd.read_csv(f'data/{ticker_symbol}.csv')
close_prices = data['Close'].values.reshape(-1, 1)
# 歸一化
scaler = MaxAbsScaler()
scaled_data = scaler.fit_transform(close_prices)
# 選擇最後的時間視窗資料作為輸入
timesteps = 100
last_sequence = scaled_data[-timesteps:]
last_sequence = np.expand_dims(last_sequence, axis=0)
# 預測未來N天
future_days = int(input('請輸入您想要預測的天數(建議可以設置30天):'))
predictions = []
for _ in range(future_days):
# 獲取最新預測
current_pred = model.predict(last_sequence)
# 調整 current_pred 形狀以匹配 last_sequence
current_pred_reshaped = np.expand_dims(current_pred, axis=-1) # 使其成為 (1, 1, 1)
# 更新輸入資料
last_sequence = np.append(last_sequence[:, 1:, :], current_pred_reshaped, axis=1)
# 保存預測結果以反歸一化
predictions.append(current_pred_reshaped.flatten()[0])
# 反歸一化預測結果
predictions = np.array(predictions).reshape(-1, 1)
predicted_prices = scaler.inverse_transform(predictions)
predicted_prices = predicted_prices.flatten() # 展平陣列
# 列印預測結果
print("預測的未來 {} 天的價格是:".format(future_days))
print(predicted_prices)
# 原始收盤價
original_prices = data['Close'].values
# 生成日期索引
dates = pd.date_range(start=data['Date'].iloc[-1], periods=future_days)
predictions_df = pd.DataFrame({
"Date": dates,
"Predicted Close": predicted_prices
})
predictions_df.to_csv('predictions.csv', index=False)
```
通過設置獎勵閾值和進行多次迴圈訓練,程式碼確保了模型在不斷改進的同時,及時保存最佳模型。信號處理部分則提供了安全退出和繼續訓練的選擇。
### 7.比較
等到未來的指定天數後,使用剛剛預測出來的做比對
```python=
import os
import pandas as pd
import matplotlib.pyplot as plt
# 獲取"data"文件夾中的CSV文件列表
data_folder = "data"
csv_files = [file for file in os.listdir(data_folder) if file.endswith(".csv")]
# 顯示可用的CSV文件
print("可用的CSV文件:")
for i, file in enumerate(csv_files):
print(f"{i + 1}. {file}")
# 用戶選擇一個CSV文件
selected_file = None
while selected_file is None:
try:
file_number = int(input("請輸入您要選擇的CSV文件相應的編號: "))
if file_number < 1 or file_number > len(csv_files):
raise ValueError()
selected_file = csv_files[file_number - 1]
except ValueError:
print("無效的輸入。請輸入有效的編號。")
# 加載預測數據和實際數據
predicted_data = pd.read_csv("predictions.csv")
actual_data = pd.read_csv(os.path.join(data_folder, selected_file))
# 確保日期列是日期格式
predicted_data["Date"] = pd.to_datetime(predicted_data["Date"])
actual_data["Date"] = pd.to_datetime(actual_data["Date"])
# 如果您的實際數據包含未來的日期,這個步驟是必需的
actual_data = actual_data[actual_data["Date"] >= predicted_data["Date"].min()]
# 重命名列以增加清晰度
predicted_data.rename(columns={"Predicted Close": "Predict Close"}, inplace=True)
# 在日期列上結合預測數據和實際數據
combined_data = pd.merge(predicted_data, actual_data[['Date', 'Close']], on="Date", how='left')
combined_data.rename(columns={"Close": "Actual Close"}, inplace=True)
# 計算預測值和實際值之間的絕對百分比誤差
combined_data["Absolute % Error"] = (
abs(combined_data["Predict Close"] - combined_data["Actual Close"]) / combined_data["Actual Close"] * 100
)
# 計算平均絕對百分比誤差並打印它
mape = combined_data["Absolute % Error"].mean()
print(f"平均絕對百分比誤差: {mape:.2f}%")
# 找到絕對百分比誤差最大和最小的行並打印它們
min_error_row = combined_data.loc[combined_data["Absolute % Error"].idxmin()]
max_error_row = combined_data.loc[combined_data["Absolute % Error"].idxmax()]
print(f"\n最準確的預測:\n{min_error_row}\n")
print(f"最不準確的預測:\n{max_error_row}\n")
# 繪製預測和實際收盤價
plt.figure(figsize=(10, 6))
plt.plot(combined_data["Date"], combined_data["Predict Close"], label="Stock price forecasting")
plt.plot(combined_data["Date"], combined_data["Actual Close"], label="closing price")
plt.xlabel("date")
plt.ylabel("price")
plt.title("Compare")
plt.legend()
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
```
通過用戶選擇CSV文件,然後計算預測值與實際值之間的誤差,並將這些結果可視化展示出來,從而幫助用戶理解模型的準確性和改進空間。
### 8.利用抓取之資料繪製熟悉的 K棒圖
這個主要是可以利用最常規的手法,利用爬蟲去獲取股票資訊,之後繪製出k棒圖。
可以選擇我們先前所抓取的csv檔案,並且可以設定時間區間,使我們更能自由查看股票的波動。
```python=
import pandas as pd
import os
import mplfinance as mpf
from datetime import datetime
# 獲取"data"文件夾中的CSV文件列表
data_folder = "data"
csv_files = [file for file in os.listdir(data_folder) if file.endswith(".csv")]
# 顯示可用的CSV文件
print("可用的CSV文件:")
for i, file in enumerate(csv_files):
print(f"{i + 1}. {file}")
# 用戶選擇一個CSV文件
selected_file = None
while selected_file is None:
try:
file_number = int(input("請輸入您要選擇的CSV文件相應的編號: "))
if file_number < 1 or file_number > len(csv_files):
raise ValueError()
selected_file = csv_files[file_number - 1]
except ValueError:
print("無效的輸入。請輸入有效的編號。")
# 讀取CSV文件並設置日期列為DatetimeIndex
df = pd.read_csv(os.path.join(data_folder, selected_file), parse_dates=['Date'], index_col='Date')
# 讓使用者選擇分析區間
start_date = input("請輸入開始日期 (YYYY-MM-DD): ")
end_date = input("請輸入結束日期 (YYYY-MM-DD): ")
try:
start_date = datetime.strptime(start_date, '%Y-%m-%d')
end_date = datetime.strptime(end_date, '%Y-%m-%d')
df = df.loc[start_date:end_date]
except ValueError:
print("日期格式錯誤,請使用 YYYY-MM-DD 格式。")
exit()
mc = mpf.make_marketcolors(up='r', down='g', inherit=True)
s = mpf.make_mpf_style(base_mpf_style='yahoo', marketcolors=mc)
kwargs = dict(type='candle', mav=(10,20,60), volume=True, figratio=(16,9), figscale=2, title=selected_file, style=s)
mpf.plot(df, **kwargs)
```
根據用戶指定的日期範圍篩選數據,然後使用mplfinance庫繪製K線圖。這種方法可以用於分析股票或其他金融數據的歷史價格走勢,並可視化其在選定區間內的表現。