# bitcoin auto trade
![image](https://hackmd.io/_uploads/rJG2zFXB0.png)
最近無聊將交易策略從股票移動到幣安上,來看看24小時不斷交易放著一個月如何
功能大概實現有
每分鐘監控前一百檔成交量高的貨幣
![image](https://hackmd.io/_uploads/HkerXK7BR.png)
支持現貨/合約開單,可以指定任意槓桿倍數
支持一鍵快速逃命出清合約和現貨
交易資料都記錄在DB,使用chart.js可以標記合約和現貨買賣點
下面湊一湊應該都可以自己寫出自動交易程式
額外提供一個回測框架可以快速驗證策略是否在合適的買賣點
如果要實戰的話就用可以用和合約現貨的API直接呼叫,記得要開啟幣安API
![image](https://hackmd.io/_uploads/HyFbPYQHR.png)
因為自動交易,所以架設在GCP,改良了一下策略在第8天運算量和網路終於減小到一天2NT,用最破的機器,硬碟20G,大概一個月7U
![image](https://hackmd.io/_uploads/H10CLKmBC.png)
![image](https://hackmd.io/_uploads/rkFi_tXHA.png)
現在晚上終於可以好好睡一覺不會被雷了,只公開部分程式碼,策略參數沒提供,切勿實戰
```python=
import ccxt
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
initial_fixed_amount = 1000
initial_contract_fixed_amount = 150
def calculate_obv(data):
obv = (np.sign(data['close'].diff()) * data['volume']).fillna(0).cumsum()
return obv
def calculate_obv_slope(data, window):
obv_slope = data['obv'].rolling(window=window).apply(lambda x: np.polyfit(range(len(x)), x, 1)[0])
return obv_slope
def fetch_15min_data(symbol, since=None, limit=3000):
binance = ccxt.binance()
ohlcv = binance.fetch_ohlcv(symbol, timeframe='1m', since=since, limit=limit)
return ohlcv
def calculate_rsi(data, window):
delta = data['close'].diff()
gain = delta.where(delta > 0, 0).rolling(window=window).mean()
loss = -delta.where(delta < 0, 0).rolling(window=window).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
def generate_signals(data):
volume_threshold = data['volume'].rolling(window=20).mean() * 1.5
buy_condition = (
(data['macd'] > data['signal_line']) &
)
sell_condition = (
(data['macd'] < data['signal_line']) &
)
data['signal'] = 0.0
data.loc[buy_condition, 'signal'] = 1.0
data.loc[sell_condition, 'signal'] = -1.0
data['position'] = data['signal'].diff()
return data
def preprocess_data(data):
latest_df = pd.DataFrame(data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
latest_df['timestamp'] = pd.to_datetime(latest_df['timestamp'], unit='ms')
data = latest_df
data.set_index('timestamp', inplace=True)
data_15min = data.resample('15min').agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum'
}).dropna().reset_index()
data_15min[['open', 'high', 'low', 'close', 'volume']] = data_15min[['open', 'high', 'low', 'close', 'volume']].astype(float)
return data_15min
def mark_volatile_periods(data):
threshold = 0.5
data['volatile'] = (data['close'].pct_change().abs() > threshold).astype(int)
return data
def backtest_strategy(data):
initial_capital = 1000
capital = initial_capital
num_coins = 0
buy_signals = []
sell_signals = []
print(len(data))
data = generate_signals(data)
data = mark_volatile_periods(data)
for i in range(len(data)):
if data['position'].iloc[i] == 1:
buy_price = data['close'].iloc[i]
num_coins = capital / buy_price
capital -= num_coins * buy_price
buy_signals.append((data['timestamp'].iloc[i], buy_price))
elif data['position'].iloc[i] == -1:
sell_price = data['close'].iloc[i]
capital += num_coins * sell_price
num_coins = 0
sell_signals.append((data['timestamp'].iloc[i], sell_price))
final_capital = capital + (num_coins * data['close'].iloc[-1])
roi = (final_capital - initial_capital) / initial_capital
return data, initial_capital, final_capital, roi, buy_signals, sell_signals
def plot_and_save(data, window, buy_signals, sell_signals):
data = generate_signals(data)
data = mark_volatile_periods(data)
plt.figure(figsize=(14, 10))
plt.subplot(4, 1, 1)
plt.plot(data['timestamp'], data['close'], label='Close Price')
plt.title(f'Close Price')
plt.legend()
plt.subplot(4, 1, 2)
plt.plot(data['timestamp'], data['obv'], label='OBV', color='orange')
plt.title('On-Balance Volume (OBV)')
plt.legend()
plt.subplot(4, 1, 3)
plt.plot(data['timestamp'], data['obv_slope'], label='OBV Slope', color='green')
plt.title('OBV Slope')
plt.legend()
plt.subplot(4, 1, 4)
plt.plot(data['timestamp'], data['close'], label='Close Price')
plt.scatter(data['timestamp'][data['volatile'] == 1], data['close'][data['volatile'] == 1], color='red', label='Volatile')
plt.title('Volatile Periods')
plt.legend()
for signal in buy_signals:
plt.subplot(4, 1, 1)
plt.plot(signal[0], signal[1], 'g^', markersize=10)
for signal in sell_signals:
plt.subplot(4, 1, 1)
plt.plot(signal[0], signal[1], 'rv', markersize=10)
plt.tight_layout()
plt.savefig('obv_slope_chart.png')
symbol = 'CHZ/USDT'
data = fetch_15min_data(symbol)
data = preprocess_data(data)
print(len(data))
data, initial_capital, final_capital, roi, buy_signals, sell_signals = backtest_strategy(data)
print(f'初始資本: {initial_capital}')
print(f'最終資本: {final_capital}')
print(f'投資回報率 (ROI): {roi}')
plot_and_save(data, 14, buy_signals, sell_signals)
```
# 市場
```python=
import ccxt
import pandas as pd
import datetime as dt
# 初始化幣安API
binance = ccxt.binance({
'apiKey': 'api',
'secret': 'api',
'timeout': 30000, # 增加超時時間到30秒
'enableRateLimit': True,
'adjustForTimeDifference': True
# , # 启用时间同步
# 'proxies': {
# 'http': 'http://213:124/',
# 'https': 'http://124:124/',
# }
})
def fetch_ohlcv(symbol, timeframe='1m', since=None, limit=1000):
all_ohlcv = []
while True:
ohlcv = binance.fetch_ohlcv(symbol, timeframe, since, limit)
if len(ohlcv) == 0:
break
all_ohlcv.extend(ohlcv)
since = ohlcv[-1][0] + 1 # 更新 since 為上次數據的最後一個時間點
if len(ohlcv) < limit:
break
return pd.DataFrame(all_ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
# 計算六個月前的時間戳
now = dt.datetime.now()
six_months_ago = now - dt.timedelta(days=360)
since_timestamp = int(six_months_ago.timestamp() * 1000)
# 前五十大加密貨幣對USDT
# top_50_coins = [
# 'BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'ADA/USDT', 'XRP/USDT', 'SOL/USDT', 'DOT/USDT', 'DOGE/USDT', 'AVAX/USDT', 'LINK/USDT',
# 'LUNA/USDT', 'UNI/USDT', 'MATIC/USDT', 'ICP/USDT', 'SXP/USDT', 'FIL/USDT', 'LTC/USDT', 'ALGO/USDT', 'ATOM/USDT', 'CHZ/USDT',
# 'ETC/USDT', 'VET/USDT', 'TRX/USDT', 'THETA/USDT', 'XMR/USDT', 'AAVE/USDT', 'EGLD/USDT', 'EOS/USDT', 'NEO/USDT', 'XTZ/USDT',
# 'CAKE/USDT', 'WAVES/USDT', 'NEAR/USDT', 'HBAR/USDT', 'MKR/USDT', 'GRT/USDT', 'ONT/USDT', 'BCH/USDT', 'SUSHI/USDT', 'COMP/USDT',
# 'TFUEL/USDT', 'ICX/USDT', 'RUNE/USDT', 'DASH/USDT', 'ZEC/USDT', 'MANA/USDT', 'HOT/USDT', 'SC/USDT', 'QTUM/USDT', 'IOST/USDT'
# ]
top_50_coins = ['BTC/USDT', 'ETH/USDT', '1000PEPE/USDT', 'LUNA/USDT', 'ZEC/USDT', 'SOL/USDT', 'NOT/USDT', 'PEOPLE/USDT', 'USDC/USDT', 'JASMY/USDT', 'ORDI/USDT', 'PEPE/USDT', 'WIF/USDT', 'DOGE/USDT', '1000SHIB/USDT', 'ALICE/USDT', 'TURBO/USDT', '1000BONK/USDT', 'BEAMX/USDT', 'FDUSD/USDT', '1000FLOKI/USDT', 'XRP/USDT', 'BB/USDT', 'WLD/USDT', 'LINK/USDT', 'BNB/USDT', 'UNI/USDT', 'BOME/USDT', 'ENA/USDT', 'ETHFI/USDT', 'STG/USDT', 'HIGH/USDT', 'ONDO/USDT', 'NEAR/USDT', 'LDO/USDT', 'ARB/USDT', 'AVAX/USDT', 'AUCTION/USDT', 'ENS/USDT', 'ETC/USDT', 'AR/USDT', 'BONK/USDT', 'FLOKI/USDT', 'FIL/USDT', 'MATIC/USDT', 'BCH/USDT', 'OMNI/USDT', 'ADA/USDT', 'FRONT/USDT', 'DOT/USDT', 'TIA/USDT', 'FTM/USDT', 'SHIB/USDT', '1000SATS/USDT', 'ARKM/USDT', 'LTC/USDT', 'OP/USDT', 'ENJ/USDT', 'FET/USDT', 'GALA/USDT', 'RNDR/USDT', 'LUNA2/USDT', '1INCH/USDT', 'TRB/USDT', 'AEVO/USDT', 'STX/USDT', 'W/USDT', 'CHZ/USDT', 'INJ/USDT', 'XAI/USDT', 'JTO/USDT', '1000RATS/USDT', 'EOS/USDT', 'SUI/USDT', 'TNSR/USDT', 'SAGA/USDT', 'APT/USDT', 'ATOM/USDT', 'OM/USDT', 'STRAX/USDT', 'SPELL/USDT', 'PENDLE/USDT', 'BIGTIME/USDT', 'PYTH/USDT', 'MEME/USDT', 'LPT/USDT', 'RUNE/USDT', 'TRU/USDT', 'REZ/USDT', 'MTL/USDT', 'ROSE/USDT', '1000LUNC/USDT', 'DGB/USDT', 'AGIX/USDT', 'PIXEL/USDT', 'BLUR/USDT', 'DYDX/USDT', 'GMX/USDT', 'TON/USDT', 'CRV/USDT', 'GMT/USDT', 'MKR/USDT', 'THETA/USDT', 'MYRO/USDT', 'SAND/USDT', 'VGX/USDT', 'GRT/USDT', 'AAVE/USDT', 'JUP/USDT', 'SNT/USDT', 'MANTA/USDT', 'ICP/USDT', 'SEI/USDT', 'AXS/USDT', 'BEL/USDT', 'EDU/USDT', 'DUSK/USDT']
for coin in top_50_coins:
coin_data = fetch_ohlcv(coin, '1m', since=since_timestamp)
coin_data['timestamp'] = pd.to_datetime(coin_data['timestamp'], unit='ms')
coin_symbol = coin.replace('/USDT', '')
coin_data.to_csv(f'./data/{coin_symbol}_ohlcv_last_6_months.csv', index=False)
print("抓取完成")
```
# 現貨
```python=
import ccxt
import pandas as pd
# 初始化幣安API
binance = ccxt.binance({
'apiKey': 'api',
'secret': 'api',
'timeout': 30000, # 增加超時時間到30秒
'enableRateLimit': True,
'adjustForTimeDifference': True
})
def fetch_current_price(symbol):
ticker = binance.fetch_ticker(symbol)
return ticker['last']
def fetch_min_notional(symbol):
markets = binance.load_markets()
market = markets[symbol]
return market['limits']['cost']['min']
def place_market_order(symbol, side, amount):
return binance.create_market_order(symbol, side, amount)
# 抓取當前ADA價格
symbol = 'ADA/USDT'
current_price = fetch_current_price(symbol)
print(f"當前ADA價格: {current_price}")
# 抓取ADA/USDT的最小交易金額
min_notional = fetch_min_notional(symbol)
print(f"ADA/USDT 最小交易金額: {min_notional} USDT")
# 确保10 USDT 大于最小交易金额
usdt_amount = 10
if usdt_amount < min_notional:
raise ValueError(f"交易金額低於最小限制:{min_notional} USDT")
# 計算10 USDT可以購買多少ADA
ada_amount = usdt_amount / current_price
print(f"10 USDT 可以購買 {ada_amount} ADA")
# 進行市場訂單購買ADA
order = place_market_order(symbol, 'buy', ada_amount)
print(order)
print("抓取完成並已下單")
# #print(order)
# sell_order = place_market_order(symbol, 'sell', ada_amount)
# print("賣出完成")
# print(sell_order)
```
# 合約
```python=
import ccxt
import pandas as pd
import datetime as dt
import time
import hmac
import hashlib
import requests
import logging
import pytz
# 初始化日誌記錄
logging.basicConfig(filename='contract_trading.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 初始化幣安API
binance = ccxt.binance({
'apiKey': 'api',
'secret': 'api',
'timeout': 30000, # 增加超時時間到30秒
'enableRateLimit': True,
'adjustForTimeDifference': True,
'options': {
'defaultType': 'future', # 確保使用期貨 API
},
})
# 載入市場資料
binance.load_markets()
# 查詢所有可用的槓桿
def get_all_leverage(symbol):
market = binance.market(symbol)
response = binance.fapiPrivateGetLeverageBracket({'symbol': market['id']})
leverages = [bracket['initialLeverage'] for bracket in response[0]['brackets']]
return leverages
# 獲取最新期貨價格
def get_latest_future_price(symbol):
ticker = binance.fapiPublicGetTickerPrice({'symbol': binance.market_id(symbol)})
return float(ticker['price'])
# 獲取台灣時間
def get_taiwan_time():
utc_now = dt.datetime.now(dt.timezone.utc)
taiwan_tz = pytz.timezone('Asia/Taipei')
return utc_now.astimezone(taiwan_tz).strftime('%Y-%m-%d %H:%M:%S')
# 開倉
# def open_contract(symbol, side, amount, leverage, stop_loss_price=None):
# market = binance.market(symbol)
# binance.fapiPrivatePostLeverage({'symbol': market['id'], 'leverage': leverage})
# order = binance.create_order(symbol, 'market', side, amount)
# logging.info(f"{get_taiwan_time()} - 開倉成功: {side} {symbol} {amount} @ {order['price']}, 槓桿: {leverage}")
# if stop_loss_price:
# stop_loss_side = 'sell' if side == 'buy' else 'buy'
# stop_loss_order = binance.create_order(symbol, 'stop_market', stop_loss_side, amount, None, {'stopPrice': stop_loss_price})
# logging.info(f"{get_taiwan_time()} - 設置止損: {stop_loss_side} {symbol} {amount} @ {stop_loss_price}")
# return order
def open_contract(symbol, side, amount, leverage, margin_type='ISOLATED'):
market = binance.market(symbol)
try:
# 設置槓桿
binance.fapiPrivatePostLeverage({'symbol': market['id'], 'leverage': leverage})
except ccxt.BaseError as e:
logging.error(f"Error setting leverage: {str(e)}")
try:
# 創建訂單
order = binance.create_order(symbol, 'market', side, amount)
logging.info(f"{get_taiwan_time()} - 開倉成功: {side} {symbol} {amount} @ {order['price']}, 槓桿: {leverage}, 保證金模式: {margin_type}")
return order
except ccxt.BaseError as e:
logging.error(f"Error creating order: {str(e)}")
return None
# 關倉
def close_contract(symbol, amount=None):
positions = binance.fapiPrivateV2GetPositionRisk()
for position in positions:
if position['symbol'] == binance.market(symbol)['id'] and float(position['positionAmt']) != 0:
side = 'sell' if float(position['positionAmt']) > 0 else 'buy'
print(side)
current_amount = abs(float(position['positionAmt']))
amount_to_close = current_amount if amount is None else min(amount, current_amount)
order = binance.create_order(symbol, 'market', side, amount_to_close)
logging.info(f"{get_taiwan_time()} - 關倉成功: {side} {symbol} {amount_to_close} @ {order['price']}")
print(f"Closed {side} position of {amount_to_close} {symbol}")
# 設置止損單
def set_stop_loss(symbol, side, amount, stop_loss_price):
stop_loss_side = 'sell' if side == 'buy' else 'buy'
stop_loss_order = binance.create_order(symbol, 'STOP_MARKET', stop_loss_side, amount, None, {'stopPrice': stop_loss_price})
logging.info(f"{get_taiwan_time()} - 設置止損: {stop_loss_side} {symbol} {amount} @ {stop_loss_price}")
# 取消所有止損單
def cancel_all_stop_orders(symbol):
open_orders = binance.fetch_open_orders(symbol)
for order in open_orders:
if order['type'] == 'stop_market':
binance.cancel_order(order['id'], symbol)
logging.info(f"{get_taiwan_time()} - 取消止損單: {order['side']} {symbol} {order['amount']} @ {order['price']}")
# 重新設置止損單
def reset_stop_loss(symbol, side, amount, new_stop_loss_price):
cancel_all_stop_orders(symbol)
set_stop_loss(symbol, side, amount, new_stop_loss_price)
# 計算強平價格
def get_liquidation_prices(symbol, entry_price, amount, leverage):
entry_price = float(entry_price)
leverage = float(leverage)
initial_margin = amount / leverage # 初始保證金,以USDT計算
maintenance_margin = initial_margin * 0.005 # 維持保證金,假設為初始保證金的0.5%
# 多倉強平價格
liquidation_price_long = entry_price * (1 - (initial_margin + maintenance_margin) / amount)
# 空倉強平價格
liquidation_price_short = entry_price * (1 + (initial_margin + maintenance_margin) / amount)
return liquidation_price_long, liquidation_price_short, initial_margin
# 計算特定symbol的未實現盈虧
def calculate_unrealized_pnl(symbol):
positions = binance.fapiPrivateV2GetPositionRisk()
for position in positions:
if position['symbol'] == binance.market(symbol)['id'] and float(position['positionAmt']) != 0:
entry_price = float(position['entryPrice'])
current_price = get_latest_future_price(symbol)
amount = float(position['positionAmt'])
# 計算未實現盈虧
if amount > 0:
pnl = (current_price - entry_price) * amount
else:
pnl = (entry_price - current_price) * abs(amount)
pnl_percentage = (pnl / (entry_price * abs(amount))) * 100
print(f"Symbol: {symbol}, Position: {amount}, Entry Price: {entry_price}, Current Price: {current_price}, PnL: {pnl}, PnL Percentage: {pnl_percentage}%")
logging.info(f"{get_taiwan_time()} - Symbol: {symbol}, Position: {amount}, Entry Price: {entry_price}, Current Price: {current_price}, PnL: {pnl}, PnL Percentage: {pnl_percentage}%")
def calculate_unrealized_pnl(symbol):
positions = binance.fapiPrivateV2GetPositionRisk()
for position in positions:
if position['symbol'] == binance.market(symbol)['id'] and float(position['positionAmt']) != 0:
print(position)
entry_price = float(position['entryPrice'])
current_price = get_latest_future_price(symbol)
amount = float(position['positionAmt'])
# 計算未實現盈虧
if amount > 0:
pnl = (current_price - entry_price) * amount
else:
pnl = (entry_price - current_price) * abs(amount)
pnl_percentage = (pnl / (entry_price * abs(amount))) * 100
print(f"Symbol: {symbol}, Position: {amount}, Entry Price: {entry_price}, Current Price: {current_price}, PnL: {pnl}, PnL Percentage: {pnl_percentage}%")
logging.info(f"{get_taiwan_time()} - Symbol: {symbol}, Position: {amount}, Entry Price: {entry_price}, Current Price: {current_price}, PnL: {pnl}, PnL Percentage: {pnl_percentage}%")
return pnl, pnl_percentage
return 0, 0
# 示例使用
symbol = 'ORDI/USDT'
leverages = get_all_leverage(symbol)
max_leverage = max(leverages)
print(f"可用槓桿: {leverages}")
print(f"最大槓桿: {max_leverage}")
# 獲取最新期貨價格
entry_price = get_latest_future_price(symbol)
print(f"最新期貨價格: {entry_price}")
# 假設入倉資金量
capital =0.240 # 以USDT計算
# 計算多倉和空倉的強平價格及初始保證金
for get_leverage in leverages:
print(f"槓桿: {get_leverage}")
liquidation_price_long, liquidation_price_short, initial_margin = get_liquidation_prices(symbol, entry_price, capital, get_leverage)
print(f"初始保證金: {initial_margin}")
print(f"多倉強平價格: {liquidation_price_long}")
print(f"空倉強平價格: {liquidation_price_short}")
# 設置初始止損價格
stop_loss_price = entry_price * 0.95 # 例如:止損設置為買入價格的95%
# 開啟合約並設置止損
max_leverage = 5
# order = open_contract(symbol, 'buy', capital / entry_price, max_leverage, stop_loss_price)
# order = open_contract(symbol, 'sell', capital , max_leverage)
# print(order['info']['status'])
# # 假設部分平倉一半的倉位
# amount_to_close = capital / entry_price / 2
close_contract(symbol)
#
# # 查詢並打印特定symbol的未實現盈虧
# symbol = 'ADA/USDT'
# contract_pnl, contract_pnl_percentage = calculate_unrealized_pnl(symbol)
# print(contract_pnl)
# print(contract_pnl_percentage)
# # close_contract(symbol)
# # # 重新設置止損單
# # new_stop_loss_price = entry_price * 0.80 # 新的止損價格,例如設置為買入價格的90%
# # reset_stop_loss(symbol, 'buy', capital / entry_price, new_stop_loss_price)
# def fetch_contract_balance():
# # with app.app_context():
# balance = binance.fapiPrivateV2GetBalance()
# usdt_balance = next(item for item in balance if item['asset'] == 'USDT')
# return usdt_balance['balance']
# print(fetch_contract_balance())
# def calculate_unrealized_pnl(symbol):
# try:
# positions = binance.fapiPrivateGetV2PositionRisk()
# for position in positions:
# if position['symbol'] == binance.market(symbol)['id'] and float(position['positionAmt']) != 0:
# entry_price = float(position['entryPrice'])
# current_price = get_latest_future_price(symbol)
# amount = float(position['positionAmt'])
# position_type = "Long" if amount > 0 else "Short"
# # Calculate unrealized PnL
# if amount > 0:
# pnl = (current_price - entry_price) * amount
# else:
# pnl = (entry_price - current_price) * abs(amount)
# pnl_percentage = (pnl / (entry_price * abs(amount))) * 100
# logging.info(f"{get_taiwan_time()} - Symbol: {symbol}, Position: {amount} ({position_type}), Entry Price: {entry_price}, Current Price: {current_price}, PnL: {pnl}, PnL Percentage: {pnl_percentage}%")
# return pnl, pnl_percentage, position_type
# return 0, 0, "No position"
# except ccxt.NetworkError as e:
# logging.error(f"Network error: {str(e)}")
# return 0, 0, "Network error"
# except ccxt.ExchangeError as e:
# logging.error(f"Exchange error: {str(e)}")
# return 0, 0, "Exchange error"
# except Exception as e:
# logging.error(f"An unexpected error occurred: {str(e)}")
# return 0, 0, "Unknown error"
# pnl, pnl_percentage = calculate_unrealized_pnl('MYROUSDT')
# print(f"PnL: {pnl}, PnL Percentage: {pnl_percentage}%, Position Type: ")
# if position_amt > 0:
# contract_holdings[symbol] = f"開多單 ({position_amt})"
# elif position_amt < 0:
# contract_holdings[symbol] = f"開空單 ({abs(position_amt)})"
# else:
# contract_holdings[symbol] = "0"
# return contract_holdings
# except Exception as e:
# logging.error(f"Error fetching contract holdings: {str(e)}")
# return {}
# print(get_contract_holdings())
```
# colab
本來要訓練一個小型風控dqn機器人不過colab 一直斷線,超難訓練出來好像成果不太行,開最好的機器訓練速度又拉不上去
,可能我寫的怪怪的,不過長期訓練應該有料,這邊有幫你寫模型自動備份
```python=
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import glob
import os
import sys
import torch
import random
from torch import nn
from torch.utils.data import Dataset, DataLoader
import pytorch_lightning as pl
from collections import deque
from pytorch_lightning import Trainer
import warnings
import os
import shutil
# 禁用特定警告类型
warnings.filterwarnings("ignore", category=UserWarning)
# 定義計算布林帶的函數
def calculate_bollinger_bands(data, window, no_of_std):
rolling_mean = data.rolling(window=window).mean()
rolling_std = data.rolling(window=window).std()
upper_band = rolling_mean + (rolling_std * no_of_std)
lower_band = rolling_mean - (rolling_std * no_of_std)
return upper_band, lower_band
# 定義計算歷史波動率的函數
def calculate_historical_volatility(data, window):
log_returns = np.log(data['close'] / data['close'].shift(1))
historical_volatility = log_returns.rolling(window=window).std() * np.sqrt(252) # 年化波動率
return historical_volatility
# 計算 RSI
def calculate_rsi(data, window):
delta = data.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
def create_dataloader(data, batch_size=32):
dataset = TradingDataset(data)
return DataLoader(dataset, batch_size=batch_size, shuffle=True)
# 計算 ADX
def calculate_adx(data, window):
tr = pd.DataFrame()
tr['tr0'] = data['high'] - data['low']
tr['tr1'] = abs(data['high'] - data['close'].shift())
tr['tr2'] = abs(data['low'] - data['close'].shift())
tr['tr'] = tr.max(axis=1)
plus_dm = data['high'] - data['high'].shift()
minus_dm = data['low'].shift() - data['low']
plus_dm[plus_dm < 0] = 0
minus_dm[minus_dm < 0] = 0
plus_dm[data['high'] - data['high'].shift() <= data['low'].shift() - data['low']] = 0
minus_dm[data['low'].shift() - data['close'] <= data['high'] - data['high'].shift()] = 0
tr14 = tr['tr'].rolling(window=window).sum()
plus_di14 = 100 * (plus_dm.rolling(window=window).sum() / tr14)
minus_di14 = 100 * (minus_dm.rolling(window=window).sum() / tr14)
dx = 100 * abs((plus_di14 - minus_di14) / (plus_di14 + minus_di14))
adx = dx.rolling(window=window).mean()
return adx
# 自定義計算 Aroon 指標
def calculate_aroon(data, window):
aroon_up = []
aroon_down = []
for i in range(window, len(data)):
high_idx = data['high'].iloc[i-window:i+1].idxmax()
low_idx = data['low'].iloc[i-window:i+1].idxmin()
aroon_up.append((window - (i - high_idx)) / window * 100)
aroon_down.append((window - (i - low_idx)) / window * 100)
return np.array(aroon_up), np.array(aroon_down)
# 判斷市場連續下跌或上漲
def detect_trend(data, window, threshold=0.5):
trend_up = data['close'].rolling(window=window).apply(lambda x: (x > x.shift()).mean(), raw=False)
trend_down = data['close'].rolling(window=window).apply(lambda x: (x < x.shift()).mean(), raw=False)
return trend_up, trend_down
# 生成信號
def generate_signals(data):
data['signal'] = 0.0
buy_condition = (data['macd'])
sell_condition = ( data['macd'] )
data.loc[buy_condition, 'signal'] = 1.0
data.loc[sell_condition, 'signal'] = -1.0
data['position'] = data['signal'].diff()
return data
# 動態調整買入和賣出比例
def adjust_buy_fraction(rsi, macd, signal_line):
if rsi < 30 and macd > signal_line:
return 1.0 # 強烈的超賣信號,買入100%
elif rsi < 30:
return 0.75 # 超賣信號,買入75%
elif rsi < 50:
return 0.5 # 較低的RSI,買入50%
elif rsi < 70:
return 0.25 # RSI處於中間位置,買入25%
else:
return 0.1 # RSI較高,買入10%
def adjust_sell_fraction(rsi, macd, signal_line):
if rsi > 70 and macd < signal_line:
return 1.0 # 強烈的超買信號,賣出100%
elif rsi > 70:
return 0.75 # 超買信號,賣出75%
elif rsi > 50:
return 0.5 # 較高的RSI,賣出50%
elif rsi > 30:
return 0.25 # RSI處於中間位置,賣出25%
else:
return 0.1 # RSI較低,賣出10%
class DQNLitModel(pl.LightningModule):
def __init__(self, state_size, action_size):
super(DQNLitModel, self).__init__()
self.state_size = state_size
self.action_size = action_size
self.model = nn.Sequential(
nn.Linear(state_size, 24),
nn.ReLU(),
nn.Linear(24, 24),
nn.ReLU(),
nn.Linear(24, action_size)
)
self.memory = deque(maxlen=20000)
self.gamma = 0.95 # 折扣率
self.epsilon = 1.0 # 探索率
self.epsilon_min = 0.01
self.epsilon_decay = 0.995
self.learning_rate = 0.001
self.criterion = nn.MSELoss()
self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)
def forward(self, x):
return self.model(x)
def act(self, state):
if np.random.rand() <= self.epsilon:
return random.randrange(self.action_size)
state = torch.FloatTensor(state)
act_values = self(state)
return torch.argmax(act_values).item() # 返回動作
def remember(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done))
def replay(self, batch_size):
if len(self.memory) < batch_size:
return
minibatch = random.sample(self.memory, batch_size)
for state, action, reward, next_state, done in minibatch:
state = torch.FloatTensor(state).unsqueeze(0) # 添加批量維度
next_state = torch.FloatTensor(next_state).unsqueeze(0) # 添加批量維度
reward = torch.FloatTensor([reward])
target = reward
if not done:
target = reward + self.gamma * torch.max(self(next_state))
target_f = self(state).detach()
target_f[0][action] = target
self.optimizer.zero_grad()
loss = self.criterion(self(state)[0][action], target)
loss.backward()
self.optimizer.step()
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
def training_step(self, batch, batch_idx):
state, action, reward, next_state, done = batch
state = state.float()
next_state = next_state.float()
action = action.long()
reward = reward.float()
done = done.bool()
# 獲取當前狀態的動作價值
current_q_values = self(state).gather(1, action.unsqueeze(-1)).squeeze(-1)
# 計算下一狀態的最大預期Q值
next_q_values = self(next_state).max(1)[0]
next_q_values[done] = 0.0
expected_q_values = reward + self.gamma * next_q_values
loss = self.criterion(current_q_values, expected_q_values.detach())
self.log('train_loss', loss)
return loss
def configure_optimizers(self):
return torch.optim.Adam(self.parameters(), lr=self.learning_rate)
def calculate_macd(data, short_window, long_window, signal_window):
exp1 = data['close'].ewm(span=short_window, adjust=False).mean()
exp2 = data['close'].ewm(span=long_window, adjust=False).mean()
macd_line = exp1 - exp2
signal_line = macd_line.ewm(span=signal_window, adjust=False).mean()
histogram = macd_line - signal_line
return macd_line, signal_line, histogram
def preprocess_data(data):
data['timestamp'] = pd.to_datetime(data['timestamp'])
for col in data.columns:
if col != 'timestamp':
data[col] = pd.to_numeric(data[col], errors='coerce')
data = data.dropna() # 移除任何包含NaN的行
return data
def load_and_preprocess_data(file_path):
data = pd.read_csv(file_path)
data = preprocess_data(data)
return data
class TradingDataset(Dataset):
def __init__(self, data):
self.data = data.astype(float).dropna() # 確保所有數據都是浮點數並且去掉任何NaN值
def __len__(self):
return len(self.data) - 1
def __getitem__(self, idx):
columns = ['open', 'high', 'low', 'close', 'volume', 'historical_volatility', 'aroon_up', 'aroon_down', 'macd', 'signal_line', 'upper_band', 'lower_band', 'rsi', 'adx', 'trend_up', 'trend_down']
state = torch.tensor(self.data.iloc[idx][columns].values, dtype=torch.float32)
next_state = torch.tensor(self.data.iloc[idx + 1][columns].values, dtype=torch.float32)
reward = torch.tensor([self.data['close'].iloc[idx + 1] - self.data['close'].iloc[idx]], dtype=torch.float32)
action = torch.tensor(random.choice([0, 1, 2]), dtype=torch.int64)
done = torch.tensor(idx == len(self.data) - 2, dtype=torch.bool)
return state, action, reward, next_state, done
def train_dqn(data, model, episodes, batch_size, num_workers=4):
columns = ['open', 'high', 'low', 'close', 'volume', 'historical_volatility', 'aroon_up', 'aroon_down', 'macd', 'signal_line', 'upper_band', 'lower_band', 'rsi', 'adx', 'trend_up', 'trend_down']
trainer = Trainer(max_epochs=episodes, devices="auto", accelerator="auto")
dataloader = create_dataloader(data[columns], batch_size=batch_size) # 只選擇需要的列
trainer.fit(model, dataloader)
return model
def prepare_data_for_training(file_path):
data = load_and_preprocess_data(file_path)
# 轉換timestamp為datetime
data['timestamp'] = pd.to_datetime(data['timestamp'])
# 將數據重新採樣為15分鐘線
data.set_index('timestamp', inplace=True)
data_15min = data.resample('15min').agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum'
}).dropna().reset_index()
# 轉換所有數值列為float類型
data_15min[['open', 'high', 'low', 'close', 'volume']] = data_15min[['open', 'high', 'low', 'close', 'volume']].astype(float)
# 計算歷史波動率
hv_window = 14
data_15min['historical_volatility'] = calculate_historical_volatility(data_15min, hv_window)
# 計算 Aroon 指標
aroon_window = 14
aroon_up, aroon_down = calculate_aroon(data_15min, aroon_window)
# 補全Aroon指標,使其長度與原數據匹配
data_15min['aroon_up'] = np.nan
data_15min['aroon_down'] = np.nan
data_15min.loc[aroon_window:, 'aroon_up'] = aroon_up
data_15min.loc[aroon_window:, 'aroon_down'] = aroon_down
short_window = 12
long_window = 26
signal_window = 9
data_15min['macd'], data_15min['signal_line'], data_15min['histogram'] = calculate_macd(data_15min, short_window, long_window, signal_window)
# 計算布林帶
bollinger_window = 20
no_of_std = 2
data_15min['upper_band'], data_15min['lower_band'] = calculate_bollinger_bands(data_15min['close'], bollinger_window, no_of_std)
# 計算 RSI
rsi_window = 14
data_15min['rsi'] = calculate_rsi(data_15min['close'], rsi_window)
# 計算 ADX
adx_window = 14
data_15min['adx'] = calculate_adx(data_15min, adx_window)
# 判斷市場連續下跌或上漲
trend_window = 20
trend_up, trend_down = detect_trend(data_15min, trend_window)
data_15min['trend_up'] = trend_up
data_15min['trend_down'] = trend_down
return data_15min
# 繼續訓練的函數
def continue_training(model, checkpoint_path):
# 檢查檢查點文件是否存在
if os.path.isfile(checkpoint_path):
# 載入檢查點
checkpoint = torch.load(checkpoint_path)
model.load_state_dict(checkpoint['state_dict'])
print(f"已成功載入檢查點:{checkpoint_path}")
return model
else:
print(f"檢查點文件不存在:{checkpoint_path}")
return model
def backtest_strategy(file_path, data, model):
try:
# 計算信號
data = generate_signals(data)
# 初始化資金和持倉
initial_capital = 50.0
capital = initial_capital
num_ada = 0 # 持有的ADA數量
# 回測邏輯
trades = [] # 保存交易記錄
buy_signals = []
sell_signals = []
for i in range(1, len(data)):
state = data.iloc[i].drop('timestamp').values.reshape(1, -1)
action = model.act(state)
if action == 1: # 買入信號
buy_price = data['close'].iloc[i]
buy_fraction = adjust_buy_fraction(data['rsi'].iloc[i], data['macd'].iloc[i], data['signal_line'].iloc[i])
num_ada_to_buy = capital * buy_fraction / buy_price
total_cost = buy_price * num_ada_to_buy
if capital >= total_cost and num_ada_to_buy > 0:
capital -= total_cost
num_ada += num_ada_to_buy
trades.append({'timestamp': data['timestamp'].iloc[i], 'type': 'buy', 'price': buy_price, 'num_ada': num_ada_to_buy})
buy_signals.append((data['timestamp'].iloc[i], buy_price))
# print(f"買入: {data['timestamp'].iloc[i]} 價格: {buy_price} 數量: {num_ada_to_buy}")
elif action == 2: # 賣出信號
sell_price = data['close'].iloc[i]
sell_fraction = adjust_sell_fraction(data['rsi'].iloc[i], data['macd'].iloc[i], data['signal_line'].iloc[i])
num_ada_to_sell = num_ada * sell_fraction
total_profit = sell_price * num_ada_to_sell
if num_ada_to_sell > 0:
capital += total_profit
num_ada -= num_ada_to_sell
trades.append({'timestamp': data['timestamp'].iloc[i], 'type': 'sell', 'price': sell_price, 'profit': total_profit, 'num_ada': num_ada_to_sell})
sell_signals.append((data['timestamp'].iloc[i], sell_price))
# print(f"賣出: {data['timestamp'].iloc[i]} 價格: {sell_price} 數量: {num_ada_to_sell} 利潤: {total_profit}")
# 計算最終資金
final_capital = capital + (data['close'].iloc[-1] * num_ada)
roi = (final_capital - initial_capital) / initial_capital
print(f"初始資金: {initial_capital} 最終資金: {final_capital} 淨利潤: {final_capital - initial_capital} 投資回報率: {roi}")
return {
'file': os.path.basename(file_path),
'initial_capital': initial_capital,
'final_capital': final_capital,
'net_profit': final_capital - initial_capital,
'roi': roi
}
except Exception as e:
print(f"處理文件時發生錯誤: {e}")
return None
def main():
torch.set_float32_matmul_precision('high')
checkpoint_path = '/content/lightning_logs/checkpoints/epoch=9-step=10950.ckpt' # 假設檢查點保存的路徑
# 繼續訓練
global_model = continue_training(global_model, checkpoint_path)
if len(sys.argv) > 1:
# 如果提供了CSV文件路徑,則僅回測該文件
file_path = sys.argv[1]
data = prepare_data_for_training(file_path)
global_model = train_dqn(data, global_model, episodes=10, batch_size=32)
result = backtest_strategy(data, global_model)
if result:
print(result)
else:
# 否則,回測當前目錄下的所有CSV文件
csv_files = glob.glob("./data3/*.csv")
csv_files2 = glob.glob("./data2/*.csv")
results = []
data= []
data_count = 0
f1 = 'dqn_model.pth' # 欲複製的檔案
f2 = './lightning_logs/dqn_model.pth' # 存檔的位置與檔案名稱
if os.path.isfile(f1):
shutil.copyfile(f1,f2) # 複製檔案
f1 = 'test.py' # 欲複製的檔案
f2 = './lightning_logs/test.py' # 存檔的位置與檔案名稱
if os.path.isfile(f2):
shutil.copyfile(f1,f2) # 複製檔案
destination_path = '/content/drive/MyDrive/modelbackup'
source_path = '/content/lightning_logs'
if os.path.exists(source_path):
shutil.copytree(source_path,destination_path, dirs_exist_ok=True)
checkpoint_path = './lightning_logs/checkpoints/last.ckpt'
for file_path in csv_files:
data.append(prepare_data_for_training(file_path))
global_model = train_dqn(data[data_count], global_model, episodes=10, batch_size=32)
shutil.copyfile(f1,f2) # 複製檔案
shutil.copytree(source_path,destination_path, dirs_exist_ok=True)
torch.save(global_model, 'dqn_model.pth')
for file_path in csv_files2:
data.append(prepare_data_for_training(file_path))
global_model = train_dqn(data[data_count], global_model, episodes=10, batch_size=32)
shutil.copytree(source_path,destination_path, dirs_exist_ok=True)
shutil.copyfile(f1,f2) # 複製檔案
torch.save(global_model, 'dqn_model.pth')
data_count = 0
for file_path in csv_files:
result = backtest_strategy(file_path,data[data_count], global_model)
if result:
results.append(result)
data_count+=1
# 計算總和
total_initial_capital = sum([r['initial_capital'] for r in results])
total_final_capital = sum([r['final_capital'] for r in results])
total_net_profit = sum([r['net_profit'] for r in results])
total_roi = (total_final_capital - total_initial_capital) / total_initial_capital
# 添加總和到結果中
results.append({
'file': 'Total',
'initial_capital': total_initial_capital,
'final_capital': total_final_capital,
'net_profit': total_net_profit,
'roi': total_roi
})
# 轉換為DataFrame並保存到CSV文件
results_df = pd.DataFrame(results)
results_df.to_csv('backtest_results.csv', index=False)
print("所有回測完成,結果已保存到backtest_results.csv。")
if __name__ == "__main__":
main()
```