# Transaction Courier Service Implementation Questions

## 1. Changes to message handling in the WebSocket client

After looking into the documentation of `tungstenite` and `tokio-tungstenite`, I found that they automatically reply with a "Pong" message when a "Ping" message is received. Therefore, the current client only needs to handle "Binary" and "Close" messages explicitly. It also sends a "Ping" on every heartbeat tick and logs any "Pong" it receives.

```rust=
// External imports this snippet relies on. Crate-local items (`Database`,
// `ErrorLogInfo`, `CourierEvent`, `Sha256Hash`, `Deserializable`,
// `HEARTBEAT_INTERVAL`, `handle_event`, `handle_close_connection`) are omitted.
use std::{sync::Arc, time::Duration};

use borsh::{BorshDeserialize, BorshSerialize};
use futures_util::{SinkExt, StreamExt};
use tokio::{sync::mpsc, time::interval};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_tungstenite::{connect_async, tungstenite::Message};
use url::Url;

pub(crate) async fn start_ws_clients(db: Arc<Database>, subscribed_nodes: Vec<String>) {
    // Start a connection to every subscribed node.
    'nodes: for node_ip in subscribed_nodes {
        // Parse the RPC URL.
        let connect_addr = format!("ws://{}/subscribe_to_transaction_events", node_ip);
        let url = match Url::parse(&connect_addr) {
            Ok(url) => url,
            Err(_) => {
                ErrorLogInfo::UrlParseFailure.log_error(format!("for node {}", node_ip));
                continue 'nodes;
            }
        };

        // Connect to the server.
        let ws_stream = match connect_async(url).await {
            Ok((ws_stream, _)) => ws_stream,
            Err(_) => {
                ErrorLogInfo::ConnectionFailure.log_error(format!("for node {}", node_ip));
                continue 'nodes;
            }
        };
        log::debug!("WebSocket handshake to {} has been successfully completed", node_ip);

        let (mut ws_write, mut ws_read) = ws_stream.split();
        let (event_tx, event_rx) = mpsc::unbounded_channel();
        let mut event_rx = UnboundedReceiverStream::new(event_rx);
        let mut interval = interval(Duration::from_secs(HEARTBEAT_INTERVAL));
        let db_stream = Arc::clone(&db);
        let db_channel = Arc::clone(&db);
        let node_ip_cloned = node_ip.clone();

        tokio::spawn(async move {
            // Handle incoming WebSocket messages and send a Ping on every heartbeat tick.
            'websocket: loop {
                tokio::select! {
                    // Note: if the stream ends or yields an error, the pattern does not
                    // match, so this branch is skipped and the loop waits for the next tick.
                    Some(Ok(message)) = ws_read.next() => {
                        if let Message::Binary(data) = message {
                            let websocket_data: WebSocketData =
                                match Deserializable::deserialize(data.as_slice()) {
                                    Ok(result) => result,
                                    Err(_) => {
                                        ErrorLogInfo::DeserialiseMessageFailure
                                            .log_error(format!("for node {}", node_ip));
                                        continue 'websocket;
                                    }
                                };
                            log::info!("Receive event from the server: {:?}", websocket_data.tx_hash);

                            // Forward the WebSocket data to the channel.
                            if event_tx.send(websocket_data).is_err() {
                                ErrorLogInfo::SendToChannel.log_error(format!("for node {}", node_ip));
                            }
                        } else if let Message::Close(_) = message {
                            if let Err(e) = handle_close_connection(db_stream.clone()) {
                                ErrorLogInfo::SQLiteError.log_error(format!("{} for node {}", e, node_ip));
                            }
                            break 'websocket;
                        } else if message.is_pong() {
                            log::info!("Received the pong message from the server.");
                        }
                    },
                    _ = interval.tick() => {
                        if ws_write.send(Message::Ping(vec![])).await.is_err() {
                            ErrorLogInfo::SendPing.log_error(format!("for node {}", node_ip));
                        }
                    }
                }
            }
        });

        // Spawn a task for the event inserter.
        tokio::spawn(async move {
            while let Some(websocket_data) = event_rx.next().await {
                if let Err(e) = handle_event(db_channel.clone(), websocket_data.tx_hash, websocket_data.event) {
                    ErrorLogInfo::SQLiteError.log_error(format!("{} for node {}", e, node_ip_cloned));
                }
            }
        });
    }
}

#[derive(BorshSerialize, BorshDeserialize)]
struct WebSocketData {
    pub(crate) tx_hash: Sha256Hash,
    pub(crate) event: CourierEvent,
}
```

## 2. `fullnode_id` inside the `CourierEvent` table

```sql=
CREATE TABLE IF NOT EXISTS CourierEvent(
    id INTEGER PRIMARY KEY,
    tx_hash BLOB NOT NULL,
    fullnode_id TEXT NOT NULL,
    event BLOB NOT NULL,
    reason BLOB,
    FOREIGN KEY(tx_hash) REFERENCES DispatchedTransaction(hash) ON DELETE CASCADE
);
```

What exactly should `fullnode_id` contain? Should I use the `node_ip` address directly?

## 3. Endpoint `POST /submit_transaction`

Does this endpoint simply call the full node's `POST /submit_transaction`?
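
To make the question concrete, here is a minimal sketch of the pass-through reading: the courier would forward the request to the full node at a URL built the same way as the WebSocket URL in section 1. The helper name `fullnode_submit_url` and the `http://` scheme are assumptions for illustration only, and the sketch does not cover any extra work the endpoint might do beyond forwarding, such as recording the transaction first.

```rust
/// Builds the full node URL that a pass-through `POST /submit_transaction`
/// handler would forward to.
///
/// `node_ip` is expected in the same `host:port` form used for the
/// WebSocket URL in section 1. The `http://` scheme is an assumption.
fn fullnode_submit_url(node_ip: &str) -> String {
    // Drop a trailing slash so the path is not doubled.
    format!("http://{}/submit_transaction", node_ip.trim_end_matches('/'))
}
```

If the endpoint is expected to do more than forward the request, this helper would only cover the final step.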