diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs
index 06ce36c6b..c9b4bfa34 100644
--- a/beacon_node/network/src/beacon_processor/mod.rs
+++ b/beacon_node/network/src/beacon_processor/mod.rs
@@ -39,6 +39,7 @@
 //! task.
 
 use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
+use beacon_chain::parking_lot::Mutex;
 use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, GossipVerifiedBlock};
 use futures::stream::{Stream, StreamExt};
 use futures::task::Poll;
@@ -47,13 +48,13 @@ use lighthouse_network::{
     Client, MessageId, NetworkGlobals, PeerId, PeerRequestId,
 };
 use slog::{crit, debug, error, trace, warn, Logger};
-use std::cmp;
 use std::collections::VecDeque;
 use std::fmt;
 use std::pin::Pin;
 use std::sync::{Arc, Weak};
 use std::task::Context;
 use std::time::{Duration, Instant};
+use std::{cmp, collections::HashSet};
 use task_executor::TaskExecutor;
 use tokio::sync::{mpsc, oneshot};
 use types::{
@@ -284,6 +285,55 @@ impl<T> LifoQueue<T> {
     }
 }
 
+/// A handle that removes its associated `entry` from the `DuplicateCache` when it is dropped.
+///
+/// This guarantees that the entry is cleared even if the worker task holding the handle returns
+/// early or panics, so no stale entry is left behind.
+pub struct DuplicateCacheHandle {
+    entry: Hash256,
+    cache: DuplicateCache,
+}
+
+impl Drop for DuplicateCacheHandle {
+    fn drop(&mut self) {
+        self.cache.remove(&self.entry);
+    }
+}
+
+/// A simple cache for detecting duplicate block roots across multiple threads.
+#[derive(Clone, Default)]
+pub struct DuplicateCache {
+    inner: Arc<Mutex<HashSet<Hash256>>>,
+}
+
+impl DuplicateCache {
+    /// Checks if the given block_root exists and inserts it into the cache if
+    /// it doesn't exist.
+    ///
+    /// Returns `Some(DuplicateCacheHandle)` if the block_root was successfully
+    /// inserted and `None` if the block root already existed in the cache.
+    ///
+    /// The handle removes the entry from the cache when it is dropped. This ensures that any unclean
+    /// shutdowns in the worker tasks do not leave inconsistent state in the cache.
+    pub fn check_and_insert(&self, block_root: Hash256) -> Option<DuplicateCacheHandle> {
+        let mut inner = self.inner.lock();
+        if inner.insert(block_root) {
+            Some(DuplicateCacheHandle {
+                entry: block_root,
+                cache: self.clone(),
+            })
+        } else {
+            None
+        }
+    }
+
+    /// Removes the given block_root from the cache.
+    pub fn remove(&self, block_root: &Hash256) {
+        let mut inner = self.inner.lock();
+        inner.remove(block_root);
+    }
+}
+
 /// An event to be processed by the manager task.
 pub struct WorkEvent<T: BeaconChainTypes> {
     drop_during_sync: bool,
@@ -787,6 +837,7 @@ pub struct BeaconProcessor<T: BeaconChainTypes> {
     pub executor: TaskExecutor,
     pub max_workers: usize,
    pub current_workers: usize,
+    pub importing_blocks: DuplicateCache,
     pub log: Logger,
 }
 
@@ -1302,6 +1353,8 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
             log: self.log.clone(),
         };
 
+        let duplicate_cache = self.importing_blocks.clone();
+
         trace!(
             self.log,
             "Spawning beacon processor worker";
@@ -1373,7 +1426,8 @@
                     peer_id,
                     peer_client,
                     *block,
-                    work_reprocessing_tx,
+                    work_reprocessing_tx.clone(),
+                    duplicate_cache,
                     seen_timestamp,
                 ),
                 /*
@@ -1455,7 +1509,12 @@
                 * Verification for beacon blocks received during syncing via RPC.
                 */
                Work::RpcBlock { block, result_tx } => {
-                    worker.process_rpc_block(*block, result_tx, work_reprocessing_tx)
+                    worker.process_rpc_block(
+                        *block,
+                        result_tx,
+                        work_reprocessing_tx.clone(),
+                        duplicate_cache,
+                    );
                 }
                 /*
                 * Verification for a chain segment (multiple blocks).
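
The `DuplicateCache` introduced above is the heart of this change: the first worker to call `check_and_insert` for a block root receives a `DuplicateCacheHandle`, and the entry is released automatically when that handle is dropped, even if the worker returns early or panics. The following standalone sketch illustrates the same pattern outside of Lighthouse; it is not the code from this diff and substitutes `u64` for `Hash256` and `std::sync::Mutex` for `parking_lot::Mutex` so that it compiles without any Lighthouse crates.

use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Releases its cache entry when dropped, mirroring `DuplicateCacheHandle`.
pub struct Handle {
    entry: u64,
    cache: Cache,
}

impl Drop for Handle {
    fn drop(&mut self) {
        self.cache.remove(&self.entry);
    }
}

/// Simplified stand-in for `DuplicateCache` (`u64` instead of `Hash256`).
#[derive(Clone, Default)]
pub struct Cache {
    inner: Arc<Mutex<HashSet<u64>>>,
}

impl Cache {
    /// Returns a handle only if `block_root` was not already present.
    pub fn check_and_insert(&self, block_root: u64) -> Option<Handle> {
        let mut inner = self.inner.lock().expect("lock poisoned");
        if inner.insert(block_root) {
            Some(Handle {
                entry: block_root,
                cache: self.clone(),
            })
        } else {
            None
        }
    }

    pub fn remove(&self, block_root: &u64) {
        self.inner.lock().expect("lock poisoned").remove(block_root);
    }
}

fn main() {
    let cache = Cache::default();

    // The first "worker" to claim the root gets a handle.
    let handle = cache.check_and_insert(42).expect("first insert succeeds");
    // A concurrent worker sees the existing entry and backs off.
    assert!(cache.check_and_insert(42).is_none());

    // Dropping the handle frees the entry, so a later import may proceed.
    drop(handle);
    assert!(cache.check_and_insert(42).is_some());
}

Tying the removal to `Drop` rather than to an explicit cleanup call is what keeps the cache consistent when a worker task exits on an error path.
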
diff --git a/beacon_node/network/src/beacon_processor/tests.rs b/beacon_node/network/src/beacon_processor/tests.rs
index 2dbdb3269..7c060472b 100644
--- a/beacon_node/network/src/beacon_processor/tests.rs
+++ b/beacon_node/network/src/beacon_processor/tests.rs
@@ -201,6 +201,7 @@ impl TestRig {
             executor,
             max_workers: cmp::max(1, num_cpus::get()),
             current_workers: 0,
+            importing_blocks: Default::default(),
             log: log.clone(),
         }
         .spawn_manager(beacon_processor_rx, Some(work_journal_tx));
diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
index d7d9482bd..fe6cb573a 100644
--- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
+++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
@@ -25,6 +25,7 @@ use super::{
     },
     Worker,
 };
+use crate::beacon_processor::DuplicateCache;
 
 /// An attestation that has been validated by the `BeaconChain`.
 ///
@@ -190,7 +191,7 @@ impl<T: BeaconChainTypes> Worker<T> {
     /// Creates a log if there is an internal error.
     /// Propagates the result of the validation for the given message to the network. If the result
     /// is valid the message gets forwarded to other peers.
-    fn propagate_validation_result(
+    pub(crate) fn propagate_validation_result(
         &self,
         message_id: MessageId,
         propagation_source: PeerId,
@@ -618,6 +619,7 @@
     /// be downloaded.
     ///
     /// Raises a log if there are errors.
+    #[allow(clippy::too_many_arguments)]
     pub fn process_gossip_block(
         self,
         message_id: MessageId,
@@ -625,8 +627,50 @@
         peer_client: Client,
         block: SignedBeaconBlock<T::EthSpec>,
         reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
+        duplicate_cache: DuplicateCache,
         seen_duration: Duration,
     ) {
+        if let Some(gossip_verified_block) = self.process_gossip_unverified_block(
+            message_id,
+            peer_id,
+            peer_client,
+            block,
+            reprocess_tx.clone(),
+            seen_duration,
+        ) {
+            let block_root = gossip_verified_block.block_root;
+            if let Some(handle) = duplicate_cache.check_and_insert(block_root) {
+                self.process_gossip_verified_block(
+                    peer_id,
+                    gossip_verified_block,
+                    reprocess_tx,
+                    seen_duration,
+                );
+                // Drop the handle to remove the entry from the cache
+                drop(handle);
+            } else {
+                debug!(
+                    self.log,
+                    "RPC block is being imported";
+                    "block_root" => %block_root,
+                );
+            }
+        }
+    }
+
+    /// Processes the beacon block received from the gossip network and, if it passes the
+    /// gossip propagation criteria, tells the network thread to forward it.
+    ///
+    /// Returns the `GossipVerifiedBlock` if verification passes and raises a log if there are errors.
+    pub fn process_gossip_unverified_block(
+        &self,
+        message_id: MessageId,
+        peer_id: PeerId,
+        peer_client: Client,
+        block: SignedBeaconBlock<T::EthSpec>,
+        reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
+        seen_duration: Duration,
+    ) -> Option<GossipVerifiedBlock<T>> {
         let block_delay =
             get_block_delay_ms(seen_duration, block.message(), &self.chain.slot_clock);
         // Log metrics to track delay from other nodes on the network.
@@ -687,7 +731,7 @@
                     "root" => ?block.canonical_root()
                 );
                 self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block));
-                return;
+                return None;
             }
             Err(e @ BlockError::FutureSlot { .. })
             | Err(e @ BlockError::WouldRevertFinalizedSlot { .. })
@@ -700,7 +744,7 @@
                 // Prevent recurring behaviour by penalizing the peer slightly.
                 self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError);
                 self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
-                return;
+                return None;
             }
             Err(e @ BlockError::StateRootMismatch { .. })
             | Err(e @ BlockError::IncorrectBlockProposer { .. })
@@ -720,7 +764,7 @@
                     "error" => %e);
                 self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                 self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError);
-                return;
+                return None;
             }
         };
 
@@ -786,13 +830,9 @@
                         "location" => "block gossip"
                     )
                 }
+                None
             }
-            Ok(_) => self.process_gossip_verified_block(
-                peer_id,
-                verified_block,
-                reprocess_tx,
-                seen_duration,
-            ),
+            Ok(_) => Some(verified_block),
             Err(e) => {
                 error!(
                     self.log,
@@ -801,7 +841,8 @@
                     "block_slot" => %block_slot,
                     "block_root" => ?block_root,
                     "location" => "block gossip"
-                )
+                );
+                None
             }
         }
     }
diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs
index 44009e27e..6a75c2990 100644
--- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs
+++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs
@@ -1,6 +1,6 @@
 use super::{super::work_reprocessing_queue::ReprocessQueueMessage, Worker};
 use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE;
-use crate::beacon_processor::BlockResultSender;
+use crate::beacon_processor::{BlockResultSender, DuplicateCache};
 use crate::metrics;
 use crate::sync::manager::{SyncMessage, SyncRequestType};
 use crate::sync::{BatchProcessResult, ChainId};
@@ -33,35 +33,61 @@ impl<T: BeaconChainTypes> Worker<T> {
         block: SignedBeaconBlock<T::EthSpec>,
         result_tx: BlockResultSender<T::EthSpec>,
         reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
+        duplicate_cache: DuplicateCache,
     ) {
-        let slot = block.slot();
-        let block_result = self.chain.process_block(block);
+        let block_root = block.canonical_root();
+        // Checks if the block is already being imported through another source
+        if let Some(handle) = duplicate_cache.check_and_insert(block_root) {
+            let slot = block.slot();
+            let block_result = self.chain.process_block(block);
 
-        metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
+            metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
 
-        if let Ok(root) = &block_result {
-            info!(
+            if let Ok(root) = &block_result {
+                info!(
+                    self.log,
+                    "New RPC block received";
+                    "slot" => slot,
+                    "hash" => %root
+                );
+
+                if reprocess_tx
+                    .try_send(ReprocessQueueMessage::BlockImported(*root))
+                    .is_err()
+                {
+                    error!(
+                        self.log,
+                        "Failed to inform block import";
+                        "source" => "rpc",
+                        "block_root" => %root,
+                    )
+                };
+            }
+
+            if result_tx.send(block_result).is_err() {
+                crit!(self.log, "Failed return sync block result");
+            }
+            // Drop the handle to remove the entry from the cache
+            drop(handle);
+        } else {
+            debug!(
                 self.log,
-                "New RPC block received";
-                "slot" => slot,
-                "hash" => %root
+                "Gossip block is being imported";
+                "block_root" => %block_root,
             );
+            // The gossip block that is being imported should eventually
+            // trigger reprocessing of queued attestations once it is imported.
+            // If the gossip block fails import, then it will be downscored
+            // appropriately in `process_gossip_block`.
 
-            if reprocess_tx
-                .try_send(ReprocessQueueMessage::BlockImported(*root))
+            // Here, we assume that the block will eventually be imported and
+            // send a `BlockIsAlreadyKnown` message to sync.
+            if result_tx
+                .send(Err(BlockError::BlockIsAlreadyKnown))
                 .is_err()
             {
-                error!(
-                    self.log,
-                    "Failed to inform block import";
-                    "source" => "rpc",
-                    "block_root" => %root,
-                )
-            };
-        }
-
-        if result_tx.send(block_result).is_err() {
-            crit!(self.log, "Failed return sync block result");
+                crit!(self.log, "Failed return sync block result");
+            }
         }
     }
diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs
index 47d82d81e..04589d140 100644
--- a/beacon_node/network/src/router/processor.rs
+++ b/beacon_node/network/src/router/processor.rs
@@ -65,6 +65,7 @@ impl<T: BeaconChainTypes> Processor<T> {
             executor,
             max_workers: cmp::max(1, num_cpus::get()),
             current_workers: 0,
+            importing_blocks: Default::default(),
             log: log.clone(),
         }
         .spawn_manager(beacon_processor_receive, None);
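
Taken together, the RPC-side logic in `process_rpc_block` reduces to a single decision: claim the block root in the duplicate cache and import the block, or, if another worker already holds the entry, skip the import and report the block as already known to sync. The sketch below models only that decision and is not Lighthouse code: `BlockOutcome` and the bare `HashSet` guard are illustrative stand-ins, whereas the real worker sends `Err(BlockError::BlockIsAlreadyKnown)` over `result_tx` and relies on dropping the `DuplicateCacheHandle` for cleanup.

use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Illustrative stand-in for the result reported back to sync.
#[derive(Debug, PartialEq)]
enum BlockOutcome {
    /// Analogous to `Ok(root)` from `chain.process_block`.
    Imported(u64),
    /// Analogous to `Err(BlockError::BlockIsAlreadyKnown)`.
    AlreadyKnown,
}

/// Models the duplicate check added to `process_rpc_block`.
fn process_rpc_block(importing: &Arc<Mutex<HashSet<u64>>>, block_root: u64) -> BlockOutcome {
    // Try to claim the root; failure means another worker is importing it.
    if !importing.lock().expect("lock poisoned").insert(block_root) {
        // Assume the in-flight import will succeed and tell sync the block is known.
        return BlockOutcome::AlreadyKnown;
    }
    // ... a real worker would run the actual block import here ...
    let outcome = BlockOutcome::Imported(block_root);
    // Equivalent to dropping the handle once the import finishes.
    importing.lock().expect("lock poisoned").remove(&block_root);
    outcome
}

fn main() {
    let importing = Arc::new(Mutex::new(HashSet::new()));

    // Simulate a gossip worker that is currently importing root 7.
    importing.lock().unwrap().insert(7);

    assert_eq!(process_rpc_block(&importing, 7), BlockOutcome::AlreadyKnown);
    assert_eq!(process_rpc_block(&importing, 9), BlockOutcome::Imported(9));
}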