--- tags: ethereum, epf --- # EPF - Seventh Update In my last update, I discussed the construction of fork data from fork parameters and the implementation of SSZ decoding on GossipSub messages. I also mentioned an issue with bad peer score that caused connection shutdowns due to the lack of Req/Res Protocol implementation. This is the main topic of this update. This is the PR concerning this feature: - [https://github.com/brech1/cl-p2p-setup/pull/1](https://github.com/brech1/cl-p2p-setup/pull/1) # Req/Res Protocol This protocol enables consensus layer peers to communicate directly in order to check liveness or request data, such as specific blocks. My implementation was based on the Lighthouse client, however it is not generic across multiple network specifications. ```rust pub enum Protocol { Status, Goodbye, BlocksByRange, BlocksByRoot, Ping, MetaData, } ``` I have decided not to include the `BlocksByRange` and `BlocksByRoot` methods in this implementation, as they do not impact peer scoring. The supported protocol messages are: ```rust pub struct StatusMessage { pub fork_digest: [u8; 4], pub finalized_root: Hash256, pub finalized_epoch: u64, pub head_root: Hash256, pub head_slot: u64, } pub struct Ping { pub data: u64, } pub struct MetaData { pub seq_number: u64, } ``` It can be seen that the `StatusMessage` holds block data. Currently I’m not storing any data so it won’t be possible to build this info, I have two options: - Request this data to a peer and reply with the same parameters. This is hacky and involves trusting the other peer, so I wouldn’t like to go that way. - Continue working on the helios codebase with access to block data that would enable to construct this response It’s also possible that I might be missing something and this could be done in other way. The `RPC` struct consists of a vector of the events taking place in the behavior. ```rust pub struct RPC<Id: ReqId> { events: Vec<NetworkBehaviourAction<RPCMessage<Id>, RPCHandler<Id>>>, } impl<Id> NetworkBehaviour for RPC<Id> where Id: ReqId, { type ConnectionHandler = RPCHandler<Id>; type OutEvent = RPCMessage<Id>; fn new_handler(&mut self) -> Self::ConnectionHandler { RPCHandler::new(SubstreamProtocol::new(RPCProtocol {}, ())) } fn inject_event( &mut self, peer_id: PeerId, conn_id: ConnectionId, event: <Self::ConnectionHandler as ConnectionHandler>::OutEvent, ) { if let Ok(RPCReceived::Request(ref id, ref req)) = event { self.events .push(NetworkBehaviourAction::GenerateEvent(RPCMessage { peer_id, conn_id, event, })) } else { self.events .push(NetworkBehaviourAction::GenerateEvent(RPCMessage { peer_id, conn_id, event, })); } } fn poll( &mut self, cx: &mut Context, _: &mut impl PollParameters, ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> { if !self.events.is_empty() { return Poll::Ready(self.events.remove(0)); } Poll::Pending } } ``` The implementation of the `NetworkBehaviour` trait for our `RPC` struct is relatively simple. The challenge resides in the `ConnectionHandler` which in our case is a `RPCHandler<Id>` , that deals with the events. ```rust pub struct RPCHandler<Id> { listen_protocol: SubstreamProtocol<RPCProtocol, ()>, events_out: SmallVec<[HandlerEvent<Id>; 4]>, // Dialing dial_queue: SmallVec<[(Id, OutboundRequest); 4]>, dial_negotiated: u32, // Inbound inbound_substreams: FnvHashMap<SubstreamId, InboundInfo>, inbound_substreams_delay: DelayQueue<SubstreamId>, // Outbound outbound_substreams: FnvHashMap<SubstreamId, OutboundInfo<Id>>, outbound_substreams_delay: DelayQueue<SubstreamId>, // Substream IDs current_inbound_substream_id: SubstreamId, current_outbound_substream_id: SubstreamId, // Config max_dial_negotiated: u32, // State state: HandlerState, outbound_io_error_retries: u8, waker: Option<std::task::Waker>, } ``` This struct has to implement the [ConnectionHandler](https://docs.rs/libp2p/latest/libp2p/swarm/trait.ConnectionHandler.html) trait in order to be used as such in the `NetworkBehaviour`. Below is an overview of the implementation of this trait for this struct: ```rust impl<Id> ConnectionHandler for RPCHandler<Id> where Id: ReqId, { type InEvent = RPCSend<Id>; type OutEvent = HandlerEvent<Id>; type Error = RPCError; type InboundProtocol = RPCProtocol; type OutboundProtocol = OutboundRequestContainer; type InboundOpenInfo = (); type OutboundOpenInfo = (Id, OutboundRequest); // Returns the Substream Protocol to be negotiated fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> { } // Returns until when the connection should be kept alive. fn connection_keep_alive(&self) -> KeepAlive { } // Adds a negotiated raw inbound substream fn inject_fully_negotiated_inbound( &mut self, substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output, _info: Self::InboundOpenInfo, ) { } // Adds a negotiated raw outbound substream fn inject_fully_negotiated_outbound( &mut self, out: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output, request_info: Self::OutboundOpenInfo, ) { } // Injects an event to the queue fn inject_event(&mut self, rpc_event: Self::InEvent) { match rpc_event { RPCSend::Request(id, req) => self.send_request(id, req), RPCSend::Response(inbound_id, response) => self.send_response(inbound_id, response), } if let Some(waker) = &self.waker { waker.wake_by_ref(); } } // Handles negotiation errors fn inject_dial_upgrade_error( &mut self, request_info: Self::OutboundOpenInfo, error: ConnectionHandlerUpgrErr< <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error, >, ) { } // Main function that polls the event queues fn poll( &mut self, cx: &mut std::task::Context<'_>, ) -> std::task::Poll< libp2p::swarm::ConnectionHandlerEvent< Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error, >, > { } } ``` The encoding for the inbound and outbound streams using the `tokio_util` crates is implemented as a separated module. ```rust // The inbound codec for the RPC protocol. pub enum InboundCodec { SSZSnappy(BaseInboundCodec<SSZSnappyInboundCodec>), } impl Encoder<RPCCodedResponse> for InboundCodec { type Error = RPCError; fn encode(&mut self, item: RPCCodedResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { match self { InboundCodec::SSZSnappy(codec) => codec.encode(item, dst), } } } impl Decoder for InboundCodec { type Item = InboundRequest; type Error = RPCError; fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { match self { InboundCodec::SSZSnappy(codec) => codec.decode(src), } } } // The outbound codec for the RPC protocol. pub enum OutboundCodec { SSZSnappy(BaseOutboundCodec<SSZSnappyOutboundCodec>), } impl Encoder<OutboundRequest> for OutboundCodec { type Error = RPCError; fn encode(&mut self, item: OutboundRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { match self { OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst), } } } impl Decoder for OutboundCodec { type Item = RPCCodedResponse; type Error = RPCError; fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { match self { OutboundCodec::SSZSnappy(codec) => codec.decode(src), } } } ``` As it can be seen there is a lot going on when dealing with raw streams of data, although we're lucky that the protocols have already been negotiated once they get here. # New Behavior The addition of this new protocol to the current network behavior was the most straightforward part. ```rust let rpc: RPC<RequestId<()>> = RPC::new(); #[derive(NetworkBehaviour)] struct Behaviour<AppReqId: ReqId> { gossipsub: Gossipsub<SnappyTransform, AllowAllSubscriptionFilter>, discovery: Discovery, rpc: RPC<RequestId<AppReqId>>, } let behaviour = { Behaviour { gossipsub, discovery, rpc, } }; ``` # Conclusion I’m really amazed about the tools and flexibility that libp2p provides for one to build their own custom protocol on top of it. I’m currently testing this locally and I am able to receive the RPC requests correctly. However I’m still not handling the responses correctly and keep getting shutdown. For my next steps is the creation of the response data and the proper handling of the requests. Until then!