diff --git a/beacon_node/network/src/beacon_processor/chain_segment.rs b/beacon_node/network/src/beacon_processor/chain_segment.rs deleted file mode 100644 index 47e14f5e2..000000000 --- a/beacon_node/network/src/beacon_processor/chain_segment.rs +++ /dev/null @@ -1,218 +0,0 @@ -use crate::metrics; -use crate::router::processor::FUTURE_SLOT_TOLERANCE; -use crate::sync::manager::SyncMessage; -use crate::sync::{BatchProcessResult, ChainId}; -use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, ChainSegmentResult}; -use eth2_libp2p::PeerId; -use slog::{debug, error, trace, warn}; -use std::sync::Arc; -use tokio::sync::mpsc; -use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock}; - -/// Id associated to a block processing request, either a batch or a single block. -#[derive(Clone, Debug, PartialEq)] -pub enum ProcessId { - /// Processing Id of a range syncing batch. - RangeBatchId(ChainId, Epoch), - /// Processing Id of the parent lookup of a block. - ParentLookup(PeerId, Hash256), -} - -pub fn handle_chain_segment( - chain: Arc>, - process_id: ProcessId, - downloaded_blocks: Vec>, - sync_send: mpsc::UnboundedSender>, - log: slog::Logger, -) { - match process_id { - // this a request from the range sync - ProcessId::RangeBatchId(chain_id, epoch) => { - let start_slot = downloaded_blocks.first().map(|b| b.message.slot.as_u64()); - let end_slot = downloaded_blocks.last().map(|b| b.message.slot.as_u64()); - let sent_blocks = downloaded_blocks.len(); - - let result = match process_blocks(chain, downloaded_blocks.iter(), &log) { - (_, Ok(_)) => { - debug!(log, "Batch processed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, "chain" => chain_id, - "last_block_slot" => end_slot, "processed_blocks" => sent_blocks, "service"=> "sync"); - BatchProcessResult::Success(sent_blocks > 0) - } - (imported_blocks, Err(e)) => { - debug!(log, "Batch processing failed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, "chain" => chain_id, - "last_block_slot" => end_slot, "error" => e, "imported_blocks" => imported_blocks, "service" => "sync"); - BatchProcessResult::Failed(imported_blocks > 0) - } - }; - - let msg = SyncMessage::BatchProcessed { - chain_id, - epoch, - result, - }; - sync_send.send(msg).unwrap_or_else(|_| { - debug!( - log, - "Block processor could not inform range sync result. Likely shutting down." - ); - }); - } - // this is a parent lookup request from the sync manager - ProcessId::ParentLookup(peer_id, chain_head) => { - debug!( - log, "Processing parent lookup"; - "last_peer_id" => format!("{}", peer_id), - "blocks" => downloaded_blocks.len() - ); - // parent blocks are ordered from highest slot to lowest, so we need to process in - // reverse - match process_blocks(chain, downloaded_blocks.iter().rev(), &log) { - (_, Err(e)) => { - debug!(log, "Parent lookup failed"; "last_peer_id" => %peer_id, "error" => e); - sync_send - .send(SyncMessage::ParentLookupFailed{peer_id, chain_head}) - .unwrap_or_else(|_| { - // on failure, inform to downvote the peer - debug!( - log, - "Block processor could not inform parent lookup result. Likely shutting down." - ); - }); - } - (_, Ok(_)) => { - debug!(log, "Parent lookup processed successfully"); - } - } - } - } -} - -/// Helper function to process blocks batches which only consumes the chain and blocks to process. -fn process_blocks< - 'a, - T: BeaconChainTypes, - I: Iterator>, ->( - chain: Arc>, - downloaded_blocks: I, - log: &slog::Logger, -) -> (usize, Result<(), String>) { - let blocks = downloaded_blocks.cloned().collect::>(); - match chain.process_chain_segment(blocks) { - ChainSegmentResult::Successful { imported_blocks } => { - metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL); - if imported_blocks > 0 { - // Batch completed successfully with at least one block, run fork choice. - run_fork_choice(chain, log); - } - - (imported_blocks, Ok(())) - } - ChainSegmentResult::Failed { - imported_blocks, - error, - } => { - metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL); - let r = handle_failed_chain_segment(error, log); - if imported_blocks > 0 { - run_fork_choice(chain, log); - } - (imported_blocks, r) - } - } -} - -/// Runs fork-choice on a given chain. This is used during block processing after one successful -/// block import. -fn run_fork_choice(chain: Arc>, log: &slog::Logger) { - match chain.fork_choice() { - Ok(()) => trace!( - log, - "Fork choice success"; - "location" => "batch processing" - ), - Err(e) => error!( - log, - "Fork choice failed"; - "error" => ?e, - "location" => "batch import error" - ), - } -} - -/// Helper function to handle a `BlockError` from `process_chain_segment` -fn handle_failed_chain_segment( - error: BlockError, - log: &slog::Logger, -) -> Result<(), String> { - match error { - BlockError::ParentUnknown(block) => { - // blocks should be sequential and all parents should exist - - Err(format!( - "Block has an unknown parent: {}", - block.parent_root() - )) - } - BlockError::BlockIsAlreadyKnown => { - // This can happen for many reasons. Head sync's can download multiples and parent - // lookups can download blocks before range sync - Ok(()) - } - BlockError::FutureSlot { - present_slot, - block_slot, - } => { - if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { - // The block is too far in the future, drop it. - warn!( - log, "Block is ahead of our slot clock"; - "msg" => "block for future slot rejected, check your time", - "present_slot" => present_slot, - "block_slot" => block_slot, - "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, - ); - } else { - // The block is in the future, but not too far. - debug!( - log, "Block is slightly ahead of our slot clock, ignoring."; - "present_slot" => present_slot, - "block_slot" => block_slot, - "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, - ); - } - - Err(format!( - "Block with slot {} is higher than the current slot {}", - block_slot, present_slot - )) - } - BlockError::WouldRevertFinalizedSlot { .. } => { - debug!( log, "Finalized or earlier block processed";); - - Ok(()) - } - BlockError::GenesisBlock => { - debug!(log, "Genesis block was processed"); - Ok(()) - } - BlockError::BeaconChainError(e) => { - warn!( - log, "BlockProcessingFailure"; - "msg" => "unexpected condition in processing block.", - "outcome" => ?e, - ); - - Err(format!("Internal error whilst processing block: {:?}", e)) - } - other => { - debug!( - log, "Invalid block received"; - "msg" => "peer sent invalid block", - "outcome" => %other, - ); - - Err(format!("Peer sent invalid block. Reason: {:?}", other)) - } - } -} diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index b23b40e54..d616ac0ad 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -1,4 +1,4 @@ -//! Provides the `BeaconProcessor`, a mutli-threaded processor for messages received on the network +//! Provides the `BeaconProcessor`, a multi-threaded processor for messages received on the network //! that need to be processed by the `BeaconChain`. //! //! Uses `tokio` tasks (instead of raw threads) to provide the following tasks: @@ -26,7 +26,7 @@ //! //! Then, there is a maximum of `n` "worker" blocking threads, where `n` is the CPU count. //! -//! Whenever the manager receives a new parcel of work, it either: +//! Whenever the manager receives a new parcel of work, it is either: //! //! - Provided to a newly-spawned worker tasks (if we are not already at `n` workers). //! - Added to a queue. @@ -37,7 +37,10 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; -use eth2_libp2p::{MessageId, NetworkGlobals, PeerId}; +use eth2_libp2p::{ + rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage}, + MessageId, NetworkGlobals, PeerId, PeerRequestId, +}; use slog::{crit, debug, error, trace, warn, Logger}; use std::collections::VecDeque; use std::sync::{Arc, Weak}; @@ -48,12 +51,12 @@ use types::{ Attestation, AttesterSlashing, EthSpec, Hash256, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SubnetId, }; + use worker::Worker; -mod chain_segment; mod worker; -pub use chain_segment::ProcessId; +pub use worker::ProcessId; /// The maximum size of the channel for work events to the `BeaconProcessor`. /// @@ -98,10 +101,22 @@ const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024; /// be stored before we start dropping them. const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64; +/// The maximum number of queued `StatusMessage` objects received from the network RPC that will be +/// stored before we start dropping them. +const MAX_STATUS_QUEUE_LEN: usize = 1_024; + +/// The maximum number of queued `BlocksByRangeRequest` objects received from the network RPC that +/// will be stored before we start dropping them. +const MAX_BLOCKS_BY_RANGE_QUEUE_LEN: usize = 1_024; + +/// The maximum number of queued `BlocksByRootRequest` objects received from the network RPC that +/// will be stored before we start dropping them. +const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024; + /// The name of the manager tokio task. -const MANAGER_TASK_NAME: &str = "beacon_gossip_processor_manager"; +const MANAGER_TASK_NAME: &str = "beacon_processor_manager"; /// The name of the worker tokio tasks. -const WORKER_TASK_NAME: &str = "beacon_gossip_processor_worker"; +const WORKER_TASK_NAME: &str = "beacon_processor_worker"; /// The minimum interval between log messages indicating that a queue is full. const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30); @@ -132,7 +147,7 @@ impl FifoQueue { if self.queue.len() == self.max_length { error!( log, - "Block queue full"; + "Work queue is full"; "msg" => "the system has insufficient resources for load", "queue_len" => self.max_length, "queue" => item_desc, @@ -320,6 +335,51 @@ impl WorkEvent { work: Work::ChainSegment { process_id, blocks }, } } + + /// Create a new work event to process `StatusMessage`s from the RPC network. + pub fn status_message(peer_id: PeerId, message: StatusMessage) -> Self { + Self { + drop_during_sync: false, + work: Work::Status { peer_id, message }, + } + } + + /// Create a new work event to process `BlocksByRangeRequest`s from the RPC network. + pub fn blocks_by_range_request( + peer_id: PeerId, + request_id: PeerRequestId, + request: BlocksByRangeRequest, + ) -> Self { + Self { + drop_during_sync: false, + work: Work::BlocksByRangeRequest { + peer_id, + request_id, + request, + }, + } + } + + /// Create a new work event to process `BlocksByRootRequest`s from the RPC network. + pub fn blocks_by_roots_request( + peer_id: PeerId, + request_id: PeerRequestId, + request: BlocksByRootRequest, + ) -> Self { + Self { + drop_during_sync: false, + work: Work::BlocksByRootsRequest { + peer_id, + request_id, + request, + }, + } + } + + /// Get a `str` representation of the type of work this `WorkEvent` contains. + pub fn work_type(&self) -> &'static str { + self.work.str_id() + } } /// A consensus message (or multiple) from the network that requires processing. @@ -365,6 +425,20 @@ pub enum Work { process_id: ProcessId, blocks: Vec>, }, + Status { + peer_id: PeerId, + message: StatusMessage, + }, + BlocksByRangeRequest { + peer_id: PeerId, + request_id: PeerRequestId, + request: BlocksByRangeRequest, + }, + BlocksByRootsRequest { + peer_id: PeerId, + request_id: PeerRequestId, + request: BlocksByRootRequest, + }, } impl Work { @@ -379,6 +453,9 @@ impl Work { Work::GossipAttesterSlashing { .. } => "gossip_attester_slashing", Work::RpcBlock { .. } => "rpc_block", Work::ChainSegment { .. } => "chain_segment", + Work::Status { .. } => "status_processing", + Work::BlocksByRangeRequest { .. } => "blocks_by_range_request", + Work::BlocksByRootsRequest { .. } => "blocks_by_roots_request", } } } @@ -453,6 +530,10 @@ impl BeaconProcessor { let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); + let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN); + let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN); + let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN); + let executor = self.executor.clone(); // The manager future will run on the core executor and delegate tasks to worker @@ -534,14 +615,22 @@ impl BeaconProcessor { // required to verify some attestations. } else if let Some(item) = gossip_block_queue.pop() { self.spawn_worker(idle_tx.clone(), item); - // Check the aggregates, *then* the unaggregates - // since we assume that aggregates are more valuable to local validators - // and effectively give us more information with less signature - // verification time. + // Check the aggregates, *then* the unaggregates since we assume that + // aggregates are more valuable to local validators and effectively give us + // more information with less signature verification time. } else if let Some(item) = aggregate_queue.pop() { self.spawn_worker(idle_tx.clone(), item); } else if let Some(item) = attestation_queue.pop() { self.spawn_worker(idle_tx.clone(), item); + // Check RPC methods next. Status messages are needed for sync so + // prioritize them over syncing requests from other peers (BlocksByRange + // and BlocksByRoot) + } else if let Some(item) = status_queue.pop() { + self.spawn_worker(idle_tx.clone(), item); + } else if let Some(item) = bbrange_queue.pop() { + self.spawn_worker(idle_tx.clone(), item); + } else if let Some(item) = bbroots_queue.pop() { + self.spawn_worker(idle_tx.clone(), item); // Check slashings after all other consensus messages so we prioritize // following head. // @@ -606,6 +695,13 @@ impl BeaconProcessor { Work::ChainSegment { .. } => { chain_segment_queue.push(work, work_id, &self.log) } + Work::Status { .. } => status_queue.push(work, work_id, &self.log), + Work::BlocksByRangeRequest { .. } => { + bbrange_queue.push(work, work_id, &self.log) + } + Work::BlocksByRootsRequest { .. } => { + bbroots_queue.push(work, work_id, &self.log) + } } } } @@ -804,6 +900,26 @@ impl BeaconProcessor { Work::ChainSegment { process_id, blocks } => { worker.process_chain_segment(process_id, blocks) } + /* + * Processing of Status Messages. + */ + Work::Status { peer_id, message } => worker.process_status(peer_id, message), + /* + * Processing of range syncing requests from other peers. + */ + Work::BlocksByRangeRequest { + peer_id, + request_id, + request, + } => worker.handle_blocks_by_range_request(peer_id, request_id, request), + /* + * Processing of blocks by roots requests from other peers. + */ + Work::BlocksByRootsRequest { + peer_id, + request_id, + request, + } => worker.handle_blocks_by_root_request(peer_id, request_id, request), }; trace!( diff --git a/beacon_node/network/src/beacon_processor/worker.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs similarity index 85% rename from beacon_node/network/src/beacon_processor/worker.rs rename to beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 9d5497514..d16722782 100644 --- a/beacon_node/network/src/beacon_processor/worker.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -1,31 +1,47 @@ -use super::{ - chain_segment::{handle_chain_segment, ProcessId}, - BlockResultSender, -}; use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::{ attestation_verification::Error as AttnError, observed_operations::ObservationOutcome, - BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, + BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, }; use eth2_libp2p::{MessageAcceptance, MessageId, PeerAction, PeerId}; -use slog::{crit, debug, error, info, trace, warn, Logger}; +use slog::{debug, error, info, trace, warn}; use ssz::Encode; -use std::sync::Arc; -use tokio::sync::mpsc; use types::{ Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SubnetId, }; -/// Contains the context necessary to import blocks, attestations, etc to the beacon chain. -pub struct Worker { - pub chain: Arc>, - pub network_tx: mpsc::UnboundedSender>, - pub sync_tx: mpsc::UnboundedSender>, - pub log: Logger, -} +use super::Worker; impl Worker { + /* Auxiliary functions */ + + /// Penalizes a peer for misbehaviour. + fn penalize_peer(&self, peer_id: PeerId, action: PeerAction) { + self.send_network_message(NetworkMessage::ReportPeer { peer_id, action }) + } + + /// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on + /// the gossip network. + /// + /// Creates a log if there is an internal error. + /// Propagates the result of the validation for the given message to the network. If the result + /// is valid the message gets forwarded to other peers. + fn propagate_validation_result( + &self, + message_id: MessageId, + propagation_source: PeerId, + validation_result: MessageAcceptance, + ) { + self.send_network_message(NetworkMessage::ValidationResult { + propagation_source, + message_id, + validation_result, + }) + } + + /* Processing functions */ + /// Process the unaggregated attestation received from the gossip network and: /// /// - If it passes gossip propagation criteria, tell the network thread to forward it. @@ -76,17 +92,17 @@ impl Worker { debug!( self.log, "Attestation invalid for fork choice"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root ) } e => error!( self.log, "Error applying attestation to fork choice"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root ), } } @@ -95,9 +111,9 @@ impl Worker { debug!( self.log, "Attestation invalid for agg pool"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root ) } @@ -149,17 +165,17 @@ impl Worker { debug!( self.log, "Aggregate invalid for fork choice"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root ) } e => error!( self.log, "Error applying aggregate to fork choice"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root ), } } @@ -168,9 +184,9 @@ impl Worker { debug!( self.log, "Attestation invalid for op pool"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root ) } @@ -255,7 +271,7 @@ impl Worker { trace!( self.log, "Gossipsub block processed"; - "peer_id" => peer_id.to_string() + "peer_id" => %peer_id ); // The `MessageHandler` would be the place to put this, however it doesn't seem @@ -270,7 +286,7 @@ impl Worker { Err(e) => error!( self.log, "Fork choice failed"; - "error" => format!("{:?}", e), + "error" => ?e, "location" => "block gossip" ), } @@ -281,7 +297,7 @@ impl Worker { error!( self.log, "Block with unknown parent attempted to be processed"; - "peer_id" => peer_id.to_string() + "peer_id" => %peer_id ); self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block)); } @@ -323,7 +339,7 @@ impl Worker { self.log, "Dropping exit for already exiting validator"; "validator_index" => validator_index, - "peer" => peer_id.to_string() + "peer" => %peer_id ); return; } @@ -332,8 +348,8 @@ impl Worker { self.log, "Dropping invalid exit"; "validator_index" => validator_index, - "peer" => peer_id.to_string(), - "error" => format!("{:?}", e) + "peer" => %peer_id, + "error" => ?e ); // These errors occur due to a fault in the beacon chain. It is not necessarily // the fault on the peer. @@ -377,7 +393,7 @@ impl Worker { "Dropping proposer slashing"; "reason" => "Already seen a proposer slashing for that validator", "validator_index" => validator_index, - "peer" => peer_id.to_string() + "peer" => %peer_id ); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); return; @@ -389,8 +405,8 @@ impl Worker { self.log, "Dropping invalid proposer slashing"; "validator_index" => validator_index, - "peer" => peer_id.to_string(), - "error" => format!("{:?}", e) + "peer" => %peer_id, + "error" => ?e ); self.propagate_validation_result( message_id, @@ -430,7 +446,7 @@ impl Worker { self.log, "Dropping attester slashing"; "reason" => "Slashings already known for all slashed validators", - "peer" => peer_id.to_string() + "peer" => %peer_id ); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); return; @@ -439,8 +455,8 @@ impl Worker { debug!( self.log, "Dropping invalid attester slashing"; - "peer" => peer_id.to_string(), - "error" => format!("{:?}", e) + "peer" => %peer_id, + "error" => ?e ); self.propagate_validation_result( message_id, @@ -458,7 +474,7 @@ impl Worker { self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); if let Err(e) = self.chain.import_attester_slashing(slashing) { - debug!(self.log, "Error importing attester slashing"; "error" => format!("{:?}", e)); + debug!(self.log, "Error importing attester slashing"; "error" => ?e); metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_ERROR_TOTAL); } else { debug!(self.log, "Successfully imported attester slashing"); @@ -466,82 +482,6 @@ impl Worker { } } - /// Attempt to process a block received from a direct RPC request, returning the processing - /// result on the `result_tx` channel. - /// - /// Raises a log if there are errors publishing the result to the channel. - pub fn process_rpc_block( - self, - block: SignedBeaconBlock, - result_tx: BlockResultSender, - ) { - let block_result = self.chain.process_block(block); - - metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL); - - if result_tx.send(block_result).is_err() { - crit!(self.log, "Failed return sync block result"); - } - } - - /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync - /// thread if more blocks are needed to process it. - pub fn process_chain_segment( - self, - process_id: ProcessId, - blocks: Vec>, - ) { - handle_chain_segment(self.chain, process_id, blocks, self.sync_tx, self.log) - } - - /// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on - /// the gossip network. - /// - /// Creates a log if there is an interal error. - /// Propagates the result of the validation fot the given message to the network. If the result - /// is valid the message gets forwarded to other peers. - fn propagate_validation_result( - &self, - message_id: MessageId, - propagation_source: PeerId, - validation_result: MessageAcceptance, - ) { - self.network_tx - .send(NetworkMessage::ValidationResult { - propagation_source, - message_id, - validation_result, - }) - .unwrap_or_else(|_| { - warn!( - self.log, - "Could not send propagation request to the network service" - ) - }); - } - - /// Penalizes a peer for misbehaviour. - fn penalize_peer(&self, peer_id: PeerId, action: PeerAction) { - self.network_tx - .send(NetworkMessage::ReportPeer { peer_id, action }) - .unwrap_or_else(|_| { - warn!( - self.log, - "Could not send peer action to the network service" - ) - }); - } - - /// Send a message to `sync_tx`. - /// - /// Creates a log if there is an interal error. - fn send_sync_message(&self, message: SyncMessage) { - self.sync_tx.send(message).unwrap_or_else(|e| { - error!(self.log, "Could not send message to the sync service"; - "error" => %e) - }); - } - /// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the /// network. pub fn handle_attestation_verification_failure( @@ -567,9 +507,9 @@ impl Worker { trace!( self.log, "Attestation is not within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root), - "type" => format!("{:?}", attestation_type), + "peer_id" => %peer_id, + "block" => %beacon_block_root, + "type" => ?attestation_type, ); self.propagate_validation_result( message_id, @@ -657,9 +597,9 @@ impl Worker { trace!( self.log, "Attestation already known"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root), - "type" => format!("{:?}", attestation_type), + "peer_id" => %peer_id, + "block" => %beacon_block_root, + "type" => ?attestation_type, ); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); return; @@ -674,9 +614,9 @@ impl Worker { trace!( self.log, "Aggregator already known"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root), - "type" => format!("{:?}", attestation_type), + "peer_id" => %peer_id, + "block" => %beacon_block_root, + "type" => ?attestation_type, ); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); return; @@ -690,9 +630,9 @@ impl Worker { trace!( self.log, "Prior attestation known"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root), - "type" => format!("{:?}", attestation_type), + "peer_id" => %peer_id, + "block" => %beacon_block_root, + "type" => ?attestation_type, ); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); return; @@ -721,8 +661,8 @@ impl Worker { debug!( self.log, "Attestation for unknown block"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root) + "peer_id" => %peer_id, + "block" => %beacon_block_root ); // we don't know the block, get the sync manager to handle the block lookup self.sync_tx @@ -909,8 +849,8 @@ impl Worker { error!( self.log, "Unable to validate aggregate"; - "peer_id" => peer_id.to_string(), - "error" => format!("{:?}", e), + "peer_id" => %peer_id, + "error" => ?e, ); self.propagate_validation_result( message_id, @@ -925,10 +865,10 @@ impl Worker { debug!( self.log, "Invalid attestation from network"; - "reason" => format!("{:?}", error), - "block" => format!("{}", beacon_block_root), - "peer_id" => peer_id.to_string(), - "type" => format!("{:?}", attestation_type), + "reason" => ?error, + "block" => %beacon_block_root, + "peer_id" => %peer_id, + "type" => ?attestation_type, ); } } diff --git a/beacon_node/network/src/beacon_processor/worker/mod.rs b/beacon_node/network/src/beacon_processor/worker/mod.rs new file mode 100644 index 000000000..40a863303 --- /dev/null +++ b/beacon_node/network/src/beacon_processor/worker/mod.rs @@ -0,0 +1,43 @@ +use crate::{service::NetworkMessage, sync::SyncMessage}; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use slog::{error, Logger}; +use std::sync::Arc; +use tokio::sync::mpsc; + +mod gossip_methods; +mod rpc_methods; +mod sync_methods; + +pub use sync_methods::ProcessId; + +pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; + +/// Contains the context necessary to import blocks, attestations, etc to the beacon chain. +pub struct Worker { + pub chain: Arc>, + pub network_tx: mpsc::UnboundedSender>, + pub sync_tx: mpsc::UnboundedSender>, + pub log: Logger, +} + +impl Worker { + /// Send a message to `sync_tx`. + /// + /// Creates a log if there is an internal error. + fn send_sync_message(&self, message: SyncMessage) { + self.sync_tx.send(message).unwrap_or_else(|e| { + error!(self.log, "Could not send message to the sync service"; + "error" => %e) + }); + } + + /// Send a message to `network_tx`. + /// + /// Creates a log if there is an internal error. + fn send_network_message(&self, message: NetworkMessage) { + self.network_tx.send(message).unwrap_or_else(|e| { + error!(self.log, "Could not send message to the network service"; + "error" => %e) + }); + } +} diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs new file mode 100644 index 000000000..42796ee2a --- /dev/null +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -0,0 +1,251 @@ +use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE; +use crate::service::NetworkMessage; +use crate::status::ToStatusMessage; +use crate::sync::SyncMessage; +use beacon_chain::{BeaconChainError, BeaconChainTypes}; +use eth2_libp2p::rpc::StatusMessage; +use eth2_libp2p::rpc::*; +use eth2_libp2p::{PeerId, PeerRequestId, Response, SyncInfo}; +use itertools::process_results; +use slog::{debug, error, warn}; +use slot_clock::SlotClock; +use types::{Epoch, EthSpec, Hash256, Slot}; + +use super::Worker; + +impl Worker { + /* Auxiliary functions */ + + /// Disconnects and ban's a peer, sending a Goodbye request with the associated reason. + pub fn goodbye_peer(&self, peer_id: PeerId, reason: GoodbyeReason) { + self.send_network_message(NetworkMessage::GoodbyePeer { peer_id, reason }); + } + + pub fn send_response( + &self, + peer_id: PeerId, + response: Response, + id: PeerRequestId, + ) { + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + id, + response, + }) + } + + /* Processing functions */ + + /// Process a `Status` message to determine if a peer is relevant to us. If the peer is + /// irrelevant the reason is returned. + fn check_peer_relevance( + &self, + remote: &StatusMessage, + ) -> Result, BeaconChainError> { + let local = self.chain.status_message()?; + let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); + + let irrelevant_reason = if local.fork_digest != remote.fork_digest { + // The node is on a different network/fork + Some(format!( + "Incompatible forks Ours:{} Theirs:{}", + hex::encode(local.fork_digest), + hex::encode(remote.fork_digest) + )) + } else if remote.head_slot + > self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()) + + FUTURE_SLOT_TOLERANCE + { + // The remote's head is on a slot that is significantly ahead of what we consider the + // current slot. This could be because they are using a different genesis time, or that + // their or our system's clock is incorrect. + Some("Different system clocks or genesis time".to_string()) + } else if remote.finalized_epoch <= local.finalized_epoch + && remote.finalized_root != Hash256::zero() + && local.finalized_root != Hash256::zero() + && self + .chain + .root_at_slot(start_slot(remote.finalized_epoch)) + .map(|root_opt| root_opt != Some(remote.finalized_root))? + { + // The remote's finalized epoch is less than or equal to ours, but the block root is + // different to the one in our chain. Therefore, the node is on a different chain and we + // should not communicate with them. + Some("Different finalized chain".to_string()) + } else { + None + }; + + Ok(irrelevant_reason) + } + + pub fn process_status(&self, peer_id: PeerId, status: StatusMessage) { + match self.check_peer_relevance(&status) { + Ok(Some(irrelevant_reason)) => { + debug!(self.log, "Handshake Failure"; "peer" => %peer_id, "reason" => irrelevant_reason); + self.goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork); + } + Ok(None) => { + let info = SyncInfo { + head_slot: status.head_slot, + head_root: status.head_root, + finalized_epoch: status.finalized_epoch, + finalized_root: status.finalized_root, + }; + self.send_sync_message(SyncMessage::AddPeer(peer_id, info)); + } + Err(e) => error!(self.log, "Could not process status message"; "error" => ?e), + } + } + + /// Handle a `BlocksByRoot` request from the peer. + pub fn handle_blocks_by_root_request( + &self, + peer_id: PeerId, + request_id: PeerRequestId, + request: BlocksByRootRequest, + ) { + let mut send_block_count = 0; + for root in request.block_roots.iter() { + if let Ok(Some(block)) = self.chain.store.get_block(root) { + self.send_response( + peer_id.clone(), + Response::BlocksByRoot(Some(Box::new(block))), + request_id, + ); + send_block_count += 1; + } else { + debug!(self.log, "Peer requested unknown block"; + "peer" => %peer_id, + "request_root" => ?root); + } + } + debug!(self.log, "Received BlocksByRoot Request"; + "peer" => %peer_id, + "requested" => request.block_roots.len(), + "returned" => send_block_count); + + // send stream termination + self.send_response(peer_id, Response::BlocksByRoot(None), request_id); + } + + /// Handle a `BlocksByRange` request from the peer. + pub fn handle_blocks_by_range_request( + &self, + peer_id: PeerId, + request_id: PeerRequestId, + mut req: BlocksByRangeRequest, + ) { + debug!(self.log, "Received BlocksByRange Request"; + "peer_id" => %peer_id, + "count" => req.count, + "start_slot" => req.start_slot, + "step" => req.step); + + // Should not send more than max request blocks + if req.count > MAX_REQUEST_BLOCKS { + req.count = MAX_REQUEST_BLOCKS; + } + if req.step == 0 { + self.goodbye_peer(peer_id, GoodbyeReason::Fault); + return warn!(self.log, "Peer sent invalid range request"; "error" => "Step sent was 0"); + } + + let forwards_block_root_iter = match self + .chain + .forwards_iter_block_roots(Slot::from(req.start_slot)) + { + Ok(iter) => iter, + Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), + }; + + // Pick out the required blocks, ignoring skip-slots and stepping by the step parameter. + // + // NOTE: We don't mind if req.count * req.step overflows as it just ends the iterator early and + // the peer will get less blocks. + // The step parameter is quadratically weighted in the filter, so large values should be + // prevented before reaching this point. + let mut last_block_root = None; + let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { + iter.take_while(|(_, slot)| { + slot.as_u64() < req.start_slot.saturating_add(req.count * req.step) + }) + // map skip slots to None + .map(|(root, _)| { + let result = if Some(root) == last_block_root { + None + } else { + Some(root) + }; + last_block_root = Some(root); + result + }) + .step_by(req.step as usize) + .collect::>>() + }); + + let block_roots = match maybe_block_roots { + Ok(block_roots) => block_roots, + Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e), + }; + + // remove all skip slots + let block_roots = block_roots + .into_iter() + .filter_map(|root| root) + .collect::>(); + + let mut blocks_sent = 0; + for root in block_roots { + if let Ok(Some(block)) = self.chain.store.get_block(&root) { + // Due to skip slots, blocks could be out of the range, we ensure they are in the + // range before sending + if block.slot() >= req.start_slot + && block.slot() < req.start_slot + req.count * req.step + { + blocks_sent += 1; + self.send_network_message(NetworkMessage::SendResponse { + peer_id: peer_id.clone(), + response: Response::BlocksByRange(Some(Box::new(block))), + id: request_id, + }); + } + } else { + error!(self.log, "Block in the chain is not in the store"; + "request_root" => ?root); + } + } + + let current_slot = self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); + + if blocks_sent < (req.count as usize) { + debug!(self.log, "BlocksByRange Response sent"; + "peer" => %peer_id, + "msg" => "Failed to return all requested blocks", + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blocks_sent); + } else { + debug!(self.log, "BlocksByRange Response sent"; + "peer" => %peer_id, + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blocks_sent); + } + + // send the stream terminator + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + response: Response::BlocksByRange(None), + id: request_id, + }); + } +} diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs new file mode 100644 index 000000000..4eb8be09a --- /dev/null +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -0,0 +1,229 @@ +use super::Worker; +use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE; +use crate::beacon_processor::BlockResultSender; +use crate::metrics; +use crate::sync::manager::SyncMessage; +use crate::sync::{BatchProcessResult, ChainId}; +use beacon_chain::{BeaconChainTypes, BlockError, ChainSegmentResult}; +use eth2_libp2p::PeerId; +use slog::{crit, debug, error, trace, warn}; +use types::{Epoch, Hash256, SignedBeaconBlock}; + +/// Id associated to a block processing request, either a batch or a single block. +#[derive(Clone, Debug, PartialEq)] +pub enum ProcessId { + /// Processing Id of a range syncing batch. + RangeBatchId(ChainId, Epoch), + /// Processing Id of the parent lookup of a block. + ParentLookup(PeerId, Hash256), +} + +impl Worker { + /// Attempt to process a block received from a direct RPC request, returning the processing + /// result on the `result_tx` channel. + /// + /// Raises a log if there are errors publishing the result to the channel. + pub fn process_rpc_block( + self, + block: SignedBeaconBlock, + result_tx: BlockResultSender, + ) { + let block_result = self.chain.process_block(block); + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL); + + if result_tx.send(block_result).is_err() { + crit!(self.log, "Failed return sync block result"); + } + } + + /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync + /// thread if more blocks are needed to process it. + pub fn process_chain_segment( + &self, + process_id: ProcessId, + downloaded_blocks: Vec>, + ) { + match process_id { + // this a request from the range sync + ProcessId::RangeBatchId(chain_id, epoch) => { + let start_slot = downloaded_blocks.first().map(|b| b.message.slot.as_u64()); + let end_slot = downloaded_blocks.last().map(|b| b.message.slot.as_u64()); + let sent_blocks = downloaded_blocks.len(); + + let result = match self.process_blocks(downloaded_blocks.iter()) { + (_, Ok(_)) => { + debug!(self.log, "Batch processed"; + "batch_epoch" => epoch, + "first_block_slot" => start_slot, + "chain" => chain_id, + "last_block_slot" => end_slot, + "processed_blocks" => sent_blocks, + "service"=> "sync"); + BatchProcessResult::Success(sent_blocks > 0) + } + (imported_blocks, Err(e)) => { + debug!(self.log, "Batch processing failed"; + "batch_epoch" => epoch, + "first_block_slot" => start_slot, + "chain" => chain_id, + "last_block_slot" => end_slot, + "imported_blocks" => imported_blocks, + "error" => e, + "service" => "sync"); + BatchProcessResult::Failed(imported_blocks > 0) + } + }; + + self.send_sync_message(SyncMessage::BatchProcessed { + chain_id, + epoch, + result, + }); + } + // this is a parent lookup request from the sync manager + ProcessId::ParentLookup(peer_id, chain_head) => { + debug!( + self.log, "Processing parent lookup"; + "last_peer_id" => %peer_id, + "blocks" => downloaded_blocks.len() + ); + // parent blocks are ordered from highest slot to lowest, so we need to process in + // reverse + match self.process_blocks(downloaded_blocks.iter().rev()) { + (_, Err(e)) => { + debug!(self.log, "Parent lookup failed"; "last_peer_id" => %peer_id, "error" => e); + self.send_sync_message(SyncMessage::ParentLookupFailed { + peer_id, + chain_head, + }) + } + (_, Ok(_)) => { + debug!(self.log, "Parent lookup processed successfully"); + } + } + } + } + } + + /// Helper function to process blocks batches which only consumes the chain and blocks to process. + fn process_blocks<'a>( + &self, + downloaded_blocks: impl Iterator>, + ) -> (usize, Result<(), String>) { + let blocks = downloaded_blocks.cloned().collect::>(); + match self.chain.process_chain_segment(blocks) { + ChainSegmentResult::Successful { imported_blocks } => { + metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL); + if imported_blocks > 0 { + // Batch completed successfully with at least one block, run fork choice. + self.run_fork_choice(); + } + + (imported_blocks, Ok(())) + } + ChainSegmentResult::Failed { + imported_blocks, + error, + } => { + metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL); + let r = self.handle_failed_chain_segment(error); + if imported_blocks > 0 { + self.run_fork_choice(); + } + (imported_blocks, r) + } + } + } + + /// Runs fork-choice on a given chain. This is used during block processing after one successful + /// block import. + fn run_fork_choice(&self) { + match self.chain.fork_choice() { + Ok(()) => trace!( + self.log, + "Fork choice success"; + "location" => "batch processing" + ), + Err(e) => error!( + self.log, + "Fork choice failed"; + "error" => ?e, + "location" => "batch import error" + ), + } + } + + /// Helper function to handle a `BlockError` from `process_chain_segment` + fn handle_failed_chain_segment(&self, error: BlockError) -> Result<(), String> { + match error { + BlockError::ParentUnknown(block) => { + // blocks should be sequential and all parents should exist + + Err(format!( + "Block has an unknown parent: {}", + block.parent_root() + )) + } + BlockError::BlockIsAlreadyKnown => { + // This can happen for many reasons. Head sync's can download multiples and parent + // lookups can download blocks before range sync + Ok(()) + } + BlockError::FutureSlot { + present_slot, + block_slot, + } => { + if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { + // The block is too far in the future, drop it. + warn!( + self.log, "Block is ahead of our slot clock"; + "msg" => "block for future slot rejected, check your time", + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, + ); + } else { + // The block is in the future, but not too far. + debug!( + self.log, "Block is slightly ahead of our slot clock, ignoring."; + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, + ); + } + + Err(format!( + "Block with slot {} is higher than the current slot {}", + block_slot, present_slot + )) + } + BlockError::WouldRevertFinalizedSlot { .. } => { + debug!(self.log, "Finalized or earlier block processed";); + Ok(()) + } + BlockError::GenesisBlock => { + debug!(self.log, "Genesis block was processed"); + Ok(()) + } + BlockError::BeaconChainError(e) => { + warn!( + self.log, "BlockProcessingFailure"; + "msg" => "unexpected condition in processing block.", + "outcome" => ?e, + ); + + Err(format!("Internal error whilst processing block: {:?}", e)) + } + other => { + debug!( + self.log, "Invalid block received"; + "msg" => "peer sent invalid block", + "outcome" => %other, + ); + + Err(format!("Peer sent invalid block. Reason: {:?}", other)) + } + } + } +} diff --git a/beacon_node/network/src/lib.rs b/beacon_node/network/src/lib.rs index 5ac74e2ed..f6aa777ab 100644 --- a/beacon_node/network/src/lib.rs +++ b/beacon_node/network/src/lib.rs @@ -11,6 +11,7 @@ mod metrics; mod nat; mod persisted_dht; mod router; +mod status; mod sync; pub use eth2_libp2p::NetworkConfig; diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index d26d77997..68d530319 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -5,7 +5,7 @@ //! syncing-related responses to the Sync manager. #![allow(clippy::unit_arg)] -pub mod processor; +mod processor; use crate::error; use crate::service::NetworkMessage; diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 4bc97d041..54d66c342 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -6,23 +6,17 @@ use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2_libp2p::rpc::*; use eth2_libp2p::{ - MessageId, NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response, SyncInfo, + MessageId, NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response, }; -use itertools::process_results; use slog::{debug, error, o, trace, warn}; -use slot_clock::SlotClock; use std::cmp; use std::sync::Arc; use tokio::sync::mpsc; use types::{ - Attestation, AttesterSlashing, ChainSpec, Epoch, EthSpec, Hash256, ProposerSlashing, - SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId, + Attestation, AttesterSlashing, ChainSpec, EthSpec, ProposerSlashing, SignedAggregateAndProof, + SignedBeaconBlock, SignedVoluntaryExit, SubnetId, }; -/// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it. -/// Otherwise we queue it. -pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; - /// Processes validated messages from the network. It relays necessary data to the syncing thread /// and processes blocks from the pubsub network. pub struct Processor { @@ -34,8 +28,6 @@ pub struct Processor { network: HandlerNetworkContext, /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. beacon_processor_send: mpsc::Sender>, - /// The current task executor. - executor: task_executor::TaskExecutor, /// The `RPCHandler` logger. log: slog::Logger, } @@ -68,7 +60,7 @@ impl Processor { network_tx: network_send.clone(), sync_tx: sync_send.clone(), network_globals, - executor: executor.clone(), + executor, max_workers: cmp::max(1, num_cpus::get()), current_workers: 0, log: log.clone(), @@ -80,7 +72,6 @@ impl Processor { sync_send, network: HandlerNetworkContext::new(network_send, log.clone()), beacon_processor_send, - executor, log: log.new(o!("service" => "router")), } } @@ -144,127 +135,25 @@ impl Processor { ); } - if let Err(e) = self.process_status(peer_id, status) { - error!(self.log, "Could not process status message"; "error" => format!("{:?}", e)); - } + self.send_beacon_processor_work(BeaconWorkEvent::status_message(peer_id, status)) } /// Process a `Status` response from a peer. pub fn on_status_response(&mut self, peer_id: PeerId, status: StatusMessage) { debug!(self.log, "Received Status Response"; "peer_id" => %peer_id, &status); - - // Process the status message, without sending back another status. - if let Err(e) = self.process_status(peer_id, status) { - error!(self.log, "Could not process status message"; "error" => format!("{:?}", e)); - } - } - - /// Process a `Status` message to determine if a peer is relevant to us. Irrelevant peers are - /// disconnected; relevant peers are sent to the SyncManager - fn process_status( - &mut self, - peer_id: PeerId, - remote: StatusMessage, - ) -> Result<(), BeaconChainError> { - let local = status_message(&self.chain)?; - let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); - - let irrelevant_reason = if local.fork_digest != remote.fork_digest { - // The node is on a different network/fork - Some(format!( - "Incompatible forks Ours:{} Theirs:{}", - hex::encode(local.fork_digest), - hex::encode(remote.fork_digest) - )) - } else if remote.head_slot - > self - .chain - .slot() - .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()) - + FUTURE_SLOT_TOLERANCE - { - // The remote's head is on a slot that is significantly ahead of what we consider the - // current slot. This could be because they are using a different genesis time, or that - // their or our system's clock is incorrect. - Some("Different system clocks or genesis time".to_string()) - } else if remote.finalized_epoch <= local.finalized_epoch - && remote.finalized_root != Hash256::zero() - && local.finalized_root != Hash256::zero() - && self - .chain - .root_at_slot(start_slot(remote.finalized_epoch)) - .map(|root_opt| root_opt != Some(remote.finalized_root))? - { - // The remote's finalized epoch is less than or equal to ours, but the block root is - // different to the one in our chain. Therefore, the node is on a different chain and we - // should not communicate with them. - Some("Different finalized chain".to_string()) - } else { - None - }; - - if let Some(irrelevant_reason) = irrelevant_reason { - debug!(self.log, "Handshake Failure"; "peer" => %peer_id, "reason" => irrelevant_reason); - self.network - .goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork); - } else { - let info = SyncInfo { - head_slot: remote.head_slot, - head_root: remote.head_root, - finalized_epoch: remote.finalized_epoch, - finalized_root: remote.finalized_root, - }; - self.send_to_sync(SyncMessage::AddPeer(peer_id, info)); - } - - Ok(()) + self.send_beacon_processor_work(BeaconWorkEvent::status_message(peer_id, status)) } /// Handle a `BlocksByRoot` request from the peer. pub fn on_blocks_by_root_request( - &self, + &mut self, peer_id: PeerId, request_id: PeerRequestId, request: BlocksByRootRequest, ) { - let chain = self.chain.clone(); - let mut network = self.network.clone(); - let log = self.log.clone(); - - // Shift the db reads to a blocking thread. - self.executor.spawn_blocking( - move || { - let mut send_block_count = 0; - for root in request.block_roots.iter() { - if let Ok(Some(block)) = chain.store.get_block(root) { - network.send_response( - peer_id.clone(), - Response::BlocksByRoot(Some(Box::new(block))), - request_id, - ); - send_block_count += 1; - } else { - debug!( - log, - "Peer requested unknown block"; - "peer" => peer_id.to_string(), - "request_root" => format!("{:}", root), - ); - } - } - debug!( - log, - "Received BlocksByRoot Request"; - "peer" => peer_id.to_string(), - "requested" => request.block_roots.len(), - "returned" => send_block_count, - ); - - // send stream termination - network.send_response(peer_id, Response::BlocksByRoot(None), request_id); - }, - "blocks_by_root_request", - ); + self.send_beacon_processor_work(BeaconWorkEvent::blocks_by_roots_request( + peer_id, request_id, request, + )) } /// Handle a `BlocksByRange` request from the peer. @@ -272,144 +161,11 @@ impl Processor { &mut self, peer_id: PeerId, request_id: PeerRequestId, - mut req: BlocksByRangeRequest, + req: BlocksByRangeRequest, ) { - let chain = self.chain.clone(); - let mut network = self.network.clone(); - let log = self.log.clone(); - - // Shift the db reads to a blocking thread. - self.executor.spawn_blocking(move || { - - debug!( - log, - "Received BlocksByRange Request"; - "peer_id" => %peer_id, - "count" => req.count, - "start_slot" => req.start_slot, - "step" => req.step, - ); - - // Should not send more than max request blocks - if req.count > MAX_REQUEST_BLOCKS { - req.count = MAX_REQUEST_BLOCKS; - } - if req.step == 0 { - warn!(log, - "Peer sent invalid range request"; - "error" => "Step sent was 0"); - network.goodbye_peer(peer_id, GoodbyeReason::Fault); - return; - } - - let forwards_block_root_iter = match - chain - .forwards_iter_block_roots(Slot::from(req.start_slot)) - { - Ok(iter) => iter, - Err(e) => { - return error!( - log, - "Unable to obtain root iter"; - "error" => format!("{:?}", e) - ) - } - }; - - // Pick out the required blocks, ignoring skip-slots and stepping by the step parameter. - // - // NOTE: We don't mind if req.count * req.step overflows as it just ends the iterator early and - // the peer will get less blocks. - // The step parameter is quadratically weighted in the filter, so large values should be - // prevented before reaching this point. - let mut last_block_root = None; - let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| { - slot.as_u64() < req.start_slot.saturating_add(req.count * req.step) - }) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .step_by(req.step as usize) - .collect::>>() - }); - - let block_roots = match maybe_block_roots { - Ok(block_roots) => block_roots, - Err(e) => { - error!(log, "Error during iteration over blocks"; "error" => format!("{:?}", e)); - return; - } - }; - - // remove all skip slots - let block_roots = block_roots - .into_iter() - .filter_map(|root| root) - .collect::>(); - - let mut blocks_sent = 0; - for root in block_roots { - if let Ok(Some(block)) = chain.store.get_block(&root) { - // Due to skip slots, blocks could be out of the range, we ensure they are in the - // range before sending - if block.slot() >= req.start_slot - && block.slot() < req.start_slot + req.count * req.step - { - blocks_sent += 1; - network.send_response( - peer_id.clone(), - Response::BlocksByRange(Some(Box::new(block))), - request_id, - ); - } - } else { - error!( - log, - "Block in the chain is not in the store"; - "request_root" => format!("{:}", root), - ); - } - } - - let current_slot = - chain - .slot() - .unwrap_or_else(|_| chain.slot_clock.genesis_slot()); - - if blocks_sent < (req.count as usize) { - debug!( - log, - "BlocksByRange Response Sent"; - "peer" => peer_id.to_string(), - "msg" => "Failed to return all requested blocks", - "start_slot" => req.start_slot, - "current_slot" => current_slot, - "requested" => req.count, - "returned" => blocks_sent); - } else { - debug!( - log, - "Sending BlocksByRange Response"; - "peer" => peer_id.to_string(), - "start_slot" => req.start_slot, - "current_slot" => current_slot, - "requested" => req.count, - "returned" => blocks_sent); - } - - // send the stream terminator - network - .send_response(peer_id, Response::BlocksByRange(None), request_id); - - }, "blocks_by_range_request"); + self.send_beacon_processor_work(BeaconWorkEvent::blocks_by_range_request( + peer_id, request_id, req, + )) } /// Handle a `BlocksByRange` response from the peer. @@ -478,18 +234,9 @@ impl Processor { peer_id: PeerId, block: Box>, ) { - self.beacon_processor_send - .try_send(BeaconWorkEvent::gossip_beacon_block( - message_id, peer_id, block, - )) - .unwrap_or_else(|e| { - error!( - &self.log, - "Unable to send to gossip processor"; - "type" => "block gossip", - "error" => e.to_string(), - ) - }) + self.send_beacon_processor_work(BeaconWorkEvent::gossip_beacon_block( + message_id, peer_id, block, + )) } pub fn on_unaggregated_attestation_gossip( @@ -500,22 +247,13 @@ impl Processor { subnet_id: SubnetId, should_process: bool, ) { - self.beacon_processor_send - .try_send(BeaconWorkEvent::unaggregated_attestation( - message_id, - peer_id, - unaggregated_attestation, - subnet_id, - should_process, - )) - .unwrap_or_else(|e| { - error!( - &self.log, - "Unable to send to gossip processor"; - "type" => "unaggregated attestation gossip", - "error" => e.to_string(), - ) - }) + self.send_beacon_processor_work(BeaconWorkEvent::unaggregated_attestation( + message_id, + peer_id, + unaggregated_attestation, + subnet_id, + should_process, + )) } pub fn on_aggregated_attestation_gossip( @@ -524,18 +262,9 @@ impl Processor { peer_id: PeerId, aggregate: SignedAggregateAndProof, ) { - self.beacon_processor_send - .try_send(BeaconWorkEvent::aggregated_attestation( - message_id, peer_id, aggregate, - )) - .unwrap_or_else(|e| { - error!( - &self.log, - "Unable to send to gossip processor"; - "type" => "aggregated attestation gossip", - "error" => e.to_string(), - ) - }) + self.send_beacon_processor_work(BeaconWorkEvent::aggregated_attestation( + message_id, peer_id, aggregate, + )) } pub fn on_voluntary_exit_gossip( @@ -544,20 +273,11 @@ impl Processor { peer_id: PeerId, voluntary_exit: Box, ) { - self.beacon_processor_send - .try_send(BeaconWorkEvent::gossip_voluntary_exit( - message_id, - peer_id, - voluntary_exit, - )) - .unwrap_or_else(|e| { - error!( - &self.log, - "Unable to send to gossip processor"; - "type" => "voluntary exit gossip", - "error" => e.to_string(), - ) - }) + self.send_beacon_processor_work(BeaconWorkEvent::gossip_voluntary_exit( + message_id, + peer_id, + voluntary_exit, + )) } pub fn on_proposer_slashing_gossip( @@ -566,20 +286,11 @@ impl Processor { peer_id: PeerId, proposer_slashing: Box, ) { - self.beacon_processor_send - .try_send(BeaconWorkEvent::gossip_proposer_slashing( - message_id, - peer_id, - proposer_slashing, - )) - .unwrap_or_else(|e| { - error!( - &self.log, - "Unable to send to gossip processor"; - "type" => "proposer slashing gossip", - "error" => e.to_string(), - ) - }) + self.send_beacon_processor_work(BeaconWorkEvent::gossip_proposer_slashing( + message_id, + peer_id, + proposer_slashing, + )) } pub fn on_attester_slashing_gossip( @@ -588,19 +299,23 @@ impl Processor { peer_id: PeerId, attester_slashing: Box>, ) { + self.send_beacon_processor_work(BeaconWorkEvent::gossip_attester_slashing( + message_id, + peer_id, + attester_slashing, + )) + } + + fn send_beacon_processor_work(&mut self, work: BeaconWorkEvent) { self.beacon_processor_send - .try_send(BeaconWorkEvent::gossip_attester_slashing( - message_id, - peer_id, - attester_slashing, - )) + .try_send(work) .unwrap_or_else(|e| { - error!( - &self.log, - "Unable to send to gossip processor"; - "type" => "attester slashing gossip", - "error" => e.to_string(), - ) + let work_type = match &e { + mpsc::error::TrySendError::Closed(work) + | mpsc::error::TrySendError::Full(work) => work.work_type(), + }; + error!(&self.log, "Unable to send message to the beacon processor"; + "error" => %e, "type" => work_type) }) } } @@ -648,7 +363,7 @@ impl HandlerNetworkContext { } /// Disconnects and ban's a peer, sending a Goodbye request with the associated reason. - pub fn goodbye_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { + pub fn _goodbye_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { self.inform_network(NetworkMessage::GoodbyePeer { peer_id, reason }); } diff --git a/beacon_node/network/src/status.rs b/beacon_node/network/src/status.rs new file mode 100644 index 000000000..41cc990ed --- /dev/null +++ b/beacon_node/network/src/status.rs @@ -0,0 +1,29 @@ +use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use types::ChainSpec; + +use eth2_libp2p::rpc::StatusMessage; +/// Trait to produce a `StatusMessage` representing the state of the given `beacon_chain`. +/// +/// NOTE: The purpose of this is simply to obtain a `StatusMessage` from the `BeaconChain` without +/// polluting/coupling the type with RPC concepts. +pub trait ToStatusMessage { + fn status_message(&self) -> Result; +} + +impl ToStatusMessage for BeaconChain { + fn status_message(&self) -> Result { + let head_info = self.head_info()?; + let genesis_validators_root = self.genesis_validators_root; + + let fork_digest = + ChainSpec::compute_fork_digest(head_info.fork.current_version, genesis_validators_root); + + Ok(StatusMessage { + fork_digest, + finalized_root: head_info.finalized_checkpoint.root, + finalized_epoch: head_info.finalized_checkpoint.epoch, + head_root: head_info.block_root, + head_slot: head_info.slot, + }) + } +} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 5c2a01e2b..8e558f814 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -38,8 +38,8 @@ use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{ChainId, RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use super::RequestId; use crate::beacon_processor::{ProcessId, WorkEvent as BeaconWorkEvent}; -use crate::router::processor::status_message; use crate::service::NetworkMessage; +use crate::status::ToStatusMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, GoodbyeReason}; use eth2_libp2p::types::{NetworkGlobals, SyncState}; @@ -258,7 +258,7 @@ impl SyncManager { /// ours that we consider it fully sync'd with respect to our current chain. fn add_peer(&mut self, peer_id: PeerId, remote: SyncInfo) { // ensure the beacon chain still exists - let local = match status_message(&self.chain) { + let local = match self.chain.status_message() { Ok(status) => SyncInfo { head_slot: status.head_slot, head_root: status.head_root, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index aa000939d..2529bde65 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -3,8 +3,8 @@ use super::range_sync::{BatchId, ChainId}; use super::RequestId as SyncRequestId; -use crate::router::processor::status_message; use crate::service::NetworkMessage; +use crate::status::ToStatusMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RequestId}; use eth2_libp2p::{Client, NetworkGlobals, PeerAction, PeerId, Request}; @@ -63,7 +63,7 @@ impl SyncNetworkContext { chain: Arc>, peers: impl Iterator, ) { - if let Ok(status_message) = status_message(&chain) { + if let Ok(status_message) = &chain.status_message() { for peer_id in peers { debug!( self.log, diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 9075804f8..a89911d99 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -43,10 +43,9 @@ use super::chain::{ChainId, RemoveChain, SyncingChain}; use super::chain_collection::ChainCollection; use super::sync_type::RangeSyncType; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; -use crate::router::processor::status_message; +use crate::status::ToStatusMessage; use crate::sync::network_context::SyncNetworkContext; -use crate::sync::BatchProcessResult; -use crate::sync::RequestId; +use crate::sync::{BatchProcessResult, RequestId}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::PeerId; use eth2_libp2p::SyncInfo; @@ -341,7 +340,7 @@ impl RangeSync { network.status_peers(self.beacon_chain.clone(), chain.peers()); - let local = match status_message(&self.beacon_chain) { + let local = match self.beacon_chain.status_message() { Ok(status) => SyncInfo { head_slot: status.head_slot, head_root: status.head_root,