# **【Python: 股票買賣交易計算、定期定額複利計算機、爬蟲每日股價】**
:::info
- 買賣交易計算
- 定期定額複利計算
- 為什麼要投資?
- 幫自己設定一些情境
- 爬蟲:特定日期股價-上市(存進sqlite)
- 設定日期範圍,股價 plotly繪圖
- 設定日期範圍,存進sqlite
- 爬蟲:特定日期股價-上櫃(存進sqlite)
:::
<br/>
賣出股票時,會想知道扣除手續費、交易稅後,是否有賺? 按計算機太麻煩了,寫了幾個簡單的 Python functions,之後部屬到 streamlit、flask,就可以方便操作
### 買賣交易計算
```=
import math
# 股票"購買手續費",無條件進位 (1.425% 券商手續費)
def calculate_buying_fee(buying_stock_price, buying_quantity, discount=None):
if discount is None:
discount = 10
total_buying_stock_price = buying_stock_price * buying_quantity
buying_fee = (buying_stock_price * buying_quantity) * (1.425/1000) * (discount/10)
buying_fee = round(buying_fee,2)
return total_buying_stock_price, buying_fee
# 股票"賣出手續費",無條件進位 (1.425% 券商手續費 + 0.003 證券交易稅 )
def calculate_selling_fee(selling_stock_price, selling_quantity, discount=None):
if discount is None:
discount = 10
total_selling_stock_price = selling_stock_price * selling_quantity
selling_fee = ((selling_stock_price * selling_quantity) * (1.425/1000) * (discount/10))
selling_fee = round(selling_fee,2)
selling_fee_2 = ((selling_stock_price * selling_quantity) * (3/1000))
return total_selling_stock_price, selling_fee, selling_fee_2
# 計算獲利
def calculate_earning(choice, buy_stock_price, sell_stock_price, quantity, discount=None):
tax = 0
if choice == 1:
tax = 0.1
elif choice == 2:
tax = 0.3
if discount is None:
discount = 10
discount_new = discount/10
buy_price = round(buy_stock_price * quantity, 2)
buy_fee = round(buy_stock_price * quantity * (1.425/1000) * discount_new, 2)
buy = round(buy_price + buy_fee, 2)
sell_price = round(sell_stock_price * quantity, 2)
sell_fee = round(sell_stock_price * quantity * (1.425/1000) * discount_new, 2)
sell_fee_p = round(sell_stock_price * quantity * float(tax/100), 2)
sell = round(sell_price - sell_fee - sell_fee_p,2)
if discount == 10:
discount_show = 0
else:
discount_show = discount_new
earning_money = round(sell_price - (buy_price + buy_fee + sell_fee + sell_fee_p),2)
earning_money_100 = round(((sell - buy) / buy * 100), 2)
if earning_money_100 > 0:
status = "賺了"
else:
status = "賠了"
stock_choice = ""
if choice == 1:
stock_choice = "ETF"
elif choice == 2:
stock_choice = "股票"
return stock_choice, tax, discount_show, buy_price, buy_fee, buy, sell_price, sell_fee, sell_fee_p, sell, earning_money, earning_money_100, status
```
這是部屬到streamlit的結果

<br/>
### 定期定額複利計算
#### 為什麼要投資?
```=
def calculate_future_value_annually(initial_amount, monthly_saving, years, annual_interest_rate):
annual_interest_rate_100 = int(annual_interest_rate)/100
cumulative_values = []
cumulative_bank_values = []
normal_values = []
future_value = initial_amount
normal_value = initial_amount
bank_value = initial_amount
for year in range(years):
annual_savings = monthly_saving * 12
future_value += annual_savings
future_value *= (1 + (annual_interest_rate_100))
future_value = math.floor(future_value)
cumulative_values.append(future_value)
annual_savings = monthly_saving * 12
bank_value += annual_savings
bank_value *= (1 + (0.02))
bank_value = math.floor(bank_value)
cumulative_bank_values.append(bank_value)
normal_value += annual_savings
normal_value = math.floor(normal_value)
normal_values.append(normal_value)
years_list = list(range(1, years + 1))
df = pd.DataFrame({'Year': years_list, '投資複利累積金額': cumulative_values, '定存2%累積金額': cumulative_bank_values,'不投資累積金額': normal_values})
```
```=
initial_amount = 100000
monthly_saving = 15000
years = 30
annual_interest_rate = 0.05
cumulative_values, cumulative_bank_values, normal_values = calculate_future_value_annually(initial_amount, monthly_saving, years, annual_interest_rate)
years_list = list(range(1, years + 1))
df = pd.DataFrame({'Year': years_list, 'Cumulative_Future_Value': cumulative_values, 'Bank_Future_Value': cumulative_bank_values,'Normal_Value': normal_values})
df
```

```=
import plotly.graph_objects as go
fig = go.Figure()
fig.add_trace(go.Scatter(
x=df['Year'],
y=df['Cumulative_Future_Value'],
mode='lines+markers+text',
line=dict(color='red', width=2),
textposition='top center',
name='投資複利累積金額'
))
fig.add_trace(go.Scatter(
x=df['Year'],
y=df['Bank_Future_Value'],
mode='lines+markers+text',
line=dict(color='blue', width=2),
textposition='top center',
name='定存2%累積金額'
))
fig.add_trace(go.Scatter(
x=df['Year'],
y=df['Normal_Value'],
mode='lines+markers+text',
line=dict(color='mediumturquoise', width=2),
textposition='top center',
name='不投資累積金額'
))
fig.update_layout(
title=f'每年財產累積折線圖',
xaxis=dict(title='年'),
yaxis=dict(title=''),
legend=dict(
title='',
x=1.0,
y=1.4,
traceorder='normal',
orientation='v'
),
width=1000,
height=400,
)
fig.show()
```

> 也可以用series
```=
import pandas as pd
initial_amount = 100000
monthly_saving = 15000
years = 30
annual_interest_rate = 5
#
each_year = pd.Series(range(1, years + 1))
#
cumulative_values = pd.Series(0, index=range(0, years))
cumulative_values[0] = initial_amount
year_saving = monthly_saving * 12
for i in range(years):
previous_year_index = max(i - 1, 0)
cumulative_values[i] = (cumulative_values[previous_year_index] + year_saving) * (1 + float(annual_interest_rate/100))
cumulative_values = cumulative_values.astype(int)
#
cumulative_bank_values = pd.Series(0, index=range(0, years))
cumulative_bank_values[0] = initial_amount
for i in range(years):
previous_year_index = max(i - 1, 0)
cumulative_bank_values[i] = (cumulative_bank_values[previous_year_index] + year_saving) * 1.02
cumulative_bank_values = cumulative_bank_values.astype(int)
#
normal_values = pd.Series(0, index=range(0, years))
normal_values[0] = initial_amount
for i in range(years):
previous_year_index = max(i - 1, 0)
normal_values[i] = (normal_values[previous_year_index] + year_saving)
normal_values = normal_values.astype(int)
# 合併
df = pd.DataFrame({
'Year': each_year,
'投資複利累積金額': cumulative_values,
'定存2%累積金額': cumulative_bank_values,
'不投資累積金額': normal_values
})
df
```

<br/>
#### 幫自己設一些情境
假設我希望_年後能到達目標 $_,每年帳戶總額*報酬率 _%,使用折現率概念,每月要投入多少 $_?
```=
def calculate_monthly_savings(initial_amount, target_amount, annual_interest_rate, years):
# 年報酬率 -> 月報酬率
annual_interest_rate_new = annual_interest_rate/100
monthly_interest_rate = annual_interest_rate_new / 12
monthly_savings = (target_amount - initial_amount) / (((1 + monthly_interest_rate) ** (years * 12) - 1) / (monthly_interest_rate))
return monthly_savings
```
```=
initial_amount = 100000 # 初始金額 10w
target_amount = 10000000
annual_interest_rate = 5
years = 26
monthly_savings = calculate_monthly_savings(initial_amount, target_amount, annual_interest_rate, years)
print(f"每月需要存入 {monthly_savings:.2f} 元")
```

<br/>
假設我每月投入固定金額 $_,每年帳戶總額*報酬率 _%,使用折現率概念,幾年後能到達目標 $_?
```=
import pandas as pd
def calculate_years_to_goal(initial_amount, monthly_saving, annual_interest_rate, goal):
annual_interest_rate_100 = annual_interest_rate/100
current_balance = initial_amount
years = 0
cumulative_values = []
while current_balance < goal:
current_balance += monthly_saving * 12 # 每年投入金額
current_balance *= (1 + annual_interest_rate_100)
cumulative_values.append(current_balance)
years += 1
cumulative_values_str = [f'{value:.2f}' for value in cumulative_values]
df = pd.DataFrame({'Year': range(1, years + 1), '已累積金額': cumulative_values_str})
print(f"需要 {years} 年能累積到 {goal} 元的目標")
return df
```
```=
initial_amount = 100000 # 初始金額 10w
monthly_saving = 15000
annual_interest_rate = 5
goal = 10000000
df = calculate_years_to_goal(initial_amount, monthly_saving, annual_interest_rate, goal)
df
```

A
<br/>
### 爬蟲:特定日期股價-上市(存進sqlite)
到證交所 [每日收盤行情](https://www.twse.com.tw/zh/trading/historical/mi-index.html)

CTRL+SHIFT+I 開發人員介面
頁面點選全部(不包含權證、牛熊證、不包含牛熊證)【查詢】

下滑確定有 每日收盤行情
開發人員介面的Request URL,複製用網頁打開

打開後長這樣,搜尋 title

發現我要的title是第九個
```=
# 剛才的 request url 貼過來
import requests
url = 'https://www.twse.com.tw/rwd/zh/afterTrading/MI_INDEX?date=20240201&type=ALLBUT0999&response=json&_=1706840905505'
response = requests.get(url)
```
```=
response.text[:]
```

提取標題
轉成json會比較好提取
```=
import json
import pandas as pd
response_data = json.loads(response.text)
# 取得第九個title的 field
ninth_title_fields = response_data["tables"][8]["fields"]
print(ninth_title_fields)
```

提取個股
```=
# 取得第九個title的 data
ninth_title_data = response_data["tables"][8]["data"]
print(ninth_title_data)
```

合併為 dataframe
```=
import re
df = pd.DataFrame(ninth_title_data, columns=ninth_title_fields)
df["漲跌(+/-)"] = df["漲跌(+/-)"].apply(lambda x: re.search(r'[-+]', x).group() if re.search(r'[-+]', x) else None)
df
```

轉換資料型態(後面要做計算)
```=
# 如果有','(維持None),先刪除
df = df.applymap(lambda s: s.replace(',', '') if isinstance(s, str) else s)
# 設 index
df = df.set_index('證券代號')
# 將指定的數值欄位轉換成數值型態
numeric_columns = ['成交股數', '成交筆數', '成交金額', '開盤價', '最高價', '最低價', '收盤價', '漲跌價差', '最後揭示買價', '最後揭示買量', '最後揭示賣價', '最後揭示賣量', '本益比']
df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce')
# 將指定的字串欄位轉換成字串型態
string_columns = ['證券名稱', '漲跌(+/-)']
df[string_columns] = df[string_columns].astype(str)
df
```

```=
df.dtypes
```

紅、黑K
```=
df['k'] = df['收盤價'] / df['開盤價']
red_k = df[df['k'] > 1.05]
red_k
black_k = df[df['k'] < 1.05]
black_k
```
red_k

black_k

>存進CSV
```=
df.to_csv('daily_price.csv', encoding='utf_8_sig')
df = pd.read_csv('daily_price.csv', index_col=['證券代號'])
df
```

存到sqlite再讀取
```=
import sqlite3
conn = sqlite3.connect('test.sqlite3')
# 存進sqlite
df.to_sql('daily_price', conn, if_exists='replace')
# 讀取table
df = pd.read_sql('select * from daily_price', conn, index_col=['證券代號'])
df
```


包成function
```=
# 寫成function
import requests
import pandas as pd
import json
import os
def daily_price(date,save_path):
url = f'https://www.twse.com.tw/rwd/zh/afterTrading/MI_INDEX?date={date}&type=ALLBUT0999&response=json&_=1706840905505'
response = requests.get(url)
response_data = json.loads(response.text)
ninth_title_fields = response_data["tables"][8]["fields"]
ninth_title_data = response_data["tables"][8]["data"]
df = pd.DataFrame(ninth_title_data, columns=ninth_title_fields)
df["漲跌(+/-)"] = df["漲跌(+/-)"].apply(lambda x: re.search(r'[-+]', x).group() if re.search(r'[-+]', x) else None)
# 如果有','(維持None),先刪除
df = df.applymap(lambda s: s.replace(',', '') if isinstance(s, str) else s)
# 設 index
df = df.set_index('證券代號')
# 將指定的數值欄位轉換成數值型態
numeric_columns = ['成交股數', '成交筆數', '成交金額', '開盤價', '最高價', '最低價', '收盤價', '漲跌價差', '最後揭示買價', '最後揭示買量', '最後揭示賣價', '最後揭示賣量', '本益比']
df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce')
# 將指定的字串欄位轉換成字串型態
string_columns = ['證券名稱', '漲跌(+/-)']
df[string_columns] = df[string_columns].astype(str)
file_path = os.path.join(save_path, f'daily_price_{date}.csv')
df.to_csv(file_path, encoding='utf_8_sig')
return df
```
```=
daily_price('20240103','@@@')
```

function,改成可選日期範圍
```=
# strftime('%Y%m%d') 方法,格式化字串
import datetime
def daily_price_range(start_date, end_date, save_path):
start_dt = datetime.datetime.strptime(start_date, '%Y%m%d')
end_dt = datetime.datetime.strptime(end_date, '%Y%m%d')
current_dt = start_dt
while current_dt <= end_dt:
current_date_str = current_dt.strftime('%Y%m%d')
try:
daily_price(current_date_str, save_path)
print(f'Successfully fetched data for {current_date_str}')
except Exception as e:
print(f'Error fetching data for {current_date_str}: {e}')
current_dt += datetime.timedelta(days=1)
daily_price_range('20230303', '20230307', '@@@')
```


<br/>
#### 設定日期範圍,股價 plotly繪圖
```=
# 不存檔、單看某欄位
# 寫成function
import requests
import pandas as pd
import json
import os
import re
def daily_price(date, stock_code):
url = f'https://www.twse.com.tw/rwd/zh/afterTrading/MI_INDEX?date={date}&type=ALLBUT0999&response=json&_=1706840905505'
response = requests.get(url)
response_data = json.loads(response.text)
ninth_title_fields = response_data["tables"][8]["fields"]
ninth_title_data = response_data["tables"][8]["data"]
df = pd.DataFrame(ninth_title_data, columns=ninth_title_fields)
df["漲跌(+/-)"] = df["漲跌(+/-)"].apply(lambda x: re.search(r'[-+]', x).group() if re.search(r'[-+]', x) else None)
# 如果有','(維持None),先刪除
df = df.applymap(lambda s: s.replace(',', '') if isinstance(s, str) else s)
# 將指定的數值欄位轉換成數值型態
numeric_columns = ['成交股數', '成交筆數', '成交金額', '開盤價', '最高價', '最低價', '收盤價', '漲跌價差', '最後揭示買價', '最後揭示買量', '最後揭示賣價', '最後揭示賣量', '本益比']
df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce')
# 將指定的字串欄位轉換成字串型態
string_columns = ['證券代號','證券名稱', '漲跌(+/-)']
df[string_columns] = df[string_columns].astype(str)
# 設 index
df = df.set_index(['證券代號','證券名稱'])
# 提取
df = df[df.index.get_level_values('證券代號') == stock_code]
return df
#
date = '20230303'
stock_code = '1101'
daily_df = daily_price(date, stock_code)
daily_df
```

```=
# 改成可選日期範圍
# strftime('%Y%m%d') 方法,格式化字串
import datetime
def daily_price_range(start_date, end_date,stock_code):
start_dt = datetime.datetime.strptime(start_date, '%Y%m%d')
end_dt = datetime.datetime.strptime(end_date, '%Y%m%d')
current_dt = start_dt
# 設一個空的字典
result_dict = {}
while current_dt <= end_dt:
current_date_str = current_dt.strftime('%Y%m%d')
try:
df = daily_price(current_date_str,stock_code)
if df is None:
continue
value = df.loc[(stock_code,), '收盤價'].iloc[0]
if pd.isnull(value):
continue
# 结果存储在字典中
result_dict[f'{current_dt}'] = value
except Exception as e:
print(f'Error fetching data for {current_date_str}: {e}')
# 字典转换为DataFrame
result_df = pd.DataFrame(list(result_dict.items()), columns=[f'日期', '收盤價'])
# result_df_reverse = result_df[::-1]
# display(result_df_reverse)
current_dt += datetime.timedelta(days=1)
return result_df
#
start_date = '20230303'
end_date = '20230310'
stock_code = '1101'
result_df = daily_price_range(start_date, end_date, stock_code)
result_df
```

```=
import plotly.express as px
fig = px.line(result_df, x='日期', y='收盤價', title='每日收盤價 折線圖')
fig.show()
```

<br/>
#### 設定日期範圍,存進sqlite
方法跟前面差不多,差別在sqlite不能設multiIndex
```=
# 所有資料,存進sqlite
# 寫成function
import requests
import pandas as pd
import json
import re
def daily_price(date):
url = f'https://www.twse.com.tw/rwd/zh/afterTrading/MI_INDEX?date={date}&type=ALLBUT0999&response=json&_=1706840905505'
response = requests.get(url)
response_data = json.loads(response.text)
ninth_title_fields = response_data["tables"][8]["fields"]
ninth_title_data = response_data["tables"][8]["data"]
df = pd.DataFrame(ninth_title_data, columns=ninth_title_fields)
df["漲跌(+/-)"] = df["漲跌(+/-)"].apply(lambda x: re.search(r'[-+]', x).group() if re.search(r'[-+]', x) else None)
# 如果有','(維持None),先刪除
df = df.applymap(lambda s: s.replace(',', '') if isinstance(s, str) else s)
# 將指定的數值欄位轉換成數值型態
numeric_columns = ['成交股數', '成交筆數', '成交金額', '開盤價', '最高價', '最低價', '收盤價', '漲跌價差', '最後揭示買價', '最後揭示買量', '最後揭示賣價', '最後揭示賣量', '本益比']
df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce')
# 將指定的字串欄位轉換成字串型態
string_columns = ['證券代號','證券名稱', '漲跌(+/-)']
df[string_columns] = df[string_columns].astype(str)
# 要存sqlite,不能使用 multiIndex
# df = df.set_index(['證券代號','證券名稱'])
return df
daily_price('20230303')
```

```=
import sqlite3
import os
import datetime
def store_daily_price_to_sqlite(start_date, end_date):
db_file = 'daily_price.db'
if not os.path.exists(db_file):
conn = sqlite3.connect(db_file)
conn.close()
conn = sqlite3.connect(db_file)
cur = conn.cursor()
start_dt = datetime.datetime.strptime(start_date, '%Y%m%d')
end_dt = datetime.datetime.strptime(end_date, '%Y%m%d')
current_dt = start_dt
while current_dt <= end_dt:
current_date_str = current_dt.strftime('%Y%m%d')
table_name = f'daily_price_{current_date_str}'
try:
df = daily_price(current_date_str)
df.to_sql(table_name, conn, if_exists='replace', index=False)
print(f'Successfully stored data for {current_date_str} into table {table_name}')
except Exception as e:
print(f'Error storing data for {current_date_str}: {e}')
current_dt += datetime.timedelta(days=1)
conn.close()
# Example usage:
start_date = '20230303'
end_date = '20230308'
store_daily_price_to_sqlite(start_date, end_date)
```

讀取
```=
def read_table_from_sqlite(date):
db_file = 'daily_price.db'
conn = sqlite3.connect(db_file)
table_name = f'daily_price_{date}'
query = f"SELECT * FROM {table_name}"
df = pd.read_sql_query(query, conn)
conn.close()
return df
#
date = '20230303'
df_20230303 = read_table_from_sqlite(date)
df_20230303
```



<br/>
> 發現每次都要叫出不同的table太麻煩了,改成存進同一張table
```=
# 所有資料
# 最多50次
# 寫成function
import requests
import pandas as pd
import json
import re
import sqlite3
import os
import datetime
from fake_useragent import UserAgent
import time
import random
def daily_price(date):
url = f'https://www.twse.com.tw/rwd/zh/afterTrading/MI_INDEX?date={date}&type=ALLBUT0999&response=json&_=1706840905505'
# 生成隨機的使用者代理
ua = UserAgent()
user_agent = ua.random
# headers 數據
headers = {
'User-Agent': user_agent,
}
max_retries = 50 # 最大重試次數
current_retry = 0
while current_retry < max_retries:
try:
response = requests.get(url, headers=headers)
response.raise_for_status() # 如果伺服器回傳錯誤碼,拋出異常
break # 如果成功,跳出迴圈
except requests.exceptions.RequestException as e:
print(f'Error connecting to the server: {e}')
current_retry += 1
if current_retry < max_retries:
print(f'Retrying... ({current_retry}/{max_retries})')
time.sleep(5 + random.uniform(0, 5)) # 加入延遲時間
else:
print('Max retries reached. Exiting.')
raise # 如果重試次數用完仍然失敗,拋出異常
response_data = json.loads(response.text)
ninth_title_fields = response_data["tables"][8]["fields"]
ninth_title_data = response_data["tables"][8]["data"]
df = pd.DataFrame(ninth_title_data, columns=ninth_title_fields)
df["漲跌(+/-)"] = df["漲跌(+/-)"].apply(lambda x: re.search(r'[-+]', x).group() if re.search(r'[-+]', x) else None)
# 如果有','(維持None),先刪除
# df = df.applymap(lambda s: s.replace(',', '') if isinstance(s, str) else s)
df = df.apply(lambda col: col.apply(lambda x: x.replace(',', '') if isinstance(x, str) else x))
# 將指定的數值欄位轉換成數值型態
numeric_columns = ['成交股數', '成交筆數', '成交金額', '開盤價', '最高價', '最低價', '收盤價', '漲跌價差', '最後揭示買價', '最後揭示買量', '最後揭示賣價', '最後揭示賣量', '本益比']
df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce')
# 將指定的字串欄位轉換成字串型態
string_columns = ['證券代號','證券名稱', '漲跌(+/-)']
df[string_columns] = df[string_columns].astype(str)
# 要存sqlite,不能使用 multiIndex
# df = df.set_index(['證券代號','證券名稱'])
return df
# daily_price('20230303')
# 所有資料,存進sqlite 單一table
# 改成可選日期範圍
# strftime('%Y%m%d') 方法,格式化字串
import sqlite3
import os
import datetime
def store_daily_price_to_sqlite(start_date, end_date):
db_file = r'G:\我的雲端硬碟\000-AI\py\18-STOCK\tw_financial_reports\daily_price.db'
# if not os.path.exists(db_file):
# conn = sqlite3.connect(db_file)
# conn.close()
conn = sqlite3.connect(db_file)
cur = conn.cursor()
start_dt = datetime.datetime.strptime(start_date, '%Y%m%d')
end_dt = datetime.datetime.strptime(end_date, '%Y%m%d')
current_dt = start_dt
while current_dt <= end_dt:
current_date_str = current_dt.strftime('%Y%m%d')
table_name = f'daily_price_2019' # _{current_date_str}
try:
df = daily_price(current_date_str)
df['日期'] = current_date_str
df.to_sql(table_name, conn, if_exists='append', index=False)
print(f'Successfully stored data for {current_date_str} into table {table_name}')
except Exception as e:
print(f'Error storing data for {current_date_str}: {e}')
time.sleep(5 + random.uniform(0, 5))
current_dt += datetime.timedelta(days=1)
conn.close()
#
start_date = '20190101'
end_date = '20240314'
store_daily_price_to_sqlite(start_date, end_date)
```



```=
import sqlite3
import pandas as pd
def read_table_from_sqlite(date):
db_file = r'G:@@@\@@@\daily_price.db'
conn = sqlite3.connect(db_file)
table_name = f'daily_price_2021'
query = f"SELECT * FROM {table_name} WHERE 日期>='{date}'"
df = pd.read_sql_query(query, conn)
conn.close()
return df
read_table_from_sqlite('20220101')
```

```=
df[df['證券代號'] == '2330']
```

```=
import plotly.graph_objs as go
df_2330 = df[df['證券代號'] == '2330']
fig = go.Figure()
fig.add_trace(go.Scatter(x=df_2330['日期'], y=df_2330['收盤價'], mode='lines', name='Close Price'))
fig.update_layout(title='收盤價_2330', xaxis_title='日期', yaxis_title='收盤價')
fig.show()
```

<br/>
### 爬蟲:特定日期股價-上櫃(存進sqlite)
到[證券櫃檯買賣中心 上櫃股票美日收盤行情(不含定價)](https://www.tpex.org.tw/web/stock/aftertrading/otc_quotes_no1430/stk_wn1430.php?l=zh-tw)

CTRL+SHIFT+I 開發人員介面
頁面點選所有證券(不包含權證、牛熊證、不包含牛熊證)
選擇隨便一個日期後,開發人員出現的網址點進去,會看到URL,我們可以來GET資料

PS 需要注意的是,上市我們是用西元年,上櫃要使用民國年

直接拿上市來改
要先把輸入的西元年date,改成民國年
```=
year_w = date[:4]
year = int(year_w) - 1911
month = date[4:6]
day = date[6:8]
```
包成function
```=
import requests
import pandas as pd
import json
import re
import sqlite3
import os
import datetime
from fake_useragent import UserAgent
import time
import random
def daily_price_otc(date):
year_w = date[:4]
year = int(year_w) - 1911
month = date[4:6]
day = date[6:8]
url = f'https://www.tpex.org.tw/web/stock/aftertrading/otc_quotes_no1430/stk_wn1430_result.php?l=zh-tw&d={year}/{month}/{day}&se=EW&_=1710149320138'
# 生成隨機的使用者代理
ua = UserAgent()
user_agent = ua.random
# headers 數據
headers = {
'User-Agent': user_agent,
}
max_retries = 5 # 最大重試次數
current_retry = 0
while current_retry < max_retries:
try:
response = requests.get(url, headers=headers)
response.raise_for_status() #
break
except requests.exceptions.RequestException as e:
print(f'Error connecting to the server: {e}')
current_retry += 1
if current_retry < max_retries:
print(f'Retrying... ({current_retry}/{max_retries})')
time.sleep(5 + random.uniform(0, 5))
else:
print('Max retries reached. Exiting.')
raise
response_data = json.loads(response.text)
aa_data = response_data.get('aaData', [])
selected_columns = [
'公司代號', '公司名稱', '收盤價', '漲跌', '開盤價', '最高價', '最低價',
'成交股數', '成交金額', '成交筆數', '最後揭示買價', '最後揭示買量', '最後揭示賣價','最後揭示賣量',
'發行股數', '次日漲停價', '次日跌停價'
]
df = pd.DataFrame(aa_data, columns=selected_columns)
df.rename(columns={'公司名稱': '證券名稱'}, inplace=True)
df.rename(columns={'公司代號': '證券代號'}, inplace=True)
df['收盤價'] = pd.to_numeric(df['收盤價'], errors='coerce')
df['開盤價'] = pd.to_numeric(df['開盤價'], errors='coerce')
df['收盤價'] = df['收盤價'].astype(float)
df['開盤價'] = df['開盤價'].astype(float)
df['漲跌價差'] = round(df['收盤價'] - df['開盤價'], 2)
df['漲跌(+/-)'] = df['漲跌價差'].apply(lambda x: '-' if x < 0 else '+')
df['漲跌價差'] = abs(df['漲跌價差'])
return df
date = '20240103'
daily_price_otc(date)
```

存進單一的sqlite table
```=
# 所有資料,存進sqlite 單一table
# 改成可選日期範圍
# strftime('%Y%m%d') 方法,格式化字串
import sqlite3
import os
import datetime
import time
def store_daily_price_otc_to_sqlite(start_date, end_date):
db_file = r'G:\我的雲端硬碟\000-AI\py\18-STOCK\tw_financial_reports\daily_price_otc.db'
# if not os.path.exists(db_file):
# conn = sqlite3.connect(db_file)
# conn.close()
conn = sqlite3.connect(db_file)
cur = conn.cursor()
start_dt = datetime.datetime.strptime(start_date, '%Y%m%d')
end_dt = datetime.datetime.strptime(end_date, '%Y%m%d')
current_dt = start_dt
while current_dt <= end_dt:
current_date_str = current_dt.strftime('%Y%m%d')
table_name = f'daily_price_otc_2021' # _{current_date_str}
try:
df = daily_price_otc(current_date_str)
df['日期'] = current_date_str
df.to_sql(table_name, conn, if_exists='append', index=False)
print(f'Successfully stored data for {current_date_str} into table {table_name}')
except Exception as e:
print(f'Error storing data for {current_date_str}: {e}')
time.sleep(5 + random.uniform(0, 5))
current_dt += datetime.timedelta(days=1)
conn.close()
#
start_date = '20210101'
end_date = '20240314'
store_daily_price_otc_to_sqlite(start_date, end_date)
```

因為從開發人員看,response的aa_data,多了兩個欄位(買量、賣量),沒有資料,因此都是空白的,但還是要放置,不然column數量會報錯

```=
import sqlite3
import pandas as pd
def read_table_from_sqlite(db_path, table_name, stock_code):
conn = sqlite3.connect(db_path)
query = f"SELECT * FROM {table_name} WHERE 公司代號={stock_code} AND 日期 > '2024-01-01' ORDER BY 日期"
df = pd.read_sql_query(query, conn)
conn.close()
return df
#
db_path = r'@@@\daily_price_otc.db'
table_name = 'daily_price_otc_2014'
stock_code = 1777
df = read_table_from_sqlite(db_path, table_name, stock_code)
df
```

```=
import plotly.graph_objs as go
fig = go.Figure()
fig.add_trace(go.Scatter(x=df['日期'],
y=df['收盤價'],
mode='lines',
name='Close Price'))
fig.update_layout(title='收盤價趨勢線',
xaxis=dict(title='日期'),
yaxis=dict(title=''),
legend=dict(
title='',
x=1.0,
y=1.4,
traceorder='normal',
orientation='v'
),
width=1000,
height=500,
)
fig.show()
```

```=
import plotly.graph_objects as go
#
df['日期'] = pd.to_datetime(df['日期'], format='%Y%m%d')
df.set_index('日期', inplace=True)
#
fig = go.Figure(
data=[go.Candlestick(x=df.index,
open=df['開盤價'],
high=df['最高價'],
low=df['最低價'],
close=df['收盤價'])]
)
# 趨勢線
trend_line = go.Scatter(
x=df.index,
y=df['收盤價'].rolling(window=10).mean(),
mode='lines',
name='10 日',
line=dict(color='blue')
)
fig.add_trace(trend_line)
fig.update_layout(title='K線圖、趨勢線',
xaxis=dict(title='日期'),
yaxis=dict(title=''),
legend=dict(
title='',
x=1.0,
y=1.4,
traceorder='normal',
orientation='v'
),
width=1000,
height=600,
)
fig.show()
```
