From ac0eb39ceda492463dae7e968926789da05a4741 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 15 Mar 2023 01:27:46 +0000 Subject: [PATCH 1/7] Complete match for `has_context_bytes` (#3972) ## Issue Addressed - Add a complete match for `Protocol` here. - The incomplete match was causing us not to append context bytes to the light client protocols - This is the relevant part of the spec and it looks like context bytes are defined https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap Disclaimer: I have no idea if people are using it but it shouldn't have been working so not sure why it wasn't caught Co-authored-by: realbigsean --- .../lighthouse_network/src/rpc/protocol.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 8d7b22029..a8423e47b 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -344,13 +344,16 @@ impl ProtocolId { /// Returns `true` if the given `ProtocolId` should expect `context_bytes` in the /// beginning of the stream, else returns `false`. pub fn has_context_bytes(&self) -> bool { - if self.version == Version::V2 { - match self.message_name { - Protocol::BlocksByRange | Protocol::BlocksByRoot => return true, - _ => return false, - } + match self.message_name { + Protocol::BlocksByRange | Protocol::BlocksByRoot => match self.version { + Version::V2 => true, + Version::V1 => false, + }, + Protocol::LightClientBootstrap => match self.version { + Version::V2 | Version::V1 => true, + }, + Protocol::Goodbye | Protocol::Ping | Protocol::Status | Protocol::MetaData => false, } - false } } From 1ec30416738e10e110e0731e9800e0c69dade5df Mon Sep 17 00:00:00 2001 From: Daniel Ramirez Chiquillo Date: Wed, 15 Mar 2023 01:27:47 +0000 Subject: [PATCH 2/7] Remove Router/Processor Code (#4002) ## Issue Addressed #3938 ## Proposed Changes - `network::Processor` is deleted and all it's logic is moved to `network::Router`. - The `network::Router` module is moved to a single file. - The following functions are deleted: `on_disconnect` `send_status` `on_status_response` `on_blocks_by_root_request` `on_lightclient_bootstrap` `on_blocks_by_range_request` `on_block_gossip` `on_unaggregated_attestation_gossip` `on_aggregated_attestation_gossip` `on_voluntary_exit_gossip` `on_proposer_slashing_gossip` `on_attester_slashing_gossip` `on_sync_committee_signature_gossip` `on_sync_committee_contribution_gossip` `on_light_client_finality_update_gossip` `on_light_client_optimistic_update_gossip`. This deletions are possible because the updated `Router` allows the underlying methods to be called directly. --- beacon_node/network/src/router.rs | 535 ++++++++++++++++++++ beacon_node/network/src/router/mod.rs | 321 ------------ beacon_node/network/src/router/processor.rs | 472 ----------------- beacon_node/network/src/service.rs | 2 +- 4 files changed, 536 insertions(+), 794 deletions(-) create mode 100644 beacon_node/network/src/router.rs delete mode 100644 beacon_node/network/src/router/mod.rs delete mode 100644 beacon_node/network/src/router/processor.rs diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs new file mode 100644 index 000000000..7f75a27fe --- /dev/null +++ b/beacon_node/network/src/router.rs @@ -0,0 +1,535 @@ +//! This module handles incoming network messages. +//! +//! It routes the messages to appropriate services. +//! It handles requests at the application layer in its associated processor and directs +//! syncing-related responses to the Sync manager. +#![allow(clippy::unit_arg)] + +use crate::beacon_processor::{ + BeaconProcessor, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN, +}; +use crate::error; +use crate::service::{NetworkMessage, RequestId}; +use crate::status::status_message; +use crate::sync::manager::RequestId as SyncId; +use crate::sync::SyncMessage; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use futures::prelude::*; +use lighthouse_network::rpc::*; +use lighthouse_network::{ + MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response, +}; +use slog::{debug, o, trace}; +use slog::{error, warn}; +use std::cmp; +use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; +use types::{EthSpec, SignedBeaconBlock}; + +/// Handles messages from the network and routes them to the appropriate service to be handled. +pub struct Router { + /// Access to the peer db and network information. + network_globals: Arc>, + /// A reference to the underlying beacon chain. + chain: Arc>, + /// A channel to the syncing thread. + sync_send: mpsc::UnboundedSender>, + /// A network context to return and handle RPC requests. + network: HandlerNetworkContext, + /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. + beacon_processor_send: mpsc::Sender>, + /// The `Router` logger. + log: slog::Logger, +} + +/// Types of messages the router can receive. +#[derive(Debug)] +pub enum RouterMessage { + /// Peer has disconnected. + PeerDisconnected(PeerId), + /// An RPC request has been received. + RPCRequestReceived { + peer_id: PeerId, + id: PeerRequestId, + request: Request, + }, + /// An RPC response has been received. + RPCResponseReceived { + peer_id: PeerId, + request_id: RequestId, + response: Response, + }, + /// An RPC request failed + RPCFailed { + peer_id: PeerId, + request_id: RequestId, + }, + /// A gossip message has been received. The fields are: message id, the peer that sent us this + /// message, the message itself and a bool which indicates if the message should be processed + /// by the beacon chain after successful verification. + PubsubMessage(MessageId, PeerId, PubsubMessage, bool), + /// The peer manager has requested we re-status a peer. + StatusPeer(PeerId), +} + +impl Router { + /// Initializes and runs the Router. + pub fn spawn( + beacon_chain: Arc>, + network_globals: Arc>, + network_send: mpsc::UnboundedSender>, + executor: task_executor::TaskExecutor, + log: slog::Logger, + ) -> error::Result>> { + let message_handler_log = log.new(o!("service"=> "router")); + trace!(message_handler_log, "Service starting"); + + let (handler_send, handler_recv) = mpsc::unbounded_channel(); + + let (beacon_processor_send, beacon_processor_receive) = + mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN); + + let sync_logger = log.new(o!("service"=> "sync")); + + // spawn the sync thread + let sync_send = crate::sync::manager::spawn( + executor.clone(), + beacon_chain.clone(), + network_globals.clone(), + network_send.clone(), + beacon_processor_send.clone(), + sync_logger, + ); + + BeaconProcessor { + beacon_chain: Arc::downgrade(&beacon_chain), + network_tx: network_send.clone(), + sync_tx: sync_send.clone(), + network_globals: network_globals.clone(), + executor: executor.clone(), + max_workers: cmp::max(1, num_cpus::get()), + current_workers: 0, + importing_blocks: Default::default(), + log: log.clone(), + } + .spawn_manager(beacon_processor_receive, None); + + // generate the Message handler + let mut handler = Router { + network_globals, + chain: beacon_chain, + sync_send, + network: HandlerNetworkContext::new(network_send, log.clone()), + beacon_processor_send, + log: message_handler_log, + }; + + // spawn handler task and move the message handler instance into the spawned thread + executor.spawn( + async move { + debug!(log, "Network message router started"); + UnboundedReceiverStream::new(handler_recv) + .for_each(move |msg| future::ready(handler.handle_message(msg))) + .await; + }, + "router", + ); + + Ok(handler_send) + } + + /// Handle all messages incoming from the network service. + fn handle_message(&mut self, message: RouterMessage) { + match message { + // we have initiated a connection to a peer or the peer manager has requested a + // re-status + RouterMessage::StatusPeer(peer_id) => { + self.send_status(peer_id); + } + // A peer has disconnected + RouterMessage::PeerDisconnected(peer_id) => { + self.send_to_sync(SyncMessage::Disconnect(peer_id)); + } + RouterMessage::RPCRequestReceived { + peer_id, + id, + request, + } => { + self.handle_rpc_request(peer_id, id, request); + } + RouterMessage::RPCResponseReceived { + peer_id, + request_id, + response, + } => { + self.handle_rpc_response(peer_id, request_id, response); + } + RouterMessage::RPCFailed { + peer_id, + request_id, + } => { + self.on_rpc_error(peer_id, request_id); + } + RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => { + self.handle_gossip(id, peer_id, gossip, should_process); + } + } + } + + /* RPC - Related functionality */ + + /// A new RPC request has been received from the network. + fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: PeerRequestId, request: Request) { + if !self.network_globals.peers.read().is_connected(&peer_id) { + debug!(self.log, "Dropping request of disconnected peer"; "peer_id" => %peer_id, "request" => ?request); + return; + } + match request { + Request::Status(status_message) => { + self.on_status_request(peer_id, request_id, status_message) + } + Request::BlocksByRange(request) => self.send_beacon_processor_work( + BeaconWorkEvent::blocks_by_range_request(peer_id, request_id, request), + ), + Request::BlocksByRoot(request) => self.send_beacon_processor_work( + BeaconWorkEvent::blocks_by_roots_request(peer_id, request_id, request), + ), + Request::LightClientBootstrap(request) => self.send_beacon_processor_work( + BeaconWorkEvent::lightclient_bootstrap_request(peer_id, request_id, request), + ), + } + } + + /// An RPC response has been received from the network. + fn handle_rpc_response( + &mut self, + peer_id: PeerId, + request_id: RequestId, + response: Response, + ) { + match response { + Response::Status(status_message) => { + debug!(self.log, "Received Status Response"; "peer_id" => %peer_id, &status_message); + self.send_beacon_processor_work(BeaconWorkEvent::status_message( + peer_id, + status_message, + )) + } + Response::BlocksByRange(beacon_block) => { + self.on_blocks_by_range_response(peer_id, request_id, beacon_block); + } + Response::BlocksByRoot(beacon_block) => { + self.on_blocks_by_root_response(peer_id, request_id, beacon_block); + } + Response::LightClientBootstrap(_) => unreachable!(), + } + } + + /// Handle RPC messages. + /// Note: `should_process` is currently only useful for the `Attestation` variant. + /// if `should_process` is `false`, we only propagate the message on successful verification, + /// else, we propagate **and** import into the beacon chain. + fn handle_gossip( + &mut self, + message_id: MessageId, + peer_id: PeerId, + gossip_message: PubsubMessage, + should_process: bool, + ) { + match gossip_message { + PubsubMessage::AggregateAndProofAttestation(aggregate_and_proof) => self + .send_beacon_processor_work(BeaconWorkEvent::aggregated_attestation( + message_id, + peer_id, + *aggregate_and_proof, + timestamp_now(), + )), + PubsubMessage::Attestation(subnet_attestation) => { + self.send_beacon_processor_work(BeaconWorkEvent::unaggregated_attestation( + message_id, + peer_id, + subnet_attestation.1, + subnet_attestation.0, + should_process, + timestamp_now(), + )) + } + PubsubMessage::BeaconBlock(block) => { + self.send_beacon_processor_work(BeaconWorkEvent::gossip_beacon_block( + message_id, + peer_id, + self.network_globals.client(&peer_id), + block, + timestamp_now(), + )) + } + PubsubMessage::VoluntaryExit(exit) => { + debug!(self.log, "Received a voluntary exit"; "peer_id" => %peer_id); + self.send_beacon_processor_work(BeaconWorkEvent::gossip_voluntary_exit( + message_id, peer_id, exit, + )) + } + PubsubMessage::ProposerSlashing(proposer_slashing) => { + debug!( + self.log, + "Received a proposer slashing"; + "peer_id" => %peer_id + ); + self.send_beacon_processor_work(BeaconWorkEvent::gossip_proposer_slashing( + message_id, + peer_id, + proposer_slashing, + )) + } + PubsubMessage::AttesterSlashing(attester_slashing) => { + debug!( + self.log, + "Received a attester slashing"; + "peer_id" => %peer_id + ); + self.send_beacon_processor_work(BeaconWorkEvent::gossip_attester_slashing( + message_id, + peer_id, + attester_slashing, + )) + } + PubsubMessage::SignedContributionAndProof(contribution_and_proof) => { + trace!( + self.log, + "Received sync committee aggregate"; + "peer_id" => %peer_id + ); + self.send_beacon_processor_work(BeaconWorkEvent::gossip_sync_contribution( + message_id, + peer_id, + *contribution_and_proof, + timestamp_now(), + )) + } + PubsubMessage::SyncCommitteeMessage(sync_committtee_msg) => { + trace!( + self.log, + "Received sync committee signature"; + "peer_id" => %peer_id + ); + self.send_beacon_processor_work(BeaconWorkEvent::gossip_sync_signature( + message_id, + peer_id, + sync_committtee_msg.1, + sync_committtee_msg.0, + timestamp_now(), + )) + } + PubsubMessage::LightClientFinalityUpdate(light_client_finality_update) => { + trace!( + self.log, + "Received light client finality update"; + "peer_id" => %peer_id + ); + self.send_beacon_processor_work( + BeaconWorkEvent::gossip_light_client_finality_update( + message_id, + peer_id, + light_client_finality_update, + timestamp_now(), + ), + ) + } + PubsubMessage::LightClientOptimisticUpdate(light_client_optimistic_update) => { + trace!( + self.log, + "Received light client optimistic update"; + "peer_id" => %peer_id + ); + self.send_beacon_processor_work( + BeaconWorkEvent::gossip_light_client_optimistic_update( + message_id, + peer_id, + light_client_optimistic_update, + timestamp_now(), + ), + ) + } + PubsubMessage::BlsToExecutionChange(bls_to_execution_change) => self + .send_beacon_processor_work(BeaconWorkEvent::gossip_bls_to_execution_change( + message_id, + peer_id, + bls_to_execution_change, + )), + } + } + + fn send_status(&mut self, peer_id: PeerId) { + let status_message = status_message(&self.chain); + debug!(self.log, "Sending Status Request"; "peer" => %peer_id, &status_message); + self.network + .send_processor_request(peer_id, Request::Status(status_message)); + } + + fn send_to_sync(&mut self, message: SyncMessage) { + self.sync_send.send(message).unwrap_or_else(|e| { + warn!( + self.log, + "Could not send message to the sync service"; + "error" => %e, + ) + }); + } + + /// An error occurred during an RPC request. The state is maintained by the sync manager, so + /// this function notifies the sync manager of the error. + pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId) { + // Check if the failed RPC belongs to sync + if let RequestId::Sync(request_id) = request_id { + self.send_to_sync(SyncMessage::RpcError { + peer_id, + request_id, + }); + } + } + + /// Handle a `Status` request. + /// + /// Processes the `Status` from the remote peer and sends back our `Status`. + pub fn on_status_request( + &mut self, + peer_id: PeerId, + request_id: PeerRequestId, + status: StatusMessage, + ) { + debug!(self.log, "Received Status Request"; "peer_id" => %peer_id, &status); + + // Say status back. + self.network.send_response( + peer_id, + Response::Status(status_message(&self.chain)), + request_id, + ); + + self.send_beacon_processor_work(BeaconWorkEvent::status_message(peer_id, status)) + } + + /// Handle a `BlocksByRange` response from the peer. + /// A `beacon_block` behaves as a stream which is terminated on a `None` response. + pub fn on_blocks_by_range_response( + &mut self, + peer_id: PeerId, + request_id: RequestId, + beacon_block: Option>>, + ) { + let request_id = match request_id { + RequestId::Sync(sync_id) => match sync_id { + SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => { + unreachable!("Block lookups do not request BBRange requests") + } + id @ (SyncId::BackFillSync { .. } | SyncId::RangeSync { .. }) => id, + }, + RequestId::Router => unreachable!("All BBRange requests belong to sync"), + }; + + trace!( + self.log, + "Received BlocksByRange Response"; + "peer" => %peer_id, + ); + + self.send_to_sync(SyncMessage::RpcBlock { + peer_id, + request_id, + beacon_block, + seen_timestamp: timestamp_now(), + }); + } + + /// Handle a `BlocksByRoot` response from the peer. + pub fn on_blocks_by_root_response( + &mut self, + peer_id: PeerId, + request_id: RequestId, + beacon_block: Option>>, + ) { + let request_id = match request_id { + RequestId::Sync(sync_id) => match sync_id { + id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id, + SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => { + unreachable!("Batch syncing do not request BBRoot requests") + } + }, + RequestId::Router => unreachable!("All BBRoot requests belong to sync"), + }; + + trace!( + self.log, + "Received BlocksByRoot Response"; + "peer" => %peer_id, + ); + self.send_to_sync(SyncMessage::RpcBlock { + peer_id, + request_id, + beacon_block, + seen_timestamp: timestamp_now(), + }); + } + + fn send_beacon_processor_work(&mut self, work: BeaconWorkEvent) { + self.beacon_processor_send + .try_send(work) + .unwrap_or_else(|e| { + let work_type = match &e { + mpsc::error::TrySendError::Closed(work) + | mpsc::error::TrySendError::Full(work) => work.work_type(), + }; + error!(&self.log, "Unable to send message to the beacon processor"; + "error" => %e, "type" => work_type) + }) + } +} + +/// Wraps a Network Channel to employ various RPC related network functionality for the +/// processor. +#[derive(Clone)] +pub struct HandlerNetworkContext { + /// The network channel to relay messages to the Network service. + network_send: mpsc::UnboundedSender>, + /// Logger for the `NetworkContext`. + log: slog::Logger, +} + +impl HandlerNetworkContext { + pub fn new(network_send: mpsc::UnboundedSender>, log: slog::Logger) -> Self { + Self { network_send, log } + } + + /// Sends a message to the network task. + fn inform_network(&mut self, msg: NetworkMessage) { + self.network_send.send(msg).unwrap_or_else( + |e| warn!(self.log, "Could not send message to the network service"; "error" => %e), + ) + } + + /// Sends a request to the network task. + pub fn send_processor_request(&mut self, peer_id: PeerId, request: Request) { + self.inform_network(NetworkMessage::SendRequest { + peer_id, + request_id: RequestId::Router, + request, + }) + } + + /// Sends a response to the network task. + pub fn send_response(&mut self, peer_id: PeerId, response: Response, id: PeerRequestId) { + self.inform_network(NetworkMessage::SendResponse { + peer_id, + id, + response, + }) + } +} + +fn timestamp_now() -> Duration { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_else(|_| Duration::from_secs(0)) +} diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs deleted file mode 100644 index 231f30f3e..000000000 --- a/beacon_node/network/src/router/mod.rs +++ /dev/null @@ -1,321 +0,0 @@ -//! This module handles incoming network messages. -//! -//! It routes the messages to appropriate services. -//! It handles requests at the application layer in its associated processor and directs -//! syncing-related responses to the Sync manager. -#![allow(clippy::unit_arg)] - -mod processor; - -use crate::error; -use crate::service::{NetworkMessage, RequestId}; -use beacon_chain::{BeaconChain, BeaconChainTypes}; -use futures::prelude::*; -use lighthouse_network::{ - MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response, -}; -use processor::Processor; -use slog::{debug, o, trace}; -use std::sync::Arc; -use tokio::sync::mpsc; -use tokio_stream::wrappers::UnboundedReceiverStream; -use types::EthSpec; - -/// Handles messages received from the network and client and organises syncing. This -/// functionality of this struct is to validate an decode messages from the network before -/// passing them to the internal message processor. The message processor spawns a syncing thread -/// which manages which blocks need to be requested and processed. -pub struct Router { - /// Access to the peer db. - network_globals: Arc>, - /// Processes validated and decoded messages from the network. Has direct access to the - /// sync manager. - processor: Processor, - /// The `Router` logger. - log: slog::Logger, -} - -/// Types of messages the handler can receive. -#[derive(Debug)] -pub enum RouterMessage { - /// We have initiated a connection to a new peer. - PeerDialed(PeerId), - /// Peer has disconnected, - PeerDisconnected(PeerId), - /// An RPC request has been received. - RPCRequestReceived { - peer_id: PeerId, - id: PeerRequestId, - request: Request, - }, - /// An RPC response has been received. - RPCResponseReceived { - peer_id: PeerId, - request_id: RequestId, - response: Response, - }, - /// An RPC request failed - RPCFailed { - peer_id: PeerId, - request_id: RequestId, - }, - /// A gossip message has been received. The fields are: message id, the peer that sent us this - /// message, the message itself and a bool which indicates if the message should be processed - /// by the beacon chain after successful verification. - PubsubMessage(MessageId, PeerId, PubsubMessage, bool), - /// The peer manager has requested we re-status a peer. - StatusPeer(PeerId), -} - -impl Router { - /// Initializes and runs the Router. - pub fn spawn( - beacon_chain: Arc>, - network_globals: Arc>, - network_send: mpsc::UnboundedSender>, - executor: task_executor::TaskExecutor, - log: slog::Logger, - ) -> error::Result>> { - let message_handler_log = log.new(o!("service"=> "router")); - trace!(message_handler_log, "Service starting"); - - let (handler_send, handler_recv) = mpsc::unbounded_channel(); - - // Initialise a message instance, which itself spawns the syncing thread. - let processor = Processor::new( - executor.clone(), - beacon_chain, - network_globals.clone(), - network_send, - &log, - ); - - // generate the Message handler - let mut handler = Router { - network_globals, - processor, - log: message_handler_log, - }; - - // spawn handler task and move the message handler instance into the spawned thread - executor.spawn( - async move { - debug!(log, "Network message router started"); - UnboundedReceiverStream::new(handler_recv) - .for_each(move |msg| future::ready(handler.handle_message(msg))) - .await; - }, - "router", - ); - - Ok(handler_send) - } - - /// Handle all messages incoming from the network service. - fn handle_message(&mut self, message: RouterMessage) { - match message { - // we have initiated a connection to a peer or the peer manager has requested a - // re-status - RouterMessage::PeerDialed(peer_id) | RouterMessage::StatusPeer(peer_id) => { - self.processor.send_status(peer_id); - } - // A peer has disconnected - RouterMessage::PeerDisconnected(peer_id) => { - self.processor.on_disconnect(peer_id); - } - RouterMessage::RPCRequestReceived { - peer_id, - id, - request, - } => { - self.handle_rpc_request(peer_id, id, request); - } - RouterMessage::RPCResponseReceived { - peer_id, - request_id, - response, - } => { - self.handle_rpc_response(peer_id, request_id, response); - } - RouterMessage::RPCFailed { - peer_id, - request_id, - } => { - self.processor.on_rpc_error(peer_id, request_id); - } - RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => { - self.handle_gossip(id, peer_id, gossip, should_process); - } - } - } - - /* RPC - Related functionality */ - - /// A new RPC request has been received from the network. - fn handle_rpc_request(&mut self, peer_id: PeerId, id: PeerRequestId, request: Request) { - if !self.network_globals.peers.read().is_connected(&peer_id) { - debug!(self.log, "Dropping request of disconnected peer"; "peer_id" => %peer_id, "request" => ?request); - return; - } - match request { - Request::Status(status_message) => { - self.processor - .on_status_request(peer_id, id, status_message) - } - Request::BlocksByRange(request) => self - .processor - .on_blocks_by_range_request(peer_id, id, request), - Request::BlocksByRoot(request) => self - .processor - .on_blocks_by_root_request(peer_id, id, request), - Request::LightClientBootstrap(request) => self - .processor - .on_lightclient_bootstrap(peer_id, id, request), - } - } - - /// An RPC response has been received from the network. - // we match on id and ignore responses past the timeout. - fn handle_rpc_response( - &mut self, - peer_id: PeerId, - request_id: RequestId, - response: Response, - ) { - // an error could have occurred. - match response { - Response::Status(status_message) => { - self.processor.on_status_response(peer_id, status_message); - } - Response::BlocksByRange(beacon_block) => { - self.processor - .on_blocks_by_range_response(peer_id, request_id, beacon_block); - } - Response::BlocksByRoot(beacon_block) => { - self.processor - .on_blocks_by_root_response(peer_id, request_id, beacon_block); - } - Response::LightClientBootstrap(_) => unreachable!(), - } - } - - /// Handle RPC messages. - /// Note: `should_process` is currently only useful for the `Attestation` variant. - /// if `should_process` is `false`, we only propagate the message on successful verification, - /// else, we propagate **and** import into the beacon chain. - fn handle_gossip( - &mut self, - id: MessageId, - peer_id: PeerId, - gossip_message: PubsubMessage, - should_process: bool, - ) { - match gossip_message { - // Attestations should never reach the router. - PubsubMessage::AggregateAndProofAttestation(aggregate_and_proof) => { - self.processor - .on_aggregated_attestation_gossip(id, peer_id, *aggregate_and_proof); - } - PubsubMessage::Attestation(subnet_attestation) => { - self.processor.on_unaggregated_attestation_gossip( - id, - peer_id, - subnet_attestation.1.clone(), - subnet_attestation.0, - should_process, - ); - } - PubsubMessage::BeaconBlock(block) => { - self.processor.on_block_gossip( - id, - peer_id, - self.network_globals.client(&peer_id), - block, - ); - } - PubsubMessage::VoluntaryExit(exit) => { - debug!(self.log, "Received a voluntary exit"; "peer_id" => %peer_id); - self.processor.on_voluntary_exit_gossip(id, peer_id, exit); - } - PubsubMessage::ProposerSlashing(proposer_slashing) => { - debug!( - self.log, - "Received a proposer slashing"; - "peer_id" => %peer_id - ); - self.processor - .on_proposer_slashing_gossip(id, peer_id, proposer_slashing); - } - PubsubMessage::AttesterSlashing(attester_slashing) => { - debug!( - self.log, - "Received a attester slashing"; - "peer_id" => %peer_id - ); - self.processor - .on_attester_slashing_gossip(id, peer_id, attester_slashing); - } - PubsubMessage::SignedContributionAndProof(contribution_and_proof) => { - trace!( - self.log, - "Received sync committee aggregate"; - "peer_id" => %peer_id - ); - self.processor.on_sync_committee_contribution_gossip( - id, - peer_id, - *contribution_and_proof, - ); - } - PubsubMessage::SyncCommitteeMessage(sync_committtee_msg) => { - trace!( - self.log, - "Received sync committee signature"; - "peer_id" => %peer_id - ); - self.processor.on_sync_committee_signature_gossip( - id, - peer_id, - sync_committtee_msg.1, - sync_committtee_msg.0, - ); - } - PubsubMessage::BlsToExecutionChange(bls_to_execution_change) => { - trace!( - self.log, - "Received BLS to execution change"; - "peer_id" => %peer_id - ); - self.processor.on_bls_to_execution_change_gossip( - id, - peer_id, - bls_to_execution_change, - ); - } - PubsubMessage::LightClientFinalityUpdate(light_client_finality_update) => { - trace!( - self.log, - "Received light client finality update"; - "peer_id" => %peer_id - ); - self.processor.on_light_client_finality_update_gossip( - id, - peer_id, - light_client_finality_update, - ); - } - PubsubMessage::LightClientOptimisticUpdate(light_client_optimistic_update) => { - trace!( - self.log, - "Received light client optimistic update"; - "peer_id" => %peer_id - ); - self.processor.on_light_client_optimistic_update_gossip( - id, - peer_id, - light_client_optimistic_update, - ); - } - } - } -} diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs deleted file mode 100644 index d3ba024e4..000000000 --- a/beacon_node/network/src/router/processor.rs +++ /dev/null @@ -1,472 +0,0 @@ -use crate::beacon_processor::{ - BeaconProcessor, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN, -}; -use crate::service::{NetworkMessage, RequestId}; -use crate::status::status_message; -use crate::sync::manager::RequestId as SyncId; -use crate::sync::SyncMessage; -use beacon_chain::{BeaconChain, BeaconChainTypes}; -use lighthouse_network::rpc::*; -use lighthouse_network::{ - Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response, -}; -use slog::{debug, error, o, trace, warn}; -use std::cmp; -use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use store::SyncCommitteeMessage; -use tokio::sync::mpsc; -use types::{ - Attestation, AttesterSlashing, EthSpec, LightClientFinalityUpdate, LightClientOptimisticUpdate, - ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBlsToExecutionChange, - SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncSubnetId, -}; - -/// Processes validated messages from the network. It relays necessary data to the syncing thread -/// and processes blocks from the pubsub network. -pub struct Processor { - /// A reference to the underlying beacon chain. - chain: Arc>, - /// A channel to the syncing thread. - sync_send: mpsc::UnboundedSender>, - /// A network context to return and handle RPC requests. - network: HandlerNetworkContext, - /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. - beacon_processor_send: mpsc::Sender>, - /// The `RPCHandler` logger. - log: slog::Logger, -} - -impl Processor { - /// Instantiate a `Processor` instance - pub fn new( - executor: task_executor::TaskExecutor, - beacon_chain: Arc>, - network_globals: Arc>, - network_send: mpsc::UnboundedSender>, - log: &slog::Logger, - ) -> Self { - let sync_logger = log.new(o!("service"=> "sync")); - let (beacon_processor_send, beacon_processor_receive) = - mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN); - - // spawn the sync thread - let sync_send = crate::sync::manager::spawn( - executor.clone(), - beacon_chain.clone(), - network_globals.clone(), - network_send.clone(), - beacon_processor_send.clone(), - sync_logger, - ); - - BeaconProcessor { - beacon_chain: Arc::downgrade(&beacon_chain), - network_tx: network_send.clone(), - sync_tx: sync_send.clone(), - network_globals, - executor, - max_workers: cmp::max(1, num_cpus::get()), - current_workers: 0, - importing_blocks: Default::default(), - log: log.clone(), - } - .spawn_manager(beacon_processor_receive, None); - - Processor { - chain: beacon_chain, - sync_send, - network: HandlerNetworkContext::new(network_send, log.clone()), - beacon_processor_send, - log: log.new(o!("service" => "router")), - } - } - - fn send_to_sync(&mut self, message: SyncMessage) { - self.sync_send.send(message).unwrap_or_else(|e| { - warn!( - self.log, - "Could not send message to the sync service"; - "error" => %e, - ) - }); - } - - /// Handle a peer disconnect. - /// - /// Removes the peer from the manager. - pub fn on_disconnect(&mut self, peer_id: PeerId) { - self.send_to_sync(SyncMessage::Disconnect(peer_id)); - } - - /// An error occurred during an RPC request. The state is maintained by the sync manager, so - /// this function notifies the sync manager of the error. - pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId) { - // Check if the failed RPC belongs to sync - if let RequestId::Sync(request_id) = request_id { - self.send_to_sync(SyncMessage::RpcError { - peer_id, - request_id, - }); - } - } - - /// Sends a `Status` message to the peer. - /// - /// Called when we first connect to a peer, or when the PeerManager determines we need to - /// re-status. - pub fn send_status(&mut self, peer_id: PeerId) { - let status_message = status_message(&self.chain); - debug!(self.log, "Sending Status Request"; "peer" => %peer_id, &status_message); - self.network - .send_processor_request(peer_id, Request::Status(status_message)); - } - - /// Handle a `Status` request. - /// - /// Processes the `Status` from the remote peer and sends back our `Status`. - pub fn on_status_request( - &mut self, - peer_id: PeerId, - request_id: PeerRequestId, - status: StatusMessage, - ) { - debug!(self.log, "Received Status Request"; "peer_id" => %peer_id, &status); - - // Say status back. - self.network.send_response( - peer_id, - Response::Status(status_message(&self.chain)), - request_id, - ); - - self.send_beacon_processor_work(BeaconWorkEvent::status_message(peer_id, status)) - } - - /// Process a `Status` response from a peer. - pub fn on_status_response(&mut self, peer_id: PeerId, status: StatusMessage) { - debug!(self.log, "Received Status Response"; "peer_id" => %peer_id, &status); - self.send_beacon_processor_work(BeaconWorkEvent::status_message(peer_id, status)) - } - - /// Handle a `BlocksByRoot` request from the peer. - pub fn on_blocks_by_root_request( - &mut self, - peer_id: PeerId, - request_id: PeerRequestId, - request: BlocksByRootRequest, - ) { - self.send_beacon_processor_work(BeaconWorkEvent::blocks_by_roots_request( - peer_id, request_id, request, - )) - } - - /// Handle a `LightClientBootstrap` request from the peer. - pub fn on_lightclient_bootstrap( - &mut self, - peer_id: PeerId, - request_id: PeerRequestId, - request: LightClientBootstrapRequest, - ) { - self.send_beacon_processor_work(BeaconWorkEvent::lightclient_bootstrap_request( - peer_id, request_id, request, - )) - } - - /// Handle a `BlocksByRange` request from the peer. - pub fn on_blocks_by_range_request( - &mut self, - peer_id: PeerId, - request_id: PeerRequestId, - req: BlocksByRangeRequest, - ) { - self.send_beacon_processor_work(BeaconWorkEvent::blocks_by_range_request( - peer_id, request_id, req, - )) - } - - /// Handle a `BlocksByRange` response from the peer. - /// A `beacon_block` behaves as a stream which is terminated on a `None` response. - pub fn on_blocks_by_range_response( - &mut self, - peer_id: PeerId, - request_id: RequestId, - beacon_block: Option>>, - ) { - let request_id = match request_id { - RequestId::Sync(sync_id) => match sync_id { - SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => { - unreachable!("Block lookups do not request BBRange requests") - } - id @ (SyncId::BackFillSync { .. } | SyncId::RangeSync { .. }) => id, - }, - RequestId::Router => unreachable!("All BBRange requests belong to sync"), - }; - - trace!( - self.log, - "Received BlocksByRange Response"; - "peer" => %peer_id, - ); - - self.send_to_sync(SyncMessage::RpcBlock { - peer_id, - request_id, - beacon_block, - seen_timestamp: timestamp_now(), - }); - } - - /// Handle a `BlocksByRoot` response from the peer. - pub fn on_blocks_by_root_response( - &mut self, - peer_id: PeerId, - request_id: RequestId, - beacon_block: Option>>, - ) { - let request_id = match request_id { - RequestId::Sync(sync_id) => match sync_id { - id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id, - SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => { - unreachable!("Batch syncing do not request BBRoot requests") - } - }, - RequestId::Router => unreachable!("All BBRoot requests belong to sync"), - }; - - trace!( - self.log, - "Received BlocksByRoot Response"; - "peer" => %peer_id, - ); - self.send_to_sync(SyncMessage::RpcBlock { - peer_id, - request_id, - beacon_block, - seen_timestamp: timestamp_now(), - }); - } - - /// Process a gossip message declaring a new block. - /// - /// Attempts to apply to block to the beacon chain. May queue the block for later processing. - /// - /// Returns a `bool` which, if `true`, indicates we should forward the block to our peers. - pub fn on_block_gossip( - &mut self, - message_id: MessageId, - peer_id: PeerId, - peer_client: Client, - block: Arc>, - ) { - self.send_beacon_processor_work(BeaconWorkEvent::gossip_beacon_block( - message_id, - peer_id, - peer_client, - block, - timestamp_now(), - )) - } - - pub fn on_unaggregated_attestation_gossip( - &mut self, - message_id: MessageId, - peer_id: PeerId, - unaggregated_attestation: Attestation, - subnet_id: SubnetId, - should_process: bool, - ) { - self.send_beacon_processor_work(BeaconWorkEvent::unaggregated_attestation( - message_id, - peer_id, - unaggregated_attestation, - subnet_id, - should_process, - timestamp_now(), - )) - } - - pub fn on_aggregated_attestation_gossip( - &mut self, - message_id: MessageId, - peer_id: PeerId, - aggregate: SignedAggregateAndProof, - ) { - self.send_beacon_processor_work(BeaconWorkEvent::aggregated_attestation( - message_id, - peer_id, - aggregate, - timestamp_now(), - )) - } - - pub fn on_voluntary_exit_gossip( - &mut self, - message_id: MessageId, - peer_id: PeerId, - voluntary_exit: Box, - ) { - self.send_beacon_processor_work(BeaconWorkEvent::gossip_voluntary_exit( - message_id, - peer_id, - voluntary_exit, - )) - } - - pub fn on_proposer_slashing_gossip( - &mut self, - message_id: MessageId, - peer_id: PeerId, - proposer_slashing: Box, - ) { - self.send_beacon_processor_work(BeaconWorkEvent::gossip_proposer_slashing( - message_id, - peer_id, - proposer_slashing, - )) - } - - pub fn on_attester_slashing_gossip( - &mut self, - message_id: MessageId, - peer_id: PeerId, - attester_slashing: Box>, - ) { - self.send_beacon_processor_work(BeaconWorkEvent::gossip_attester_slashing( - message_id, - peer_id, - attester_slashing, - )) - } - - pub fn on_sync_committee_signature_gossip( - &mut self, - message_id: MessageId, - peer_id: PeerId, - sync_signature: SyncCommitteeMessage, - subnet_id: SyncSubnetId, - ) { - self.send_beacon_processor_work(BeaconWorkEvent::gossip_sync_signature( - message_id, - peer_id, - sync_signature, - subnet_id, - timestamp_now(), - )) - } - - pub fn on_sync_committee_contribution_gossip( - &mut self, - message_id: MessageId, - peer_id: PeerId, - sync_contribution: SignedContributionAndProof, - ) { - self.send_beacon_processor_work(BeaconWorkEvent::gossip_sync_contribution( - message_id, - peer_id, - sync_contribution, - timestamp_now(), - )) - } - - pub fn on_bls_to_execution_change_gossip( - &mut self, - message_id: MessageId, - peer_id: PeerId, - bls_to_execution_change: Box, - ) { - self.send_beacon_processor_work(BeaconWorkEvent::gossip_bls_to_execution_change( - message_id, - peer_id, - bls_to_execution_change, - )) - } - - pub fn on_light_client_finality_update_gossip( - &mut self, - message_id: MessageId, - peer_id: PeerId, - light_client_finality_update: Box>, - ) { - self.send_beacon_processor_work(BeaconWorkEvent::gossip_light_client_finality_update( - message_id, - peer_id, - light_client_finality_update, - timestamp_now(), - )) - } - - pub fn on_light_client_optimistic_update_gossip( - &mut self, - message_id: MessageId, - peer_id: PeerId, - light_client_optimistic_update: Box>, - ) { - self.send_beacon_processor_work(BeaconWorkEvent::gossip_light_client_optimistic_update( - message_id, - peer_id, - light_client_optimistic_update, - timestamp_now(), - )) - } - - fn send_beacon_processor_work(&mut self, work: BeaconWorkEvent) { - self.beacon_processor_send - .try_send(work) - .unwrap_or_else(|e| { - let work_type = match &e { - mpsc::error::TrySendError::Closed(work) - | mpsc::error::TrySendError::Full(work) => work.work_type(), - }; - error!(&self.log, "Unable to send message to the beacon processor"; - "error" => %e, "type" => work_type) - }) - } -} - -/// Wraps a Network Channel to employ various RPC related network functionality for the -/// processor. -#[derive(Clone)] -pub struct HandlerNetworkContext { - /// The network channel to relay messages to the Network service. - network_send: mpsc::UnboundedSender>, - /// Logger for the `NetworkContext`. - log: slog::Logger, -} - -impl HandlerNetworkContext { - pub fn new(network_send: mpsc::UnboundedSender>, log: slog::Logger) -> Self { - Self { network_send, log } - } - - /// Sends a message to the network task. - fn inform_network(&mut self, msg: NetworkMessage) { - self.network_send.send(msg).unwrap_or_else( - |e| warn!(self.log, "Could not send message to the network service"; "error" => %e), - ) - } - - /// Sends a request to the network task. - pub fn send_processor_request(&mut self, peer_id: PeerId, request: Request) { - self.inform_network(NetworkMessage::SendRequest { - peer_id, - request_id: RequestId::Router, - request, - }) - } - - /// Sends a response to the network task. - pub fn send_response(&mut self, peer_id: PeerId, response: Response, id: PeerRequestId) { - self.inform_network(NetworkMessage::SendResponse { - peer_id, - id, - response, - }) - } -} - -fn timestamp_now() -> Duration { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_else(|_| Duration::from_secs(0)) -} diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index ab3d15aee..3e86d2099 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -472,7 +472,7 @@ impl NetworkService { ) { match ev { NetworkEvent::PeerConnectedOutgoing(peer_id) => { - self.send_to_router(RouterMessage::PeerDialed(peer_id)); + self.send_to_router(RouterMessage::StatusPeer(peer_id)); } NetworkEvent::PeerConnectedIncoming(_) | NetworkEvent::PeerBanned(_) From 3d99ce25f83224da2cea20936fc03ff21635460a Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 16 Mar 2023 05:44:54 +0000 Subject: [PATCH 3/7] Correct a race condition when dialing peers (#4056) There is a race condition which occurs when multiple discovery queries return at almost the exact same time and they independently contain a useful peer we would like to connect to. The condition can occur that we can add the same peer to the dial queue, before we get a chance to process the queue. This ends up displaying an error to the user: ``` ERRO Dialing an already dialing peer ``` Although this error is harmless it's not ideal. There are two solutions to resolving this: 1. As we decide to dial the peer, we change the state in the peer-db to dialing (before we add it to the queue) which would prevent other requests from adding to the queue. 2. We prevent duplicates in the dial queue This PR has opted for 2. because 1. will complicate the code in that we are changing states in non-intuitive places. Although this technically adds a very slight performance cost, its probably a cleaner solution as we can keep the state-changing logic in one place. --- Cargo.lock | 17 +++++++++++++--- Dockerfile | 2 +- beacon_node/lighthouse_network/Cargo.toml | 2 +- .../src/peer_manager/mod.rs | 20 +++++++++++++++---- .../src/peer_manager/network_behaviour.rs | 2 +- beacon_node/network/Cargo.toml | 2 +- common/lru_cache/src/time.rs | 6 ++++++ lighthouse/Cargo.toml | 2 +- 8 files changed, 41 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5951b49c7..04c2997c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1614,6 +1614,16 @@ dependencies = [ "tokio-util 0.6.10", ] +[[package]] +name = "delay_map" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4355c25cbf99edcb6b4a0e906f6bdc6956eda149e84455bea49696429b2f8e8" +dependencies = [ + "futures", + "tokio-util 0.7.7", +] + [[package]] name = "deposit_contract" version = "0.2.0" @@ -1812,7 +1822,7 @@ dependencies = [ "aes 0.7.5", "aes-gcm 0.9.4", "arrayvec", - "delay_map", + "delay_map 0.1.2", "enr", "fnv", "futures", @@ -4404,7 +4414,7 @@ dependencies = [ name = "lighthouse_network" version = "0.2.0" dependencies = [ - "delay_map", + "delay_map 0.3.0", "directory", "dirs", "discv5", @@ -5021,7 +5031,7 @@ name = "network" version = "0.2.0" dependencies = [ "beacon_chain", - "delay_map", + "delay_map 0.3.0", "derivative", "environment", "error-chain", @@ -7985,6 +7995,7 @@ dependencies = [ "futures-io", "futures-sink", "pin-project-lite 0.2.9", + "slab", "tokio", "tracing", ] diff --git a/Dockerfile b/Dockerfile index 7a0602a22..25ca07538 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM rust:1.65.0-bullseye AS builder +FROM rust:1.66.0-bullseye AS builder RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake libclang-dev protobuf-compiler COPY . lighthouse ARG FEATURES diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index f956d35d0..2ec8baaf5 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -42,7 +42,7 @@ strum = { version = "0.24.0", features = ["derive"] } superstruct = "0.5.0" prometheus-client = "0.18.0" unused_port = { path = "../../common/unused_port" } -delay_map = "0.1.1" +delay_map = "0.3.0" void = "1" [dependencies.libp2p] diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 03f6a746a..3d5c862e8 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -13,7 +13,7 @@ use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult}; use rand::seq::SliceRandom; use slog::{debug, error, trace, warn}; use smallvec::SmallVec; -use std::collections::VecDeque; +use std::collections::BTreeMap; use std::{ sync::Arc, time::{Duration, Instant}, @@ -77,7 +77,7 @@ pub struct PeerManager { /// The target number of peers we would like to connect to. target_peers: usize, /// Peers queued to be dialed. - peers_to_dial: VecDeque<(PeerId, Option)>, + peers_to_dial: BTreeMap>, /// The number of temporarily banned peers. This is used to prevent instantaneous /// reconnection. // NOTE: This just prevents re-connections. The state of the peer is otherwise unaffected. A @@ -308,7 +308,7 @@ impl PeerManager { /// proves resource constraining, we should switch to multiaddr dialling here. #[allow(clippy::mutable_key_type)] pub fn peers_discovered(&mut self, results: HashMap>) -> Vec { - let mut to_dial_peers = Vec::new(); + let mut to_dial_peers = Vec::with_capacity(4); let connected_or_dialing = self.network_globals.connected_or_dialing_peers(); for (peer_id, min_ttl) in results { @@ -398,7 +398,7 @@ impl PeerManager { // A peer is being dialed. pub fn dial_peer(&mut self, peer_id: &PeerId, enr: Option) { - self.peers_to_dial.push_back((*peer_id, enr)); + self.peers_to_dial.insert(*peer_id, enr); } /// Reports if a peer is banned or not. @@ -1185,6 +1185,18 @@ impl PeerManager { // Unban any peers that have served their temporary ban timeout self.unban_temporary_banned_peers(); + + // Maintains memory by shrinking mappings + self.shrink_mappings(); + } + + // Reduce memory footprint by routinely shrinking associating mappings. + fn shrink_mappings(&mut self) { + self.inbound_ping_peers.shrink_to(5); + self.outbound_ping_peers.shrink_to(5); + self.status_peers.shrink_to(5); + self.temporary_banned_peers.shrink_to_fit(); + self.sync_committee_subnets.shrink_to_fit(); } // Update metrics related to peer scoring. diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index 21288473e..a29f243c9 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -89,7 +89,7 @@ impl NetworkBehaviour for PeerManager { self.events.shrink_to_fit(); } - if let Some((peer_id, maybe_enr)) = self.peers_to_dial.pop_front() { + if let Some((peer_id, maybe_enr)) = self.peers_to_dial.pop_first() { self.inject_peer_connection(&peer_id, ConnectingType::Dialing, maybe_enr); let handler = self.new_handler(); return Poll::Ready(NetworkBehaviourAction::Dial { diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 95d8a294c..d068a2007 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -43,7 +43,7 @@ if-addrs = "0.6.4" strum = "0.24.0" tokio-util = { version = "0.6.3", features = ["time"] } derivative = "2.2.0" -delay_map = "0.1.1" +delay_map = "0.3.0" ethereum-types = { version = "0.14.1", optional = true } operation_pool = { path = "../operation_pool" } execution_layer = { path = "../execution_layer" } diff --git a/common/lru_cache/src/time.rs b/common/lru_cache/src/time.rs index 1253ef1ec..7b8e9ba9a 100644 --- a/common/lru_cache/src/time.rs +++ b/common/lru_cache/src/time.rs @@ -160,6 +160,12 @@ where self.map.contains(key) } + /// Shrink the mappings to fit the current size. + pub fn shrink_to_fit(&mut self) { + self.map.shrink_to_fit(); + self.list.shrink_to_fit(); + } + #[cfg(test)] #[track_caller] fn check_invariant(&self) { diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index 9360c9600..331e9fe59 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -4,7 +4,7 @@ version = "3.5.1" authors = ["Sigma Prime "] edition = "2021" autotests = false -rust-version = "1.65" +rust-version = "1.66" [features] default = ["slasher-mdbx"] From 974b7e9f5820273136e6d2c593848096eacd166a Mon Sep 17 00:00:00 2001 From: Ricki Moore Date: Thu, 16 Mar 2023 08:03:43 +0000 Subject: [PATCH 4/7] Siren Ui Lighthouse Version Requirments (#4093) ## Issue Addressed Added note in lighthouse book to instruct users to use a min lighthouse requirement to run Siren Ui. Which issue # does this PR address? ## Proposed Changes Please list or describe the changes introduced by this PR. ## Additional Info Please provide any additional information. For example, future considerations or information useful for reviewers. --- book/src/ui-faqs.md | 11 +++++++---- book/src/ui-installation.md | 4 +++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/book/src/ui-faqs.md b/book/src/ui-faqs.md index 240195421..51aa9385a 100644 --- a/book/src/ui-faqs.md +++ b/book/src/ui-faqs.md @@ -1,13 +1,16 @@ # Frequently Asked Questions -## 1. Where can I find my API token? +## 1. Are there any requirements to run Siren? +Yes, Siren requires Lighthouse v3.5.1 or higher to function properly. These releases can be found on the [releases](https://github.com/sigp/lighthouse/releases) page of the Lighthouse repository. + +## 2. Where can I find my API token? The required Api token may be found in the default data directory of the validator client. For more information please refer to the lighthouse ui configuration [`api token section`](./ui-configuration.md#api-token). -## 2. How do I fix the Node Network Errors? +## 3. How do I fix the Node Network Errors? If you recieve a red notification with a BEACON or VALIDATOR NODE NETWORK ERROR you can refer to the lighthouse ui configuration and [`connecting to clients section`](./ui-configuration.md#connecting-to-the-clients). -## 3. How do I change my Beacon or Validator address after logging in? +## 4. How do I change my Beacon or Validator address after logging in? Once you have successfully arrived to the main dashboard, use the sidebar to access the settings view. In the top right hand corner there is a `Configurtion` action button that will redirect you back to the configuration screen where you can make appropriate changes. -## 4. Why doesn't my validator balance graph show any data? +## 5. Why doesn't my validator balance graph show any data? If your graph is not showing data, it usually means your validator node is still caching data. The application must wait at least 3 epochs before it can render any graphical visualizations. This could take up to 20min. diff --git a/book/src/ui-installation.md b/book/src/ui-installation.md index 0b96b1923..b8ae788c6 100644 --- a/book/src/ui-installation.md +++ b/book/src/ui-installation.md @@ -2,6 +2,8 @@ Siren runs on Linux, MacOS and Windows. +## Version Requirement +The Siren app requires Lighthouse v3.5.1 or higher to function properly. These versions can be found on the [releases](https://github.com/sigp/lighthouse/releases) page of the Lighthouse repository. ## Pre-Built Electron Packages @@ -16,7 +18,7 @@ Simply download the package specific to your operating system and run it. ### Requirements -Building from source requires `Node v18` and `yarn`. +Building from source requires `Node v18` and `yarn`. ### Building From Source From 4c2d4af6cd24a97f7515a2c85bc177d36fc30c5b Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 17 Mar 2023 00:44:02 +0000 Subject: [PATCH 5/7] Make more noise when the EL is broken (#3986) ## Issue Addressed Closes #3814, replaces #3818. ## Proposed Changes * Add a WARN log for the case where we are attempting to sync chain segments but can't process them because they're building on an invalid parent. The most common case where we see this is when the execution node database is corrupt, causing sync to stall mysteriously (because we're currently logging the failure only at debug level). * Additionally I've bumped up the logging for invalid execution payloads to `WARN`. This may result in some duplicate logs as we log errors from the `beacon_chain` and then again from the beacon processor. Invalid payloads and corrupt DBs _should_ be rare enough that this doesn't produce overwhelming log volume. --- beacon_node/beacon_chain/src/beacon_chain.rs | 14 ++------------ .../beacon_chain/src/block_verification.rs | 8 ++++---- beacon_node/beacon_chain/src/execution_payload.rs | 4 ++-- .../src/beacon_processor/worker/gossip_methods.rs | 1 - .../src/beacon_processor/worker/sync_methods.rs | 15 +++++++++++++++ 5 files changed, 23 insertions(+), 19 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 4c7f314d8..97ce142dd 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -5097,7 +5097,7 @@ impl BeaconChain { latest_valid_hash, ref validation_error, } => { - debug!( + warn!( self.log, "Invalid execution payload"; "validation_error" => ?validation_error, @@ -5106,11 +5106,6 @@ impl BeaconChain { "head_block_root" => ?head_block_root, "method" => "fcU", ); - warn!( - self.log, - "Fork choice update invalidated payload"; - "status" => ?status - ); match latest_valid_hash { // The `latest_valid_hash` is set to `None` when the EE @@ -5156,7 +5151,7 @@ impl BeaconChain { PayloadStatus::InvalidBlockHash { ref validation_error, } => { - debug!( + warn!( self.log, "Invalid execution payload block hash"; "validation_error" => ?validation_error, @@ -5164,11 +5159,6 @@ impl BeaconChain { "head_block_root" => ?head_block_root, "method" => "fcU", ); - warn!( - self.log, - "Fork choice update invalidated payload"; - "status" => ?status - ); // The execution engine has stated that the head block is invalid, however it // hasn't returned a latest valid ancestor. // diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 7d5d35010..8c169cfe5 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -280,10 +280,10 @@ pub enum BlockError { /// /// ## Peer scoring /// - /// TODO(merge): reconsider how we score peers for this. - /// - /// The peer sent us an invalid block, but I'm not really sure how to score this in an - /// "optimistic" sync world. + /// The peer sent us an invalid block, we must penalise harshly. + /// If it's actually our fault (e.g. our execution node database is corrupt) we have bigger + /// problems to worry about than losing peers, and we're doing the network a favour by + /// disconnecting. ParentExecutionPayloadInvalid { parent_root: Hash256 }, } diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index 5cc8ee2d2..1ac7229cc 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -159,7 +159,7 @@ async fn notify_new_payload<'a, T: BeaconChainTypes>( latest_valid_hash, ref validation_error, } => { - debug!( + warn!( chain.log, "Invalid execution payload"; "validation_error" => ?validation_error, @@ -206,7 +206,7 @@ async fn notify_new_payload<'a, T: BeaconChainTypes>( PayloadStatus::InvalidBlockHash { ref validation_error, } => { - debug!( + warn!( chain.log, "Invalid execution payload block hash"; "validation_error" => ?validation_error, diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index f2b1b3a26..1ec03ae95 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -834,7 +834,6 @@ impl Worker { | Err(e @ BlockError::WeakSubjectivityConflict) | Err(e @ BlockError::InconsistentFork(_)) | Err(e @ BlockError::ExecutionPayloadError(_)) - // TODO(merge): reconsider peer scoring for this event. | Err(e @ BlockError::ParentExecutionPayloadInvalid { .. }) | Err(e @ BlockError::GenesisBlock) => { warn!(self.log, "Could not verify block for gossip. Rejecting the block"; diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 6e6e68155..e8182a1d5 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -513,6 +513,21 @@ impl Worker { }) } } + ref err @ BlockError::ParentExecutionPayloadInvalid { ref parent_root } => { + warn!( + self.log, + "Failed to sync chain built on invalid parent"; + "parent_root" => ?parent_root, + "advice" => "check execution node for corruption then restart it and Lighthouse", + ); + Err(ChainSegmentFailed { + message: format!("Peer sent invalid block. Reason: {err:?}"), + // We need to penalise harshly in case this represents an actual attack. In case + // of a faulty EL it will usually require manual intervention to fix anyway, so + // it's not too bad if we drop most of our peers. + peer_action: Some(PeerAction::LowToleranceError), + }) + } other => { debug!( self.log, "Invalid block received"; From c3e5053612782d77618fd79d3a1d3d4430b7754c Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Fri, 17 Mar 2023 00:44:03 +0000 Subject: [PATCH 6/7] Reduce false positive logging for late builder blocks (#4073) ## Issue Addressed NA ## Proposed Changes When producing a block from a builder, there are two points where we could consider the block "broadcast": 1. When the blinded block is published to the builder. 2. When the un-blinded block is published to the P2P network (this is always *after* the previous step). Our logging for late block broadcasts was using (2) for builder-blocks, which was creating a lot of false-positive logs. This is because the builder publishes the block on the P2P network themselves before returning it to us and we perform (2). For clarity, the logs were false-positives because we claim that the block was published late by us when it was actually published earlier by the builder. This PR changes our logging behavior so we do our logging at (1) instead. It also updates our metrics for block broadcast to distinguish between local and builder blocks. I believe the metrics change will be natively compatible with existing Grafana dashboards. ## Additional Info One could argue that the builder *should* return the block to us faster, however that's not the case. I think it's more important that we don't desensitize users with false-positives. --- .../beacon_chain/src/validator_monitor.rs | 5 +- beacon_node/execution_layer/src/lib.rs | 2 +- beacon_node/http_api/src/lib.rs | 13 +- beacon_node/http_api/src/metrics.rs | 5 +- beacon_node/http_api/src/publish_blocks.rs | 161 ++++++++++++------ consensus/types/src/payload.rs | 2 +- 6 files changed, 130 insertions(+), 58 deletions(-) diff --git a/beacon_node/beacon_chain/src/validator_monitor.rs b/beacon_node/beacon_chain/src/validator_monitor.rs index de2681012..d79a56df6 100644 --- a/beacon_node/beacon_chain/src/validator_monitor.rs +++ b/beacon_node/beacon_chain/src/validator_monitor.rs @@ -15,6 +15,7 @@ use std::io; use std::marker::PhantomData; use std::str::Utf8Error; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use store::AbstractExecPayload; use types::{ AttesterSlashing, BeaconBlockRef, BeaconState, ChainSpec, Epoch, EthSpec, Hash256, IndexedAttestation, ProposerSlashing, PublicKeyBytes, SignedAggregateAndProof, @@ -1736,9 +1737,9 @@ fn u64_to_i64(n: impl Into) -> i64 { } /// Returns the delay between the start of `block.slot` and `seen_timestamp`. -pub fn get_block_delay_ms( +pub fn get_block_delay_ms>( seen_timestamp: Duration, - block: BeaconBlockRef<'_, T>, + block: BeaconBlockRef<'_, T, P>, slot_clock: &S, ) -> Duration { get_slot_delay_ms::(seen_timestamp, block.slot(), slot_clock) diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 60bc6278a..fa661fcf6 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -77,7 +77,7 @@ const DEFAULT_SUGGESTED_FEE_RECIPIENT: [u8; 20] = const CONFIG_POLL_INTERVAL: Duration = Duration::from_secs(60); /// A payload alongside some information about where it came from. -enum ProvenancedPayload

{ +pub enum ProvenancedPayload

{ /// A good ol' fashioned farm-to-table payload from your local EE. Local(P), /// A payload from a builder (e.g. mev-boost). diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 05fe2fe10..067119d9f 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -37,6 +37,7 @@ use lighthouse_version::version_with_platform; use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage}; use operation_pool::ReceivedPreCapella; use parking_lot::RwLock; +use publish_blocks::ProvenancedBlock; use serde::{Deserialize, Serialize}; use slog::{crit, debug, error, info, warn, Logger}; use slot_clock::SlotClock; @@ -1123,9 +1124,15 @@ pub fn serve( chain: Arc>, network_tx: UnboundedSender>, log: Logger| async move { - publish_blocks::publish_block(None, block, chain, &network_tx, log) - .await - .map(|()| warp::reply().into_response()) + publish_blocks::publish_block( + None, + ProvenancedBlock::Local(block), + chain, + &network_tx, + log, + ) + .await + .map(|()| warp::reply().into_response()) }, ); diff --git a/beacon_node/http_api/src/metrics.rs b/beacon_node/http_api/src/metrics.rs index 1c3ab1f68..26ee183c8 100644 --- a/beacon_node/http_api/src/metrics.rs +++ b/beacon_node/http_api/src/metrics.rs @@ -29,9 +29,10 @@ lazy_static::lazy_static! { "http_api_beacon_proposer_cache_misses_total", "Count of times the proposer cache has been missed", ); - pub static ref HTTP_API_BLOCK_BROADCAST_DELAY_TIMES: Result = try_create_histogram( + pub static ref HTTP_API_BLOCK_BROADCAST_DELAY_TIMES: Result = try_create_histogram_vec( "http_api_block_broadcast_delay_times", - "Time between start of the slot and when the block was broadcast" + "Time between start of the slot and when the block was broadcast", + &["provenance"] ); pub static ref HTTP_API_BLOCK_PUBLISHED_LATE_TOTAL: Result = try_create_int_counter( "http_api_block_published_late_total", diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 673ead1f2..1a5d5175b 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -3,28 +3,43 @@ use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now}; use beacon_chain::{ BeaconChain, BeaconChainTypes, BlockError, CountUnrealized, NotifyExecutionLayer, }; +use execution_layer::ProvenancedPayload; use lighthouse_network::PubsubMessage; use network::NetworkMessage; use slog::{debug, error, info, warn, Logger}; use slot_clock::SlotClock; use std::sync::Arc; +use std::time::Duration; use tokio::sync::mpsc::UnboundedSender; use tree_hash::TreeHash; use types::{ - AbstractExecPayload, BlindedPayload, EthSpec, ExecPayload, ExecutionBlockHash, FullPayload, - Hash256, SignedBeaconBlock, + AbstractExecPayload, BeaconBlockRef, BlindedPayload, EthSpec, ExecPayload, ExecutionBlockHash, + FullPayload, Hash256, SignedBeaconBlock, }; use warp::Rejection; +pub enum ProvenancedBlock { + /// The payload was built using a local EE. + Local(Arc>>), + /// The payload was build using a remote builder (e.g., via a mev-boost + /// compatible relay). + Builder(Arc>>), +} + /// Handles a request from the HTTP API for full blocks. pub async fn publish_block( block_root: Option, - block: Arc>, + provenanced_block: ProvenancedBlock, chain: Arc>, network_tx: &UnboundedSender>, log: Logger, ) -> Result<(), Rejection> { let seen_timestamp = timestamp_now(); + let (block, is_locally_built_block) = match provenanced_block { + ProvenancedBlock::Local(block) => (block, true), + ProvenancedBlock::Builder(block) => (block, false), + }; + let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock); debug!( log, @@ -38,10 +53,6 @@ pub async fn publish_block( let message = PubsubMessage::BeaconBlock(block.clone()); crate::publish_pubsub_message(network_tx, message)?; - // Determine the delay after the start of the slot, register it with metrics. - let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock); - metrics::observe_duration(&metrics::HTTP_API_BLOCK_BROADCAST_DELAY_TIMES, delay); - let block_root = block_root.unwrap_or_else(|| block.canonical_root()); match chain @@ -75,31 +86,11 @@ pub async fn publish_block( // head. chain.recompute_head_at_current_slot().await; - // Perform some logging to inform users if their blocks are being produced - // late. - // - // Check to see the thresholds are non-zero to avoid logging errors with small - // slot times (e.g., during testing) - let too_late_threshold = chain.slot_clock.unagg_attestation_production_delay(); - let delayed_threshold = too_late_threshold / 2; - if delay >= too_late_threshold { - error!( - log, - "Block was broadcast too late"; - "msg" => "system may be overloaded, block likely to be orphaned", - "delay_ms" => delay.as_millis(), - "slot" => block.slot(), - "root" => ?root, - ) - } else if delay >= delayed_threshold { - error!( - log, - "Block broadcast was delayed"; - "msg" => "system may be overloaded, block may be orphaned", - "delay_ms" => delay.as_millis(), - "slot" => block.slot(), - "root" => ?root, - ) + // Only perform late-block logging here if the block is local. For + // blocks built with builders we consider the broadcast time to be + // when the blinded block is published to the builder. + if is_locally_built_block { + late_block_logging(&chain, seen_timestamp, block.message(), root, "local", &log) } Ok(()) @@ -147,14 +138,7 @@ pub async fn publish_blinded_block( ) -> Result<(), Rejection> { let block_root = block.canonical_root(); let full_block = reconstruct_block(chain.clone(), block_root, block, log.clone()).await?; - publish_block::( - Some(block_root), - Arc::new(full_block), - chain, - network_tx, - log, - ) - .await + publish_block::(Some(block_root), full_block, chain, network_tx, log).await } /// Deconstruct the given blinded block, and construct a full block. This attempts to use the @@ -165,15 +149,15 @@ async fn reconstruct_block( block_root: Hash256, block: SignedBeaconBlock>, log: Logger, -) -> Result>, Rejection> { - let full_payload = if let Ok(payload_header) = block.message().body().execution_payload() { +) -> Result, Rejection> { + let full_payload_opt = if let Ok(payload_header) = block.message().body().execution_payload() { let el = chain.execution_layer.as_ref().ok_or_else(|| { warp_utils::reject::custom_server_error("Missing execution layer".to_string()) })?; // If the execution block hash is zero, use an empty payload. let full_payload = if payload_header.block_hash() == ExecutionBlockHash::zero() { - FullPayload::default_at_fork( + let payload = FullPayload::default_at_fork( chain .spec .fork_name_at_epoch(block.slot().epoch(T::EthSpec::slots_per_epoch())), @@ -183,15 +167,30 @@ async fn reconstruct_block( "Default payload construction error: {e:?}" )) })? - .into() - // If we already have an execution payload with this transactions root cached, use it. + .into(); + ProvenancedPayload::Local(payload) + // If we already have an execution payload with this transactions root cached, use it. } else if let Some(cached_payload) = el.get_payload_by_root(&payload_header.tree_hash_root()) { info!(log, "Reconstructing a full block using a local payload"; "block_hash" => ?cached_payload.block_hash()); - cached_payload - // Otherwise, this means we are attempting a blind block proposal. + ProvenancedPayload::Local(cached_payload) + // Otherwise, this means we are attempting a blind block proposal. } else { + // Perform the logging for late blocks when we publish to the + // builder, rather than when we publish to the network. This helps + // prevent false positive logs when the builder publishes to the P2P + // network significantly earlier than when they return the block to + // us. + late_block_logging( + &chain, + timestamp_now(), + block.message(), + block_root, + "builder", + &log, + ); + let full_payload = el .propose_blinded_beacon_block(block_root, &block) .await @@ -202,7 +201,7 @@ async fn reconstruct_block( )) })?; info!(log, "Successfully published a block to the builder network"; "block_hash" => ?full_payload.block_hash()); - full_payload + ProvenancedPayload::Builder(full_payload) }; Some(full_payload) @@ -210,7 +209,71 @@ async fn reconstruct_block( None }; - block.try_into_full_block(full_payload).ok_or_else(|| { + match full_payload_opt { + // A block without a payload is pre-merge and we consider it locally + // built. + None => block + .try_into_full_block(None) + .map(Arc::new) + .map(ProvenancedBlock::Local), + Some(ProvenancedPayload::Local(full_payload)) => block + .try_into_full_block(Some(full_payload)) + .map(Arc::new) + .map(ProvenancedBlock::Local), + Some(ProvenancedPayload::Builder(full_payload)) => block + .try_into_full_block(Some(full_payload)) + .map(Arc::new) + .map(ProvenancedBlock::Builder), + } + .ok_or_else(|| { warp_utils::reject::custom_server_error("Unable to add payload to block".to_string()) }) } + +/// If the `seen_timestamp` is some time after the start of the slot for +/// `block`, create some logs to indicate that the block was published late. +fn late_block_logging>( + chain: &BeaconChain, + seen_timestamp: Duration, + block: BeaconBlockRef, + root: Hash256, + provenance: &str, + log: &Logger, +) { + let delay = get_block_delay_ms(seen_timestamp, block, &chain.slot_clock); + + metrics::observe_timer_vec( + &metrics::HTTP_API_BLOCK_BROADCAST_DELAY_TIMES, + &[provenance], + delay, + ); + + // Perform some logging to inform users if their blocks are being produced + // late. + // + // Check to see the thresholds are non-zero to avoid logging errors with small + // slot times (e.g., during testing) + let too_late_threshold = chain.slot_clock.unagg_attestation_production_delay(); + let delayed_threshold = too_late_threshold / 2; + if delay >= too_late_threshold { + error!( + log, + "Block was broadcast too late"; + "msg" => "system may be overloaded, block likely to be orphaned", + "provenance" => provenance, + "delay_ms" => delay.as_millis(), + "slot" => block.slot(), + "root" => ?root, + ) + } else if delay >= delayed_threshold { + error!( + log, + "Block broadcast was delayed"; + "msg" => "system may be overloaded, block may be orphaned", + "provenance" => provenance, + "delay_ms" => delay.as_millis(), + "slot" => block.slot(), + "root" => ?root, + ) + } +} diff --git a/consensus/types/src/payload.rs b/consensus/types/src/payload.rs index 6c739c969..2795c7f10 100644 --- a/consensus/types/src/payload.rs +++ b/consensus/types/src/payload.rs @@ -12,7 +12,7 @@ use test_random_derive::TestRandom; use tree_hash::TreeHash; use tree_hash_derive::TreeHash; -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum BlockType { Blinded, Full, From 020fb483feac4fbb05a04845fab330ac861f3855 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Fri, 17 Mar 2023 00:44:04 +0000 Subject: [PATCH 7/7] Clarify "Ready for Capella" (#4095) ## Issue Addressed Resolves #4061 ## Proposed Changes Adds a message to tell users to check their EE. ## Additional Info I really struggled to come up with something succinct and complete, so I'm totally open to feedback. --- beacon_node/client/src/notifier.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index fb8a9b634..1105bc41f 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -460,7 +460,11 @@ async fn capella_readiness_logging( match beacon_chain.check_capella_readiness().await { CapellaReadiness::Ready => { - info!(log, "Ready for Capella") + info!( + log, + "Ready for Capella"; + "info" => "ensure the execution endpoint is updated to the latest Capella/Shanghai release" + ) } readiness @ CapellaReadiness::ExchangeCapabilitiesFailed { error: _ } => { error!(