EPF - Fifth Update

Since my last update I've been working on a minimal implementation of a consensus layer peer-to-peer networking stack.

It can be found here.

In this update I’d like to give an overview of its current state, the resources I used, and the next steps.

Discovery

Following the consensus layer specifications, the peer discovery service implements the Node Discovery Protocol v5 (discv5). I’m using the Sigma Prime Rust implementation.

This is a simplified code snippet for the service constructor:

pub async fn new(local_key: &Keypair) -> Self {
    // Setup socket
    let listen_socket = "0.0.0.0:9000".parse::<SocketAddr>().unwrap();

    // Build ENR
    let enr_key: CombinedKey = key_from_libp2p(&local_key).unwrap();

    let local_enr = build_enr(&enr_key);

    // Setup config
    let config = Discv5ConfigBuilder::new().build();

    // Create discv5 instance
    let mut discv5 = Discv5::new(local_enr.clone(), enr_key, config).unwrap();

    // Add bootnode
    let ef_bootnode_enr = Enr::from_str(BOOTNODE).unwrap();
    discv5.add_enr(ef_bootnode_enr).expect("bootnode error");

    // Start
    discv5.start(listen_socket).await.unwrap();

    // Obtain an event stream
    let event_stream = EventStream::Awaiting(Box::pin(discv5.event_stream()));

    Self {
        discv5,
        enr: local_enr,
        event_stream,
    }
}

The constructor receives the libp2p Keypair and converts it to a CombinedKey before building the ENR with the default listen sockets:

pub fn build_enr(combined_key: &CombinedKey) -> Enr {
    let mut enr_builder = enr::EnrBuilder::new("v4");

    enr_builder.ip("0.0.0.0".parse().unwrap());
    enr_builder.udp4(9000);
    enr_builder.tcp4(9000);

    enr_builder.build(combined_key).unwrap()
}
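As for key_from_libp2p, here’s a sketch of what the conversion could look like, modeled on the approach Lighthouse takes; the exact k256 and libp2p APIs are an assumption on my part and vary across versions:

use discv5::enr::k256;

pub fn key_from_libp2p(key: &Keypair) -> Result<CombinedKey, &'static str> {
    match key {
        // Only secp256k1 keys can back an ENR CombinedKey here.
        Keypair::Secp256k1(key) => {
            let secret = k256::ecdsa::SigningKey::from_bytes(&key.secret().to_bytes())
                .map_err(|_| "invalid secp256k1 key")?;
            Ok(CombinedKey::Secp256k1(secret))
        }
        _ => Err("key type not supported"),
    }
}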

Once the ENR is built, we create the default config for the discv5 service and spin up an instance of it. We add a single bootnode, although we could add several (see the sketch below), and start the service. With the service running, we also obtain the event stream, which is important for implementing polling.
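If we wanted to seed the routing table with several bootnodes, a minimal sketch (assuming a hypothetical BOOTNODES slice of ENR strings) could look like this:

// Hypothetical placeholder ENRs; real ones come from the published bootnode lists.
const BOOTNODES: &[&str] = &["enr:-Iu4Q...", "enr:-Ku4Q..."];

for bootnode in BOOTNODES {
    let enr = Enr::from_str(bootnode).expect("valid bootnode ENR");
    discv5.add_enr(enr).expect("bootnode error");
}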

So far we have a running discovery service on its own, but we still need to integrate it with the libp2p networking stack. The first step is to implement the swarm::NetworkBehaviour trait on our struct, which lets us use the Discovery struct in the Behaviour we define later.

impl NetworkBehaviour for Discovery {
    type ConnectionHandler = libp2p::swarm::dummy::ConnectionHandler;
    type OutEvent = DiscoveredPeers;

    fn new_handler(&mut self) -> Self::ConnectionHandler {
        libp2p::swarm::dummy::ConnectionHandler {}
    }

    fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
        let mut peer_address: Vec<Multiaddr> = Vec::new();

        if let Some(address) = self.multiaddr_map.get(peer_id) {
            peer_address.push(address.clone());
        }

        peer_address
    }

    fn poll(
        &mut self,
        cx: &mut Context,
        _: &mut impl PollParameters,
    ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
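        // Kick off a single discovery query the first time we're polled.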
        if !self.started {
            self.started = true;
            self.find_peers();

            return Poll::Pending;
        }

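        // Surface any completed queries to the swarm as a behaviour event.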
        if let Some(dp) = self.get_peers(cx) {
            return Poll::Ready(NetworkBehaviourAction::GenerateEvent(dp));
        };

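        // Drive the discv5 event stream: first resolve the pending future, then drain its events.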
        match self.event_stream {
            EventStream::Awaiting(ref mut fut) => {
                if let Poll::Ready(event_stream) = fut.poll_unpin(cx) {
                    match event_stream {
                        Ok(stream) => {
                            println!("Discv5 event stream ready");
                            self.event_stream = EventStream::Present(stream);
                        }
                        Err(_) => {
                            println!("Discv5 event stream failed");
                            self.event_stream = EventStream::InActive;
                        }
                    }
                }
            }
            EventStream::InActive => {}
            EventStream::Present(ref mut stream) => {
                while let Poll::Ready(Some(event)) = stream.poll_recv(cx) {
                    match event {
                        Discv5Event::SessionEstablished(enr, _) => {
                            println!("Session Established: {:?}", enr);
                        }
                        _ => (),
                    }
                }
            }
        }
        Poll::Pending
    }
}

Not every method of the trait is implemented, but these will be sufficient for our current needs.

The addresses_of_peer method is used by libp2p when dialing peers. Peers are identified by a PeerId, but that alone isn’t enough to dial them, so whenever we discover a new peer we store its Multiaddr in a PeerId -> Multiaddr map for this method to read from.
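Here’s a sketch of that conversion from a discovered ENR to something dialable, where peer_id_from_enr stands in as a hypothetical helper that turns the ENR’s secp256k1 public key into a libp2p PeerId:

use libp2p::multiaddr::Protocol;

// Hypothetical: map a discovered ENR to a dialable (PeerId, Multiaddr) pair.
fn peer_id_and_multiaddr(enr: &Enr) -> Option<(PeerId, Multiaddr)> {
    // Only peers advertising both an IPv4 address and a TCP port are dialable.
    let mut multiaddr = Multiaddr::from(enr.ip4()?);
    multiaddr.push(Protocol::Tcp(enr.tcp4()?));
    Some((peer_id_from_enr(enr)?, multiaddr))
}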

The poll method is the main driver of the behaviour. First there’s a check that starts a single peer discovery query; this should later be replaced with a proper query manager.

If the query futures have completed and returned a result, we return the discovered peers wrapped in Poll::Ready and NetworkBehaviourAction::GenerateEvent. We’ll see later how the libp2p swarm processes this.

The last thing I’d like to review about the discovery is the find_peers associated method.

fn find_peers(&mut self) {
    let predicate: Box<dyn Fn(&Enr) -> bool + Send> =
        Box::new(move |enr: &Enr| enr.tcp4().is_some() && enr.udp4().is_some());

    let target = NodeId::random();

    let peers_enr = self.discv5.find_node_predicate(target, predicate, 16);

    self.peers_future.push(Box::pin(peers_enr));
}

Discv5 allows us to find new peers based on a predicate. This is a great feature, since we can filter which peers we want to establish a connection with. We set a random NodeId as the target since there isn’t a specific node we want to connect to.

The find_node_predicate method returns a Future, which is pushed to a futures Vec and polled until the query completes, as sketched below.
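To make that concrete, here’s a sketch of how the stored futures and the get_peers method used in poll could fit together. It assumes peers_future is a futures::stream::FuturesUnordered, that DiscoveredPeers wraps a HashMap<PeerId, Multiaddr>, and it reuses the hypothetical peer_id_and_multiaddr helper from above:

use std::collections::HashMap;
use futures::StreamExt;

// Assumed field on the Discovery struct: queries in flight.
// peers_future: FuturesUnordered<
//     Pin<Box<dyn Future<Output = Result<Vec<Enr>, discv5::QueryError>> + Send>>,
// >,

fn get_peers(&mut self, cx: &mut Context) -> Option<DiscoveredPeers> {
    // Poll the in-flight queries and surface the first one that completed.
    while let Poll::Ready(Some(result)) = self.peers_future.poll_next_unpin(cx) {
        if let Ok(enrs) = result {
            let mut peers = HashMap::new();
            for enr in enrs {
                if let Some((peer_id, multiaddr)) = peer_id_and_multiaddr(&enr) {
                    // Remember the address so addresses_of_peer can find it.
                    self.multiaddr_map.insert(peer_id.clone(), multiaddr.clone());
                    peers.insert(peer_id, multiaddr);
                }
            }
            return Some(DiscoveredPeers { peers });
        }
    }
    None
}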

Network

I based the libp2p setup on the gossipsub chat example available on the rust-libp2p repository.

However, I’m using tokio instead of async-std. Libp2p has a built-in tokio-based TCP transport with both Yamux and Mplex for substream multiplexing, which matches the config detailed in the CL p2p specifications.
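For reference, building that transport by hand instead of using libp2p::tokio_development_transport could look like the following sketch; the module paths match rust-libp2p around version 0.50 and are an assumption:

use libp2p::core::upgrade;
use libp2p::{mplex, noise, tcp, yamux, Transport};

let transport = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true))
    .upgrade(upgrade::Version::V1)
    // Noise XX handshake, as required by the CL p2p spec.
    .authenticate(noise::NoiseAuthenticated::xx(&local_key)?)
    // Negotiate Yamux first, falling back to Mplex.
    .multiplex(upgrade::SelectUpgrade::new(
        yamux::YamuxConfig::default(),
        mplex::MplexConfig::new(),
    ))
    .boxed();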

Thanks to all our previous work, it’s now easy to spin up the discovery service: we just pass a reference to our secp256k1 Keypair.

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a random PeerId
    let local_key = identity::Keypair::generate_secp256k1();
    let local_peer_id = PeerId::from(local_key.public());

    // Set up an encrypted, DNS-enabled TCP transport with Yamux/Mplex multiplexing.
    let transport = libp2p::tokio_development_transport(local_key.clone())?;

    let discovery = Discovery::new(&local_key).await;

    // Set a custom gossipsub configuration
    let gossipsub_config = gossipsub::GossipsubConfigBuilder::default()
        .max_transmit_size(10 * 1_048_576)
        .fanout_ttl(Duration::from_secs(60))
        .heartbeat_interval(Duration::from_secs(10))
        .validation_mode(ValidationMode::Anonymous)
        .history_length(12)
        .max_messages_per_rpc(Some(500))
        .build()
        .expect("Valid config");

    // build a gossipsub network behaviour
    let mut gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, gossipsub_config)
        .expect("Correct configuration");

    // We create a custom network behaviour that combines Gossipsub and Discv5.
    #[derive(NetworkBehaviour)]
    struct Behaviour {
        gossipsub: Gossipsub,
        discovery: Discovery,
    }

    let behaviour = Behaviour {
        gossipsub,
        discovery,
    };

    // Create a Swarm to manage peers and events
    let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id)
        .notify_handler_buffer_size(std::num::NonZeroUsize::new(7).expect("Not zero"))
        .connection_event_buffer_size(64)
        .connection_limits(ConnectionLimits::default())
        .build();

    // Listen
    swarm.listen_on("/ip4/0.0.0.0/tcp/9000".parse()?)?;

    // Loop
    // ..
}

For the configuration, the validation mode should be set to ValidationMode::Anonymous, and the privacy argument of the Gossipsub constructor should likewise be MessageAuthenticity::Anonymous, since CL gossip messages carry no signature or sender information.

We create the custom behaviour from Gossipsub and our Discv5 implementation and derive the swarm::NetworkBehaviour trait on it. Now we can create our Swarm with a tokio executor and listen on the correct socket.

And we’re running. The last step is to loop over the swarm’s events and act on them. So far I’ve only wired up gossipsub’s add_explicit_peer method for discovered peers, which dials them if they’re not already connected.

loop {
    tokio::select! {
        event = swarm.select_next_some() => match event {
            SwarmEvent::Behaviour(behaviour_event) => match behaviour_event {
                BehaviourEvent::Gossipsub(ev) => println!("Gossipsub: {ev:?}"),
                BehaviourEvent::Discovery(discovered) => {
                    for (peer_id, _multiaddr) in discovered.peers {
                        swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
                    }
                },
            },
            _ => println!("Swarm: {event:?}"),
        }
    }
}

Next Steps

There are two main topics to continue this work on:

Chain Data

Consensus layer peers will drop the connection if the connecting peer's ENR eth2 field doesn't hold the correct (matching) fork data, generated from the chain specs, such as these values for Bellatrix:

BELLATRIX_FORK_VERSION = 0x02000000
BELLATRIX_FORK_EPOCH = 144896

But it's not as easy as adding a simple struct: the field has to be SSZ encoded and decoded. More details in the next update!
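As a rough preview, the eth2 entry is an SSZ-encoded ENRForkID container from the spec. Using the ssz_derive crate (an assumption on my part), it could be modeled like this:

use ssz_derive::{Decode, Encode};

// Mirrors the consensus spec's ENRForkID container.
#[derive(Debug, Clone, PartialEq, Encode, Decode)]
pub struct EnrForkId {
    pub fork_digest: [u8; 4],       // digest of the current fork
    pub next_fork_version: [u8; 4], // version of the next scheduled fork
    pub next_fork_epoch: u64,       // epoch of the next scheduled fork
}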

Message Processing

Incoming gossipsub messages are SSZ encoded and compressed with Snappy, so once the connections are stable, the next step is implementing the correct processing for these messages, roughly along the lines sketched below.
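A minimal sketch of that decoding pipeline, assuming the snap crate for the raw Snappy block format and the eth2_ssz crate's Decode trait:

use ssz::Decode;

// Gossip payloads are raw-Snappy-compressed SSZ bytes per the CL spec.
fn decode_gossip<T: Decode>(data: &[u8]) -> Result<T, String> {
    let ssz_bytes = snap::raw::Decoder::new()
        .decompress_vec(data)
        .map_err(|e| format!("snappy: {e}"))?;
    T::from_ssz_bytes(&ssz_bytes).map_err(|e| format!("ssz: {e:?}"))
}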