# Transaction Courier Service Implementation Questions
## 1. Changes of handling messages of websocket client
After looking into the documentation of `tungstenite` and `tokio-tungstenite`, I found that it can automatically respond a "Pong" message when receiving a "Ping" message. Therefore, current client will only handle "Binary" message and "Closed" message.
```rust=
pub(crate) async fn start_ws_clients(db: Arc<Database>, subscribed_nodes: Vec<String>) {
// start connection to all subscribed nodes
'nodes: for node_ip in subscribed_nodes {
// Parse RPC url
let connect_addr = format!("ws://{}/subscribe_to_transaction_events", node_ip);
let url = match Url::parse(&connect_addr) {
Ok(url) => url,
Err(_) => {
ErrorLogInfo::UrlParseFailure.log_error(format!("for node {}", node_ip));
continue 'nodes;
}
};
// Connect to the server
let ws_stream = match connect_async(url).await {
Ok((ws_stream, _)) => ws_stream,
Err(_) => {
ErrorLogInfo::ConnectionFailure.log_error(format!("for node {}", node_ip));
continue 'nodes;
}
};
log::debug!("WebSocket handshake to {} has been successfully completed", node_ip);
let (mut ws_write, mut ws_read) = ws_stream.split();
let (event_tx, event_rx) = mpsc::unbounded_channel();
let mut event_rx = UnboundedReceiverStream::new(event_rx);
let mut interval = interval(Duration::from_secs(HEARTBEAT_INTERVAL));
let db_stream = Arc::clone(&db);
let db_channel = Arc::clone(&db);
let node_ip_cloned = node_ip.clone();
tokio::spawn(async move {
// handle incoming WebSocket messages and send a Ping message periodically every heartbeat.
'websocket: loop {
tokio::select! {
Some(Ok(message)) = ws_read.next() => {
if let Message::Binary(data) = message {
let websocket_data: WebSocketData = match Deserializable::deserialize(data.as_slice()) {
Ok(result) => result,
Err(_) => {
ErrorLogInfo::DeserialiseMessageFailure.log_error(format!("for node {}", node_ip));
continue 'websocket;
}
};
log::info!("Receive event from the server: {:?}", websocket_data.tx_hash);
// forward the websocket data to channel
match event_tx.send(websocket_data) {
Ok(result) => result,
Err(_) => {
ErrorLogInfo::SendToChannel.log_error(format!("for node {}", node_ip));
}
};
} else if let Message::Close(_) = message {
match handle_close_connection(db_stream.clone()) {
Ok(result) => result,
Err(e) => {
ErrorLogInfo::SQLiteError.log_error(format!("{} for node {}", e, node_ip));
}
};
break 'websocket;
} else if message.is_pong() {
log::info!("Received the pong message from the server.");
}
},
_ = interval.tick() => {
match ws_write.send(Message::Ping(vec![])).await{
Ok(result) => result,
Err(_) => {
ErrorLogInfo::SendPing.log_error(format!("for node {}", node_ip));
}
};
}
}
}
});
// Spwan thread for Event inserter
tokio::spawn(async move {
while let Some(websocket_data) = event_rx.next().await{
match handle_event(db_channel.clone(), websocket_data.tx_hash, websocket_data.event) {
Ok(result) => result,
Err(e) => {
ErrorLogInfo::SQLiteError.log_error(format!("{} for node {}", e, node_ip_cloned))
}
}
}
});
}
}
#[derive(BorshSerialize, BorshDeserialize)]
struct WebSocketData {
pub(crate) tx_hash: Sha256Hash,
pub(crate) event: CourierEvent,
}
```
## 2. `fullnode_id` inside the `CourierEvent` table
```sql=
CREATE TABLE IF NOT EXISTS CourierEvent(
id INTEGER PRIMARY KEY,
tx_hash BLOB NOT NULL,
fullnode_id TEXT NOT NULL,
event BLOB NOT NULL,
reason BLOB,
FOREIGN KEY(tx_hash) REFERENCES DispatchedTransaction(hash) ON DELETE CASCADE
);
```
what is the specific `fullnode_id`, shall I use the "node_ip" address directly?
## 3. Endpoint `POST /submit_transaction`
Does this endpoint simply call fullnode's POST /submit_transaction?