Upgrade to latest libp2p (#2605)

This is a pre-cursor to the next libp2p upgrade. 

It is currently being used for staging a number of PR upgrades which are contingent on the latest libp2p.
This commit is contained in:
Age Manning 2021-10-29 01:59:29 +00:00
parent 2c4413454a
commit 1790010260
6 changed files with 393 additions and 742 deletions

1028
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -39,15 +39,9 @@ regex = "1.3.9"
strum = { version = "0.21.0", features = ["derive"] } strum = { version = "0.21.0", features = ["derive"] }
superstruct = "0.2.0" superstruct = "0.2.0"
# TODO: remove the rev-reference and go back to release versions once there is a release that
# includes the following PR:
#
# https://github.com/libp2p/rust-libp2p/pull/2175
[dependencies.libp2p] [dependencies.libp2p]
#version = "0.39.1" version = "0.40.0-rc.3"
#default-features = false default-features = false
git = "https://github.com/libp2p/rust-libp2p"
rev = "ce23cbe76a0382b6fcb0e49f1b2612df86f6465d"
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns-tokio", "tcp-tokio"] features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns-tokio", "tcp-tokio"]
[dev-dependencies] [dev-dependencies]

View File

@ -4,7 +4,8 @@ use crate::behaviour::gossipsub_scoring_parameters::{
use crate::config::gossipsub_config; use crate::config::gossipsub_config;
use crate::discovery::{subnet_predicate, Discovery, DiscoveryEvent, TARGET_SUBNET_PEERS}; use crate::discovery::{subnet_predicate, Discovery, DiscoveryEvent, TARGET_SUBNET_PEERS};
use crate::peer_manager::{ use crate::peer_manager::{
peerdb::score::ReportSource, ConnectionDirection, PeerManager, PeerManagerEvent, peerdb::score::PeerAction, peerdb::score::ReportSource, ConnectionDirection, PeerManager,
PeerManagerEvent,
}; };
use crate::rpc::*; use crate::rpc::*;
use crate::service::METADATA_FILENAME; use crate::service::METADATA_FILENAME;
@ -26,7 +27,7 @@ use libp2p::{
}, },
identify::{Identify, IdentifyConfig, IdentifyEvent}, identify::{Identify, IdentifyConfig, IdentifyEvent},
swarm::{ swarm::{
AddressScore, DialPeerCondition, NetworkBehaviourAction as NBAction, AddressScore, DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction as NBAction,
NetworkBehaviourEventProcess, PollParameters, NetworkBehaviourEventProcess, PollParameters,
}, },
NetworkBehaviour, PeerId, NetworkBehaviour, PeerId,
@ -121,7 +122,11 @@ enum InternalBehaviourMessage {
/// This core behaviour is managed by `Behaviour` which adds peer management to all core /// This core behaviour is managed by `Behaviour` which adds peer management to all core
/// behaviours. /// behaviours.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourEvent<TSpec>", poll_method = "poll")] #[behaviour(
out_event = "BehaviourEvent<TSpec>",
poll_method = "poll",
event_process = true
)]
pub struct Behaviour<TSpec: EthSpec> { pub struct Behaviour<TSpec: EthSpec> {
/* Sub-Behaviours */ /* Sub-Behaviours */
/// The routing pub-sub mechanism for eth2. /// The routing pub-sub mechanism for eth2.
@ -192,9 +197,11 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
"".into(), "".into(),
local_key.public(), // Still send legitimate public key local_key.public(), // Still send legitimate public key
) )
.with_cache_size(0)
} else { } else {
IdentifyConfig::new("eth2/1.0.0".into(), local_key.public()) IdentifyConfig::new("eth2/1.0.0".into(), local_key.public())
.with_agent_version(lighthouse_version::version_with_platform()) .with_agent_version(lighthouse_version::version_with_platform())
.with_cache_size(0)
}; };
// Build and start the discovery sub-behaviour // Build and start the discovery sub-behaviour
@ -848,6 +855,15 @@ impl<TSpec: EthSpec> NetworkBehaviourEventProcess<GossipsubEvent> for Behaviour<
.remove_subscription(&peer_id, &subnet_id); .remove_subscription(&peer_id, &subnet_id);
} }
} }
GossipsubEvent::GossipsubNotSupported { peer_id } => {
debug!(self.log, "Peer does not support gossipsub"; "peer_id" => %peer_id);
self.peer_manager.report_peer(
&peer_id,
PeerAction::LowToleranceError,
ReportSource::Gossipsub,
Some(GoodbyeReason::Unknown),
);
}
} }
} }
} }
@ -1031,11 +1047,13 @@ impl<TSpec: EthSpec> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<T
impl<TSpec: EthSpec> Behaviour<TSpec> { impl<TSpec: EthSpec> Behaviour<TSpec> {
/// Consumes the events list and drives the Lighthouse global NetworkBehaviour. /// Consumes the events list and drives the Lighthouse global NetworkBehaviour.
fn poll<THandlerIn>( fn poll(
&mut self, &mut self,
cx: &mut Context, cx: &mut Context,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NBAction<THandlerIn, BehaviourEvent<TSpec>>> { ) -> Poll<
NBAction<BehaviourEvent<TSpec>, <Behaviour<TSpec> as NetworkBehaviour>::ProtocolsHandler>,
> {
if let Some(waker) = &self.waker { if let Some(waker) = &self.waker {
if waker.will_wake(cx.waker()) { if waker.will_wake(cx.waker()) {
self.waker = Some(cx.waker().clone()); self.waker = Some(cx.waker().clone());
@ -1048,9 +1066,11 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
if let Some(event) = self.internal_events.pop_front() { if let Some(event) = self.internal_events.pop_front() {
match event { match event {
InternalBehaviourMessage::DialPeer(peer_id) => { InternalBehaviourMessage::DialPeer(peer_id) => {
let handler = self.new_handler();
return Poll::Ready(NBAction::DialPeer { return Poll::Ready(NBAction::DialPeer {
peer_id, peer_id,
condition: DialPeerCondition::Disconnected, condition: DialPeerCondition::Disconnected,
handler,
}); });
} }
InternalBehaviourMessage::SocketUpdated(address) => { InternalBehaviourMessage::SocketUpdated(address) => {

View File

@ -23,8 +23,8 @@ use futures::stream::FuturesUnordered;
pub use libp2p::{ pub use libp2p::{
core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId}, core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId},
swarm::{ swarm::{
protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction as NBAction, protocols_handler::ProtocolsHandler, DialError, NetworkBehaviour,
NotifyHandler, PollParameters, SubstreamProtocol, NetworkBehaviourAction as NBAction, NotifyHandler, PollParameters, SubstreamProtocol,
}, },
}; };
use lru::LruCache; use lru::LruCache;
@ -933,9 +933,10 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
fn inject_disconnected(&mut self, _peer_id: &PeerId) {} fn inject_disconnected(&mut self, _peer_id: &PeerId) {}
fn inject_connection_established( fn inject_connection_established(
&mut self, &mut self,
_: &PeerId, _peer_id: &PeerId,
_: &ConnectionId, _connection_id: &ConnectionId,
_connected_point: &ConnectedPoint, _endpoint: &ConnectedPoint,
_failed_addresses: Option<&Vec<Multiaddr>>,
) { ) {
} }
fn inject_connection_closed( fn inject_connection_closed(
@ -943,6 +944,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
_: &PeerId, _: &PeerId,
_: &ConnectionId, _: &ConnectionId,
_connected_point: &ConnectedPoint, _connected_point: &ConnectedPoint,
_handler: Self::ProtocolsHandler,
) { ) {
} }
fn inject_event( fn inject_event(
@ -953,10 +955,17 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
) { ) {
} }
fn inject_dial_failure(&mut self, peer_id: &PeerId) { fn inject_dial_failure(
// set peer as disconnected in discovery DHT &mut self,
debug!(self.log, "Marking peer disconnected in DHT"; "peer_id" => %peer_id); peer_id: Option<PeerId>,
self.disconnect_peer(peer_id); _handler: Self::ProtocolsHandler,
_error: &DialError,
) {
if let Some(peer_id) = peer_id {
// set peer as disconnected in discovery DHT
debug!(self.log, "Marking peer disconnected in DHT"; "peer_id" => %peer_id);
self.disconnect_peer(&peer_id);
}
} }
// Main execution loop to drive the behaviour // Main execution loop to drive the behaviour
@ -964,7 +973,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
&mut self, &mut self,
cx: &mut Context, cx: &mut Context,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NBAction<<Self::ProtocolsHandler as ProtocolsHandler>::InEvent, Self::OutEvent>> { ) -> Poll<NBAction<Self::OutEvent, Self::ProtocolsHandler>> {
if !self.started { if !self.started {
return Poll::Pending; return Poll::Pending;
} }

View File

@ -101,7 +101,7 @@ pub struct RPC<TSpec: EthSpec> {
/// Rate limiter /// Rate limiter
limiter: RateLimiter, limiter: RateLimiter,
/// Queue of events to be processed. /// Queue of events to be processed.
events: Vec<NetworkBehaviourAction<RPCSend<TSpec>, RPCMessage<TSpec>>>, events: Vec<NetworkBehaviourAction<RPCMessage<TSpec>, RPCHandler<TSpec>>>,
fork_context: Arc<ForkContext>, fork_context: Arc<ForkContext>,
/// Slog logger for RPC behaviour. /// Slog logger for RPC behaviour.
log: slog::Logger, log: slog::Logger,
@ -218,8 +218,9 @@ where
fn inject_connection_established( fn inject_connection_established(
&mut self, &mut self,
_peer_id: &PeerId, _peer_id: &PeerId,
_: &ConnectionId, _connection_id: &ConnectionId,
_connected_point: &ConnectedPoint, _endpoint: &ConnectedPoint,
_failed_addresses: Option<&Vec<Multiaddr>>,
) { ) {
} }
@ -228,6 +229,7 @@ where
_peer_id: &PeerId, _peer_id: &PeerId,
_: &ConnectionId, _: &ConnectionId,
_connected_point: &ConnectedPoint, _connected_point: &ConnectedPoint,
_handler: Self::ProtocolsHandler,
) { ) {
} }
@ -297,12 +299,7 @@ where
&mut self, &mut self,
cx: &mut Context, cx: &mut Context,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll< ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
// let the rate limiter prune // let the rate limiter prune
let _ = self.limiter.poll_unpin(cx); let _ = self.limiter.poll_unpin(cx);
if !self.events.is_empty() { if !self.events.is_empty() {

View File

@ -320,6 +320,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
peer_id, peer_id,
endpoint, endpoint,
num_established, num_established,
concurrent_dial_errors: _,
} => { } => {
// Inform the peer manager. // Inform the peer manager.
// We require the ENR to inject into the peer db, if it exists. // We require the ENR to inject into the peer db, if it exists.
@ -364,20 +365,14 @@ impl<TSpec: EthSpec> Service<TSpec> {
SwarmEvent::BannedPeer { peer_id, .. } => { SwarmEvent::BannedPeer { peer_id, .. } => {
debug!(self.log, "Banned peer connection rejected"; "peer_id" => %peer_id); debug!(self.log, "Banned peer connection rejected"; "peer_id" => %peer_id);
} }
SwarmEvent::UnreachableAddr { SwarmEvent::OutgoingConnectionError { peer_id, error } => {
peer_id, debug!(self.log, "Failed to dial address"; "peer_id" => ?peer_id, "error" => %error);
address, if let Some(peer_id) = peer_id {
error, self.swarm
attempts_remaining, .behaviour_mut()
} => { .peer_manager_mut()
debug!(self.log, "Failed to dial address"; "peer_id" => %peer_id, "address" => %address, "error" => %error, "attempts_remaining" => attempts_remaining); .inject_dial_failure(&peer_id);
self.swarm }
.behaviour_mut()
.peer_manager_mut()
.inject_dial_failure(&peer_id);
}
SwarmEvent::UnknownPeerUnreachableAddr { address, error } => {
debug!(self.log, "Peer not known at dialed address"; "address" => %address, "error" => %error);
} }
SwarmEvent::ExpiredListenAddr { address, .. } => { SwarmEvent::ExpiredListenAddr { address, .. } => {
debug!(self.log, "Listen address expired"; "address" => %address) debug!(self.log, "Listen address expired"; "address" => %address)