EPF - Seventh Update

In my last update, I discussed the construction of fork data from fork parameters and the implementation of SSZ decoding of GossipSub messages. I also mentioned an issue with bad peer scores that caused connection shutdowns due to the lack of a Req/Res protocol implementation. That protocol is the main topic of this update.

This is the PR concerning this feature:

Req/Res Protocol

This protocol enables consensus layer peers to communicate directly in order to check liveness or to request data, such as specific blocks. My implementation is based on the Lighthouse client; however, it is not generic across multiple network specifications.

pub enum Protocol {
    Status,
    Goodbye,
    BlocksByRange,
    BlocksByRoot,
    Ping,
    MetaData,
}

I decided not to include the BlocksByRange and BlocksByRoot requests in this implementation, as they do not impact peer scoring.
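
For context, each variant maps to a message name inside the protocol identifier defined by the consensus spec, /eth2/beacon_chain/req/{message_name}/{version}/{encoding}. The helper below is an illustrative sketch of that mapping, not part of the PR:

impl Protocol {
    // Message name used in the protocol ID, e.g.
    // "/eth2/beacon_chain/req/status/1/ssz_snappy" for Status.
    pub fn message_name(&self) -> &'static str {
        match self {
            Protocol::Status => "status",
            Protocol::Goodbye => "goodbye",
            Protocol::BlocksByRange => "beacon_blocks_by_range",
            Protocol::BlocksByRoot => "beacon_blocks_by_root",
            Protocol::Ping => "ping",
            Protocol::MetaData => "metadata",
        }
    }
}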

The supported protocol messages are:

pub struct StatusMessage {
    pub fork_digest: [u8; 4],
    pub finalized_root: Hash256,
    pub finalized_epoch: u64,
    pub head_root: Hash256,
    pub head_slot: u64,
}

pub struct Ping {
    pub data: u64,
}

pub struct MetaData {
    pub seq_number: u64,
}

As can be seen, the StatusMessage holds block data. Since I'm not storing any data yet, it isn't currently possible to build this information. I have two options:

  • Request this data from a peer and reply with the same parameters. This is hacky and involves trusting the other peer, so I would rather not go that way.
  • Continue working on the helios codebase with access to block data, which would enable me to construct this response.

It's also possible that I'm missing something and this could be done another way.
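
If I end up going with the second option, building the status response could look roughly like the sketch below, where ChainStore, finalized_checkpoint() and head() are hypothetical helpers over stored block data, not existing APIs:

// Hypothetical sketch for option two: derive the status response from
// locally stored chain data. `ChainStore` and its methods don't exist yet.
fn build_status(store: &ChainStore, fork_digest: [u8; 4]) -> StatusMessage {
    let finalized = store.finalized_checkpoint();
    StatusMessage {
        fork_digest,
        finalized_root: finalized.root,
        finalized_epoch: finalized.epoch,
        head_root: store.head().root,
        head_slot: store.head().slot,
    }
}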

The RPC struct consists of a vector of pending events that the behavior emits when polled.

pub struct RPC<Id: ReqId> {
    events: Vec<NetworkBehaviourAction<RPCMessage<Id>, RPCHandler<Id>>>,
}

impl<Id> NetworkBehaviour for RPC<Id>
where
    Id: ReqId,
{
    type ConnectionHandler = RPCHandler<Id>;
    type OutEvent = RPCMessage<Id>;

    fn new_handler(&mut self) -> Self::ConnectionHandler {
        RPCHandler::new(SubstreamProtocol::new(RPCProtocol {}, ()))
    }

    fn inject_event(
        &mut self,
        peer_id: PeerId,
        conn_id: ConnectionId,
        event: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
    ) {
        // Forward every handler event, requests and responses alike, to the
        // swarm as an RPCMessage.
        self.events
            .push(NetworkBehaviourAction::GenerateEvent(RPCMessage {
                peer_id,
                conn_id,
                event,
            }));
    }

    fn poll(
        &mut self,
        cx: &mut Context,
        _: &mut impl PollParameters,
    ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
        if !self.events.is_empty() {
            return Poll::Ready(self.events.remove(0));
        }
        Poll::Pending
    }
}
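
Requests from the application side enter that same events vector. The following sketch, modeled on the Lighthouse design (the method name and the NotifyHandler usage are assumptions), shows how a request could be queued:

impl<Id: ReqId> RPC<Id> {
    // Queue an outbound request; poll() will hand it to the handler of any
    // established connection to this peer.
    pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: OutboundRequest) {
        self.events.push(NetworkBehaviourAction::NotifyHandler {
            peer_id,
            handler: NotifyHandler::Any,
            event: RPCSend::Request(request_id, req),
        });
    }
}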

The implementation of the NetworkBehaviour trait for our RPC struct is relatively simple. The challenge resides in the ConnectionHandler, which in our case is an RPCHandler<Id> that deals with the events.

pub struct RPCHandler<Id> {
    listen_protocol: SubstreamProtocol<RPCProtocol, ()>,
    events_out: SmallVec<[HandlerEvent<Id>; 4]>,
    // Dialing
    dial_queue: SmallVec<[(Id, OutboundRequest); 4]>,
    dial_negotiated: u32,
    // Inbound
    inbound_substreams: FnvHashMap<SubstreamId, InboundInfo>,
    inbound_substreams_delay: DelayQueue<SubstreamId>,
    // Outbound
    outbound_substreams: FnvHashMap<SubstreamId, OutboundInfo<Id>>,
    outbound_substreams_delay: DelayQueue<SubstreamId>,
    // Substream IDs
    current_inbound_substream_id: SubstreamId,
    current_outbound_substream_id: SubstreamId,
    // Config
    max_dial_negotiated: u32,
    // State
    state: HandlerState,
    outbound_io_error_retries: u8,
    waker: Option<std::task::Waker>,
}
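
For reference, the constructor called from new_handler() could look like this sketch; the default values, the SubstreamId representation, and HandlerState::Active are assumptions:

impl<Id> RPCHandler<Id> {
    pub fn new(listen_protocol: SubstreamProtocol<RPCProtocol, ()>) -> Self {
        RPCHandler {
            listen_protocol,
            events_out: SmallVec::new(),
            dial_queue: SmallVec::new(),
            dial_negotiated: 0,
            inbound_substreams: FnvHashMap::default(),
            inbound_substreams_delay: DelayQueue::new(),
            outbound_substreams: FnvHashMap::default(),
            outbound_substreams_delay: DelayQueue::new(),
            current_inbound_substream_id: SubstreamId(0), // assumed newtype over u64
            current_outbound_substream_id: SubstreamId(0),
            max_dial_negotiated: 8, // arbitrary default
            state: HandlerState::Active,
            outbound_io_error_retries: 0,
            waker: None,
        }
    }
}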

This struct has to implement the ConnectionHandler trait in order to be used as such in the NetworkBehaviour.
Below is an overview of the implementation of this trait for this struct:

impl<Id> ConnectionHandler for RPCHandler<Id>
where
    Id: ReqId,
{
    type InEvent = RPCSend<Id>;
    type OutEvent = HandlerEvent<Id>;
    type Error = RPCError;
    type InboundProtocol = RPCProtocol;
    type OutboundProtocol = OutboundRequestContainer;
    type InboundOpenInfo = ();
    type OutboundOpenInfo = (Id, OutboundRequest);

    // Returns the Substream Protocol to be negotiated
    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
    }

    // Returns until when the connection should be kept alive.
    fn connection_keep_alive(&self) -> KeepAlive {
    }

    // Adds a negotiated raw inbound substream
    fn inject_fully_negotiated_inbound(
        &mut self,
        substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
        _info: Self::InboundOpenInfo,
    ) {
    }

    // Adds a negotiated raw outbound substream
    fn inject_fully_negotiated_outbound(
        &mut self,
        out: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
        request_info: Self::OutboundOpenInfo,
    ) {
    }

    // Injects an event to the queue
    fn inject_event(&mut self, rpc_event: Self::InEvent) {
        match rpc_event {
            RPCSend::Request(id, req) => self.send_request(id, req),
            RPCSend::Response(inbound_id, response) => self.send_response(inbound_id, response),
        }

        if let Some(waker) = &self.waker {
            waker.wake_by_ref();
        }
    }

    // Handles negotiation errors
    fn inject_dial_upgrade_error(
        &mut self,
        request_info: Self::OutboundOpenInfo,
        error: ConnectionHandlerUpgrErr<
            <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
        >,
    ) {
    }

    // Main function that polls the event queues
    fn poll(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<
        libp2p::swarm::ConnectionHandlerEvent<
            Self::OutboundProtocol,
            Self::OutboundOpenInfo,
            Self::OutEvent,
            Self::Error,
        >,
    > {
    }
}
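
As an example of what one of these bodies can look like, here is a minimal sketch of connection_keep_alive, assuming the handler stays alive while any substream or queued dial exists (the real logic would also need to account for shutdown state). It would live inside the impl block above:

// Minimal sketch, not the final implementation: keep the connection alive
// while there is still work in flight.
fn connection_keep_alive(&self) -> KeepAlive {
    if !self.dial_queue.is_empty()
        || !self.inbound_substreams.is_empty()
        || !self.outbound_substreams.is_empty()
    {
        KeepAlive::Yes
    } else {
        KeepAlive::No
    }
}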

The encoding for the inbound and outbound streams is implemented as a separate module, using the tokio_util crate.

// The inbound codec for the RPC protocol.
pub enum InboundCodec {
    SSZSnappy(BaseInboundCodec<SSZSnappyInboundCodec>),
}

impl Encoder<RPCCodedResponse> for InboundCodec {
    type Error = RPCError;

    fn encode(&mut self, item: RPCCodedResponse, dst: &mut BytesMut) -> Result<(), Self::Error> {
        match self {
            InboundCodec::SSZSnappy(codec) => codec.encode(item, dst),
        }
    }
}

impl Decoder for InboundCodec {
    type Item = InboundRequest;
    type Error = RPCError;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        match self {
            InboundCodec::SSZSnappy(codec) => codec.decode(src),
        }
    }
}

// The outbound codec for the RPC protocol.
pub enum OutboundCodec {
    SSZSnappy(BaseOutboundCodec<SSZSnappyOutboundCodec>),
}

impl Encoder<OutboundRequest> for OutboundCodec {
    type Error = RPCError;

    fn encode(&mut self, item: OutboundRequest, dst: &mut BytesMut) -> Result<(), Self::Error> {
        match self {
            OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst),
        }
    }
}

impl Decoder for OutboundCodec {
    type Item = RPCCodedResponse;
    type Error = RPCError;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        match self {
            OutboundCodec::SSZSnappy(codec) => codec.decode(src),
        }
    }
}

As can be seen, there is a lot going on when dealing with raw streams of data, although we're lucky that the protocols have already been negotiated by the time the data gets here.
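
To make the framing concrete, here is a self-contained sketch of the ssz_snappy wire format these codecs implement: a varint prefix carrying the uncompressed SSZ length, followed by the snappy (frame format) compression of the SSZ bytes. It uses the snap and unsigned_varint crates directly, outside the codec machinery:

use snap::write::FrameEncoder;
use std::io::Write;
use unsigned_varint::encode as varint;

// Frame an SSZ payload as ssz_snappy:
// varint(uncompressed length) ++ snappy_frames(ssz_bytes).
fn frame_ssz_snappy(ssz_bytes: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut buf = varint::usize_buffer();
    let mut out = varint::usize(ssz_bytes.len(), &mut buf).to_vec();

    let mut encoder = FrameEncoder::new(Vec::new());
    encoder.write_all(ssz_bytes)?;
    encoder.flush()?;
    out.extend_from_slice(encoder.get_ref());
    Ok(out)
}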

New Behavior

The addition of this new protocol to the current network behavior was the most straightforward part.

let rpc: RPC<RequestId<()>> = RPC::new();

#[derive(NetworkBehaviour)]
struct Behaviour<AppReqId: ReqId> {
    gossipsub: Gossipsub<SnappyTransform, AllowAllSubscriptionFilter>,
    discovery: Discovery,
    rpc: RPC<RequestId<AppReqId>>,
}

let behaviour = {
    Behaviour {
        gossipsub,
        discovery,
        rpc,
    }
};
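
Once the behaviour is in the swarm, RPC events surface in the main event loop. A hedged sketch of consuming them follows; the BehaviourEvent enum and its Rpc variant are what derive(NetworkBehaviour) generates by convention, and the handling shown is an assumption:

// Inside the node's async event loop (requires futures::StreamExt).
match swarm.select_next_some().await {
    SwarmEvent::Behaviour(BehaviourEvent::Rpc(rpc_message)) => {
        // Inbound Ping/Status requests would be answered here using
        // rpc_message.peer_id and rpc_message.event.
    }
    _ => {}
}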

Conclusion

I'm really amazed by the tools and the flexibility that libp2p provides for building a custom protocol on top of it.

I'm currently testing this locally and I'm able to receive RPC requests correctly. However, I'm still not handling the responses correctly, and my connections keep getting shut down.

My next steps are creating the response data and properly handling the requests.

Until then!