# lstm-m2o-0106-10step-final ###### tags: `Code` # Score ![](https://i.imgur.com/qR04iMq.png) # G-Research Crypto Forecasting Use your ML expertise to predict real crypto market data # 環境設定 ```python import time import pandas as pd import numpy as np from datetime import datetime from sklearn.preprocessing import StandardScaler import tensorflow as tf from tensorflow import keras ``` ## 訓練資料設定 ```python #data_folder = '' data_folder = '../input/g-research-crypto-forecasting/' crypto_df = pd.read_csv(data_folder+'train.csv') ``` ## 建立貨幣 map ```python # Adding Crypto symbols to the dataframes for easy reference asset_symbols_map = { 0: 'BNB', 1: 'BTC', 2: 'BCH', 3: 'ADA', 4: 'DOGE', 5: 'EOS.IO', 6: 'ETH', 7: 'ETC', 8: 'IOTA', 9: 'LTC', 10: 'MKR', 11: 'XMR', 12: 'XLM', 13: 'TRON' } ``` # 工具函數 ## find_range - 找出加密貨幣中時間最長的 ```python def find_range(df): # 找出加密貨幣中時間最長的 max_len = 0 max_index = -1 min_start_time = -1 max_finish_time = -1 for i in asset_symbols_map: _ = (df["Asset_ID"]==i).sum() _st = int(df[df["Asset_ID"]==i].iloc[0]['timestamp']) _ft = int(df[df["Asset_ID"]==i].iloc[-1]['timestamp']) if min_start_time == -1: min_start_time = _st if min_start_time > _st: min_start_time = _st if max_finish_time < _ft: max_finish_time = _ft if _ >= max_len: max_len = _ max_index = i return min_start_time, max_finish_time, max_len, max_index ``` ## fill_missing_value - 填充缺失值 & 挑選位於尾部的資料 ```python def fill_missing_value(pdData, crypto_map, tail_num=None, selectime=None , showinfo=False): # 強迫每人拉自一樣長度 (看最長的是誰) # 保證加密貨幣都有值 # 保證大家長度都一樣 crypto_min = dict() pdData = pdData.sort_values(by=['timestamp']) min_start_time, max_finish_time, max_len, max_index = find_range(pdData.reset_index()) for i in asset_symbols_map: crypto_min[asset_symbols_map[i]] = pdData[pdData["Asset_ID"]==i].set_index(['timestamp']) crypto_min[asset_symbols_map[i]] = crypto_min[asset_symbols_map[i]].reindex(range(min_start_time, max_finish_time+60,60), method='pad') crypto_min[asset_symbols_map[i]] = crypto_min[asset_symbols_map[i]].fillna(method='ffill').fillna(method='bfill') if showinfo == True: print('['+str(i)+']-'+asset_symbols_map[i]+'=============================================') print('**Info**') print(crypto_min[asset_symbols_map[i]].info()) print('\n**Timestamp**') print((crypto_min[asset_symbols_map[i]].index[1:]-crypto_min[asset_symbols_map[i]].index[:-1]).value_counts().head()) print('\n**Missing Value**') print(crypto_min[asset_symbols_map[i]].isna().sum()) print('\n\n') # 是否限制輸出大小為每個貨幣的尾巴幾筆 if tail_num != None: if selectime == None: for i in asset_symbols_map: crypto_min[asset_symbols_map[i]] = crypto_min[asset_symbols_map[i]].tail(tail_num) else: for i in asset_symbols_map: crypto_min[asset_symbols_map[i]] = crypto_min[asset_symbols_map[i]].loc[selectime-60*(tail_num-1) : selectime+60] # 再次合併所有加密貨幣的資料 for i in asset_symbols_map: if i == 0: crypto_min_new = crypto_min[asset_symbols_map[i]] else: crypto_min_new = crypto_min_new.append(crypto_min[asset_symbols_map[i]]) return crypto_min_new.reset_index() ``` ## get_features - 生成額外特徵 ```python def upper_shadow(df): return df['High'] - np.maximum(df['Close'], df['Open']) def lower_shadow(df): return np.minimum(df['Close'], df['Open']) - df['Low'] def get_features(df, row = False): df_feat = df.drop(['Target'],axis=1) df_feat['spread'] = df_feat['High'] - df_feat['Low'] df_feat['mean_trade'] = df_feat['Volume']/df_feat['Count'] df_feat['log_price_change'] = np.log(df_feat['Close']/df_feat['Open']) df_feat['upper_Shadow'] = upper_shadow(df_feat) df_feat['lower_Shadow'] = lower_shadow(df_feat) df_feat["high_div_low"] = df_feat["High"] / df_feat["Low"] df_feat['trade'] = df_feat['Close'] - df_feat['Open'] df_feat['Target'] = df['Target'] return df_feat ``` ## make_trianing_data - 生成訓練模型需使用的資料 ```python def make_trianing_data(pdData): crypto_min = dict() for i in asset_symbols_map: crypto_min[asset_symbols_map[i]] = pdData[pdData["Asset_ID"]==i].sort_values(by=['timestamp']).set_index(['timestamp']).drop('Asset_ID', axis=1) # crypto_min[asset_symbols_map[i]] = pdData[pdData["Asset_ID"]==i].sort_values(by=['timestamp']).set_index(['timestamp']) # crypto_min[asset_symbols_map[i]] = crypto_min[asset_symbols_map[i]][['Count','Open','High','Low','Close','Volume','VWAP','Target']] _ = list() for i in asset_symbols_map: _.append(crypto_min[asset_symbols_map[i]].to_numpy()) x = np.array(_) # 取出沒有 target 的部分 x_feature = x[:,:,:-1] #print(x_feature.shape) # 取出 target 的部分 #x_target = x[:,:,-1][:,:,np.newaxis] y_target = x[:,:,:] #print(x_target.shape) # 重新調整 x_feature = np.concatenate(x_feature,axis=1) #print(x_feature.shape) y_target = np.concatenate(y_target,axis=1) #print(x_target.shape) return x_feature, y_target ``` ## time_series - 製作時間序列 ```python def time_series(x,y,n_steps): # 製作時間序列 batch_size = x.shape[0]-n_steps+1 x_dim = x.shape[1] y_dim = y.shape[1] x_ = np.zeros((batch_size,n_steps,x_dim)) y_ = np.zeros((batch_size,y_dim)) for j in range(batch_size): x_[j] = x[j:j+n_steps,:] y_[j] = y[j+n_steps-1,:] return x_, y_, x_dim, y_dim ``` # 資料前處理 & 模型訓練 ## 訓練資料範圍設定 ```python # 選擇時間範圍 # auxiliary function, from datetime to timestamp totimestamp = lambda s: np.int32(time.mktime(datetime.strptime(s, "%d/%m/%Y").timetuple())) # 開始與結束的時間 start_time = totimestamp('01/06/2021') end_time = 1632181440 #totimestamp('01/09/2021') # 時間序列步長 n_steps = 10 ``` ## 資料前處理 - 添補空缺值 - 生成額外的特徵 - 生成訓練資料 - 標準化 - 生成時間序列格式 ```python stime = time.time() # 填補空缺值 crypto_min = fill_missing_value(pdData=crypto_df.set_index(['timestamp']).loc[start_time:].reset_index(), crypto_map=asset_symbols_map, showinfo=False) # 生成額外的特徵 crypto_min = get_features(crypto_min) # 生成訓練資料 x_train, y_train = make_trianing_data(crypto_min) print(x_train.shape, y_train.shape) # 標準化 ## fit x_scaler=StandardScaler().fit(x_train) y_scaler=StandardScaler().fit(y_train) ## transform x_train = x_scaler.transform(x_train) y_train = y_scaler.transform(y_train) # 生成時間序列格式 x_train, y_train, x_dim, y_dim = time_series(x_train, y_train, n_steps) print(x_train.shape, y_train.shape) print('Cost time:',time.time()-stime) ``` (161280, 196) (161280, 210) (161271, 10, 196) (161271, 210) Cost time: 8.154935598373413 ## 建立模型 - Sequential model 1. LSTM `600` 2. dropout `0.2` 3. LSTM `600` 4. dropout `0.2` 5. Dense - EarlyStopping - Loss: `mse` - Optimizer: `adam` - Epoch: `100` - Validation_split: `0.01` ```python stime = time.time() model = keras.models.Sequential([ keras.layers.LSTM(600, return_sequences=True, input_shape=[None, x_dim]), keras.layers.Dropout(0.2), keras.layers.LSTM(600), keras.layers.Dropout(0.2), keras.layers.Dense(y_dim) ]) callback = tf.keras.callbacks.EarlyStopping( monitor='val_loss', patience=2, mode='auto', restore_best_weights=True ) model.compile(loss="mse", optimizer="adam") history = model.fit(x_train, y_train, epochs=100, callbacks = [callback], validation_split=0.01) print('Cost time:',time.time()-stime) ``` 2022-01-11 14:20:22.549889: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero 2022-01-11 14:20:22.655182: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero 2022-01-11 14:20:22.656349: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero 2022-01-11 14:20:22.659820: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: AVX2 AVX512F FMA To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags. 2022-01-11 14:20:22.660894: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero 2022-01-11 14:20:22.662222: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero 2022-01-11 14:20:22.663509: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero 2022-01-11 14:20:24.294054: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero 2022-01-11 14:20:24.294930: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero 2022-01-11 14:20:24.295574: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero 2022-01-11 14:20:24.296138: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1510] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 15403 MB memory: -> device: 0, name: Tesla P100-PCIE-16GB, pci bus id: 0000:00:04.0, compute capability: 6.0 2022-01-11 14:20:25.925427: W tensorflow/core/framework/cpu_allocator_impl.cc:80] Allocation of 1251718720 exceeds 10% of free system memory. 2022-01-11 14:20:27.446024: W tensorflow/core/framework/cpu_allocator_impl.cc:80] Allocation of 1251718720 exceeds 10% of free system memory. 2022-01-11 14:20:28.469333: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2) Epoch 1/100 2022-01-11 14:20:31.501281: I tensorflow/stream_executor/cuda/cuda_dnn.cc:369] Loaded cuDNN version 8005 4990/4990 [==============================] - 42s 8ms/step - loss: 0.2516 - val_loss: 0.3045 Epoch 2/100 4990/4990 [==============================] - 37s 7ms/step - loss: 0.2088 - val_loss: 0.2786 Epoch 3/100 4990/4990 [==============================] - 38s 8ms/step - loss: 0.1956 - val_loss: 0.2781 Epoch 4/100 4990/4990 [==============================] - 37s 7ms/step - loss: 0.1919 - val_loss: 0.2896 Epoch 5/100 4990/4990 [==============================] - 38s 8ms/step - loss: 0.1834 - val_loss: 0.3059 Cost time: 198.7042691707611 # 運行測試 API ## 測試前置 - 建立測試環境 - 從訓練資料抽出最後 n_steps 筆資料 - 為了讓測試資料塞入補值 - 為了能生成測試的時間序列格式 ```python # 建立測試環境 import gresearch_crypto env = gresearch_crypto.make_env() iter_test = env.iter_test() ``` ```python # 限制輸出 n_steps 筆 crypto_min_tail = fill_missing_value(crypto_min, asset_symbols_map, n_steps) ``` ## 開始測試 - 測資會被塞入訓練資料重新填值 ```python start_time = time.time() end_time = time.time() n_i =1 for (test_dfs, pred_dfs) in iter_test: test_dfs = test_dfs.sort_values(by=['timestamp']) test_dfs['Target'] = 0 row_id = test_dfs[['Asset_ID','row_id']].set_index('Asset_ID')['row_id'].to_dict() test_dfs = get_features(test_dfs) selecttime = int(crypto_min_tail.iloc[-1]['timestamp'])+60 test_dfs['timestamp'] = selecttime crypto_min_tail= crypto_min_tail.append(test_dfs.drop(['row_id'],axis=1), ignore_index=True).drop_duplicates() crypto_min_tail= fill_missing_value(crypto_min_tail, asset_symbols_map, n_steps, selecttime) x_test, y_test = make_trianing_data(crypto_min_tail) x_test = x_scaler.transform(x_test) x_test, _,_,_ = time_series(x_test, y_test, n_steps) result = model.predict(x_test) res_tg = y_scaler.inverse_transform(result).reshape((14,-1))[:,-1] res_tg = res_tg.tolist() output = list() for i,j in enumerate(row_id): output.append(float('%0.8f'%(res_tg[j]))) pred_dfs['Target'] = output env.predict(pred_dfs) print('Iter: '+str(n_i)+', cost time: '+str(time.time()-end_time),end='\r') n_i += 1 end_time = time.time() print('\nEnd time: ',time.time()-start_time) ``` This version of the API is not optimized and should not be used to estimate the runtime of your code on the hidden test set. Iter: 4, cost time: 0.10308551788330078 End time: 1.037280559539795 ## Local API 模擬器 ```python # import time # import numpy as np # import pandas as pd # from numpy import dtype # asset_details = pd.read_csv(data_folder+'asset_details.csv') # id_2_weight = dict(zip(asset_details.Asset_ID, asset_details.Weight)) # dtypes = {'timestamp': np.int64, 'Asset_ID': np.int8, # 'Count': np.int32, 'Open': np.float64, # 'High': np.float64, 'Low': np.float64, # 'Close': np.float64, 'Volume': np.float64, # 'VWAP': np.float64, 'Target': np.float64} # def datestring_to_timestamp(ts): # return int(pd.Timestamp(ts).timestamp()) # def read_csv_slice(file_path=data_folder+'train.csv', dtypes=dtypes, use_window=None): # df = pd.read_csv(file_path, dtype=dtypes) # if use_window is not None: # df = df[(df.timestamp >= use_window[0]) & (df.timestamp < use_window[1])] # return df # def weighted_correlation(a, b, weights): # w = np.ravel(weights) # a = np.ravel(a) # b = np.ravel(b) # sum_w = np.sum(w) # mean_a = np.sum(a * w) / sum_w # mean_b = np.sum(b * w) / sum_w # var_a = np.sum(w * np.square(a - mean_a)) / sum_w # var_b = np.sum(w * np.square(b - mean_b)) / sum_w # cov = np.sum((a * b * w)) / np.sum(w) - mean_a * mean_b # corr = cov / np.sqrt(var_a * var_b) # return corr # # start = datestring_to_timestamp('2021-06-13-00-00') # # end = datestring_to_timestamp('2021-09-22-01-00') # start = 1623542400 # end = 1623542620 # file_path = data_folder+'supplemental_train.csv' # train_df = read_csv_slice(file_path,use_window=[start, end]) # class API: # def __init__(self, df): # df = df.astype(dtypes) # df['row_id'] = df.index # dfg = df.groupby('timestamp') # self.data_iter = dfg.__iter__() # self.init_num_times = len(dfg) # self.next_calls = 0 # self.pred_calls = 0 # self.predictions = [] # self.targets = [] # print("This version of the API is not optimized and should not be used to estimate the runtime of your code on the hidden test set. ;)") # def __iter__(self): # return self # def __len__(self): # return self.init_num_times - self.next_calls # def __next__(self): # assert self.pred_calls == self.next_calls, "You must call `predict()` successfully before you can get the next batch of data." # timestamp, df = next(self.data_iter) # self.next_calls += 1 # data_df = df.drop(columns=['Target']) # true_df = df.drop(columns=['timestamp','Count','Open','High','Low','Close','Volume','VWAP']) # true_df = true_df[['row_id', 'Target', 'Asset_ID']] # self.targets.append(true_df) # pred_df = true_df.drop(columns=['Asset_ID']) # pred_df['Target'] = 0. # return data_df, pred_df # def predict(self, pred_df): # assert self.pred_calls == self.next_calls - 1, "You must get the next batch of data from the API before making a new prediction." # assert pred_df.columns.to_list() == ['row_id', 'Target'], "Prediction dataframe should have columns `row_id` and `Target`." # pred_df = pred_df.astype({'row_id': dtype('int64'), 'Target': dtype('float64')}) # self.predictions.append(pred_df) # self.pred_calls += 1 # def score(self, id_2_weight=id_2_weight): # pred_df = pd.concat(self.predictions).rename(columns={'Target':'Prediction'}) # true_df = pd.concat(self.targets) # scoring_df = pd.merge(true_df, pred_df, on='row_id', how='left') # scoring_df['Weight'] = scoring_df.Asset_ID.map(id_2_weight) # scoring_df = scoring_df[scoring_df.Target.isna()==False] # if scoring_df.Prediction.var(ddof=0) < 1e-10: # score = -1 # else: # score = weighted_correlation(scoring_df.Prediction, scoring_df.Target, scoring_df.Weight) # return scoring_df, score ``` ```python # # 限制輸出 n_steps 筆 # crypto_min_tail = fill_missing_value(crypto_min, asset_symbols_map, n_steps) ``` ```python # api = API(train_df) # print() # start_time = time.time() # end_time = time.time() # n_i =1 # for (test_dfs, pred_dfs) in api: # test_dfs = test_dfs.sort_values(by=['timestamp']) # test_dfs['Target'] = 0 # row_id = test_dfs[['Asset_ID','row_id']].set_index('Asset_ID')['row_id'].to_dict() # test_dfs = get_features(test_dfs) # selecttime = int(crypto_min_tail.iloc[-1]['timestamp'])+60 # test_dfs['timestamp'] = selecttime # crypto_min_tail= crypto_min_tail.append(test_dfs.drop(['row_id'],axis=1), ignore_index=True).drop_duplicates() # crypto_min_tail= fill_missing_value(crypto_min_tail, asset_symbols_map, n_steps, selecttime) # x_test, y_test = make_trianing_data(crypto_min_tail) # # 做標準化 # # transform # x_test = x_scaler.transform(x_test) # y_test = y_scaler.transform(y_test) # x_test, y_test,_,_ = time_series(x_test, y_test, n_steps) # result = model.predict(x_test) # res_tg = y_scaler.inverse_transform(result).reshape((14,-1))[:,-1] # res_tg = res_tg.tolist() # output = list() # for i,j in enumerate(row_id): # output.append(float('%0.8f'%(res_tg[j]))) # pred_dfs['Target'] = output # api.predict(pred_dfs) # print('Iter: '+str(n_i)+', cost time: '+str(time.time()-end_time),end='\r') # n_i += 1 # end_time = time.time() # df, score = api.score() # print() # print(f"Your LB score is {round(score, 4)}") ```