---
tags: ethereum, epf
---
# EPF - Seventh Update
In my last update, I discussed the construction of fork data from fork parameters and the implementation of SSZ decoding on GossipSub messages. I also mentioned that a bad peer score, caused by the missing Req/Res Protocol implementation, was leading to connection shutdowns. That protocol is the main topic of this update.
This is the PR concerning this feature:
- [https://github.com/brech1/cl-p2p-setup/pull/1](https://github.com/brech1/cl-p2p-setup/pull/1)
# Req/Res Protocol
This protocol enables consensus layer peers to communicate directly in order to check liveness or request data, such as specific blocks. My implementation is based on the Lighthouse client; however, it is not generic across multiple network specifications.
```rust
pub enum Protocol {
    Status,
    Goodbye,
    BlocksByRange,
    BlocksByRoot,
    Ping,
    MetaData,
}
```
I have decided not to include the `BlocksByRange` and `BlocksByRoot` methods in this implementation, as they do not impact peer scoring.
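For context, the consensus networking spec identifies each Req/Res method by a libp2p protocol ID of the form `/eth2/beacon_chain/req/<name>/<version>/<encoding>`. The sketch below shows how the enum could map to those strings. The `message_name` and `protocol_id` helpers are hypothetical names, and only version 1 with the `ssz_snappy` encoding is assumed.

```rust
/// Req/Res methods, mirroring the enum above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Protocol {
    Status,
    Goodbye,
    BlocksByRange,
    BlocksByRoot,
    Ping,
    MetaData,
}

impl Protocol {
    /// Message name used inside the protocol ID (hypothetical helper).
    pub fn message_name(self) -> &'static str {
        match self {
            Protocol::Status => "status",
            Protocol::Goodbye => "goodbye",
            Protocol::BlocksByRange => "beacon_blocks_by_range",
            Protocol::BlocksByRoot => "beacon_blocks_by_root",
            Protocol::Ping => "ping",
            Protocol::MetaData => "metadata",
        }
    }

    /// Builds the full protocol ID, assuming version 1 and `ssz_snappy`.
    pub fn protocol_id(self) -> String {
        format!("/eth2/beacon_chain/req/{}/1/ssz_snappy", self.message_name())
    }
}
```

For example, `Protocol::Ping.protocol_id()` yields `/eth2/beacon_chain/req/ping/1/ssz_snappy`.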
The supported protocol messages are:
```rust
pub struct StatusMessage {
    pub fork_digest: [u8; 4],
    pub finalized_root: Hash256,
    pub finalized_epoch: u64,
    pub head_root: Hash256,
    pub head_slot: u64,
}

pub struct Ping {
    pub data: u64,
}

pub struct MetaData {
    pub seq_number: u64,
}
```
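These structs only contain fixed-size fields, so their SSZ serialization is the fields concatenated in declaration order, with `u64` values in little-endian. Below is a minimal sketch of that layout for `StatusMessage` and `Ping`, assuming `Hash256` is a plain 32-byte array and using a hypothetical `as_ssz_bytes` helper. A real implementation would more likely derive this through an SSZ crate.

```rust
/// Stand-in for the real `Hash256` type (assumption: a 32-byte array).
pub type Hash256 = [u8; 32];

pub struct StatusMessage {
    pub fork_digest: [u8; 4],
    pub finalized_root: Hash256,
    pub finalized_epoch: u64,
    pub head_root: Hash256,
    pub head_slot: u64,
}

impl StatusMessage {
    /// Serialized size: 4 + 32 + 8 + 32 + 8 bytes.
    pub const SSZ_LEN: usize = 84;

    /// Hypothetical helper: concatenates the fields in declaration order.
    pub fn as_ssz_bytes(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(Self::SSZ_LEN);
        out.extend_from_slice(&self.fork_digest);
        out.extend_from_slice(&self.finalized_root);
        out.extend_from_slice(&self.finalized_epoch.to_le_bytes());
        out.extend_from_slice(&self.head_root);
        out.extend_from_slice(&self.head_slot.to_le_bytes());
        out
    }
}

pub struct Ping {
    pub data: u64,
}

impl Ping {
    /// Hypothetical helper: a single `u64` is just its 8 little-endian bytes.
    pub fn as_ssz_bytes(&self) -> Vec<u8> {
        self.data.to_le_bytes().to_vec()
    }
}
```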
Note that the `StatusMessage` holds block data. I’m not currently storing any chain data, so I can’t build this message myself. I see two options:
- Request this data from a peer and reply with the same parameters. This is hacky and involves trusting the other peer, so I’d rather not go that way.
- Continue working on the helios codebase, where access to block data would make it possible to construct this response.

It’s also possible that I’m missing something and this could be done another way.
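One way to frame the problem is that `Ping` and `MetaData` can be answered from local state alone (the ping response carries the responder's metadata sequence number), while `Status` needs chain data. The following is a rough sketch of that split. `ChainView`, `Request`, `Response`, and `respond` are all hypothetical names for illustration and are not part of the PR.

```rust
/// Chain data needed to answer a Status request (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
pub struct ChainView {
    pub fork_digest: [u8; 4],
    pub finalized_root: [u8; 32],
    pub finalized_epoch: u64,
    pub head_root: [u8; 32],
    pub head_slot: u64,
}

/// Inbound requests covered by this sketch (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum Request {
    Status,
    Ping(u64),
    MetaData,
}

/// Possible replies (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum Response {
    Status(ChainView),
    Pong(u64),
    MetaData { seq_number: u64 },
}

/// Returns `None` when the request cannot be answered with local data.
pub fn respond(req: &Request, seq_number: u64, chain: Option<&ChainView>) -> Option<Response> {
    match req {
        // Ping and MetaData only need our own metadata sequence number.
        Request::Ping(_) => Some(Response::Pong(seq_number)),
        Request::MetaData => Some(Response::MetaData { seq_number }),
        // Status can only be built when chain data is available.
        Request::Status => chain.cloned().map(Response::Status),
    }
}
```

With no stored block data, `respond(&Request::Status, 0, None)` returns `None`, which is exactly the gap described above.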
The `RPC` struct consists of a vector of the events taking place in the behavior.
```rust
pub struct RPC<Id: ReqId> {
    // Events waiting to be returned from `poll`.
    events: Vec<NetworkBehaviourAction<RPCMessage<Id>, RPCHandler<Id>>>,
}

impl<Id> NetworkBehaviour for RPC<Id>
where
    Id: ReqId,
{
    type ConnectionHandler = RPCHandler<Id>;
    type OutEvent = RPCMessage<Id>;

    fn new_handler(&mut self) -> Self::ConnectionHandler {
        RPCHandler::new(SubstreamProtocol::new(RPCProtocol {}, ()))
    }

    fn inject_event(
        &mut self,
        peer_id: PeerId,
        conn_id: ConnectionId,
        event: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
    ) {
        // Requests and all other handler events are forwarded the same way.
        self.events
            .push(NetworkBehaviourAction::GenerateEvent(RPCMessage {
                peer_id,
                conn_id,
                event,
            }));
    }

    fn poll(
        &mut self,
        _cx: &mut Context,
        _: &mut impl PollParameters,
    ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
        // Return queued events in the order they were received.
        if !self.events.is_empty() {
            return Poll::Ready(self.events.remove(0));
        }

        Poll::Pending
    }
}
```
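The queueing pattern in `poll` can be illustrated without libp2p. The sketch below uses a `VecDeque` instead of a `Vec`, since `Vec::remove(0)` shifts every remaining element while `pop_front` does not. `EventQueue` is a hypothetical stand-in, not a libp2p type.

```rust
use std::collections::VecDeque;
use std::task::Poll;

/// Hypothetical stand-in for the behaviour's pending-event buffer.
pub struct EventQueue<T> {
    events: VecDeque<T>,
}

impl<T> EventQueue<T> {
    pub fn new() -> Self {
        Self {
            events: VecDeque::new(),
        }
    }

    /// Queues an event at the back, as `inject_event` does.
    pub fn push(&mut self, event: T) {
        self.events.push_back(event);
    }

    /// Yields the oldest queued event, or `Pending` if there is none.
    pub fn poll(&mut self) -> Poll<T> {
        match self.events.pop_front() {
            Some(event) => Poll::Ready(event),
            None => Poll::Pending,
        }
    }
}
```

Events come out in the order they were pushed, and an empty queue reports `Poll::Pending`.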
The implementation of the `NetworkBehaviour` trait for our `RPC` struct is relatively simple. The challenge lies in the `ConnectionHandler`, which in our case is an `RPCHandler<Id>` that deals with the events.
```rust
pub struct RPCHandler<Id> {
    listen_protocol: SubstreamProtocol<RPCProtocol, ()>,
    events_out: SmallVec<[HandlerEvent<Id>; 4]>,

    // Dialing
    dial_queue: SmallVec<[(Id, OutboundRequest); 4]>,
    dial_negotiated: u32,

    // Inbound
    inbound_substreams: FnvHashMap<SubstreamId, InboundInfo>,
    inbound_substreams_delay: DelayQueue<SubstreamId>,

    // Outbound
    outbound_substreams: FnvHashMap<SubstreamId, OutboundInfo<Id>>,
    outbound_substreams_delay: DelayQueue<SubstreamId>,

    // Substream IDs
    current_inbound_substream_id: SubstreamId,
    current_outbound_substream_id: SubstreamId,

    // Config
    max_dial_negotiated: u32,

    // State
    state: HandlerState,
    outbound_io_error_retries: u8,
    waker: Option<std::task::Waker>,
}
```
This struct has to implement the [ConnectionHandler](https://docs.rs/libp2p/latest/libp2p/swarm/trait.ConnectionHandler.html) trait in order to be used as such in the `NetworkBehaviour`.
Below is an overview of the implementation of this trait for this struct:
```rust
impl<Id> ConnectionHandler for RPCHandler<Id>
where
    Id: ReqId,
{
    type InEvent = RPCSend<Id>;
    type OutEvent = HandlerEvent<Id>;
    type Error = RPCError;
    type InboundProtocol = RPCProtocol;
    type OutboundProtocol = OutboundRequestContainer;
    type InboundOpenInfo = ();
    type OutboundOpenInfo = (Id, OutboundRequest);

    // Returns the Substream Protocol to be negotiated
    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
        // ...
    }

    // Returns until when the connection should be kept alive.
    fn connection_keep_alive(&self) -> KeepAlive {
        // ...
    }

    // Adds a negotiated raw inbound substream
    fn inject_fully_negotiated_inbound(
        &mut self,
        substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
        _info: Self::InboundOpenInfo,
    ) {
        // ...
    }

    // Adds a negotiated raw outbound substream
    fn inject_fully_negotiated_outbound(
        &mut self,
        out: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
        request_info: Self::OutboundOpenInfo,
    ) {
        // ...
    }

    // Injects an event to the queue
    fn inject_event(&mut self, rpc_event: Self::InEvent) {
        match rpc_event {
            RPCSend::Request(id, req) => self.send_request(id, req),
            RPCSend::Response(inbound_id, response) => self.send_response(inbound_id, response),
        }

        // Wake the task so the newly queued work gets polled.
        if let Some(waker) = &self.waker {
            waker.wake_by_ref();
        }
    }

    // Handles negotiation errors
    fn inject_dial_upgrade_error(
        &mut self,
        request_info: Self::OutboundOpenInfo,
        error: ConnectionHandlerUpgrErr<
            <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
        >,
    ) {
        // ...
    }

    // Main function that polls the event queues
    fn poll(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<
        libp2p::swarm::ConnectionHandlerEvent<
            Self::OutboundProtocol,
            Self::OutboundOpenInfo,
            Self::OutEvent,
            Self::Error,
        >,
    > {
        // ...
    }
}
```
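The `waker` handling in `inject_event` follows a common async pattern: `poll` keeps a copy of the task's waker when it has nothing to return, and any later call that queues work wakes the task so it gets polled again. Here is a self-contained sketch using only the standard library. The `Handler` type and the `CountingWaker` are hypothetical and exist only for demonstration, and storing the waker inside `poll` is an assumption about how the real handler behaves.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical handler holding queued events and the last seen waker.
pub struct Handler {
    queue: VecDeque<u64>,
    waker: Option<Waker>,
}

impl Handler {
    pub fn new() -> Self {
        Self {
            queue: VecDeque::new(),
            waker: None,
        }
    }

    /// Queues an event and wakes the task if a waker was stored.
    pub fn inject_event(&mut self, event: u64) {
        self.queue.push_back(event);
        if let Some(waker) = &self.waker {
            waker.wake_by_ref();
        }
    }

    /// Returns the next event, or stores the waker and returns `Pending`.
    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<u64> {
        if let Some(event) = self.queue.pop_front() {
            return Poll::Ready(event);
        }
        self.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Waker that counts how often it is woken (for demonstration only).
pub struct CountingWaker(pub AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

Without the wake-up, an event injected after `poll` returned `Pending` could sit in the queue until something unrelated polled the handler again.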
The encoding for the inbound and outbound streams, built on the `tokio_util` crate, is implemented as a separate module.
```rust
// The inbound codec for the RPC protocol.
pub enum InboundCodec {
    SSZSnappy(BaseInboundCodec<SSZSnappyInboundCodec>),
}

impl Encoder<RPCCodedResponse> for InboundCodec {
    type Error = RPCError;

    fn encode(&mut self, item: RPCCodedResponse, dst: &mut BytesMut) -> Result<(), Self::Error> {
        match self {
            InboundCodec::SSZSnappy(codec) => codec.encode(item, dst),
        }
    }
}

impl Decoder for InboundCodec {
    type Item = InboundRequest;
    type Error = RPCError;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        match self {
            InboundCodec::SSZSnappy(codec) => codec.decode(src),
        }
    }
}

// The outbound codec for the RPC protocol.
pub enum OutboundCodec {
    SSZSnappy(BaseOutboundCodec<SSZSnappyOutboundCodec>),
}

impl Encoder<OutboundRequest> for OutboundCodec {
    type Error = RPCError;

    fn encode(&mut self, item: OutboundRequest, dst: &mut BytesMut) -> Result<(), Self::Error> {
        match self {
            OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst),
        }
    }
}

impl Decoder for OutboundCodec {
    type Item = RPCCodedResponse;
    type Error = RPCError;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        match self {
            OutboundCodec::SSZSnappy(codec) => codec.decode(src),
        }
    }
}
```
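For reference, with the `ssz_snappy` encoding each payload on the wire is preceded by the length of the SSZ-encoded data, written as an unsigned protobuf-style varint. The sketch below shows that length prefix in isolation. `encode_varint` and `decode_varint` are hypothetical helpers written for illustration (an existing crate would normally handle this), and the Snappy compression step is left out.

```rust
/// Appends `value` to `dst` as an unsigned LEB128 varint.
pub fn encode_varint(mut value: u64, dst: &mut Vec<u8>) {
    loop {
        // Take the lowest 7 bits of the remaining value.
        let byte = (value & 0x7f) as u8;
        value >>= 7;
        if value == 0 {
            // Last byte: continuation bit stays clear.
            dst.push(byte);
            return;
        }
        // More bytes follow: set the continuation bit.
        dst.push(byte | 0x80);
    }
}

/// Decodes a varint from the front of `src`.
/// Returns the value and the number of bytes consumed, or `None` if the
/// input is truncated or does not terminate within 10 bytes.
pub fn decode_varint(src: &[u8]) -> Option<(u64, usize)> {
    let mut value: u64 = 0;
    for (i, &byte) in src.iter().enumerate().take(10) {
        value |= u64::from(byte & 0x7f) << (7 * i);
        if byte & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None
}
```

As an example, a `StatusMessage` serializes to 84 bytes (4 + 32 + 8 + 32 + 8), so its length prefix would be the single byte `0x54`.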
As you can see, there is a lot going on when dealing with raw streams of data. Luckily, the protocols have already been negotiated by the time the streams get here.
# New Behavior
The addition of this new protocol to the current network behavior was the most straightforward part.
```rust
let rpc: RPC<RequestId<()>> = RPC::new();

#[derive(NetworkBehaviour)]
struct Behaviour<AppReqId: ReqId> {
    gossipsub: Gossipsub<SnappyTransform, AllowAllSubscriptionFilter>,
    discovery: Discovery,
    rpc: RPC<RequestId<AppReqId>>,
}

let behaviour = {
    Behaviour {
        gossipsub,
        discovery,
        rpc,
    }
};
```
# Conclusion
I’m really amazed by the tools and flexibility that libp2p provides for building a custom protocol on top of it.
I’m currently testing this locally and I am able to receive the RPC requests correctly. However, I’m still not handling the responses correctly, so the connections keep getting shut down.
My next steps are creating the response data and handling the requests properly.
Until then!