From 4bf1af4e8520f235de8fe5f94afedf953df5e6a4 Mon Sep 17 00:00:00 2001 From: Divma Date: Wed, 2 Mar 2022 22:07:17 +0000 Subject: [PATCH] Custom RPC request management for sync (#3029) ## Proposed Changes Make `lighthouse_network` generic over request ids, now usable by sync --- .../lighthouse_network/src/behaviour/mod.rs | 116 +++-- .../src/peer_manager/network_behaviour.rs | 6 +- .../lighthouse_network/src/rpc/handler.rs | 48 +- .../lighthouse_network/src/rpc/methods.rs | 26 - beacon_node/lighthouse_network/src/rpc/mod.rs | 87 ++-- beacon_node/lighthouse_network/src/service.rs | 18 +- .../lighthouse_network/tests/common/mod.rs | 8 +- .../lighthouse_network/tests/rpc_tests.rs | 50 +- beacon_node/network/src/router/mod.rs | 5 +- beacon_node/network/src/router/processor.rs | 68 +-- beacon_node/network/src/service.rs | 14 +- .../network/src/sync/backfill_sync/mod.rs | 7 +- beacon_node/network/src/sync/manager.rs | 470 +++++++++--------- beacon_node/network/src/sync/mod.rs | 3 - .../network/src/sync/network_context.rs | 138 +++-- .../network/src/sync/range_sync/batch.rs | 8 +- .../network/src/sync/range_sync/chain.rs | 6 +- .../network/src/sync/range_sync/range.rs | 13 +- 18 files changed, 570 insertions(+), 521 deletions(-) diff --git a/beacon_node/lighthouse_network/src/behaviour/mod.rs b/beacon_node/lighthouse_network/src/behaviour/mod.rs index f03e13948..b5b0049cb 100644 --- a/beacon_node/lighthouse_network/src/behaviour/mod.rs +++ b/beacon_node/lighthouse_network/src/behaviour/mod.rs @@ -70,9 +70,16 @@ pub type PeerRequestId = (ConnectionId, SubstreamId); pub type SubscriptionFilter = MaxCountSubscriptionFilter; pub type Gossipsub = BaseGossipsub; +/// Identifier of a request. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RequestId { + Application(AppReqId), + Behaviour, +} + /// The types of events than can be obtained from polling the behaviour. #[derive(Debug)] -pub enum BehaviourEvent { +pub enum BehaviourEvent { /// We have successfully dialed and connected to a peer. PeerConnectedOutgoing(PeerId), /// A peer has successfully dialed and connected to us. @@ -86,7 +93,7 @@ pub enum BehaviourEvent { /// An RPC Request that was sent failed. RPCFailed { /// The id of the failed request. - id: RequestId, + id: AppReqId, /// The peer to which this request was sent. peer_id: PeerId, }, @@ -102,7 +109,7 @@ pub enum BehaviourEvent { /// Peer that sent the response. peer_id: PeerId, /// Id of the request to which the peer is responding. - id: RequestId, + id: AppReqId, /// Response the peer sent. response: Response, }, @@ -134,16 +141,16 @@ enum InternalBehaviourMessage { /// behaviours. #[derive(NetworkBehaviour)] #[behaviour( - out_event = "BehaviourEvent", + out_event = "BehaviourEvent", poll_method = "poll", event_process = true )] -pub struct Behaviour { +pub struct Behaviour { /* Sub-Behaviours */ /// The routing pub-sub mechanism for eth2. gossipsub: Gossipsub, /// The Eth2 RPC specified in the wire-0 protocol. - eth2_rpc: RPC, + eth2_rpc: RPC, TSpec>, /// Discv5 Discovery protocol. discovery: Discovery, /// Keep regular connection to peers and disconnect if absent. @@ -156,7 +163,7 @@ pub struct Behaviour { /* Auxiliary Fields */ /// The output events generated by this behaviour to be consumed in the swarm poll. #[behaviour(ignore)] - events: VecDeque>, + events: VecDeque>, /// Internal behaviour events, the NBAction type is composed of sub-behaviours, so we use a /// custom type here to avoid having to specify the concrete type. #[behaviour(ignore)] @@ -192,7 +199,7 @@ pub struct Behaviour { } /// Implements the combined behaviour for the libp2p service. -impl Behaviour { +impl Behaviour { pub async fn new( local_key: &Keypair, ctx: ServiceContext<'_>, @@ -562,9 +569,9 @@ impl Behaviour { /* Eth2 RPC behaviour functions */ /// Send a request to a peer over RPC. - pub fn send_request(&mut self, peer_id: PeerId, request_id: RequestId, request: Request) { + pub fn send_request(&mut self, peer_id: PeerId, request_id: AppReqId, request: Request) { self.eth2_rpc - .send_request(peer_id, request_id, request.into()) + .send_request(peer_id, RequestId::Application(request_id), request.into()) } /// Send a successful response to a peer over RPC. @@ -718,12 +725,12 @@ impl Behaviour { } /// Sends a Ping request to the peer. - fn ping(&mut self, id: RequestId, peer_id: PeerId) { + fn ping(&mut self, peer_id: PeerId) { let ping = crate::rpc::Ping { data: *self.network_globals.local_metadata.read().seq_number(), }; - trace!(self.log, "Sending Ping"; "request_id" => id, "peer_id" => %peer_id); - + trace!(self.log, "Sending Ping"; "peer_id" => %peer_id); + let id = RequestId::Behaviour; self.eth2_rpc .send_request(peer_id, id, OutboundRequest::Ping(ping)); } @@ -761,13 +768,19 @@ impl Behaviour { // RPC Propagation methods /// Queues the response to be sent upwards as long at it was requested outside the Behaviour. - fn propagate_response(&mut self, id: RequestId, peer_id: PeerId, response: Response) { - if !matches!(id, RequestId::Behaviour) { - self.add_event(BehaviourEvent::ResponseReceived { + fn propagate_response( + &mut self, + id: RequestId, + peer_id: PeerId, + response: Response, + ) { + match id { + RequestId::Application(id) => self.add_event(BehaviourEvent::ResponseReceived { peer_id, id, response, - }); + }), + RequestId::Behaviour => {} } } @@ -793,7 +806,7 @@ impl Behaviour { } /// Adds an event to the queue waking the current task to process it. - fn add_event(&mut self, event: BehaviourEvent) { + fn add_event(&mut self, event: BehaviourEvent) { self.events.push_back(event); if let Some(waker) = &self.waker { waker.wake_by_ref(); @@ -869,7 +882,11 @@ impl Behaviour { */ // Gossipsub -impl NetworkBehaviourEventProcess for Behaviour { +impl NetworkBehaviourEventProcess for Behaviour +where + AppReqId: ReqId, + TSpec: EthSpec, +{ fn inject_event(&mut self, event: GossipsubEvent) { match event { GossipsubEvent::Message { @@ -961,8 +978,13 @@ impl NetworkBehaviourEventProcess for Behaviour< } // RPC -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RPCMessage) { +impl NetworkBehaviourEventProcess, TSpec>> + for Behaviour +where + AppReqId: ReqId, + TSpec: EthSpec, +{ + fn inject_event(&mut self, event: RPCMessage, TSpec>) { let peer_id = event.peer_id; if !self.peer_manager.is_connected(&peer_id) { @@ -1006,7 +1028,7 @@ impl NetworkBehaviourEventProcess> for Behavio ConnectionDirection::Outgoing, ); // inform failures of requests comming outside the behaviour - if !matches!(id, RequestId::Behaviour) { + if let RequestId::Application(id) = id { self.add_event(BehaviourEvent::RPCFailed { peer_id, id }); } } @@ -1090,7 +1112,11 @@ impl NetworkBehaviourEventProcess> for Behavio } // Discovery -impl NetworkBehaviourEventProcess for Behaviour { +impl NetworkBehaviourEventProcess for Behaviour +where + AppReqId: ReqId, + TSpec: EthSpec, +{ fn inject_event(&mut self, event: DiscoveryEvent) { match event { DiscoveryEvent::SocketUpdated(socket_addr) => { @@ -1119,7 +1145,11 @@ impl NetworkBehaviourEventProcess for Behaviour< } // Identify -impl NetworkBehaviourEventProcess for Behaviour { +impl NetworkBehaviourEventProcess for Behaviour +where + TSpec: EthSpec, + AppReqId: ReqId, +{ fn inject_event(&mut self, event: IdentifyEvent) { match event { IdentifyEvent::Received { peer_id, mut info } => { @@ -1140,15 +1170,20 @@ impl NetworkBehaviourEventProcess for Behaviour Behaviour { +type BehaviourHandler = + as NetworkBehaviour>::ConnectionHandler; + +impl Behaviour +where + TSpec: EthSpec, + AppReqId: ReqId, +{ /// Consumes the events list and drives the Lighthouse global NetworkBehaviour. fn poll( &mut self, cx: &mut Context, _: &mut impl PollParameters, - ) -> Poll< - NBAction, as NetworkBehaviour>::ConnectionHandler>, - > { + ) -> Poll, BehaviourHandler>> { if let Some(waker) = &self.waker { if waker.will_wake(cx.waker()) { self.waker = Some(cx.waker().clone()); @@ -1207,7 +1242,9 @@ impl Behaviour { } } -impl NetworkBehaviourEventProcess for Behaviour { +impl NetworkBehaviourEventProcess + for Behaviour +{ fn inject_event(&mut self, event: PeerManagerEvent) { match event { PeerManagerEvent::PeerConnectedIncoming(peer_id) => { @@ -1242,7 +1279,7 @@ impl NetworkBehaviourEventProcess for Behaviou } PeerManagerEvent::Ping(peer_id) => { // send a ping request to this peer - self.ping(RequestId::Behaviour, peer_id); + self.ping(peer_id); } PeerManagerEvent::MetaData(peer_id) => { self.send_meta_data_request(peer_id); @@ -1251,7 +1288,8 @@ impl NetworkBehaviourEventProcess for Behaviou debug!(self.log, "Peer Manager disconnecting peer"; "peer_id" => %peer_id, "reason" => %reason); // send one goodbye - self.eth2_rpc.shutdown(peer_id, reason); + self.eth2_rpc + .shutdown(peer_id, RequestId::Behaviour, reason); } } } @@ -1335,3 +1373,19 @@ pub fn save_metadata_to_disk(dir: &Path, metadata: MetaData, log: } } } + +impl slog::Value for RequestId { + fn serialize( + &self, + record: &slog::Record, + key: slog::Key, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + match self { + RequestId::Behaviour => slog::Value::serialize("Behaviour", record, key, serializer), + RequestId::Application(ref id) => { + slog::Value::serialize(&format_args!("{:?}", id), record, key, serializer) + } + } + } +} diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index 394eff152..3bda64f0b 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -110,9 +110,13 @@ impl NetworkBehaviour for PeerManager { _connection_id: &ConnectionId, endpoint: &ConnectedPoint, _failed_addresses: Option<&Vec>, - _other_established: usize, + other_established: usize, ) { debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => ?endpoint.to_endpoint()); + if other_established == 0 { + self.events.push(PeerManagerEvent::MetaData(*peer_id)); + } + // Check NAT if metrics are enabled if self.network_globals.local_enr.read().udp().is_some() { metrics::check_nat(); diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index e471e8ec4..2b9e7c490 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -1,12 +1,10 @@ #![allow(clippy::type_complexity)] #![allow(clippy::cognitive_complexity)] -use super::methods::{ - GoodbyeReason, RPCCodedResponse, RPCResponseErrorCode, RequestId, ResponseTermination, -}; +use super::methods::{GoodbyeReason, RPCCodedResponse, RPCResponseErrorCode, ResponseTermination}; use super::outbound::OutboundRequestContainer; use super::protocol::{max_rpc_size, InboundRequest, Protocol, RPCError, RPCProtocol}; -use super::{RPCReceived, RPCSend}; +use super::{RPCReceived, RPCSend, ReqId}; use crate::rpc::outbound::{OutboundFramed, OutboundRequest}; use crate::rpc::protocol::InboundFramed; use fnv::FnvHashMap; @@ -49,11 +47,11 @@ pub struct SubstreamId(usize); type InboundSubstream = InboundFramed; /// Events the handler emits to the behaviour. -type HandlerEvent = Result, HandlerErr>; +pub type HandlerEvent = Result, HandlerErr>; /// An error encountered by the handler. #[derive(Debug)] -pub enum HandlerErr { +pub enum HandlerErr { /// An error occurred for this peer's request. This can occur during protocol negotiation, /// message passing, or if the handler identifies that we are sending an error response to the peer. Inbound { @@ -69,7 +67,7 @@ pub enum HandlerErr { /// indicates an error. Outbound { /// Application-given Id of the request for which an error occurred. - id: RequestId, + id: Id, /// Information of the protocol. proto: Protocol, /// The error that occurred. @@ -78,7 +76,7 @@ pub enum HandlerErr { } /// Implementation of `ConnectionHandler` for the RPC protocol. -pub struct RPCHandler +pub struct RPCHandler where TSpec: EthSpec, { @@ -86,10 +84,10 @@ where listen_protocol: SubstreamProtocol, ()>, /// Queue of events to produce in `poll()`. - events_out: SmallVec<[HandlerEvent; 4]>, + events_out: SmallVec<[HandlerEvent; 4]>, /// Queue of outbound substreams to open. - dial_queue: SmallVec<[(RequestId, OutboundRequest); 4]>, + dial_queue: SmallVec<[(Id, OutboundRequest); 4]>, /// Current number of concurrent outbound substreams being opened. dial_negotiated: u32, @@ -101,7 +99,7 @@ where inbound_substreams_delay: DelayQueue, /// Map of outbound substreams that need to be driven to completion. - outbound_substreams: FnvHashMap>, + outbound_substreams: FnvHashMap>, /// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout. outbound_substreams_delay: DelayQueue, @@ -163,7 +161,7 @@ struct InboundInfo { } /// Contains the information the handler keeps on established outbound substreams. -struct OutboundInfo { +struct OutboundInfo { /// State of the substream. state: OutboundSubstreamState, /// Key to keep track of the substream's timeout via `self.outbound_substreams_delay`. @@ -172,8 +170,8 @@ struct OutboundInfo { proto: Protocol, /// Number of chunks to be seen from the peer's response. remaining_chunks: Option, - /// `RequestId` as given by the application that sent the request. - req_id: RequestId, + /// `Id` as given by the application that sent the request. + req_id: Id, } /// State of an inbound substream connection. @@ -204,7 +202,7 @@ pub enum OutboundSubstreamState { Poisoned, } -impl RPCHandler +impl RPCHandler where TSpec: EthSpec, { @@ -235,7 +233,7 @@ where /// Initiates the handler's shutdown process, sending an optional Goodbye message to the /// peer. - fn shutdown(&mut self, goodbye_reason: Option) { + fn shutdown(&mut self, goodbye_reason: Option<(Id, GoodbyeReason)>) { if matches!(self.state, HandlerState::Active) { if !self.dial_queue.is_empty() { debug!(self.log, "Starting handler shutdown"; "unsent_queued_requests" => self.dial_queue.len()); @@ -250,9 +248,8 @@ where } // Queue our goodbye message. - if let Some(reason) = goodbye_reason { - self.dial_queue - .push((RequestId::Router, OutboundRequest::Goodbye(reason))); + if let Some((id, reason)) = goodbye_reason { + self.dial_queue.push((id, OutboundRequest::Goodbye(reason))); } self.state = HandlerState::ShuttingDown(Box::new(sleep_until( @@ -262,7 +259,7 @@ where } /// Opens an outbound substream with a request. - fn send_request(&mut self, id: RequestId, req: OutboundRequest) { + fn send_request(&mut self, id: Id, req: OutboundRequest) { match self.state { HandlerState::Active => { self.dial_queue.push((id, req)); @@ -310,16 +307,17 @@ where } } -impl ConnectionHandler for RPCHandler +impl ConnectionHandler for RPCHandler where TSpec: EthSpec, + Id: ReqId, { - type InEvent = RPCSend; - type OutEvent = HandlerEvent; + type InEvent = RPCSend; + type OutEvent = HandlerEvent; type Error = RPCError; type InboundProtocol = RPCProtocol; type OutboundProtocol = OutboundRequestContainer; - type OutboundOpenInfo = (RequestId, OutboundRequest); // Keep track of the id and the request + type OutboundOpenInfo = (Id, OutboundRequest); // Keep track of the id and the request type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { @@ -432,7 +430,7 @@ where match rpc_event { RPCSend::Request(id, req) => self.send_request(id, req), RPCSend::Response(inbound_id, response) => self.send_response(inbound_id, response), - RPCSend::Shutdown(reason) => self.shutdown(Some(reason)), + RPCSend::Shutdown(id, reason) => self.shutdown(Some((id, reason))), } // In any case, we need the handler to process the event. if let Some(waker) = &self.waker { diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 2d43ecbf0..087f8e533 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -56,17 +56,6 @@ impl ToString for ErrorType { /* Requests */ -/// Identifier of a request. -/// -// NOTE: The handler stores the `RequestId` to inform back of responses and errors, but it's execution -// is independent of the contents on this type. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum RequestId { - Router, - Sync(usize), - Behaviour, -} - /// The STATUS request/response handshake message. #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct StatusMessage { @@ -432,18 +421,3 @@ impl slog::KV for StatusMessage { slog::Result::Ok(()) } } - -impl slog::Value for RequestId { - fn serialize( - &self, - record: &slog::Record, - key: slog::Key, - serializer: &mut dyn slog::Serializer, - ) -> slog::Result { - match self { - RequestId::Behaviour => slog::Value::serialize("Behaviour", record, key, serializer), - RequestId::Router => slog::Value::serialize("Router", record, key, serializer), - RequestId::Sync(ref id) => slog::Value::serialize(id, record, key, serializer), - } - } -} diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index ba88f66bb..884acd9bc 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -5,13 +5,13 @@ //! syncing. use futures::future::FutureExt; -use handler::RPCHandler; -use libp2p::core::{connection::ConnectionId, ConnectedPoint}; +use handler::{HandlerEvent, RPCHandler}; +use libp2p::core::connection::ConnectionId; use libp2p::swarm::{ handler::ConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, SubstreamProtocol, }; -use libp2p::{Multiaddr, PeerId}; +use libp2p::PeerId; use rate_limiter::{RPCRateLimiter as RateLimiter, RPCRateLimiterBuilder, RateLimitedErr}; use slog::{crit, debug, o}; use std::marker::PhantomData; @@ -27,7 +27,7 @@ pub(crate) use protocol::{InboundRequest, RPCProtocol}; pub use handler::SubstreamId; pub use methods::{ BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, MaxRequestBlocks, - RPCResponseErrorCode, RequestId, ResponseTermination, StatusMessage, MAX_REQUEST_BLOCKS, + RPCResponseErrorCode, ResponseTermination, StatusMessage, MAX_REQUEST_BLOCKS, }; pub(crate) use outbound::OutboundRequest; pub use protocol::{max_rpc_size, Protocol, RPCError}; @@ -39,14 +39,18 @@ mod outbound; mod protocol; mod rate_limiter; +/// Composite trait for a request id. +pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {} +impl ReqId for T where T: Send + 'static + std::fmt::Debug + Copy + Clone {} + /// RPC events sent from Lighthouse. #[derive(Debug, Clone)] -pub enum RPCSend { +pub enum RPCSend { /// A request sent from Lighthouse. /// - /// The `RequestId` is given by the application making the request. These + /// The `Id` is given by the application making the request. These /// go over *outbound* connections. - Request(RequestId, OutboundRequest), + Request(Id, OutboundRequest), /// A response sent from Lighthouse. /// /// The `SubstreamId` must correspond to the RPC-given ID of the original request received from the @@ -54,12 +58,12 @@ pub enum RPCSend { /// connections. Response(SubstreamId, RPCCodedResponse), /// Lighthouse has requested to terminate the connection with a goodbye message. - Shutdown(GoodbyeReason), + Shutdown(Id, GoodbyeReason), } /// RPC events received from outside Lighthouse. #[derive(Debug, Clone)] -pub enum RPCReceived { +pub enum RPCReceived { /// A request received from the outside. /// /// The `SubstreamId` is given by the `RPCHandler` as it identifies this request with the @@ -67,47 +71,47 @@ pub enum RPCReceived { Request(SubstreamId, InboundRequest), /// A response received from the outside. /// - /// The `RequestId` corresponds to the application given ID of the original request sent to the + /// The `Id` corresponds to the application given ID of the original request sent to the /// peer. The second parameter is a single chunk of a response. These go over *outbound* /// connections. - Response(RequestId, RPCResponse), + Response(Id, RPCResponse), /// Marks a request as completed - EndOfStream(RequestId, ResponseTermination), + EndOfStream(Id, ResponseTermination), } -impl std::fmt::Display for RPCSend { +impl std::fmt::Display for RPCSend { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { RPCSend::Request(id, req) => write!(f, "RPC Request(id: {:?}, {})", id, req), RPCSend::Response(id, res) => write!(f, "RPC Response(id: {:?}, {})", id, res), - RPCSend::Shutdown(reason) => write!(f, "Sending Goodbye: {}", reason), + RPCSend::Shutdown(_id, reason) => write!(f, "Sending Goodbye: {}", reason), } } } /// Messages sent to the user from the RPC protocol. -pub struct RPCMessage { +pub struct RPCMessage { /// The peer that sent the message. pub peer_id: PeerId, /// Handler managing this message. pub conn_id: ConnectionId, /// The message that was sent. - pub event: as ConnectionHandler>::OutEvent, + pub event: HandlerEvent, } /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level /// logic. -pub struct RPC { +pub struct RPC { /// Rate limiter limiter: RateLimiter, /// Queue of events to be processed. - events: Vec, RPCHandler>>, + events: Vec, RPCHandler>>, fork_context: Arc, /// Slog logger for RPC behaviour. log: slog::Logger, } -impl RPC { +impl RPC { pub fn new(fork_context: Arc, log: slog::Logger) -> Self { let log = log.new(o!("service" => "libp2p_rpc")); let limiter = RPCRateLimiterBuilder::new() @@ -150,12 +154,7 @@ impl RPC { /// Submits an RPC request. /// /// The peer must be connected for this to succeed. - pub fn send_request( - &mut self, - peer_id: PeerId, - request_id: RequestId, - event: OutboundRequest, - ) { + pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, event: OutboundRequest) { self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id, handler: NotifyHandler::Any, @@ -165,21 +164,22 @@ impl RPC { /// Lighthouse wishes to disconnect from this peer by sending a Goodbye message. This /// gracefully terminates the RPC behaviour with a goodbye message. - pub fn shutdown(&mut self, peer_id: PeerId, reason: GoodbyeReason) { + pub fn shutdown(&mut self, peer_id: PeerId, id: Id, reason: GoodbyeReason) { self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id, handler: NotifyHandler::Any, - event: RPCSend::Shutdown(reason), + event: RPCSend::Shutdown(id, reason), }); } } -impl NetworkBehaviour for RPC +impl NetworkBehaviour for RPC where TSpec: EthSpec, + Id: ReqId, { - type ConnectionHandler = RPCHandler; - type OutEvent = RPCMessage; + type ConnectionHandler = RPCHandler; + type OutEvent = RPCMessage; fn new_handler(&mut self) -> Self::ConnectionHandler { RPCHandler::new( @@ -196,33 +196,6 @@ where ) } - // handled by discovery - fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec { - Vec::new() - } - - // Use connection established/closed instead of these currently - fn inject_connection_established( - &mut self, - peer_id: &PeerId, - _connection_id: &ConnectionId, - _endpoint: &ConnectedPoint, - _failed_addresses: Option<&Vec>, - other_established: usize, - ) { - if other_established == 0 { - // find the peer's meta-data - debug!(self.log, "Requesting new peer's metadata"; "peer_id" => %peer_id); - let rpc_event = - RPCSend::Request(RequestId::Behaviour, OutboundRequest::MetaData(PhantomData)); - self.events.push(NetworkBehaviourAction::NotifyHandler { - peer_id: *peer_id, - handler: NotifyHandler::Any, - event: rpc_event, - }); - } - } - fn inject_event( &mut self, peer_id: PeerId, diff --git a/beacon_node/lighthouse_network/src/service.rs b/beacon_node/lighthouse_network/src/service.rs index 0f5607e54..bcd546fb0 100644 --- a/beacon_node/lighthouse_network/src/service.rs +++ b/beacon_node/lighthouse_network/src/service.rs @@ -4,9 +4,7 @@ use crate::behaviour::{ use crate::config::NetworkLoad; use crate::discovery::enr; use crate::multiaddr::Protocol; -use crate::rpc::{ - GoodbyeReason, MetaData, MetaDataV1, MetaDataV2, RPCResponseErrorCode, RequestId, -}; +use crate::rpc::{GoodbyeReason, MetaData, MetaDataV1, MetaDataV2, RPCResponseErrorCode, ReqId}; use crate::types::{error, EnrAttestationBitfield, EnrSyncCommitteeBitfield, GossipKind}; use crate::EnrExt; use crate::{NetworkConfig, NetworkGlobals, PeerAction, ReportSource}; @@ -42,9 +40,9 @@ pub const METADATA_FILENAME: &str = "metadata"; /// /// This is a subset of the events that a libp2p swarm emits. #[derive(Debug)] -pub enum Libp2pEvent { +pub enum Libp2pEvent { /// A behaviour event - Behaviour(BehaviourEvent), + Behaviour(BehaviourEvent), /// A new listening address has been established. NewListenAddr(Multiaddr), /// We reached zero listening addresses. @@ -52,9 +50,9 @@ pub enum Libp2pEvent { } /// The configuration and state of the libp2p components for the beacon node. -pub struct Service { +pub struct Service { /// The libp2p Swarm handler. - pub swarm: Swarm>, + pub swarm: Swarm>, /// The bandwidth logger for the underlying libp2p transport. pub bandwidth: Arc, /// This node's PeerId. @@ -71,7 +69,7 @@ pub struct Context<'a> { pub gossipsub_registry: Option<&'a mut Registry>, } -impl Service { +impl Service { pub async fn new( executor: task_executor::TaskExecutor, ctx: Context<'_>, @@ -260,7 +258,7 @@ impl Service { } /// Sends a request to a peer, with a given Id. - pub fn send_request(&mut self, peer_id: PeerId, request_id: RequestId, request: Request) { + pub fn send_request(&mut self, peer_id: PeerId, request_id: AppReqId, request: Request) { self.swarm .behaviour_mut() .send_request(peer_id, request_id, request); @@ -307,7 +305,7 @@ impl Service { .send_successful_response(peer_id, id, response); } - pub async fn next_event(&mut self) -> Libp2pEvent { + pub async fn next_event(&mut self) -> Libp2pEvent { loop { match self.swarm.select_next_some().await { SwarmEvent::Behaviour(behaviour) => { diff --git a/beacon_node/lighthouse_network/tests/common/mod.rs b/beacon_node/lighthouse_network/tests/common/mod.rs index 5656cf078..e79fdf464 100644 --- a/beacon_node/lighthouse_network/tests/common/mod.rs +++ b/beacon_node/lighthouse_network/tests/common/mod.rs @@ -21,6 +21,8 @@ pub mod behaviour; pub mod swarm; type E = MinimalEthSpec; +type ReqId = usize; + use tempfile::Builder as TempBuilder; /// Returns a dummy fork context @@ -33,10 +35,10 @@ pub fn fork_context() -> ForkContext { ForkContext::new::(types::Slot::new(0), Hash256::zero(), &chain_spec) } -pub struct Libp2pInstance(LibP2PService, exit_future::Signal); +pub struct Libp2pInstance(LibP2PService, exit_future::Signal); impl std::ops::Deref for Libp2pInstance { - type Target = LibP2PService; + type Target = LibP2PService; fn deref(&self) -> &Self::Target { &self.0 } @@ -113,7 +115,7 @@ pub async fn build_libp2p_instance( } #[allow(dead_code)] -pub fn get_enr(node: &LibP2PService) -> Enr { +pub fn get_enr(node: &LibP2PService) -> Enr { node.swarm.behaviour().local_enr() } diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index b270765f8..6f32e6526 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -88,15 +88,14 @@ fn test_status_rpc() { Libp2pEvent::Behaviour(BehaviourEvent::PeerConnectedOutgoing(peer_id)) => { // Send a STATUS message debug!(log, "Sending RPC"); - sender.swarm.behaviour_mut().send_request( - peer_id, - RequestId::Sync(10), - rpc_request.clone(), - ); + sender + .swarm + .behaviour_mut() + .send_request(peer_id, 10, rpc_request.clone()); } Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived { peer_id: _, - id: RequestId::Sync(10), + id: 10, response, }) => { // Should receive the RPC response @@ -186,7 +185,7 @@ fn test_blocks_by_range_chunked_rpc() { // keep count of the number of messages received let mut messages_received = 0; - let request_id = RequestId::Sync(messages_to_send as usize); + let request_id = messages_to_send as usize; // build the sender future let sender_future = async { loop { @@ -313,7 +312,7 @@ fn test_blocks_by_range_over_limit() { let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_merge_large = Response::BlocksByRange(Some(Box::new(signed_full_block))); - let request_id = RequestId::Sync(messages_to_send as usize); + let request_id = messages_to_send as usize; // build the sender future let sender_future = async { loop { @@ -413,7 +412,7 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() { // keep count of the number of messages received let mut messages_received: u64 = 0; - let request_id = RequestId::Sync(messages_to_send as usize); + let request_id = messages_to_send as usize; // build the sender future let sender_future = async { loop { @@ -553,15 +552,14 @@ fn test_blocks_by_range_single_empty_rpc() { Libp2pEvent::Behaviour(BehaviourEvent::PeerConnectedOutgoing(peer_id)) => { // Send a STATUS message debug!(log, "Sending RPC"); - sender.swarm.behaviour_mut().send_request( - peer_id, - RequestId::Sync(10), - rpc_request.clone(), - ); + sender + .swarm + .behaviour_mut() + .send_request(peer_id, 10, rpc_request.clone()); } Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived { peer_id: _, - id: RequestId::Sync(10), + id: 10, response, }) => match response { Response::BlocksByRange(Some(_)) => { @@ -679,15 +677,14 @@ fn test_blocks_by_root_chunked_rpc() { Libp2pEvent::Behaviour(BehaviourEvent::PeerConnectedOutgoing(peer_id)) => { // Send a STATUS message debug!(log, "Sending RPC"); - sender.swarm.behaviour_mut().send_request( - peer_id, - RequestId::Sync(6), - rpc_request.clone(), - ); + sender + .swarm + .behaviour_mut() + .send_request(peer_id, 6, rpc_request.clone()); } Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived { peer_id: _, - id: RequestId::Sync(6), + id: 6, response, }) => match response { Response::BlocksByRoot(Some(_)) => { @@ -814,15 +811,14 @@ fn test_blocks_by_root_chunked_rpc_terminates_correctly() { Libp2pEvent::Behaviour(BehaviourEvent::PeerConnectedOutgoing(peer_id)) => { // Send a STATUS message debug!(log, "Sending RPC"); - sender.swarm.behaviour_mut().send_request( - peer_id, - RequestId::Sync(10), - rpc_request.clone(), - ); + sender + .swarm + .behaviour_mut() + .send_request(peer_id, 10, rpc_request.clone()); } Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived { peer_id: _, - id: RequestId::Sync(10), + id: 10, response, }) => { debug!(log, "Sender received a response"); diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 8d639c5ee..03b877506 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -8,12 +8,11 @@ mod processor; use crate::error; -use crate::service::NetworkMessage; +use crate::service::{NetworkMessage, RequestId}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use futures::prelude::*; use lighthouse_network::{ - rpc::RequestId, MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, - Response, + MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response, }; use processor::Processor; use slog::{debug, o, trace}; diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 04589d140..b8db9c17f 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -1,7 +1,8 @@ use crate::beacon_processor::{ BeaconProcessor, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN, }; -use crate::service::NetworkMessage; +use crate::service::{NetworkMessage, RequestId}; +use crate::sync::manager::RequestId as SyncId; use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use lighthouse_network::rpc::*; @@ -100,8 +101,11 @@ impl Processor { /// this function notifies the sync manager of the error. pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId) { // Check if the failed RPC belongs to sync - if let RequestId::Sync(id) = request_id { - self.send_to_sync(SyncMessage::RPCError(peer_id, id)); + if let RequestId::Sync(request_id) = request_id { + self.send_to_sync(SyncMessage::RpcError { + peer_id, + request_id, + }); } } @@ -176,24 +180,28 @@ impl Processor { request_id: RequestId, beacon_block: Option>>, ) { + let request_id = match request_id { + RequestId::Sync(sync_id) => match sync_id { + SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => { + unreachable!("Block lookups do not request BBRange requests") + } + id @ (SyncId::BackFillSync { .. } | SyncId::RangeSync { .. }) => id, + }, + RequestId::Router => unreachable!("All BBRange requests belong to sync"), + }; + trace!( self.log, "Received BlocksByRange Response"; "peer" => %peer_id, ); - if let RequestId::Sync(id) = request_id { - self.send_to_sync(SyncMessage::BlocksByRangeResponse { - peer_id, - request_id: id, - beacon_block, - }); - } else { - debug!( - self.log, - "All blocks by range responses should belong to sync" - ); - } + self.send_to_sync(SyncMessage::RpcBlock { + peer_id, + request_id, + beacon_block, + seen_timestamp: timestamp_now(), + }); } /// Handle a `BlocksByRoot` response from the peer. @@ -203,25 +211,27 @@ impl Processor { request_id: RequestId, beacon_block: Option>>, ) { + let request_id = match request_id { + RequestId::Sync(sync_id) => match sync_id { + id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id, + SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => { + unreachable!("Batch syncing do not request BBRoot requests") + } + }, + RequestId::Router => unreachable!("All BBRoot requests belong to sync"), + }; + trace!( self.log, "Received BlocksByRoot Response"; "peer" => %peer_id, ); - - if let RequestId::Sync(id) = request_id { - self.send_to_sync(SyncMessage::BlocksByRootResponse { - peer_id, - request_id: id, - beacon_block, - seen_timestamp: timestamp_now(), - }); - } else { - debug!( - self.log, - "All Blocks by Root responses should belong to sync" - ) - } + self.send_to_sync(SyncMessage::RpcBlock { + peer_id, + request_id, + beacon_block, + seen_timestamp: timestamp_now(), + }); } /// Process a gossip message declaring a new block. diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index a16c2c677..a8995de2e 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -1,3 +1,4 @@ +use super::sync::manager::RequestId as SyncId; use crate::persisted_dht::{clear_dht, load_dht, persist_dht}; use crate::router::{Router, RouterMessage}; use crate::subnet_service::SyncCommitteeService; @@ -14,7 +15,7 @@ use lighthouse_network::{ prometheus_client::registry::Registry, MessageAcceptance, Service as LibP2PService, }; use lighthouse_network::{ - rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId}, + rpc::{GoodbyeReason, RPCResponseErrorCode}, Context, Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Request, Response, Subnet, }; @@ -42,6 +43,13 @@ const SUBSCRIBE_DELAY_SLOTS: u64 = 2; /// Delay after a fork where we unsubscribe from pre-fork topics. const UNSUBSCRIBE_DELAY_EPOCHS: u64 = 2; +/// Application level requests sent to the network. +#[derive(Debug, Clone, Copy)] +pub enum RequestId { + Sync(SyncId), + Router, +} + /// Types of messages that the network service can receive. #[derive(Debug)] pub enum NetworkMessage { @@ -112,7 +120,7 @@ pub struct NetworkService { /// A reference to the underlying beacon chain. beacon_chain: Arc>, /// The underlying libp2p service that drives all the network interactions. - libp2p: LibP2PService, + libp2p: LibP2PService, /// An attestation and subnet manager service. attestation_service: AttestationService, /// A sync committeee subnet manager service. @@ -389,7 +397,7 @@ impl NetworkService { /// Handle an event received from the network. async fn on_libp2p_event( &mut self, - ev: Libp2pEvent, + ev: Libp2pEvent, shutdown_sender: &mut Sender, ) { match ev { diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 0c34eef27..2c700e9fa 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -8,9 +8,8 @@ //! If a batch fails, the backfill sync cannot progress. In this scenario, we mark the backfill //! sync as failed, log an error and attempt to retry once a new peer joins the node. -use super::RequestId; use crate::beacon_processor::{ProcessId, WorkEvent as BeaconWorkEvent}; -use crate::sync::manager::BatchProcessResult; +use crate::sync::manager::{BatchProcessResult, Id}; use crate::sync::network_context::SyncNetworkContext; use crate::sync::range_sync::{BatchConfig, BatchId, BatchInfo, BatchState}; use beacon_chain::{BeaconChain, BeaconChainTypes}; @@ -357,7 +356,7 @@ impl BackFillSync { network: &mut SyncNetworkContext, batch_id: BatchId, peer_id: &PeerId, - request_id: RequestId, + request_id: Id, ) -> Result<(), BackFillError> { if let Some(batch) = self.batches.get_mut(&batch_id) { // A batch could be retried without the peer failing the request (disconnecting/ @@ -392,7 +391,7 @@ impl BackFillSync { network: &mut SyncNetworkContext, batch_id: BatchId, peer_id: &PeerId, - request_id: RequestId, + request_id: Id, beacon_block: Option>, ) -> Result { // check if we have this batch diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 960dd12af..0c84087f9 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -37,7 +37,6 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::network_context::SyncNetworkContext; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{ChainId, RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; -use super::RequestId; use crate::beacon_processor::{ProcessId, WorkEvent as BeaconWorkEvent}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; @@ -52,6 +51,7 @@ use slog::{crit, debug, error, info, trace, warn, Logger}; use smallvec::SmallVec; use ssz_types::VariableList; use std::boxed::Box; +use std::collections::hash_map::Entry; use std::ops::Sub; use std::sync::Arc; use std::time::Duration; @@ -73,23 +73,31 @@ const PARENT_FAIL_TOLERANCE: usize = 5; /// is further back than the most recent head slot. const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2; +pub type Id = u32; + +/// Id of rpc requests sent by sync to the network. +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub enum RequestId { + /// Request searching for a block given a hash. + SingleBlock { id: Id }, + /// Request searching for a block's parent. The id is the chain + ParentLookup { id: Id }, + /// Request was from the backfill sync algorithm. + BackFillSync { id: Id }, + /// The request was from a chain in the range sync algorithm. + RangeSync { id: Id }, +} + #[derive(Debug)] /// A message than can be sent to the sync manager thread. pub enum SyncMessage { /// A useful peer has been discovered. AddPeer(PeerId, SyncInfo), - /// A [`BlocksByRange`] response has been received. - BlocksByRangeResponse { - peer_id: PeerId, + /// A block has been received from the RPC. + RpcBlock { request_id: RequestId, - beacon_block: Option>>, - }, - - /// A [`BlocksByRoot`] response has been received. - BlocksByRootResponse { peer_id: PeerId, - request_id: RequestId, beacon_block: Option>>, seen_timestamp: Duration, }, @@ -105,7 +113,10 @@ pub enum SyncMessage { Disconnect(PeerId), /// An RPC Error has occurred on a request. - RPCError(PeerId, RequestId), + RpcError { + peer_id: PeerId, + request_id: RequestId, + }, /// A batch has been processed by the block processor thread. BatchProcessed { @@ -157,7 +168,7 @@ struct ParentRequests { last_submitted_peer: PeerId, /// The request ID of this lookup is in progress. - pending: Option, + pending: Option, } /// The primary object for handling and driving all the current syncing logic. It maintains the @@ -193,7 +204,7 @@ pub struct SyncManager { /// received or not. /// /// The flag allows us to determine if the peer returned data or sent us nothing. - single_block_lookups: FnvHashMap, + single_block_lookups: FnvHashMap, /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. beacon_processor_send: mpsc::Sender>, @@ -313,46 +324,31 @@ impl SyncManager { /// There are two reasons we could have received a BlocksByRoot response /// - We requested a single hash and have received a response for the single_block_lookup /// - We are looking up parent blocks in parent lookup search - async fn blocks_by_root_response( + async fn parent_lookup_response( &mut self, peer_id: PeerId, - request_id: RequestId, + request_id: Id, block: Option>, - seen_timestamp: Duration, + _seen_timestamp: Duration, ) { + let mut parent_request = if let Some(pos) = self + .parent_queue + .iter() + .position(|request| request.pending == Some(request_id)) + { + // we remove from the queue and process it. It will get re-added if required + self.parent_queue.remove(pos) + } else { + if block.is_some() { + debug!(self.log, "Response for a parent lookup request that was not found"; "peer_id" => %peer_id); + } + return; + }; + match block { Some(block) => { // data was returned, not just a stream termination - // check if this is a single block lookup - i.e we were searching for a specific hash - let mut single_block_hash = None; - if let Some(block_request) = self.single_block_lookups.get_mut(&request_id) { - // update the state of the lookup indicating a block was received from the peer - block_request.block_returned = true; - single_block_hash = Some(block_request.hash); - } - if let Some(block_hash) = single_block_hash { - self.single_block_lookup_response(peer_id, block, block_hash, seen_timestamp) - .await; - return; - } - - // This wasn't a single block lookup request, it must be a response to a parent request search - // find the request - let mut parent_request = match self - .parent_queue - .iter() - .position(|request| request.pending == Some(request_id)) - { - // we remove from the queue and process it. It will get re-added if required - Some(pos) => self.parent_queue.remove(pos), - None => { - // No pending request, invalid request_id or coding error - warn!(self.log, "BlocksByRoot response unknown"; "request_id" => request_id); - return; - } - }; - // check if the parent of this block isn't in our failed cache. If it is, this // chain should be dropped and the peer downscored. if self.failed_chains.contains(&block.message().parent_root()) { @@ -382,38 +378,6 @@ impl SyncManager { self.process_parent_request(parent_request).await; } None => { - // this is a stream termination - - // stream termination for a single block lookup, remove the key - if let Some(single_block_request) = self.single_block_lookups.remove(&request_id) { - // The peer didn't respond with a block that it referenced. - // This can be allowed as some clients may implement pruning. We mildly - // tolerate this behaviour. - if !single_block_request.block_returned { - warn!(self.log, "Peer didn't respond with a block it referenced"; "referenced_block_hash" => %single_block_request.hash, "peer_id" => %peer_id); - self.network.report_peer( - peer_id, - PeerAction::MidToleranceError, - "bbroot_no_block", - ); - } - return; - } - - // This wasn't a single block lookup request, it must be a response to a parent request search - // find the request and remove it - let mut parent_request = match self - .parent_queue - .iter() - .position(|request| request.pending == Some(request_id)) - { - Some(pos) => self.parent_queue.remove(pos), - None => { - // No pending request, the parent request has been processed and this is - // the resulting stream termination. - return; - } - }; // An empty response has been returned to a parent request // if an empty response is given, the peer didn't have the requested block, try again parent_request.failed_attempts += 1; @@ -458,71 +422,95 @@ impl SyncManager { /// lookup search is started. async fn single_block_lookup_response( &mut self, + request_id: Id, peer_id: PeerId, - block: SignedBeaconBlock, - expected_block_hash: Hash256, + block: Option>, seen_timestamp: Duration, ) { - // verify the hash is correct and try and process the block - if expected_block_hash != block.canonical_root() { - // The peer that sent this, sent us the wrong block. - // We do not tolerate this behaviour. The peer is instantly disconnected and banned. - warn!(self.log, "Peer sent incorrect block for single block lookup"; "peer_id" => %peer_id); - self.network.goodbye_peer(peer_id, GoodbyeReason::Fault); - return; - } - - let block_result = match self.process_block_async(block.clone()).await { - Some(block_result) => block_result, - None => return, - }; - - // we have the correct block, try and process it - match block_result { - Ok(block_root) => { - // Block has been processed, so write the block time to the cache. - self.chain.block_times_cache.write().set_time_observed( - block_root, - block.slot(), - seen_timestamp, - None, - None, - ); - info!(self.log, "Processed block"; "block" => %block_root); - - match self.chain.fork_choice() { - Ok(()) => trace!( - self.log, - "Fork choice success"; - "location" => "single block" - ), - Err(e) => error!( - self.log, - "Fork choice failed"; - "error" => ?e, - "location" => "single block" - ), + if let Entry::Occupied(mut entry) = self.single_block_lookups.entry(request_id) { + match block { + None => { + // Stream termination. Remove the lookup + let (_, single_block_request) = entry.remove_entry(); + // The peer didn't respond with a block that it referenced. + // This can be allowed as some clients may implement pruning. We mildly + // tolerate this behaviour. + if !single_block_request.block_returned { + warn!(self.log, "Peer didn't respond with a block it referenced"; + "referenced_block_hash" => %single_block_request.hash, "peer_id" => %peer_id); + self.network.report_peer( + peer_id, + PeerAction::MidToleranceError, + "bbroot_no_block", + ); + } + } + Some(block) => { + // update the state of the lookup indicating a block was received from the peer + entry.get_mut().block_returned = true; + // verify the hash is correct and try and process the block + if entry.get().hash != block.canonical_root() { + // The peer that sent this, sent us the wrong block. + // We do not tolerate this behaviour. The peer is instantly disconnected and banned. + warn!(self.log, "Peer sent incorrect block for single block lookup"; "peer_id" => %peer_id); + self.network.goodbye_peer(peer_id, GoodbyeReason::Fault); + return; + } + + let block_result = match self.process_block_async(block.clone()).await { + Some(block_result) => block_result, + None => return, + }; + + // we have the correct block, try and process it + match block_result { + Ok(block_root) => { + // Block has been processed, so write the block time to the cache. + self.chain.block_times_cache.write().set_time_observed( + block_root, + block.slot(), + seen_timestamp, + None, + None, + ); + info!(self.log, "Processed block"; "block" => %block_root); + + match self.chain.fork_choice() { + Ok(()) => trace!( + self.log, + "Fork choice success"; + "location" => "single block" + ), + Err(e) => error!( + self.log, + "Fork choice failed"; + "error" => ?e, + "location" => "single block" + ), + } + } + Err(BlockError::ParentUnknown { .. }) => { + // We don't know of the blocks parent, begin a parent lookup search + self.add_unknown_block(peer_id, block); + } + Err(BlockError::BlockIsAlreadyKnown) => { + trace!(self.log, "Single block lookup already known"); + } + Err(BlockError::BeaconChainError(e)) => { + warn!(self.log, "Unexpected block processing error"; "error" => ?e); + } + outcome => { + warn!(self.log, "Single block lookup failed"; "outcome" => ?outcome); + // This could be a range of errors. But we couldn't process the block. + // For now we consider this a mid tolerance error. + self.network.report_peer( + peer_id, + PeerAction::MidToleranceError, + "single_block_lookup_failed", + ); + } + } } - } - Err(BlockError::ParentUnknown { .. }) => { - // We don't know of the blocks parent, begin a parent lookup search - self.add_unknown_block(peer_id, block); - } - Err(BlockError::BlockIsAlreadyKnown) => { - trace!(self.log, "Single block lookup already known"); - } - Err(BlockError::BeaconChainError(e)) => { - warn!(self.log, "Unexpected block processing error"; "error" => ?e); - } - outcome => { - warn!(self.log, "Single block lookup failed"; "outcome" => ?outcome); - // This could be a range of errors. But we couldn't process the block. - // For now we consider this a mid tolerance error. - self.network.report_peer( - peer_id, - PeerAction::MidToleranceError, - "single_block_lookup_failed", - ); } } } @@ -612,7 +600,7 @@ impl SyncManager { block_roots: VariableList::from(vec![block_hash]), }; - if let Ok(request_id) = self.network.blocks_by_root_request(peer_id, request) { + if let Ok(request_id) = self.network.single_block_lookup_request(peer_id, request) { self.single_block_lookups .insert(request_id, SingleBlockRequest::new(block_hash)); } @@ -621,27 +609,47 @@ impl SyncManager { /// Handles RPC errors related to requests that were emitted from the sync manager. fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId) { trace!(self.log, "Sync manager received a failed RPC"); - // remove any single block lookups - if self.single_block_lookups.remove(&request_id).is_some() { - // this was a single block request lookup, look no further - return; + match request_id { + RequestId::SingleBlock { id } => { + self.single_block_lookups.remove(&id); + } + RequestId::ParentLookup { id } => { + if let Some(pos) = self + .parent_queue + .iter() + .position(|request| request.pending == Some(id)) + { + // increment the failure of a parent lookup if the request matches a parent search + let mut parent_request = self.parent_queue.remove(pos); + parent_request.failed_attempts += 1; + parent_request.last_submitted_peer = peer_id; + self.request_parent(parent_request); + } + } + RequestId::BackFillSync { id } => { + if let Some(batch_id) = self.network.backfill_sync_response(id, true) { + match self + .backfill_sync + .inject_error(&mut self.network, batch_id, &peer_id, id) + { + Ok(_) => {} + Err(_) => self.update_sync_state(), + } + } + } + RequestId::RangeSync { id } => { + if let Some((chain_id, batch_id)) = self.network.range_sync_response(id, true) { + self.range_sync.inject_error( + &mut self.network, + peer_id, + batch_id, + chain_id, + id, + ); + self.update_sync_state() + } + } } - - // increment the failure of a parent lookup if the request matches a parent search - if let Some(pos) = self - .parent_queue - .iter() - .position(|request| request.pending == Some(request_id)) - { - let mut parent_request = self.parent_queue.remove(pos); - parent_request.failed_attempts += 1; - parent_request.last_submitted_peer = peer_id; - self.request_parent(parent_request); - return; - } - - // Otherwise this error matches no known request. - trace!(self.log, "Response/Error for non registered request"; "request_id" => request_id) } fn peer_disconnect(&mut self, peer_id: &PeerId) { @@ -978,7 +986,7 @@ impl SyncManager { // guaranteed to have this chain of blocks. let peer_id = parent_request.last_submitted_peer; - if let Ok(request_id) = self.network.blocks_by_root_request(peer_id, request) { + if let Ok(request_id) = self.network.parent_lookup_request(peer_id, request) { // if the request was successful add the queue back into self parent_request.pending = Some(request_id); self.parent_queue.push(parent_request); @@ -994,59 +1002,15 @@ impl SyncManager { SyncMessage::AddPeer(peer_id, info) => { self.add_peer(peer_id, info); } - SyncMessage::BlocksByRangeResponse { - peer_id, + SyncMessage::RpcBlock { request_id, - beacon_block, - } => { - let beacon_block = beacon_block.map(|b| *b); - // Obtain which sync requested these blocks and divert accordingly. - match self - .network - .blocks_by_range_response(request_id, beacon_block.is_none()) - { - Some(SyncRequestType::RangeSync(batch_id, chain_id)) => { - self.range_sync.blocks_by_range_response( - &mut self.network, - peer_id, - chain_id, - batch_id, - request_id, - beacon_block, - ); - self.update_sync_state(); - } - Some(SyncRequestType::BackFillSync(batch_id)) => { - match self.backfill_sync.on_block_response( - &mut self.network, - batch_id, - &peer_id, - request_id, - beacon_block, - ) { - Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), - Ok(ProcessResult::Successful) => {} - Err(_error) => { - // The backfill sync has failed, errors are reported - // within. - self.update_sync_state(); - } - } - } - None => { - trace!(self.log, "Response/Error for non registered request"; "request_id" => request_id) - } - } - } - SyncMessage::BlocksByRootResponse { peer_id, - request_id, beacon_block, seen_timestamp, } => { - self.blocks_by_root_response( - peer_id, + self.rpc_block_received( request_id, + peer_id, beacon_block.map(|b| *b), seen_timestamp, ) @@ -1061,38 +1025,10 @@ impl SyncManager { SyncMessage::Disconnect(peer_id) => { self.peer_disconnect(&peer_id); } - SyncMessage::RPCError(peer_id, request_id) => { - // Redirect to a sync mechanism if the error is related to one of their - // requests. - match self.network.blocks_by_range_response(request_id, true) { - Some(SyncRequestType::RangeSync(batch_id, chain_id)) => { - self.range_sync.inject_error( - &mut self.network, - peer_id, - batch_id, - chain_id, - request_id, - ); - self.update_sync_state(); - } - Some(SyncRequestType::BackFillSync(batch_id)) => { - match self.backfill_sync.inject_error( - &mut self.network, - batch_id, - &peer_id, - request_id, - ) { - Ok(_) => {} - Err(_) => self.update_sync_state(), - } - } - None => { - // This is a request not belonging to a sync algorithm. - // Process internally. - self.inject_error(peer_id, request_id); - } - } - } + SyncMessage::RpcError { + peer_id, + request_id, + } => self.inject_error(peer_id, request_id), SyncMessage::BatchProcessed { sync_type, result } => match sync_type { SyncRequestType::RangeSync(epoch, chain_id) => { self.range_sync.handle_block_process_result( @@ -1136,4 +1072,60 @@ impl SyncManager { } } } + + async fn rpc_block_received( + &mut self, + request_id: RequestId, + peer_id: PeerId, + beacon_block: Option>, + seen_timestamp: Duration, + ) { + match request_id { + RequestId::SingleBlock { id } => { + self.single_block_lookup_response(id, peer_id, beacon_block, seen_timestamp) + .await; + } + RequestId::ParentLookup { id } => { + self.parent_lookup_response(peer_id, id, beacon_block, seen_timestamp) + .await + } + RequestId::BackFillSync { id } => { + if let Some(batch_id) = self + .network + .backfill_sync_response(id, beacon_block.is_none()) + { + match self.backfill_sync.on_block_response( + &mut self.network, + batch_id, + &peer_id, + id, + beacon_block, + ) { + Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), + Ok(ProcessResult::Successful) => {} + Err(_error) => { + // The backfill sync has failed, errors are reported + // within. + self.update_sync_state(); + } + } + } + } + RequestId::RangeSync { id } => { + if let Some((chain_id, batch_id)) = + self.network.range_sync_response(id, beacon_block.is_none()) + { + self.range_sync.blocks_by_range_response( + &mut self.network, + peer_id, + chain_id, + batch_id, + id, + beacon_block, + ); + self.update_sync_state(); + } + } + } + } } diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 1755b13e3..169a41d71 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -9,6 +9,3 @@ mod range_sync; pub use manager::{BatchProcessResult, SyncMessage}; pub use range_sync::ChainId; - -/// Type of id of rpc requests sent by sync -pub type RequestId = usize; diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 9415f2100..96bdc533f 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1,15 +1,12 @@ //! Provides network functionality for the Syncing thread. This fundamentally wraps a network //! channel and stores a global RPC ID to perform requests. -use super::manager::SyncRequestType; +use super::manager::{Id, RequestId as SyncRequestId}; use super::range_sync::{BatchId, ChainId}; -use super::RequestId as SyncRequestId; -use crate::service::NetworkMessage; +use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; use fnv::FnvHashMap; -use lighthouse_network::rpc::{ - BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RequestId, -}; +use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; use slog::{debug, trace, warn}; use std::sync::Arc; @@ -26,10 +23,12 @@ pub struct SyncNetworkContext { network_globals: Arc>, /// A sequential ID for all RPC requests. - request_id: SyncRequestId, + request_id: Id, - /// BlocksByRange requests made by syncing algorithms. - range_requests: FnvHashMap, + /// BlocksByRange requests made by the range syncing algorithm. + range_requests: FnvHashMap, + + backfill_requests: FnvHashMap, /// Logger for the `SyncNetworkContext`. log: slog::Logger, @@ -46,6 +45,7 @@ impl SyncNetworkContext { network_globals, request_id: 1, range_requests: FnvHashMap::default(), + backfill_requests: FnvHashMap::default(), log, } } @@ -78,7 +78,13 @@ impl SyncNetworkContext { "head_slot" => %status_message.head_slot, ); - let _ = self.send_rpc_request(peer_id, Request::Status(status_message.clone())); + let request = Request::Status(status_message.clone()); + let request_id = RequestId::Router; + let _ = self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request, + request_id, + }); } } } @@ -90,7 +96,7 @@ impl SyncNetworkContext { request: BlocksByRangeRequest, chain_id: ChainId, batch_id: BatchId, - ) -> Result { + ) -> Result { trace!( self.log, "Sending BlocksByRange Request"; @@ -98,10 +104,16 @@ impl SyncNetworkContext { "count" => request.count, "peer" => %peer_id, ); - let req_id = self.send_rpc_request(peer_id, Request::BlocksByRange(request))?; - self.range_requests - .insert(req_id, SyncRequestType::RangeSync(batch_id, chain_id)); - Ok(req_id) + let request = Request::BlocksByRange(request); + let id = self.next_id(); + let request_id = RequestId::Sync(SyncRequestId::RangeSync { id }); + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request, + request_id, + })?; + self.range_requests.insert(id, (chain_id, batch_id)); + Ok(id) } /// A blocks by range request sent by the backfill sync algorithm @@ -110,7 +122,7 @@ impl SyncNetworkContext { peer_id: PeerId, request: BlocksByRangeRequest, batch_id: BatchId, - ) -> Result { + ) -> Result { trace!( self.log, "Sending backfill BlocksByRange Request"; @@ -118,21 +130,24 @@ impl SyncNetworkContext { "count" => request.count, "peer" => %peer_id, ); - let req_id = self.send_rpc_request(peer_id, Request::BlocksByRange(request))?; - self.range_requests - .insert(req_id, SyncRequestType::BackFillSync(batch_id)); - Ok(req_id) + let request = Request::BlocksByRange(request); + let id = self.next_id(); + let request_id = RequestId::Sync(SyncRequestId::BackFillSync { id }); + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request, + request_id, + })?; + self.backfill_requests.insert(id, batch_id); + Ok(id) } /// Received a blocks by range response. - pub fn blocks_by_range_response( + pub fn range_sync_response( &mut self, - request_id: usize, + request_id: Id, remove: bool, - ) -> Option { - // NOTE: we can't guarantee that the request must be registered as it could receive more - // than an error, and be removed after receiving the first one. - // FIXME: https://github.com/sigp/lighthouse/issues/1634 + ) -> Option<(ChainId, BatchId)> { if remove { self.range_requests.remove(&request_id) } else { @@ -140,12 +155,21 @@ impl SyncNetworkContext { } } - /// Sends a blocks by root request. - pub fn blocks_by_root_request( + /// Received a blocks by range response. + pub fn backfill_sync_response(&mut self, request_id: Id, remove: bool) -> Option { + if remove { + self.backfill_requests.remove(&request_id) + } else { + self.backfill_requests.get(&request_id).cloned() + } + } + + /// Sends a blocks by root request for a single block lookup. + pub fn single_block_lookup_request( &mut self, peer_id: PeerId, request: BlocksByRootRequest, - ) -> Result { + ) -> Result { trace!( self.log, "Sending BlocksByRoot Request"; @@ -153,7 +177,39 @@ impl SyncNetworkContext { "count" => request.block_roots.len(), "peer" => %peer_id ); - self.send_rpc_request(peer_id, Request::BlocksByRoot(request)) + let request = Request::BlocksByRoot(request); + let id = self.next_id(); + let request_id = RequestId::Sync(SyncRequestId::SingleBlock { id }); + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request, + request_id, + })?; + Ok(id) + } + + /// Sends a blocks by root request for a parent request. + pub fn parent_lookup_request( + &mut self, + peer_id: PeerId, + request: BlocksByRootRequest, + ) -> Result { + trace!( + self.log, + "Sending BlocksByRoot Request"; + "method" => "BlocksByRoot", + "count" => request.block_roots.len(), + "peer" => %peer_id + ); + let request = Request::BlocksByRoot(request); + let id = self.next_id(); + let request_id = RequestId::Sync(SyncRequestId::ParentLookup { id }); + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request, + request_id, + })?; + Ok(id) } /// Terminates the connection with the peer and bans them. @@ -184,22 +240,6 @@ impl SyncNetworkContext { }); } - /// Sends an RPC request. - fn send_rpc_request( - &mut self, - peer_id: PeerId, - request: Request, - ) -> Result { - let request_id = self.request_id; - self.request_id += 1; - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request_id: RequestId::Sync(request_id), - request, - })?; - Ok(request_id) - } - /// Subscribes to core topics. pub fn subscribe_core_topics(&mut self) { self.network_send @@ -216,4 +256,10 @@ impl SyncNetworkContext { "Network channel send Failed" }) } + + fn next_id(&mut self) -> Id { + let id = self.request_id; + self.request_id += 1; + id + } } diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 7239081ad..614bf57dd 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -1,4 +1,4 @@ -use crate::sync::RequestId; +use crate::sync::manager::Id; use lighthouse_network::rpc::methods::BlocksByRangeRequest; use lighthouse_network::PeerId; use std::collections::HashSet; @@ -93,7 +93,7 @@ pub enum BatchState { /// The batch has failed either downloading or processing, but can be requested again. AwaitingDownload, /// The batch is being downloaded. - Downloading(PeerId, Vec>, RequestId), + Downloading(PeerId, Vec>, Id), /// The batch has been completely downloaded and is ready for processing. AwaitingProcessing(PeerId, Vec>), /// The batch is being processed. @@ -167,7 +167,7 @@ impl BatchInfo { } /// Verifies if an incomming block belongs to this batch. - pub fn is_expecting_block(&self, peer_id: &PeerId, request_id: &RequestId) -> bool { + pub fn is_expecting_block(&self, peer_id: &PeerId, request_id: &Id) -> bool { if let BatchState::Downloading(expected_peer, _, expected_id) = &self.state { return peer_id == expected_peer && expected_id == request_id; } @@ -312,7 +312,7 @@ impl BatchInfo { pub fn start_downloading_from_peer( &mut self, peer: PeerId, - request_id: RequestId, + request_id: Id, ) -> Result<(), WrongState> { match self.state.poison() { BatchState::AwaitingDownload => { diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 4474f1cc3..73f4ecbe0 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,7 +1,7 @@ use super::batch::{BatchInfo, BatchState}; use crate::beacon_processor::ProcessId; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; -use crate::sync::{network_context::SyncNetworkContext, BatchProcessResult, RequestId}; +use crate::sync::{manager::Id, network_context::SyncNetworkContext, BatchProcessResult}; use beacon_chain::BeaconChainTypes; use fnv::FnvHashMap; use lighthouse_network::{PeerAction, PeerId}; @@ -214,7 +214,7 @@ impl SyncingChain { network: &mut SyncNetworkContext, batch_id: BatchId, peer_id: &PeerId, - request_id: RequestId, + request_id: Id, beacon_block: Option>, ) -> ProcessingResult { // check if we have this batch @@ -807,7 +807,7 @@ impl SyncingChain { network: &mut SyncNetworkContext, batch_id: BatchId, peer_id: &PeerId, - request_id: RequestId, + request_id: Id, ) -> ProcessingResult { if let Some(batch) = self.batches.get_mut(&batch_id) { // A batch could be retried without the peer failing the request (disconnecting/ diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index ffe74ea98..185fc204a 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -45,8 +45,9 @@ use super::chain_collection::ChainCollection; use super::sync_type::RangeSyncType; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::status::ToStatusMessage; +use crate::sync::manager::Id; use crate::sync::network_context::SyncNetworkContext; -use crate::sync::{BatchProcessResult, RequestId}; +use crate::sync::BatchProcessResult; use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::PeerId; use lighthouse_network::SyncInfo; @@ -201,7 +202,7 @@ where peer_id: PeerId, chain_id: ChainId, batch_id: BatchId, - request_id: RequestId, + request_id: Id, beacon_block: Option>, ) { // check if this chunk removes the chain @@ -300,7 +301,7 @@ where peer_id: PeerId, batch_id: BatchId, chain_id: ChainId, - request_id: RequestId, + request_id: Id, ) { // check that this request is pending match self.chains.call_by_id(chain_id, |chain| { @@ -364,6 +365,7 @@ where #[cfg(test)] mod tests { + use crate::service::RequestId; use crate::NetworkMessage; use super::*; @@ -494,10 +496,7 @@ mod tests { } /// Reads an BlocksByRange request to a given peer from the network receiver channel. - fn grab_request( - &mut self, - expected_peer: &PeerId, - ) -> (lighthouse_network::rpc::RequestId, BlocksByRangeRequest) { + fn grab_request(&mut self, expected_peer: &PeerId) -> (RequestId, BlocksByRangeRequest) { if let Some(NetworkMessage::SendRequest { peer_id, request: Request::BlocksByRange(request),