From 103300c880f54c53aca570d4c428f204159338d9 Mon Sep 17 00:00:00 2001 From: divma Date: Mon, 25 May 2020 23:24:38 -0500 Subject: [PATCH] Custom net behaviour (#1122) * expand NetworkBehaviour derive * add handler placeholder * add dummy custom handler wrapping the select * cleanup behaviour's expanded impl of NetworkBehaviour * cleanup behaviour with macro * add missing function and clean with macros * add custom InEvent for Behaviour's handler * cleanup * replace InboundProtocol with a "custom" one * add a delegating handler to put the encapsulate the noice * partially implement poll for handler * partially implement poll for handler * cleanup * Remove warnings before merge Co-authored-by: Age Manning --- .../src/behaviour/handler/delegate.rs | 474 ++++++++++++++++++ .../eth2-libp2p/src/behaviour/handler/mod.rs | 141 ++++++ .../src/{behaviour.rs => behaviour/mod.rs} | 233 +++++++-- 3 files changed, 814 insertions(+), 34 deletions(-) create mode 100644 beacon_node/eth2-libp2p/src/behaviour/handler/delegate.rs create mode 100644 beacon_node/eth2-libp2p/src/behaviour/handler/mod.rs rename beacon_node/eth2-libp2p/src/{behaviour.rs => behaviour/mod.rs} (72%) diff --git a/beacon_node/eth2-libp2p/src/behaviour/handler/delegate.rs b/beacon_node/eth2-libp2p/src/behaviour/handler/delegate.rs new file mode 100644 index 000000000..2ab10ee93 --- /dev/null +++ b/beacon_node/eth2-libp2p/src/behaviour/handler/delegate.rs @@ -0,0 +1,474 @@ +use crate::discovery::Discovery; +use crate::rpc::*; +use libp2p::{ + core::either::{EitherError, EitherOutput}, + core::upgrade::{EitherUpgrade, InboundUpgrade, OutboundUpgrade, SelectUpgrade, UpgradeError}, + gossipsub::Gossipsub, + identify::Identify, + swarm::{ + protocols_handler::{ + KeepAlive, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, + }, + NegotiatedSubstream, NetworkBehaviour, ProtocolsHandler, + }, +}; +use std::task::{Context, Poll}; +use types::EthSpec; + +/* Auxiliary types for simplicity */ +type GossipHandler = ::ProtocolsHandler; +type RPCHandler = as NetworkBehaviour>::ProtocolsHandler; +type IdentifyHandler = ::ProtocolsHandler; +type DiscoveryHandler = as NetworkBehaviour>::ProtocolsHandler; + +/// Handler that combines Lighthouse's Behaviours' handlers in a delegating manner. +pub(super) struct DelegatingHandler { + /// Handler for the Gossipsub protocol. + gossip_handler: GossipHandler, + /// Handler for the RPC protocol. + rpc_handler: RPCHandler, + /// Handler for the Identify protocol. + identify_handler: IdentifyHandler, + /// Handler for the Discovery protocol. + discovery_handler: DiscoveryHandler, +} + +impl DelegatingHandler { + pub fn new( + gossipsub: &mut Gossipsub, + rpc: &mut RPC, + identify: &mut Identify, + discovery: &mut Discovery, + ) -> Self { + DelegatingHandler { + gossip_handler: gossipsub.new_handler(), + rpc_handler: rpc.new_handler(), + identify_handler: identify.new_handler(), + discovery_handler: discovery.new_handler(), + } + } + + /// Gives access to the gossipsub handler. + pub fn _gossip_mut(&mut self) -> &mut GossipHandler { + &mut self.gossip_handler + } + + /// Gives mutable access to the rpc handler. + pub fn _rpc_mut(&mut self) -> &mut RPCHandler { + &mut self.rpc_handler + } + + /// Gives mutable access to identify's handler. + pub fn _identify_mut(&mut self) -> &mut IdentifyHandler { + &mut self.identify_handler + } + + /// Gives mutable access to discovery's handler. + pub fn _discovery_mut(&mut self) -> &mut DiscoveryHandler { + &mut self.discovery_handler + } + + /// Gives access to the gossipsub handler. + pub fn _gossip(&self) -> &GossipHandler { + &self.gossip_handler + } + + /// Gives access to the rpc handler. + pub fn _rpc(&self) -> &RPCHandler { + &self.rpc_handler + } + + /// Gives access to identify's handler. + pub fn _identify(&self) -> &IdentifyHandler { + &self.identify_handler + } + + /// Gives access to discovery's handler. + pub fn _discovery(&self) -> &DiscoveryHandler { + &self.discovery_handler + } +} + +// TODO: this can all be created with macros + +/// Wrapper around the `ProtocolsHandler::InEvent` types of the handlers. +/// Simply delegated to the corresponding behaviour's handler. +#[derive(Debug, Clone)] +pub enum DelegateIn { + Gossipsub(::InEvent), + RPC( as ProtocolsHandler>::InEvent), + Identify(::InEvent), + Discovery( as ProtocolsHandler>::InEvent), +} + +/// Wrapper around the `ProtocolsHandler::OutEvent` types of the handlers. +/// Simply delegated to the corresponding behaviour's handler. +pub enum DelegateOut { + Gossipsub(::OutEvent), + RPC( as ProtocolsHandler>::OutEvent), + Identify(::OutEvent), + Discovery( as ProtocolsHandler>::OutEvent), +} + +/// Wrapper around the `ProtocolsHandler::Error` types of the handlers. +/// Simply delegated to the corresponding behaviour's handler. +#[derive(Debug)] +pub enum DelegateError { + Gossipsub(::Error), + RPC( as ProtocolsHandler>::Error), + Identify(::Error), + Discovery( as ProtocolsHandler>::Error), +} + +impl std::error::Error for DelegateError {} + +impl std::fmt::Display for DelegateError { + fn fmt( + &self, + formater: &mut std::fmt::Formatter<'_>, + ) -> std::result::Result<(), std::fmt::Error> { + match self { + DelegateError::Gossipsub(err) => err.fmt(formater), + DelegateError::RPC(err) => err.fmt(formater), + DelegateError::Identify(err) => err.fmt(formater), + DelegateError::Discovery(err) => err.fmt(formater), + } + } +} + +pub type DelegateInProto = SelectUpgrade< + ::InboundProtocol, + SelectUpgrade< + as ProtocolsHandler>::InboundProtocol, + SelectUpgrade< + ::InboundProtocol, + as ProtocolsHandler>::InboundProtocol, + >, + >, +>; + +pub type DelegateOutProto = EitherUpgrade< + ::OutboundProtocol, + EitherUpgrade< + as ProtocolsHandler>::OutboundProtocol, + EitherUpgrade< + ::OutboundProtocol, + as ProtocolsHandler>::OutboundProtocol, + >, + >, +>; + +// TODO: prob make this an enum +pub type DelegateOutInfo = EitherOutput< + ::OutboundOpenInfo, + EitherOutput< + as ProtocolsHandler>::OutboundOpenInfo, + EitherOutput< + ::OutboundOpenInfo, + as ProtocolsHandler>::OutboundOpenInfo, + >, + >, +>; + +impl ProtocolsHandler for DelegatingHandler { + type InEvent = DelegateIn; + type OutEvent = DelegateOut; + type Error = DelegateError; + type InboundProtocol = DelegateInProto; + type OutboundProtocol = DelegateOutProto; + type OutboundOpenInfo = DelegateOutInfo; + + fn listen_protocol(&self) -> SubstreamProtocol { + let gossip_proto = self.gossip_handler.listen_protocol(); + let rpc_proto = self.rpc_handler.listen_protocol(); + let identify_proto = self.identify_handler.listen_protocol(); + let discovery_proto = self.discovery_handler.listen_protocol(); + + let timeout = gossip_proto + .timeout() + .max(rpc_proto.timeout()) + .max(identify_proto.timeout()) + .max(discovery_proto.timeout()) + .clone(); + + let select = SelectUpgrade::new( + gossip_proto.into_upgrade().1, + SelectUpgrade::new( + rpc_proto.into_upgrade().1, + SelectUpgrade::new( + identify_proto.into_upgrade().1, + discovery_proto.into_upgrade().1, + ), + ), + ); + + SubstreamProtocol::new(select).with_timeout(timeout) + } + + fn inject_fully_negotiated_inbound( + &mut self, + out: >::Output, + ) { + match out { + // Gossipsub + EitherOutput::First(out) => self.gossip_handler.inject_fully_negotiated_inbound(out), + // RPC + EitherOutput::Second(EitherOutput::First(out)) => { + self.rpc_handler.inject_fully_negotiated_inbound(out) + } + // Identify + EitherOutput::Second(EitherOutput::Second(EitherOutput::First(out))) => { + self.identify_handler.inject_fully_negotiated_inbound(out) + } + // Discovery + EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(out))) => { + self.discovery_handler.inject_fully_negotiated_inbound(out) + } + } + } + + fn inject_fully_negotiated_outbound( + &mut self, + protocol: >::Output, + info: Self::OutboundOpenInfo, + ) { + match (protocol, info) { + // Gossipsub + (EitherOutput::First(protocol), EitherOutput::First(info)) => self + .gossip_handler + .inject_fully_negotiated_outbound(protocol, info), + // RPC + ( + EitherOutput::Second(EitherOutput::First(protocol)), + EitherOutput::Second(EitherOutput::First(info)), + ) => self + .rpc_handler + .inject_fully_negotiated_outbound(protocol, info), + // Identify + ( + EitherOutput::Second(EitherOutput::Second(EitherOutput::First(protocol))), + EitherOutput::Second(EitherOutput::Second(EitherOutput::First(info))), + ) => self + .identify_handler + .inject_fully_negotiated_outbound(protocol, info), + // Discovery + ( + EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(protocol))), + EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(info))), + ) => self + .discovery_handler + .inject_fully_negotiated_outbound(protocol, info), + // Reaching here means we got a protocol and info for different behaviours + _ => unreachable!("output and protocol don't match"), + } + } + + fn inject_event(&mut self, event: Self::InEvent) { + match event { + DelegateIn::Gossipsub(ev) => self.gossip_handler.inject_event(ev), + DelegateIn::RPC(ev) => self.rpc_handler.inject_event(ev), + DelegateIn::Identify(ev) => self.identify_handler.inject_event(ev), + DelegateIn::Discovery(ev) => self.discovery_handler.inject_event(ev), + } + } + + fn inject_dial_upgrade_error( + &mut self, + info: Self::OutboundOpenInfo, + error: ProtocolsHandlerUpgrErr< + >::Error, + >, + ) { + // TODO: find how to clean up + match info { + // Gossipsub + EitherOutput::First(info) => match error { + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => { + self.gossip_handler.inject_dial_upgrade_error( + info, + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), + ) + } + ProtocolsHandlerUpgrErr::Timer => self + .gossip_handler + .inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer), + ProtocolsHandlerUpgrErr::Timeout => self + .gossip_handler + .inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout), + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(err))) => { + self.gossip_handler.inject_dial_upgrade_error( + info, + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)), + ) + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(_)) => { + unreachable!("info and error don't match") + } + }, + // RPC + EitherOutput::Second(EitherOutput::First(info)) => match error { + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => { + self.rpc_handler.inject_dial_upgrade_error( + info, + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), + ) + } + ProtocolsHandlerUpgrErr::Timer => self + .rpc_handler + .inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer), + ProtocolsHandlerUpgrErr::Timeout => self + .rpc_handler + .inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout), + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B( + EitherError::A(err), + ))) => self.rpc_handler.inject_dial_upgrade_error( + info, + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)), + ), + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(_)) => { + unreachable!("info and error don't match") + } + }, + // Identify + EitherOutput::Second(EitherOutput::Second(EitherOutput::First(info))) => match error { + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => { + self.identify_handler.inject_dial_upgrade_error( + info, + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), + ) + } + ProtocolsHandlerUpgrErr::Timer => self + .identify_handler + .inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer), + ProtocolsHandlerUpgrErr::Timeout => self + .identify_handler + .inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout), + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B( + EitherError::B(EitherError::A(err)), + ))) => self.identify_handler.inject_dial_upgrade_error( + info, + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)), + ), + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(_)) => { + unreachable!("info and error don't match") + } + }, + // Discovery + EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(info))) => match error { + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => { + self.discovery_handler.inject_dial_upgrade_error( + info, + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), + ) + } + ProtocolsHandlerUpgrErr::Timer => self + .discovery_handler + .inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer), + ProtocolsHandlerUpgrErr::Timeout => self + .discovery_handler + .inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout), + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B( + EitherError::B(EitherError::B(err)), + ))) => self.discovery_handler.inject_dial_upgrade_error( + info, + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)), + ), + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(_)) => { + unreachable!("info and error don't match") + } + }, + } + } + + fn connection_keep_alive(&self) -> KeepAlive { + self.gossip_handler + .connection_keep_alive() + .max(self.rpc_handler.connection_keep_alive()) + .max(self.identify_handler.connection_keep_alive()) + .max(self.discovery_handler.connection_keep_alive()) + } + + fn poll( + &mut self, + cx: &mut Context, + ) -> Poll< + ProtocolsHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, + Self::Error, + >, + > { + match self.gossip_handler.poll(cx) { + Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => { + return Poll::Ready(ProtocolsHandlerEvent::Custom(DelegateOut::Gossipsub(event))); + } + Poll::Ready(ProtocolsHandlerEvent::Close(event)) => { + return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Gossipsub( + event, + ))); + } + Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => { + return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: protocol.map_upgrade(|u| EitherUpgrade::A(u)), + info: EitherOutput::First(info), + }); + } + Poll::Pending => (), + }; + + match self.rpc_handler.poll(cx) { + Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => { + return Poll::Ready(ProtocolsHandlerEvent::Custom(DelegateOut::RPC(event))); + } + Poll::Ready(ProtocolsHandlerEvent::Close(event)) => { + return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::RPC(event))); + } + Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => { + return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: protocol.map_upgrade(|u| EitherUpgrade::B(EitherUpgrade::A(u))), + info: EitherOutput::Second(EitherOutput::First(info)), + }); + } + Poll::Pending => (), + }; + + match self.identify_handler.poll(cx) { + Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => { + return Poll::Ready(ProtocolsHandlerEvent::Custom(DelegateOut::Identify(event))); + } + Poll::Ready(ProtocolsHandlerEvent::Close(event)) => { + return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Identify(event))); + } + Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => { + return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: protocol + .map_upgrade(|u| EitherUpgrade::B(EitherUpgrade::B(EitherUpgrade::A(u)))), + info: EitherOutput::Second(EitherOutput::Second(EitherOutput::First(info))), + }); + } + Poll::Pending => (), + }; + + match self.discovery_handler.poll(cx) { + Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => { + return Poll::Ready(ProtocolsHandlerEvent::Custom(DelegateOut::Discovery(event))); + } + Poll::Ready(ProtocolsHandlerEvent::Close(event)) => { + return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Discovery( + event, + ))); + } + Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => { + return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: protocol + .map_upgrade(|u| EitherUpgrade::B(EitherUpgrade::B(EitherUpgrade::B(u)))), + info: EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(info))), + }); + } + Poll::Pending => (), + }; + + Poll::Pending + } +} diff --git a/beacon_node/eth2-libp2p/src/behaviour/handler/mod.rs b/beacon_node/eth2-libp2p/src/behaviour/handler/mod.rs new file mode 100644 index 000000000..772a428fa --- /dev/null +++ b/beacon_node/eth2-libp2p/src/behaviour/handler/mod.rs @@ -0,0 +1,141 @@ +use crate::discovery::Discovery; +use crate::rpc::*; +use delegate::DelegatingHandler; +pub(super) use delegate::{ + DelegateError, DelegateIn, DelegateInProto, DelegateOut, DelegateOutInfo, DelegateOutProto, +}; +use libp2p::{ + core::upgrade::{InboundUpgrade, OutboundUpgrade}, + gossipsub::Gossipsub, + identify::Identify, + swarm::protocols_handler::{ + KeepAlive, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, + }, + swarm::{NegotiatedSubstream, ProtocolsHandler}, +}; +use std::task::{Context, Poll}; +use types::EthSpec; + +mod delegate; + +/// Handler that combines Lighthouse's Behaviours' handlers in a delegating manner. +pub struct BehaviourHandler { + /// Handler combining all sub behaviour's handlers. + delegate: DelegatingHandler, + /// Keep alive for this handler. + keep_alive: KeepAlive, +} + +impl BehaviourHandler { + pub fn new( + gossipsub: &mut Gossipsub, + rpc: &mut RPC, + identify: &mut Identify, + discovery: &mut Discovery, + ) -> Self { + BehaviourHandler { + delegate: DelegatingHandler::new(gossipsub, rpc, identify, discovery), + keep_alive: KeepAlive::Yes, + } + } +} + +#[derive(Clone)] +pub enum BehaviourHandlerIn { + Delegate(DelegateIn), + // TODO: replace custom with incoming events + Custom, +} + +pub enum BehaviourHandlerOut { + Delegate(DelegateOut), + // TODO: replace custom with events to send + Custom, +} + +impl ProtocolsHandler for BehaviourHandler { + type InEvent = BehaviourHandlerIn; + type OutEvent = BehaviourHandlerOut; + type Error = DelegateError; + type InboundProtocol = DelegateInProto; + type OutboundProtocol = DelegateOutProto; + type OutboundOpenInfo = DelegateOutInfo; + + fn listen_protocol(&self) -> SubstreamProtocol { + self.delegate.listen_protocol() + } + + fn inject_fully_negotiated_inbound( + &mut self, + out: >::Output, + ) { + self.delegate.inject_fully_negotiated_inbound(out) + } + + fn inject_fully_negotiated_outbound( + &mut self, + out: >::Output, + info: Self::OutboundOpenInfo, + ) { + self.delegate.inject_fully_negotiated_outbound(out, info) + } + + fn inject_event(&mut self, event: Self::InEvent) { + match event { + BehaviourHandlerIn::Delegate(delegated_ev) => self.delegate.inject_event(delegated_ev), + /* Events comming from the behaviour */ + BehaviourHandlerIn::Custom => { + // TODO: implement + } + } + } + + fn inject_dial_upgrade_error( + &mut self, + info: Self::OutboundOpenInfo, + err: ProtocolsHandlerUpgrErr< + >::Error, + >, + ) { + self.delegate.inject_dial_upgrade_error(info, err) + } + + fn connection_keep_alive(&self) -> KeepAlive { + // TODO: refine this logic + self.keep_alive.min(self.delegate.connection_keep_alive()) + } + + fn poll( + &mut self, + cx: &mut Context, + ) -> Poll< + ProtocolsHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, + Self::Error, + >, + > { + match self.delegate.poll(cx) { + Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => { + return Poll::Ready(ProtocolsHandlerEvent::Custom( + BehaviourHandlerOut::Delegate(event), + )) + } + Poll::Ready(ProtocolsHandlerEvent::Close(err)) => { + return Poll::Ready(ProtocolsHandlerEvent::Close(err)) + } + Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => { + return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol, + info, + }); + } + Poll::Pending => (), + } + + Poll::Pending + + // TODO: speak to our behaviour here + } +} diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour/mod.rs similarity index 72% rename from beacon_node/eth2-libp2p/src/behaviour.rs rename to beacon_node/eth2-libp2p/src/behaviour/mod.rs index 03990a2ee..3316364ca 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour/mod.rs @@ -5,12 +5,19 @@ use crate::types::{GossipEncoding, GossipKind, GossipTopic}; use crate::{error, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; use discv5::Discv5Event; use futures::prelude::*; +use handler::{BehaviourHandler, BehaviourHandlerIn, BehaviourHandlerOut, DelegateIn, DelegateOut}; use libp2p::{ - core::identity::Keypair, + core::{ + connection::{ConnectedPoint, ConnectionId, ListenerId}, + identity::Keypair, + Multiaddr, + }, gossipsub::{Gossipsub, GossipsubEvent, MessageId}, identify::{Identify, IdentifyEvent}, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, - NetworkBehaviour, PeerId, + swarm::{ + NetworkBehaviour, NetworkBehaviourAction as NBAction, PollParameters, ProtocolsHandler, + }, + PeerId, }; use lru::LruCache; use slog::{crit, debug, o}; @@ -21,13 +28,13 @@ use std::{ }; use types::{EnrForkId, EthSpec, SubnetId}; +mod handler; + const MAX_IDENTIFY_ADDRESSES: usize = 10; /// Builds the network behaviour that manages the core protocols of eth2. /// This core behaviour is managed by `Behaviour` which adds peer management to all core /// behaviours. -#[derive(NetworkBehaviour)] -#[behaviour(out_event = "BehaviourEvent", poll_method = "poll")] pub struct Behaviour { /// The routing pub-sub mechanism for eth2. gossipsub: Gossipsub, @@ -40,32 +47,202 @@ pub struct Behaviour { /// Discovery behaviour. discovery: Discovery, /// The peer manager that keeps track of peer's reputation and status. - #[behaviour(ignore)] peer_manager: PeerManager, /// The events generated by this behaviour to be consumed in the swarm poll. - #[behaviour(ignore)] events: Vec>, + // TODO: add events to send to the handler /// The current meta data of the node, so respond to pings and get metadata - #[behaviour(ignore)] meta_data: MetaData, /// A cache of recently seen gossip messages. This is used to filter out any possible /// duplicates that may still be seen over gossipsub. - #[behaviour(ignore)] // TODO: Remove this seen_gossip_messages: LruCache, /// A collections of variables accessible outside the network service. - #[behaviour(ignore)] network_globals: Arc>, - #[behaviour(ignore)] /// Keeps track of the current EnrForkId for upgrading gossipsub topics. // NOTE: This can be accessed via the network_globals ENR. However we keep it here for quick // lookups for every gossipsub message send. enr_fork_id: EnrForkId, - #[behaviour(ignore)] /// Logger for behaviour actions. log: slog::Logger, } +/// Calls the given function with the given args on all sub behaviours. +macro_rules! delegate_to_behaviours { + ($self: ident, $fn: ident, $($arg: ident), *) => { + $self.gossipsub.$fn($($arg),*); + $self.eth2_rpc.$fn($($arg),*); + $self.identify.$fn($($arg),*); + $self.discovery.$fn($($arg),*); + }; +} + +impl NetworkBehaviour for Behaviour { + type ProtocolsHandler = BehaviourHandler; + type OutEvent = BehaviourEvent; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + BehaviourHandler::new( + &mut self.gossipsub, + &mut self.eth2_rpc, + &mut self.identify, + &mut self.discovery, + ) + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + let mut out = Vec::new(); + out.extend(self.gossipsub.addresses_of_peer(peer_id)); + out.extend(self.eth2_rpc.addresses_of_peer(peer_id)); + out.extend(self.identify.addresses_of_peer(peer_id)); + out.extend(self.discovery.addresses_of_peer(peer_id)); + out + } + + fn inject_connected(&mut self, peer_id: &PeerId) { + delegate_to_behaviours!(self, inject_connected, peer_id); + } + + fn inject_disconnected(&mut self, peer_id: &PeerId) { + delegate_to_behaviours!(self, inject_disconnected, peer_id); + } + + fn inject_connection_established( + &mut self, + peer_id: &PeerId, + conn_id: &ConnectionId, + endpoint: &ConnectedPoint, + ) { + delegate_to_behaviours!( + self, + inject_connection_established, + peer_id, + conn_id, + endpoint + ); + } + + fn inject_connection_closed( + &mut self, + peer_id: &PeerId, + conn_id: &ConnectionId, + endpoint: &ConnectedPoint, + ) { + delegate_to_behaviours!(self, inject_connection_closed, peer_id, conn_id, endpoint); + } + + fn inject_addr_reach_failure( + &mut self, + peer_id: Option<&PeerId>, + addr: &Multiaddr, + error: &dyn std::error::Error, + ) { + delegate_to_behaviours!(self, inject_addr_reach_failure, peer_id, addr, error); + } + + fn inject_dial_failure(&mut self, peer_id: &PeerId) { + delegate_to_behaviours!(self, inject_dial_failure, peer_id); + } + + fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { + delegate_to_behaviours!(self, inject_new_listen_addr, addr); + } + + fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { + delegate_to_behaviours!(self, inject_expired_listen_addr, addr); + } + + fn inject_new_external_addr(&mut self, addr: &Multiaddr) { + delegate_to_behaviours!(self, inject_new_external_addr, addr); + } + + fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) { + delegate_to_behaviours!(self, inject_listener_error, id, err); + } + fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &std::io::Error>) { + delegate_to_behaviours!(self, inject_listener_closed, id, reason); + } + + fn inject_event( + &mut self, + peer_id: PeerId, + conn_id: ConnectionId, + event: ::OutEvent, + ) { + match event { + // Events comming from the handler, redirected to each behaviour + BehaviourHandlerOut::Delegate(delegate) => match delegate { + DelegateOut::Gossipsub(ev) => self.gossipsub.inject_event(peer_id, conn_id, ev), + DelegateOut::RPC(ev) => self.eth2_rpc.inject_event(peer_id, conn_id, ev), + DelegateOut::Identify(ev) => self.identify.inject_event(peer_id, conn_id, ev), + DelegateOut::Discovery(ev) => self.discovery.inject_event(peer_id, conn_id, ev), + }, + /* Custom events sent BY the handler */ + BehaviourHandlerOut::Custom => { + // TODO: implement + } + } + } + + fn poll( + &mut self, + cx: &mut Context, + poll_params: &mut impl PollParameters, + ) -> Poll::InEvent, Self::OutEvent>> { + // TODO: move where it's less distracting + macro_rules! poll_behaviour { + /* $behaviour: The sub-behaviour being polled. + * $on_event_fn: Function to call if we get an event from the sub-behaviour. + * $notify_handler_event_closure: Closure mapping the received event type to + * the one that the handler should get. + */ + ($behaviour: ident, $on_event_fn: ident, $notify_handler_event_closure: expr) => { + loop { + // poll the sub-behaviour + match self.$behaviour.poll(cx, poll_params) { + Poll::Ready(action) => match action { + // call the designated function to handle the event from sub-behaviour + NBAction::GenerateEvent(event) => self.$on_event_fn(event), + NBAction::DialAddress { address } => { + return Poll::Ready(NBAction::DialAddress { address }) + } + NBAction::DialPeer { peer_id, condition } => { + return Poll::Ready(NBAction::DialPeer { peer_id, condition }) + } + NBAction::NotifyHandler { + peer_id, + handler, + event, + } => { + return Poll::Ready(NBAction::NotifyHandler { + peer_id, + handler, + // call the closure mapping the received event to the needed one + // in order to notify the handler + event: BehaviourHandlerIn::Delegate( + $notify_handler_event_closure(event), + ), + }); + } + NBAction::ReportObservedAddr { address } => { + return Poll::Ready(NBAction::ReportObservedAddr { address }) + } + }, + Poll::Pending => break, + } + } + }; + } + + poll_behaviour!(gossipsub, on_gossip_event, DelegateIn::Gossipsub); + poll_behaviour!(eth2_rpc, on_rpc_event, DelegateIn::RPC); + poll_behaviour!(identify, on_identify_event, DelegateIn::Identify); + poll_behaviour!(discovery, on_discovery_event, DelegateIn::Discovery); + + self.custom_poll(cx) + } +} + /// Implements the combined behaviour for the libp2p service. impl Behaviour { pub fn new( @@ -370,11 +547,8 @@ impl Behaviour { } } */ -} -// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour -impl NetworkBehaviourEventProcess for Behaviour { - fn inject_event(&mut self, event: GossipsubEvent) { + fn on_gossip_event(&mut self, event: GossipsubEvent) { match event { GossipsubEvent::Message(propagation_source, id, gs_msg) => { // Note: We are keeping track here of the peer that sent us the message, not the @@ -412,10 +586,8 @@ impl NetworkBehaviourEventProcess for Behaviour< GossipsubEvent::Unsubscribed { .. } => {} } } -} -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, message: RPCMessage) { + fn on_rpc_event(&mut self, message: RPCMessage) { let peer_id = message.peer_id; // The METADATA and PING RPC responses are handled within the behaviour and not // propagated @@ -457,15 +629,12 @@ impl NetworkBehaviourEventProcess> for Behavio } } } -} -impl Behaviour { /// Consumes the events list when polled. - fn poll( + fn custom_poll( &mut self, cx: &mut Context, - _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { // check the peer manager for events loop { match self.peer_manager.poll_next_unpin(cx) { @@ -473,9 +642,9 @@ impl Behaviour { PeerManagerEvent::Status(peer_id) => { // it's time to status. We don't keep a beacon chain reference here, so we inform // the network to send a status to this peer - return Poll::Ready(NetworkBehaviourAction::GenerateEvent( - BehaviourEvent::StatusPeer(peer_id), - )); + return Poll::Ready(NBAction::GenerateEvent(BehaviourEvent::StatusPeer( + peer_id, + ))); } PeerManagerEvent::Ping(peer_id) => { // send a ping request to this peer @@ -497,15 +666,13 @@ impl Behaviour { } if !self.events.is_empty() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); + return Poll::Ready(NBAction::GenerateEvent(self.events.remove(0))); } Poll::Pending } -} -impl NetworkBehaviourEventProcess for Behaviour { - fn inject_event(&mut self, event: IdentifyEvent) { + fn on_identify_event(&mut self, event: IdentifyEvent) { match event { IdentifyEvent::Received { peer_id, @@ -534,10 +701,8 @@ impl NetworkBehaviourEventProcess for Behaviour {} } } -} -impl NetworkBehaviourEventProcess for Behaviour { - fn inject_event(&mut self, _event: Discv5Event) { + fn on_discovery_event(&mut self, _event: Discv5Event) { // discv5 has no events to inject } }