Custom net behaviour (#1122)
* expand NetworkBehaviour derive * add handler placeholder * add dummy custom handler wrapping the select * cleanup behaviour's expanded impl of NetworkBehaviour * cleanup behaviour with macro * add missing function and clean with macros * add custom InEvent for Behaviour's handler * cleanup * replace InboundProtocol with a "custom" one * add a delegating handler to put the encapsulate the noice * partially implement poll for handler * partially implement poll for handler * cleanup * Remove warnings before merge Co-authored-by: Age Manning <Age@AgeManning.com>
This commit is contained in:
parent
3c52b5c58d
commit
103300c880
474
beacon_node/eth2-libp2p/src/behaviour/handler/delegate.rs
Normal file
474
beacon_node/eth2-libp2p/src/behaviour/handler/delegate.rs
Normal file
@ -0,0 +1,474 @@
|
|||||||
|
use crate::discovery::Discovery;
|
||||||
|
use crate::rpc::*;
|
||||||
|
use libp2p::{
|
||||||
|
core::either::{EitherError, EitherOutput},
|
||||||
|
core::upgrade::{EitherUpgrade, InboundUpgrade, OutboundUpgrade, SelectUpgrade, UpgradeError},
|
||||||
|
gossipsub::Gossipsub,
|
||||||
|
identify::Identify,
|
||||||
|
swarm::{
|
||||||
|
protocols_handler::{
|
||||||
|
KeepAlive, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
|
||||||
|
},
|
||||||
|
NegotiatedSubstream, NetworkBehaviour, ProtocolsHandler,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use types::EthSpec;
|
||||||
|
|
||||||
|
/* Auxiliary types for simplicity */
|
||||||
|
type GossipHandler = <Gossipsub as NetworkBehaviour>::ProtocolsHandler;
|
||||||
|
type RPCHandler<TSpec> = <RPC<TSpec> as NetworkBehaviour>::ProtocolsHandler;
|
||||||
|
type IdentifyHandler = <Identify as NetworkBehaviour>::ProtocolsHandler;
|
||||||
|
type DiscoveryHandler<TSpec> = <Discovery<TSpec> as NetworkBehaviour>::ProtocolsHandler;
|
||||||
|
|
||||||
|
/// Handler that combines Lighthouse's Behaviours' handlers in a delegating manner.
|
||||||
|
pub(super) struct DelegatingHandler<TSpec: EthSpec> {
|
||||||
|
/// Handler for the Gossipsub protocol.
|
||||||
|
gossip_handler: GossipHandler,
|
||||||
|
/// Handler for the RPC protocol.
|
||||||
|
rpc_handler: RPCHandler<TSpec>,
|
||||||
|
/// Handler for the Identify protocol.
|
||||||
|
identify_handler: IdentifyHandler,
|
||||||
|
/// Handler for the Discovery protocol.
|
||||||
|
discovery_handler: DiscoveryHandler<TSpec>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TSpec: EthSpec> DelegatingHandler<TSpec> {
|
||||||
|
pub fn new(
|
||||||
|
gossipsub: &mut Gossipsub,
|
||||||
|
rpc: &mut RPC<TSpec>,
|
||||||
|
identify: &mut Identify,
|
||||||
|
discovery: &mut Discovery<TSpec>,
|
||||||
|
) -> Self {
|
||||||
|
DelegatingHandler {
|
||||||
|
gossip_handler: gossipsub.new_handler(),
|
||||||
|
rpc_handler: rpc.new_handler(),
|
||||||
|
identify_handler: identify.new_handler(),
|
||||||
|
discovery_handler: discovery.new_handler(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gives access to the gossipsub handler.
|
||||||
|
pub fn _gossip_mut(&mut self) -> &mut GossipHandler {
|
||||||
|
&mut self.gossip_handler
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gives mutable access to the rpc handler.
|
||||||
|
pub fn _rpc_mut(&mut self) -> &mut RPCHandler<TSpec> {
|
||||||
|
&mut self.rpc_handler
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gives mutable access to identify's handler.
|
||||||
|
pub fn _identify_mut(&mut self) -> &mut IdentifyHandler {
|
||||||
|
&mut self.identify_handler
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gives mutable access to discovery's handler.
|
||||||
|
pub fn _discovery_mut(&mut self) -> &mut DiscoveryHandler<TSpec> {
|
||||||
|
&mut self.discovery_handler
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gives access to the gossipsub handler.
|
||||||
|
pub fn _gossip(&self) -> &GossipHandler {
|
||||||
|
&self.gossip_handler
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gives access to the rpc handler.
|
||||||
|
pub fn _rpc(&self) -> &RPCHandler<TSpec> {
|
||||||
|
&self.rpc_handler
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gives access to identify's handler.
|
||||||
|
pub fn _identify(&self) -> &IdentifyHandler {
|
||||||
|
&self.identify_handler
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gives access to discovery's handler.
|
||||||
|
pub fn _discovery(&self) -> &DiscoveryHandler<TSpec> {
|
||||||
|
&self.discovery_handler
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: this can all be created with macros
|
||||||
|
|
||||||
|
/// Wrapper around the `ProtocolsHandler::InEvent` types of the handlers.
|
||||||
|
/// Simply delegated to the corresponding behaviour's handler.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum DelegateIn<TSpec: EthSpec> {
|
||||||
|
Gossipsub(<GossipHandler as ProtocolsHandler>::InEvent),
|
||||||
|
RPC(<RPCHandler<TSpec> as ProtocolsHandler>::InEvent),
|
||||||
|
Identify(<IdentifyHandler as ProtocolsHandler>::InEvent),
|
||||||
|
Discovery(<DiscoveryHandler<TSpec> as ProtocolsHandler>::InEvent),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wrapper around the `ProtocolsHandler::OutEvent` types of the handlers.
|
||||||
|
/// Simply delegated to the corresponding behaviour's handler.
|
||||||
|
pub enum DelegateOut<TSpec: EthSpec> {
|
||||||
|
Gossipsub(<GossipHandler as ProtocolsHandler>::OutEvent),
|
||||||
|
RPC(<RPCHandler<TSpec> as ProtocolsHandler>::OutEvent),
|
||||||
|
Identify(<IdentifyHandler as ProtocolsHandler>::OutEvent),
|
||||||
|
Discovery(<DiscoveryHandler<TSpec> as ProtocolsHandler>::OutEvent),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wrapper around the `ProtocolsHandler::Error` types of the handlers.
|
||||||
|
/// Simply delegated to the corresponding behaviour's handler.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum DelegateError<TSpec: EthSpec> {
|
||||||
|
Gossipsub(<GossipHandler as ProtocolsHandler>::Error),
|
||||||
|
RPC(<RPCHandler<TSpec> as ProtocolsHandler>::Error),
|
||||||
|
Identify(<IdentifyHandler as ProtocolsHandler>::Error),
|
||||||
|
Discovery(<DiscoveryHandler<TSpec> as ProtocolsHandler>::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TSpec: EthSpec> std::error::Error for DelegateError<TSpec> {}
|
||||||
|
|
||||||
|
impl<TSpec: EthSpec> std::fmt::Display for DelegateError<TSpec> {
|
||||||
|
fn fmt(
|
||||||
|
&self,
|
||||||
|
formater: &mut std::fmt::Formatter<'_>,
|
||||||
|
) -> std::result::Result<(), std::fmt::Error> {
|
||||||
|
match self {
|
||||||
|
DelegateError::Gossipsub(err) => err.fmt(formater),
|
||||||
|
DelegateError::RPC(err) => err.fmt(formater),
|
||||||
|
DelegateError::Identify(err) => err.fmt(formater),
|
||||||
|
DelegateError::Discovery(err) => err.fmt(formater),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type DelegateInProto<TSpec> = SelectUpgrade<
|
||||||
|
<GossipHandler as ProtocolsHandler>::InboundProtocol,
|
||||||
|
SelectUpgrade<
|
||||||
|
<RPCHandler<TSpec> as ProtocolsHandler>::InboundProtocol,
|
||||||
|
SelectUpgrade<
|
||||||
|
<IdentifyHandler as ProtocolsHandler>::InboundProtocol,
|
||||||
|
<DiscoveryHandler<TSpec> as ProtocolsHandler>::InboundProtocol,
|
||||||
|
>,
|
||||||
|
>,
|
||||||
|
>;
|
||||||
|
|
||||||
|
pub type DelegateOutProto<TSpec> = EitherUpgrade<
|
||||||
|
<GossipHandler as ProtocolsHandler>::OutboundProtocol,
|
||||||
|
EitherUpgrade<
|
||||||
|
<RPCHandler<TSpec> as ProtocolsHandler>::OutboundProtocol,
|
||||||
|
EitherUpgrade<
|
||||||
|
<IdentifyHandler as ProtocolsHandler>::OutboundProtocol,
|
||||||
|
<DiscoveryHandler<TSpec> as ProtocolsHandler>::OutboundProtocol,
|
||||||
|
>,
|
||||||
|
>,
|
||||||
|
>;
|
||||||
|
|
||||||
|
// TODO: prob make this an enum
|
||||||
|
pub type DelegateOutInfo<TSpec> = EitherOutput<
|
||||||
|
<GossipHandler as ProtocolsHandler>::OutboundOpenInfo,
|
||||||
|
EitherOutput<
|
||||||
|
<RPCHandler<TSpec> as ProtocolsHandler>::OutboundOpenInfo,
|
||||||
|
EitherOutput<
|
||||||
|
<IdentifyHandler as ProtocolsHandler>::OutboundOpenInfo,
|
||||||
|
<DiscoveryHandler<TSpec> as ProtocolsHandler>::OutboundOpenInfo,
|
||||||
|
>,
|
||||||
|
>,
|
||||||
|
>;
|
||||||
|
|
||||||
|
impl<TSpec: EthSpec> ProtocolsHandler for DelegatingHandler<TSpec> {
|
||||||
|
type InEvent = DelegateIn<TSpec>;
|
||||||
|
type OutEvent = DelegateOut<TSpec>;
|
||||||
|
type Error = DelegateError<TSpec>;
|
||||||
|
type InboundProtocol = DelegateInProto<TSpec>;
|
||||||
|
type OutboundProtocol = DelegateOutProto<TSpec>;
|
||||||
|
type OutboundOpenInfo = DelegateOutInfo<TSpec>;
|
||||||
|
|
||||||
|
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
|
||||||
|
let gossip_proto = self.gossip_handler.listen_protocol();
|
||||||
|
let rpc_proto = self.rpc_handler.listen_protocol();
|
||||||
|
let identify_proto = self.identify_handler.listen_protocol();
|
||||||
|
let discovery_proto = self.discovery_handler.listen_protocol();
|
||||||
|
|
||||||
|
let timeout = gossip_proto
|
||||||
|
.timeout()
|
||||||
|
.max(rpc_proto.timeout())
|
||||||
|
.max(identify_proto.timeout())
|
||||||
|
.max(discovery_proto.timeout())
|
||||||
|
.clone();
|
||||||
|
|
||||||
|
let select = SelectUpgrade::new(
|
||||||
|
gossip_proto.into_upgrade().1,
|
||||||
|
SelectUpgrade::new(
|
||||||
|
rpc_proto.into_upgrade().1,
|
||||||
|
SelectUpgrade::new(
|
||||||
|
identify_proto.into_upgrade().1,
|
||||||
|
discovery_proto.into_upgrade().1,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
SubstreamProtocol::new(select).with_timeout(timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_fully_negotiated_inbound(
|
||||||
|
&mut self,
|
||||||
|
out: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
|
||||||
|
) {
|
||||||
|
match out {
|
||||||
|
// Gossipsub
|
||||||
|
EitherOutput::First(out) => self.gossip_handler.inject_fully_negotiated_inbound(out),
|
||||||
|
// RPC
|
||||||
|
EitherOutput::Second(EitherOutput::First(out)) => {
|
||||||
|
self.rpc_handler.inject_fully_negotiated_inbound(out)
|
||||||
|
}
|
||||||
|
// Identify
|
||||||
|
EitherOutput::Second(EitherOutput::Second(EitherOutput::First(out))) => {
|
||||||
|
self.identify_handler.inject_fully_negotiated_inbound(out)
|
||||||
|
}
|
||||||
|
// Discovery
|
||||||
|
EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(out))) => {
|
||||||
|
self.discovery_handler.inject_fully_negotiated_inbound(out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_fully_negotiated_outbound(
|
||||||
|
&mut self,
|
||||||
|
protocol: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
|
||||||
|
info: Self::OutboundOpenInfo,
|
||||||
|
) {
|
||||||
|
match (protocol, info) {
|
||||||
|
// Gossipsub
|
||||||
|
(EitherOutput::First(protocol), EitherOutput::First(info)) => self
|
||||||
|
.gossip_handler
|
||||||
|
.inject_fully_negotiated_outbound(protocol, info),
|
||||||
|
// RPC
|
||||||
|
(
|
||||||
|
EitherOutput::Second(EitherOutput::First(protocol)),
|
||||||
|
EitherOutput::Second(EitherOutput::First(info)),
|
||||||
|
) => self
|
||||||
|
.rpc_handler
|
||||||
|
.inject_fully_negotiated_outbound(protocol, info),
|
||||||
|
// Identify
|
||||||
|
(
|
||||||
|
EitherOutput::Second(EitherOutput::Second(EitherOutput::First(protocol))),
|
||||||
|
EitherOutput::Second(EitherOutput::Second(EitherOutput::First(info))),
|
||||||
|
) => self
|
||||||
|
.identify_handler
|
||||||
|
.inject_fully_negotiated_outbound(protocol, info),
|
||||||
|
// Discovery
|
||||||
|
(
|
||||||
|
EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(protocol))),
|
||||||
|
EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(info))),
|
||||||
|
) => self
|
||||||
|
.discovery_handler
|
||||||
|
.inject_fully_negotiated_outbound(protocol, info),
|
||||||
|
// Reaching here means we got a protocol and info for different behaviours
|
||||||
|
_ => unreachable!("output and protocol don't match"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_event(&mut self, event: Self::InEvent) {
|
||||||
|
match event {
|
||||||
|
DelegateIn::Gossipsub(ev) => self.gossip_handler.inject_event(ev),
|
||||||
|
DelegateIn::RPC(ev) => self.rpc_handler.inject_event(ev),
|
||||||
|
DelegateIn::Identify(ev) => self.identify_handler.inject_event(ev),
|
||||||
|
DelegateIn::Discovery(ev) => self.discovery_handler.inject_event(ev),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_dial_upgrade_error(
|
||||||
|
&mut self,
|
||||||
|
info: Self::OutboundOpenInfo,
|
||||||
|
error: ProtocolsHandlerUpgrErr<
|
||||||
|
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
|
||||||
|
>,
|
||||||
|
) {
|
||||||
|
// TODO: find how to clean up
|
||||||
|
match info {
|
||||||
|
// Gossipsub
|
||||||
|
EitherOutput::First(info) => match error {
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => {
|
||||||
|
self.gossip_handler.inject_dial_upgrade_error(
|
||||||
|
info,
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
ProtocolsHandlerUpgrErr::Timer => self
|
||||||
|
.gossip_handler
|
||||||
|
.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer),
|
||||||
|
ProtocolsHandlerUpgrErr::Timeout => self
|
||||||
|
.gossip_handler
|
||||||
|
.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout),
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(err))) => {
|
||||||
|
self.gossip_handler.inject_dial_upgrade_error(
|
||||||
|
info,
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(_)) => {
|
||||||
|
unreachable!("info and error don't match")
|
||||||
|
}
|
||||||
|
},
|
||||||
|
// RPC
|
||||||
|
EitherOutput::Second(EitherOutput::First(info)) => match error {
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => {
|
||||||
|
self.rpc_handler.inject_dial_upgrade_error(
|
||||||
|
info,
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
ProtocolsHandlerUpgrErr::Timer => self
|
||||||
|
.rpc_handler
|
||||||
|
.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer),
|
||||||
|
ProtocolsHandlerUpgrErr::Timeout => self
|
||||||
|
.rpc_handler
|
||||||
|
.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout),
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(
|
||||||
|
EitherError::A(err),
|
||||||
|
))) => self.rpc_handler.inject_dial_upgrade_error(
|
||||||
|
info,
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)),
|
||||||
|
),
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(_)) => {
|
||||||
|
unreachable!("info and error don't match")
|
||||||
|
}
|
||||||
|
},
|
||||||
|
// Identify
|
||||||
|
EitherOutput::Second(EitherOutput::Second(EitherOutput::First(info))) => match error {
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => {
|
||||||
|
self.identify_handler.inject_dial_upgrade_error(
|
||||||
|
info,
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
ProtocolsHandlerUpgrErr::Timer => self
|
||||||
|
.identify_handler
|
||||||
|
.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer),
|
||||||
|
ProtocolsHandlerUpgrErr::Timeout => self
|
||||||
|
.identify_handler
|
||||||
|
.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout),
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(
|
||||||
|
EitherError::B(EitherError::A(err)),
|
||||||
|
))) => self.identify_handler.inject_dial_upgrade_error(
|
||||||
|
info,
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)),
|
||||||
|
),
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(_)) => {
|
||||||
|
unreachable!("info and error don't match")
|
||||||
|
}
|
||||||
|
},
|
||||||
|
// Discovery
|
||||||
|
EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(info))) => match error {
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => {
|
||||||
|
self.discovery_handler.inject_dial_upgrade_error(
|
||||||
|
info,
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
ProtocolsHandlerUpgrErr::Timer => self
|
||||||
|
.discovery_handler
|
||||||
|
.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer),
|
||||||
|
ProtocolsHandlerUpgrErr::Timeout => self
|
||||||
|
.discovery_handler
|
||||||
|
.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout),
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(
|
||||||
|
EitherError::B(EitherError::B(err)),
|
||||||
|
))) => self.discovery_handler.inject_dial_upgrade_error(
|
||||||
|
info,
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)),
|
||||||
|
),
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(_)) => {
|
||||||
|
unreachable!("info and error don't match")
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn connection_keep_alive(&self) -> KeepAlive {
|
||||||
|
self.gossip_handler
|
||||||
|
.connection_keep_alive()
|
||||||
|
.max(self.rpc_handler.connection_keep_alive())
|
||||||
|
.max(self.identify_handler.connection_keep_alive())
|
||||||
|
.max(self.discovery_handler.connection_keep_alive())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll(
|
||||||
|
&mut self,
|
||||||
|
cx: &mut Context,
|
||||||
|
) -> Poll<
|
||||||
|
ProtocolsHandlerEvent<
|
||||||
|
Self::OutboundProtocol,
|
||||||
|
Self::OutboundOpenInfo,
|
||||||
|
Self::OutEvent,
|
||||||
|
Self::Error,
|
||||||
|
>,
|
||||||
|
> {
|
||||||
|
match self.gossip_handler.poll(cx) {
|
||||||
|
Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => {
|
||||||
|
return Poll::Ready(ProtocolsHandlerEvent::Custom(DelegateOut::Gossipsub(event)));
|
||||||
|
}
|
||||||
|
Poll::Ready(ProtocolsHandlerEvent::Close(event)) => {
|
||||||
|
return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Gossipsub(
|
||||||
|
event,
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => {
|
||||||
|
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||||
|
protocol: protocol.map_upgrade(|u| EitherUpgrade::A(u)),
|
||||||
|
info: EitherOutput::First(info),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Poll::Pending => (),
|
||||||
|
};
|
||||||
|
|
||||||
|
match self.rpc_handler.poll(cx) {
|
||||||
|
Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => {
|
||||||
|
return Poll::Ready(ProtocolsHandlerEvent::Custom(DelegateOut::RPC(event)));
|
||||||
|
}
|
||||||
|
Poll::Ready(ProtocolsHandlerEvent::Close(event)) => {
|
||||||
|
return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::RPC(event)));
|
||||||
|
}
|
||||||
|
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => {
|
||||||
|
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||||
|
protocol: protocol.map_upgrade(|u| EitherUpgrade::B(EitherUpgrade::A(u))),
|
||||||
|
info: EitherOutput::Second(EitherOutput::First(info)),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Poll::Pending => (),
|
||||||
|
};
|
||||||
|
|
||||||
|
match self.identify_handler.poll(cx) {
|
||||||
|
Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => {
|
||||||
|
return Poll::Ready(ProtocolsHandlerEvent::Custom(DelegateOut::Identify(event)));
|
||||||
|
}
|
||||||
|
Poll::Ready(ProtocolsHandlerEvent::Close(event)) => {
|
||||||
|
return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Identify(event)));
|
||||||
|
}
|
||||||
|
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => {
|
||||||
|
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||||
|
protocol: protocol
|
||||||
|
.map_upgrade(|u| EitherUpgrade::B(EitherUpgrade::B(EitherUpgrade::A(u)))),
|
||||||
|
info: EitherOutput::Second(EitherOutput::Second(EitherOutput::First(info))),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Poll::Pending => (),
|
||||||
|
};
|
||||||
|
|
||||||
|
match self.discovery_handler.poll(cx) {
|
||||||
|
Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => {
|
||||||
|
return Poll::Ready(ProtocolsHandlerEvent::Custom(DelegateOut::Discovery(event)));
|
||||||
|
}
|
||||||
|
Poll::Ready(ProtocolsHandlerEvent::Close(event)) => {
|
||||||
|
return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Discovery(
|
||||||
|
event,
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => {
|
||||||
|
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||||
|
protocol: protocol
|
||||||
|
.map_upgrade(|u| EitherUpgrade::B(EitherUpgrade::B(EitherUpgrade::B(u)))),
|
||||||
|
info: EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(info))),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Poll::Pending => (),
|
||||||
|
};
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
141
beacon_node/eth2-libp2p/src/behaviour/handler/mod.rs
Normal file
141
beacon_node/eth2-libp2p/src/behaviour/handler/mod.rs
Normal file
@ -0,0 +1,141 @@
|
|||||||
|
use crate::discovery::Discovery;
|
||||||
|
use crate::rpc::*;
|
||||||
|
use delegate::DelegatingHandler;
|
||||||
|
pub(super) use delegate::{
|
||||||
|
DelegateError, DelegateIn, DelegateInProto, DelegateOut, DelegateOutInfo, DelegateOutProto,
|
||||||
|
};
|
||||||
|
use libp2p::{
|
||||||
|
core::upgrade::{InboundUpgrade, OutboundUpgrade},
|
||||||
|
gossipsub::Gossipsub,
|
||||||
|
identify::Identify,
|
||||||
|
swarm::protocols_handler::{
|
||||||
|
KeepAlive, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
|
||||||
|
},
|
||||||
|
swarm::{NegotiatedSubstream, ProtocolsHandler},
|
||||||
|
};
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use types::EthSpec;
|
||||||
|
|
||||||
|
mod delegate;
|
||||||
|
|
||||||
|
/// Handler that combines Lighthouse's Behaviours' handlers in a delegating manner.
|
||||||
|
pub struct BehaviourHandler<TSpec: EthSpec> {
|
||||||
|
/// Handler combining all sub behaviour's handlers.
|
||||||
|
delegate: DelegatingHandler<TSpec>,
|
||||||
|
/// Keep alive for this handler.
|
||||||
|
keep_alive: KeepAlive,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TSpec: EthSpec> BehaviourHandler<TSpec> {
|
||||||
|
pub fn new(
|
||||||
|
gossipsub: &mut Gossipsub,
|
||||||
|
rpc: &mut RPC<TSpec>,
|
||||||
|
identify: &mut Identify,
|
||||||
|
discovery: &mut Discovery<TSpec>,
|
||||||
|
) -> Self {
|
||||||
|
BehaviourHandler {
|
||||||
|
delegate: DelegatingHandler::new(gossipsub, rpc, identify, discovery),
|
||||||
|
keep_alive: KeepAlive::Yes,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub enum BehaviourHandlerIn<TSpec: EthSpec> {
|
||||||
|
Delegate(DelegateIn<TSpec>),
|
||||||
|
// TODO: replace custom with incoming events
|
||||||
|
Custom,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum BehaviourHandlerOut<TSpec: EthSpec> {
|
||||||
|
Delegate(DelegateOut<TSpec>),
|
||||||
|
// TODO: replace custom with events to send
|
||||||
|
Custom,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TSpec: EthSpec> ProtocolsHandler for BehaviourHandler<TSpec> {
|
||||||
|
type InEvent = BehaviourHandlerIn<TSpec>;
|
||||||
|
type OutEvent = BehaviourHandlerOut<TSpec>;
|
||||||
|
type Error = DelegateError<TSpec>;
|
||||||
|
type InboundProtocol = DelegateInProto<TSpec>;
|
||||||
|
type OutboundProtocol = DelegateOutProto<TSpec>;
|
||||||
|
type OutboundOpenInfo = DelegateOutInfo<TSpec>;
|
||||||
|
|
||||||
|
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
|
||||||
|
self.delegate.listen_protocol()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_fully_negotiated_inbound(
|
||||||
|
&mut self,
|
||||||
|
out: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
|
||||||
|
) {
|
||||||
|
self.delegate.inject_fully_negotiated_inbound(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_fully_negotiated_outbound(
|
||||||
|
&mut self,
|
||||||
|
out: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
|
||||||
|
info: Self::OutboundOpenInfo,
|
||||||
|
) {
|
||||||
|
self.delegate.inject_fully_negotiated_outbound(out, info)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_event(&mut self, event: Self::InEvent) {
|
||||||
|
match event {
|
||||||
|
BehaviourHandlerIn::Delegate(delegated_ev) => self.delegate.inject_event(delegated_ev),
|
||||||
|
/* Events comming from the behaviour */
|
||||||
|
BehaviourHandlerIn::Custom => {
|
||||||
|
// TODO: implement
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_dial_upgrade_error(
|
||||||
|
&mut self,
|
||||||
|
info: Self::OutboundOpenInfo,
|
||||||
|
err: ProtocolsHandlerUpgrErr<
|
||||||
|
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
|
||||||
|
>,
|
||||||
|
) {
|
||||||
|
self.delegate.inject_dial_upgrade_error(info, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn connection_keep_alive(&self) -> KeepAlive {
|
||||||
|
// TODO: refine this logic
|
||||||
|
self.keep_alive.min(self.delegate.connection_keep_alive())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll(
|
||||||
|
&mut self,
|
||||||
|
cx: &mut Context,
|
||||||
|
) -> Poll<
|
||||||
|
ProtocolsHandlerEvent<
|
||||||
|
Self::OutboundProtocol,
|
||||||
|
Self::OutboundOpenInfo,
|
||||||
|
Self::OutEvent,
|
||||||
|
Self::Error,
|
||||||
|
>,
|
||||||
|
> {
|
||||||
|
match self.delegate.poll(cx) {
|
||||||
|
Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => {
|
||||||
|
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||||
|
BehaviourHandlerOut::Delegate(event),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
Poll::Ready(ProtocolsHandlerEvent::Close(err)) => {
|
||||||
|
return Poll::Ready(ProtocolsHandlerEvent::Close(err))
|
||||||
|
}
|
||||||
|
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => {
|
||||||
|
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||||
|
protocol,
|
||||||
|
info,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Poll::Pending => (),
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
|
|
||||||
|
// TODO: speak to our behaviour here
|
||||||
|
}
|
||||||
|
}
|
@ -5,12 +5,19 @@ use crate::types::{GossipEncoding, GossipKind, GossipTopic};
|
|||||||
use crate::{error, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
|
use crate::{error, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
|
||||||
use discv5::Discv5Event;
|
use discv5::Discv5Event;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
|
use handler::{BehaviourHandler, BehaviourHandlerIn, BehaviourHandlerOut, DelegateIn, DelegateOut};
|
||||||
use libp2p::{
|
use libp2p::{
|
||||||
core::identity::Keypair,
|
core::{
|
||||||
|
connection::{ConnectedPoint, ConnectionId, ListenerId},
|
||||||
|
identity::Keypair,
|
||||||
|
Multiaddr,
|
||||||
|
},
|
||||||
gossipsub::{Gossipsub, GossipsubEvent, MessageId},
|
gossipsub::{Gossipsub, GossipsubEvent, MessageId},
|
||||||
identify::{Identify, IdentifyEvent},
|
identify::{Identify, IdentifyEvent},
|
||||||
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
|
swarm::{
|
||||||
NetworkBehaviour, PeerId,
|
NetworkBehaviour, NetworkBehaviourAction as NBAction, PollParameters, ProtocolsHandler,
|
||||||
|
},
|
||||||
|
PeerId,
|
||||||
};
|
};
|
||||||
use lru::LruCache;
|
use lru::LruCache;
|
||||||
use slog::{crit, debug, o};
|
use slog::{crit, debug, o};
|
||||||
@ -21,13 +28,13 @@ use std::{
|
|||||||
};
|
};
|
||||||
use types::{EnrForkId, EthSpec, SubnetId};
|
use types::{EnrForkId, EthSpec, SubnetId};
|
||||||
|
|
||||||
|
mod handler;
|
||||||
|
|
||||||
const MAX_IDENTIFY_ADDRESSES: usize = 10;
|
const MAX_IDENTIFY_ADDRESSES: usize = 10;
|
||||||
|
|
||||||
/// Builds the network behaviour that manages the core protocols of eth2.
|
/// Builds the network behaviour that manages the core protocols of eth2.
|
||||||
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
|
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
|
||||||
/// behaviours.
|
/// behaviours.
|
||||||
#[derive(NetworkBehaviour)]
|
|
||||||
#[behaviour(out_event = "BehaviourEvent<TSpec>", poll_method = "poll")]
|
|
||||||
pub struct Behaviour<TSpec: EthSpec> {
|
pub struct Behaviour<TSpec: EthSpec> {
|
||||||
/// The routing pub-sub mechanism for eth2.
|
/// The routing pub-sub mechanism for eth2.
|
||||||
gossipsub: Gossipsub,
|
gossipsub: Gossipsub,
|
||||||
@ -40,32 +47,202 @@ pub struct Behaviour<TSpec: EthSpec> {
|
|||||||
/// Discovery behaviour.
|
/// Discovery behaviour.
|
||||||
discovery: Discovery<TSpec>,
|
discovery: Discovery<TSpec>,
|
||||||
/// The peer manager that keeps track of peer's reputation and status.
|
/// The peer manager that keeps track of peer's reputation and status.
|
||||||
#[behaviour(ignore)]
|
|
||||||
peer_manager: PeerManager<TSpec>,
|
peer_manager: PeerManager<TSpec>,
|
||||||
/// The events generated by this behaviour to be consumed in the swarm poll.
|
/// The events generated by this behaviour to be consumed in the swarm poll.
|
||||||
#[behaviour(ignore)]
|
|
||||||
events: Vec<BehaviourEvent<TSpec>>,
|
events: Vec<BehaviourEvent<TSpec>>,
|
||||||
|
// TODO: add events to send to the handler
|
||||||
/// The current meta data of the node, so respond to pings and get metadata
|
/// The current meta data of the node, so respond to pings and get metadata
|
||||||
#[behaviour(ignore)]
|
|
||||||
meta_data: MetaData<TSpec>,
|
meta_data: MetaData<TSpec>,
|
||||||
/// A cache of recently seen gossip messages. This is used to filter out any possible
|
/// A cache of recently seen gossip messages. This is used to filter out any possible
|
||||||
/// duplicates that may still be seen over gossipsub.
|
/// duplicates that may still be seen over gossipsub.
|
||||||
#[behaviour(ignore)]
|
|
||||||
// TODO: Remove this
|
// TODO: Remove this
|
||||||
seen_gossip_messages: LruCache<MessageId, ()>,
|
seen_gossip_messages: LruCache<MessageId, ()>,
|
||||||
/// A collections of variables accessible outside the network service.
|
/// A collections of variables accessible outside the network service.
|
||||||
#[behaviour(ignore)]
|
|
||||||
network_globals: Arc<NetworkGlobals<TSpec>>,
|
network_globals: Arc<NetworkGlobals<TSpec>>,
|
||||||
#[behaviour(ignore)]
|
|
||||||
/// Keeps track of the current EnrForkId for upgrading gossipsub topics.
|
/// Keeps track of the current EnrForkId for upgrading gossipsub topics.
|
||||||
// NOTE: This can be accessed via the network_globals ENR. However we keep it here for quick
|
// NOTE: This can be accessed via the network_globals ENR. However we keep it here for quick
|
||||||
// lookups for every gossipsub message send.
|
// lookups for every gossipsub message send.
|
||||||
enr_fork_id: EnrForkId,
|
enr_fork_id: EnrForkId,
|
||||||
#[behaviour(ignore)]
|
|
||||||
/// Logger for behaviour actions.
|
/// Logger for behaviour actions.
|
||||||
log: slog::Logger,
|
log: slog::Logger,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Calls the given function with the given args on all sub behaviours.
|
||||||
|
macro_rules! delegate_to_behaviours {
|
||||||
|
($self: ident, $fn: ident, $($arg: ident), *) => {
|
||||||
|
$self.gossipsub.$fn($($arg),*);
|
||||||
|
$self.eth2_rpc.$fn($($arg),*);
|
||||||
|
$self.identify.$fn($($arg),*);
|
||||||
|
$self.discovery.$fn($($arg),*);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
|
||||||
|
type ProtocolsHandler = BehaviourHandler<TSpec>;
|
||||||
|
type OutEvent = BehaviourEvent<TSpec>;
|
||||||
|
|
||||||
|
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||||
|
BehaviourHandler::new(
|
||||||
|
&mut self.gossipsub,
|
||||||
|
&mut self.eth2_rpc,
|
||||||
|
&mut self.identify,
|
||||||
|
&mut self.discovery,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
|
||||||
|
let mut out = Vec::new();
|
||||||
|
out.extend(self.gossipsub.addresses_of_peer(peer_id));
|
||||||
|
out.extend(self.eth2_rpc.addresses_of_peer(peer_id));
|
||||||
|
out.extend(self.identify.addresses_of_peer(peer_id));
|
||||||
|
out.extend(self.discovery.addresses_of_peer(peer_id));
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_connected(&mut self, peer_id: &PeerId) {
|
||||||
|
delegate_to_behaviours!(self, inject_connected, peer_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_disconnected(&mut self, peer_id: &PeerId) {
|
||||||
|
delegate_to_behaviours!(self, inject_disconnected, peer_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_connection_established(
|
||||||
|
&mut self,
|
||||||
|
peer_id: &PeerId,
|
||||||
|
conn_id: &ConnectionId,
|
||||||
|
endpoint: &ConnectedPoint,
|
||||||
|
) {
|
||||||
|
delegate_to_behaviours!(
|
||||||
|
self,
|
||||||
|
inject_connection_established,
|
||||||
|
peer_id,
|
||||||
|
conn_id,
|
||||||
|
endpoint
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_connection_closed(
|
||||||
|
&mut self,
|
||||||
|
peer_id: &PeerId,
|
||||||
|
conn_id: &ConnectionId,
|
||||||
|
endpoint: &ConnectedPoint,
|
||||||
|
) {
|
||||||
|
delegate_to_behaviours!(self, inject_connection_closed, peer_id, conn_id, endpoint);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_addr_reach_failure(
|
||||||
|
&mut self,
|
||||||
|
peer_id: Option<&PeerId>,
|
||||||
|
addr: &Multiaddr,
|
||||||
|
error: &dyn std::error::Error,
|
||||||
|
) {
|
||||||
|
delegate_to_behaviours!(self, inject_addr_reach_failure, peer_id, addr, error);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
|
||||||
|
delegate_to_behaviours!(self, inject_dial_failure, peer_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
|
||||||
|
delegate_to_behaviours!(self, inject_new_listen_addr, addr);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
|
||||||
|
delegate_to_behaviours!(self, inject_expired_listen_addr, addr);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
|
||||||
|
delegate_to_behaviours!(self, inject_new_external_addr, addr);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
|
||||||
|
delegate_to_behaviours!(self, inject_listener_error, id, err);
|
||||||
|
}
|
||||||
|
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &std::io::Error>) {
|
||||||
|
delegate_to_behaviours!(self, inject_listener_closed, id, reason);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_event(
|
||||||
|
&mut self,
|
||||||
|
peer_id: PeerId,
|
||||||
|
conn_id: ConnectionId,
|
||||||
|
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
|
||||||
|
) {
|
||||||
|
match event {
|
||||||
|
// Events comming from the handler, redirected to each behaviour
|
||||||
|
BehaviourHandlerOut::Delegate(delegate) => match delegate {
|
||||||
|
DelegateOut::Gossipsub(ev) => self.gossipsub.inject_event(peer_id, conn_id, ev),
|
||||||
|
DelegateOut::RPC(ev) => self.eth2_rpc.inject_event(peer_id, conn_id, ev),
|
||||||
|
DelegateOut::Identify(ev) => self.identify.inject_event(peer_id, conn_id, ev),
|
||||||
|
DelegateOut::Discovery(ev) => self.discovery.inject_event(peer_id, conn_id, ev),
|
||||||
|
},
|
||||||
|
/* Custom events sent BY the handler */
|
||||||
|
BehaviourHandlerOut::Custom => {
|
||||||
|
// TODO: implement
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll(
|
||||||
|
&mut self,
|
||||||
|
cx: &mut Context,
|
||||||
|
poll_params: &mut impl PollParameters,
|
||||||
|
) -> Poll<NBAction<<Self::ProtocolsHandler as ProtocolsHandler>::InEvent, Self::OutEvent>> {
|
||||||
|
// TODO: move where it's less distracting
|
||||||
|
macro_rules! poll_behaviour {
|
||||||
|
/* $behaviour: The sub-behaviour being polled.
|
||||||
|
* $on_event_fn: Function to call if we get an event from the sub-behaviour.
|
||||||
|
* $notify_handler_event_closure: Closure mapping the received event type to
|
||||||
|
* the one that the handler should get.
|
||||||
|
*/
|
||||||
|
($behaviour: ident, $on_event_fn: ident, $notify_handler_event_closure: expr) => {
|
||||||
|
loop {
|
||||||
|
// poll the sub-behaviour
|
||||||
|
match self.$behaviour.poll(cx, poll_params) {
|
||||||
|
Poll::Ready(action) => match action {
|
||||||
|
// call the designated function to handle the event from sub-behaviour
|
||||||
|
NBAction::GenerateEvent(event) => self.$on_event_fn(event),
|
||||||
|
NBAction::DialAddress { address } => {
|
||||||
|
return Poll::Ready(NBAction::DialAddress { address })
|
||||||
|
}
|
||||||
|
NBAction::DialPeer { peer_id, condition } => {
|
||||||
|
return Poll::Ready(NBAction::DialPeer { peer_id, condition })
|
||||||
|
}
|
||||||
|
NBAction::NotifyHandler {
|
||||||
|
peer_id,
|
||||||
|
handler,
|
||||||
|
event,
|
||||||
|
} => {
|
||||||
|
return Poll::Ready(NBAction::NotifyHandler {
|
||||||
|
peer_id,
|
||||||
|
handler,
|
||||||
|
// call the closure mapping the received event to the needed one
|
||||||
|
// in order to notify the handler
|
||||||
|
event: BehaviourHandlerIn::Delegate(
|
||||||
|
$notify_handler_event_closure(event),
|
||||||
|
),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
NBAction::ReportObservedAddr { address } => {
|
||||||
|
return Poll::Ready(NBAction::ReportObservedAddr { address })
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Poll::Pending => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
poll_behaviour!(gossipsub, on_gossip_event, DelegateIn::Gossipsub);
|
||||||
|
poll_behaviour!(eth2_rpc, on_rpc_event, DelegateIn::RPC);
|
||||||
|
poll_behaviour!(identify, on_identify_event, DelegateIn::Identify);
|
||||||
|
poll_behaviour!(discovery, on_discovery_event, DelegateIn::Discovery);
|
||||||
|
|
||||||
|
self.custom_poll(cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Implements the combined behaviour for the libp2p service.
|
/// Implements the combined behaviour for the libp2p service.
|
||||||
impl<TSpec: EthSpec> Behaviour<TSpec> {
|
impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
@ -370,11 +547,8 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
}
|
|
||||||
|
|
||||||
// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour
|
fn on_gossip_event(&mut self, event: GossipsubEvent) {
|
||||||
impl<TSpec: EthSpec> NetworkBehaviourEventProcess<GossipsubEvent> for Behaviour<TSpec> {
|
|
||||||
fn inject_event(&mut self, event: GossipsubEvent) {
|
|
||||||
match event {
|
match event {
|
||||||
GossipsubEvent::Message(propagation_source, id, gs_msg) => {
|
GossipsubEvent::Message(propagation_source, id, gs_msg) => {
|
||||||
// Note: We are keeping track here of the peer that sent us the message, not the
|
// Note: We are keeping track here of the peer that sent us the message, not the
|
||||||
@ -412,10 +586,8 @@ impl<TSpec: EthSpec> NetworkBehaviourEventProcess<GossipsubEvent> for Behaviour<
|
|||||||
GossipsubEvent::Unsubscribed { .. } => {}
|
GossipsubEvent::Unsubscribed { .. } => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl<TSpec: EthSpec> NetworkBehaviourEventProcess<RPCMessage<TSpec>> for Behaviour<TSpec> {
|
fn on_rpc_event(&mut self, message: RPCMessage<TSpec>) {
|
||||||
fn inject_event(&mut self, message: RPCMessage<TSpec>) {
|
|
||||||
let peer_id = message.peer_id;
|
let peer_id = message.peer_id;
|
||||||
// The METADATA and PING RPC responses are handled within the behaviour and not
|
// The METADATA and PING RPC responses are handled within the behaviour and not
|
||||||
// propagated
|
// propagated
|
||||||
@ -457,15 +629,12 @@ impl<TSpec: EthSpec> NetworkBehaviourEventProcess<RPCMessage<TSpec>> for Behavio
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl<TSpec: EthSpec> Behaviour<TSpec> {
|
|
||||||
/// Consumes the events list when polled.
|
/// Consumes the events list when polled.
|
||||||
fn poll<TBehaviourIn>(
|
fn custom_poll<TBehaviourIn>(
|
||||||
&mut self,
|
&mut self,
|
||||||
cx: &mut Context,
|
cx: &mut Context,
|
||||||
_: &mut impl PollParameters,
|
) -> Poll<NBAction<TBehaviourIn, BehaviourEvent<TSpec>>> {
|
||||||
) -> Poll<NetworkBehaviourAction<TBehaviourIn, BehaviourEvent<TSpec>>> {
|
|
||||||
// check the peer manager for events
|
// check the peer manager for events
|
||||||
loop {
|
loop {
|
||||||
match self.peer_manager.poll_next_unpin(cx) {
|
match self.peer_manager.poll_next_unpin(cx) {
|
||||||
@ -473,9 +642,9 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
|||||||
PeerManagerEvent::Status(peer_id) => {
|
PeerManagerEvent::Status(peer_id) => {
|
||||||
// it's time to status. We don't keep a beacon chain reference here, so we inform
|
// it's time to status. We don't keep a beacon chain reference here, so we inform
|
||||||
// the network to send a status to this peer
|
// the network to send a status to this peer
|
||||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(
|
return Poll::Ready(NBAction::GenerateEvent(BehaviourEvent::StatusPeer(
|
||||||
BehaviourEvent::StatusPeer(peer_id),
|
peer_id,
|
||||||
));
|
)));
|
||||||
}
|
}
|
||||||
PeerManagerEvent::Ping(peer_id) => {
|
PeerManagerEvent::Ping(peer_id) => {
|
||||||
// send a ping request to this peer
|
// send a ping request to this peer
|
||||||
@ -497,15 +666,13 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !self.events.is_empty() {
|
if !self.events.is_empty() {
|
||||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
|
return Poll::Ready(NBAction::GenerateEvent(self.events.remove(0)));
|
||||||
}
|
}
|
||||||
|
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl<TSpec: EthSpec> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<TSpec> {
|
fn on_identify_event(&mut self, event: IdentifyEvent) {
|
||||||
fn inject_event(&mut self, event: IdentifyEvent) {
|
|
||||||
match event {
|
match event {
|
||||||
IdentifyEvent::Received {
|
IdentifyEvent::Received {
|
||||||
peer_id,
|
peer_id,
|
||||||
@ -534,10 +701,8 @@ impl<TSpec: EthSpec> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<T
|
|||||||
IdentifyEvent::Error { .. } => {}
|
IdentifyEvent::Error { .. } => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl<TSpec: EthSpec> NetworkBehaviourEventProcess<Discv5Event> for Behaviour<TSpec> {
|
fn on_discovery_event(&mut self, _event: Discv5Event) {
|
||||||
fn inject_event(&mut self, _event: Discv5Event) {
|
|
||||||
// discv5 has no events to inject
|
// discv5 has no events to inject
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user