---
tags: ethereum, epf
---
# EPF - Fifth Update
Since my last update I've been working on a minimal implementation of a consensus layer peer-to-peer networking stack.
It can be found [here](https://github.com/brech1/cl-p2p-setup).
In this update I’d like to give an overview of its current state, the resources I used, and the next steps.
## Discovery
- [File](https://github.com/brech1/cl-p2p-setup/blob/master/src/discovery.rs)
Following the consensus layer specifications, the peer discovery service implements the [Node Discovery Protocol v5](https://github.com/ethereum/devp2p/blob/master/discv5/discv5.md). I’m using the [Sigma Prime](https://github.com/sigp/discv5) Rust implementation.
This is a simplified code snippet for the service constructor:
```rust
pub async fn new(local_key: &Keypair) -> Self {
    // Setup socket
    let listen_socket = "0.0.0.0:9000".parse::<SocketAddr>().unwrap();
    // Build ENR
    let enr_key: CombinedKey = key_from_libp2p(local_key).unwrap();
    let local_enr = build_enr(&enr_key);
    // Setup config
    let config = Discv5ConfigBuilder::new().build();
    // Create discv5 instance
    let mut discv5 = Discv5::new(local_enr.clone(), enr_key, config).unwrap();
    // Add bootnode
    let ef_bootnode_enr = Enr::from_str(BOOTNODE).unwrap();
    discv5.add_enr(ef_bootnode_enr).expect("bootnode error");
    // Start
    discv5.start(listen_socket).await.unwrap();
    // Obtain an event stream
    let event_stream = EventStream::Awaiting(Box::pin(discv5.event_stream()));
    Self {
        discv5,
        enr: local_enr,
        event_stream,
    }
}
```
The constructor receives the libp2p `Keypair` and converts it to a `CombinedKey` before building the ENR with the default listen sockets:
```rust
pub fn build_enr(combined_key: &CombinedKey) -> Enr {
    let mut enr_builder = enr::EnrBuilder::new("v4");
    enr_builder.ip("0.0.0.0".parse().unwrap());
    enr_builder.udp4(9000);
    enr_builder.tcp4(9000);
    enr_builder.build(combined_key).unwrap()
}
```
Once the ENR is built, we create the default config for the discv5 service and spin up an instance of it. We add a single bootnode, although we could add several, and start the service. With the service running we also request the event stream, which we need later when implementing polling.
So far we have a running discovery service on its own, but we should integrate this with the libp2p networking stack. The first step is to implement the `swarm::NetworkBehaviour` trait on our struct. This allows us to use the `Discovery` struct in the defined `Behaviour`.
```rust
impl NetworkBehaviour for Discovery {
    type ConnectionHandler = libp2p::swarm::dummy::ConnectionHandler;
    type OutEvent = DiscoveredPeers;

    fn new_handler(&mut self) -> Self::ConnectionHandler {
        libp2p::swarm::dummy::ConnectionHandler {}
    }

    fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
        let mut peer_address: Vec<Multiaddr> = Vec::new();
        if let Some(address) = self.multiaddr_map.get(peer_id) {
            peer_address.push(address.clone());
        }
        peer_address
    }

    fn poll(
        &mut self,
        cx: &mut Context,
        _: &mut impl PollParameters,
    ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
        // Start a single discovery query the first time we are polled
        if !self.started {
            self.started = true;
            self.find_peers();
            return Poll::Pending;
        }
        // Report the peers of any finished query to the swarm
        if let Some(dp) = self.get_peers(cx) {
            return Poll::Ready(NetworkBehaviourAction::GenerateEvent(dp));
        };
        // Drive the discv5 event stream
        match self.event_stream {
            EventStream::Awaiting(ref mut fut) => {
                if let Poll::Ready(event_stream) = fut.poll_unpin(cx) {
                    match event_stream {
                        Ok(stream) => {
                            println!("Discv5 event stream ready");
                            self.event_stream = EventStream::Present(stream);
                        }
                        Err(_) => {
                            println!("Discv5 event stream failed");
                            self.event_stream = EventStream::InActive;
                        }
                    }
                }
            }
            EventStream::InActive => {}
            EventStream::Present(ref mut stream) => {
                while let Poll::Ready(Some(event)) = stream.poll_recv(cx) {
                    match event {
                        Discv5Event::SessionEstablished(enr, _) => {
                            println!("Session Established: {:?}", enr);
                        }
                        _ => (),
                    }
                }
            }
        }
        Poll::Pending
    }
}
```
Not every method of the trait is implemented, but these will be sufficient for our current needs.
The `addresses_of_peer` method is used by libp2p when dialing peers. Peers are identified by a `PeerId`, but a `PeerId` alone is not enough to dial. Whenever we add a new peer, it’s important to store its `Multiaddr` so the behaviour can keep a `PeerId -> Multiaddr` map and answer these lookups.
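The lookup idea can be shown in isolation with a minimal sketch. It assumes plain `String` aliases in place of libp2p's `PeerId` and `Multiaddr`, and `AddressBook` and `add_peer` are hypothetical names used only for this illustration:

```rust
use std::collections::HashMap;

// Stand-in types for this sketch only; the real code uses
// libp2p's `PeerId` and `Multiaddr`.
type PeerId = String;
type Multiaddr = String;

// Hypothetical holder for the `PeerId -> Multiaddr` map.
#[derive(Default)]
struct AddressBook {
    multiaddr_map: HashMap<PeerId, Multiaddr>,
}

impl AddressBook {
    // Record the dialable address when a peer is discovered.
    fn add_peer(&mut self, peer_id: PeerId, address: Multiaddr) {
        self.multiaddr_map.insert(peer_id, address);
    }

    // Same contract as `addresses_of_peer`: zero or one known address.
    fn addresses_of_peer(&self, peer_id: &PeerId) -> Vec<Multiaddr> {
        self.multiaddr_map.get(peer_id).cloned().into_iter().collect()
    }
}
```

An unknown peer simply yields an empty `Vec`, which is how the behaviour signals that it has no address to offer.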
The `poll` method is the main driver of the behaviour. It first checks whether a peer discovery query has been started and, if not, starts a single one. This should later be replaced by a proper query manager.
If a query future has finished and returned a result, we return the discovered peers wrapped in `Poll::Ready` and `NetworkBehaviourAction::GenerateEvent`. We’ll later see how this is processed by the libp2p swarm.
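This control flow can be reproduced with the standard library alone. The sketch below is not the libp2p or discv5 API: `Driver`, `Peers`, `Query` and the no-op waker are hypothetical stand-ins, and the query resolves immediately instead of touching the network.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-ins for the discovered peers and a running query.
type Peers = Vec<String>;
type Query = Pin<Box<dyn Future<Output = Peers>>>;

struct Driver {
    started: bool,
    queries: Vec<Query>,
}

impl Driver {
    fn new() -> Self {
        Driver { started: false, queries: Vec::new() }
    }

    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Peers> {
        // First call: start a single query and report "nothing yet".
        if !self.started {
            self.started = true;
            let query = std::future::ready(vec!["peer-a".to_string()]);
            self.queries.push(Box::pin(query));
            return Poll::Pending;
        }
        // Later calls: hand back the first query that has finished.
        let mut i = 0;
        while i < self.queries.len() {
            let polled = self.queries[i].as_mut().poll(cx);
            if let Poll::Ready(peers) = polled {
                drop(self.queries.swap_remove(i));
                return Poll::Ready(peers);
            }
            i += 1;
        }
        Poll::Pending
    }
}

// A waker that does nothing, enough to call `poll` by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```

Driving it by hand with `Context::from_waker(&noop_waker())` gives `Pending` on the first call, `Ready` with the peers on the second, and `Pending` again afterwards, since no new query is ever started. That last state is the gap a query manager would fill.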
The last thing I’d like to review about the discovery service is the `find_peers` method.
```rust
fn find_peers(&mut self) {
    let predicate: Box<dyn Fn(&Enr) -> bool + Send> =
        Box::new(move |enr: &Enr| enr.tcp4().is_some() && enr.udp4().is_some());
    let target = NodeId::random();
    let peers_enr = self.discv5.find_node_predicate(target, predicate, 16);
    self.peers_future.push(Box::pin(peers_enr));
}
```
Discv5 allows us to find new peers based on a predicate. This is a great feature, since we can filter which peers we want to establish a connection with. We set up a random `NodeId` as the target since there isn’t a specific node we want to connect to.
The `find_node_predicate` method returns a `Future`, which is pushed to a `Vec` of futures and polled until the query finishes.
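To show what the predicate does on its own, here is a minimal sketch that does not use discv5 at all. `Record` is a hypothetical, stripped-down stand-in for an ENR, and `filter_records` with its `limit` parameter is my own illustration of filtering with a cap, not how the library runs the query internally:

```rust
// Hypothetical stand-in for an ENR: only the two optional ports
// that the predicate inspects.
#[derive(Debug, Clone, PartialEq)]
struct Record {
    tcp4: Option<u16>,
    udp4: Option<u16>,
}

// Same shape as the boxed predicate built in `find_peers`.
type Predicate = Box<dyn Fn(&Record) -> bool + Send>;

// Accept only records that advertise both a TCP and a UDP port.
fn dialable() -> Predicate {
    Box::new(|r: &Record| r.tcp4.is_some() && r.udp4.is_some())
}

// Keep at most `limit` records that satisfy the predicate.
fn filter_records(records: &[Record], predicate: &Predicate, limit: usize) -> Vec<Record> {
    records
        .iter()
        .filter(|r| predicate(*r))
        .take(limit)
        .cloned()
        .collect()
}
```

A record without a TCP port is rejected here, which matches the intent of the original predicate: there is no point in returning peers that libp2p cannot dial.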
## Network
- [File](https://github.com/brech1/cl-p2p-setup/blob/master/src/main.rs)
I based the libp2p setup on the [gossipsub chat](https://github.com/libp2p/rust-libp2p/blob/master/examples/gossipsub-chat.rs) example available in the rust-libp2p repository.
However, I’m using tokio instead of async-std. Libp2p has a built-in tokio-based TCP transport with both Yamux and Mplex for substream multiplexing. This is great since it’s the same config detailed in the CL p2p specifications.
It’s now easy to spin up the discovery service thanks to all our previous work. We just have to send a reference to our `secp256k1` Keypair.
```rust
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a random PeerId
    let local_key = identity::Keypair::generate_secp256k1();
    let local_peer_id = PeerId::from(local_key.public());
    // Set up an encrypted, DNS-enabled TCP transport with Yamux and Mplex multiplexing
    let transport = libp2p::tokio_development_transport(local_key.clone())?;
    // Start the discovery service
    let discovery = Discovery::new(&local_key).await;
    // Set a custom gossipsub configuration
    let gossipsub_config = gossipsub::GossipsubConfigBuilder::default()
        .max_transmit_size(10 * 1_048_576)
        .fanout_ttl(Duration::from_secs(60))
        .heartbeat_interval(Duration::from_millis(10_000))
        .validation_mode(ValidationMode::Anonymous)
        .history_length(12)
        .max_messages_per_rpc(Some(500))
        .build()
        .expect("Valid config");
    // Build a gossipsub network behaviour
    let mut gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, gossipsub_config)
        .expect("Correct configuration");
    // We create a custom network behaviour that combines Gossipsub and Discv5.
    #[derive(NetworkBehaviour)]
    struct Behaviour {
        gossipsub: Gossipsub,
        discovery: Discovery,
    }
    let behaviour = Behaviour {
        gossipsub,
        discovery,
    };
    // Create a Swarm to manage peers and events
    let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id)
        .notify_handler_buffer_size(std::num::NonZeroUsize::new(7).expect("Not zero"))
        .connection_event_buffer_size(64)
        .connection_limits(ConnectionLimits::default())
        .build();
    // Listen
    swarm.listen_on("/ip4/0.0.0.0/tcp/9000".parse()?)?;
    // Loop
    // ..
}
```
For the configuration, the validation mode should be set to `ValidationMode::Anonymous` and the privacy argument for the constructor should also be `MessageAuthenticity::Anonymous`.
We create the custom behaviour with Gossipsub and our Discv5 implementation, and we derive the `swarm::NetworkBehaviour` trait on it. Now it’s possible to create our Swarm, with a tokio executor, and listen on the correct socket.
And we’re running. The last step is to loop over the swarm events and act on them. For now I only handle discovered peers, by calling `add_explicit_peer` on the gossipsub behaviour, which will dial them if they’re not already connected.
```rust
loop {
    tokio::select! {
        event = swarm.select_next_some() => match event {
            SwarmEvent::Behaviour(behaviour_event) => match behaviour_event {
                BehaviourEvent::Gossipsub(ev) => println!("Gossipsub: {ev:?}"),
                BehaviourEvent::Discovery(discovered) => {
                    for (peer_id, _multiaddr) in discovered.peers {
                        swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
                    }
                },
            },
            _ => println!("Swarm: {event:?}"),
        }
    }
}
```
## Next Steps
There are two main topics on which to continue this work:
### Chain Data
Consensus Layer peers will drop the connection if the [`eth2`](https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/phase0/p2p-interface.md#eth2-field) field of the connecting peer's ENR doesn't hold the same fork data as their own. This data is generated from the chain specs, such as these values for [Bellatrix](https://ethereum.org/en/history/#bellatrix):
```
BELLATRIX_FORK_VERSION = 0x02000000
BELLATRIX_FORK_EPOCH = 144896
```
But it's not as easy as adding a simple struct! The data has to be SSZ encoded before it goes into the ENR, and parsed back when reading other peers' records. More details in the next update!
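As a rough preview, here is a minimal sketch of the serialization step only, assuming the `ENRForkID` layout from the phase 0 p2p spec (a 4-byte fork digest, a 4-byte next fork version and a `uint64` next fork epoch). The names `EnrForkId`, `ssz_bytes` and `example_fork_id` are mine, and the fork digest is a zeroed placeholder: the real one is derived from the current fork version and the genesis validators root, which requires hashing and is left out here.

```rust
// Sketch of the `ENRForkID` container; field names follow the spec.
struct EnrForkId {
    fork_digest: [u8; 4],
    next_fork_version: [u8; 4],
    next_fork_epoch: u64,
}

impl EnrForkId {
    // All fields are fixed-size, so the SSZ serialization is the plain
    // concatenation of the fields, with the epoch in little-endian.
    fn ssz_bytes(&self) -> [u8; 16] {
        let mut out = [0u8; 16];
        out[0..4].copy_from_slice(&self.fork_digest);
        out[4..8].copy_from_slice(&self.next_fork_version);
        out[8..16].copy_from_slice(&self.next_fork_epoch.to_le_bytes());
        out
    }
}

// Uses the Bellatrix constants quoted above, as if Bellatrix were the
// next scheduled fork. The digest is a placeholder, NOT a real value.
fn example_fork_id() -> EnrForkId {
    EnrForkId {
        fork_digest: [0u8; 4],
        next_fork_version: [0x02, 0x00, 0x00, 0x00],
        next_fork_epoch: 144_896,
    }
}
```

The resulting 16 bytes are what would be stored under the `eth2` key. A peer comparing them byte for byte against its own value explains why even a small mismatch leads to a dropped connection.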
### Message Processing
Incoming `gossipsub` messages are [SSZ](https://ethereum.org/en/developers/docs/data-structures-and-encoding/ssz/) encoded and compressed using [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)). So once the connections are stable, the next step is implementing the correct processing for the messages.