diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 2389afdb4..eb40be960 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -38,9 +38,10 @@ //! checks the queues to see if there are more parcels of work that can be spawned in a new worker //! task. +use crate::sync::manager::BlockProcessType; use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::parking_lot::Mutex; -use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, GossipVerifiedBlock}; +use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock}; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; use lighthouse_network::{ @@ -57,7 +58,7 @@ use std::task::Context; use std::time::Duration; use std::{cmp, collections::HashSet}; use task_executor::TaskExecutor; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use types::{ Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, @@ -74,7 +75,7 @@ mod work_reprocessing_queue; mod worker; use crate::beacon_processor::work_reprocessing_queue::QueuedBlock; -pub use worker::{GossipAggregatePackage, GossipAttestationPackage, ProcessId}; +pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage}; /// The maximum size of the channel for work events to the `BeaconProcessor`. /// @@ -196,10 +197,6 @@ pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request"; pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; -/// Used to send/receive results from a rpc block import in a blocking task. -pub type BlockResultSender = oneshot::Sender>>; -pub type BlockResultReceiver = oneshot::Receiver>>; - /// A simple first-in-first-out queue with a maximum length. struct FifoQueue { queue: VecDeque, @@ -496,18 +493,22 @@ impl WorkEvent { /// sent to the other side of `result_tx`. pub fn rpc_beacon_block( block: Box>, - ) -> (Self, BlockResultReceiver) { - let (result_tx, result_rx) = oneshot::channel(); - let event = Self { + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> Self { + Self { drop_during_sync: false, - work: Work::RpcBlock { block, result_tx }, - }; - (event, result_rx) + work: Work::RpcBlock { + block, + seen_timestamp, + process_type, + }, + } } /// Create a new work event to import `blocks` as a beacon chain segment. pub fn chain_segment( - process_id: ProcessId, + process_id: ChainSegmentProcessId, blocks: Vec>, ) -> Self { Self { @@ -692,10 +693,11 @@ pub enum Work { }, RpcBlock { block: Box>, - result_tx: BlockResultSender, + seen_timestamp: Duration, + process_type: BlockProcessType, }, ChainSegment { - process_id: ProcessId, + process_id: ChainSegmentProcessId, blocks: Vec>, }, Status { @@ -1488,10 +1490,15 @@ impl BeaconProcessor { /* * Verification for beacon blocks received during syncing via RPC. */ - Work::RpcBlock { block, result_tx } => { + Work::RpcBlock { + block, + seen_timestamp, + process_type, + } => { worker.process_rpc_block( *block, - result_tx, + seen_timestamp, + process_type, work_reprocessing_tx.clone(), duplicate_cache, ); diff --git a/beacon_node/network/src/beacon_processor/tests.rs b/beacon_node/network/src/beacon_processor/tests.rs index 7c060472b..0f97bc794 100644 --- a/beacon_node/network/src/beacon_processor/tests.rs +++ b/beacon_node/network/src/beacon_processor/tests.rs @@ -240,7 +240,13 @@ impl TestRig { } pub fn enqueue_rpc_block(&self) { - let (event, _rx) = WorkEvent::rpc_beacon_block(Box::new(self.next_block.clone())); + let event = WorkEvent::rpc_beacon_block( + Box::new(self.next_block.clone()), + std::time::Duration::default(), + BlockProcessType::ParentLookup { + chain_hash: Hash256::random(), + }, + ); self.beacon_processor_tx.try_send(event).unwrap(); } diff --git a/beacon_node/network/src/beacon_processor/worker/mod.rs b/beacon_node/network/src/beacon_processor/worker/mod.rs index b9d78900b..f907c49b7 100644 --- a/beacon_node/network/src/beacon_processor/worker/mod.rs +++ b/beacon_node/network/src/beacon_processor/worker/mod.rs @@ -10,7 +10,7 @@ mod rpc_methods; mod sync_methods; pub use gossip_methods::{GossipAggregatePackage, GossipAttestationPackage}; -pub use sync_methods::ProcessId; +pub use sync_methods::ChainSegmentProcessId; pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 27e0a6711..082808f88 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -1,26 +1,28 @@ +use std::time::Duration; + use super::{super::work_reprocessing_queue::ReprocessQueueMessage, Worker}; use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE; -use crate::beacon_processor::{BlockResultSender, DuplicateCache}; +use crate::beacon_processor::DuplicateCache; use crate::metrics; -use crate::sync::manager::{SyncMessage, SyncRequestType}; +use crate::sync::manager::{BlockProcessType, SyncMessage}; use crate::sync::{BatchProcessResult, ChainId}; use beacon_chain::{ BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, }; -use lighthouse_network::{PeerAction, PeerId}; -use slog::{crit, debug, error, info, trace, warn}; +use lighthouse_network::PeerAction; +use slog::{debug, error, info, trace, warn}; use tokio::sync::mpsc; use types::{Epoch, Hash256, SignedBeaconBlock}; -/// Id associated to a block processing request, either a batch or a single block. +/// Id associated to a batch processing request, either a sync batch or a parent lookup. #[derive(Clone, Debug, PartialEq)] -pub enum ProcessId { +pub enum ChainSegmentProcessId { /// Processing Id of a range syncing batch. RangeBatchId(ChainId, Epoch), /// Processing ID for a backfill syncing batch. BackSyncBatchId(Epoch), /// Processing Id of the parent lookup of a block. - ParentLookup(PeerId, Hash256), + ParentLookup(Hash256), } /// Returned when a chain segment import fails. @@ -32,88 +34,77 @@ struct ChainSegmentFailed { } impl Worker { - /// Attempt to process a block received from a direct RPC request, returning the processing - /// result on the `result_tx` channel. - /// - /// Raises a log if there are errors publishing the result to the channel. + /// Attempt to process a block received from a direct RPC request. pub fn process_rpc_block( self, block: SignedBeaconBlock, - result_tx: BlockResultSender, + seen_timestamp: Duration, + process_type: BlockProcessType, reprocess_tx: mpsc::Sender>, duplicate_cache: DuplicateCache, ) { - let block_root = block.canonical_root(); - // Checks if the block is already being imported through another source - if let Some(handle) = duplicate_cache.check_and_insert(block_root) { - let slot = block.slot(); - let block_result = self.chain.process_block(block); + // Check if the block is already being imported through another source + let handle = match duplicate_cache.check_and_insert(block.canonical_root()) { + Some(handle) => handle, + None => { + // Sync handles these results + self.send_sync_message(SyncMessage::BlockProcessed { + process_type, + result: Err(BlockError::BlockIsAlreadyKnown), + }); + return; + } + }; + let slot = block.slot(); + let result = self.chain.process_block(block); - metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL); + metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL); - if let Ok(root) = &block_result { - info!( - self.log, - "New RPC block received"; - "slot" => slot, - "hash" => %root + // RPC block imported, regardless of process type + if let &Ok(hash) = &result { + info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash); + + // Trigger processing for work referencing this block. + let reprocess_msg = ReprocessQueueMessage::BlockImported(hash); + if reprocess_tx.try_send(reprocess_msg).is_err() { + error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash) + }; + if matches!(process_type, BlockProcessType::SingleBlock { .. }) { + self.chain.block_times_cache.write().set_time_observed( + hash, + slot, + seen_timestamp, + None, + None, ); - - if reprocess_tx - .try_send(ReprocessQueueMessage::BlockImported(*root)) - .is_err() - { - error!( - self.log, - "Failed to inform block import"; - "source" => "rpc", - "block_root" => %root, - ) - }; - } - - if result_tx.send(block_result).is_err() { - crit!(self.log, "Failed return sync block result"); - } - // Drop the handle to remove the entry from the cache - drop(handle); - } else { - debug!( - self.log, - "Gossip block is being imported"; - "block_root" => %block_root, - ); - // The gossip block that is being imported should eventually - // trigger reprocessing of queued attestations once it is imported. - // If the gossip block fails import, then it will be downscored - // appropriately in `process_gossip_block`. - - // Here, we assume that the block will eventually be imported and - // send a `BlockIsAlreadyKnown` message to sync. - if result_tx - .send(Err(BlockError::BlockIsAlreadyKnown)) - .is_err() - { - crit!(self.log, "Failed return sync block result"); + self.run_fork_choice() } } + // Sync handles these results + self.send_sync_message(SyncMessage::BlockProcessed { + process_type, + result: result.map(|_| ()), + }); + + // Drop the handle to remove the entry from the cache + drop(handle); } /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// thread if more blocks are needed to process it. pub fn process_chain_segment( &self, - process_id: ProcessId, + sync_type: ChainSegmentProcessId, downloaded_blocks: Vec>, ) { - match process_id { + let result = match sync_type { // this a request from the range sync - ProcessId::RangeBatchId(chain_id, epoch) => { + ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => { let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64()); let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64()); let sent_blocks = downloaded_blocks.len(); - let result = match self.process_blocks(downloaded_blocks.iter()) { + match self.process_blocks(downloaded_blocks.iter()) { (_, Ok(_)) => { debug!(self.log, "Batch processed"; "batch_epoch" => epoch, @@ -139,19 +130,15 @@ impl Worker { peer_action: e.peer_action, } } - }; - - let sync_type = SyncRequestType::RangeSync(epoch, chain_id); - - self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result }); + } } // this a request from the Backfill sync - ProcessId::BackSyncBatchId(epoch) => { + ChainSegmentProcessId::BackSyncBatchId(epoch) => { let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64()); let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64()); let sent_blocks = downloaded_blocks.len(); - let result = match self.process_backfill_blocks(&downloaded_blocks) { + match self.process_backfill_blocks(&downloaded_blocks) { (_, Ok(_)) => { debug!(self.log, "Backfill batch processed"; "batch_epoch" => epoch, @@ -173,35 +160,34 @@ impl Worker { peer_action: e.peer_action, } } - }; - - let sync_type = SyncRequestType::BackFillSync(epoch); - - self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result }); + } } // this is a parent lookup request from the sync manager - ProcessId::ParentLookup(peer_id, chain_head) => { + ChainSegmentProcessId::ParentLookup(chain_head) => { debug!( self.log, "Processing parent lookup"; - "last_peer_id" => %peer_id, + "chain_hash" => %chain_head, "blocks" => downloaded_blocks.len() ); // parent blocks are ordered from highest slot to lowest, so we need to process in // reverse match self.process_blocks(downloaded_blocks.iter().rev()) { - (_, Err(e)) => { - debug!(self.log, "Parent lookup failed"; "last_peer_id" => %peer_id, "error" => %e.message); - self.send_sync_message(SyncMessage::ParentLookupFailed { - peer_id, - chain_head, - }) + (imported_blocks, Err(e)) => { + debug!(self.log, "Parent lookup failed"; "error" => %e.message); + BatchProcessResult::Failed { + imported_blocks: imported_blocks > 0, + peer_action: e.peer_action, + } } - (_, Ok(_)) => { + (imported_blocks, Ok(_)) => { debug!(self.log, "Parent lookup processed successfully"); + BatchProcessResult::Success(imported_blocks > 0) } } } - } + }; + + self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result }); } /// Helper function to process blocks batches which only consumes the chain and blocks to process. diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 446aa0a03..04aa51472 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -286,6 +286,14 @@ lazy_static! { "Number of Syncing chains in range, per range type", &["range_type"] ); + pub static ref SYNC_SINGLE_BLOCK_LOOKUPS: Result = try_create_int_gauge( + "sync_single_block_lookups", + "Number of single block lookups underway" + ); + pub static ref SYNC_PARENT_BLOCK_LOOKUPS: Result = try_create_int_gauge( + "sync_parent_block_lookups", + "Number of parent block lookups underway" + ); /* * Block Delay Metrics diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 2c700e9fa..e76c037da 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -8,7 +8,7 @@ //! If a batch fails, the backfill sync cannot progress. In this scenario, we mark the backfill //! sync as failed, log an error and attempt to retry once a new peer joins the node. -use crate::beacon_processor::{ProcessId, WorkEvent as BeaconWorkEvent}; +use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::sync::manager::{BatchProcessResult, Id}; use crate::sync::network_context::SyncNetworkContext; use crate::sync::range_sync::{BatchConfig, BatchId, BatchInfo, BatchState}; @@ -534,7 +534,7 @@ impl BackFillSync { Ok(v) => v, }; - let process_id = ProcessId::BackSyncBatchId(batch_id); + let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id); self.current_processing_batch = Some(batch_id); if let Err(e) = self diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs new file mode 100644 index 000000000..b11dc1c7a --- /dev/null +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -0,0 +1,637 @@ +use std::collections::hash_map::Entry; +use std::time::Duration; + +use beacon_chain::{BeaconChainTypes, BlockError}; +use fnv::FnvHashMap; +use lighthouse_network::{PeerAction, PeerId}; +use lru_cache::LRUCache; +use slog::{crit, debug, error, trace, warn, Logger}; +use smallvec::SmallVec; +use store::{Hash256, SignedBeaconBlock}; +use strum::AsStaticRef; +use tokio::sync::mpsc; + +use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; +use crate::metrics; + +use self::{ + parent_lookup::{ParentLookup, VerifyError}, + single_block_lookup::SingleBlockRequest, +}; + +use super::BatchProcessResult; +use super::{ + manager::{BlockProcessType, Id}, + network_context::SyncNetworkContext, +}; + +mod parent_lookup; +mod single_block_lookup; +#[cfg(test)] +mod tests; + +const FAILED_CHAINS_CACHE_SIZE: usize = 500; +const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; + +pub(crate) struct BlockLookups { + /// A collection of parent block lookups. + parent_queue: SmallVec<[ParentLookup; 3]>, + + /// A cache of failed chain lookups to prevent duplicate searches. + failed_chains: LRUCache, + + /// A collection of block hashes being searched for and a flag indicating if a result has been + /// received or not. + /// + /// The flag allows us to determine if the peer returned data or sent us nothing. + single_block_lookups: FnvHashMap>, + + /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. + beacon_processor_send: mpsc::Sender>, + + /// The logger for the import manager. + log: Logger, +} + +impl BlockLookups { + pub fn new(beacon_processor_send: mpsc::Sender>, log: Logger) -> Self { + Self { + parent_queue: Default::default(), + failed_chains: LRUCache::new(FAILED_CHAINS_CACHE_SIZE), + single_block_lookups: Default::default(), + beacon_processor_send, + log, + } + } + + /* Lookup requests */ + + pub fn search_block( + &mut self, + hash: Hash256, + peer_id: PeerId, + cx: &mut SyncNetworkContext, + ) { + // Do not re-request a block that is already being requested + if self + .single_block_lookups + .values_mut() + .any(|single_block_request| single_block_request.add_peer(&hash, &peer_id)) + { + return; + } + + debug!( + self.log, + "Searching for block"; + "peer_id" => %peer_id, + "block" => %hash + ); + + let mut single_block_request = SingleBlockRequest::new(hash, peer_id); + + let (peer_id, request) = single_block_request.request_block().unwrap(); + if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) { + self.single_block_lookups + .insert(request_id, single_block_request); + + metrics::set_gauge( + &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, + self.single_block_lookups.len() as i64, + ); + } + } + + pub fn search_parent( + &mut self, + block: Box>, + peer_id: PeerId, + cx: &mut SyncNetworkContext, + ) { + let block_root = block.canonical_root(); + let parent_root = block.parent_root(); + // If this block or it's parent is part of a known failed chain, ignore it. + if self.failed_chains.contains(&parent_root) || self.failed_chains.contains(&block_root) { + debug!(self.log, "Block is from a past failed chain. Dropping"; + "block_root" => ?block_root, "block_slot" => block.slot()); + return; + } + + // Make sure this block is not already downloaded, and that neither it or its parent is + // being searched for. + if self.parent_queue.iter_mut().any(|parent_req| { + parent_req.contains_block(&block) + || parent_req.add_peer(&block_root, &peer_id) + || parent_req.add_peer(&parent_root, &peer_id) + }) { + // we are already searching for this block, ignore it + return; + } + + let parent_lookup = ParentLookup::new(*block, peer_id); + self.request_parent(parent_lookup, cx); + } + + /* Lookup responses */ + + pub fn single_block_lookup_response( + &mut self, + id: Id, + peer_id: PeerId, + block: Option>>, + seen_timestamp: Duration, + cx: &mut SyncNetworkContext, + ) { + let mut request = match self.single_block_lookups.entry(id) { + Entry::Occupied(req) => req, + Entry::Vacant(_) => { + if block.is_some() { + crit!( + self.log, + "Block returned for single block lookup not present" + ); + #[cfg(debug_assertions)] + panic!("block returned for single block lookup not present"); + } + return; + } + }; + + match request.get_mut().verify_block(block) { + Ok(Some(block)) => { + // This is the correct block, send it for processing + if self + .send_block_for_processing( + block, + seen_timestamp, + BlockProcessType::SingleBlock { id }, + ) + .is_err() + { + // Remove to avoid inconsistencies + self.single_block_lookups.remove(&id); + } + } + Ok(None) => { + // request finished correctly, it will be removed after the block is processed. + } + Err(error) => { + let msg: &str = error.as_static(); + cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); + // Remove the request, if it can be retried it will be added with a new id. + let mut req = request.remove(); + + debug!(self.log, "Single block lookup failed"; + "peer_id" => %peer_id, "error" => msg, "block_root" => %req.hash); + // try the request again if possible + if let Ok((peer_id, request)) = req.request_block() { + if let Ok(id) = cx.single_block_lookup_request(peer_id, request) { + self.single_block_lookups.insert(id, req); + } + } + } + } + + metrics::set_gauge( + &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, + self.single_block_lookups.len() as i64, + ); + } + + pub fn parent_lookup_response( + &mut self, + id: Id, + peer_id: PeerId, + block: Option>>, + seen_timestamp: Duration, + cx: &mut SyncNetworkContext, + ) { + let mut parent_lookup = if let Some(pos) = self + .parent_queue + .iter() + .position(|request| request.pending_response(id)) + { + self.parent_queue.remove(pos) + } else { + if block.is_some() { + debug!(self.log, "Response for a parent lookup request that was not found"; "peer_id" => %peer_id); + } + return; + }; + + match parent_lookup.verify_block(block, &self.failed_chains) { + Ok(Some(block)) => { + // Block is correct, send to the beacon processor. + let chain_hash = parent_lookup.chain_hash(); + if self + .send_block_for_processing( + block, + seen_timestamp, + BlockProcessType::ParentLookup { chain_hash }, + ) + .is_ok() + { + self.parent_queue.push(parent_lookup) + } + } + Ok(None) => { + // Request finished successfully, nothing else to do. It will be removed after the + // processing result arrives. + self.parent_queue.push(parent_lookup); + } + Err(e) => match e { + VerifyError::RootMismatch + | VerifyError::NoBlockReturned + | VerifyError::ExtraBlocksReturned => { + let e = e.as_static(); + warn!(self.log, "Peer sent invalid response to parent request."; + "peer_id" => %peer_id, "reason" => e); + + // We do not tolerate these kinds of errors. We will accept a few but these are signs + // of a faulty peer. + cx.report_peer(peer_id, PeerAction::LowToleranceError, e); + + // We try again if possible. + self.request_parent(parent_lookup, cx); + } + VerifyError::PreviousFailure { parent_root } => { + self.failed_chains.insert(parent_lookup.chain_hash()); + debug!( + self.log, + "Parent chain ignored due to past failure"; + "block" => %parent_root, + ); + // Add the root block to failed chains + self.failed_chains.insert(parent_lookup.chain_hash()); + + cx.report_peer( + peer_id, + PeerAction::MidToleranceError, + "bbroot_failed_chains", + ); + } + }, + }; + + metrics::set_gauge( + &metrics::SYNC_PARENT_BLOCK_LOOKUPS, + self.parent_queue.len() as i64, + ); + } + + /* Error responses */ + + #[allow(clippy::needless_collect)] // false positive + pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext) { + /* Check disconnection for single block lookups */ + // better written after https://github.com/rust-lang/rust/issues/59618 + let remove_retry_ids: Vec = self + .single_block_lookups + .iter_mut() + .filter_map(|(id, req)| { + if req.check_peer_disconnected(peer_id).is_err() { + Some(*id) + } else { + None + } + }) + .collect(); + + for mut req in remove_retry_ids + .into_iter() + .map(|id| self.single_block_lookups.remove(&id).unwrap()) + .collect::>() + { + // retry the request + match req.request_block() { + Ok((peer_id, block_request)) => { + if let Ok(request_id) = cx.single_block_lookup_request(peer_id, block_request) { + self.single_block_lookups.insert(request_id, req); + } + } + Err(e) => { + trace!(self.log, "Single block request failed on peer disconnection"; + "block_root" => %req.hash, "peer_id" => %peer_id, "reason" => e.as_static()); + } + } + } + + /* Check disconnection for parent lookups */ + while let Some(pos) = self + .parent_queue + .iter_mut() + .position(|req| req.check_peer_disconnected(peer_id).is_err()) + { + let parent_lookup = self.parent_queue.remove(pos); + trace!(self.log, "Parent lookup's peer disconnected"; &parent_lookup); + self.request_parent(parent_lookup, cx); + } + } + + pub fn parent_lookup_failed( + &mut self, + id: Id, + peer_id: PeerId, + cx: &mut SyncNetworkContext, + ) { + if let Some(pos) = self + .parent_queue + .iter() + .position(|request| request.pending_response(id)) + { + let mut parent_lookup = self.parent_queue.remove(pos); + parent_lookup.download_failed(); + trace!(self.log, "Parent lookup request failed"; &parent_lookup); + self.request_parent(parent_lookup, cx); + } else { + return debug!(self.log, "RPC failure for a parent lookup request that was not found"; "peer_id" => %peer_id); + }; + metrics::set_gauge( + &metrics::SYNC_PARENT_BLOCK_LOOKUPS, + self.parent_queue.len() as i64, + ); + } + + pub fn single_block_lookup_failed(&mut self, id: Id, cx: &mut SyncNetworkContext) { + if let Some(mut request) = self.single_block_lookups.remove(&id) { + request.register_failure(); + trace!(self.log, "Single block lookup failed"; "block" => %request.hash); + if let Ok((peer_id, block_request)) = request.request_block() { + if let Ok(request_id) = cx.single_block_lookup_request(peer_id, block_request) { + self.single_block_lookups.insert(request_id, request); + } + } + } + + metrics::set_gauge( + &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, + self.single_block_lookups.len() as i64, + ); + } + + /* Processing responses */ + + pub fn single_block_processed( + &mut self, + id: Id, + result: Result<(), BlockError>, + cx: &mut SyncNetworkContext, + ) { + let mut req = match self.single_block_lookups.remove(&id) { + Some(req) => req, + None => { + #[cfg(debug_assertions)] + panic!("block processed for single block lookup not present"); + #[cfg(not(debug_assertions))] + return crit!( + self.log, + "Block processed for single block lookup not present" + ); + } + }; + + let root = req.hash; + let peer_id = match req.processing_peer() { + Ok(peer) => peer, + Err(_) => return, + }; + + if let Err(e) = &result { + trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e); + } else { + trace!(self.log, "Single block processing succeeded"; "block" => %root); + } + + match result { + Err(e) => match e { + BlockError::BlockIsAlreadyKnown => { + // No error here + } + BlockError::BeaconChainError(e) => { + // Internal error + error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e); + } + BlockError::ParentUnknown(block) => { + self.search_parent(block, peer_id, cx); + } + other => { + warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id); + cx.report_peer( + peer_id, + PeerAction::MidToleranceError, + "single_block_failure", + ); + + // Try it again if possible. + req.register_failure(); + if let Ok((peer_id, request)) = req.request_block() { + if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) { + // insert with the new id + self.single_block_lookups.insert(request_id, req); + } + } + } + }, + Ok(()) => { + // No error here + } + } + + metrics::set_gauge( + &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, + self.single_block_lookups.len() as i64, + ); + } + + pub fn parent_block_processed( + &mut self, + chain_hash: Hash256, + result: Result<(), BlockError>, + cx: &mut SyncNetworkContext, + ) { + let (mut parent_lookup, peer_id) = if let Some((pos, peer)) = self + .parent_queue + .iter() + .enumerate() + .find_map(|(pos, request)| { + request + .get_processing_peer(chain_hash) + .map(|peer| (pos, peer)) + }) { + (self.parent_queue.remove(pos), peer) + } else { + #[cfg(debug_assertions)] + panic!( + "Process response for a parent lookup request that was not found. Chain_hash: {}", + chain_hash + ); + #[cfg(not(debug_assertions))] + return crit!(self.log, "Process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash); + }; + + if let Err(e) = &result { + trace!(self.log, "Parent block processing failed"; &parent_lookup, "error" => %e); + } else { + trace!(self.log, "Parent block processing succeeded"; &parent_lookup); + } + + match result { + Err(BlockError::ParentUnknown(block)) => { + // need to keep looking for parents + // add the block back to the queue and continue the search + parent_lookup.add_block(*block); + self.request_parent(parent_lookup, cx); + } + Ok(_) | Err(BlockError::BlockIsAlreadyKnown { .. }) => { + let chain_hash = parent_lookup.chain_hash(); + let blocks = parent_lookup.chain_blocks(); + let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); + + match self + .beacon_processor_send + .try_send(WorkEvent::chain_segment(process_id, blocks)) + { + Ok(_) => { + self.parent_queue.push(parent_lookup); + } + Err(e) => { + error!( + self.log, + "Failed to send chain segment to processor"; + "error" => ?e + ); + } + } + } + Err(outcome) => { + // all else we consider the chain a failure and downvote the peer that sent + // us the last block + warn!( + self.log, "Invalid parent chain"; + "score_adjustment" => %PeerAction::MidToleranceError, + "outcome" => ?outcome, + "last_peer" => %peer_id, + ); + + // Add this chain to cache of failed chains + self.failed_chains.insert(chain_hash); + + // This currently can be a host of errors. We permit this due to the partial + // ambiguity. + cx.report_peer(peer_id, PeerAction::MidToleranceError, "parent_request_err"); + } + } + + metrics::set_gauge( + &metrics::SYNC_PARENT_BLOCK_LOOKUPS, + self.parent_queue.len() as i64, + ); + } + + pub fn parent_chain_processed( + &mut self, + chain_hash: Hash256, + result: BatchProcessResult, + cx: &mut SyncNetworkContext, + ) { + let parent_lookup = if let Some(pos) = self + .parent_queue + .iter() + .position(|request| request.chain_hash() == chain_hash) + { + self.parent_queue.remove(pos) + } else { + #[cfg(debug_assertions)] + panic!( + "Chain process response for a parent lookup request that was not found. Chain_hash: {}", + chain_hash + ); + #[cfg(not(debug_assertions))] + return crit!(self.log, "Chain process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash); + }; + + debug!(self.log, "Parent chain processed"; "chain_hash" => %chain_hash, "result" => ?result); + match result { + BatchProcessResult::Success(_) => { + // nothing to do. + } + BatchProcessResult::Failed { + imported_blocks: _, + peer_action, + } => { + self.failed_chains.insert(parent_lookup.chain_hash()); + if let Some(peer_action) = peer_action { + for &peer_id in parent_lookup.used_peers() { + cx.report_peer(peer_id, peer_action, "parent_chain_failure") + } + } + } + } + + metrics::set_gauge( + &metrics::SYNC_PARENT_BLOCK_LOOKUPS, + self.parent_queue.len() as i64, + ); + } + + /* Helper functions */ + + fn send_block_for_processing( + &mut self, + block: Box>, + duration: Duration, + process_type: BlockProcessType, + ) -> Result<(), ()> { + trace!(self.log, "Sending block for processing"; "block" => %block.canonical_root(), "process" => ?process_type); + let event = WorkEvent::rpc_beacon_block(block, duration, process_type); + if let Err(e) = self.beacon_processor_send.try_send(event) { + error!( + self.log, + "Failed to send sync block to processor"; + "error" => ?e + ); + return Err(()); + } + + Ok(()) + } + + fn request_parent( + &mut self, + mut parent_lookup: ParentLookup, + cx: &mut SyncNetworkContext, + ) { + match parent_lookup.request_parent(cx) { + Err(e) => { + debug!(self.log, "Failed to request parent"; &parent_lookup, "error" => e.as_static()); + match e { + parent_lookup::RequestError::SendFailed(_) => { + // Probably shutting down, nothing to do here. Drop the request + } + parent_lookup::RequestError::ChainTooLong + | parent_lookup::RequestError::TooManyAttempts => { + self.failed_chains.insert(parent_lookup.chain_hash()); + // This indicates faulty peers. + for &peer_id in parent_lookup.used_peers() { + cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static()) + } + } + parent_lookup::RequestError::NoPeers => { + // This happens if the peer disconnects while the block is being + // processed. Drop the request without extra penalty + } + } + } + Ok(_) => { + debug!(self.log, "Requesting parent"; &parent_lookup); + self.parent_queue.push(parent_lookup) + } + } + + // We remove and add back again requests so we want this updated regardless of outcome. + metrics::set_gauge( + &metrics::SYNC_PARENT_BLOCK_LOOKUPS, + self.parent_queue.len() as i64, + ); + } +} diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs new file mode 100644 index 000000000..eb8d61ab9 --- /dev/null +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -0,0 +1,201 @@ +use lighthouse_network::PeerId; +use store::{EthSpec, Hash256, SignedBeaconBlock}; +use strum::AsStaticStr; + +use crate::sync::{ + manager::{Id, SLOT_IMPORT_TOLERANCE}, + network_context::SyncNetworkContext, +}; + +use super::single_block_lookup::{self, SingleBlockRequest}; + +/// How many attempts we try to find a parent of a block before we give up trying . +pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5; +/// The maximum depth we will search for a parent block. In principle we should have sync'd any +/// canonical chain to its head once the peer connects. A chain should not appear where it's depth +/// is further back than the most recent head slot. +pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2; + +/// Maintains a sequential list of parents to lookup and the lookup's current state. +pub(crate) struct ParentLookup { + /// The root of the block triggering this parent request. + chain_hash: Hash256, + /// The blocks that have currently been downloaded. + downloaded_blocks: Vec>, + /// Request of the last parent. + current_parent_request: SingleBlockRequest, + /// Id of the last parent request. + current_parent_request_id: Option, +} + +#[derive(Debug, PartialEq, Eq, AsStaticStr)] +pub enum VerifyError { + RootMismatch, + NoBlockReturned, + ExtraBlocksReturned, + PreviousFailure { parent_root: Hash256 }, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum RequestError { + SendFailed(&'static str), + ChainTooLong, + TooManyAttempts, + NoPeers, +} + +impl ParentLookup { + pub fn contains_block(&self, block: &SignedBeaconBlock) -> bool { + self.downloaded_blocks + .iter() + .any(|d_block| d_block == block) + } + + pub fn new(block: SignedBeaconBlock, peer_id: PeerId) -> Self { + let current_parent_request = SingleBlockRequest::new(block.parent_root(), peer_id); + + Self { + chain_hash: block.canonical_root(), + downloaded_blocks: vec![block], + current_parent_request, + current_parent_request_id: None, + } + } + + /// Attempts to request the next unknown parent. If the request fails, it should be removed. + pub fn request_parent(&mut self, cx: &mut SyncNetworkContext) -> Result<(), RequestError> { + // check to make sure this request hasn't failed + if self.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE { + return Err(RequestError::ChainTooLong); + } + + let (peer_id, request) = self.current_parent_request.request_block()?; + match cx.parent_lookup_request(peer_id, request) { + Ok(request_id) => { + self.current_parent_request_id = Some(request_id); + Ok(()) + } + Err(reason) => { + self.current_parent_request_id = None; + Err(RequestError::SendFailed(reason)) + } + } + } + + pub fn check_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> { + self.current_parent_request.check_peer_disconnected(peer_id) + } + + pub fn add_block(&mut self, block: SignedBeaconBlock) { + let next_parent = block.parent_root(); + self.downloaded_blocks.push(block); + self.current_parent_request.hash = next_parent; + self.current_parent_request.state = single_block_lookup::State::AwaitingDownload; + self.current_parent_request_id = None; + } + + pub fn pending_response(&self, req_id: Id) -> bool { + self.current_parent_request_id == Some(req_id) + } + + /// Get the parent lookup's chain hash. + pub fn chain_hash(&self) -> Hash256 { + self.chain_hash + } + + pub fn download_failed(&mut self) { + self.current_parent_request.register_failure(); + self.current_parent_request_id = None; + } + + pub fn chain_blocks(&mut self) -> Vec> { + std::mem::take(&mut self.downloaded_blocks) + } + + /// Verifies that the received block is what we requested. If so, parent lookup now waits for + /// the processing result of the block. + pub fn verify_block( + &mut self, + block: Option>>, + failed_chains: &lru_cache::LRUCache, + ) -> Result>>, VerifyError> { + let block = self.current_parent_request.verify_block(block)?; + + // check if the parent of this block isn't in the failed cache. If it is, this chain should + // be dropped and the peer downscored. + if let Some(parent_root) = block.as_ref().map(|block| block.parent_root()) { + if failed_chains.contains(&parent_root) { + self.current_parent_request.register_failure(); + self.current_parent_request_id = None; + return Err(VerifyError::PreviousFailure { parent_root }); + } + } + + Ok(block) + } + + pub fn get_processing_peer(&self, chain_hash: Hash256) -> Option { + if self.chain_hash == chain_hash { + return self.current_parent_request.processing_peer().ok(); + } + None + } + + #[cfg(test)] + pub fn failed_attempts(&self) -> u8 { + self.current_parent_request.failed_attempts + } + + pub fn add_peer(&mut self, block_root: &Hash256, peer_id: &PeerId) -> bool { + self.current_parent_request.add_peer(block_root, peer_id) + } + + pub fn used_peers(&self) -> impl Iterator + '_ { + self.current_parent_request.used_peers.iter() + } +} + +impl From for VerifyError { + fn from(e: super::single_block_lookup::VerifyError) -> Self { + use super::single_block_lookup::VerifyError as E; + match e { + E::RootMismatch => VerifyError::RootMismatch, + E::NoBlockReturned => VerifyError::NoBlockReturned, + E::ExtraBlocksReturned => VerifyError::ExtraBlocksReturned, + } + } +} + +impl From for RequestError { + fn from(e: super::single_block_lookup::LookupRequestError) -> Self { + use super::single_block_lookup::LookupRequestError as E; + match e { + E::TooManyAttempts => RequestError::TooManyAttempts, + E::NoPeers => RequestError::NoPeers, + } + } +} + +impl slog::KV for ParentLookup { + fn serialize( + &self, + record: &slog::Record, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + serializer.emit_arguments("chain_hash", &format_args!("{}", self.chain_hash))?; + slog::Value::serialize(&self.current_parent_request, record, "parent", serializer)?; + serializer.emit_usize("downloaded_blocks", self.downloaded_blocks.len())?; + slog::Result::Ok(()) + } +} + +impl RequestError { + pub fn as_static(&self) -> &'static str { + match self { + RequestError::SendFailed(e) => e, + RequestError::ChainTooLong => "chain_too_long", + RequestError::TooManyAttempts => "too_many_attempts", + RequestError::NoPeers => "no_peers", + } + } +} diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs new file mode 100644 index 000000000..a4df616cb --- /dev/null +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -0,0 +1,209 @@ +use std::collections::HashSet; + +use lighthouse_network::{rpc::BlocksByRootRequest, PeerId}; +use rand::seq::IteratorRandom; +use ssz_types::VariableList; +use store::{EthSpec, Hash256, SignedBeaconBlock}; +use strum::AsStaticStr; + +/// Object representing a single block lookup request. +#[derive(PartialEq, Eq)] +pub struct SingleBlockRequest { + /// The hash of the requested block. + pub hash: Hash256, + /// State of this request. + pub state: State, + /// Peers that should have this block. + pub available_peers: HashSet, + /// Peers from which we have requested this block. + pub used_peers: HashSet, + /// How many times have we attempted this block. + pub failed_attempts: u8, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum State { + AwaitingDownload, + Downloading { peer_id: PeerId }, + Processing { peer_id: PeerId }, +} + +#[derive(Debug, PartialEq, Eq, AsStaticStr)] +pub enum VerifyError { + RootMismatch, + NoBlockReturned, + ExtraBlocksReturned, +} + +#[derive(Debug, PartialEq, Eq, AsStaticStr)] +pub enum LookupRequestError { + TooManyAttempts, + NoPeers, +} + +impl SingleBlockRequest { + pub fn new(hash: Hash256, peer_id: PeerId) -> Self { + Self { + hash, + state: State::AwaitingDownload, + available_peers: HashSet::from([peer_id]), + used_peers: HashSet::default(), + failed_attempts: 0, + } + } + + pub fn register_failure(&mut self) { + self.failed_attempts += 1; + self.state = State::AwaitingDownload; + } + + pub fn add_peer(&mut self, hash: &Hash256, peer_id: &PeerId) -> bool { + let is_useful = &self.hash == hash; + if is_useful { + self.available_peers.insert(*peer_id); + } + is_useful + } + + /// If a peer disconnects, this request could be failed. If so, an error is returned + pub fn check_peer_disconnected(&mut self, dc_peer_id: &PeerId) -> Result<(), ()> { + self.available_peers.remove(dc_peer_id); + if let State::Downloading { peer_id } = &self.state { + if peer_id == dc_peer_id { + // Peer disconnected before providing a block + self.register_failure(); + return Err(()); + } + } + Ok(()) + } + + /// Verifies if the received block matches the requested one. + /// Returns the block for processing if the response is what we expected. + pub fn verify_block( + &mut self, + block: Option>>, + ) -> Result>>, VerifyError> { + match self.state { + State::AwaitingDownload => { + self.register_failure(); + Err(VerifyError::ExtraBlocksReturned) + } + State::Downloading { peer_id } => match block { + Some(block) => { + if block.canonical_root() != self.hash { + // return an error and drop the block + self.register_failure(); + Err(VerifyError::RootMismatch) + } else { + // Return the block for processing. + self.state = State::Processing { peer_id }; + Ok(Some(block)) + } + } + None => { + self.register_failure(); + Err(VerifyError::NoBlockReturned) + } + }, + State::Processing { peer_id: _ } => match block { + Some(_) => { + // We sent the block for processing and received an extra block. + self.register_failure(); + Err(VerifyError::ExtraBlocksReturned) + } + None => { + // This is simply the stream termination and we are already processing the + // block + Ok(None) + } + }, + } + } + + pub fn request_block(&mut self) -> Result<(PeerId, BlocksByRootRequest), LookupRequestError> { + debug_assert!(matches!(self.state, State::AwaitingDownload)); + if self.failed_attempts <= MAX_ATTEMPTS { + if let Some(&peer_id) = self.available_peers.iter().choose(&mut rand::thread_rng()) { + let request = BlocksByRootRequest { + block_roots: VariableList::from(vec![self.hash]), + }; + self.state = State::Downloading { peer_id }; + self.used_peers.insert(peer_id); + Ok((peer_id, request)) + } else { + Err(LookupRequestError::NoPeers) + } + } else { + Err(LookupRequestError::TooManyAttempts) + } + } + + pub fn processing_peer(&self) -> Result { + if let State::Processing { peer_id } = &self.state { + Ok(*peer_id) + } else { + Err(()) + } + } +} + +impl slog::Value for SingleBlockRequest { + fn serialize( + &self, + record: &slog::Record, + key: slog::Key, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + serializer.emit_str("request", key)?; + serializer.emit_arguments("hash", &format_args!("{}", self.hash))?; + match &self.state { + State::AwaitingDownload => { + "awaiting_download".serialize(record, "state", serializer)? + } + State::Downloading { peer_id } => { + serializer.emit_arguments("downloading_peer", &format_args!("{}", peer_id))? + } + State::Processing { peer_id } => { + serializer.emit_arguments("processing_peer", &format_args!("{}", peer_id))? + } + } + slog::Result::Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use types::test_utils::{SeedableRng, TestRandom, XorShiftRng}; + use types::MinimalEthSpec as E; + + fn rand_block() -> SignedBeaconBlock { + let mut rng = XorShiftRng::from_seed([42; 16]); + SignedBeaconBlock::from_block( + types::BeaconBlock::Base(types::BeaconBlockBase { + ..<_>::random_for_test(&mut rng) + }), + types::Signature::random_for_test(&mut rng), + ) + } + + #[test] + fn test_happy_path() { + let peer_id = PeerId::random(); + let block = rand_block(); + + let mut sl = SingleBlockRequest::<4>::new(block.canonical_root(), peer_id); + sl.request_block().unwrap(); + sl.verify_block(Some(Box::new(block))).unwrap().unwrap(); + } + + #[test] + fn test_max_attempts() { + let peer_id = PeerId::random(); + let block = rand_block(); + + let mut sl = SingleBlockRequest::<4>::new(block.canonical_root(), peer_id); + sl.register_failure(); + } +} diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs new file mode 100644 index 000000000..dde7d4995 --- /dev/null +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -0,0 +1,460 @@ +use std::sync::Arc; + +use crate::service::RequestId; +use crate::sync::manager::RequestId as SyncId; +use crate::NetworkMessage; + +use super::*; + +use beacon_chain::builder::Witness; +use beacon_chain::eth1_chain::CachingEth1Backend; +use lighthouse_network::{NetworkGlobals, Request}; +use slog::{Drain, Level}; +use slot_clock::SystemTimeSlotClock; +use store::MemoryStore; +use types::test_utils::{SeedableRng, TestRandom, XorShiftRng}; +use types::MinimalEthSpec as E; + +type T = Witness, E, MemoryStore, MemoryStore>; + +struct TestRig { + beacon_processor_rx: mpsc::Receiver>, + network_rx: mpsc::UnboundedReceiver>, + rng: XorShiftRng, +} + +const D: Duration = Duration::new(0, 0); + +impl TestRig { + fn test_setup(log_level: Option) -> (BlockLookups, SyncNetworkContext, Self) { + let log = { + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + + if let Some(log_level) = log_level { + slog::Logger::root(drain.filter_level(log_level).fuse(), slog::o!()) + } else { + slog::Logger::root(drain.filter(|_| false).fuse(), slog::o!()) + } + }; + + let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(100); + let (network_tx, network_rx) = mpsc::unbounded_channel(); + let rng = XorShiftRng::from_seed([42; 16]); + let rig = TestRig { + beacon_processor_rx, + network_rx, + rng, + }; + let bl = BlockLookups::new( + beacon_processor_tx, + log.new(slog::o!("component" => "block_lookups")), + ); + let cx = { + let globals = Arc::new(NetworkGlobals::new_test_globals(&log)); + SyncNetworkContext::new( + network_tx, + globals, + log.new(slog::o!("component" => "network_context")), + ) + }; + + (bl, cx, rig) + } + + fn rand_block(&mut self) -> SignedBeaconBlock { + SignedBeaconBlock::from_block( + types::BeaconBlock::Base(types::BeaconBlockBase { + ..<_>::random_for_test(&mut self.rng) + }), + types::Signature::random_for_test(&mut self.rng), + ) + } + + #[track_caller] + fn expect_block_request(&mut self) -> Id { + match self.network_rx.try_recv() { + Ok(NetworkMessage::SendRequest { + peer_id: _, + request: Request::BlocksByRoot(_request), + request_id: RequestId::Sync(SyncId::SingleBlock { id }), + }) => id, + other => { + panic!("Expected block request, found {:?}", other); + } + } + } + + #[track_caller] + fn expect_parent_request(&mut self) -> Id { + match self.network_rx.try_recv() { + Ok(NetworkMessage::SendRequest { + peer_id: _, + request: Request::BlocksByRoot(_request), + request_id: RequestId::Sync(SyncId::ParentLookup { id }), + }) => id, + other => panic!("Expected parent request, found {:?}", other), + } + } + + #[track_caller] + fn expect_block_process(&mut self) { + match self.beacon_processor_rx.try_recv() { + Ok(work) => { + assert_eq!(work.work_type(), crate::beacon_processor::RPC_BLOCK); + } + other => panic!("Expected block process, found {:?}", other), + } + } + + #[track_caller] + fn expect_parent_chain_process(&mut self) { + match self.beacon_processor_rx.try_recv() { + Ok(work) => { + assert_eq!(work.work_type(), crate::beacon_processor::CHAIN_SEGMENT); + } + other => panic!("Expected chain segment process, found {:?}", other), + } + } + + #[track_caller] + fn expect_empty_network(&mut self) { + assert_eq!( + self.network_rx.try_recv().expect_err("must err"), + mpsc::error::TryRecvError::Empty + ); + } + + #[track_caller] + pub fn expect_penalty(&mut self) { + match self.network_rx.try_recv() { + Ok(NetworkMessage::ReportPeer { .. }) => {} + other => panic!("Expected peer penalty, found {:?}", other), + } + } + + pub fn block_with_parent(&mut self, parent_root: Hash256) -> SignedBeaconBlock { + SignedBeaconBlock::from_block( + types::BeaconBlock::Base(types::BeaconBlockBase { + parent_root, + ..<_>::random_for_test(&mut self.rng) + }), + types::Signature::random_for_test(&mut self.rng), + ) + } +} + +#[test] +fn test_single_block_lookup_happy_path() { + let (mut bl, mut cx, mut rig) = TestRig::test_setup(None); + + let block = rig.rand_block(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_block(block.canonical_root(), peer_id, &mut cx); + let id = rig.expect_block_request(); + + // The peer provides the correct block, should not be penalized. Now the block should be sent + // for processing. + bl.single_block_lookup_response(id, peer_id, Some(Box::new(block)), D, &mut cx); + rig.expect_empty_network(); + rig.expect_block_process(); + + // The request should still be active. + assert_eq!(bl.single_block_lookups.len(), 1); + + // Send the stream termination. Peer should have not been penalized, and the request removed + // after processing. + bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); + bl.single_block_processed(id, Ok(()), &mut cx); + rig.expect_empty_network(); + assert_eq!(bl.single_block_lookups.len(), 0); +} + +#[test] +fn test_single_block_lookup_empty_response() { + let (mut bl, mut cx, mut rig) = TestRig::test_setup(None); + + let block_hash = Hash256::random(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_block(block_hash, peer_id, &mut cx); + let id = rig.expect_block_request(); + + // The peer does not have the block. It should be penalized. + bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); + rig.expect_penalty(); + + rig.expect_block_request(); // it should be retried +} + +#[test] +fn test_single_block_lookup_wrong_response() { + let (mut bl, mut cx, mut rig) = TestRig::test_setup(None); + + let block_hash = Hash256::random(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_block(block_hash, peer_id, &mut cx); + let id = rig.expect_block_request(); + + // Peer sends something else. It should be penalized. + let bad_block = rig.rand_block(); + bl.single_block_lookup_response(id, peer_id, Some(Box::new(bad_block)), D, &mut cx); + rig.expect_penalty(); + rig.expect_block_request(); // should be retried + + // Send the stream termination. This should not produce an additional penalty. + bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); + rig.expect_empty_network(); +} + +#[test] +fn test_single_block_lookup_failure() { + let (mut bl, mut cx, mut rig) = TestRig::test_setup(None); + + let block_hash = Hash256::random(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_block(block_hash, peer_id, &mut cx); + let id = rig.expect_block_request(); + + // The request fails. RPC failures are handled elsewhere so we should not penalize the peer. + bl.single_block_lookup_failed(id, &mut cx); + rig.expect_block_request(); + rig.expect_empty_network(); +} + +#[test] +fn test_single_block_lookup_becomes_parent_request() { + let (mut bl, mut cx, mut rig) = TestRig::test_setup(None); + + let block = rig.rand_block(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_block(block.canonical_root(), peer_id, &mut cx); + let id = rig.expect_block_request(); + + // The peer provides the correct block, should not be penalized. Now the block should be sent + // for processing. + bl.single_block_lookup_response(id, peer_id, Some(Box::new(block.clone())), D, &mut cx); + rig.expect_empty_network(); + rig.expect_block_process(); + + // The request should still be active. + assert_eq!(bl.single_block_lookups.len(), 1); + + // Send the stream termination. Peer should have not been penalized, and the request moved to a + // parent request after processing. + bl.single_block_processed(id, Err(BlockError::ParentUnknown(Box::new(block))), &mut cx); + assert_eq!(bl.single_block_lookups.len(), 0); + rig.expect_parent_request(); + rig.expect_empty_network(); + assert_eq!(bl.parent_queue.len(), 1); +} + +#[test] +fn test_parent_lookup_happy_path() { + let (mut bl, mut cx, mut rig) = TestRig::test_setup(None); + + let parent = rig.rand_block(); + let block = rig.block_with_parent(parent.canonical_root()); + let chain_hash = block.canonical_root(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_parent(Box::new(block), peer_id, &mut cx); + let id = rig.expect_parent_request(); + + // Peer sends the right block, it should be sent for processing. Peer should not be penalized. + bl.parent_lookup_response(id, peer_id, Some(Box::new(parent)), D, &mut cx); + rig.expect_block_process(); + rig.expect_empty_network(); + + // Processing succeeds, now the rest of the chain should be sent for processing. + bl.parent_block_processed(chain_hash, Err(BlockError::BlockIsAlreadyKnown), &mut cx); + rig.expect_parent_chain_process(); + bl.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut cx); + assert_eq!(bl.parent_queue.len(), 0); +} + +#[test] +fn test_parent_lookup_wrong_response() { + let (mut bl, mut cx, mut rig) = TestRig::test_setup(None); + + let parent = rig.rand_block(); + let block = rig.block_with_parent(parent.canonical_root()); + let chain_hash = block.canonical_root(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_parent(Box::new(block), peer_id, &mut cx); + let id1 = rig.expect_parent_request(); + + // Peer sends the wrong block, peer should be penalized and the block re-requested. + let bad_block = rig.rand_block(); + bl.parent_lookup_response(id1, peer_id, Some(Box::new(bad_block)), D, &mut cx); + rig.expect_penalty(); + let id2 = rig.expect_parent_request(); + + // Send the stream termination for the first request. This should not produce extra penalties. + bl.parent_lookup_response(id1, peer_id, None, D, &mut cx); + rig.expect_empty_network(); + + // Send the right block this time. + bl.parent_lookup_response(id2, peer_id, Some(Box::new(parent)), D, &mut cx); + rig.expect_block_process(); + + // Processing succeeds, now the rest of the chain should be sent for processing. + bl.parent_block_processed(chain_hash, Ok(()), &mut cx); + rig.expect_parent_chain_process(); + bl.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut cx); + assert_eq!(bl.parent_queue.len(), 0); +} + +#[test] +fn test_parent_lookup_empty_response() { + let (mut bl, mut cx, mut rig) = TestRig::test_setup(None); + + let parent = rig.rand_block(); + let block = rig.block_with_parent(parent.canonical_root()); + let chain_hash = block.canonical_root(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_parent(Box::new(block), peer_id, &mut cx); + let id1 = rig.expect_parent_request(); + + // Peer sends an empty response, peer should be penalized and the block re-requested. + bl.parent_lookup_response(id1, peer_id, None, D, &mut cx); + rig.expect_penalty(); + let id2 = rig.expect_parent_request(); + + // Send the right block this time. + bl.parent_lookup_response(id2, peer_id, Some(Box::new(parent)), D, &mut cx); + rig.expect_block_process(); + + // Processing succeeds, now the rest of the chain should be sent for processing. + bl.parent_block_processed(chain_hash, Ok(()), &mut cx); + rig.expect_parent_chain_process(); + bl.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut cx); + assert_eq!(bl.parent_queue.len(), 0); +} + +#[test] +fn test_parent_lookup_rpc_failure() { + let (mut bl, mut cx, mut rig) = TestRig::test_setup(None); + + let parent = rig.rand_block(); + let block = rig.block_with_parent(parent.canonical_root()); + let chain_hash = block.canonical_root(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_parent(Box::new(block), peer_id, &mut cx); + let id1 = rig.expect_parent_request(); + + // The request fails. It should be tried again. + bl.parent_lookup_failed(id1, peer_id, &mut cx); + let id2 = rig.expect_parent_request(); + + // Send the right block this time. + bl.parent_lookup_response(id2, peer_id, Some(Box::new(parent)), D, &mut cx); + rig.expect_block_process(); + + // Processing succeeds, now the rest of the chain should be sent for processing. + bl.parent_block_processed(chain_hash, Ok(()), &mut cx); + rig.expect_parent_chain_process(); + bl.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut cx); + assert_eq!(bl.parent_queue.len(), 0); +} + +#[test] +fn test_parent_lookup_too_many_attempts() { + let (mut bl, mut cx, mut rig) = TestRig::test_setup(None); + + let parent = rig.rand_block(); + let block = rig.block_with_parent(parent.canonical_root()); + let chain_hash = block.canonical_root(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_parent(Box::new(block), peer_id, &mut cx); + for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE + 1 { + let id = rig.expect_parent_request(); + match i % 2 { + // make sure every error is accounted for + 0 => { + // The request fails. It should be tried again. + bl.parent_lookup_failed(id, peer_id, &mut cx); + } + _ => { + // Send a bad block this time. It should be tried again. + let bad_block = rig.rand_block(); + bl.parent_lookup_response(id, peer_id, Some(Box::new(bad_block)), D, &mut cx); + rig.expect_penalty(); + } + } + if i < parent_lookup::PARENT_FAIL_TOLERANCE { + assert_eq!(bl.parent_queue[0].failed_attempts(), dbg!(i)); + } + } + + assert_eq!(bl.parent_queue.len(), 0); + assert!(bl.failed_chains.contains(&chain_hash)); +} + +#[test] +fn test_parent_lookup_too_deep() { + let (mut bl, mut cx, mut rig) = TestRig::test_setup(None); + let mut blocks = + Vec::>::with_capacity(parent_lookup::PARENT_DEPTH_TOLERANCE); + while blocks.len() < parent_lookup::PARENT_DEPTH_TOLERANCE { + let parent = blocks + .last() + .map(|b| b.canonical_root()) + .unwrap_or_else(Hash256::random); + let block = rig.block_with_parent(parent); + blocks.push(block); + } + + let peer_id = PeerId::random(); + let trigger_block = blocks.pop().unwrap(); + let chain_hash = trigger_block.canonical_root(); + bl.search_parent(Box::new(trigger_block), peer_id, &mut cx); + + for block in blocks.into_iter().rev() { + let id = rig.expect_parent_request(); + // the block + bl.parent_lookup_response(id, peer_id, Some(Box::new(block.clone())), D, &mut cx); + // the stream termination + bl.parent_lookup_response(id, peer_id, None, D, &mut cx); + // the processing request + rig.expect_block_process(); + // the processing result + bl.parent_block_processed( + chain_hash, + Err(BlockError::ParentUnknown(Box::new(block))), + &mut cx, + ) + } + + rig.expect_penalty(); + assert!(bl.failed_chains.contains(&chain_hash)); +} + +#[test] +fn test_parent_lookup_disconnection() { + let (mut bl, mut cx, mut rig) = TestRig::test_setup(None); + let peer_id = PeerId::random(); + let trigger_block = rig.rand_block(); + bl.search_parent(Box::new(trigger_block), peer_id, &mut cx); + bl.peer_disconnected(&peer_id, &mut cx); + assert!(bl.parent_queue.is_empty()); +} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 021a12c18..53480db88 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -34,29 +34,25 @@ //! search for the block and subsequently search for parents if needed. use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; +use super::block_lookups::BlockLookups; use super::network_context::SyncNetworkContext; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; -use super::range_sync::{ChainId, RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; -use crate::beacon_processor::{ProcessId, WorkEvent as BeaconWorkEvent}; +use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; +use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; -use fnv::FnvHashMap; -use lighthouse_network::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, GoodbyeReason}; +use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::SyncInfo; use lighthouse_network::{PeerAction, PeerId}; -use lru_cache::LRUCache; -use slog::{crit, debug, error, info, trace, warn, Logger}; -use smallvec::SmallVec; -use ssz_types::VariableList; +use slog::{crit, debug, error, info, trace, Logger}; use std::boxed::Box; -use std::collections::hash_map::Entry; use std::ops::Sub; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a @@ -66,12 +62,6 @@ use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// gossip if no peers are further than this range ahead of us that we have not already downloaded /// blocks for. pub const SLOT_IMPORT_TOLERANCE: usize = 32; -/// How many attempts we try to find a parent of a block before we give up trying . -const PARENT_FAIL_TOLERANCE: usize = 5; -/// The maximum depth we will search for a parent block. In principle we should have sync'd any -/// canonical chain to its head once the peer connects. A chain should not appear where it's depth -/// is further back than the most recent head slot. -const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2; pub type Id = u32; @@ -120,26 +110,22 @@ pub enum SyncMessage { /// A batch has been processed by the block processor thread. BatchProcessed { - sync_type: SyncRequestType, + sync_type: ChainSegmentProcessId, result: BatchProcessResult, }, - /// A parent lookup has failed. - ParentLookupFailed { - /// The head of the chain of blocks that failed to process. - chain_head: Hash256, - /// The peer that instigated the chain lookup. - peer_id: PeerId, + /// Block processed + BlockProcessed { + process_type: BlockProcessType, + result: Result<(), BlockError>, }, } -/// The type of sync request made +/// The type of processing specified for a received block. #[derive(Debug, Clone)] -pub enum SyncRequestType { - /// Request was from the backfill sync algorithm. - BackFillSync(Epoch), - /// The request was from a chain in the range sync algorithm. - RangeSync(Epoch, ChainId), +pub enum BlockProcessType { + SingleBlock { id: Id }, + ParentLookup { chain_hash: Hash256 }, } /// The result of processing multiple blocks (a chain segment). @@ -154,23 +140,6 @@ pub enum BatchProcessResult { }, } -/// Maintains a sequential list of parents to lookup and the lookup's current state. -struct ParentRequests { - /// The blocks that have currently been downloaded. - downloaded_blocks: Vec>, - - /// The number of failed attempts to retrieve a parent block. If too many attempts occur, this - /// lookup is failed and rejected. - failed_attempts: usize, - - /// The peer who last submitted a block. If the chain ends or fails, this is the peer that is - /// penalized. - last_submitted_peer: PeerId, - - /// The request ID of this lookup is in progress. - pending: Option, -} - /// The primary object for handling and driving all the current syncing logic. It maintains the /// current state of the syncing process, the number of useful peers, downloaded blocks and /// controls the logic behind both the long-range (batch) sync and the on-going potential parent @@ -194,45 +163,12 @@ pub struct SyncManager { /// Backfill syncing. backfill_sync: BackFillSync, - /// A collection of parent block lookups. - parent_queue: SmallVec<[ParentRequests; 3]>, - - /// A cache of failed chain lookups to prevent duplicate searches. - failed_chains: LRUCache, - - /// A collection of block hashes being searched for and a flag indicating if a result has been - /// received or not. - /// - /// The flag allows us to determine if the peer returned data or sent us nothing. - single_block_lookups: FnvHashMap, - - /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. - beacon_processor_send: mpsc::Sender>, - - /// Used for spawning tasks. - executor: task_executor::TaskExecutor, + block_lookups: BlockLookups, /// The logger for the import manager. log: Logger, } -/// Object representing a single block lookup request. -struct SingleBlockRequest { - /// The hash of the requested block. - pub hash: Hash256, - /// Whether a block was received from this request, or the peer returned an empty response. - pub block_returned: bool, -} - -impl SingleBlockRequest { - pub fn new(hash: Hash256) -> Self { - Self { - hash, - block_returned: false, - } - } -} - /// Spawns a new `SyncManager` thread which has a weak reference to underlying beacon /// chain. This allows the chain to be /// dropped during the syncing process which will gracefully end the `SyncManager`. @@ -253,26 +189,22 @@ pub fn spawn( // create an instance of the SyncManager let mut sync_manager = SyncManager { + chain: beacon_chain.clone(), + network_globals: network_globals.clone(), + input_channel: sync_recv, + network: SyncNetworkContext::new(network_send, network_globals.clone(), log.clone()), range_sync: RangeSync::new( beacon_chain.clone(), beacon_processor_send.clone(), log.clone(), ), backfill_sync: BackFillSync::new( - beacon_chain.clone(), - network_globals.clone(), + beacon_chain, + network_globals, beacon_processor_send.clone(), log.clone(), ), - network: SyncNetworkContext::new(network_send, network_globals.clone(), log.clone()), - chain: beacon_chain, - network_globals, - input_channel: sync_recv, - parent_queue: SmallVec::new(), - failed_chains: LRUCache::new(500), - single_block_lookups: FnvHashMap::default(), - beacon_processor_send, - executor: executor.clone(), + block_lookups: BlockLookups::new(beacon_processor_send, log.clone()), log: log.clone(), }; @@ -322,321 +254,17 @@ impl SyncManager { self.update_sync_state(); } - /// The response to a `BlocksByRoot` request. - /// The current implementation takes one block at a time. As blocks are streamed, any - /// subsequent blocks will simply be ignored. - /// There are two reasons we could have received a BlocksByRoot response - /// - We requested a single hash and have received a response for the single_block_lookup - /// - We are looking up parent blocks in parent lookup search - async fn parent_lookup_response( - &mut self, - peer_id: PeerId, - request_id: Id, - block: Option>, - _seen_timestamp: Duration, - ) { - let mut parent_request = if let Some(pos) = self - .parent_queue - .iter() - .position(|request| request.pending == Some(request_id)) - { - // we remove from the queue and process it. It will get re-added if required - self.parent_queue.remove(pos) - } else { - if block.is_some() { - debug!(self.log, "Response for a parent lookup request that was not found"; "peer_id" => %peer_id); - } - return; - }; - - match block { - Some(block) => { - // data was returned, not just a stream termination - - // check if the parent of this block isn't in our failed cache. If it is, this - // chain should be dropped and the peer downscored. - if self.failed_chains.contains(&block.message().parent_root()) { - debug!( - self.log, - "Parent chain ignored due to past failure"; - "block" => ?block.message().parent_root(), - "slot" => block.slot() - ); - if !parent_request.downloaded_blocks.is_empty() { - // Add the root block to failed chains - self.failed_chains - .insert(parent_request.downloaded_blocks[0].canonical_root()); - } else { - crit!(self.log, "Parent chain has no blocks"); - } - self.network.report_peer( - peer_id, - PeerAction::MidToleranceError, - "bbroot_failed_chains", - ); - return; - } - // add the block to response - parent_request.downloaded_blocks.push(block); - // queue for processing - self.process_parent_request(parent_request).await; - } - None => { - // An empty response has been returned to a parent request - // if an empty response is given, the peer didn't have the requested block, try again - parent_request.failed_attempts += 1; - parent_request.last_submitted_peer = peer_id; - self.request_parent(parent_request); - } - } - } - - async fn process_block_async( - &mut self, - block: SignedBeaconBlock, - ) -> Option>> { - let (event, rx) = BeaconWorkEvent::rpc_beacon_block(Box::new(block)); - match self.beacon_processor_send.try_send(event) { - Ok(_) => {} - Err(e) => { - error!( - self.log, - "Failed to send sync block to processor"; - "error" => ?e - ); - return None; - } - } - - match rx.await { - Ok(block_result) => Some(block_result), - Err(_) => { - warn!( - self.log, - "Sync block not processed"; - "msg" => "likely due to system resource exhaustion" - ); - None - } - } - } - - /// Processes the response obtained from a single block lookup search. If the block is - /// processed or errors, the search ends. If the blocks parent is unknown, a block parent - /// lookup search is started. - async fn single_block_lookup_response( - &mut self, - request_id: Id, - peer_id: PeerId, - block: Option>, - seen_timestamp: Duration, - ) { - if let Entry::Occupied(mut entry) = self.single_block_lookups.entry(request_id) { - match block { - None => { - // Stream termination. Remove the lookup - let (_, single_block_request) = entry.remove_entry(); - // The peer didn't respond with a block that it referenced. - // This can be allowed as some clients may implement pruning. We mildly - // tolerate this behaviour. - if !single_block_request.block_returned { - warn!(self.log, "Peer didn't respond with a block it referenced"; - "referenced_block_hash" => %single_block_request.hash, "peer_id" => %peer_id); - self.network.report_peer( - peer_id, - PeerAction::MidToleranceError, - "bbroot_no_block", - ); - } - } - Some(block) => { - // update the state of the lookup indicating a block was received from the peer - entry.get_mut().block_returned = true; - // verify the hash is correct and try and process the block - if entry.get().hash != block.canonical_root() { - // The peer that sent this, sent us the wrong block. - // We do not tolerate this behaviour. The peer is instantly disconnected and banned. - warn!(self.log, "Peer sent incorrect block for single block lookup"; "peer_id" => %peer_id); - self.network.goodbye_peer(peer_id, GoodbyeReason::Fault); - return; - } - - let block_result = match self.process_block_async(block.clone()).await { - Some(block_result) => block_result, - None => return, - }; - - // we have the correct block, try and process it - match block_result { - Ok(block_root) => { - // Block has been processed, so write the block time to the cache. - self.chain.block_times_cache.write().set_time_observed( - block_root, - block.slot(), - seen_timestamp, - None, - None, - ); - info!(self.log, "Processed block"; "block" => %block_root); - - // Spawn `BeaconChain::fork_choice` in a blocking task. It's - // potentially long-running and it might panic if run from an async - // context. - let chain = self.chain.clone(); - let log = self.log.clone(); - self.executor.spawn_blocking( - move || match chain.fork_choice() { - Ok(()) => trace!( - log, - "Fork choice success"; - "location" => "single block" - ), - Err(e) => error!( - log, - "Fork choice failed"; - "error" => ?e, - "location" => "single block" - ), - }, - "sync_manager_fork_choice", - ); - } - Err(BlockError::ParentUnknown { .. }) => { - // We don't know of the blocks parent, begin a parent lookup search - self.add_unknown_block(peer_id, block); - } - Err(BlockError::BlockIsAlreadyKnown) => { - trace!(self.log, "Single block lookup already known"); - } - Err(BlockError::BeaconChainError(e)) => { - warn!(self.log, "Unexpected block processing error"; "error" => ?e); - } - outcome => { - warn!(self.log, "Single block lookup failed"; "outcome" => ?outcome); - // This could be a range of errors. But we couldn't process the block. - // For now we consider this a mid tolerance error. - self.network.report_peer( - peer_id, - PeerAction::MidToleranceError, - "single_block_lookup_failed", - ); - } - } - } - } - } - } - - /// A block has been sent to us that has an unknown parent. This begins a parent lookup search - /// to find the parent or chain of parents that match our current chain. - fn add_unknown_block(&mut self, peer_id: PeerId, block: SignedBeaconBlock) { - // If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore - if !self.network_globals.sync_state.read().is_synced() { - let head_slot = self - .chain - .head_info() - .map(|info| info.slot) - .unwrap_or_else(|_| Slot::from(0u64)); - let unknown_block_slot = block.slot(); - - // if the block is far in the future, ignore it. If its within the slot tolerance of - // our current head, regardless of the syncing state, fetch it. - if (head_slot >= unknown_block_slot - && head_slot.sub(unknown_block_slot).as_usize() > SLOT_IMPORT_TOLERANCE) - || (head_slot < unknown_block_slot - && unknown_block_slot.sub(head_slot).as_usize() > SLOT_IMPORT_TOLERANCE) - { - return; - } - } - - let block_root = block.canonical_root(); - // If this block or it's parent is part of a known failed chain, ignore it. - if self.failed_chains.contains(&block.message().parent_root()) - || self.failed_chains.contains(&block_root) - { - debug!(self.log, "Block is from a past failed chain. Dropping"; "block_root" => ?block_root, "block_slot" => block.slot()); - return; - } - - // Make sure this block is not already being searched for - // NOTE: Potentially store a hashset of blocks for O(1) lookups - for parent_req in self.parent_queue.iter() { - if parent_req - .downloaded_blocks - .iter() - .any(|d_block| d_block == &block) - { - // we are already searching for this block, ignore it - return; - } - } - - debug!(self.log, "Unknown block received. Starting a parent lookup"; "block_slot" => block.slot(), "block_hash" => %block.canonical_root()); - - let parent_request = ParentRequests { - downloaded_blocks: vec![block], - failed_attempts: 0, - last_submitted_peer: peer_id, - pending: None, - }; - - self.request_parent(parent_request) - } - - /// A request to search for a block hash has been received. This function begins a BlocksByRoot - /// request to find the requested block. - fn search_for_block(&mut self, peer_id: PeerId, block_hash: Hash256) { - // If we are not synced, ignore this block - if !self.network_globals.sync_state.read().is_synced() { - return; - } - - // Do not re-request a block that is already being requested - if self - .single_block_lookups - .values() - .any(|single_block_request| single_block_request.hash == block_hash) - { - return; - } - - debug!( - self.log, - "Searching for block"; - "peer_id" => %peer_id, - "block" => %block_hash - ); - - let request = BlocksByRootRequest { - block_roots: VariableList::from(vec![block_hash]), - }; - - if let Ok(request_id) = self.network.single_block_lookup_request(peer_id, request) { - self.single_block_lookups - .insert(request_id, SingleBlockRequest::new(block_hash)); - } - } - /// Handles RPC errors related to requests that were emitted from the sync manager. fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId) { trace!(self.log, "Sync manager received a failed RPC"); match request_id { RequestId::SingleBlock { id } => { - self.single_block_lookups.remove(&id); + self.block_lookups + .single_block_lookup_failed(id, &mut self.network); } RequestId::ParentLookup { id } => { - if let Some(pos) = self - .parent_queue - .iter() - .position(|request| request.pending == Some(id)) - { - // increment the failure of a parent lookup if the request matches a parent search - let mut parent_request = self.parent_queue.remove(pos); - parent_request.failed_attempts += 1; - parent_request.last_submitted_peer = peer_id; - self.request_parent(parent_request); - } + self.block_lookups + .parent_lookup_failed(id, peer_id, &mut self.network); } RequestId::BackFillSync { id } => { if let Some(batch_id) = self.network.backfill_sync_response(id, true) { @@ -666,6 +294,8 @@ impl SyncManager { fn peer_disconnect(&mut self, peer_id: &PeerId) { self.range_sync.peer_disconnect(&mut self.network, peer_id); + self.block_lookups + .peer_disconnected(peer_id, &mut self.network); // Regardless of the outcome, we update the sync status. let _ = self .backfill_sync @@ -824,187 +454,6 @@ impl SyncManager { } } - /* Processing State Functions */ - // These functions are called in the main poll function to transition the state of the sync - // manager - - /// A new block has been received for a parent lookup query, process it. - async fn process_parent_request(&mut self, mut parent_request: ParentRequests) { - // verify the last added block is the parent of the last requested block - - if parent_request.downloaded_blocks.len() < 2 { - crit!( - self.log, - "There must be at least two blocks in a parent request lookup at all times" - ); - panic!("There must be at least two blocks in parent request lookup at all times"); - // fail loudly - } - let previous_index = parent_request.downloaded_blocks.len() - 2; - let expected_hash = parent_request.downloaded_blocks[previous_index].parent_root(); - - // Note: the length must be greater than 2 so this cannot panic. - let block_hash = parent_request - .downloaded_blocks - .last() - .expect("Complete batch cannot be empty") - .canonical_root(); - if block_hash != expected_hash { - // The sent block is not the correct block, remove the head block and downvote - // the peer - let _ = parent_request.downloaded_blocks.pop(); - let peer = parent_request.last_submitted_peer; - - warn!(self.log, "Peer sent invalid parent."; - "peer_id" => %peer, - "received_block" => %block_hash, - "expected_parent" => %expected_hash, - ); - - // We try again, but downvote the peer. - self.request_parent(parent_request); - // We do not tolerate these kinds of errors. We will accept a few but these are signs - // of a faulty peer. - self.network.report_peer( - peer, - PeerAction::LowToleranceError, - "parent_request_bad_hash", - ); - } else { - // The last block in the queue is the only one that has not attempted to be processed yet. - // - // The logic here attempts to process the last block. If it can be processed, the rest - // of the blocks must have known parents. If any of them cannot be processed, we - // consider the entire chain corrupt and drop it, notifying the user. - // - // If the last block in the queue cannot be processed, we also drop the entire queue. - // If the last block in the queue has an unknown parent, we continue the parent - // lookup-search. - - let chain_block_hash = parent_request.downloaded_blocks[0].canonical_root(); - - let newest_block = parent_request - .downloaded_blocks - .pop() - .expect("There is always at least one block in the queue"); - - let block_result = match self.process_block_async(newest_block.clone()).await { - Some(block_result) => block_result, - None => return, - }; - - match block_result { - Err(BlockError::ParentUnknown { .. }) => { - // need to keep looking for parents - // add the block back to the queue and continue the search - parent_request.downloaded_blocks.push(newest_block); - self.request_parent(parent_request); - } - Ok(_) | Err(BlockError::BlockIsAlreadyKnown { .. }) => { - let process_id = ProcessId::ParentLookup( - parent_request.last_submitted_peer, - chain_block_hash, - ); - let blocks = parent_request.downloaded_blocks; - - match self - .beacon_processor_send - .try_send(BeaconWorkEvent::chain_segment(process_id, blocks)) - { - Ok(_) => {} - Err(e) => { - error!( - self.log, - "Failed to send chain segment to processor"; - "error" => ?e - ); - } - } - } - Err(outcome) => { - // all else we consider the chain a failure and downvote the peer that sent - // us the last block - warn!( - self.log, "Invalid parent chain"; - "score_adjustment" => %PeerAction::MidToleranceError, - "outcome" => ?outcome, - "last_peer" => %parent_request.last_submitted_peer, - ); - - // Add this chain to cache of failed chains - self.failed_chains.insert(chain_block_hash); - - // This currently can be a host of errors. We permit this due to the partial - // ambiguity. - self.network.report_peer( - parent_request.last_submitted_peer, - PeerAction::MidToleranceError, - "parent_request_err", - ); - } - } - } - } - - /// Progresses a parent request query. - /// - /// This checks to ensure there a peers to progress the query, checks for failures and - /// initiates requests. - fn request_parent(&mut self, mut parent_request: ParentRequests) { - // check to make sure this request hasn't failed - if parent_request.failed_attempts >= PARENT_FAIL_TOLERANCE - || parent_request.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE - { - let error = if parent_request.failed_attempts >= PARENT_FAIL_TOLERANCE { - // This is a peer-specific error and the chain could be continued with another - // peer. We don't consider this chain a failure and prevent retries with another - // peer. - "too many failed attempts" - } else if !parent_request.downloaded_blocks.is_empty() { - self.failed_chains - .insert(parent_request.downloaded_blocks[0].canonical_root()); - "reached maximum lookup-depth" - } else { - crit!(self.log, "Parent lookup has no blocks"); - "no blocks" - }; - - debug!(self.log, "Parent import failed"; - "block" => ?parent_request.downloaded_blocks[0].canonical_root(), - "ancestors_found" => parent_request.downloaded_blocks.len(), - "reason" => error - ); - // Downscore the peer. - self.network.report_peer( - parent_request.last_submitted_peer, - PeerAction::LowToleranceError, - "request_parent_import_failed", - ); - return; // drop the request - } - - let parent_hash = if let Some(block) = parent_request.downloaded_blocks.last() { - block.parent_root() - } else { - crit!(self.log, "Parent queue is empty. This should never happen"); - return; - }; - - let request = BlocksByRootRequest { - block_roots: VariableList::from(vec![parent_hash]), - }; - - // We continue to search for the chain of blocks from the same peer. Other peers are not - // guaranteed to have this chain of blocks. - let peer_id = parent_request.last_submitted_peer; - - if let Ok(request_id) = self.network.parent_lookup_request(peer_id, request) { - // if the request was successful add the queue back into self - parent_request.pending = Some(request_id); - self.parent_queue.push(parent_request); - } - } - /// The main driving future for the sync manager. async fn main(&mut self) { // process any inbound messages @@ -1020,19 +469,43 @@ impl SyncManager { beacon_block, seen_timestamp, } => { - self.rpc_block_received( - request_id, - peer_id, - beacon_block.map(|b| *b), - seen_timestamp, - ) - .await; + self.rpc_block_received(request_id, peer_id, beacon_block, seen_timestamp); } SyncMessage::UnknownBlock(peer_id, block) => { - self.add_unknown_block(peer_id, *block); + // If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore + if !self.network_globals.sync_state.read().is_synced() { + let head_slot = self + .chain + .head_info() + .map(|info| info.slot) + .unwrap_or_else(|_| Slot::from(0u64)); + let unknown_block_slot = block.slot(); + + // if the block is far in the future, ignore it. If its within the slot tolerance of + // our current head, regardless of the syncing state, fetch it. + if (head_slot >= unknown_block_slot + && head_slot.sub(unknown_block_slot).as_usize() + > SLOT_IMPORT_TOLERANCE) + || (head_slot < unknown_block_slot + && unknown_block_slot.sub(head_slot).as_usize() + > SLOT_IMPORT_TOLERANCE) + { + continue; + } + } + if self.network_globals.peers.read().is_connected(&peer_id) { + self.block_lookups + .search_parent(block, peer_id, &mut self.network); + } } SyncMessage::UnknownBlockHash(peer_id, block_hash) => { - self.search_for_block(peer_id, block_hash); + // If we are not synced, ignore this block. + if self.network_globals.sync_state.read().is_synced() + && self.network_globals.peers.read().is_connected(&peer_id) + { + self.block_lookups + .search_block(block_hash, peer_id, &mut self.network); + } } SyncMessage::Disconnect(peer_id) => { self.peer_disconnect(&peer_id); @@ -1041,8 +514,19 @@ impl SyncManager { peer_id, request_id, } => self.inject_error(peer_id, request_id), + SyncMessage::BlockProcessed { + process_type, + result, + } => match process_type { + BlockProcessType::SingleBlock { id } => self + .block_lookups + .single_block_processed(id, result, &mut self.network), + BlockProcessType::ParentLookup { chain_hash } => self + .block_lookups + .parent_block_processed(chain_hash, result, &mut self.network), + }, SyncMessage::BatchProcessed { sync_type, result } => match sync_type { - SyncRequestType::RangeSync(epoch, chain_id) => { + ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => { self.range_sync.handle_block_process_result( &mut self.network, chain_id, @@ -1051,7 +535,7 @@ impl SyncManager { ); self.update_sync_state(); } - SyncRequestType::BackFillSync(epoch) => { + ChainSegmentProcessId::BackSyncBatchId(epoch) => { match self.backfill_sync.on_batch_process_result( &mut self.network, epoch, @@ -1066,41 +550,37 @@ impl SyncManager { } } } + ChainSegmentProcessId::ParentLookup(chain_hash) => self + .block_lookups + .parent_chain_processed(chain_hash, result, &mut self.network), }, - SyncMessage::ParentLookupFailed { - chain_head, - peer_id, - } => { - // A peer sent an object (block or attestation) that referenced a parent. - // The processing of this chain failed. - self.failed_chains.insert(chain_head); - self.network.report_peer( - peer_id, - PeerAction::MidToleranceError, - "parent_lookup_failed", - ); - } } } } } - async fn rpc_block_received( + fn rpc_block_received( &mut self, request_id: RequestId, peer_id: PeerId, - beacon_block: Option>, + beacon_block: Option>>, seen_timestamp: Duration, ) { match request_id { - RequestId::SingleBlock { id } => { - self.single_block_lookup_response(id, peer_id, beacon_block, seen_timestamp) - .await; - } - RequestId::ParentLookup { id } => { - self.parent_lookup_response(peer_id, id, beacon_block, seen_timestamp) - .await - } + RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( + id, + peer_id, + beacon_block, + seen_timestamp, + &mut self.network, + ), + RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( + id, + peer_id, + beacon_block, + seen_timestamp, + &mut self.network, + ), RequestId::BackFillSync { id } => { if let Some(batch_id) = self .network @@ -1111,7 +591,7 @@ impl SyncManager { batch_id, &peer_id, id, - beacon_block, + beacon_block.map(|b| *b), ) { Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), Ok(ProcessResult::Successful) => {} @@ -1133,7 +613,7 @@ impl SyncManager { chain_id, batch_id, id, - beacon_block, + beacon_block.map(|b| *b), ); self.update_sync_state(); } diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 169a41d71..7a891de72 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -2,6 +2,7 @@ //! //! Stores the various syncing methods for the beacon chain. mod backfill_sync; +mod block_lookups; pub mod manager; mod network_context; mod peer_sync_info; diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 73f4ecbe0..3f8164721 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,5 +1,5 @@ use super::batch::{BatchInfo, BatchState}; -use crate::beacon_processor::ProcessId; +use crate::beacon_processor::ChainSegmentProcessId; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::sync::{manager::Id, network_context::SyncNetworkContext, BatchProcessResult}; use beacon_chain::BeaconChainTypes; @@ -300,7 +300,7 @@ impl SyncingChain { // for removing chains and checking completion is in the callback. let blocks = batch.start_processing()?; - let process_id = ProcessId::RangeBatchId(self.id, batch_id); + let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id); self.current_processing_batch = Some(batch_id); if let Err(e) = self