From b8013b7b2c2421f45777e7181575376137ba8e68 Mon Sep 17 00:00:00 2001
From: divma
Date: Wed, 23 Sep 2020 06:29:55 +0000
Subject: [PATCH] Super Silky Smooth Syncs, like a Sir (#1628)

## Issue Addressed

In principle, closes #1551, but these are more generally improvements to performance, maintainability and readability. The logic for the optimistic sync is actually simple.

## Proposed Changes

There are miscellaneous things here:
- Remove the unnecessary `BatchProcessResult::Partial` to simplify the batch validation logic.
- Make batches a state machine. This is done to ensure batch state transitions respect our logic (previously this was done by moving batches between `Vec`s) and to ease the cognitive load of the `SyncingChain` struct.
- Move most batch-related logic to the batch itself.
- Remove `PendingBatches` in favor of a map of peers to their batches. This avoids duplicating peers inside the chain (peer_pool and pending_batches).
- Add the `must_use` decoration to `ProcessingResult` so that chains that request to be removed are handled accordingly. This also means that chains are now removed in more places than before, to account for previously unhandled cases.
- Store batches in a sorted map (`BTreeMap`). Access is not O(1), but since the number of _active_ batches is bounded this should be fast, and it saves performing hashing ops. Batches are indexed by the epoch at which they start and kept sorted, to easily handle chain advancements (range logic).
- Produce the chain Id from its identifying fields: target root and target slot. This guarantees there can't be duplicated chains and lets us consistently search chains by either Id or checkpoint.
- Fix chain_id not being present in all chain loggers.
- Handle the mega-edge case where the processor's work queue is full and the batch can't be sent. Previously the chain would lose the blocks, remain in a "syncing" state and wait for a result that would never arrive, effectively stalling sync.
- When a batch imports blocks, or the chain starts syncing with a local finalized epoch greater than the chain's start epoch, the chain is advanced instead of reset. This avoids losing download progress and validates batches faster. It also means that the old `start_epoch` now means "current first unvalidated batch", so it represents the progress of the chain more accurately.
- Batch the status requests for peers from the same chain to reduce Arc access.
- A couple of cases where the retry counters for a batch were not updated/checked are now handled via the batch state machine. Basically, if we now forget to do it, we will know.
- Do not send the blocks back from the processor to the batch. Instead, register the attempt before sending the blocks (it does not count as failed).
- When re-requesting a batch, try to avoid not only the last failed peer, but all previously failed peers.
- Optimize requesting batches ahead in the buffer by shuffling idle peers just once (this just addresses a couple of old TODOs in the code).
- In chain_collection, store chains by their id in a map.
- Include a mapping from request_ids to the (chain, batch) that requested the batch, to avoid the double O(n) search on block responses.
- Other stuff:
  - impl `slog::KV` for batches
  - impl `slog::KV` for syncing chains
  - PSA: when logging, we can use `%thing` if `thing` implements `Display`. Same for `?` and `Debug`.

### Optimistic syncing

Try first the batch that contains the current head: if the batch imports any block, advance the chain.
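As a rough, self-contained sketch of this decision (the type and field names below only loosely mirror the real `SyncingChain` in this diff, and `handle_optimistic_result` is a hypothetical helper rather than a function added by this PR; the remaining text continues the description of the non-importing case):

```rust
/// Hedged sketch only: simplified stand-ins for the real Lighthouse types.
type Epoch = u64;

enum BatchProcessResult {
    /// Carries whether the processed batch contained any blocks.
    Success { was_non_empty: bool },
    /// Carries whether processing managed to import any block.
    Failed { imported_blocks: bool },
}

struct SyncingChainSketch {
    /// Epoch of the optimistic batch currently being tried, if any.
    optimistic_start: Option<Epoch>,
    /// First unvalidated batch; advancing the chain moves this forward.
    processing_target: Epoch,
    /// Start of the next batch to download (upper end of the processing window).
    to_be_downloaded: Epoch,
}

impl SyncingChainSketch {
    /// Hypothetical helper: decide what to do with the optimistic batch after processing.
    fn handle_optimistic_result(
        &mut self,
        batch_epoch: Epoch,
        result: BatchProcessResult,
    ) -> &'static str {
        // Only act if this result is for the current optimistic candidate.
        if self.optimistic_start != Some(batch_epoch) {
            return "not the optimistic batch; handled elsewhere";
        }
        let imported_any = match result {
            BatchProcessResult::Success { was_non_empty } => was_non_empty,
            BatchProcessResult::Failed { imported_blocks } => imported_blocks,
        };
        if imported_any {
            // At least one block landed: everything before this batch is implicitly
            // valid, so the chain advances to this point instead of resetting.
            self.processing_target = batch_epoch;
            "advance the chain to the optimistic batch"
        } else {
            // Nothing was imported: this epoch stops being an optimistic candidate.
            self.optimistic_start = None;
            if batch_epoch < self.to_be_downloaded {
                "inside the processing window: keep the batch for future use"
            } else {
                "outside the window: drop the batch"
            }
        }
    }
}

fn main() {
    let mut chain = SyncingChainSketch {
        optimistic_start: Some(40),
        processing_target: 8,
        to_be_downloaded: 48,
    };
    let action =
        chain.handle_optimistic_result(40, BatchProcessResult::Success { was_non_empty: true });
    println!("{}", action); // advance the chain to the optimistic batch
    assert_eq!(chain.processing_target, 40);

    // An optimistic batch that imports nothing is rejected as a candidate.
    chain.optimistic_start = Some(44);
    let action =
        chain.handle_optimistic_result(44, BatchProcessResult::Failed { imported_blocks: false });
    println!("{}", action); // inside the processing window: keep the batch for future use
}
```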
If not, if this optimistic batch is inside the current processing window leave it there for future use, if not drop it. The tolerance for this block is the same for downloading, but just once for processing Co-authored-by: Age Manning --- beacon_node/eth2_libp2p/Cargo.toml | 1 - beacon_node/network/Cargo.toml | 1 + .../src/beacon_processor/chain_segment.rs | 51 +- .../network/src/beacon_processor/worker.rs | 7 +- beacon_node/network/src/router/processor.rs | 6 +- beacon_node/network/src/sync/manager.rs | 35 +- .../network/src/sync/network_context.rs | 63 +- .../network/src/sync/range_sync/batch.rs | 449 ++++-- .../network/src/sync/range_sync/chain.rs | 1295 ++++++++++------- .../src/sync/range_sync/chain_collection.rs | 477 +++--- .../network/src/sync/range_sync/mod.rs | 4 +- .../network/src/sync/range_sync/range.rs | 289 ++-- .../network/src/sync/range_sync/sync_type.rs | 1 + 13 files changed, 1480 insertions(+), 1199 deletions(-) diff --git a/beacon_node/eth2_libp2p/Cargo.toml b/beacon_node/eth2_libp2p/Cargo.toml index 19eef3cbc..49e6dc92e 100644 --- a/beacon_node/eth2_libp2p/Cargo.toml +++ b/beacon_node/eth2_libp2p/Cargo.toml @@ -35,7 +35,6 @@ tokio-util = { version = "0.3.1", features = ["codec", "compat"] } discv5 = { version = "0.1.0-alpha.10", features = ["libp2p"] } tiny-keccak = "2.0.2" environment = { path = "../../lighthouse/environment" } -# TODO: Remove rand crate for mainnet rand = "0.7.3" regex = "1.3.9" diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 2ef369e3b..0448e7762 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -32,6 +32,7 @@ tokio = { version = "0.2.21", features = ["full"] } parking_lot = "0.11.0" smallvec = "1.4.1" # TODO: Remove rand crate for mainnet +# NOTE: why? 
rand = "0.7.3" fnv = "1.0.6" rlp = "0.4.5" diff --git a/beacon_node/network/src/beacon_processor/chain_segment.rs b/beacon_node/network/src/beacon_processor/chain_segment.rs index 94c7893f0..e659a84b8 100644 --- a/beacon_node/network/src/beacon_processor/chain_segment.rs +++ b/beacon_node/network/src/beacon_processor/chain_segment.rs @@ -28,39 +28,26 @@ pub fn handle_chain_segment( match process_id { // this a request from the range sync ProcessId::RangeBatchId(chain_id, epoch) => { - let len = downloaded_blocks.len(); - let start_slot = if len > 0 { - downloaded_blocks[0].message.slot.as_u64() - } else { - 0 - }; - let end_slot = if len > 0 { - downloaded_blocks[len - 1].message.slot.as_u64() - } else { - 0 - }; + let start_slot = downloaded_blocks.first().map(|b| b.message.slot.as_u64()); + let end_slot = downloaded_blocks.last().map(|b| b.message.slot.as_u64()); + let sent_blocks = downloaded_blocks.len(); - debug!(log, "Processing batch"; "batch_epoch" => epoch, "blocks" => downloaded_blocks.len(), "first_block_slot" => start_slot, "last_block_slot" => end_slot, "service" => "sync"); let result = match process_blocks(chain, downloaded_blocks.iter(), &log) { (_, Ok(_)) => { - debug!(log, "Batch processed"; "batch_epoch" => epoch , "first_block_slot" => start_slot, "last_block_slot" => end_slot, "service"=> "sync"); - BatchProcessResult::Success + debug!(log, "Batch processed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, + "last_block_slot" => end_slot, "processed_blocks" => sent_blocks, "service"=> "sync"); + BatchProcessResult::Success(sent_blocks > 0) } - (imported_blocks, Err(e)) if imported_blocks > 0 => { - debug!(log, "Batch processing failed but imported some blocks"; - "batch_epoch" => epoch, "error" => e, "imported_blocks"=> imported_blocks, "service" => "sync"); - BatchProcessResult::Partial - } - (_, Err(e)) => { - debug!(log, "Batch processing failed"; "batch_epoch" => epoch, "error" => e, "service" => "sync"); - BatchProcessResult::Failed + (imported_blocks, Err(e)) => { + debug!(log, "Batch processing failed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, + "last_block_slot" => end_slot, "error" => e, "imported_blocks" => imported_blocks, "service" => "sync"); + BatchProcessResult::Failed(imported_blocks > 0) } }; let msg = SyncMessage::BatchProcessed { chain_id, epoch, - downloaded_blocks, result, }; sync_send.send(msg).unwrap_or_else(|_| { @@ -70,7 +57,7 @@ pub fn handle_chain_segment( ); }); } - // this a parent lookup request from the sync manager + // this is a parent lookup request from the sync manager ProcessId::ParentLookup(peer_id, chain_head) => { debug!( log, "Processing parent lookup"; @@ -81,7 +68,7 @@ pub fn handle_chain_segment( // reverse match process_blocks(chain, downloaded_blocks.iter().rev(), &log) { (_, Err(e)) => { - debug!(log, "Parent lookup failed"; "last_peer_id" => format!("{}", peer_id), "error" => e); + debug!(log, "Parent lookup failed"; "last_peer_id" => %peer_id, "error" => e); sync_send .send(SyncMessage::ParentLookupFailed{peer_id, chain_head}) .unwrap_or_else(|_| { @@ -114,13 +101,7 @@ fn process_blocks< match chain.process_chain_segment(blocks) { ChainSegmentResult::Successful { imported_blocks } => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL); - if imported_blocks == 0 { - debug!(log, "All blocks already known"); - } else { - debug!( - log, "Imported blocks from network"; - "count" => imported_blocks, - ); + if imported_blocks > 0 { // Batch completed successfully with at 
least one block, run fork choice. run_fork_choice(chain, log); } @@ -153,7 +134,7 @@ fn run_fork_choice(chain: Arc>, log: &slog:: Err(e) => error!( log, "Fork choice failed"; - "error" => format!("{:?}", e), + "error" => ?e, "location" => "batch import error" ), } @@ -219,7 +200,7 @@ fn handle_failed_chain_segment( warn!( log, "BlockProcessingFailure"; "msg" => "unexpected condition in processing block.", - "outcome" => format!("{:?}", e) + "outcome" => ?e, ); Err(format!("Internal error whilst processing block: {:?}", e)) @@ -228,7 +209,7 @@ fn handle_failed_chain_segment( debug!( log, "Invalid block received"; "msg" => "peer sent invalid block", - "outcome" => format!("{:?}", other), + "outcome" => %other, ); Err(format!("Peer sent invalid block. Reason: {:?}", other)) diff --git a/beacon_node/network/src/beacon_processor/worker.rs b/beacon_node/network/src/beacon_processor/worker.rs index 6388962e5..79d5d53c5 100644 --- a/beacon_node/network/src/beacon_processor/worker.rs +++ b/beacon_node/network/src/beacon_processor/worker.rs @@ -535,9 +535,10 @@ impl Worker { /// /// Creates a log if there is an interal error. fn send_sync_message(&self, message: SyncMessage) { - self.sync_tx - .send(message) - .unwrap_or_else(|_| error!(self.log, "Could not send message to the sync service")); + self.sync_tx.send(message).unwrap_or_else(|e| { + error!(self.log, "Could not send message to the sync service"; + "error" => %e) + }); } /// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 9d83c5576..36b799c8d 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -82,10 +82,11 @@ impl Processor { } fn send_to_sync(&mut self, message: SyncMessage) { - self.sync_send.send(message).unwrap_or_else(|_| { + self.sync_send.send(message).unwrap_or_else(|e| { warn!( self.log, "Could not send message to the sync service"; + "error" => %e, ) }); } @@ -691,9 +692,10 @@ impl HandlerNetworkContext { /// Sends a message to the network task. fn inform_network(&mut self, msg: NetworkMessage) { + let msg_r = &format!("{:?}", msg); self.network_send .send(msg) - .unwrap_or_else(|_| warn!(self.log, "Could not send message to the network service")) + .unwrap_or_else(|e| warn!(self.log, "Could not send message to the network service"; "error" => %e, "message" => msg_r)) } /// Disconnects and ban's a peer, sending a Goodbye request with the associated reason. diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index f14794447..a2f479292 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -29,9 +29,9 @@ //! //! Block Lookup //! -//! To keep the logic maintained to the syncing thread (and manage the request_ids), when a block needs to be searched for (i.e -//! if an attestation references an unknown block) this manager can search for the block and -//! subsequently search for parents if needed. +//! To keep the logic maintained to the syncing thread (and manage the request_ids), when a block +//! needs to be searched for (i.e if an attestation references an unknown block) this manager can +//! search for the block and subsequently search for parents if needed. 
use super::network_context::SyncNetworkContext; use super::peer_sync_info::{PeerSyncInfo, PeerSyncType}; @@ -106,7 +106,6 @@ pub enum SyncMessage { BatchProcessed { chain_id: ChainId, epoch: Epoch, - downloaded_blocks: Vec>, result: BatchProcessResult, }, @@ -123,12 +122,10 @@ pub enum SyncMessage { // TODO: When correct batch error handling occurs, we will include an error type. #[derive(Debug)] pub enum BatchProcessResult { - /// The batch was completed successfully. - Success, - /// The batch processing failed. - Failed, - /// The batch processing failed but managed to import at least one block. - Partial, + /// The batch was completed successfully. It carries whether the sent batch contained blocks. + Success(bool), + /// The batch processing failed. It carries whether the processing imported any block. + Failed(bool), } /// Maintains a sequential list of parents to lookup and the lookup's current state. @@ -275,9 +272,9 @@ impl SyncManager { match local_peer_info.peer_sync_type(&remote) { PeerSyncType::FullySynced => { trace!(self.log, "Peer synced to our head found"; - "peer" => format!("{:?}", peer_id), - "peer_head_slot" => remote.head_slot, - "local_head_slot" => local_peer_info.head_slot, + "peer" => %peer_id, + "peer_head_slot" => remote.head_slot, + "local_head_slot" => local_peer_info.head_slot, ); self.synced_peer(&peer_id, remote); // notify the range sync that a peer has been added @@ -285,11 +282,11 @@ impl SyncManager { } PeerSyncType::Advanced => { trace!(self.log, "Useful peer for sync found"; - "peer" => format!("{:?}", peer_id), - "peer_head_slot" => remote.head_slot, - "local_head_slot" => local_peer_info.head_slot, - "peer_finalized_epoch" => remote.finalized_epoch, - "local_finalized_epoch" => local_peer_info.finalized_epoch, + "peer" => %peer_id, + "peer_head_slot" => remote.head_slot, + "local_head_slot" => local_peer_info.head_slot, + "peer_finalized_epoch" => remote.finalized_epoch, + "local_finalized_epoch" => local_peer_info.finalized_epoch, ); // There are few cases to handle here: @@ -908,14 +905,12 @@ impl SyncManager { SyncMessage::BatchProcessed { chain_id, epoch, - downloaded_blocks, result, } => { self.range_sync.handle_block_process_result( &mut self.network, chain_id, epoch, - downloaded_blocks, result, ); } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 83f4cb64e..715344eb1 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1,11 +1,14 @@ //! Provides network functionality for the Syncing thread. This fundamentally wraps a network //! channel and stores a global RPC ID to perform requests. +use super::range_sync::{BatchId, ChainId}; +use super::RequestId as SyncRequestId; use crate::router::processor::status_message; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RequestId}; use eth2_libp2p::{Client, NetworkGlobals, PeerAction, PeerId, Request}; +use fnv::FnvHashMap; use slog::{debug, trace, warn}; use std::sync::Arc; use tokio::sync::mpsc; @@ -21,7 +24,11 @@ pub struct SyncNetworkContext { network_globals: Arc>, /// A sequential ID for all RPC requests. - request_id: usize, + request_id: SyncRequestId, + + /// BlocksByRange requests made by range syncing chains. + range_requests: FnvHashMap, + /// Logger for the `SyncNetworkContext`. 
log: slog::Logger, } @@ -36,6 +43,7 @@ impl SyncNetworkContext { network_send, network_globals, request_id: 1, + range_requests: FnvHashMap::default(), log, } } @@ -50,24 +58,26 @@ impl SyncNetworkContext { .unwrap_or_default() } - pub fn status_peer( + pub fn status_peers( &mut self, chain: Arc>, - peer_id: PeerId, + peers: impl Iterator, ) { if let Some(status_message) = status_message(&chain) { - debug!( - self.log, - "Sending Status Request"; - "peer" => format!("{:?}", peer_id), - "fork_digest" => format!("{:?}", status_message.fork_digest), - "finalized_root" => format!("{:?}", status_message.finalized_root), - "finalized_epoch" => format!("{:?}", status_message.finalized_epoch), - "head_root" => format!("{}", status_message.head_root), - "head_slot" => format!("{}", status_message.head_slot), - ); + for peer_id in peers { + debug!( + self.log, + "Sending Status Request"; + "peer" => %peer_id, + "fork_digest" => ?status_message.fork_digest, + "finalized_root" => ?status_message.finalized_root, + "finalized_epoch" => ?status_message.finalized_epoch, + "head_root" => %status_message.head_root, + "head_slot" => %status_message.head_slot, + ); - let _ = self.send_rpc_request(peer_id, Request::Status(status_message)); + let _ = self.send_rpc_request(peer_id, Request::Status(status_message.clone())); + } } } @@ -75,15 +85,34 @@ impl SyncNetworkContext { &mut self, peer_id: PeerId, request: BlocksByRangeRequest, - ) -> Result { + chain_id: ChainId, + batch_id: BatchId, + ) -> Result<(), &'static str> { trace!( self.log, "Sending BlocksByRange Request"; "method" => "BlocksByRange", "count" => request.count, - "peer" => format!("{:?}", peer_id) + "peer" => %peer_id, ); - self.send_rpc_request(peer_id, Request::BlocksByRange(request)) + let req_id = self.send_rpc_request(peer_id, Request::BlocksByRange(request))?; + self.range_requests.insert(req_id, (chain_id, batch_id)); + Ok(()) + } + + pub fn blocks_by_range_response( + &mut self, + request_id: usize, + remove: bool, + ) -> Option<(ChainId, BatchId)> { + // NOTE: we can't guarantee that the request must be registered as it could receive more + // than an error, and be removed after receiving the first one. + // FIXME: https://github.com/sigp/lighthouse/issues/1634 + if remove { + self.range_requests.remove(&request_id) + } else { + self.range_requests.get(&request_id).cloned() + } } pub fn blocks_by_root_request( diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 1fa312c57..532dafd2e 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -1,35 +1,274 @@ -use super::chain::EPOCHS_PER_BATCH; -use eth2_libp2p::rpc::methods::*; +use eth2_libp2p::rpc::methods::BlocksByRangeRequest; use eth2_libp2p::PeerId; -use fnv::FnvHashMap; use ssz::Encode; -use std::cmp::min; -use std::cmp::Ordering; -use std::collections::hash_map::Entry; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::hash::{Hash, Hasher}; use std::ops::Sub; use types::{Epoch, EthSpec, SignedBeaconBlock, Slot}; -/// A collection of sequential blocks that are requested from peers in a single RPC request. -#[derive(PartialEq, Debug)] -pub struct Batch { - /// The requested start epoch of the batch. - pub start_epoch: Epoch, - /// The requested end slot of batch, exclusive. - pub end_slot: Slot, - /// The `Attempts` that have been made to send us this batch. 
- pub attempts: Vec, - /// The peer that is currently assigned to the batch. - pub current_peer: PeerId, - /// The number of retries this batch has undergone due to a failed request. - /// This occurs when peers do not respond or we get an RPC error. - pub retries: u8, - /// The number of times this batch has attempted to be re-downloaded and re-processed. This - /// occurs when a batch has been received but cannot be processed. - pub reprocess_retries: u8, - /// The blocks that have been downloaded. - pub downloaded_blocks: Vec>, +/// The number of times to retry a batch before it is considered failed. +const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; + +/// Invalid batches are attempted to be re-downloaded from other peers. If a batch cannot be processed +/// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty. +const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3; + +/// A segment of a chain. +pub struct BatchInfo { + /// Start slot of the batch. + start_slot: Slot, + /// End slot of the batch. + end_slot: Slot, + /// The `Attempts` that have been made and failed to send us this batch. + failed_processing_attempts: Vec, + /// The number of download retries this batch has undergone due to a failed request. + failed_download_attempts: Vec, + /// State of the batch. + state: BatchState, +} + +/// Current state of a batch +pub enum BatchState { + /// The batch has failed either downloading or processing, but can be requested again. + AwaitingDownload, + /// The batch is being downloaded. + Downloading(PeerId, Vec>), + /// The batch has been completely downloaded and is ready for processing. + AwaitingProcessing(PeerId, Vec>), + /// The batch is being processed. + Processing(Attempt), + /// The batch was successfully processed and is waiting to be validated. + /// + /// It is not sufficient to process a batch successfully to consider it correct. This is + /// because batches could be erroneously empty, or incomplete. Therefore, a batch is considered + /// valid, only if the next sequential batch imports at least a block. + AwaitingValidation(Attempt), + /// Intermediate state for inner state handling. + Poisoned, + /// The batch has maxed out the allowed attempts for either downloading or processing. It + /// cannot be recovered. + Failed, +} + +impl BatchState { + /// Helper function for poisoning a state. + pub fn poison(&mut self) -> BatchState { + std::mem::replace(self, BatchState::Poisoned) + } +} + +impl BatchInfo { + /// Batches are downloaded excluding the first block of the epoch assuming it has already been + /// downloaded. + /// + /// For example: + /// + /// Epoch boundary | | + /// ... | 30 | 31 | 32 | 33 | 34 | ... | 61 | 62 | 63 | 64 | 65 | + /// Batch 1 | Batch 2 | Batch 3 + pub fn new(start_epoch: &Epoch, num_of_epochs: u64) -> Self { + let start_slot = start_epoch.start_slot(T::slots_per_epoch()) + 1; + let end_slot = start_slot + num_of_epochs * T::slots_per_epoch(); + BatchInfo { + start_slot, + end_slot, + failed_processing_attempts: Vec::new(), + failed_download_attempts: Vec::new(), + state: BatchState::AwaitingDownload, + } + } + + /// Gives a list of peers from which this batch has had a failed download or processing + /// attempt. 
+ pub fn failed_peers(&self) -> HashSet { + let mut peers = HashSet::with_capacity( + self.failed_processing_attempts.len() + self.failed_download_attempts.len(), + ); + + for attempt in &self.failed_processing_attempts { + peers.insert(attempt.peer_id.clone()); + } + + for download in &self.failed_download_attempts { + peers.insert(download.clone()); + } + + peers + } + + pub fn current_peer(&self) -> Option<&PeerId> { + match &self.state { + BatchState::AwaitingDownload | BatchState::Failed => None, + BatchState::Downloading(peer_id, _) + | BatchState::AwaitingProcessing(peer_id, _) + | BatchState::Processing(Attempt { peer_id, .. }) + | BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(&peer_id), + BatchState::Poisoned => unreachable!("Poisoned batch"), + } + } + + pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest { + BlocksByRangeRequest { + start_slot: self.start_slot.into(), + count: self.end_slot.sub(self.start_slot).into(), + step: 1, + } + } + + pub fn state(&self) -> &BatchState { + &self.state + } + + pub fn attempts(&self) -> &[Attempt] { + &self.failed_processing_attempts + } + + /// Adds a block to a downloading batch. + pub fn add_block(&mut self, block: SignedBeaconBlock) { + match self.state.poison() { + BatchState::Downloading(peer, mut blocks) => { + blocks.push(block); + self.state = BatchState::Downloading(peer, blocks) + } + other => unreachable!("Add block for batch in wrong state: {:?}", other), + } + } + + /// Marks the batch as ready to be processed if the blocks are in the range. The number of + /// received blocks is returned, or the wrong batch end on failure + #[must_use = "Batch may have failed"] + pub fn download_completed( + &mut self, + ) -> Result< + usize, /* Received blocks */ + ( + Slot, /* expected slot */ + Slot, /* received slot */ + &BatchState, + ), + > { + match self.state.poison() { + BatchState::Downloading(peer, blocks) => { + // verify that blocks are in range + if let Some(last_slot) = blocks.last().map(|b| b.slot()) { + // the batch is non-empty + let first_slot = blocks[0].slot(); + + let failed_range = if first_slot < self.start_slot { + Some((self.start_slot, first_slot)) + } else if self.end_slot < last_slot { + Some((self.end_slot, last_slot)) + } else { + None + }; + + if let Some(range) = failed_range { + // this is a failed download, register the attempt and check if the batch + // can be tried again + self.failed_download_attempts.push(peer); + self.state = if self.failed_download_attempts.len() + >= MAX_BATCH_DOWNLOAD_ATTEMPTS as usize + { + BatchState::Failed + } else { + // drop the blocks + BatchState::AwaitingDownload + }; + return Err((range.0, range.1, &self.state)); + } + } + + let received = blocks.len(); + self.state = BatchState::AwaitingProcessing(peer, blocks); + Ok(received) + } + other => unreachable!("Download completed for batch in wrong state: {:?}", other), + } + } + + #[must_use = "Batch may have failed"] + pub fn download_failed(&mut self) -> &BatchState { + match self.state.poison() { + BatchState::Downloading(peer, _) => { + // register the attempt and check if the batch can be tried again + self.failed_download_attempts.push(peer); + self.state = if self.failed_download_attempts.len() + >= MAX_BATCH_DOWNLOAD_ATTEMPTS as usize + { + BatchState::Failed + } else { + // drop the blocks + BatchState::AwaitingDownload + }; + &self.state + } + other => unreachable!("Download failed for batch in wrong state: {:?}", other), + } + } + + pub fn start_downloading_from_peer(&mut self, peer: 
PeerId) { + match self.state.poison() { + BatchState::AwaitingDownload => { + self.state = BatchState::Downloading(peer, Vec::new()); + } + other => unreachable!("Starting download for batch in wrong state: {:?}", other), + } + } + + pub fn start_processing(&mut self) -> Vec> { + match self.state.poison() { + BatchState::AwaitingProcessing(peer, blocks) => { + self.state = BatchState::Processing(Attempt::new(peer, &blocks)); + blocks + } + other => unreachable!("Start processing for batch in wrong state: {:?}", other), + } + } + + #[must_use = "Batch may have failed"] + pub fn processing_completed(&mut self, was_sucessful: bool) -> &BatchState { + match self.state.poison() { + BatchState::Processing(attempt) => { + self.state = if !was_sucessful { + // register the failed attempt + self.failed_processing_attempts.push(attempt); + + // check if the batch can be downloaded again + if self.failed_processing_attempts.len() + >= MAX_BATCH_PROCESSING_ATTEMPTS as usize + { + BatchState::Failed + } else { + BatchState::AwaitingDownload + } + } else { + BatchState::AwaitingValidation(attempt) + }; + &self.state + } + other => unreachable!("Processing completed for batch in wrong state: {:?}", other), + } + } + + #[must_use = "Batch may have failed"] + pub fn validation_failed(&mut self) -> &BatchState { + match self.state.poison() { + BatchState::AwaitingValidation(attempt) => { + self.failed_processing_attempts.push(attempt); + + // check if the batch can be downloaded again + self.state = if self.failed_processing_attempts.len() + >= MAX_BATCH_PROCESSING_ATTEMPTS as usize + { + BatchState::Failed + } else { + BatchState::AwaitingDownload + }; + &self.state + } + other => unreachable!("Validation failed for batch in wrong state: {:?}", other), + } + } } /// Represents a peer's attempt and providing the result for this batch. @@ -43,131 +282,61 @@ pub struct Attempt { pub hash: u64, } -impl Eq for Batch {} - -impl Batch { - pub fn new(start_epoch: Epoch, end_slot: Slot, peer_id: PeerId) -> Self { - Batch { - start_epoch, - end_slot, - attempts: Vec::new(), - current_peer: peer_id, - retries: 0, - reprocess_retries: 0, - downloaded_blocks: Vec::new(), - } - } - - pub fn start_slot(&self) -> Slot { - // batches are shifted by 1 - self.start_epoch.start_slot(T::slots_per_epoch()) + 1 - } - - pub fn end_slot(&self) -> Slot { - self.end_slot - } - pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest { - let start_slot = self.start_slot(); - BlocksByRangeRequest { - start_slot: start_slot.into(), - count: min( - T::slots_per_epoch() * EPOCHS_PER_BATCH, - self.end_slot.sub(start_slot).into(), - ), - step: 1, - } - } - - /// This gets a hash that represents the blocks currently downloaded. This allows comparing a - /// previously downloaded batch of blocks with a new downloaded batch of blocks. 
- pub fn hash(&self) -> u64 { - // the hash used is the ssz-encoded list of blocks +impl Attempt { + #[allow(clippy::ptr_arg)] + fn new(peer_id: PeerId, blocks: &Vec>) -> Self { let mut hasher = std::collections::hash_map::DefaultHasher::new(); - self.downloaded_blocks.as_ssz_bytes().hash(&mut hasher); - hasher.finish() + blocks.as_ssz_bytes().hash(&mut hasher); + let hash = hasher.finish(); + Attempt { peer_id, hash } } } -impl Ord for Batch { - fn cmp(&self, other: &Self) -> Ordering { - self.start_epoch.cmp(&other.start_epoch) +impl slog::KV for &mut BatchInfo { + fn serialize( + &self, + record: &slog::Record, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + slog::KV::serialize(*self, record, serializer) } } -impl PartialOrd for Batch { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) +impl slog::KV for BatchInfo { + fn serialize( + &self, + record: &slog::Record, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + use slog::Value; + Value::serialize(&self.start_slot, record, "start_slot", serializer)?; + Value::serialize( + &(self.end_slot - 1), // NOTE: The -1 shows inclusive blocks + record, + "end_slot", + serializer, + )?; + serializer.emit_usize("downloaded", self.failed_download_attempts.len())?; + serializer.emit_usize("processed", self.failed_processing_attempts.len())?; + serializer.emit_str("state", &format!("{:?}", self.state))?; + slog::Result::Ok(()) } } -/// A structure that contains a mapping of pending batch requests, that also keeps track of which -/// peers are currently making batch requests. -/// -/// This is used to optimise searches for idle peers (peers that have no outbound batch requests). -pub struct PendingBatches { - /// The current pending batches. - batches: FnvHashMap>, - /// A mapping of peers to the number of pending requests. - peer_requests: HashMap>, -} - -impl PendingBatches { - pub fn new() -> Self { - PendingBatches { - batches: FnvHashMap::default(), - peer_requests: HashMap::new(), - } - } - - pub fn insert(&mut self, request_id: usize, batch: Batch) -> Option> { - let peer_request = batch.current_peer.clone(); - self.peer_requests - .entry(peer_request) - .or_insert_with(HashSet::new) - .insert(request_id); - self.batches.insert(request_id, batch) - } - - pub fn remove(&mut self, request_id: usize) -> Option> { - if let Some(batch) = self.batches.remove(&request_id) { - if let Entry::Occupied(mut entry) = self.peer_requests.entry(batch.current_peer.clone()) - { - entry.get_mut().remove(&request_id); - - if entry.get().is_empty() { - entry.remove(); - } +impl std::fmt::Debug for BatchState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + BatchState::Processing(_) => f.write_str("Processing"), + BatchState::AwaitingValidation(_) => f.write_str("AwaitingValidation"), + BatchState::AwaitingDownload => f.write_str("AwaitingDownload"), + BatchState::Failed => f.write_str("Failed"), + BatchState::AwaitingProcessing(ref peer, ref blocks) => { + write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len()) } - Some(batch) - } else { - None + BatchState::Downloading(peer, blocks) => { + write!(f, "Downloading({}, {} blocks)", peer, blocks.len()) + } + BatchState::Poisoned => f.write_str("Poisoned"), } } - - /// The number of current pending batch requests. - pub fn len(&self) -> usize { - self.batches.len() - } - - /// Adds a block to the batches if the request id exists. Returns None if there is no batch - /// matching the request id. 
- pub fn add_block(&mut self, request_id: usize, block: SignedBeaconBlock) -> Option<()> { - let batch = self.batches.get_mut(&request_id)?; - batch.downloaded_blocks.push(block); - Some(()) - } - - /// Returns true if there the peer does not exist in the peer_requests mapping. Indicating it - /// has no pending outgoing requests. - pub fn peer_is_idle(&self, peer_id: &PeerId) -> bool { - self.peer_requests.get(peer_id).is_none() - } - - /// Removes a batch for a given peer. - pub fn remove_batch_by_peer(&mut self, peer_id: &PeerId) -> Option> { - let request_ids = self.peer_requests.get(peer_id)?; - - let request_id = *request_ids.iter().next()?; - self.remove(request_id) - } } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index d79dff469..4decdc212 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,40 +1,36 @@ -use super::batch::{Batch, PendingBatches}; +use super::batch::{BatchInfo, BatchState}; use crate::beacon_processor::ProcessId; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; -use crate::sync::RequestId; use crate::sync::{network_context::SyncNetworkContext, BatchProcessResult}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{PeerAction, PeerId}; -use rand::prelude::*; -use slog::{crit, debug, error, warn}; -use std::collections::HashSet; +use fnv::FnvHashMap; +use rand::seq::SliceRandom; +use slog::{crit, debug, o, warn}; +use std::collections::{btree_map::Entry, BTreeMap, HashSet}; +use std::hash::{Hash, Hasher}; use std::sync::Arc; -use tokio::sync::mpsc; +use tokio::sync::mpsc::Sender; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of /// blocks per batch are requested _at most_. A batch may request less blocks to account for /// already requested slots. There is a timeout for each batch request. If this value is too high, -/// we will negatively report peers with poor bandwidth. This can be set arbitrarily high, in which case the -/// responder will fill the response up to the max request size, assuming they have the bandwidth -/// to do so. -pub const EPOCHS_PER_BATCH: u64 = 2; - -/// The number of times to retry a batch before the chain is considered failed and removed. -const MAX_BATCH_RETRIES: u8 = 5; +/// we will negatively report peers with poor bandwidth. This can be set arbitrarily high, in which +/// case the responder will fill the response up to the max request size, assuming they have the +/// bandwidth to do so. +pub const EPOCHS_PER_BATCH: u64 = 8; /// The maximum number of batches to queue before requesting more. const BATCH_BUFFER_SIZE: u8 = 5; -/// Invalid batches are attempted to be re-downloaded from other peers. If they cannot be processed -/// after `INVALID_BATCH_LOOKUP_ATTEMPTS` times, the chain is considered faulty and all peers will -/// be reported negatively. -const INVALID_BATCH_LOOKUP_ATTEMPTS: u8 = 3; - /// A return type for functions that act on a `Chain` which informs the caller whether the chain /// has been completed and should be removed or to be kept if further processing is /// required. #[derive(PartialEq)] +#[must_use = "Should be checked, since a failed chain must be removed. 
A chain that requested + being removed and continued is now in an inconsistent state"] + pub enum ProcessingResult { KeepChain, RemoveChain, @@ -42,6 +38,7 @@ pub enum ProcessingResult { /// A chain identifier pub type ChainId = u64; +pub type BatchId = Epoch; /// A chain of blocks that need to be downloaded. Peers who claim to contain the target head /// root are grouped into the peer pool and queried for batches when downloading the @@ -50,7 +47,7 @@ pub struct SyncingChain { /// A random id used to identify this chain. id: ChainId, - /// The original start slot when this chain was initialised. + /// The start of the chain segment. Any epoch previous to this one has been validated. pub start_epoch: Epoch, /// The target head slot. @@ -59,35 +56,37 @@ pub struct SyncingChain { /// The target head root. pub target_head_root: Hash256, - /// The batches that are currently awaiting a response from a peer. An RPC request for these - /// has been sent. - pub pending_batches: PendingBatches, - - /// The batches that have been downloaded and are awaiting processing and/or validation. - completed_batches: Vec>, - - /// Batches that have been processed and awaiting validation before being removed. - processed_batches: Vec>, + /// Sorted map of batches undergoing some kind of processing. + batches: BTreeMap>, /// The peers that agree on the `target_head_slot` and `target_head_root` as a canonical chain - /// and thus available to download this chain from. - pub peer_pool: HashSet, + /// and thus available to download this chain from, as well as the batches we are currently + /// requesting. + peers: FnvHashMap>, /// Starting epoch of the next batch that needs to be downloaded. - to_be_downloaded: Epoch, + to_be_downloaded: BatchId, /// Starting epoch of the batch that needs to be processed next. /// This is incremented as the chain advances. - processing_target: Epoch, + processing_target: BatchId, + + /// Optimistic head to sync. + /// If a block is imported for this batch, the chain advances to this point. + optimistic_start: Option, + + /// When a batch for an optimistic start fails processing, it is stored to avoid trying it + /// again due to chain stopping/re-starting on chain switching. + failed_optimistic_starts: HashSet, /// The current state of the chain. pub state: ChainSyncingState, /// The current processing batch, if any. - current_processing_batch: Option>, + current_processing_batch: Option, /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. - beacon_processor_send: mpsc::Sender>, + beacon_processor_send: Sender>, /// A reference to the underlying beacon chain. 
chain: Arc>, @@ -105,36 +104,85 @@ pub enum ChainSyncingState { } impl SyncingChain { + pub fn id(target_root: &Hash256, target_slot: &Slot) -> u64 { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + (target_root, target_slot).hash(&mut hasher); + hasher.finish() + } + #[allow(clippy::too_many_arguments)] pub fn new( - id: u64, start_epoch: Epoch, target_head_slot: Slot, target_head_root: Hash256, peer_id: PeerId, - beacon_processor_send: mpsc::Sender>, + beacon_processor_send: Sender>, chain: Arc>, - log: slog::Logger, + log: &slog::Logger, ) -> Self { - let mut peer_pool = HashSet::new(); - peer_pool.insert(peer_id); + let mut peers = FnvHashMap::default(); + peers.insert(peer_id, Default::default()); + + let id = SyncingChain::::id(&target_head_root, &target_head_slot); SyncingChain { id, start_epoch, target_head_slot, target_head_root, - pending_batches: PendingBatches::new(), - completed_batches: Vec::new(), - processed_batches: Vec::new(), - peer_pool, + batches: BTreeMap::new(), + peers, to_be_downloaded: start_epoch, processing_target: start_epoch, + optimistic_start: None, + failed_optimistic_starts: HashSet::default(), state: ChainSyncingState::Stopped, current_processing_batch: None, beacon_processor_send, chain, - log, + log: log.new(o!("chain" => id)), + } + } + + /// Check if the chain has peers from which to process batches. + pub fn available_peers(&self) -> usize { + self.peers.len() + } + + /// Get the chain's id. + pub fn get_id(&self) -> ChainId { + self.id + } + + /// Removes a peer from the chain. + /// If the peer has active batches, those are considered failed and re-requested. + pub fn remove_peer( + &mut self, + peer_id: &PeerId, + network: &mut SyncNetworkContext, + ) -> ProcessingResult { + if let Some(batch_ids) = self.peers.remove(peer_id) { + // fail the batches + for id in batch_ids { + if let BatchState::Failed = self + .batches + .get_mut(&id) + .expect("registered batch exists") + .download_failed() + { + return ProcessingResult::RemoveChain; + } + if let ProcessingResult::RemoveChain = self.retry_batch_download(network, id) { + // drop the chain early + return ProcessingResult::RemoveChain; + } + } + } + + if self.peers.is_empty() { + ProcessingResult::RemoveChain + } else { + ProcessingResult::KeepChain } } @@ -146,127 +194,213 @@ impl SyncingChain { .start_slot(T::EthSpec::slots_per_epoch()) } - /// A batch of blocks has been received. This function gets run on all chains and should - /// return Some if the request id matches a pending request on this chain, or None if it does - /// not. - /// - /// If the request corresponds to a pending batch, this function processes the completed - /// batch. + /// A block has been received for a batch on this chain. + /// If the block correctly completes the batch it will be processed if possible. pub fn on_block_response( &mut self, network: &mut SyncNetworkContext, - request_id: RequestId, - beacon_block: &Option>, - ) -> Option<()> { + batch_id: BatchId, + peer_id: PeerId, + beacon_block: Option>, + ) -> ProcessingResult { + // check if we have this batch + let batch = match self.batches.get_mut(&batch_id) { + None => { + debug!(self.log, "Received a block for unknown batch"; "epoch" => batch_id); + // A batch might get removed when the chain advances, so this is non fatal. 
+ return ProcessingResult::KeepChain; + } + Some(batch) => { + // A batch could be retried without the peer failing the request (disconnecting/ + // sending an error /timeout) if the peer is removed from the chain for other + // reasons. Check that this block belongs to the expected peer + if Some(&peer_id) != batch.current_peer() { + return ProcessingResult::KeepChain; + } + batch + } + }; + if let Some(block) = beacon_block { // This is not a stream termination, simply add the block to the request - self.pending_batches.add_block(request_id, block.clone()) + batch.add_block(block); + ProcessingResult::KeepChain } else { // A stream termination has been sent. This batch has ended. Process a completed batch. - let batch = self.pending_batches.remove(request_id)?; - self.handle_completed_batch(network, batch); - Some(()) - } - } + // Remove the request from the peer's active batches + let peer = batch + .current_peer() + .expect("Batch is downloading from a peer"); + self.peers + .get_mut(peer) + .unwrap_or_else(|| panic!("Batch is registered for the peer")) + .remove(&batch_id); - /// A completed batch has been received, process the batch. - /// This will return `ProcessingResult::KeepChain` if the chain has not completed or - /// failed indicating that further batches are required. - fn handle_completed_batch( - &mut self, - network: &mut SyncNetworkContext, - batch: Batch, - ) { - // An entire batch of blocks has been received. This functions checks to see if it can be processed, - // remove any batches waiting to be verified and if this chain is syncing, request new - // blocks for the peer. - debug!(self.log, "Completed batch received"; "epoch" => batch.start_epoch, "blocks" => &batch.downloaded_blocks.len(), "awaiting_batches" => self.completed_batches.len()); + match batch.download_completed() { + Ok(received) => { + let awaiting_batches = batch_id.saturating_sub( + self.optimistic_start + .unwrap_or_else(|| self.processing_target), + ) / EPOCHS_PER_BATCH; + debug!(self.log, "Completed batch received"; "epoch" => batch_id, "blocks" => received, "awaiting_batches" => awaiting_batches); - // verify the range of received blocks - // Note that the order of blocks is verified in block processing - if let Some(last_slot) = batch.downloaded_blocks.last().map(|b| b.slot()) { - // the batch is non-empty - let first_slot = batch.downloaded_blocks[0].slot(); - if batch.start_slot() > first_slot || batch.end_slot() < last_slot { - warn!(self.log, "BlocksByRange response returned out of range blocks"; - "response_initial_slot" => first_slot, - "requested_initial_slot" => batch.start_slot()); - // this batch can't be used, so we need to request it again. - self.failed_batch(network, batch); - return; + // pre-emptively request more blocks from peers whilst we process current blocks, + if let ProcessingResult::RemoveChain = self.request_batches(network) { + return ProcessingResult::RemoveChain; + } + self.process_completed_batches(network) + } + Err((expected, received, state)) => { + warn!(self.log, "Batch received out of range blocks"; + "epoch" => batch_id, "expected" => expected, "received" => received); + if let BatchState::Failed = state { + return ProcessingResult::RemoveChain; + } + // this batch can't be used, so we need to request it again. + self.retry_batch_download(network, batch_id) + } } } - - // Add this completed batch to the list of completed batches. 
This list will then need to - // be checked if any batches can be processed and verified for errors or invalid responses - // from peers. The logic is simpler to create this ordered batch list and to then process - // the list. - - let insert_index = self - .completed_batches - .binary_search(&batch) - .unwrap_or_else(|index| index); - self.completed_batches.insert(insert_index, batch); - - // We have a list of completed batches. It is not sufficient to process batch successfully - // to consider the batch correct. This is because batches could be erroneously empty, or - // incomplete. Therefore, a batch is considered valid, only if the next sequential batch is - // processed successfully. Therefore the `completed_batches` will store batches that have - // already be processed but not verified and therefore have Id's less than - // `self.to_be_processed_id`. - - // pre-emptively request more blocks from peers whilst we process current blocks, - self.request_batches(network); - - // Try and process any completed batches. This will spawn a new task to process any blocks - // that are ready to be processed. - self.process_completed_batches(); } - /// Tries to process any batches if there are any available and we are not currently processing - /// other batches. - fn process_completed_batches(&mut self) { - // Only process batches if this chain is Syncing - if self.state != ChainSyncingState::Syncing { - return; + /// Sends to process the batch with the given id. + /// The batch must exist and be ready for processing + fn process_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + ) -> ProcessingResult { + // Only process batches if this chain is Syncing, and only one at a time + if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() { + return ProcessingResult::KeepChain; } - // Only process one batch at a time - if self.current_processing_batch.is_some() { - return; - } + let batch = self.batches.get_mut(&batch_id).expect("Batch exists"); - // Check if there is a batch ready to be processed - if !self.completed_batches.is_empty() - && self.completed_batches[0].start_epoch == self.processing_target - { - let batch = self.completed_batches.remove(0); + // NOTE: We send empty batches to the processor in order to trigger the block processor + // result callback. This is done, because an empty batch could end a chain and the logic + // for removing chains and checking completion is in the callback. - // Note: We now send empty batches to the processor in order to trigger the block - // processor result callback. This is done, because an empty batch could end a chain - // and the logic for removing chains and checking completion is in the callback. - - // send the batch to the batch processor thread - return self.process_batch(batch); - } - } - - /// Sends a batch to the beacon processor for async processing in a queue. 
- fn process_batch(&mut self, mut batch: Batch) { - let blocks = std::mem::replace(&mut batch.downloaded_blocks, Vec::new()); - let process_id = ProcessId::RangeBatchId(self.id, batch.start_epoch); - self.current_processing_batch = Some(batch); + let blocks = batch.start_processing(); + let process_id = ProcessId::RangeBatchId(self.id, batch_id); + self.current_processing_batch = Some(batch_id); if let Err(e) = self .beacon_processor_send .try_send(BeaconWorkEvent::chain_segment(process_id, blocks)) { - error!( - self.log, - "Failed to send chain segment to processor"; - "msg" => "process_batch", - "error" => format!("{:?}", e) - ); + crit!(self.log, "Failed to send chain segment to processor."; "msg" => "process_batch", + "error" => %e, "batch" => self.processing_target); + // This is unlikely to happen but it would stall syncing since the batch now has no + // blocks to continue, and the chain is expecting a processing result that won't + // arrive. To mitigate this, (fake) fail this processing so that the batch is + // re-downloaded. + // TODO: needs better handling + self.on_batch_process_result(network, batch_id, &BatchProcessResult::Failed(false)) + } else { + ProcessingResult::KeepChain + } + } + + /// Processes the next ready batch, prioritizing optimistic batches over the processing target. + fn process_completed_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> ProcessingResult { + // Only process batches if this chain is Syncing and only process one batch at a time + if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() { + return ProcessingResult::KeepChain; + } + + // Find the id of the batch we are going to process. + // + // First try our optimistic start, if any. If this batch is ready, we process it. If the + // batch has not already been completed, check the current chain target. + let optimistic_id = if let Some(epoch) = self.optimistic_start { + if let Some(batch) = self.batches.get(&epoch) { + let state = batch.state(); + match state { + BatchState::AwaitingProcessing(..) => { + // this batch is ready + debug!(self.log, "Processing optimistic start"; "epoch" => epoch); + Some(epoch) + } + BatchState::Downloading(..) => { + // The optimistic batch is being downloaded. We wait for this before + // attempting to process other batches. + return ProcessingResult::KeepChain; + } + BatchState::Processing(_) + | BatchState::AwaitingDownload + | BatchState::Failed + | BatchState::Poisoned + | BatchState::AwaitingValidation(_) => { + // these are all inconsistent states: + // - Processing -> `self.current_processing_batch` is Some + // - Failed -> non recoverable batch. For a optimistic batch, it should + // have been removed + // - Poisoned -> this is an intermediate state that should never be reached + // - AwaitingDownload -> A recoverable failed batch should have been + // re-requested. + // - AwaitingValidation -> If an optimistic batch is successfully processed + // it is no longer considered an optimistic candidate. 
If the batch was + // empty the chain rejects it; if it was non empty the chain is advanced + // to this point (so that the old optimistic batch is now the processing + // target) + unreachable!( + "Optimistic batch indicates inconsistent chain state: {:?}", + state + ) + } + } + } else { + None + } + } else { + None + }; + + // if the optimistic target can't be processed, check the processing target + let id = optimistic_id.or_else(|| { + if let Some(batch) = self.batches.get(&self.processing_target) { + let state = batch.state(); + match state { + BatchState::AwaitingProcessing(..) => Some(self.processing_target), + BatchState::Downloading(..) => { + // Batch is not ready, nothing to process + None + } + BatchState::Failed + | BatchState::AwaitingDownload + | BatchState::AwaitingValidation(_) + | BatchState::Processing(_) + | BatchState::Poisoned => { + // these are all inconsistent states: + // - Failed -> non recoverable batch. Chain should have beee removed + // - AwaitingDownload -> A recoverable failed batch should have been + // re-requested. + // - AwaitingValidation -> self.processing_target should have been moved + // forward + // - Processing -> `self.current_processing_batch` is Some + // - Poisoned -> Intermediate state that should never be reached + unreachable!( + "Robust target batch indicates inconsistent chain state: {:?}", + state + ) + } + } + } else { + crit!(self.log, "Batch not found for current processing target"; + "epoch" => self.processing_target); + None + } + }); + + // we found a batch to process + if let Some(id) = id { + self.process_batch(network, id) + } else { + ProcessingResult::KeepChain } } @@ -275,92 +409,82 @@ impl SyncingChain { pub fn on_batch_process_result( &mut self, network: &mut SyncNetworkContext, - chain_id: ChainId, - batch_start_epoch: Epoch, - downloaded_blocks: &mut Option>>, + batch_id: BatchId, result: &BatchProcessResult, - ) -> Option { - if chain_id != self.id { - // the result does not belong to this chain - return None; - } + ) -> ProcessingResult { + // the first two cases are possible if the chain advances while waiting for a processing + // result match &self.current_processing_batch { - Some(current_batch) if current_batch.start_epoch != batch_start_epoch => { + Some(processing_id) if *processing_id != batch_id => { debug!(self.log, "Unexpected batch result"; - "batch_epoch" => batch_start_epoch, "expected_batch_epoch" => current_batch.start_epoch); - return None; + "batch_epoch" => batch_id, "expected_batch_epoch" => processing_id); + return ProcessingResult::KeepChain; } None => { debug!(self.log, "Chain was not expecting a batch result"; - "batch_epoch" => batch_start_epoch); - return None; + "batch_epoch" => batch_id); + return ProcessingResult::KeepChain; } _ => { - // chain_id and batch_id match, continue + // batch_id matches, continue + self.current_processing_batch = None; } } - // claim the result by consuming the option - let downloaded_blocks = downloaded_blocks.take().or_else(|| { - // if taken by another chain, we are no longer waiting on a result. 
- self.current_processing_batch = None; - crit!(self.log, "Processed batch taken by another chain"); - None - })?; - - // No longer waiting on a processing result - let mut batch = self.current_processing_batch.take().unwrap(); - // These are the blocks of this batch - batch.downloaded_blocks = downloaded_blocks; - - // double check batches are processed in order TODO: Remove for prod - if batch.start_epoch != self.processing_target { - crit!(self.log, "Batch processed out of order"; - "processed_starting_epoch" => batch.start_epoch, - "expected_epoch" => self.processing_target); - } - - let res = match result { - BatchProcessResult::Success => { - self.processing_target += EPOCHS_PER_BATCH; - - // If the processed batch was not empty, we can validate previous invalidated - // blocks including the current batch. - if !batch.downloaded_blocks.is_empty() { - self.mark_processed_batches_as_valid(network, &batch); + match result { + BatchProcessResult::Success(was_non_empty) => { + let batch = self + .batches + .get_mut(&batch_id) + .expect("Chain was expecting a known batch"); + let _ = batch.processing_completed(true); + // If the processed batch was not empty, we can validate previous unvalidated + // blocks. + if *was_non_empty { + self.advance_chain(network, batch_id); + } else if let Some(epoch) = self.optimistic_start { + // check if this batch corresponds to an optimistic batch. In this case, we + // reject it as an optimistic candidate since the batch was empty + if epoch == batch_id { + if let ProcessingResult::RemoveChain = self.reject_optimistic_batch( + network, + false, /* do not re-request */ + "batch was empty", + ) { + return ProcessingResult::RemoveChain; + }; + } } - // Add the current batch to processed batches to be verified in the future. - self.processed_batches.push(batch); + self.processing_target += EPOCHS_PER_BATCH; // check if the chain has completed syncing if self.current_processed_slot() >= self.target_head_slot { // chain is completed + debug!(self.log, "Chain is complete"); ProcessingResult::RemoveChain } else { // chain is not completed - // attempt to request more batches - self.request_batches(network); - + if let ProcessingResult::RemoveChain = self.request_batches(network) { + return ProcessingResult::RemoveChain; + } // attempt to process more batches - self.process_completed_batches(); - - // keep the chain - ProcessingResult::KeepChain + self.process_completed_batches(network) } } - BatchProcessResult::Partial => { - warn!(self.log, "Batch processing failed but at least one block was imported"; - "batch_epoch" => batch.start_epoch, "peer" => batch.current_peer.to_string() - ); - // At least one block was successfully verified and imported, so we can be sure all - // previous batches are valid and we only need to download the current failed - // batch. 
- self.mark_processed_batches_as_valid(network, &batch); - - // check that we have not exceeded the re-process retry counter - if batch.reprocess_retries > INVALID_BATCH_LOOKUP_ATTEMPTS { + BatchProcessResult::Failed(imported_blocks) => { + let batch = self + .batches + .get_mut(&batch_id) + .expect("Chain was expecting a known batch"); + let peer = batch + .current_peer() + .expect("batch is processing blocks from a peer"); + debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks, + "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer)); + if let BatchState::Failed = batch.processing_completed(false) { + // check that we have not exceeded the re-process retry counter // If a batch has exceeded the invalid batch lookup attempts limit, it means // that it is likely all peers in this chain are are sending invalid batches // repeatedly and are either malicious or faulty. We drop the chain and @@ -370,109 +494,156 @@ impl SyncingChain { let action = PeerAction::LowToleranceError; warn!(self.log, "Batch failed to download. Dropping chain scoring peers"; "score_adjustment" => action.to_string(), - "batch_epoch"=> batch.start_epoch); - for peer_id in self.peer_pool.drain() { - network.report_peer(peer_id, action); + "batch_epoch"=> batch_id); + for (peer, _) in self.peers.drain() { + network.report_peer(peer, action); } ProcessingResult::RemoveChain } else { - // Handle this invalid batch, that is within the re-process retries limit. - self.handle_invalid_batch(network, batch); - ProcessingResult::KeepChain - } - } - BatchProcessResult::Failed => { - debug!(self.log, "Batch processing failed"; - "batch_epoch" => batch.start_epoch, "peer" => batch.current_peer.to_string(), "client" => network.client_type(&batch.current_peer).to_string()); - // The batch processing failed - // This could be because this batch is invalid, or a previous invalidated batch - // is invalid. We need to find out which and downvote the peer that has sent us - // an invalid batch. - - // check that we have not exceeded the re-process retry counter - if batch.reprocess_retries > INVALID_BATCH_LOOKUP_ATTEMPTS { - // If a batch has exceeded the invalid batch lookup attempts limit, it means - // that it is likely all peers in this chain are are sending invalid batches - // repeatedly and are either malicious or faulty. We drop the chain and - // downvote all peers. - let action = PeerAction::LowToleranceError; - warn!(self.log, "Batch failed to download. Dropping chain scoring peers"; - "score_adjustment" => action.to_string(), - "batch_epoch" => batch.start_epoch); - for peer_id in self.peer_pool.drain() { - network.report_peer(peer_id, action); + // chain can continue. Check if it can be moved forward + if *imported_blocks { + // At least one block was successfully verified and imported, so we can be sure all + // previous batches are valid and we only need to download the current failed + // batch. + self.advance_chain(network, batch_id); } - ProcessingResult::RemoveChain - } else { // Handle this invalid batch, that is within the re-process retries limit. - self.handle_invalid_batch(network, batch); - ProcessingResult::KeepChain - } - } - }; - - Some(res) - } - - /// Removes any batches awaiting validation. - /// - /// All blocks in `processed_batches` should be prior batches. As the `last_batch` has been - /// processed with blocks in it, all previous batches are valid. 
- /// - /// If a previous batch has been validated and it had been re-processed, downvote - /// the original peer. - fn mark_processed_batches_as_valid( - &mut self, - network: &mut SyncNetworkContext, - last_batch: &Batch, - ) { - while !self.processed_batches.is_empty() { - let mut processed_batch = self.processed_batches.remove(0); - if processed_batch.start_epoch >= last_batch.start_epoch { - crit!(self.log, "A processed batch had a greater id than the current process id"; - "processed_start_epoch" => processed_batch.start_epoch, - "current_start_epoch" => last_batch.start_epoch); - } - - // Go through passed attempts and downscore peers that returned invalid batches - while !processed_batch.attempts.is_empty() { - let attempt = processed_batch.attempts.remove(0); - // The validated batch has been re-processed - if attempt.hash != processed_batch.hash() { - // The re-downloaded version was different - if processed_batch.current_peer != attempt.peer_id { - // A different peer sent the correct batch, the previous peer did not - // We negatively score the original peer. - let action = PeerAction::LowToleranceError; - debug!( - self.log, "Re-processed batch validated. Scoring original peer"; - "batch_epoch" => processed_batch.start_epoch, - "score_adjustment" => action.to_string(), - "original_peer" => format!("{}",attempt.peer_id), - "new_peer" => format!("{}", processed_batch.current_peer) - ); - network.report_peer(attempt.peer_id, action); - } else { - // The same peer corrected it's previous mistake. There was an error, so we - // negative score the original peer. - let action = PeerAction::MidToleranceError; - debug!( - self.log, "Re-processed batch validated by the same peer."; - "batch_epoch" => processed_batch.start_epoch, - "score_adjustment" => action.to_string(), - "original_peer" => format!("{}",attempt.peer_id), - "new_peer" => format!("{}", processed_batch.current_peer) - ); - network.report_peer(attempt.peer_id, action); - } + self.handle_invalid_batch(network, batch_id) } } } } - /// An invalid batch has been received that could not be processed. + fn reject_optimistic_batch( + &mut self, + network: &mut SyncNetworkContext, + redownload: bool, + reason: &str, + ) -> ProcessingResult { + if let Some(epoch) = self.optimistic_start { + self.optimistic_start = None; + self.failed_optimistic_starts.insert(epoch); + // if this batch is inside the current processing range, keep it, otherwise drop + // it. NOTE: this is done to prevent non-sequential batches coming from optimistic + // starts from filling up the buffer size + if epoch < self.to_be_downloaded { + debug!(self.log, "Rejected optimistic batch left for future use"; "epoch" => %epoch, "reason" => reason); + // this batch is now treated as any other batch, and re-requested for future use + if redownload { + return self.retry_batch_download(network, epoch); + } + } else { + debug!(self.log, "Rejected optimistic batch"; "epoch" => %epoch, "reason" => reason); + self.batches.remove(&epoch); + } + } + + ProcessingResult::KeepChain + } + + /// Removes any batches previous to the given `validating_epoch` and updates the current + /// boundaries of the chain. /// - /// These events occur when a peer as successfully responded with blocks, but the blocks we + /// The `validating_epoch` must align with batch boundaries. + /// + /// If a previous batch has been validated and it had been re-processed, penalize the original + /// peer. 
+ fn advance_chain( + &mut self, + network: &mut SyncNetworkContext, + validating_epoch: Epoch, + ) { + // make sure this epoch produces an advancement + if validating_epoch <= self.start_epoch { + return; + } + + // safety check for batch boundaries + if validating_epoch % EPOCHS_PER_BATCH != self.start_epoch % EPOCHS_PER_BATCH { + crit!(self.log, "Validating Epoch is not aligned"); + } + + // batches in the range [BatchId, ..) (not yet validated) + let remaining_batches = self.batches.split_off(&validating_epoch); + // batches less than `validating_epoch` + let removed_batches = std::mem::replace(&mut self.batches, remaining_batches); + + for (id, batch) in removed_batches.into_iter() { + // only for batches awaiting validation can we be sure the last attempt is + // right, and thus, that any different attempt is wrong + match batch.state() { + BatchState::AwaitingValidation(ref processed_attempt) => { + for attempt in batch.attempts() { + // The validated batch has been re-processed + if attempt.hash != processed_attempt.hash { + // The re-downloaded version was different + if processed_attempt.peer_id != attempt.peer_id { + // A different peer sent the correct batch, the previous peer did not + // We negatively score the original peer. + let action = PeerAction::LowToleranceError; + debug!(self.log, "Re-processed batch validated. Scoring original peer"; + "batch_epoch" => id, "score_adjustment" => %action, + "original_peer" => %attempt.peer_id, "new_peer" => %processed_attempt.peer_id + ); + network.report_peer(attempt.peer_id.clone(), action); + } else { + // The same peer corrected it's previous mistake. There was an error, so we + // negative score the original peer. + let action = PeerAction::MidToleranceError; + debug!(self.log, "Re-processed batch validated by the same peer"; + "batch_epoch" => id, "score_adjustment" => %action, + "original_peer" => %attempt.peer_id, "new_peer" => %processed_attempt.peer_id + ); + network.report_peer(attempt.peer_id.clone(), action); + } + } + } + } + BatchState::Downloading(peer, ..) => { + // remove this batch from the peer's active requests + if let Some(active_batches) = self.peers.get_mut(peer) { + active_batches.remove(&id); + } + } + BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => { + unreachable!("batch indicates inconsistent chain state while advancing chain") + } + BatchState::AwaitingProcessing(..) => { + // TODO: can we be sure the old attempts are wrong? + } + BatchState::Processing(_) => { + assert_eq!( + id, + self.current_processing_batch.expect( + "A batch in a processing state means the chain is processing it" + ) + ); + self.current_processing_batch = None; + } + } + } + + self.processing_target = self.processing_target.max(validating_epoch); + let old_start = self.start_epoch; + self.start_epoch = validating_epoch; + self.to_be_downloaded = self.to_be_downloaded.max(validating_epoch); + if self.batches.contains_key(&self.to_be_downloaded) { + // if a chain is advanced by Range beyond the previous `seld.to_be_downloaded`, we + // won't have this batch, so we need to request it. + self.to_be_downloaded += EPOCHS_PER_BATCH; + } + if let Some(epoch) = self.optimistic_start { + if epoch <= validating_epoch { + self.optimistic_start = None; + } + } + debug!(self.log, "Chain advanced"; "previous_start" => old_start, + "new_start" => self.start_epoch, "processing_target" => self.processing_target); + } + + /// An invalid batch has been received that could not be processed, but that can be retried. 
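`advance_chain` relies on `BTreeMap::split_off` plus `mem::replace` to peel off every batch strictly before the validating epoch in one move. A tiny standalone demonstration of that idiom, with plain `u64` keys standing in for `Epoch`:

use std::collections::BTreeMap;

fn main() {
    // batches keyed by their starting "epoch" (plain u64 here instead of `Epoch`)
    let mut batches: BTreeMap<u64, &str> = BTreeMap::new();
    batches.insert(0, "validated");
    batches.insert(2, "validated");
    batches.insert(4, "processing target");
    batches.insert(6, "downloading");

    let validating_epoch = 4;
    // `split_off` keeps keys < validating_epoch in `batches` and returns the rest...
    let remaining = batches.split_off(&validating_epoch);
    // ...so swapping the two leaves the chain holding only unvalidated batches and
    // hands back the ones that can now be dropped (after scoring their peers).
    let removed = std::mem::replace(&mut batches, remaining);

    assert!(removed.keys().all(|epoch| *epoch < validating_epoch));
    assert!(batches.keys().all(|epoch| *epoch >= validating_epoch));
    println!("removed {} batches, {} remain", removed.len(), batches.len());
}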
+ /// + /// These events occur when a peer has successfully responded with blocks, but the blocks we /// have received are incorrect or invalid. This indicates the peer has not performed as /// intended and can result in downvoting a peer. // TODO: Batches could have been partially downloaded due to RPC size-limit restrictions. We @@ -481,10 +652,10 @@ impl SyncingChain { fn handle_invalid_batch( &mut self, network: &mut SyncNetworkContext, - batch: Batch, - ) { + batch_id: BatchId, + ) -> ProcessingResult { // The current batch could not be processed, indicating either the current or previous - // batches are invalid + // batches are invalid. // The previous batch could be incomplete due to the block sizes being too large to fit in // a single RPC request or there could be consecutive empty batches which are not supposed @@ -494,70 +665,51 @@ impl SyncingChain { // potentially be faulty. If a batch returns a different result than the original and // results in successful processing, we downvote the original peer that sent us the batch. - // If all batches return the same result, we try this process INVALID_BATCH_LOOKUP_ATTEMPTS - // times before considering the entire chain invalid and downvoting all peers. + if let Some(epoch) = self.optimistic_start { + // If this batch is an optimistic batch, we reject this epoch as an optimistic + // candidate and try to re download it + if epoch == batch_id { + if let ProcessingResult::RemoveChain = + self.reject_optimistic_batch(network, true, "batch was invalid") + { + return ProcessingResult::RemoveChain; + } else { + // since this is the optimistic batch, we can't consider previous batches as + // invalid. + return ProcessingResult::KeepChain; + } + } + } + // this is our robust `processing_target`. All previous batches must be awaiting + // validation + let mut redownload_queue = Vec::new(); - // Find any pre-processed batches awaiting validation - while !self.processed_batches.is_empty() { - let past_batch = self.processed_batches.remove(0); - self.processing_target = std::cmp::min(self.processing_target, past_batch.start_epoch); - self.reprocess_batch(network, past_batch); + for (id, batch) in self.batches.range_mut(..batch_id) { + if let BatchState::Failed = batch.validation_failed() { + // remove the chain early + return ProcessingResult::RemoveChain; + } + redownload_queue.push(*id); } - // re-process the current batch - self.reprocess_batch(network, batch); - } + // no batch maxed out it process attempts, so now the chain's volatile progress must be + // reset + self.processing_target = self.start_epoch; - /// This re-downloads and marks the batch as being re-processed. - /// - /// If the re-downloaded batch is different to the original and can be processed, the original - /// peer will be downvoted. 
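`handle_invalid_batch` treats every batch below the failed one as suspect and queues it for re-download, bailing out early if any of them is already beyond its retry budget. A simplified stand-in for that `range_mut(..batch_id)` pass (the patch's real `validation_failed()` also bumps attempt counters):

use std::collections::BTreeMap;

#[derive(Debug, PartialEq)]
enum State { AwaitingValidation, AwaitingDownload, Failed }

// Mark every batch before the invalid one for re-download; an Err result plays
// the role of ProcessingResult::RemoveChain in the patch.
fn invalidate_before(batches: &mut BTreeMap<u64, State>, failed: u64) -> Result<Vec<u64>, ()> {
    let mut redownload = Vec::new();
    for (id, state) in batches.range_mut(..failed) {
        if *state == State::Failed {
            return Err(());
        }
        *state = State::AwaitingDownload;
        redownload.push(*id);
    }
    Ok(redownload)
}

fn main() {
    let mut batches = BTreeMap::new();
    batches.insert(0, State::AwaitingValidation);
    batches.insert(2, State::AwaitingValidation);
    batches.insert(4, State::AwaitingValidation);
    // batch 4 failed processing: everything below it becomes suspect too
    let queue = invalidate_before(&mut batches, 4).expect("no batch is beyond retries");
    assert_eq!(queue, vec![0, 2]);
}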
- fn reprocess_batch( - &mut self, - network: &mut SyncNetworkContext, - mut batch: Batch, - ) { - // marks the batch as attempting to be reprocessed by hashing the downloaded blocks - let attempt = super::batch::Attempt { - peer_id: batch.current_peer.clone(), - hash: batch.hash(), - }; - - // add this attempt to the batch - batch.attempts.push(attempt); - - // remove previously downloaded blocks - batch.downloaded_blocks.clear(); - - // increment the re-process counter - batch.reprocess_retries += 1; - - // attempt to find another peer to download the batch from (this potentially doubles up - // requests on a single peer) - let current_peer = &batch.current_peer; - let new_peer = self - .peer_pool - .iter() - .find(|peer| *peer != current_peer) - .unwrap_or_else(|| current_peer); - - batch.current_peer = new_peer.clone(); - - debug!(self.log, "Re-requesting batch"; - "start_slot" => batch.start_slot(), - "end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks - "batch_epoch" => batch.start_epoch, - "peer" => batch.current_peer.to_string(), - "retries" => batch.retries, - "re-processes" => batch.reprocess_retries); - self.send_batch(network, batch); + for id in redownload_queue { + if let ProcessingResult::RemoveChain = self.retry_batch_download(network, id) { + return ProcessingResult::RemoveChain; + } + } + // finally, re-request the failed batch. + self.retry_batch_download(network, batch_id) } pub fn stop_syncing(&mut self) { self.state = ChainSyncingState::Stopped; } - // Either a new chain, or an old one with a peer list + /// Either a new chain, or an old one with a peer list /// This chain has been requested to start syncing. /// /// This could be new chain, or an old chain that is being resumed. @@ -565,127 +717,181 @@ impl SyncingChain { &mut self, network: &mut SyncNetworkContext, local_finalized_epoch: Epoch, - ) { - // A local finalized slot is provided as other chains may have made - // progress whilst this chain was Stopped or paused. If so, update the `processed_batch_id` to - // accommodate potentially downloaded batches from other chains. Also prune any old batches - // awaiting processing + optimistic_start_epoch: Epoch, + ) -> ProcessingResult { + // to avoid dropping local progress, we advance the chain wrt its batch boundaries. This + let align = |epoch| { + // start_epoch + (number of batches in between)*length_of_batch + self.start_epoch + ((epoch - self.start_epoch) / EPOCHS_PER_BATCH) * EPOCHS_PER_BATCH + }; + // get the *aligned* epoch that produces a batch containing the `local_finalized_epoch` + let validating_epoch = align(local_finalized_epoch); + // align the optimistic_start too. + let optimistic_epoch = align(optimistic_start_epoch); - // If the local finalized epoch is ahead of our current processed chain, update the chain - // to start from this point and re-index all subsequent batches starting from one - // (effectively creating a new chain). - - let local_finalized_slot = local_finalized_epoch.start_slot(T::EthSpec::slots_per_epoch()); - let current_processed_slot = self.current_processed_slot(); - - if local_finalized_slot > current_processed_slot { - // Advance the chain to account for already downloaded blocks. 
- self.start_epoch = local_finalized_epoch; - - debug!(self.log, "Updating chain's progress"; - "prev_completed_slot" => current_processed_slot, - "new_completed_slot" => self.current_processed_slot()); - // Re-index batches - self.to_be_downloaded = local_finalized_epoch; - self.processing_target = local_finalized_epoch; - - // remove any completed or processed batches - self.completed_batches.clear(); - self.processed_batches.clear(); + // advance the chain to the new validating epoch + self.advance_chain(network, validating_epoch); + if self.optimistic_start.is_none() + && optimistic_epoch > self.start_epoch + && !self.failed_optimistic_starts.contains(&optimistic_epoch) + { + self.optimistic_start = Some(optimistic_epoch); } + // update the state self.state = ChainSyncingState::Syncing; - // start processing batches if needed - self.process_completed_batches(); - // begin requesting blocks from the peer pool, until all peers are exhausted. - self.request_batches(network); + if let ProcessingResult::RemoveChain = self.request_batches(network) { + return ProcessingResult::RemoveChain; + } + + // start processing batches if needed + self.process_completed_batches(network) } /// Add a peer to the chain. /// /// If the chain is active, this starts requesting batches from this peer. - pub fn add_peer(&mut self, network: &mut SyncNetworkContext, peer_id: PeerId) { - self.peer_pool.insert(peer_id.clone()); - // do not request blocks if the chain is not syncing + pub fn add_peer( + &mut self, + network: &mut SyncNetworkContext, + peer_id: PeerId, + ) -> ProcessingResult { if let ChainSyncingState::Stopped = self.state { - debug!(self.log, "Peer added to a non-syncing chain"; - "peer_id" => format!("{}", peer_id)); - return; + debug!(self.log, "Peer added to non-syncing chain"; "peer" => %peer_id) + } + // add the peer without overwriting its active requests + if self.peers.entry(peer_id).or_default().is_empty() { + // Either new or not, this peer is idle, try to request more batches + self.request_batches(network) + } else { + ProcessingResult::KeepChain } - - // find the next batch and request it from any peers if we need to - self.request_batches(network); } /// Sends a STATUS message to all peers in the peer pool. pub fn status_peers(&self, network: &mut SyncNetworkContext) { - for peer_id in self.peer_pool.iter() { - network.status_peer(self.chain.clone(), peer_id.clone()); - } + network.status_peers(self.chain.clone(), self.peers.keys().cloned()); } /// An RPC error has occurred. /// - /// Checks if the request_id is associated with this chain. If so, attempts to re-request the - /// batch. If the batch has exceeded the number of retries, returns - /// Some(`ProcessingResult::RemoveChain)`. Returns `None` if the request isn't related to - /// this chain. + /// If the batch exists it is re-requested. pub fn inject_error( &mut self, network: &mut SyncNetworkContext, - peer_id: &PeerId, - request_id: RequestId, - ) -> Option { - if let Some(batch) = self.pending_batches.remove(request_id) { - debug!(self.log, "Batch failed. RPC Error"; - "batch_epoch" => batch.start_epoch, - "retries" => batch.retries, - "peer" => format!("{:?}", peer_id)); - - Some(self.failed_batch(network, batch)) + batch_id: BatchId, + peer_id: PeerId, + ) -> ProcessingResult { + if let Some(batch) = self.batches.get_mut(&batch_id) { + // A batch could be retried without the peer failing the request (disconnecting/ + // sending an error /timeout) if the peer is removed from the chain for other + // reasons. 
Check that this block belongs to the expected peer + if Some(&peer_id) != batch.current_peer() { + return ProcessingResult::KeepChain; + } + debug!(self.log, "Batch failed. RPC Error"; "batch_epoch" => batch_id); + let failed_peer = batch + .current_peer() + .expect("Batch is downloading from a peer"); + self.peers + .get_mut(failed_peer) + .expect("Peer belongs to the chain") + .remove(&batch_id); + if let BatchState::Failed = batch.download_failed() { + return ProcessingResult::RemoveChain; + } + self.retry_batch_download(network, batch_id) } else { - None + // this could be an error for an old batch, removed when the chain advances + ProcessingResult::KeepChain } } - /// A batch has failed. This occurs when a network timeout happens or the peer didn't respond. - /// These events do not indicate a malicious peer, more likely simple networking issues. - /// - /// Attempts to re-request from another peer in the peer pool (if possible) and returns - /// `ProcessingResult::RemoveChain` if the number of retries on the batch exceeds - /// `MAX_BATCH_RETRIES`. - pub fn failed_batch( + /// Sends and registers the request of a batch awaiting download. + pub fn retry_batch_download( &mut self, network: &mut SyncNetworkContext, - mut batch: Batch, + batch_id: BatchId, ) -> ProcessingResult { - batch.retries += 1; + let batch = match self.batches.get_mut(&batch_id) { + Some(batch) => batch, + None => return ProcessingResult::KeepChain, + }; - if batch.retries > MAX_BATCH_RETRIES || self.peer_pool.is_empty() { - // chain is unrecoverable, remove it - ProcessingResult::RemoveChain - } else { - // try to re-process the request using a different peer, if possible - let current_peer = &batch.current_peer; - let new_peer = self - .peer_pool + // Find a peer to request the batch + let failed_peers = batch.failed_peers(); + + let new_peer = { + let mut priorized_peers = self + .peers .iter() - .find(|peer| *peer != current_peer) - .unwrap_or_else(|| current_peer); + .map(|(peer, requests)| (failed_peers.contains(peer), requests.len(), peer)) + .collect::>(); + // Sort peers prioritizing unrelated peers with less active requests. + priorized_peers.sort_unstable(); + priorized_peers.get(0).map(|&(_, _, peer)| peer.clone()) + }; - batch.current_peer = new_peer.clone(); - debug!(self.log, "Re-Requesting batch"; - "start_slot" => batch.start_slot(), - "end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks - "batch_epoch" => batch.start_epoch, - "peer" => batch.current_peer.to_string()); - self.send_batch(network, batch); - ProcessingResult::KeepChain + if let Some(peer) = new_peer { + self.send_batch(network, batch_id, peer) + } else { + // If we are here the chain has no more peers + ProcessingResult::RemoveChain } } + /// Requests the batch asigned to the given id from a given peer. 
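The retry path above picks the next peer by sorting `(failed_before, active_requests, peer)` tuples, so lexicographic order does the prioritization. A self-contained sketch with strings standing in for `PeerId`:

fn main() {
    let failed_peers = ["peer_b"];
    // (peer, number of in-flight batch requests)
    let peers: [(&str, usize); 3] = [("peer_a", 3), ("peer_b", 0), ("peer_c", 1)];

    let mut prioritized: Vec<(bool, usize, &str)> = peers
        .iter()
        .map(|&(peer, requests)| (failed_peers.contains(&peer), requests, peer))
        .collect();
    // tuples sort lexicographically: false < true first, then fewer active requests
    prioritized.sort_unstable();

    let best = prioritized.first().map(|&(_, _, peer)| peer);
    // peer_c wins: it has not failed this batch and is less busy than peer_a
    assert_eq!(best, Some("peer_c"));
}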
+ pub fn send_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + peer: PeerId, + ) -> ProcessingResult { + if let Some(batch) = self.batches.get_mut(&batch_id) { + let request = batch.to_blocks_by_range_request(); + // inform the batch about the new request + batch.start_downloading_from_peer(peer.clone()); + match network.blocks_by_range_request(peer.clone(), request, self.id, batch_id) { + Ok(()) => { + if self + .optimistic_start + .map(|epoch| epoch == batch_id) + .unwrap_or(false) + { + debug!(self.log, "Requesting optimistic batch"; "epoch" => batch_id, &batch); + } else { + debug!(self.log, "Requesting batch"; "epoch" => batch_id, &batch); + } + // register the batch for this peer + self.peers + .get_mut(&peer) + .expect("peer belongs to the peer pool") + .insert(batch_id); + return ProcessingResult::KeepChain; + } + Err(e) => { + // NOTE: under normal conditions this shouldn't happen but we handle it anyway + warn!(self.log, "Could not send batch request"; + "batch_id" => batch_id, "error" => e, &batch); + // register the failed download and check if the batch can be retried + self.peers + .get_mut(&peer) + .expect("peer belongs to the peer pool") + .remove(&batch_id); + if let BatchState::Failed = batch.download_failed() { + return ProcessingResult::RemoveChain; + } else { + return self.retry_batch_download(network, batch_id); + } + } + } + } + + ProcessingResult::KeepChain + } + /// Returns true if this chain is currently syncing. pub fn is_syncing(&self) -> bool { match self.state { @@ -696,119 +902,142 @@ impl SyncingChain { /// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer /// pool and left over batches until the batch buffer is reached or all peers are exhausted. - fn request_batches(&mut self, network: &mut SyncNetworkContext) { - if let ChainSyncingState::Syncing = self.state { - while self.send_range_request(network) {} + fn request_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> ProcessingResult { + if !matches!(self.state, ChainSyncingState::Syncing) { + return ProcessingResult::KeepChain; } - } - /// Requests the next required batch from a peer. Returns true, if there was a peer available - /// to send a request and there are batches to request, false otherwise. - fn send_range_request(&mut self, network: &mut SyncNetworkContext) -> bool { // find the next pending batch and request it from the peer - if let Some(peer_id) = self.get_next_peer() { - if let Some(batch) = self.get_next_batch(peer_id) { - debug!(self.log, "Requesting batch"; - "start_slot" => batch.start_slot(), - "end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks - "batch_epoch" => batch.start_epoch, - "peer" => format!("{}", batch.current_peer)); - // send the batch - self.send_batch(network, batch); - return true; - } - } - false - } - /// Returns a peer if there exists a peer which does not currently have a pending request. - /// - /// This is used to create the next request. - fn get_next_peer(&self) -> Option { - // TODO: Optimize this by combining with above two functions. 
// randomize the peers for load balancing let mut rng = rand::thread_rng(); - let mut peers = self.peer_pool.iter().collect::>(); - peers.shuffle(&mut rng); - for peer in peers { - if self.pending_batches.peer_is_idle(peer) { - return Some(peer.clone()); + let mut idle_peers = self + .peers + .iter() + .filter_map(|(peer, requests)| { + if requests.is_empty() { + Some(peer.clone()) + } else { + None + } + }) + .collect::>(); + idle_peers.shuffle(&mut rng); + + // check if we have the batch for our optimistic start. If not, request it first. + // We wait for this batch before requesting any other batches. + if let Some(epoch) = self.optimistic_start { + if !self.batches.contains_key(&epoch) { + if let Some(peer) = idle_peers.pop() { + let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH); + self.batches.insert(epoch, optimistic_batch); + if let ProcessingResult::RemoveChain = self.send_batch(network, epoch, peer) { + return ProcessingResult::RemoveChain; + } + } + } + return ProcessingResult::KeepChain; + } + + while let Some(peer) = idle_peers.pop() { + if let Some(batch_id) = self.include_next_batch() { + // send the batch + if let ProcessingResult::RemoveChain = self.send_batch(network, batch_id, peer) { + return ProcessingResult::RemoveChain; + } + } else { + // No more batches, simply stop + return ProcessingResult::KeepChain; } } - None + + ProcessingResult::KeepChain } - /// Returns the next required batch from the chain if it exists. If there are no more batches - /// required, `None` is returned. - /// - /// Batches are downloaded excluding the first block of the epoch assuming it has already been - /// downloaded. - /// - /// For example: - /// - /// - /// Epoch boundary | | - /// ... | 30 | 31 | 32 | 33 | 34 | ... | 61 | 62 | 63 | 64 | 65 | - /// Batch 1 | Batch 2 | Batch 3 - fn get_next_batch(&mut self, peer_id: PeerId) -> Option> { - let slots_per_epoch = T::EthSpec::slots_per_epoch(); - let blocks_per_batch = slots_per_epoch * EPOCHS_PER_BATCH; - - // only request batches up to the buffer size limit + /// Creates the next required batch from the chain. If there are no more batches required, + /// `false` is returned. + fn include_next_batch(&mut self) -> Option { + // don't request batches beyond the target head slot if self - .completed_batches - .len() - .saturating_add(self.pending_batches.len()) + .to_be_downloaded + .start_slot(T::EthSpec::slots_per_epoch()) + > self.target_head_slot + { + return None; + } + // only request batches up to the buffer size limit + // NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync + // if the current processing window is contained in a long range of skip slots. + let in_buffer = |batch: &BatchInfo| { + matches!( + batch.state(), + BatchState::Downloading(..) | BatchState::AwaitingProcessing(..) 
+ ) + }; + if self + .batches + .iter() + .filter(|&(_epoch, batch)| in_buffer(batch)) + .count() > BATCH_BUFFER_SIZE as usize { return None; } - // don't request batches beyond the target head slot - if self.to_be_downloaded.start_slot(slots_per_epoch) > self.target_head_slot { - return None; - } - - // truncate the batch to the epoch containing the target head of the chain - let batch_end_slot = std::cmp::min( - // request either a batch containing the max number of blocks per batch - self.to_be_downloaded.start_slot(slots_per_epoch) + blocks_per_batch + 1, - // or a batch of one epoch of blocks, which contains the `target_head_slot` - self.target_head_slot - .saturating_add(slots_per_epoch) - .epoch(slots_per_epoch) - .start_slot(slots_per_epoch), - ); - - let batch = Some(Batch::new(self.to_be_downloaded, batch_end_slot, peer_id)); - self.to_be_downloaded += EPOCHS_PER_BATCH; - batch - } - - /// Requests the provided batch from the provided peer. - fn send_batch( - &mut self, - network: &mut SyncNetworkContext, - batch: Batch, - ) { - let request = batch.to_blocks_by_range_request(); - - match network.blocks_by_range_request(batch.current_peer.clone(), request) { - Ok(request_id) => { - // add the batch to pending list - self.pending_batches.insert(request_id, batch); + let batch_id = self.to_be_downloaded; + // this batch could have been included already being an optimistic batch + match self.batches.entry(batch_id) { + Entry::Occupied(_) => { + // this batch doesn't need downlading, let this same function decide the next batch + self.to_be_downloaded += EPOCHS_PER_BATCH; + self.include_next_batch() } - Err(e) => { - warn!(self.log, "Batch request failed"; - "start_slot" => batch.start_slot(), - "end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks - "start_epoch" => batch.start_epoch, - "peer" => batch.current_peer.to_string(), - "retries" => batch.retries, - "error" => e, - "re-processes" => batch.reprocess_retries); - self.failed_batch(network, batch); + Entry::Vacant(entry) => { + entry.insert(BatchInfo::new(&batch_id, EPOCHS_PER_BATCH)); + self.to_be_downloaded += EPOCHS_PER_BATCH; + Some(batch_id) } } } } + +impl slog::KV for &mut SyncingChain { + fn serialize( + &self, + record: &slog::Record, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + slog::KV::serialize(*self, record, serializer) + } +} + +impl slog::KV for SyncingChain { + fn serialize( + &self, + record: &slog::Record, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + use slog::Value; + serializer.emit_u64("id", self.id)?; + Value::serialize(&self.start_epoch, record, "from", serializer)?; + Value::serialize( + &self.target_head_slot.epoch(T::EthSpec::slots_per_epoch()), + record, + "to", + serializer, + )?; + serializer.emit_str("end_root", &self.target_head_root.to_string())?; + Value::serialize( + &self.processing_target, + record, + "current_target", + serializer, + )?; + serializer.emit_usize("batches", self.batches.len())?; + serializer.emit_usize("peers", self.peers.len())?; + slog::Result::Ok(()) + } +} diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 71e060670..fdfa4b8eb 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -1,15 +1,18 @@ //! This provides the logic for the finalized and head chains. //! -//! Each chain type is stored in it's own vector. 
A variety of helper functions are given along -//! with this struct to to simplify the logic of the other layers of sync. +//! Each chain type is stored in it's own map. A variety of helper functions are given along with +//! this struct to simplify the logic of the other layers of sync. -use super::chain::{ChainSyncingState, SyncingChain}; +use super::chain::{ChainId, ChainSyncingState, ProcessingResult, SyncingChain}; +use super::sync_type::RangeSyncType; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::sync::network_context::SyncNetworkContext; use crate::sync::PeerSyncInfo; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{types::SyncState, NetworkGlobals, PeerId}; -use slog::{debug, error, info, o}; +use fnv::FnvHashMap; +use slog::{crit, debug, error, info, trace}; +use std::collections::hash_map::Entry; use std::sync::Arc; use tokio::sync::mpsc; use types::EthSpec; @@ -83,9 +86,9 @@ pub struct ChainCollection { /// A reference to the global network parameters. network_globals: Arc>, /// The set of finalized chains being synced. - finalized_chains: Vec>, + finalized_chains: FnvHashMap>, /// The set of head chains being synced. - head_chains: Vec>, + head_chains: FnvHashMap>, /// The current sync state of the process. state: RangeSyncState, /// Logger for the collection. @@ -101,8 +104,8 @@ impl ChainCollection { ChainCollection { beacon_chain, network_globals, - finalized_chains: Vec::new(), - head_chains: Vec::new(), + finalized_chains: FnvHashMap::default(), + head_chains: FnvHashMap::default(), state: RangeSyncState::Idle, log, } @@ -129,7 +132,7 @@ impl ChainCollection { .unwrap_or_else(|| SyncState::Stalled); let mut peer_state = self.network_globals.sync_state.write(); if new_state != *peer_state { - info!(self.log, "Sync state updated"; "old_state" => format!("{}",peer_state), "new_state" => format!("{}",new_state)); + info!(self.log, "Sync state updated"; "old_state" => %peer_state, "new_state" => %new_state); if new_state == SyncState::Synced { network.subscribe_core_topics(); } @@ -141,7 +144,7 @@ impl ChainCollection { let new_state: SyncState = self.state.clone().into(); if *node_sync_state != new_state { // we are updating the state, inform the user - info!(self.log, "Sync state updated"; "old_state" => format!("{}",node_sync_state), "new_state" => format!("{}",new_state)); + info!(self.log, "Sync state updated"; "old_state" => %node_sync_state, "new_state" => %new_state); } *node_sync_state = new_state; } @@ -182,30 +185,67 @@ impl ChainCollection { } } - /// Finds any finalized chain if it exists. - pub fn get_finalized_mut( - &mut self, - target_head_root: Hash256, - target_head_slot: Slot, - ) -> Option<&mut SyncingChain> { - ChainCollection::get_chain( - self.finalized_chains.as_mut(), - target_head_root, - target_head_slot, - ) + /// Calls `func` on every chain of the collection. If the result is + /// `ProcessingResult::RemoveChain`, the chain is removed and returned. 
+ pub fn call_all(&mut self, mut func: F) -> Vec<(SyncingChain, RangeSyncType)> + where + F: FnMut(&mut SyncingChain) -> ProcessingResult, + { + let mut to_remove = Vec::new(); + + for (id, chain) in self.finalized_chains.iter_mut() { + if let ProcessingResult::RemoveChain = func(chain) { + to_remove.push((*id, RangeSyncType::Finalized)); + } + } + + for (id, chain) in self.head_chains.iter_mut() { + if let ProcessingResult::RemoveChain = func(chain) { + to_remove.push((*id, RangeSyncType::Head)); + } + } + + let mut results = Vec::with_capacity(to_remove.len()); + for (id, sync_type) in to_remove.into_iter() { + let chain = match sync_type { + RangeSyncType::Finalized => self.finalized_chains.remove(&id), + RangeSyncType::Head => self.head_chains.remove(&id), + }; + results.push((chain.expect("Chain exits"), sync_type)); + } + results } - /// Finds any finalized chain if it exists. - pub fn get_head_mut( + /// Executes a function on the chain with the given id. + /// + /// If the function returns `ProcessingResult::RemoveChain`, the chain is removed and returned. + /// If the chain is found, its syncing type is returned, or an error otherwise. + pub fn call_by_id( &mut self, - target_head_root: Hash256, - target_head_slot: Slot, - ) -> Option<&mut SyncingChain> { - ChainCollection::get_chain( - self.head_chains.as_mut(), - target_head_root, - target_head_slot, - ) + id: ChainId, + func: F, + ) -> Result<(Option>, RangeSyncType), ()> + where + F: FnOnce(&mut SyncingChain) -> ProcessingResult, + { + if let Entry::Occupied(mut entry) = self.finalized_chains.entry(id) { + // Search in our finalized chains first + if let ProcessingResult::RemoveChain = func(entry.get_mut()) { + Ok((Some(entry.remove()), RangeSyncType::Finalized)) + } else { + Ok((None, RangeSyncType::Finalized)) + } + } else if let Entry::Occupied(mut entry) = self.head_chains.entry(id) { + // Search in our head chains next + if let ProcessingResult::RemoveChain = func(entry.get_mut()) { + Ok((Some(entry.remove()), RangeSyncType::Head)) + } else { + Ok((None, RangeSyncType::Head)) + } + } else { + // Chain was not found in the finalized collection, nor the head collection + Err(()) + } } /// Updates the state of the chain collection. @@ -214,9 +254,8 @@ impl ChainCollection { /// updates the state of the collection. This starts head chains syncing if any are required to /// do so. pub fn update(&mut self, network: &mut SyncNetworkContext) { - let local_epoch = { - let local = match PeerSyncInfo::from_chain(&self.beacon_chain) { - Some(local) => local, + let (local_finalized_epoch, local_head_epoch) = + match PeerSyncInfo::from_chain(&self.beacon_chain) { None => { return error!( self.log, @@ -224,20 +263,21 @@ impl ChainCollection { "msg" => "likely due to head lock contention" ) } + Some(local) => ( + local.finalized_epoch, + local.head_slot.epoch(T::EthSpec::slots_per_epoch()), + ), }; - local.finalized_epoch - }; - // Remove any outdated finalized/head chains self.purge_outdated_chains(network); // Choose the best finalized chain if one needs to be selected. - self.update_finalized_chains(network, local_epoch); + self.update_finalized_chains(network, local_finalized_epoch, local_head_epoch); - if self.finalized_syncing_index().is_none() { + if self.finalized_syncing_chain().is_none() { // Handle head syncing chains if there are no finalized chains left. 
- self.update_head_chains(network, local_epoch); + self.update_head_chains(network, local_finalized_epoch, local_head_epoch); } } @@ -247,53 +287,57 @@ impl ChainCollection { &mut self, network: &mut SyncNetworkContext, local_epoch: Epoch, + local_head_epoch: Epoch, ) { - // Check if any chains become the new syncing chain - if let Some(index) = self.finalized_syncing_index() { - // There is a current finalized chain syncing - let _syncing_chain_peer_count = self.finalized_chains[index].peer_pool.len(); - - // search for a chain with more peers - if let Some((new_index, chain)) = - self.finalized_chains - .iter_mut() - .enumerate() - .find(|(_iter_index, _chain)| { - false - // && *iter_index != index - // && chain.peer_pool.len() > syncing_chain_peer_count - }) - { - // A chain has more peers. Swap the syncing chain - debug!(self.log, "Switching finalized chains to sync"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_epoch"=> local_epoch); - - // update the state to a new finalized state - let state = RangeSyncState::Finalized { - start_slot: chain.start_epoch.start_slot(T::EthSpec::slots_per_epoch()), - head_slot: chain.target_head_slot, - head_root: chain.target_head_root, - }; - self.state = state; - - // Stop the current chain from syncing - self.finalized_chains[index].stop_syncing(); - // Start the new chain - self.finalized_chains[new_index].start_syncing(network, local_epoch); - } - } else if let Some(chain) = self + // Find the chain with most peers and check if it is already syncing + if let Some((new_id, peers)) = self .finalized_chains - .iter_mut() - .max_by_key(|chain| chain.peer_pool.len()) + .iter() + .max_by_key(|(_, chain)| chain.available_peers()) + .map(|(id, chain)| (*id, chain.available_peers())) { - // There is no currently syncing finalization chain, starting the one with the most peers - debug!(self.log, "New finalized chain started syncing"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_epoch"=> chain.start_epoch); - chain.start_syncing(network, local_epoch); + let old_id = self.finalized_syncing_chain().map( + |(currently_syncing_id, currently_syncing_chain)| { + if *currently_syncing_id != new_id + && peers > currently_syncing_chain.available_peers() + { + currently_syncing_chain.stop_syncing(); + // we stop this chain and start syncing the one with more peers + Some(*currently_syncing_id) + } else { + // the best chain is already the syncing chain, advance it if possible + None + } + }, + ); + + let chain = self + .finalized_chains + .get_mut(&new_id) + .expect("Chain exists"); + + match old_id { + Some(Some(old_id)) => debug!(self.log, "Switching finalized chains"; + "old_id" => old_id, &chain), + None => debug!(self.log, "Syncing new chain"; &chain), + Some(None) => trace!(self.log, "Advancing currently syncing chain"), + // this is the same chain. We try to advance it. 
+ } + // update the state to a new finalized state let state = RangeSyncState::Finalized { start_slot: chain.start_epoch.start_slot(T::EthSpec::slots_per_epoch()), head_slot: chain.target_head_slot, head_root: chain.target_head_root, }; self.state = state; + + if let ProcessingResult::RemoveChain = + chain.start_syncing(network, local_epoch, local_head_epoch) + { + // this happens only if sending a batch over the `network` fails a lot + error!(self.log, "Chain removed while switching chains"); + self.finalized_chains.remove(&new_id); + } } } @@ -302,6 +346,7 @@ impl ChainCollection { &mut self, network: &mut SyncNetworkContext, local_epoch: Epoch, + local_head_epoch: Epoch, ) { // There are no finalized chains, update the state. if self.head_chains.is_empty() { @@ -311,42 +356,41 @@ impl ChainCollection { let mut currently_syncing = self .head_chains - .iter() + .values() .filter(|chain| chain.is_syncing()) .count(); let mut not_syncing = self.head_chains.len() - currently_syncing; - // Find all head chains that are not currently syncing ordered by peer count. while currently_syncing <= PARALLEL_HEAD_CHAINS && not_syncing > 0 { // Find the chain with the most peers and start syncing - if let Some((_index, chain)) = self + if let Some((_id, chain)) = self .head_chains .iter_mut() - .filter(|chain| !chain.is_syncing()) - .enumerate() - .max_by_key(|(_index, chain)| chain.peer_pool.len()) + .filter(|(_id, chain)| !chain.is_syncing()) + .max_by_key(|(_id, chain)| chain.available_peers()) { // start syncing this chain - debug!(self.log, "New head chain started syncing"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_epoch"=> chain.start_epoch); - chain.start_syncing(network, local_epoch); + debug!(self.log, "New head chain started syncing"; &chain); + if let ProcessingResult::RemoveChain = + chain.start_syncing(network, local_epoch, local_head_epoch) + { + error!(self.log, "Chain removed while switching head chains") + } } - // update variables currently_syncing = self .head_chains .iter() - .filter(|chain| chain.is_syncing()) + .filter(|(_id, chain)| chain.is_syncing()) .count(); not_syncing = self.head_chains.len() - currently_syncing; } - // Start // for the syncing API, we find the minimal start_slot and the maximum // target_slot of all head chains to report back. - let (min_epoch, max_slot) = self .head_chains - .iter() + .values() .filter(|chain| chain.is_syncing()) .fold( (Epoch::from(0u64), Slot::from(0u64)), @@ -368,10 +412,9 @@ impl ChainCollection { /// chains and re-status their peers. pub fn clear_head_chains(&mut self, network: &mut SyncNetworkContext) { let log_ref = &self.log; - self.head_chains.retain(|chain| { - if !chain.is_syncing() - { - debug!(log_ref, "Removing old head chain"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot); + self.head_chains.retain(|_id, chain| { + if !chain.is_syncing() { + debug!(log_ref, "Removing old head chain"; &chain); chain.status_peers(network); false } else { @@ -380,140 +423,20 @@ impl ChainCollection { }); } - /// Add a new finalized chain to the collection. 
- pub fn new_finalized_chain( - &mut self, - local_finalized_epoch: Epoch, - target_head: Hash256, - target_slot: Slot, - peer_id: PeerId, - beacon_processor_send: mpsc::Sender>, - ) { - let chain_id = rand::random(); - self.finalized_chains.push(SyncingChain::new( - chain_id, - local_finalized_epoch, - target_slot, - target_head, - peer_id, - beacon_processor_send, - self.beacon_chain.clone(), - self.log.new(o!("chain" => chain_id)), - )); - } - - /// Add a new finalized chain to the collection and starts syncing it. - #[allow(clippy::too_many_arguments)] - pub fn new_head_chain( - &mut self, - remote_finalized_epoch: Epoch, - target_head: Hash256, - target_slot: Slot, - peer_id: PeerId, - beacon_processor_send: mpsc::Sender>, - ) { - // remove the peer from any other head chains - - self.head_chains.iter_mut().for_each(|chain| { - chain.peer_pool.remove(&peer_id); - }); - self.head_chains.retain(|chain| !chain.peer_pool.is_empty()); - - let chain_id = rand::random(); - let new_head_chain = SyncingChain::new( - chain_id, - remote_finalized_epoch, - target_slot, - target_head, - peer_id, - beacon_processor_send, - self.beacon_chain.clone(), - self.log.clone(), - ); - self.head_chains.push(new_head_chain); - } - /// Returns if `true` if any finalized chains exist, `false` otherwise. pub fn is_finalizing_sync(&self) -> bool { !self.finalized_chains.is_empty() } - /// Given a chain iterator, runs a given function on each chain until the function returns - /// `Some`. This allows the `RangeSync` struct to loop over chains and optionally remove the - /// chain from the collection if the function results in completing the chain. - fn request_function<'a, F, I, U>(chain: I, mut func: F) -> Option<(usize, U)> - where - I: Iterator>, - F: FnMut(&'a mut SyncingChain) -> Option, - { - chain - .enumerate() - .find_map(|(index, chain)| Some((index, func(chain)?))) - } - - /// Given a chain iterator, runs a given function on each chain and return all `Some` results. - fn request_function_all<'a, F, I, U>(chain: I, mut func: F) -> Vec<(usize, U)> - where - I: Iterator>, - F: FnMut(&'a mut SyncingChain) -> Option, - { - chain - .enumerate() - .filter_map(|(index, chain)| Some((index, func(chain)?))) - .collect() - } - - /// Runs a function on finalized chains until we get the first `Some` result from `F`. - pub fn finalized_request(&mut self, func: F) -> Option<(usize, U)> - where - F: FnMut(&mut SyncingChain) -> Option, - { - ChainCollection::request_function(self.finalized_chains.iter_mut(), func) - } - - /// Runs a function on head chains until we get the first `Some` result from `F`. - pub fn head_request(&mut self, func: F) -> Option<(usize, U)> - where - F: FnMut(&mut SyncingChain) -> Option, - { - ChainCollection::request_function(self.head_chains.iter_mut(), func) - } - - /// Runs a function on finalized and head chains until we get the first `Some` result from `F`. - pub fn head_finalized_request(&mut self, func: F) -> Option<(usize, U)> - where - F: FnMut(&mut SyncingChain) -> Option, - { - ChainCollection::request_function( - self.finalized_chains - .iter_mut() - .chain(self.head_chains.iter_mut()), - func, - ) - } - - /// Runs a function on all finalized and head chains and collects all `Some` results from `F`. 
- pub fn head_finalized_request_all(&mut self, func: F) -> Vec<(usize, U)> - where - F: FnMut(&mut SyncingChain) -> Option, - { - ChainCollection::request_function_all( - self.finalized_chains - .iter_mut() - .chain(self.head_chains.iter_mut()), - func, - ) - } - /// Removes any outdated finalized or head chains. - /// /// This removes chains with no peers, or chains whose start block slot is less than our current /// finalized block slot. pub fn purge_outdated_chains(&mut self, network: &mut SyncNetworkContext) { // Remove any chains that have no peers self.finalized_chains - .retain(|chain| !chain.peer_pool.is_empty()); - self.head_chains.retain(|chain| !chain.peer_pool.is_empty()); + .retain(|_id, chain| chain.available_peers() > 0); + self.head_chains + .retain(|_id, chain| chain.available_peers() > 0); let local_info = match PeerSyncInfo::from_chain(&self.beacon_chain) { Some(local) => local, @@ -533,28 +456,28 @@ impl ChainCollection { let beacon_chain = &self.beacon_chain; let log_ref = &self.log; // Remove chains that are out-dated and re-status their peers - self.finalized_chains.retain(|chain| { + self.finalized_chains.retain(|_id, chain| { if chain.target_head_slot <= local_finalized_slot || beacon_chain .fork_choice .read() .contains_block(&chain.target_head_root) { - debug!(log_ref, "Purging out of finalized chain"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot); + debug!(log_ref, "Purging out of finalized chain"; &chain); chain.status_peers(network); false } else { true } }); - self.head_chains.retain(|chain| { + self.head_chains.retain(|_id, chain| { if chain.target_head_slot <= local_finalized_slot || beacon_chain .fork_choice .read() .contains_block(&chain.target_head_root) { - debug!(log_ref, "Purging out of date head chain"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot); + debug!(log_ref, "Purging out of date head chain"; &chain); chain.status_peers(network); false } else { @@ -563,63 +486,71 @@ impl ChainCollection { }); } - /// Removes and returns a finalized chain from the collection. - pub fn remove_finalized_chain(&mut self, index: usize) -> SyncingChain { - self.finalized_chains.swap_remove(index) - } - - /// Removes and returns a head chain from the collection. - pub fn remove_head_chain(&mut self, index: usize) -> SyncingChain { - self.head_chains.swap_remove(index) - } - - /// Removes a chain from either finalized or head chains based on the index. Using a request - /// iterates of finalized chains before head chains. Thus an index that is greater than the - /// finalized chain length, indicates a head chain. - /// - /// This will re-status the chains peers on removal. The index must exist. - pub fn remove_chain(&mut self, network: &mut SyncNetworkContext, index: usize) { - let chain = if index >= self.finalized_chains.len() { - let index = index - self.finalized_chains.len(); - let chain = self.head_chains.swap_remove(index); - chain.status_peers(network); - chain + /// Adds a peer to a chain with the given target, or creates a new syncing chain if it doesn't + /// exits. 
+ #[allow(clippy::too_many_arguments)] + pub fn add_peer_or_create_chain( + &mut self, + start_epoch: Epoch, + target_head_root: Hash256, + target_head_slot: Slot, + peer: PeerId, + sync_type: RangeSyncType, + beacon_processor_send: &mpsc::Sender>, + network: &mut SyncNetworkContext, + ) { + let id = SyncingChain::::id(&target_head_root, &target_head_slot); + let collection = if let RangeSyncType::Finalized = sync_type { + if let Some(chain) = self.head_chains.get(&id) { + // sanity verification for chain duplication / purging issues + crit!(self.log, "Adding known head chain as finalized chain"; chain); + } + &mut self.finalized_chains } else { - let chain = self.finalized_chains.swap_remove(index); - chain.status_peers(network); - chain + if let Some(chain) = self.finalized_chains.get(&id) { + // sanity verification for chain duplication / purging issues + crit!(self.log, "Adding known finalized chain as head chain"; chain); + } + &mut self.head_chains }; - - debug!(self.log, "Chain was removed"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot); - - // update the state - self.update(network); + match collection.entry(id) { + Entry::Occupied(mut entry) => { + let chain = entry.get_mut(); + debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, &chain); + assert_eq!(chain.target_head_root, target_head_root); + assert_eq!(chain.target_head_slot, target_head_slot); + if let ProcessingResult::RemoveChain = chain.add_peer(network, peer) { + debug!(self.log, "Chain removed after adding peer"; "chain" => id); + entry.remove(); + } + } + Entry::Vacant(entry) => { + let peer_rpr = peer.to_string(); + let new_chain = SyncingChain::new( + start_epoch, + target_head_slot, + target_head_root, + peer, + beacon_processor_send.clone(), + self.beacon_chain.clone(), + &self.log, + ); + assert_eq!(new_chain.get_id(), id); + debug!(self.log, "New chain added to sync"; "peer_id" => peer_rpr, "sync_type" => ?sync_type, &new_chain); + entry.insert(new_chain); + } + } } /// Returns the index of finalized chain that is currently syncing. Returns `None` if no /// finalized chain is currently syncing. - fn finalized_syncing_index(&self) -> Option { - self.finalized_chains - .iter() - .enumerate() - .find_map(|(index, chain)| { - if chain.state == ChainSyncingState::Syncing { - Some(index) - } else { - None - } - }) - } - - /// Returns a chain given the target head root and slot. 
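`add_peer_or_create_chain` can key chains directly by id because the id is derived from the chain's identifying fields, the target root and slot, so two peers advertising the same target always resolve to the same chain. A rough sketch of such a derivation (a byte array and `u64` stand in for `Hash256` and `Slot`; the patch's actual hashing scheme may differ):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn chain_id(target_root: &[u8; 32], target_slot: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    target_root.hash(&mut hasher);
    target_slot.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let root = [7u8; 32];
    // same target => same id, which is what lets the collection look a chain up
    // by id instead of scanning a Vec for a matching root and slot
    assert_eq!(chain_id(&root, 64), chain_id(&root, 64));
    assert_ne!(chain_id(&root, 64), chain_id(&root, 96));
}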
- fn get_chain<'a>( - chain: &'a mut [SyncingChain], - target_head_root: Hash256, - target_head_slot: Slot, - ) -> Option<&'a mut SyncingChain> { - chain.iter_mut().find(|iter_chain| { - iter_chain.target_head_root == target_head_root - && iter_chain.target_head_slot == target_head_slot + fn finalized_syncing_chain(&mut self) -> Option<(&ChainId, &mut SyncingChain)> { + self.finalized_chains.iter_mut().find_map(|(id, chain)| { + if chain.state == ChainSyncingState::Syncing { + Some((id, chain)) + } else { + None + } }) } } diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 85db6d378..07939569a 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -7,6 +7,6 @@ mod chain_collection; mod range; mod sync_type; -pub use batch::Batch; -pub use chain::{ChainId, EPOCHS_PER_BATCH}; +pub use batch::BatchInfo; +pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; pub use range::RangeSync; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 66339be06..6847838e0 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -39,7 +39,7 @@ //! Each chain is downloaded in batches of blocks. The batched blocks are processed sequentially //! and further batches are requested as current blocks are being processed. -use super::chain::{ChainId, ProcessingResult}; +use super::chain::ChainId; use super::chain_collection::{ChainCollection, RangeSyncState}; use super::sync_type::RangeSyncType; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; @@ -49,7 +49,7 @@ use crate::sync::PeerSyncInfo; use crate::sync::RequestId; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{NetworkGlobals, PeerId}; -use slog::{debug, error, trace}; +use slog::{debug, error, trace, warn}; use std::collections::HashSet; use std::sync::Arc; use tokio::sync::mpsc; @@ -121,21 +121,15 @@ impl RangeSync { let local_info = match PeerSyncInfo::from_chain(&self.beacon_chain) { Some(local) => local, None => { - return error!( - self.log, - "Failed to get peer sync info"; - "msg" => "likely due to head lock contention" - ) + return error!(self.log, "Failed to get peer sync info"; + "msg" => "likely due to head lock contention") } }; - // convenience variables + // convenience variable let remote_finalized_slot = remote_info .finalized_epoch .start_slot(T::EthSpec::slots_per_epoch()); - let local_finalized_slot = local_info - .finalized_epoch - .start_slot(T::EthSpec::slots_per_epoch()); // NOTE: A peer that has been re-status'd may now exist in multiple finalized chains. @@ -146,7 +140,7 @@ impl RangeSync { match RangeSyncType::new(&self.beacon_chain, &local_info, &remote_info) { RangeSyncType::Finalized => { // Finalized chain search - debug!(self.log, "Finalization sync peer joined"; "peer_id" => format!("{:?}", peer_id)); + debug!(self.log, "Finalization sync peer joined"; "peer_id" => %peer_id); // remove the peer from the awaiting_head_peers list if it exists self.awaiting_head_peers.remove(&peer_id); @@ -154,37 +148,19 @@ impl RangeSync { // Note: We keep current head chains. These can continue syncing whilst we complete // this new finalized chain. - // If a finalized chain already exists that matches, add this peer to the chain's peer - // pool. 
- if let Some(chain) = self - .chains - .get_finalized_mut(remote_info.finalized_root, remote_finalized_slot) - { - debug!(self.log, "Finalized chain exists, adding peer"; "peer_id" => peer_id.to_string(), "target_root" => chain.target_head_root.to_string(), "targe_slot" => chain.target_head_slot); + self.chains.add_peer_or_create_chain( + local_info.finalized_epoch, + remote_info.finalized_root, + remote_finalized_slot, + peer_id, + RangeSyncType::Finalized, + &self.beacon_processor_send, + network, + ); - // add the peer to the chain's peer pool - chain.add_peer(network, peer_id); - - // check if the new peer's addition will favour a new syncing chain. - self.chains.update(network); - // update the global sync state if necessary - self.chains.update_sync_state(network); - } else { - // there is no finalized chain that matches this peer's last finalized target - // create a new finalized chain - debug!(self.log, "New finalized chain added to sync"; "peer_id" => format!("{:?}", peer_id), "start_slot" => local_finalized_slot, "end_slot" => remote_finalized_slot, "finalized_root" => format!("{}", remote_info.finalized_root)); - - self.chains.new_finalized_chain( - local_info.finalized_epoch, - remote_info.finalized_root, - remote_finalized_slot, - peer_id, - self.beacon_processor_send.clone(), - ); - self.chains.update(network); - // update the global sync state - self.chains.update_sync_state(network); - } + self.chains.update(network); + // update the global sync state + self.chains.update_sync_state(network); } RangeSyncType::Head => { // This peer requires a head chain sync @@ -192,7 +168,7 @@ impl RangeSync { if self.chains.is_finalizing_sync() { // If there are finalized chains to sync, finish these first, before syncing head // chains. This allows us to re-sync all known peers - trace!(self.log, "Waiting for finalized sync to complete"; "peer_id" => format!("{:?}", peer_id)); + trace!(self.log, "Waiting for finalized sync to complete"; "peer_id" => %peer_id); // store the peer to re-status after all finalized chains complete self.awaiting_head_peers.insert(peer_id); return; @@ -203,31 +179,18 @@ impl RangeSync { // The new peer has the same finalized (earlier filters should prevent a peer with an // earlier finalized chain from reaching here). 
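
In the branch above, `RangeSyncType::new(&self.beacon_chain, &local_info, &remote_info)` decides whether the joining peer feeds a finalized chain or a head chain. A simplified sketch of that decision (the real check also consults the beacon chain for blocks it already knows about; only the epoch comparison is shown, and `SyncInfo` is a stand-in for the real `PeerSyncInfo`):

```rust
/// The type of range sync to run for a newly statused peer.
pub enum RangeSyncType {
    Finalized,
    Head,
}

/// Stand-in carrying only the field this sketch needs.
pub struct SyncInfo {
    pub finalized_epoch: u64,
}

impl RangeSyncType {
    pub fn new(local: &SyncInfo, remote: &SyncInfo) -> RangeSyncType {
        if remote.finalized_epoch > local.finalized_epoch {
            // The remote has finalized beyond us: sync the finalized portion first.
            RangeSyncType::Finalized
        } else {
            // Same finalized checkpoint: only the non-finalized head blocks differ.
            RangeSyncType::Head
        }
    }
}

fn main() {
    let local = SyncInfo { finalized_epoch: 10 };
    let remote = SyncInfo { finalized_epoch: 12 };
    assert!(matches!(
        RangeSyncType::new(&local, &remote),
        RangeSyncType::Finalized
    ));
}
```
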
- debug!(self.log, "New peer added for recent head sync"; "peer_id" => format!("{:?}", peer_id)); - // search if there is a matching head chain, then add the peer to the chain - if let Some(chain) = self - .chains - .get_head_mut(remote_info.head_root, remote_info.head_slot) - { - debug!(self.log, "Adding peer to the existing head chain peer pool"; "head_root" => format!("{}",remote_info.head_root), "head_slot" => remote_info.head_slot, "peer_id" => format!("{:?}", peer_id)); - - // add the peer to the head's pool - chain.add_peer(network, peer_id); - } else { - // There are no other head chains that match this peer's status, create a new one, and - let start_epoch = std::cmp::min(local_info.head_slot, remote_finalized_slot) - .epoch(T::EthSpec::slots_per_epoch()); - debug!(self.log, "Creating a new syncing head chain"; "head_root" => format!("{}",remote_info.head_root), "start_epoch" => start_epoch, "head_slot" => remote_info.head_slot, "peer_id" => format!("{:?}", peer_id)); - - self.chains.new_head_chain( - start_epoch, - remote_info.head_root, - remote_info.head_slot, - peer_id, - self.beacon_processor_send.clone(), - ); - } + let start_epoch = std::cmp::min(local_info.head_slot, remote_finalized_slot) + .epoch(T::EthSpec::slots_per_epoch()); + self.chains.add_peer_or_create_chain( + start_epoch, + remote_info.head_root, + remote_info.head_slot, + peer_id, + RangeSyncType::Head, + &self.beacon_processor_send, + network, + ); self.chains.update(network); self.chains.update_sync_state(network); } @@ -245,23 +208,27 @@ impl RangeSync { request_id: RequestId, beacon_block: Option>, ) { - // Find the request. Most likely the first finalized chain (the syncing chain). If there - // are no finalized chains, then it will be a head chain. At most, there should only be - // `connected_peers` number of head chains, which should be relatively small and this - // lookup should not be very expensive. However, we could add an extra index that maps the - // request id to index of the vector to avoid O(N) searches and O(N) hash lookups. - - let id_not_found = self - .chains - .head_finalized_request(|chain| { - chain.on_block_response(network, request_id, &beacon_block) - }) - .is_none(); - if id_not_found { - // The request didn't exist in any `SyncingChain`. Could have been an old request or - // the chain was purged due to being out of date whilst a request was pending. Log - // and ignore. - debug!(self.log, "Range response without matching request"; "peer" => format!("{:?}", peer_id), "request_id" => request_id); + // get the chain and batch for which this response belongs + if let Some((chain_id, batch_id)) = + network.blocks_by_range_response(request_id, beacon_block.is_none()) + { + // check if this chunk removes the chain + match self.chains.call_by_id(chain_id, |chain| { + chain.on_block_response(network, batch_id, peer_id, beacon_block) + }) { + Ok((removed_chain, sync_type)) => { + if let Some(removed_chain) = removed_chain { + debug!(self.log, "Chain removed after block response"; "sync_type" => ?sync_type, "chain_id" => chain_id); + removed_chain.status_peers(network); + // TODO: update & update_sync_state? 
+ } + } + Err(_) => { + debug!(self.log, "BlocksByRange response for removed chain"; "chain" => chain_id) + } + } + } else { + warn!(self.log, "Response/Error for non registered request"; "request_id" => request_id) } } @@ -269,76 +236,57 @@ impl RangeSync { &mut self, network: &mut SyncNetworkContext, chain_id: ChainId, - epoch: Epoch, - downloaded_blocks: Vec>, + batch_id: Epoch, result: BatchProcessResult, ) { - // build an option for passing the downloaded_blocks to each chain - let mut downloaded_blocks = Some(downloaded_blocks); - - match self.chains.finalized_request(|chain| { - chain.on_batch_process_result(network, chain_id, epoch, &mut downloaded_blocks, &result) + // check if this response removes the chain + match self.chains.call_by_id(chain_id, |chain| { + chain.on_batch_process_result(network, batch_id, &result) }) { - Some((index, ProcessingResult::RemoveChain)) => { - let chain = self.chains.remove_finalized_chain(index); - debug!(self.log, "Finalized chain removed"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot); - // update the state of the collection - self.chains.update(network); - - // the chain is complete, re-status it's peers - chain.status_peers(network); - - // set the state to a head sync if there are no finalized chains, to inform the manager that we are awaiting a - // head chain. - self.chains.set_head_sync(); - // Update the global variables - self.chains.update_sync_state(network); - - // if there are no more finalized chains, re-status all known peers awaiting a head - // sync - match self.chains.state() { - RangeSyncState::Idle | RangeSyncState::Head { .. } => { - for peer_id in self.awaiting_head_peers.drain() { - network.status_peer(self.beacon_chain.clone(), peer_id); + Ok((None, _sync_type)) => { + // Chain was found and not removed + } + Ok((Some(removed_chain), sync_type)) => { + debug!(self.log, "Chain removed after processing result"; "chain" => chain_id, "sync_type" => ?sync_type); + // Chain ended, re-status its peers + removed_chain.status_peers(network); + match sync_type { + RangeSyncType::Finalized => { + // update the state of the collection + self.chains.update(network); + // set the state to a head sync if there are no finalized chains, to inform + // the manager that we are awaiting a head chain. + self.chains.set_head_sync(); + // Update the global variables + self.chains.update_sync_state(network); + // if there are no more finalized chains, re-status all known peers + // awaiting a head sync + match self.chains.state() { + RangeSyncState::Idle | RangeSyncState::Head { .. } => { + network.status_peers( + self.beacon_chain.clone(), + self.awaiting_head_peers.drain(), + ); + } + RangeSyncState::Finalized { .. } => {} // Have more finalized chains to complete } } - RangeSyncState::Finalized { .. } => {} // Have more finalized chains to complete - } - } - Some((_, ProcessingResult::KeepChain)) => {} - None => { - match self.chains.head_request(|chain| { - chain.on_batch_process_result( - network, - chain_id, - epoch, - &mut downloaded_blocks, - &result, - ) - }) { - Some((index, ProcessingResult::RemoveChain)) => { - let chain = self.chains.remove_head_chain(index); - debug!(self.log, "Head chain completed"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot); - // the chain is complete, re-status it's peers and remove it - chain.status_peers(network); - - // Remove non-syncing head chains and re-status the peers - // This removes a build-up of potentially duplicate head chains. 
Any - // legitimate head chains will be re-established + RangeSyncType::Head => { + // Remove non-syncing head chains and re-status the peers. This removes a + // build-up of potentially duplicate head chains. Any legitimate head + // chains will be re-established self.chains.clear_head_chains(network); // update the state of the collection self.chains.update(network); // update the global state and log any change self.chains.update_sync_state(network); } - Some((_, ProcessingResult::KeepChain)) => {} - None => { - // This can happen if a chain gets purged due to being out of date whilst a - // batch process is in progress. - debug!(self.log, "No chains match the block processing id"; "batch_epoch" => epoch, "chain_id" => chain_id); - } } } + + Err(_) => { + debug!(self.log, "BlocksByRange response for removed chain"; "chain" => chain_id) + } } } @@ -352,7 +300,7 @@ impl RangeSync { // if the peer is in the awaiting head mapping, remove it self.awaiting_head_peers.remove(peer_id); - // remove the peer from any peer pool + // remove the peer from any peer pool, failing its batches self.remove_peer(network, peer_id); // update the state of the collection @@ -361,30 +309,17 @@ impl RangeSync { self.chains.update_sync_state(network); } - /// When a peer gets removed, both the head and finalized chains need to be searched to check which pool the peer is in. The chain may also have a batch or batches awaiting + /// When a peer gets removed, both the head and finalized chains need to be searched to check + /// which pool the peer is in. The chain may also have a batch or batches awaiting /// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum /// retries. In this case, we need to remove the chain and re-status all the peers. fn remove_peer(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) { - for (index, result) in self.chains.head_finalized_request_all(|chain| { - if chain.peer_pool.remove(peer_id) { - // this chain contained the peer - while let Some(batch) = chain.pending_batches.remove_batch_by_peer(peer_id) { - if let ProcessingResult::RemoveChain = chain.failed_batch(network, batch) { - // a single batch failed, remove the chain - return Some(ProcessingResult::RemoveChain); - } - } - // peer removed from chain, no batch failed - Some(ProcessingResult::KeepChain) - } else { - None - } - }) { - if result == ProcessingResult::RemoveChain { - // the chain needed to be removed - debug!(self.log, "Chain being removed due to failed batch"); - self.chains.remove_chain(network, index); - } + for (removed_chain, sync_type) in self + .chains + .call_all(|chain| chain.remove_peer(peer_id, network)) + { + debug!(self.log, "Chain removed after removing peer"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id()); + // TODO: anything else to do? 
} } @@ -398,17 +333,25 @@ impl RangeSync { peer_id: PeerId, request_id: RequestId, ) { - // check that this request is pending - match self - .chains - .head_finalized_request(|chain| chain.inject_error(network, &peer_id, request_id)) - { - Some((_, ProcessingResult::KeepChain)) => {} // error handled chain persists - Some((index, ProcessingResult::RemoveChain)) => { - debug!(self.log, "Chain being removed due to RPC error"); - self.chains.remove_chain(network, index) + // get the chain and batch for which this response belongs + if let Some((chain_id, batch_id)) = network.blocks_by_range_response(request_id, true) { + // check that this request is pending + match self.chains.call_by_id(chain_id, |chain| { + chain.inject_error(network, batch_id, peer_id) + }) { + Ok((removed_chain, sync_type)) => { + if let Some(removed_chain) = removed_chain { + debug!(self.log, "Chain removed on rpc error"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id()); + removed_chain.status_peers(network); + // TODO: update & update_sync_state? + } + } + Err(_) => { + debug!(self.log, "BlocksByRange response for removed chain"; "chain" => chain_id) + } } - None => {} // request wasn't in the finalized chains, check the head chains + } else { + warn!(self.log, "Response/Error for non registered request"; "request_id" => request_id) } } } diff --git a/beacon_node/network/src/sync/range_sync/sync_type.rs b/beacon_node/network/src/sync/range_sync/sync_type.rs index 103ef77b8..d9f1d3f17 100644 --- a/beacon_node/network/src/sync/range_sync/sync_type.rs +++ b/beacon_node/network/src/sync/range_sync/sync_type.rs @@ -6,6 +6,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes}; use std::sync::Arc; /// The type of Range sync that should be done relative to our current state. +#[derive(Debug)] pub enum RangeSyncType { /// A finalized chain sync should be started with this peer. Finalized,
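
The `#[derive(Debug)]` added above is what allows the `"sync_type" => ?sync_type` log fields used throughout this patch: slog's `?x` shorthand renders `x` with its `Debug` implementation, just as `%x` renders it with `Display`. A small self-contained illustration (assumes the slog, slog-term and slog-async crates; the logger setup is generic, not Lighthouse's):

```rust
use slog::{debug, o, Drain};

#[derive(Debug)]
pub enum RangeSyncType {
    Finalized,
    Head,
}

fn main() {
    // Standard slog terminal logger: decorator -> format -> async drain.
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::FullFormat::new(decorator).build().fuse();
    let drain = slog_async::Async::new(drain).build().fuse();
    let log = slog::Logger::root(drain, o!());

    let sync_type = RangeSyncType::Finalized;
    // `?sync_type` logs the Debug rendering, e.g. "sync_type: Finalized".
    debug!(log, "Finalization sync peer joined"; "sync_type" => ?sync_type);
}
```
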