# 機器學習期末報告 AI股票 ## 介紹 此為訓練單一股票之預測模型 由於教室的網路速度有點慢(比烏龜慢),因此本次的報告使用==google colab==進行 >[!Important] >**請注意,以下所獲取之預測資料僅供參考,本組只進行技術教授與說明,不負任何投資損失責任** ## 參照程式碼 https://github.com/lex-hue/Stock-Predictor-V4 https://hackmd.io/@s02260441/Hki9NN5jL ## 預測Model的建置順序 ### install.py 本次會用到的套件我們這次透過function來直接將其包含起來,也能知道我們在這過程中下載了哪些的套件與工具。 ```python= def install_dependencies(): packages = [ "pandas", "ta", "yfinance", "numpy", "scikit-learn", "matplotlib", "tensorflow", "statsmodels", "pdfplumber" "mplfinance" ] total_packages = len(packages) progress = 0 for package in packages: progress += 1 print(f"Installing {package}... ({progress}/{total_packages})") !pip install {package} install_dependencies() ``` ### 1-1.取得台灣股市資訊 本報告在網路上取得一份PDF檔,裡面包含台灣大部分的股市的名稱與代號,本程式將會去爬該PDF的內容,將檔案內所有的股市資訊給整理成一份字典,並且製作成下拉選單,方便選取想要的股市資訊。 PDF 下載連結: https://drive.google.com/file/d/1G54CplA-WuDdJttQ6W5UuU_JjwR2myA8/view?usp=sharing ```python= from IPython.display import display import ipywidgets as widgets import pdfplumber options_mapping = {} pdf = pdfplumber.open('SE1026.pdf') for i, page in enumerate(pdf.pages): tables = page.extract_table() if tables: if tables[0][0] == "股票代號" and tables[0][1] == "股票名稱": tables = tables[1:] for table in tables: for index in range(0, len(table), 2): options_mapping[table[index+1]] = table[index] pdf.close() stock_name = None stock_symbol = None # 下拉選單 combobox = widgets.Combobox( options=list(options_mapping.keys()), # 使用顯示值作為選項 description='請選擇:' ) # 創建處理函數 def handle_combobox_change(change): global stock_name, stock_symbol stock_name = change.new stock_symbol = options_mapping.get(stock_name) # 將處理函數綁定到下拉選單的變化事件 combobox.observe(handle_combobox_change, names='value') # 顯示下拉選單 display(combobox) ``` > stock_name 和 stock_symbol是全域變數 > 只要在下拉方塊選擇了新的值,那麼這兩個變數就會及時更新 > 後面的程式若需要輸入股票代號就會採用這些資訊。 > ### 1-2.檢查下拉選單的選擇值 ```python= if stock_name: print("你選擇了:", stock_name) print("股票代號為:", stock_symbol) else: print("尚未選擇.") ``` ### 2.抓取所選股市的資訊 根據下拉選單所選的股票代號以及所輸入的日期範圍,以下程式碼會抓該股票代號的相關資訊 匯出成csv,並且整理、運算出其他常見的技術分析指標 ```python= import os import pandas as pd import numpy as np import ta import matplotlib.pyplot as plt import yfinance as yf from statsmodels.tsa.seasonal import seasonal_decompose def download_and_prepare_data(ticker_symbol,start_date, end_date): data_dir = 'data' if not os.path.exists(data_dir): os.makedirs(data_dir) data = yf.download(ticker_symbol, period="max", interval="1d", start=start_date, end=end_date) df = pd.DataFrame(data) data_file = f"./data/{ticker_symbol}.csv" df.to_csv(data_file) print("資料已下載並儲存至", data_file) def preprocess_data(ticker_symbol): data_file = f"./data/{ticker_symbol}.csv" df = pd.read_csv(data_file) # 使用 ta 庫計算技術指標(均線、相對強弱指標、指數平滑異同移動平均線) df["SMA"] = ta.trend.sma_indicator(df["Close"], window=14) df["RSI"] = ta.momentum.rsi(df["Close"], window=14) df["MACD"] = ta.trend.macd_diff(df["Close"], window_slow=26, window_fast=12, window_sign=9) df_bollinger = ta.volatility.BollingerBands(df["Close"], window=20) df["upper_band"] = df_bollinger.bollinger_hband() df["middle_band"] = df_bollinger.bollinger_mavg() df["lower_band"] = df_bollinger.bollinger_lband() aroon_indicator = ta.trend.AroonIndicator(high=df["High"], low=df["Low"], window=25) df["aroon_up"] = aroon_indicator.aroon_up() df["aroon_down"] = aroon_indicator.aroon_down() open_prices = df["Open"] close_prices = df["Close"] kicking_pattern = np.zeros_like(open_prices) for i in range(1, len(open_prices)): if open_prices[i] < open_prices[i-1] and \ open_prices[i] > close_prices[i-1] and \ close_prices[i] > open_prices[i-1] and \ close_prices[i] < close_prices[i-1] and \ open_prices[i] - close_prices[i] > open_prices[i-1] - close_prices[i-1]: kicking_pattern[i] = 100 # 一些正值來表示這個模式 # 將 kicking_pattern 作為新的欄位添加到 DataFrame 中 df["kicking"] = kicking_pattern # 計算 ATR 和 SuperTrend def calculate_atr(high, low, close, window=14): true_ranges = np.maximum.reduce([high - low, np.abs(high - close.shift()), np.abs(low - close.shift())]) atr = np.zeros_like(high) atr[window - 1] = np.mean(true_ranges[:window]) for i in range(window, len(high)): atr[i] = (atr[i - 1] * (window - 1) + true_ranges[i]) / window return atr df["ATR"] = calculate_atr(df["High"], df["Low"], df["Close"], window=14) # 使用減少的敏感度和所有指標計算 Supertrend 信號 df["upper_band_supertrend"] = df["High"] - (df["ATR"]) df["lower_band_supertrend"] = df["Low"] + (df["ATR"]) # 根據對指標的敏感度定義上升趨勢和下降趨勢條件 uptrend_conditions = [ (df["Close"] > df["lower_band_supertrend"]), (df["Close"] > df["SMA"]), (df["Close"] > df["middle_band"]), (df["Close"] > df["MACD"]), (df["RSI"] > 50), (df["aroon_up"] > df["aroon_down"]), (df["kicking"] == 1), # 假設 "kicking" 是指標,其中 1 表示上升趨勢。 (df["Close"] > df["upper_band_supertrend"]) ] downtrend_conditions = [ (df["Close"] < df["upper_band_supertrend"]), (df["Close"] < df["SMA"]), (df["Close"] < df["middle_band"]), (df["Close"] < df["MACD"]), (df["RSI"] < 50), (df["aroon_up"] < df["aroon_down"]), (df["kicking"] == -1), # 假設 "kicking" 是指標,其中 -1 表示下降趨勢。 (df["Close"] < df["lower_band_supertrend"]) ] # 將初始信號值設置為 0 df["supertrend_signal"] = 0 # 根據對指標的敏感度更新信號 df.loc[np.any(uptrend_conditions, axis=0), "supertrend_signal"] = 1 df.loc[np.any(downtrend_conditions, axis=0), "supertrend_signal"] = -1 # 分解時間序列數據 result = seasonal_decompose(df["Close"], model="additive", period=365) # 將分解的組件添加到 DataFrame 中 df["trend"] = result.trend df["seasonal"] = result.seasonal df["residual"] = result.resid # 按您希望的順序串聯欄位 df2 = pd.concat( [ df["Date"], df["Close"], df["Open"], df["Adj Close"], df["Volume"], df["High"], df["Low"], df["SMA"], df["MACD"], df["upper_band"], df["middle_band"], df["lower_band"], df["supertrend_signal"], df["RSI"], df["aroon_up"], df["aroon_down"], df["kicking"], df["upper_band_supertrend"], df["lower_band_supertrend"], df["trend"], df["seasonal"], df["residual"] ], axis=1, ) # 用 0 填充缺失值 df2.fillna(0, inplace=True) # 將 DataFrame 保存到新 CSV 文件,包含指標和分解的組件 df2.to_csv("data.csv", index=False) # 去除相同方向的連續信號(減少敏感度) signal_changes = df["supertrend_signal"].diff().fillna(0) consecutive_mask = (signal_changes == 0) & (signal_changes.shift(-1) == 0) df.loc[consecutive_mask, "supertrend_signal"] = 0 # 繪製數據 fig, (ax1, ax2, ax3, ax4, ax5) = plt.subplots(5, 1, figsize=(12, 8), sharex=True) ax1.plot(df["Open"], label="Open") ax1.plot(df["Close"], label="Close") ax1.plot(df["trend"], label="Trend") ax1.plot(df["SMA"], label="SMA") ax1.fill_between( df.index, df["upper_band"], df["lower_band"], alpha=0.2, color="gray" ) ax1.plot(df["upper_band"], linestyle="dashed", color="gray") ax1.plot(df["middle_band"], linestyle="dashed", color="gray") ax1.plot(df["lower_band"], linestyle="dashed", color="gray") ax1.scatter( df.index[df["supertrend_signal"] == 1], df["Close"][df["supertrend_signal"] == 1], marker="^", color="green", s=100, ) ax1.scatter( df.index[df["supertrend_signal"] == -1], df["Close"][df["supertrend_signal"] == -1], marker="v", color="red", s=100, ) ax1.legend() ax2.plot(df["aroon_up"], label="Aroon Up") ax2.plot(df["aroon_down"], label="Aroon Down") ax2.legend() ax3.plot(df["RSI"], label="RSI") ax3.legend() ax4.plot(df["seasonal"], label="Seasonal") ax4.legend() ax5.plot(df["residual"], label="Residual") ax5.legend() plt.xlim(df.index[0], df.index[-1]) plt.show() if __name__ == "__main__": ticker_symbol = stock_symbol + ".TW" tic = yf.Ticker(ticker_symbol) info = tic.get_info() start_date = input("請輸入開始日期(yyyy-mm-dd):") end_date = input("請輸入結束日期(yyyy-mm-dd):") print(f"{stock_name}({ticker_symbol}) 的信息:\n") print(info["shortName"],"\n") indicator = "未知" if "recommendationKey" in info: if info["recommendationKey"] == "buy": indicator = "買入" elif info["recommendationKey"] == "sell": indicator = "賣出" elif info["recommendationKey"] == "strong buy": indicator = "強烈買入" elif info["recommendationKey"] == "hold": indicator = "持有" elif info["recommendationKey"] == "underperform": indicator = "表現不佳" print(f"推薦趨勢為 {indicator}") download = input("您是否要下載此股票代碼(y/n)?:").lower() if download == "y": download_and_prepare_data(ticker_symbol, start_date, end_date) preprocess_data(ticker_symbol) else: print("退出腳本..") ``` #### 函數 download_and_prepare_data * 參數 ticker_symbol: 股票代碼 start_date: 開始日期 end_date: 結束日期 * 流程 檢查資料夾data是否存在,不存在則創建。 使用yfinance庫下載股票數據。 將數據保存到本地CSV文件中。 #### 函數 preprocess_data 這個函數從CSV文件讀取數據,計算各種技術指標,並將結果保存到新的CSV文件中。 * 流程: 先從CSV文件中讀取數據。 1. 使用ta庫計算技術指標,包括均線(SMA)、相對強弱指標(RSI)、指數平滑異同移動平均線(MACD)、布林帶(Bollinger Bands)和Aroon指標。 2. 定義和計算“踢腿形態”(Kicking Pattern)。 3. 計算平均真實波幅(ATR)和SuperTrend指標 4. 分解時間序列數據成趨勢、季節性和殘差。 5. 將所有計算結果和原始數據合併,保存到新的CSV文件中。 6. 繪製數據圖表,包括開盤價、收盤價、SMA、布林帶、SuperTrend信號、Aroon指標、RSI、季節性成分和殘差成分。 * 簡單移動平均線(Simple Moving Average, SMA) SMA 是一種常見的技術指標,用於平滑價格數據,消除短期波動,從而更容易識別價格趨勢。計算方法是將一定時間範圍內的收盤價格平均化。 * 布林帶(Bollinger Bands) 布林帶由三條線組成:中軌線(SMA)、上軌線和下軌線。它們用於衡量市場的波動性和相對價格水平。 * SuperTrend 信號 SuperTrend 指標基於平均真實波幅(ATR)計算,用於識別趨勢方向。SuperTrend 信號分為上升趨勢和下降趨勢。 * Aroon 指標 Aroon 指標用於測量趨勢的強度和變化,包含 Aroon Up 和 Aroon Down。 * 相對強弱指標(Relative Strength Index, RSI) RSI 是一種動量指標,用於測量價格變動的速度和變動幅度。RSI 的值介於0和100之間。 * 季節性成分(Seasonal Component) 季節性成分來自於時間序列分解,用於揭示數據中的週期性變動。 * 殘差成分(Residual Component) 殘差成分是時間序列分解後,扣除趨勢和季節性成分後剩餘的部分,代表數據中的隨機波動。 #### 主程序 主程序首先要求用戶輸入股票代碼、開始日期和結束日期。 流程: 1. 使用yfinance庫獲取股票信息。 2. 顯示股票簡稱和推薦趨勢(例如,買入、賣出、持有等)。 3. 詢問用戶是否下載股票數據。 4. 如果用戶選擇下載,則調用download_and_prepare_data函數下載數據,然後調用preprocess_data函數進行數據處理和可視化。 ### 3.訓練model 根據(2)所獲取的csv檔,開始訓練模型 ```python= import os os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2" import pandas as pd import numpy as np import tensorflow as tf from tensorflow.keras.models import Sequential from tensorflow.keras.layers import BatchNormalization, PReLU, LSTM, Dense from sklearn.metrics import r2_score, mean_absolute_percentage_error from sklearn.preprocessing import MaxAbsScaler print("TensorFlow 版本:", tf.__version__) # 定義一個函數來加載數據 (將 'data.csv' 替換為你的數據文件) def load_data(file_path: str) -> pd.DataFrame: data = pd.read_csv(file_path) return data[['Close']] # 只加載 'Close' 列 # 定義獎勵函數 def get_reward(y_true, y_pred): mape = mean_absolute_percentage_error(y_true, y_pred) r2 = r2_score(y_true, y_pred) reward = (((1 - mape) * 0.1) + ((r2) * 1.9)) / 2 return reward # 定義一個函數來創建 LSTM 模型 def create_LSTM_model() -> Sequential: model = Sequential() model.add(LSTM(units=150, return_sequences=True)) model.add(PReLU()) model.add(PReLU()) model.add(PReLU()) model.add(LSTM(units=150)) model.add(PReLU()) model.add(PReLU()) model.add(PReLU()) # 添加最終輸出層 model.add(Dense(units=1, activation='linear')) return model # 加載數據 data = load_data("data.csv") # 將數據分為訓練集和測試集 train_data = data.iloc[:int(0.8*len(data))] test_data = data.iloc[int(0.8*len(data)):] # 標準化數據 (只使用 'Close' 列) scaler = MaxAbsScaler() train_data_norm = scaler.fit_transform(train_data) test_data_norm = scaler.transform(test_data) # 定義時間步長 timesteps = 100 # 創建時間步長的序列 (只使用 'Close' 值) def create_sequences(data, timesteps): X = [] y = [] for i in range(timesteps, len(data)): X.append(data[i-timesteps:i]) y.append(data[i, 0]) return np.array(X), np.array(y) X_train, y_train = create_sequences(train_data_norm, timesteps) X_test, y_test = create_sequences(test_data_norm, timesteps) # 構建並編譯 LSTM 模型 model = create_LSTM_model() # 編譯模型 model.compile(optimizer='adam', loss="huber") # 訓練 epochs = 25 batch_size = 32 for i in range(epochs): print(f"第 {i+1} 個訓練周期 / 共 {epochs} 個訓練周期") history = model.fit(X_train, y_train, batch_size=batch_size, validation_data=(X_test, y_test), epochs=1) # 在測試集上評估模型 y_pred_test = model.predict(X_test) test_reward = get_reward(y_test, y_pred_test) print("測試集獎勵:", test_reward) if i == 0: best_reward = test_reward if test_reward >= best_reward: best_reward = test_reward print("模型已保存!") model.save("model.keras") # 加載最佳模型並評估 model = tf.keras.models.load_model("model.keras") y_pred_test = model.predict(X_test) test_reward = get_reward(y_test, y_pred_test) test_loss = model.evaluate(X_test, y_test) print("最終測試集獎勵:", test_reward) print("最終測試集損失:", test_loss) ``` 這段程式碼通過LSTM神經網絡模型,對股票收盤價進行時間序列預測,並使用獎勵函數來評估模型的預測性能。 ### 4.評估模型的可用性 利用剛剛整理出的指標歷史數據來評估我們模型的可用性 ```python= import os import sys import pandas as pd import numpy as np import tensorflow as tf from sklearn.preprocessing import MaxAbsScaler from tensorflow.keras.models import load_model from sklearn.metrics import ( mean_squared_error, r2_score, mean_absolute_percentage_error, ) import matplotlib.pyplot as plt print("TensorFlow 版本:", tf.__version__) # 定義獎勵函數 def get_reward(y_true, y_pred): mape = mean_absolute_percentage_error(y_true, y_pred) r2 = r2_score(y_true, y_pred) reward = (((1 - mape) * 0.1) + ((r2) * 1.9)) / 2 return reward # 加載數據 data = pd.read_csv("data.csv") # 將數據分為訓練集和測試集 test_data = data.iloc[int(0.8 * len(data)) :] # 標準化數據 scaler = MaxAbsScaler() test_data_norm = scaler.fit_transform( test_data[ [ "Close" ] ] ) # 定義時間步長 timesteps = 100 # 創建時間步長的序列 def create_sequences(data, timesteps): X = [] y = [] for i in range(timesteps, len(data)): X.append(data[i - timesteps : i]) y.append(data[i, 0]) return np.array(X), np.array(y) X_test, y_test = create_sequences(test_data_norm, timesteps) # 加載預訓練模型 model = load_model("model.keras") print("\n評估模型") # 評估模型 y_pred = model.predict(X_test) mse = mean_squared_error(y_test, y_pred) mape = mean_absolute_percentage_error(y_test, y_pred) r2 = r2_score(y_test, y_pred) # 計算獎勵 reward = get_reward(y_test, y_pred) # 打印評估指標 # MAPE 衡量的是預測值與實際值之間的百分比誤差,這個指標的值越低,表示預測結果越精確。 print("MAPE:", mape) # R² 衡量的是模型解釋數據變異的程度,R² 的值範圍在 0 到 1 之間,值越接近 1,表示模型的解釋能力越強。 print("MSE:", mse) # 獎勵的計算是將 (1 - MAPE) 與 R² 進行加權平均這裡將 (1 - MAPE) 和 R² 各自乘以不同的權重,然後取平均。選擇這些權重是為了更好地平衡兩個指標在獎勵值中的影響。 print("R2:", r2) print("Reward:", reward) # 繪製預測值與實際值的比較圖 plt.plot(y_test, label="Actual") plt.plot(y_pred, label="Predicted") plt.legend() plt.show() ``` 通過標準化、創建序列、加載模型和評估模型的方式,對股票數據進行了有效的預測和評估。可視化部分有助於直觀地理解模型的預測效果。 Q:使用歷史數據來驗證模型的準確性? * 避免過擬合:將數據集劃分為訓練集和測試集,確保模型在訓練過程中不會看到測試數據,這樣可以避免過擬合,即模型在訓練數據上表現良好但在未知數據上表現不佳。 * 模型評估:使用測試集來評估模型,可以幫助我們了解模型在未見過的數據上的表現,即模型的泛化能力 ### 5.微調模型 設定一個你期望達到的模型品質的值(0~1) 1大概率是不可能達到,建議0.9即可 加載預訓練的LSTM模型,對其進行評估並根據用戶定義的獎勵閾值(reward threshold)進行微調。程式碼結合了模型評估、重新訓練和保存最佳模型的過程,並提供了處理中斷信號的功能。 ```python= import os import signal os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2" import pandas as pd import numpy as np import tensorflow as tf from sklearn.preprocessing import MaxAbsScaler from tensorflow.keras.models import load_model from sklearn.metrics import ( mean_squared_error, r2_score, mean_absolute_percentage_error, ) import matplotlib.pyplot as plt from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping # 定義獎勵函數 def get_reward(y_true, y_pred): mape = mean_absolute_percentage_error(y_true, y_pred) r2 = r2_score(y_true, y_pred) reward = (((1 - mape) * 0.1) + ((r2) * 1.9)) / 2 return reward # 加載數據 data = pd.read_csv("data.csv") # 分割數據為訓練集和測試集 train_data = data.iloc[: int(0.8 * len(data))] test_data = data.iloc[int(0.8 * len(data)) :] # 正規化數據 scaler = MaxAbsScaler() train_data_norm = scaler.fit_transform( train_data[ [ "Close" ] ] ) test_data_norm = scaler.fit_transform( test_data[ [ "Close" ] ] ) # 定義時間步長 timesteps = 100 # 創建時間步長的序列 def create_sequences(data, timesteps): X = [] y = [] for i in range(timesteps, len(data)): X.append(data[i - timesteps : i]) y.append(data[i, 0]) return np.array(X), np.array(y) X_train, y_train = create_sequences(train_data_norm, timesteps) X_test, y_test = create_sequences(test_data_norm, timesteps) # 定義獎勵閾值 reward_threshold = float( input("輸入獎勵閾值 (0 - 1, 建議 0.9): ") ) # 初始化獎勵和其他指標 rewards = [] mses = [] mapes = [] r2s = [] count = 0 # 處理 SIGINT 信號 (CTRL + C) def handle_interrupt(signal, frame): print("\n收到中斷信號.") # 詢問用戶是否確認結束程序 user_input = input( f"你確定要結束程序嗎? (yes/no): " ) if user_input.lower() == "yes": exit(0) else: print("繼續進行微調過程") # 註冊信號處理器 signal.signal(signal.SIGINT, handle_interrupt) while True: # 加載模型 model = load_model("model.keras") print("\n評估模型") # 評估模型 y_pred = model.predict(X_test) mse = mean_squared_error(y_test, y_pred) mape = mean_absolute_percentage_error(y_test, y_pred) r2 = r2_score(y_test, y_pred) # 添加獎勵 reward = get_reward(y_test, y_pred) best_reward1 = reward rewards.append(reward) mses.append(mse) mapes.append(mape) r2s.append(r2) # 打印當前獎勵 print("獎勵:", rewards) print("MAPE:", mape) print("MSE:", mse) print("R2:", r2) count += 1 print("迴圈次數:", count) # 檢查是否達到獎勵閾值 if len(rewards) >= 1 and sum(rewards[-1:]) >= reward_threshold: print("達到獎勵閾值!") model.save("model.keras") break else: epochs = 10 print(f"訓練模型 {epochs} 次") batch_size = 32 for i in range(epochs): print(f"第 {i+1} / {epochs} 輪") history = model.fit(X_train, y_train, batch_size=batch_size, validation_data=(X_test, y_test), epochs=1) # 在測試集上評估模型 y_pred_test = model.predict(X_test) test_reward = get_reward(y_test, y_pred_test) print("測試獎勵:", test_reward) if test_reward >= best_reward1: print("模型已保存!") best_reward1 = test_reward model.save("model.keras") if test_reward >= reward_threshold: print("模型已達到獎勵閾值", test_reward, "。保存並停止訓練!") model.save("model.keras") break ``` 這個函式實際上是一種優化算法,被稱為 "強化學習"。這種方法的核心思想是通過不斷地試驗和調整,以找到最優解決方案。 這種方法的有效性在於以下幾點: >[!Tip]搜索空間: >儘管看起來是一種 "重複" 的行為,但實際上是在搜索一個巨大的參數空間,以找到最佳的模型結構和參數設置。 >[!Tip]反饋機制: > 通過不斷評估模型的性能,系統可以根據反饋信息調整策略,從而逐步提高模型的性能。 ### 6.主動預測 主動去預測未來指定天數內的數據,並儲存下來欄位(Date, Close)。 ```python= import numpy as np import numpy as np import pandas as pd from sklearn.preprocessing import MaxAbsScaler import tensorflow as tf # 載入模型 model = tf.keras.models.load_model('model.keras') # 載入和準備數據 data = pd.read_csv(f'data/{ticker_symbol}.csv') close_prices = data['Close'].values.reshape(-1, 1) # 歸一化 scaler = MaxAbsScaler() scaled_data = scaler.fit_transform(close_prices) # 選擇最後的時間視窗資料作為輸入 timesteps = 100 last_sequence = scaled_data[-timesteps:] last_sequence = np.expand_dims(last_sequence, axis=0) # 預測未來N天 future_days = int(input('請輸入您想要預測的天數(建議可以設置30天):')) predictions = [] for _ in range(future_days): # 獲取最新預測 current_pred = model.predict(last_sequence) # 調整 current_pred 形狀以匹配 last_sequence current_pred_reshaped = np.expand_dims(current_pred, axis=-1) # 使其成為 (1, 1, 1) # 更新輸入資料 last_sequence = np.append(last_sequence[:, 1:, :], current_pred_reshaped, axis=1) # 保存預測結果以反歸一化 predictions.append(current_pred_reshaped.flatten()[0]) # 反歸一化預測結果 predictions = np.array(predictions).reshape(-1, 1) predicted_prices = scaler.inverse_transform(predictions) predicted_prices = predicted_prices.flatten() # 展平陣列 # 列印預測結果 print("預測的未來 {} 天的價格是:".format(future_days)) print(predicted_prices) # 原始收盤價 original_prices = data['Close'].values # 生成日期索引 dates = pd.date_range(start=data['Date'].iloc[-1], periods=future_days) predictions_df = pd.DataFrame({ "Date": dates, "Predicted Close": predicted_prices }) predictions_df.to_csv('predictions.csv', index=False) ``` 通過設置獎勵閾值和進行多次迴圈訓練,程式碼確保了模型在不斷改進的同時,及時保存最佳模型。信號處理部分則提供了安全退出和繼續訓練的選擇。 ### 7.比較 等到未來的指定天數後,使用剛剛預測出來的做比對 ```python= import os import pandas as pd import matplotlib.pyplot as plt # 獲取"data"文件夾中的CSV文件列表 data_folder = "data" csv_files = [file for file in os.listdir(data_folder) if file.endswith(".csv")] # 顯示可用的CSV文件 print("可用的CSV文件:") for i, file in enumerate(csv_files): print(f"{i + 1}. {file}") # 用戶選擇一個CSV文件 selected_file = None while selected_file is None: try: file_number = int(input("請輸入您要選擇的CSV文件相應的編號: ")) if file_number < 1 or file_number > len(csv_files): raise ValueError() selected_file = csv_files[file_number - 1] except ValueError: print("無效的輸入。請輸入有效的編號。") # 加載預測數據和實際數據 predicted_data = pd.read_csv("predictions.csv") actual_data = pd.read_csv(os.path.join(data_folder, selected_file)) # 確保日期列是日期格式 predicted_data["Date"] = pd.to_datetime(predicted_data["Date"]) actual_data["Date"] = pd.to_datetime(actual_data["Date"]) # 如果您的實際數據包含未來的日期,這個步驟是必需的 actual_data = actual_data[actual_data["Date"] >= predicted_data["Date"].min()] # 重命名列以增加清晰度 predicted_data.rename(columns={"Predicted Close": "Predict Close"}, inplace=True) # 在日期列上結合預測數據和實際數據 combined_data = pd.merge(predicted_data, actual_data[['Date', 'Close']], on="Date", how='left') combined_data.rename(columns={"Close": "Actual Close"}, inplace=True) # 計算預測值和實際值之間的絕對百分比誤差 combined_data["Absolute % Error"] = ( abs(combined_data["Predict Close"] - combined_data["Actual Close"]) / combined_data["Actual Close"] * 100 ) # 計算平均絕對百分比誤差並打印它 mape = combined_data["Absolute % Error"].mean() print(f"平均絕對百分比誤差: {mape:.2f}%") # 找到絕對百分比誤差最大和最小的行並打印它們 min_error_row = combined_data.loc[combined_data["Absolute % Error"].idxmin()] max_error_row = combined_data.loc[combined_data["Absolute % Error"].idxmax()] print(f"\n最準確的預測:\n{min_error_row}\n") print(f"最不準確的預測:\n{max_error_row}\n") # 繪製預測和實際收盤價 plt.figure(figsize=(10, 6)) plt.plot(combined_data["Date"], combined_data["Predict Close"], label="Stock price forecasting") plt.plot(combined_data["Date"], combined_data["Actual Close"], label="closing price") plt.xlabel("date") plt.ylabel("price") plt.title("Compare") plt.legend() plt.xticks(rotation=45) plt.tight_layout() plt.show() ``` 通過用戶選擇CSV文件,然後計算預測值與實際值之間的誤差,並將這些結果可視化展示出來,從而幫助用戶理解模型的準確性和改進空間。 ### 8.利用抓取之資料繪製熟悉的 K棒圖 這個主要是可以利用最常規的手法,利用爬蟲去獲取股票資訊,之後繪製出k棒圖。 可以選擇我們先前所抓取的csv檔案,並且可以設定時間區間,使我們更能自由查看股票的波動。 ```python= import pandas as pd import os import mplfinance as mpf from datetime import datetime # 獲取"data"文件夾中的CSV文件列表 data_folder = "data" csv_files = [file for file in os.listdir(data_folder) if file.endswith(".csv")] # 顯示可用的CSV文件 print("可用的CSV文件:") for i, file in enumerate(csv_files): print(f"{i + 1}. {file}") # 用戶選擇一個CSV文件 selected_file = None while selected_file is None: try: file_number = int(input("請輸入您要選擇的CSV文件相應的編號: ")) if file_number < 1 or file_number > len(csv_files): raise ValueError() selected_file = csv_files[file_number - 1] except ValueError: print("無效的輸入。請輸入有效的編號。") # 讀取CSV文件並設置日期列為DatetimeIndex df = pd.read_csv(os.path.join(data_folder, selected_file), parse_dates=['Date'], index_col='Date') # 讓使用者選擇分析區間 start_date = input("請輸入開始日期 (YYYY-MM-DD): ") end_date = input("請輸入結束日期 (YYYY-MM-DD): ") try: start_date = datetime.strptime(start_date, '%Y-%m-%d') end_date = datetime.strptime(end_date, '%Y-%m-%d') df = df.loc[start_date:end_date] except ValueError: print("日期格式錯誤,請使用 YYYY-MM-DD 格式。") exit() mc = mpf.make_marketcolors(up='r', down='g', inherit=True) s = mpf.make_mpf_style(base_mpf_style='yahoo', marketcolors=mc) kwargs = dict(type='candle', mav=(10,20,60), volume=True, figratio=(16,9), figscale=2, title=selected_file, style=s) mpf.plot(df, **kwargs) ``` 根據用戶指定的日期範圍篩選數據,然後使用mplfinance庫繪製K線圖。這種方法可以用於分析股票或其他金融數據的歷史價格走勢,並可視化其在選定區間內的表現。