# **【Python: 股票買賣交易計算、定期定額複利計算機、爬蟲每日股價】** :::info - 買賣交易計算 - 定期定額複利計算 - 為什麼要投資? - 幫自己設定一些情境 - 爬蟲:特定日期股價-上市(存進sqlite) - 設定日期範圍,股價 plotly繪圖 - 設定日期範圍,存進sqlite - 爬蟲:特定日期股價-上櫃(存進sqlite) ::: <br/> 賣出股票時,會想知道扣除手續費、交易稅後,是否有賺? 按計算機太麻煩了,寫了幾個簡單的 Python functions,之後部屬到 streamlit、flask,就可以方便操作 ### 買賣交易計算 ```= import math # 股票"購買手續費",無條件進位 (1.425% 券商手續費) def calculate_buying_fee(buying_stock_price, buying_quantity, discount=None): if discount is None: discount = 10 total_buying_stock_price = buying_stock_price * buying_quantity buying_fee = (buying_stock_price * buying_quantity) * (1.425/1000) * (discount/10) buying_fee = round(buying_fee,2) return total_buying_stock_price, buying_fee # 股票"賣出手續費",無條件進位 (1.425% 券商手續費 + 0.003 證券交易稅 ) def calculate_selling_fee(selling_stock_price, selling_quantity, discount=None): if discount is None: discount = 10 total_selling_stock_price = selling_stock_price * selling_quantity selling_fee = ((selling_stock_price * selling_quantity) * (1.425/1000) * (discount/10)) selling_fee = round(selling_fee,2) selling_fee_2 = ((selling_stock_price * selling_quantity) * (3/1000)) return total_selling_stock_price, selling_fee, selling_fee_2 # 計算獲利 def calculate_earning(choice, buy_stock_price, sell_stock_price, quantity, discount=None): tax = 0 if choice == 1: tax = 0.1 elif choice == 2: tax = 0.3 if discount is None: discount = 10 discount_new = discount/10 buy_price = round(buy_stock_price * quantity, 2) buy_fee = round(buy_stock_price * quantity * (1.425/1000) * discount_new, 2) buy = round(buy_price + buy_fee, 2) sell_price = round(sell_stock_price * quantity, 2) sell_fee = round(sell_stock_price * quantity * (1.425/1000) * discount_new, 2) sell_fee_p = round(sell_stock_price * quantity * float(tax/100), 2) sell = round(sell_price - sell_fee - sell_fee_p,2) if discount == 10: discount_show = 0 else: discount_show = discount_new earning_money = round(sell_price - (buy_price + buy_fee + sell_fee + sell_fee_p),2) earning_money_100 = round(((sell - buy) / buy * 100), 2) if earning_money_100 > 0: status = "賺了" else: status = "賠了" stock_choice = "" if choice == 1: stock_choice = "ETF" elif choice == 2: stock_choice = "股票" return stock_choice, tax, discount_show, buy_price, buy_fee, buy, sell_price, sell_fee, sell_fee_p, sell, earning_money, earning_money_100, status ``` 這是部屬到streamlit的結果 ![螢幕擷取畫面 2024-03-21 012124](https://hackmd.io/_uploads/SJyEo5dCp.png) <br/> ### 定期定額複利計算 #### 為什麼要投資? ```= def calculate_future_value_annually(initial_amount, monthly_saving, years, annual_interest_rate): annual_interest_rate_100 = int(annual_interest_rate)/100 cumulative_values = [] cumulative_bank_values = [] normal_values = [] future_value = initial_amount normal_value = initial_amount bank_value = initial_amount for year in range(years): annual_savings = monthly_saving * 12 future_value += annual_savings future_value *= (1 + (annual_interest_rate_100)) future_value = math.floor(future_value) cumulative_values.append(future_value) annual_savings = monthly_saving * 12 bank_value += annual_savings bank_value *= (1 + (0.02)) bank_value = math.floor(bank_value) cumulative_bank_values.append(bank_value) normal_value += annual_savings normal_value = math.floor(normal_value) normal_values.append(normal_value) years_list = list(range(1, years + 1)) df = pd.DataFrame({'Year': years_list, '投資複利累積金額': cumulative_values, '定存2%累積金額': cumulative_bank_values,'不投資累積金額': normal_values}) ``` ```= initial_amount = 100000 monthly_saving = 15000 years = 30 annual_interest_rate = 0.05 cumulative_values, cumulative_bank_values, normal_values = calculate_future_value_annually(initial_amount, monthly_saving, years, annual_interest_rate) years_list = list(range(1, years + 1)) df = pd.DataFrame({'Year': years_list, 'Cumulative_Future_Value': cumulative_values, 'Bank_Future_Value': cumulative_bank_values,'Normal_Value': normal_values}) df ``` ![image](https://hackmd.io/_uploads/BJUvO6ApT.png) ```= import plotly.graph_objects as go fig = go.Figure() fig.add_trace(go.Scatter( x=df['Year'], y=df['Cumulative_Future_Value'], mode='lines+markers+text', line=dict(color='red', width=2), textposition='top center', name='投資複利累積金額' )) fig.add_trace(go.Scatter( x=df['Year'], y=df['Bank_Future_Value'], mode='lines+markers+text', line=dict(color='blue', width=2), textposition='top center', name='定存2%累積金額' )) fig.add_trace(go.Scatter( x=df['Year'], y=df['Normal_Value'], mode='lines+markers+text', line=dict(color='mediumturquoise', width=2), textposition='top center', name='不投資累積金額' )) fig.update_layout( title=f'每年財產累積折線圖', xaxis=dict(title='年'), yaxis=dict(title=''), legend=dict( title='', x=1.0, y=1.4, traceorder='normal', orientation='v' ), width=1000, height=400, ) fig.show() ``` ![image](https://hackmd.io/_uploads/ryrY_TApT.png) > 也可以用series ```= import pandas as pd initial_amount = 100000 monthly_saving = 15000 years = 30 annual_interest_rate = 5 # each_year = pd.Series(range(1, years + 1)) # cumulative_values = pd.Series(0, index=range(0, years)) cumulative_values[0] = initial_amount year_saving = monthly_saving * 12 for i in range(years): previous_year_index = max(i - 1, 0) cumulative_values[i] = (cumulative_values[previous_year_index] + year_saving) * (1 + float(annual_interest_rate/100)) cumulative_values = cumulative_values.astype(int) # cumulative_bank_values = pd.Series(0, index=range(0, years)) cumulative_bank_values[0] = initial_amount for i in range(years): previous_year_index = max(i - 1, 0) cumulative_bank_values[i] = (cumulative_bank_values[previous_year_index] + year_saving) * 1.02 cumulative_bank_values = cumulative_bank_values.astype(int) # normal_values = pd.Series(0, index=range(0, years)) normal_values[0] = initial_amount for i in range(years): previous_year_index = max(i - 1, 0) normal_values[i] = (normal_values[previous_year_index] + year_saving) normal_values = normal_values.astype(int) # 合併 df = pd.DataFrame({ 'Year': each_year, '投資複利累積金額': cumulative_values, '定存2%累積金額': cumulative_bank_values, '不投資累積金額': normal_values }) df ``` ![image](https://hackmd.io/_uploads/Sy5FI8JCa.png) <br/> #### 幫自己設一些情境 假設我希望_年後能到達目標 $_,每年帳戶總額*報酬率 _%,使用折現率概念,每月要投入多少 $_? ```= def calculate_monthly_savings(initial_amount, target_amount, annual_interest_rate, years): # 年報酬率 -> 月報酬率 annual_interest_rate_new = annual_interest_rate/100 monthly_interest_rate = annual_interest_rate_new / 12 monthly_savings = (target_amount - initial_amount) / (((1 + monthly_interest_rate) ** (years * 12) - 1) / (monthly_interest_rate)) return monthly_savings ``` ```= initial_amount = 100000 # 初始金額 10w target_amount = 10000000 annual_interest_rate = 5 years = 26 monthly_savings = calculate_monthly_savings(initial_amount, target_amount, annual_interest_rate, years) print(f"每月需要存入 {monthly_savings:.2f} 元") ``` ![螢幕擷取畫面 2024-03-21 012741](https://hackmd.io/_uploads/SyJohquAa.png) <br/> 假設我每月投入固定金額 $_,每年帳戶總額*報酬率 _%,使用折現率概念,幾年後能到達目標 $_? ```= import pandas as pd def calculate_years_to_goal(initial_amount, monthly_saving, annual_interest_rate, goal): annual_interest_rate_100 = annual_interest_rate/100 current_balance = initial_amount years = 0 cumulative_values = [] while current_balance < goal: current_balance += monthly_saving * 12 # 每年投入金額 current_balance *= (1 + annual_interest_rate_100) cumulative_values.append(current_balance) years += 1 cumulative_values_str = [f'{value:.2f}' for value in cumulative_values] df = pd.DataFrame({'Year': range(1, years + 1), '已累積金額': cumulative_values_str}) print(f"需要 {years} 年能累積到 {goal} 元的目標") return df ``` ```= initial_amount = 100000 # 初始金額 10w monthly_saving = 15000 annual_interest_rate = 5 goal = 10000000 df = calculate_years_to_goal(initial_amount, monthly_saving, annual_interest_rate, goal) df ``` ![螢幕擷取畫面 2024-03-21 012708](https://hackmd.io/_uploads/B1HY2cO0p.png) A <br/> ### 爬蟲:特定日期股價-上市(存進sqlite) 到證交所 [每日收盤行情](https://www.twse.com.tw/zh/trading/historical/mi-index.html) ![螢幕擷取畫面 2024-02-04 135951](https://hackmd.io/_uploads/rkbxOjhqp.png) CTRL+SHIFT+I 開發人員介面 頁面點選全部(不包含權證、牛熊證、不包含牛熊證)【查詢】 ![螢幕擷取畫面 2024-02-02 121727](https://hackmd.io/_uploads/rklhpy5qa.png) 下滑確定有 每日收盤行情 開發人員介面的Request URL,複製用網頁打開 ![螢幕擷取畫面 2024-02-02 121901](https://hackmd.io/_uploads/rk39a155a.png) 打開後長這樣,搜尋 title ![螢幕擷取畫面 2024-02-02 122008](https://hackmd.io/_uploads/SJLyR1q96.png) 發現我要的title是第九個 ```= # 剛才的 request url 貼過來 import requests url = 'https://www.twse.com.tw/rwd/zh/afterTrading/MI_INDEX?date=20240201&type=ALLBUT0999&response=json&_=1706840905505' response = requests.get(url) ``` ```= response.text[:] ``` ![螢幕擷取畫面 2024-02-02 122317](https://hackmd.io/_uploads/By7B0199a.png) 提取標題 轉成json會比較好提取 ```= import json import pandas as pd response_data = json.loads(response.text) # 取得第九個title的 field ninth_title_fields = response_data["tables"][8]["fields"] print(ninth_title_fields) ``` ![螢幕擷取畫面 2024-02-02 122348](https://hackmd.io/_uploads/BytDCyc5p.png) 提取個股 ```= # 取得第九個title的 data ninth_title_data = response_data["tables"][8]["data"] print(ninth_title_data) ``` ![螢幕擷取畫面 2024-02-02 122429](https://hackmd.io/_uploads/BkCtCk9ca.png) 合併為 dataframe ```= import re df = pd.DataFrame(ninth_title_data, columns=ninth_title_fields) df["漲跌(+/-)"] = df["漲跌(+/-)"].apply(lambda x: re.search(r'[-+]', x).group() if re.search(r'[-+]', x) else None) df ``` ![螢幕擷取畫面 2024-02-02 122513](https://hackmd.io/_uploads/rydnAy9qa.png) 轉換資料型態(後面要做計算) ```= # 如果有','(維持None),先刪除 df = df.applymap(lambda s: s.replace(',', '') if isinstance(s, str) else s) # 設 index df = df.set_index('證券代號') # 將指定的數值欄位轉換成數值型態 numeric_columns = ['成交股數', '成交筆數', '成交金額', '開盤價', '最高價', '最低價', '收盤價', '漲跌價差', '最後揭示買價', '最後揭示買量', '最後揭示賣價', '最後揭示賣量', '本益比'] df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce') # 將指定的字串欄位轉換成字串型態 string_columns = ['證券名稱', '漲跌(+/-)'] df[string_columns] = df[string_columns].astype(str) df ``` ![螢幕擷取畫面 2024-02-02 131745](https://hackmd.io/_uploads/Skn-igc5T.png) ```= df.dtypes ``` ![螢幕擷取畫面 2024-02-02 132149](https://hackmd.io/_uploads/S1ecghgc9p.png) 紅、黑K ```= df['k'] = df['收盤價'] / df['開盤價'] red_k = df[df['k'] > 1.05] red_k black_k = df[df['k'] < 1.05] black_k ``` red_k ![螢幕擷取畫面 2024-02-02 133410](https://hackmd.io/_uploads/Hyzxyb9q6.png) black_k ![螢幕擷取畫面 2024-02-02 133422](https://hackmd.io/_uploads/rJNl1Zqqp.png) >存進CSV ```= df.to_csv('daily_price.csv', encoding='utf_8_sig') df = pd.read_csv('daily_price.csv', index_col=['證券代號']) df ``` ![螢幕擷取畫面 2024-02-02 132459](https://hackmd.io/_uploads/Sys22x59T.png) 存到sqlite再讀取 ```= import sqlite3 conn = sqlite3.connect('test.sqlite3') # 存進sqlite df.to_sql('daily_price', conn, if_exists='replace') # 讀取table df = pd.read_sql('select * from daily_price', conn, index_col=['證券代號']) df ``` ![螢幕擷取畫面 2024-02-02 133700](https://hackmd.io/_uploads/S1jt1Z5c6.png) ![螢幕擷取畫面 2024-02-02 142508](https://hackmd.io/_uploads/r1nJsb95T.png) 包成function ```= # 寫成function import requests import pandas as pd import json import os def daily_price(date,save_path): url = f'https://www.twse.com.tw/rwd/zh/afterTrading/MI_INDEX?date={date}&type=ALLBUT0999&response=json&_=1706840905505' response = requests.get(url) response_data = json.loads(response.text) ninth_title_fields = response_data["tables"][8]["fields"] ninth_title_data = response_data["tables"][8]["data"] df = pd.DataFrame(ninth_title_data, columns=ninth_title_fields) df["漲跌(+/-)"] = df["漲跌(+/-)"].apply(lambda x: re.search(r'[-+]', x).group() if re.search(r'[-+]', x) else None) # 如果有','(維持None),先刪除 df = df.applymap(lambda s: s.replace(',', '') if isinstance(s, str) else s) # 設 index df = df.set_index('證券代號') # 將指定的數值欄位轉換成數值型態 numeric_columns = ['成交股數', '成交筆數', '成交金額', '開盤價', '最高價', '最低價', '收盤價', '漲跌價差', '最後揭示買價', '最後揭示買量', '最後揭示賣價', '最後揭示賣量', '本益比'] df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce') # 將指定的字串欄位轉換成字串型態 string_columns = ['證券名稱', '漲跌(+/-)'] df[string_columns] = df[string_columns].astype(str) file_path = os.path.join(save_path, f'daily_price_{date}.csv') df.to_csv(file_path, encoding='utf_8_sig') return df ``` ```= daily_price('20240103','@@@') ``` ![螢幕擷取畫面 2024-02-02 135229](https://hackmd.io/_uploads/rkC7m-99a.png) function,改成可選日期範圍 ```= # strftime('%Y%m%d') 方法,格式化字串 import datetime def daily_price_range(start_date, end_date, save_path): start_dt = datetime.datetime.strptime(start_date, '%Y%m%d') end_dt = datetime.datetime.strptime(end_date, '%Y%m%d') current_dt = start_dt while current_dt <= end_dt: current_date_str = current_dt.strftime('%Y%m%d') try: daily_price(current_date_str, save_path) print(f'Successfully fetched data for {current_date_str}') except Exception as e: print(f'Error fetching data for {current_date_str}: {e}') current_dt += datetime.timedelta(days=1) daily_price_range('20230303', '20230307', '@@@') ``` ![螢幕擷取畫面 2024-02-02 173915](https://hackmd.io/_uploads/By3P_N99a.png) ![螢幕擷取畫面 2024-02-02 173935](https://hackmd.io/_uploads/rk0DdN95p.png) <br/> #### 設定日期範圍,股價 plotly繪圖 ```= # 不存檔、單看某欄位 # 寫成function import requests import pandas as pd import json import os import re def daily_price(date, stock_code): url = f'https://www.twse.com.tw/rwd/zh/afterTrading/MI_INDEX?date={date}&type=ALLBUT0999&response=json&_=1706840905505' response = requests.get(url) response_data = json.loads(response.text) ninth_title_fields = response_data["tables"][8]["fields"] ninth_title_data = response_data["tables"][8]["data"] df = pd.DataFrame(ninth_title_data, columns=ninth_title_fields) df["漲跌(+/-)"] = df["漲跌(+/-)"].apply(lambda x: re.search(r'[-+]', x).group() if re.search(r'[-+]', x) else None) # 如果有','(維持None),先刪除 df = df.applymap(lambda s: s.replace(',', '') if isinstance(s, str) else s) # 將指定的數值欄位轉換成數值型態 numeric_columns = ['成交股數', '成交筆數', '成交金額', '開盤價', '最高價', '最低價', '收盤價', '漲跌價差', '最後揭示買價', '最後揭示買量', '最後揭示賣價', '最後揭示賣量', '本益比'] df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce') # 將指定的字串欄位轉換成字串型態 string_columns = ['證券代號','證券名稱', '漲跌(+/-)'] df[string_columns] = df[string_columns].astype(str) # 設 index df = df.set_index(['證券代號','證券名稱']) # 提取 df = df[df.index.get_level_values('證券代號') == stock_code] return df # date = '20230303' stock_code = '1101' daily_df = daily_price(date, stock_code) daily_df ``` ![螢幕擷取畫面 2024-02-07 150649](https://hackmd.io/_uploads/SyuGnilsp.png) ```= # 改成可選日期範圍 # strftime('%Y%m%d') 方法,格式化字串 import datetime def daily_price_range(start_date, end_date,stock_code): start_dt = datetime.datetime.strptime(start_date, '%Y%m%d') end_dt = datetime.datetime.strptime(end_date, '%Y%m%d') current_dt = start_dt # 設一個空的字典 result_dict = {} while current_dt <= end_dt: current_date_str = current_dt.strftime('%Y%m%d') try: df = daily_price(current_date_str,stock_code) if df is None: continue value = df.loc[(stock_code,), '收盤價'].iloc[0] if pd.isnull(value): continue # 结果存储在字典中 result_dict[f'{current_dt}'] = value except Exception as e: print(f'Error fetching data for {current_date_str}: {e}') # 字典转换为DataFrame result_df = pd.DataFrame(list(result_dict.items()), columns=[f'日期', '收盤價']) # result_df_reverse = result_df[::-1] # display(result_df_reverse) current_dt += datetime.timedelta(days=1) return result_df # start_date = '20230303' end_date = '20230310' stock_code = '1101' result_df = daily_price_range(start_date, end_date, stock_code) result_df ``` ![螢幕擷取畫面 2024-02-07 150716](https://hackmd.io/_uploads/S1WV2sxjT.png) ```= import plotly.express as px fig = px.line(result_df, x='日期', y='收盤價', title='每日收盤價 折線圖') fig.show() ``` ![螢幕擷取畫面 2024-02-07 150737](https://hackmd.io/_uploads/ryYB3jlsp.png) <br/> #### 設定日期範圍,存進sqlite 方法跟前面差不多,差別在sqlite不能設multiIndex ```= # 所有資料,存進sqlite # 寫成function import requests import pandas as pd import json import re def daily_price(date): url = f'https://www.twse.com.tw/rwd/zh/afterTrading/MI_INDEX?date={date}&type=ALLBUT0999&response=json&_=1706840905505' response = requests.get(url) response_data = json.loads(response.text) ninth_title_fields = response_data["tables"][8]["fields"] ninth_title_data = response_data["tables"][8]["data"] df = pd.DataFrame(ninth_title_data, columns=ninth_title_fields) df["漲跌(+/-)"] = df["漲跌(+/-)"].apply(lambda x: re.search(r'[-+]', x).group() if re.search(r'[-+]', x) else None) # 如果有','(維持None),先刪除 df = df.applymap(lambda s: s.replace(',', '') if isinstance(s, str) else s) # 將指定的數值欄位轉換成數值型態 numeric_columns = ['成交股數', '成交筆數', '成交金額', '開盤價', '最高價', '最低價', '收盤價', '漲跌價差', '最後揭示買價', '最後揭示買量', '最後揭示賣價', '最後揭示賣量', '本益比'] df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce') # 將指定的字串欄位轉換成字串型態 string_columns = ['證券代號','證券名稱', '漲跌(+/-)'] df[string_columns] = df[string_columns].astype(str) # 要存sqlite,不能使用 multiIndex # df = df.set_index(['證券代號','證券名稱']) return df daily_price('20230303') ``` ![螢幕擷取畫面 2024-02-07 150348](https://hackmd.io/_uploads/S1nwsieoa.png) ```= import sqlite3 import os import datetime def store_daily_price_to_sqlite(start_date, end_date): db_file = 'daily_price.db' if not os.path.exists(db_file): conn = sqlite3.connect(db_file) conn.close() conn = sqlite3.connect(db_file) cur = conn.cursor() start_dt = datetime.datetime.strptime(start_date, '%Y%m%d') end_dt = datetime.datetime.strptime(end_date, '%Y%m%d') current_dt = start_dt while current_dt <= end_dt: current_date_str = current_dt.strftime('%Y%m%d') table_name = f'daily_price_{current_date_str}' try: df = daily_price(current_date_str) df.to_sql(table_name, conn, if_exists='replace', index=False) print(f'Successfully stored data for {current_date_str} into table {table_name}') except Exception as e: print(f'Error storing data for {current_date_str}: {e}') current_dt += datetime.timedelta(days=1) conn.close() # Example usage: start_date = '20230303' end_date = '20230308' store_daily_price_to_sqlite(start_date, end_date) ``` ![螢幕擷取畫面 2024-02-07 150438](https://hackmd.io/_uploads/rJ7csjxja.png) 讀取 ```= def read_table_from_sqlite(date): db_file = 'daily_price.db' conn = sqlite3.connect(db_file) table_name = f'daily_price_{date}' query = f"SELECT * FROM {table_name}" df = pd.read_sql_query(query, conn) conn.close() return df # date = '20230303' df_20230303 = read_table_from_sqlite(date) df_20230303 ``` ![螢幕擷取畫面 2024-02-07 150537](https://hackmd.io/_uploads/B1J0ijeip.png) ![螢幕擷取畫面 2024-02-07 150558](https://hackmd.io/_uploads/HyoJ3oeoT.png) ![螢幕擷取畫面 2024-02-07 150604](https://hackmd.io/_uploads/Bk612jxsa.png) <br/> > 發現每次都要叫出不同的table太麻煩了,改成存進同一張table ```= # 所有資料 # 最多50次 # 寫成function import requests import pandas as pd import json import re import sqlite3 import os import datetime from fake_useragent import UserAgent import time import random def daily_price(date): url = f'https://www.twse.com.tw/rwd/zh/afterTrading/MI_INDEX?date={date}&type=ALLBUT0999&response=json&_=1706840905505' # 生成隨機的使用者代理 ua = UserAgent() user_agent = ua.random # headers 數據 headers = { 'User-Agent': user_agent, } max_retries = 50 # 最大重試次數 current_retry = 0 while current_retry < max_retries: try: response = requests.get(url, headers=headers) response.raise_for_status() # 如果伺服器回傳錯誤碼,拋出異常 break # 如果成功,跳出迴圈 except requests.exceptions.RequestException as e: print(f'Error connecting to the server: {e}') current_retry += 1 if current_retry < max_retries: print(f'Retrying... ({current_retry}/{max_retries})') time.sleep(5 + random.uniform(0, 5)) # 加入延遲時間 else: print('Max retries reached. Exiting.') raise # 如果重試次數用完仍然失敗,拋出異常 response_data = json.loads(response.text) ninth_title_fields = response_data["tables"][8]["fields"] ninth_title_data = response_data["tables"][8]["data"] df = pd.DataFrame(ninth_title_data, columns=ninth_title_fields) df["漲跌(+/-)"] = df["漲跌(+/-)"].apply(lambda x: re.search(r'[-+]', x).group() if re.search(r'[-+]', x) else None) # 如果有','(維持None),先刪除 # df = df.applymap(lambda s: s.replace(',', '') if isinstance(s, str) else s) df = df.apply(lambda col: col.apply(lambda x: x.replace(',', '') if isinstance(x, str) else x)) # 將指定的數值欄位轉換成數值型態 numeric_columns = ['成交股數', '成交筆數', '成交金額', '開盤價', '最高價', '最低價', '收盤價', '漲跌價差', '最後揭示買價', '最後揭示買量', '最後揭示賣價', '最後揭示賣量', '本益比'] df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce') # 將指定的字串欄位轉換成字串型態 string_columns = ['證券代號','證券名稱', '漲跌(+/-)'] df[string_columns] = df[string_columns].astype(str) # 要存sqlite,不能使用 multiIndex # df = df.set_index(['證券代號','證券名稱']) return df # daily_price('20230303') # 所有資料,存進sqlite 單一table # 改成可選日期範圍 # strftime('%Y%m%d') 方法,格式化字串 import sqlite3 import os import datetime def store_daily_price_to_sqlite(start_date, end_date): db_file = r'G:\我的雲端硬碟\000-AI\py\18-STOCK\tw_financial_reports\daily_price.db' # if not os.path.exists(db_file): # conn = sqlite3.connect(db_file) # conn.close() conn = sqlite3.connect(db_file) cur = conn.cursor() start_dt = datetime.datetime.strptime(start_date, '%Y%m%d') end_dt = datetime.datetime.strptime(end_date, '%Y%m%d') current_dt = start_dt while current_dt <= end_dt: current_date_str = current_dt.strftime('%Y%m%d') table_name = f'daily_price_2019' # _{current_date_str} try: df = daily_price(current_date_str) df['日期'] = current_date_str df.to_sql(table_name, conn, if_exists='append', index=False) print(f'Successfully stored data for {current_date_str} into table {table_name}') except Exception as e: print(f'Error storing data for {current_date_str}: {e}') time.sleep(5 + random.uniform(0, 5)) current_dt += datetime.timedelta(days=1) conn.close() # start_date = '20190101' end_date = '20240314' store_daily_price_to_sqlite(start_date, end_date) ``` ![螢幕擷取畫面 2024-02-07 153724](https://hackmd.io/_uploads/ByXSX3giT.png) ![1707292338976](https://hackmd.io/_uploads/S1_08hlsa.jpg) ![螢幕擷取畫面 2024-02-07 154023](https://hackmd.io/_uploads/S14l4heop.png) ```= import sqlite3 import pandas as pd def read_table_from_sqlite(date): db_file = r'G:@@@\@@@\daily_price.db' conn = sqlite3.connect(db_file) table_name = f'daily_price_2021' query = f"SELECT * FROM {table_name} WHERE 日期>='{date}'" df = pd.read_sql_query(query, conn) conn.close() return df read_table_from_sqlite('20220101') ``` ![螢幕擷取畫面 2024-02-07 153914](https://hackmd.io/_uploads/SkNhQneoT.png) ```= df[df['證券代號'] == '2330'] ``` ![螢幕擷取畫面 2024-02-07 153936](https://hackmd.io/_uploads/HJ3Tmnxsa.png) ```= import plotly.graph_objs as go df_2330 = df[df['證券代號'] == '2330'] fig = go.Figure() fig.add_trace(go.Scatter(x=df_2330['日期'], y=df_2330['收盤價'], mode='lines', name='Close Price')) fig.update_layout(title='收盤價_2330', xaxis_title='日期', yaxis_title='收盤價') fig.show() ``` ![螢幕擷取畫面 2024-02-07 153957](https://hackmd.io/_uploads/HJT07hgia.png) <br/> ### 爬蟲:特定日期股價-上櫃(存進sqlite) 到[證券櫃檯買賣中心 上櫃股票美日收盤行情(不含定價)](https://www.tpex.org.tw/web/stock/aftertrading/otc_quotes_no1430/stk_wn1430.php?l=zh-tw) ![image](https://hackmd.io/_uploads/HkcEgo0Ta.png) CTRL+SHIFT+I 開發人員介面 頁面點選所有證券(不包含權證、牛熊證、不包含牛熊證) 選擇隨便一個日期後,開發人員出現的網址點進去,會看到URL,我們可以來GET資料 ![image](https://hackmd.io/_uploads/BksceiR66.png) PS 需要注意的是,上市我們是用西元年,上櫃要使用民國年 ![image](https://hackmd.io/_uploads/HkPWbj0pT.png) 直接拿上市來改 要先把輸入的西元年date,改成民國年 ```= year_w = date[:4] year = int(year_w) - 1911 month = date[4:6] day = date[6:8] ``` 包成function ```= import requests import pandas as pd import json import re import sqlite3 import os import datetime from fake_useragent import UserAgent import time import random def daily_price_otc(date): year_w = date[:4] year = int(year_w) - 1911 month = date[4:6] day = date[6:8] url = f'https://www.tpex.org.tw/web/stock/aftertrading/otc_quotes_no1430/stk_wn1430_result.php?l=zh-tw&d={year}/{month}/{day}&se=EW&_=1710149320138' # 生成隨機的使用者代理 ua = UserAgent() user_agent = ua.random # headers 數據 headers = { 'User-Agent': user_agent, } max_retries = 5 # 最大重試次數 current_retry = 0 while current_retry < max_retries: try: response = requests.get(url, headers=headers) response.raise_for_status() # break except requests.exceptions.RequestException as e: print(f'Error connecting to the server: {e}') current_retry += 1 if current_retry < max_retries: print(f'Retrying... ({current_retry}/{max_retries})') time.sleep(5 + random.uniform(0, 5)) else: print('Max retries reached. Exiting.') raise response_data = json.loads(response.text) aa_data = response_data.get('aaData', []) selected_columns = [ '公司代號', '公司名稱', '收盤價', '漲跌', '開盤價', '最高價', '最低價', '成交股數', '成交金額', '成交筆數', '最後揭示買價', '最後揭示買量', '最後揭示賣價','最後揭示賣量', '發行股數', '次日漲停價', '次日跌停價' ] df = pd.DataFrame(aa_data, columns=selected_columns) df.rename(columns={'公司名稱': '證券名稱'}, inplace=True) df.rename(columns={'公司代號': '證券代號'}, inplace=True) df['收盤價'] = pd.to_numeric(df['收盤價'], errors='coerce') df['開盤價'] = pd.to_numeric(df['開盤價'], errors='coerce') df['收盤價'] = df['收盤價'].astype(float) df['開盤價'] = df['開盤價'].astype(float) df['漲跌價差'] = round(df['收盤價'] - df['開盤價'], 2) df['漲跌(+/-)'] = df['漲跌價差'].apply(lambda x: '-' if x < 0 else '+') df['漲跌價差'] = abs(df['漲跌價差']) return df date = '20240103' daily_price_otc(date) ``` ![螢幕擷取畫面 2024-03-13 115926](https://hackmd.io/_uploads/r1Q3EsRTT.png) 存進單一的sqlite table ```= # 所有資料,存進sqlite 單一table # 改成可選日期範圍 # strftime('%Y%m%d') 方法,格式化字串 import sqlite3 import os import datetime import time def store_daily_price_otc_to_sqlite(start_date, end_date): db_file = r'G:\我的雲端硬碟\000-AI\py\18-STOCK\tw_financial_reports\daily_price_otc.db' # if not os.path.exists(db_file): # conn = sqlite3.connect(db_file) # conn.close() conn = sqlite3.connect(db_file) cur = conn.cursor() start_dt = datetime.datetime.strptime(start_date, '%Y%m%d') end_dt = datetime.datetime.strptime(end_date, '%Y%m%d') current_dt = start_dt while current_dt <= end_dt: current_date_str = current_dt.strftime('%Y%m%d') table_name = f'daily_price_otc_2021' # _{current_date_str} try: df = daily_price_otc(current_date_str) df['日期'] = current_date_str df.to_sql(table_name, conn, if_exists='append', index=False) print(f'Successfully stored data for {current_date_str} into table {table_name}') except Exception as e: print(f'Error storing data for {current_date_str}: {e}') time.sleep(5 + random.uniform(0, 5)) current_dt += datetime.timedelta(days=1) conn.close() # start_date = '20210101' end_date = '20240314' store_daily_price_otc_to_sqlite(start_date, end_date) ``` ![image](https://hackmd.io/_uploads/S17maoC6p.png) 因為從開發人員看,response的aa_data,多了兩個欄位(買量、賣量),沒有資料,因此都是空白的,但還是要放置,不然column數量會報錯 ![image](https://hackmd.io/_uploads/Hk5wb2C6T.png) ```= import sqlite3 import pandas as pd def read_table_from_sqlite(db_path, table_name, stock_code): conn = sqlite3.connect(db_path) query = f"SELECT * FROM {table_name} WHERE 公司代號={stock_code} AND 日期 > '2024-01-01' ORDER BY 日期" df = pd.read_sql_query(query, conn) conn.close() return df # db_path = r'@@@\daily_price_otc.db' table_name = 'daily_price_otc_2014' stock_code = 1777 df = read_table_from_sqlite(db_path, table_name, stock_code) df ``` ![image](https://hackmd.io/_uploads/rJO8v3Rpp.png) ```= import plotly.graph_objs as go fig = go.Figure() fig.add_trace(go.Scatter(x=df['日期'], y=df['收盤價'], mode='lines', name='Close Price')) fig.update_layout(title='收盤價趨勢線', xaxis=dict(title='日期'), yaxis=dict(title=''), legend=dict( title='', x=1.0, y=1.4, traceorder='normal', orientation='v' ), width=1000, height=500, ) fig.show() ``` ![image](https://hackmd.io/_uploads/rkgOi3Raa.png) ```= import plotly.graph_objects as go # df['日期'] = pd.to_datetime(df['日期'], format='%Y%m%d') df.set_index('日期', inplace=True) # fig = go.Figure( data=[go.Candlestick(x=df.index, open=df['開盤價'], high=df['最高價'], low=df['最低價'], close=df['收盤價'])] ) # 趨勢線 trend_line = go.Scatter( x=df.index, y=df['收盤價'].rolling(window=10).mean(), mode='lines', name='10 日', line=dict(color='blue') ) fig.add_trace(trend_line) fig.update_layout(title='K線圖、趨勢線', xaxis=dict(title='日期'), yaxis=dict(title=''), legend=dict( title='', x=1.0, y=1.4, traceorder='normal', orientation='v' ), width=1000, height=600, ) fig.show() ``` ![image](https://hackmd.io/_uploads/rkGwi2A6a.png)