diff --git a/beacon_node/eth2_libp2p/Cargo.toml b/beacon_node/eth2_libp2p/Cargo.toml index 19eef3cbc..49e6dc92e 100644 --- a/beacon_node/eth2_libp2p/Cargo.toml +++ b/beacon_node/eth2_libp2p/Cargo.toml @@ -35,7 +35,6 @@ tokio-util = { version = "0.3.1", features = ["codec", "compat"] } discv5 = { version = "0.1.0-alpha.10", features = ["libp2p"] } tiny-keccak = "2.0.2" environment = { path = "../../lighthouse/environment" } -# TODO: Remove rand crate for mainnet rand = "0.7.3" regex = "1.3.9" diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 2ef369e3b..0448e7762 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -32,6 +32,7 @@ tokio = { version = "0.2.21", features = ["full"] } parking_lot = "0.11.0" smallvec = "1.4.1" # TODO: Remove rand crate for mainnet +# NOTE: why? rand = "0.7.3" fnv = "1.0.6" rlp = "0.4.5" diff --git a/beacon_node/network/src/beacon_processor/chain_segment.rs b/beacon_node/network/src/beacon_processor/chain_segment.rs index 94c7893f0..e659a84b8 100644 --- a/beacon_node/network/src/beacon_processor/chain_segment.rs +++ b/beacon_node/network/src/beacon_processor/chain_segment.rs @@ -28,39 +28,26 @@ pub fn handle_chain_segment( match process_id { // this a request from the range sync ProcessId::RangeBatchId(chain_id, epoch) => { - let len = downloaded_blocks.len(); - let start_slot = if len > 0 { - downloaded_blocks[0].message.slot.as_u64() - } else { - 0 - }; - let end_slot = if len > 0 { - downloaded_blocks[len - 1].message.slot.as_u64() - } else { - 0 - }; + let start_slot = downloaded_blocks.first().map(|b| b.message.slot.as_u64()); + let end_slot = downloaded_blocks.last().map(|b| b.message.slot.as_u64()); + let sent_blocks = downloaded_blocks.len(); - debug!(log, "Processing batch"; "batch_epoch" => epoch, "blocks" => downloaded_blocks.len(), "first_block_slot" => start_slot, "last_block_slot" => end_slot, "service" => "sync"); let result = match process_blocks(chain, downloaded_blocks.iter(), &log) { (_, Ok(_)) => { - debug!(log, "Batch processed"; "batch_epoch" => epoch , "first_block_slot" => start_slot, "last_block_slot" => end_slot, "service"=> "sync"); - BatchProcessResult::Success + debug!(log, "Batch processed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, + "last_block_slot" => end_slot, "processed_blocks" => sent_blocks, "service"=> "sync"); + BatchProcessResult::Success(sent_blocks > 0) } - (imported_blocks, Err(e)) if imported_blocks > 0 => { - debug!(log, "Batch processing failed but imported some blocks"; - "batch_epoch" => epoch, "error" => e, "imported_blocks"=> imported_blocks, "service" => "sync"); - BatchProcessResult::Partial - } - (_, Err(e)) => { - debug!(log, "Batch processing failed"; "batch_epoch" => epoch, "error" => e, "service" => "sync"); - BatchProcessResult::Failed + (imported_blocks, Err(e)) => { + debug!(log, "Batch processing failed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, + "last_block_slot" => end_slot, "error" => e, "imported_blocks" => imported_blocks, "service" => "sync"); + BatchProcessResult::Failed(imported_blocks > 0) } }; let msg = SyncMessage::BatchProcessed { chain_id, epoch, - downloaded_blocks, result, }; sync_send.send(msg).unwrap_or_else(|_| { @@ -70,7 +57,7 @@ pub fn handle_chain_segment( ); }); } - // this a parent lookup request from the sync manager + // this is a parent lookup request from the sync manager ProcessId::ParentLookup(peer_id, chain_head) => { debug!( log, "Processing parent lookup"; @@ -81,7 +68,7 @@ 
pub fn handle_chain_segment( // reverse match process_blocks(chain, downloaded_blocks.iter().rev(), &log) { (_, Err(e)) => { - debug!(log, "Parent lookup failed"; "last_peer_id" => format!("{}", peer_id), "error" => e); + debug!(log, "Parent lookup failed"; "last_peer_id" => %peer_id, "error" => e); sync_send .send(SyncMessage::ParentLookupFailed{peer_id, chain_head}) .unwrap_or_else(|_| { @@ -114,13 +101,7 @@ fn process_blocks< match chain.process_chain_segment(blocks) { ChainSegmentResult::Successful { imported_blocks } => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL); - if imported_blocks == 0 { - debug!(log, "All blocks already known"); - } else { - debug!( - log, "Imported blocks from network"; - "count" => imported_blocks, - ); + if imported_blocks > 0 { // Batch completed successfully with at least one block, run fork choice. run_fork_choice(chain, log); } @@ -153,7 +134,7 @@ fn run_fork_choice(chain: Arc>, log: &slog:: Err(e) => error!( log, "Fork choice failed"; - "error" => format!("{:?}", e), + "error" => ?e, "location" => "batch import error" ), } @@ -219,7 +200,7 @@ fn handle_failed_chain_segment( warn!( log, "BlockProcessingFailure"; "msg" => "unexpected condition in processing block.", - "outcome" => format!("{:?}", e) + "outcome" => ?e, ); Err(format!("Internal error whilst processing block: {:?}", e)) @@ -228,7 +209,7 @@ fn handle_failed_chain_segment( debug!( log, "Invalid block received"; "msg" => "peer sent invalid block", - "outcome" => format!("{:?}", other), + "outcome" => %other, ); Err(format!("Peer sent invalid block. Reason: {:?}", other)) diff --git a/beacon_node/network/src/beacon_processor/worker.rs b/beacon_node/network/src/beacon_processor/worker.rs index 6388962e5..79d5d53c5 100644 --- a/beacon_node/network/src/beacon_processor/worker.rs +++ b/beacon_node/network/src/beacon_processor/worker.rs @@ -535,9 +535,10 @@ impl Worker { /// /// Creates a log if there is an interal error. fn send_sync_message(&self, message: SyncMessage) { - self.sync_tx - .send(message) - .unwrap_or_else(|_| error!(self.log, "Could not send message to the sync service")); + self.sync_tx.send(message).unwrap_or_else(|e| { + error!(self.log, "Could not send message to the sync service"; + "error" => %e) + }); } /// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 9d83c5576..36b799c8d 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -82,10 +82,11 @@ impl Processor { } fn send_to_sync(&mut self, message: SyncMessage) { - self.sync_send.send(message).unwrap_or_else(|_| { + self.sync_send.send(message).unwrap_or_else(|e| { warn!( self.log, "Could not send message to the sync service"; + "error" => %e, ) }); } @@ -691,9 +692,10 @@ impl HandlerNetworkContext { /// Sends a message to the network task. fn inform_network(&mut self, msg: NetworkMessage) { + let msg_r = &format!("{:?}", msg); self.network_send .send(msg) - .unwrap_or_else(|_| warn!(self.log, "Could not send message to the network service")) + .unwrap_or_else(|e| warn!(self.log, "Could not send message to the network service"; "error" => %e, "message" => msg_r)) } /// Disconnects and ban's a peer, sending a Goodbye request with the associated reason. 
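Throughout this diff, eagerly built `format!("{:?}", ..)` log arguments are replaced with slog's sigil shorthand: `%value` records a key using its `Display` implementation and `?value` using its `Debug` implementation, so formatting only happens when the record is actually serialized. A minimal, self-contained sketch of that shorthand follows; it assumes `slog`, `slog-term` and `slog-async` as dependencies, and the peer id string is only a stand-in for a real libp2p `PeerId`.

```rust
use slog::{debug, o, Drain};

fn main() {
    // Standard slog terminal/async drain setup.
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::FullFormat::new(decorator).build().fuse();
    let drain = slog_async::Async::new(drain).build().fuse();
    let log = slog::Logger::root(drain, o!());

    let peer_id = "example-peer-id"; // stand-in for a libp2p PeerId (has Display)
    let err = std::io::Error::new(std::io::ErrorKind::Other, "connection reset");

    // `%x` serializes with Display, `?x` with Debug, mirroring the
    // `"last_peer_id" => %peer_id` / `"error" => ?e` changes in this diff.
    debug!(log, "Parent lookup failed"; "last_peer_id" => %peer_id, "error" => ?err);
}
```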
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index f14794447..a2f479292 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -29,9 +29,9 @@ //! //! Block Lookup //! -//! To keep the logic maintained to the syncing thread (and manage the request_ids), when a block needs to be searched for (i.e -//! if an attestation references an unknown block) this manager can search for the block and -//! subsequently search for parents if needed. +//! To keep the logic maintained to the syncing thread (and manage the request_ids), when a block +//! needs to be searched for (i.e if an attestation references an unknown block) this manager can +//! search for the block and subsequently search for parents if needed. use super::network_context::SyncNetworkContext; use super::peer_sync_info::{PeerSyncInfo, PeerSyncType}; @@ -106,7 +106,6 @@ pub enum SyncMessage { BatchProcessed { chain_id: ChainId, epoch: Epoch, - downloaded_blocks: Vec>, result: BatchProcessResult, }, @@ -123,12 +122,10 @@ pub enum SyncMessage { // TODO: When correct batch error handling occurs, we will include an error type. #[derive(Debug)] pub enum BatchProcessResult { - /// The batch was completed successfully. - Success, - /// The batch processing failed. - Failed, - /// The batch processing failed but managed to import at least one block. - Partial, + /// The batch was completed successfully. It carries whether the sent batch contained blocks. + Success(bool), + /// The batch processing failed. It carries whether the processing imported any block. + Failed(bool), } /// Maintains a sequential list of parents to lookup and the lookup's current state. @@ -275,9 +272,9 @@ impl SyncManager { match local_peer_info.peer_sync_type(&remote) { PeerSyncType::FullySynced => { trace!(self.log, "Peer synced to our head found"; - "peer" => format!("{:?}", peer_id), - "peer_head_slot" => remote.head_slot, - "local_head_slot" => local_peer_info.head_slot, + "peer" => %peer_id, + "peer_head_slot" => remote.head_slot, + "local_head_slot" => local_peer_info.head_slot, ); self.synced_peer(&peer_id, remote); // notify the range sync that a peer has been added @@ -285,11 +282,11 @@ impl SyncManager { } PeerSyncType::Advanced => { trace!(self.log, "Useful peer for sync found"; - "peer" => format!("{:?}", peer_id), - "peer_head_slot" => remote.head_slot, - "local_head_slot" => local_peer_info.head_slot, - "peer_finalized_epoch" => remote.finalized_epoch, - "local_finalized_epoch" => local_peer_info.finalized_epoch, + "peer" => %peer_id, + "peer_head_slot" => remote.head_slot, + "local_head_slot" => local_peer_info.head_slot, + "peer_finalized_epoch" => remote.finalized_epoch, + "local_finalized_epoch" => local_peer_info.finalized_epoch, ); // There are few cases to handle here: @@ -908,14 +905,12 @@ impl SyncManager { SyncMessage::BatchProcessed { chain_id, epoch, - downloaded_blocks, result, } => { self.range_sync.handle_block_process_result( &mut self.network, chain_id, epoch, - downloaded_blocks, result, ); } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 83f4cb64e..715344eb1 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1,11 +1,14 @@ //! Provides network functionality for the Syncing thread. This fundamentally wraps a network //! channel and stores a global RPC ID to perform requests. 
+use super::range_sync::{BatchId, ChainId}; +use super::RequestId as SyncRequestId; use crate::router::processor::status_message; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RequestId}; use eth2_libp2p::{Client, NetworkGlobals, PeerAction, PeerId, Request}; +use fnv::FnvHashMap; use slog::{debug, trace, warn}; use std::sync::Arc; use tokio::sync::mpsc; @@ -21,7 +24,11 @@ pub struct SyncNetworkContext { network_globals: Arc>, /// A sequential ID for all RPC requests. - request_id: usize, + request_id: SyncRequestId, + + /// BlocksByRange requests made by range syncing chains. + range_requests: FnvHashMap, + /// Logger for the `SyncNetworkContext`. log: slog::Logger, } @@ -36,6 +43,7 @@ impl SyncNetworkContext { network_send, network_globals, request_id: 1, + range_requests: FnvHashMap::default(), log, } } @@ -50,24 +58,26 @@ impl SyncNetworkContext { .unwrap_or_default() } - pub fn status_peer( + pub fn status_peers( &mut self, chain: Arc>, - peer_id: PeerId, + peers: impl Iterator, ) { if let Some(status_message) = status_message(&chain) { - debug!( - self.log, - "Sending Status Request"; - "peer" => format!("{:?}", peer_id), - "fork_digest" => format!("{:?}", status_message.fork_digest), - "finalized_root" => format!("{:?}", status_message.finalized_root), - "finalized_epoch" => format!("{:?}", status_message.finalized_epoch), - "head_root" => format!("{}", status_message.head_root), - "head_slot" => format!("{}", status_message.head_slot), - ); + for peer_id in peers { + debug!( + self.log, + "Sending Status Request"; + "peer" => %peer_id, + "fork_digest" => ?status_message.fork_digest, + "finalized_root" => ?status_message.finalized_root, + "finalized_epoch" => ?status_message.finalized_epoch, + "head_root" => %status_message.head_root, + "head_slot" => %status_message.head_slot, + ); - let _ = self.send_rpc_request(peer_id, Request::Status(status_message)); + let _ = self.send_rpc_request(peer_id, Request::Status(status_message.clone())); + } } } @@ -75,15 +85,34 @@ impl SyncNetworkContext { &mut self, peer_id: PeerId, request: BlocksByRangeRequest, - ) -> Result { + chain_id: ChainId, + batch_id: BatchId, + ) -> Result<(), &'static str> { trace!( self.log, "Sending BlocksByRange Request"; "method" => "BlocksByRange", "count" => request.count, - "peer" => format!("{:?}", peer_id) + "peer" => %peer_id, ); - self.send_rpc_request(peer_id, Request::BlocksByRange(request)) + let req_id = self.send_rpc_request(peer_id, Request::BlocksByRange(request))?; + self.range_requests.insert(req_id, (chain_id, batch_id)); + Ok(()) + } + + pub fn blocks_by_range_response( + &mut self, + request_id: usize, + remove: bool, + ) -> Option<(ChainId, BatchId)> { + // NOTE: we can't guarantee that the request must be registered as it could receive more + // than an error, and be removed after receiving the first one. 
+ // FIXME: https://github.com/sigp/lighthouse/issues/1634 + if remove { + self.range_requests.remove(&request_id) + } else { + self.range_requests.get(&request_id).cloned() + } } pub fn blocks_by_root_request( diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 1fa312c57..532dafd2e 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -1,35 +1,274 @@ -use super::chain::EPOCHS_PER_BATCH; -use eth2_libp2p::rpc::methods::*; +use eth2_libp2p::rpc::methods::BlocksByRangeRequest; use eth2_libp2p::PeerId; -use fnv::FnvHashMap; use ssz::Encode; -use std::cmp::min; -use std::cmp::Ordering; -use std::collections::hash_map::Entry; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::hash::{Hash, Hasher}; use std::ops::Sub; use types::{Epoch, EthSpec, SignedBeaconBlock, Slot}; -/// A collection of sequential blocks that are requested from peers in a single RPC request. -#[derive(PartialEq, Debug)] -pub struct Batch { - /// The requested start epoch of the batch. - pub start_epoch: Epoch, - /// The requested end slot of batch, exclusive. - pub end_slot: Slot, - /// The `Attempts` that have been made to send us this batch. - pub attempts: Vec, - /// The peer that is currently assigned to the batch. - pub current_peer: PeerId, - /// The number of retries this batch has undergone due to a failed request. - /// This occurs when peers do not respond or we get an RPC error. - pub retries: u8, - /// The number of times this batch has attempted to be re-downloaded and re-processed. This - /// occurs when a batch has been received but cannot be processed. - pub reprocess_retries: u8, - /// The blocks that have been downloaded. - pub downloaded_blocks: Vec>, +/// The number of times to retry a batch before it is considered failed. +const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; + +/// Invalid batches are attempted to be re-downloaded from other peers. If a batch cannot be processed +/// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty. +const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3; + +/// A segment of a chain. +pub struct BatchInfo { + /// Start slot of the batch. + start_slot: Slot, + /// End slot of the batch. + end_slot: Slot, + /// The `Attempts` that have been made and failed to send us this batch. + failed_processing_attempts: Vec, + /// The number of download retries this batch has undergone due to a failed request. + failed_download_attempts: Vec, + /// State of the batch. + state: BatchState, +} + +/// Current state of a batch +pub enum BatchState { + /// The batch has failed either downloading or processing, but can be requested again. + AwaitingDownload, + /// The batch is being downloaded. + Downloading(PeerId, Vec>), + /// The batch has been completely downloaded and is ready for processing. + AwaitingProcessing(PeerId, Vec>), + /// The batch is being processed. + Processing(Attempt), + /// The batch was successfully processed and is waiting to be validated. + /// + /// It is not sufficient to process a batch successfully to consider it correct. This is + /// because batches could be erroneously empty, or incomplete. Therefore, a batch is considered + /// valid, only if the next sequential batch imports at least a block. + AwaitingValidation(Attempt), + /// Intermediate state for inner state handling. + Poisoned, + /// The batch has maxed out the allowed attempts for either downloading or processing. 
It + /// cannot be recovered. + Failed, +} + +impl BatchState { + /// Helper function for poisoning a state. + pub fn poison(&mut self) -> BatchState { + std::mem::replace(self, BatchState::Poisoned) + } +} + +impl BatchInfo { + /// Batches are downloaded excluding the first block of the epoch assuming it has already been + /// downloaded. + /// + /// For example: + /// + /// Epoch boundary | | + /// ... | 30 | 31 | 32 | 33 | 34 | ... | 61 | 62 | 63 | 64 | 65 | + /// Batch 1 | Batch 2 | Batch 3 + pub fn new(start_epoch: &Epoch, num_of_epochs: u64) -> Self { + let start_slot = start_epoch.start_slot(T::slots_per_epoch()) + 1; + let end_slot = start_slot + num_of_epochs * T::slots_per_epoch(); + BatchInfo { + start_slot, + end_slot, + failed_processing_attempts: Vec::new(), + failed_download_attempts: Vec::new(), + state: BatchState::AwaitingDownload, + } + } + + /// Gives a list of peers from which this batch has had a failed download or processing + /// attempt. + pub fn failed_peers(&self) -> HashSet { + let mut peers = HashSet::with_capacity( + self.failed_processing_attempts.len() + self.failed_download_attempts.len(), + ); + + for attempt in &self.failed_processing_attempts { + peers.insert(attempt.peer_id.clone()); + } + + for download in &self.failed_download_attempts { + peers.insert(download.clone()); + } + + peers + } + + pub fn current_peer(&self) -> Option<&PeerId> { + match &self.state { + BatchState::AwaitingDownload | BatchState::Failed => None, + BatchState::Downloading(peer_id, _) + | BatchState::AwaitingProcessing(peer_id, _) + | BatchState::Processing(Attempt { peer_id, .. }) + | BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(&peer_id), + BatchState::Poisoned => unreachable!("Poisoned batch"), + } + } + + pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest { + BlocksByRangeRequest { + start_slot: self.start_slot.into(), + count: self.end_slot.sub(self.start_slot).into(), + step: 1, + } + } + + pub fn state(&self) -> &BatchState { + &self.state + } + + pub fn attempts(&self) -> &[Attempt] { + &self.failed_processing_attempts + } + + /// Adds a block to a downloading batch. + pub fn add_block(&mut self, block: SignedBeaconBlock) { + match self.state.poison() { + BatchState::Downloading(peer, mut blocks) => { + blocks.push(block); + self.state = BatchState::Downloading(peer, blocks) + } + other => unreachable!("Add block for batch in wrong state: {:?}", other), + } + } + + /// Marks the batch as ready to be processed if the blocks are in the range. 
The number of + /// received blocks is returned on success, or the expected and received slot bounds on failure. + #[must_use = "Batch may have failed"] + pub fn download_completed( + &mut self, + ) -> Result< + usize, /* Received blocks */ + ( + Slot, /* expected slot */ + Slot, /* received slot */ + &BatchState, + ), + > { + match self.state.poison() { + BatchState::Downloading(peer, blocks) => { + // verify that blocks are in range + if let Some(last_slot) = blocks.last().map(|b| b.slot()) { + // the batch is non-empty + let first_slot = blocks[0].slot(); + + let failed_range = if first_slot < self.start_slot { + Some((self.start_slot, first_slot)) + } else if self.end_slot < last_slot { + Some((self.end_slot, last_slot)) + } else { + None + }; + + if let Some(range) = failed_range { + // this is a failed download, register the attempt and check if the batch + // can be tried again + self.failed_download_attempts.push(peer); + self.state = if self.failed_download_attempts.len() + >= MAX_BATCH_DOWNLOAD_ATTEMPTS as usize + { + BatchState::Failed + } else { + // drop the blocks + BatchState::AwaitingDownload + }; + return Err((range.0, range.1, &self.state)); + } + } + + let received = blocks.len(); + self.state = BatchState::AwaitingProcessing(peer, blocks); + Ok(received) + } + other => unreachable!("Download completed for batch in wrong state: {:?}", other), + } + } + + #[must_use = "Batch may have failed"] + pub fn download_failed(&mut self) -> &BatchState { + match self.state.poison() { + BatchState::Downloading(peer, _) => { + // register the attempt and check if the batch can be tried again + self.failed_download_attempts.push(peer); + self.state = if self.failed_download_attempts.len() + >= MAX_BATCH_DOWNLOAD_ATTEMPTS as usize + { + BatchState::Failed + } else { + // drop the blocks + BatchState::AwaitingDownload + }; + &self.state + } + other => unreachable!("Download failed for batch in wrong state: {:?}", other), + } + } + + pub fn start_downloading_from_peer(&mut self, peer: PeerId) { + match self.state.poison() { + BatchState::AwaitingDownload => { + self.state = BatchState::Downloading(peer, Vec::new()); + } + other => unreachable!("Starting download for batch in wrong state: {:?}", other), + } + } + + pub fn start_processing(&mut self) -> Vec> { + match self.state.poison() { + BatchState::AwaitingProcessing(peer, blocks) => { + self.state = BatchState::Processing(Attempt::new(peer, &blocks)); + blocks + } + other => unreachable!("Start processing for batch in wrong state: {:?}", other), + } + } + + #[must_use = "Batch may have failed"] + pub fn processing_completed(&mut self, was_successful: bool) -> &BatchState { + match self.state.poison() { + BatchState::Processing(attempt) => { + self.state = if !was_successful { + // register the failed attempt + self.failed_processing_attempts.push(attempt); + + // check if the batch can be downloaded again + if self.failed_processing_attempts.len() + >= MAX_BATCH_PROCESSING_ATTEMPTS as usize + { + BatchState::Failed + } else { + BatchState::AwaitingDownload + } + } else { + BatchState::AwaitingValidation(attempt) + }; + &self.state + } + other => unreachable!("Processing completed for batch in wrong state: {:?}", other), + } + } + + #[must_use = "Batch may have failed"] + pub fn validation_failed(&mut self) -> &BatchState { + match self.state.poison() { + BatchState::AwaitingValidation(attempt) => { + self.failed_processing_attempts.push(attempt); + + // check if the batch can be downloaded again + self.state = if self.failed_processing_attempts.len() + >=
MAX_BATCH_PROCESSING_ATTEMPTS as usize + { + BatchState::Failed + } else { + BatchState::AwaitingDownload + }; + &self.state + } + other => unreachable!("Validation failed for batch in wrong state: {:?}", other), + } + } } /// Represents a peer's attempt and providing the result for this batch. @@ -43,131 +282,61 @@ pub struct Attempt { pub hash: u64, } -impl Eq for Batch {} - -impl Batch { - pub fn new(start_epoch: Epoch, end_slot: Slot, peer_id: PeerId) -> Self { - Batch { - start_epoch, - end_slot, - attempts: Vec::new(), - current_peer: peer_id, - retries: 0, - reprocess_retries: 0, - downloaded_blocks: Vec::new(), - } - } - - pub fn start_slot(&self) -> Slot { - // batches are shifted by 1 - self.start_epoch.start_slot(T::slots_per_epoch()) + 1 - } - - pub fn end_slot(&self) -> Slot { - self.end_slot - } - pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest { - let start_slot = self.start_slot(); - BlocksByRangeRequest { - start_slot: start_slot.into(), - count: min( - T::slots_per_epoch() * EPOCHS_PER_BATCH, - self.end_slot.sub(start_slot).into(), - ), - step: 1, - } - } - - /// This gets a hash that represents the blocks currently downloaded. This allows comparing a - /// previously downloaded batch of blocks with a new downloaded batch of blocks. - pub fn hash(&self) -> u64 { - // the hash used is the ssz-encoded list of blocks +impl Attempt { + #[allow(clippy::ptr_arg)] + fn new(peer_id: PeerId, blocks: &Vec>) -> Self { let mut hasher = std::collections::hash_map::DefaultHasher::new(); - self.downloaded_blocks.as_ssz_bytes().hash(&mut hasher); - hasher.finish() + blocks.as_ssz_bytes().hash(&mut hasher); + let hash = hasher.finish(); + Attempt { peer_id, hash } } } -impl Ord for Batch { - fn cmp(&self, other: &Self) -> Ordering { - self.start_epoch.cmp(&other.start_epoch) +impl slog::KV for &mut BatchInfo { + fn serialize( + &self, + record: &slog::Record, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + slog::KV::serialize(*self, record, serializer) } } -impl PartialOrd for Batch { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) +impl slog::KV for BatchInfo { + fn serialize( + &self, + record: &slog::Record, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + use slog::Value; + Value::serialize(&self.start_slot, record, "start_slot", serializer)?; + Value::serialize( + &(self.end_slot - 1), // NOTE: The -1 shows inclusive blocks + record, + "end_slot", + serializer, + )?; + serializer.emit_usize("downloaded", self.failed_download_attempts.len())?; + serializer.emit_usize("processed", self.failed_processing_attempts.len())?; + serializer.emit_str("state", &format!("{:?}", self.state))?; + slog::Result::Ok(()) } } -/// A structure that contains a mapping of pending batch requests, that also keeps track of which -/// peers are currently making batch requests. -/// -/// This is used to optimise searches for idle peers (peers that have no outbound batch requests). -pub struct PendingBatches { - /// The current pending batches. - batches: FnvHashMap>, - /// A mapping of peers to the number of pending requests. 
- peer_requests: HashMap>, -} - -impl PendingBatches { - pub fn new() -> Self { - PendingBatches { - batches: FnvHashMap::default(), - peer_requests: HashMap::new(), - } - } - - pub fn insert(&mut self, request_id: usize, batch: Batch) -> Option> { - let peer_request = batch.current_peer.clone(); - self.peer_requests - .entry(peer_request) - .or_insert_with(HashSet::new) - .insert(request_id); - self.batches.insert(request_id, batch) - } - - pub fn remove(&mut self, request_id: usize) -> Option> { - if let Some(batch) = self.batches.remove(&request_id) { - if let Entry::Occupied(mut entry) = self.peer_requests.entry(batch.current_peer.clone()) - { - entry.get_mut().remove(&request_id); - - if entry.get().is_empty() { - entry.remove(); - } +impl std::fmt::Debug for BatchState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + BatchState::Processing(_) => f.write_str("Processing"), + BatchState::AwaitingValidation(_) => f.write_str("AwaitingValidation"), + BatchState::AwaitingDownload => f.write_str("AwaitingDownload"), + BatchState::Failed => f.write_str("Failed"), + BatchState::AwaitingProcessing(ref peer, ref blocks) => { + write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len()) } - Some(batch) - } else { - None + BatchState::Downloading(peer, blocks) => { + write!(f, "Downloading({}, {} blocks)", peer, blocks.len()) + } + BatchState::Poisoned => f.write_str("Poisoned"), } } - - /// The number of current pending batch requests. - pub fn len(&self) -> usize { - self.batches.len() - } - - /// Adds a block to the batches if the request id exists. Returns None if there is no batch - /// matching the request id. - pub fn add_block(&mut self, request_id: usize, block: SignedBeaconBlock) -> Option<()> { - let batch = self.batches.get_mut(&request_id)?; - batch.downloaded_blocks.push(block); - Some(()) - } - - /// Returns true if there the peer does not exist in the peer_requests mapping. Indicating it - /// has no pending outgoing requests. - pub fn peer_is_idle(&self, peer_id: &PeerId) -> bool { - self.peer_requests.get(peer_id).is_none() - } - - /// Removes a batch for a given peer. - pub fn remove_batch_by_peer(&mut self, peer_id: &PeerId) -> Option> { - let request_ids = self.peer_requests.get(peer_id)?; - - let request_id = *request_ids.iter().next()?; - self.remove(request_id) - } } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index d79dff469..4decdc212 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,40 +1,36 @@ -use super::batch::{Batch, PendingBatches}; +use super::batch::{BatchInfo, BatchState}; use crate::beacon_processor::ProcessId; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; -use crate::sync::RequestId; use crate::sync::{network_context::SyncNetworkContext, BatchProcessResult}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{PeerAction, PeerId}; -use rand::prelude::*; -use slog::{crit, debug, error, warn}; -use std::collections::HashSet; +use fnv::FnvHashMap; +use rand::seq::SliceRandom; +use slog::{crit, debug, o, warn}; +use std::collections::{btree_map::Entry, BTreeMap, HashSet}; +use std::hash::{Hash, Hasher}; use std::sync::Arc; -use tokio::sync::mpsc; +use tokio::sync::mpsc::Sender; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// Blocks are downloaded in batches from peers. 
This constant specifies how many epochs worth of /// blocks per batch are requested _at most_. A batch may request less blocks to account for /// already requested slots. There is a timeout for each batch request. If this value is too high, -/// we will negatively report peers with poor bandwidth. This can be set arbitrarily high, in which case the -/// responder will fill the response up to the max request size, assuming they have the bandwidth -/// to do so. -pub const EPOCHS_PER_BATCH: u64 = 2; - -/// The number of times to retry a batch before the chain is considered failed and removed. -const MAX_BATCH_RETRIES: u8 = 5; +/// we will negatively report peers with poor bandwidth. This can be set arbitrarily high, in which +/// case the responder will fill the response up to the max request size, assuming they have the +/// bandwidth to do so. +pub const EPOCHS_PER_BATCH: u64 = 8; /// The maximum number of batches to queue before requesting more. const BATCH_BUFFER_SIZE: u8 = 5; -/// Invalid batches are attempted to be re-downloaded from other peers. If they cannot be processed -/// after `INVALID_BATCH_LOOKUP_ATTEMPTS` times, the chain is considered faulty and all peers will -/// be reported negatively. -const INVALID_BATCH_LOOKUP_ATTEMPTS: u8 = 3; - /// A return type for functions that act on a `Chain` which informs the caller whether the chain /// has been completed and should be removed or to be kept if further processing is /// required. #[derive(PartialEq)] +#[must_use = "Should be checked, since a failed chain must be removed. A chain that requested + being removed and continued is now in an inconsistent state"] + pub enum ProcessingResult { KeepChain, RemoveChain, @@ -42,6 +38,7 @@ pub enum ProcessingResult { /// A chain identifier pub type ChainId = u64; +pub type BatchId = Epoch; /// A chain of blocks that need to be downloaded. Peers who claim to contain the target head /// root are grouped into the peer pool and queried for batches when downloading the @@ -50,7 +47,7 @@ pub struct SyncingChain { /// A random id used to identify this chain. id: ChainId, - /// The original start slot when this chain was initialised. + /// The start of the chain segment. Any epoch previous to this one has been validated. pub start_epoch: Epoch, /// The target head slot. @@ -59,35 +56,37 @@ pub struct SyncingChain { /// The target head root. pub target_head_root: Hash256, - /// The batches that are currently awaiting a response from a peer. An RPC request for these - /// has been sent. - pub pending_batches: PendingBatches, - - /// The batches that have been downloaded and are awaiting processing and/or validation. - completed_batches: Vec>, - - /// Batches that have been processed and awaiting validation before being removed. - processed_batches: Vec>, + /// Sorted map of batches undergoing some kind of processing. + batches: BTreeMap>, /// The peers that agree on the `target_head_slot` and `target_head_root` as a canonical chain - /// and thus available to download this chain from. - pub peer_pool: HashSet, + /// and thus available to download this chain from, as well as the batches we are currently + /// requesting. + peers: FnvHashMap>, /// Starting epoch of the next batch that needs to be downloaded. - to_be_downloaded: Epoch, + to_be_downloaded: BatchId, /// Starting epoch of the batch that needs to be processed next. /// This is incremented as the chain advances. - processing_target: Epoch, + processing_target: BatchId, + + /// Optimistic head to sync. 
+ /// If a block is imported for this batch, the chain advances to this point. + optimistic_start: Option, + + /// When a batch for an optimistic start fails processing, it is stored to avoid trying it + /// again due to chain stopping/re-starting on chain switching. + failed_optimistic_starts: HashSet, /// The current state of the chain. pub state: ChainSyncingState, /// The current processing batch, if any. - current_processing_batch: Option>, + current_processing_batch: Option, /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. - beacon_processor_send: mpsc::Sender>, + beacon_processor_send: Sender>, /// A reference to the underlying beacon chain. chain: Arc>, @@ -105,36 +104,85 @@ pub enum ChainSyncingState { } impl SyncingChain { + pub fn id(target_root: &Hash256, target_slot: &Slot) -> u64 { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + (target_root, target_slot).hash(&mut hasher); + hasher.finish() + } + #[allow(clippy::too_many_arguments)] pub fn new( - id: u64, start_epoch: Epoch, target_head_slot: Slot, target_head_root: Hash256, peer_id: PeerId, - beacon_processor_send: mpsc::Sender>, + beacon_processor_send: Sender>, chain: Arc>, - log: slog::Logger, + log: &slog::Logger, ) -> Self { - let mut peer_pool = HashSet::new(); - peer_pool.insert(peer_id); + let mut peers = FnvHashMap::default(); + peers.insert(peer_id, Default::default()); + + let id = SyncingChain::::id(&target_head_root, &target_head_slot); SyncingChain { id, start_epoch, target_head_slot, target_head_root, - pending_batches: PendingBatches::new(), - completed_batches: Vec::new(), - processed_batches: Vec::new(), - peer_pool, + batches: BTreeMap::new(), + peers, to_be_downloaded: start_epoch, processing_target: start_epoch, + optimistic_start: None, + failed_optimistic_starts: HashSet::default(), state: ChainSyncingState::Stopped, current_processing_batch: None, beacon_processor_send, chain, - log, + log: log.new(o!("chain" => id)), + } + } + + /// Check if the chain has peers from which to process batches. + pub fn available_peers(&self) -> usize { + self.peers.len() + } + + /// Get the chain's id. + pub fn get_id(&self) -> ChainId { + self.id + } + + /// Removes a peer from the chain. + /// If the peer has active batches, those are considered failed and re-requested. + pub fn remove_peer( + &mut self, + peer_id: &PeerId, + network: &mut SyncNetworkContext, + ) -> ProcessingResult { + if let Some(batch_ids) = self.peers.remove(peer_id) { + // fail the batches + for id in batch_ids { + if let BatchState::Failed = self + .batches + .get_mut(&id) + .expect("registered batch exists") + .download_failed() + { + return ProcessingResult::RemoveChain; + } + if let ProcessingResult::RemoveChain = self.retry_batch_download(network, id) { + // drop the chain early + return ProcessingResult::RemoveChain; + } + } + } + + if self.peers.is_empty() { + ProcessingResult::RemoveChain + } else { + ProcessingResult::KeepChain } } @@ -146,127 +194,213 @@ impl SyncingChain { .start_slot(T::EthSpec::slots_per_epoch()) } - /// A batch of blocks has been received. This function gets run on all chains and should - /// return Some if the request id matches a pending request on this chain, or None if it does - /// not. - /// - /// If the request corresponds to a pending batch, this function processes the completed - /// batch. + /// A block has been received for a batch on this chain. + /// If the block correctly completes the batch it will be processed if possible. 
pub fn on_block_response( &mut self, network: &mut SyncNetworkContext, - request_id: RequestId, - beacon_block: &Option>, - ) -> Option<()> { + batch_id: BatchId, + peer_id: PeerId, + beacon_block: Option>, + ) -> ProcessingResult { + // check if we have this batch + let batch = match self.batches.get_mut(&batch_id) { + None => { + debug!(self.log, "Received a block for unknown batch"; "epoch" => batch_id); + // A batch might get removed when the chain advances, so this is non fatal. + return ProcessingResult::KeepChain; + } + Some(batch) => { + // A batch could be retried without the peer failing the request (disconnecting/ + // sending an error /timeout) if the peer is removed from the chain for other + // reasons. Check that this block belongs to the expected peer + if Some(&peer_id) != batch.current_peer() { + return ProcessingResult::KeepChain; + } + batch + } + }; + if let Some(block) = beacon_block { // This is not a stream termination, simply add the block to the request - self.pending_batches.add_block(request_id, block.clone()) + batch.add_block(block); + ProcessingResult::KeepChain } else { // A stream termination has been sent. This batch has ended. Process a completed batch. - let batch = self.pending_batches.remove(request_id)?; - self.handle_completed_batch(network, batch); - Some(()) - } - } + // Remove the request from the peer's active batches + let peer = batch + .current_peer() + .expect("Batch is downloading from a peer"); + self.peers + .get_mut(peer) + .unwrap_or_else(|| panic!("Batch is registered for the peer")) + .remove(&batch_id); - /// A completed batch has been received, process the batch. - /// This will return `ProcessingResult::KeepChain` if the chain has not completed or - /// failed indicating that further batches are required. - fn handle_completed_batch( - &mut self, - network: &mut SyncNetworkContext, - batch: Batch, - ) { - // An entire batch of blocks has been received. This functions checks to see if it can be processed, - // remove any batches waiting to be verified and if this chain is syncing, request new - // blocks for the peer. - debug!(self.log, "Completed batch received"; "epoch" => batch.start_epoch, "blocks" => &batch.downloaded_blocks.len(), "awaiting_batches" => self.completed_batches.len()); + match batch.download_completed() { + Ok(received) => { + let awaiting_batches = batch_id.saturating_sub( + self.optimistic_start + .unwrap_or_else(|| self.processing_target), + ) / EPOCHS_PER_BATCH; + debug!(self.log, "Completed batch received"; "epoch" => batch_id, "blocks" => received, "awaiting_batches" => awaiting_batches); - // verify the range of received blocks - // Note that the order of blocks is verified in block processing - if let Some(last_slot) = batch.downloaded_blocks.last().map(|b| b.slot()) { - // the batch is non-empty - let first_slot = batch.downloaded_blocks[0].slot(); - if batch.start_slot() > first_slot || batch.end_slot() < last_slot { - warn!(self.log, "BlocksByRange response returned out of range blocks"; - "response_initial_slot" => first_slot, - "requested_initial_slot" => batch.start_slot()); - // this batch can't be used, so we need to request it again. 
- self.failed_batch(network, batch); - return; + // pre-emptively request more blocks from peers whilst we process current blocks, + if let ProcessingResult::RemoveChain = self.request_batches(network) { + return ProcessingResult::RemoveChain; + } + self.process_completed_batches(network) + } + Err((expected, received, state)) => { + warn!(self.log, "Batch received out of range blocks"; + "epoch" => batch_id, "expected" => expected, "received" => received); + if let BatchState::Failed = state { + return ProcessingResult::RemoveChain; + } + // this batch can't be used, so we need to request it again. + self.retry_batch_download(network, batch_id) + } } } - - // Add this completed batch to the list of completed batches. This list will then need to - // be checked if any batches can be processed and verified for errors or invalid responses - // from peers. The logic is simpler to create this ordered batch list and to then process - // the list. - - let insert_index = self - .completed_batches - .binary_search(&batch) - .unwrap_or_else(|index| index); - self.completed_batches.insert(insert_index, batch); - - // We have a list of completed batches. It is not sufficient to process batch successfully - // to consider the batch correct. This is because batches could be erroneously empty, or - // incomplete. Therefore, a batch is considered valid, only if the next sequential batch is - // processed successfully. Therefore the `completed_batches` will store batches that have - // already be processed but not verified and therefore have Id's less than - // `self.to_be_processed_id`. - - // pre-emptively request more blocks from peers whilst we process current blocks, - self.request_batches(network); - - // Try and process any completed batches. This will spawn a new task to process any blocks - // that are ready to be processed. - self.process_completed_batches(); } - /// Tries to process any batches if there are any available and we are not currently processing - /// other batches. - fn process_completed_batches(&mut self) { - // Only process batches if this chain is Syncing - if self.state != ChainSyncingState::Syncing { - return; + /// Sends to process the batch with the given id. + /// The batch must exist and be ready for processing + fn process_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + ) -> ProcessingResult { + // Only process batches if this chain is Syncing, and only one at a time + if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() { + return ProcessingResult::KeepChain; } - // Only process one batch at a time - if self.current_processing_batch.is_some() { - return; - } + let batch = self.batches.get_mut(&batch_id).expect("Batch exists"); - // Check if there is a batch ready to be processed - if !self.completed_batches.is_empty() - && self.completed_batches[0].start_epoch == self.processing_target - { - let batch = self.completed_batches.remove(0); + // NOTE: We send empty batches to the processor in order to trigger the block processor + // result callback. This is done, because an empty batch could end a chain and the logic + // for removing chains and checking completion is in the callback. - // Note: We now send empty batches to the processor in order to trigger the block - // processor result callback. This is done, because an empty batch could end a chain - // and the logic for removing chains and checking completion is in the callback. 
- - // send the batch to the batch processor thread - return self.process_batch(batch); - } - } - - /// Sends a batch to the beacon processor for async processing in a queue. - fn process_batch(&mut self, mut batch: Batch) { - let blocks = std::mem::replace(&mut batch.downloaded_blocks, Vec::new()); - let process_id = ProcessId::RangeBatchId(self.id, batch.start_epoch); - self.current_processing_batch = Some(batch); + let blocks = batch.start_processing(); + let process_id = ProcessId::RangeBatchId(self.id, batch_id); + self.current_processing_batch = Some(batch_id); if let Err(e) = self .beacon_processor_send .try_send(BeaconWorkEvent::chain_segment(process_id, blocks)) { - error!( - self.log, - "Failed to send chain segment to processor"; - "msg" => "process_batch", - "error" => format!("{:?}", e) - ); + crit!(self.log, "Failed to send chain segment to processor."; "msg" => "process_batch", + "error" => %e, "batch" => self.processing_target); + // This is unlikely to happen but it would stall syncing since the batch now has no + // blocks to continue, and the chain is expecting a processing result that won't + // arrive. To mitigate this, (fake) fail this processing so that the batch is + // re-downloaded. + // TODO: needs better handling + self.on_batch_process_result(network, batch_id, &BatchProcessResult::Failed(false)) + } else { + ProcessingResult::KeepChain + } + } + + /// Processes the next ready batch, prioritizing optimistic batches over the processing target. + fn process_completed_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> ProcessingResult { + // Only process batches if this chain is Syncing and only process one batch at a time + if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() { + return ProcessingResult::KeepChain; + } + + // Find the id of the batch we are going to process. + // + // First try our optimistic start, if any. If this batch is ready, we process it. If the + // batch has not already been completed, check the current chain target. + let optimistic_id = if let Some(epoch) = self.optimistic_start { + if let Some(batch) = self.batches.get(&epoch) { + let state = batch.state(); + match state { + BatchState::AwaitingProcessing(..) => { + // this batch is ready + debug!(self.log, "Processing optimistic start"; "epoch" => epoch); + Some(epoch) + } + BatchState::Downloading(..) => { + // The optimistic batch is being downloaded. We wait for this before + // attempting to process other batches. + return ProcessingResult::KeepChain; + } + BatchState::Processing(_) + | BatchState::AwaitingDownload + | BatchState::Failed + | BatchState::Poisoned + | BatchState::AwaitingValidation(_) => { + // these are all inconsistent states: + // - Processing -> `self.current_processing_batch` is Some + // - Failed -> non recoverable batch. For a optimistic batch, it should + // have been removed + // - Poisoned -> this is an intermediate state that should never be reached + // - AwaitingDownload -> A recoverable failed batch should have been + // re-requested. + // - AwaitingValidation -> If an optimistic batch is successfully processed + // it is no longer considered an optimistic candidate. 
If the batch was + // empty the chain rejects it; if it was non empty the chain is advanced + // to this point (so that the old optimistic batch is now the processing + // target) + unreachable!( + "Optimistic batch indicates inconsistent chain state: {:?}", + state + ) + } + } + } else { + None + } + } else { + None + }; + + // if the optimistic target can't be processed, check the processing target + let id = optimistic_id.or_else(|| { + if let Some(batch) = self.batches.get(&self.processing_target) { + let state = batch.state(); + match state { + BatchState::AwaitingProcessing(..) => Some(self.processing_target), + BatchState::Downloading(..) => { + // Batch is not ready, nothing to process + None + } + BatchState::Failed + | BatchState::AwaitingDownload + | BatchState::AwaitingValidation(_) + | BatchState::Processing(_) + | BatchState::Poisoned => { + // these are all inconsistent states: + // - Failed -> non recoverable batch. Chain should have beee removed + // - AwaitingDownload -> A recoverable failed batch should have been + // re-requested. + // - AwaitingValidation -> self.processing_target should have been moved + // forward + // - Processing -> `self.current_processing_batch` is Some + // - Poisoned -> Intermediate state that should never be reached + unreachable!( + "Robust target batch indicates inconsistent chain state: {:?}", + state + ) + } + } + } else { + crit!(self.log, "Batch not found for current processing target"; + "epoch" => self.processing_target); + None + } + }); + + // we found a batch to process + if let Some(id) = id { + self.process_batch(network, id) + } else { + ProcessingResult::KeepChain } } @@ -275,92 +409,82 @@ impl SyncingChain { pub fn on_batch_process_result( &mut self, network: &mut SyncNetworkContext, - chain_id: ChainId, - batch_start_epoch: Epoch, - downloaded_blocks: &mut Option>>, + batch_id: BatchId, result: &BatchProcessResult, - ) -> Option { - if chain_id != self.id { - // the result does not belong to this chain - return None; - } + ) -> ProcessingResult { + // the first two cases are possible if the chain advances while waiting for a processing + // result match &self.current_processing_batch { - Some(current_batch) if current_batch.start_epoch != batch_start_epoch => { + Some(processing_id) if *processing_id != batch_id => { debug!(self.log, "Unexpected batch result"; - "batch_epoch" => batch_start_epoch, "expected_batch_epoch" => current_batch.start_epoch); - return None; + "batch_epoch" => batch_id, "expected_batch_epoch" => processing_id); + return ProcessingResult::KeepChain; } None => { debug!(self.log, "Chain was not expecting a batch result"; - "batch_epoch" => batch_start_epoch); - return None; + "batch_epoch" => batch_id); + return ProcessingResult::KeepChain; } _ => { - // chain_id and batch_id match, continue + // batch_id matches, continue + self.current_processing_batch = None; } } - // claim the result by consuming the option - let downloaded_blocks = downloaded_blocks.take().or_else(|| { - // if taken by another chain, we are no longer waiting on a result. 
- self.current_processing_batch = None; - crit!(self.log, "Processed batch taken by another chain"); - None - })?; - - // No longer waiting on a processing result - let mut batch = self.current_processing_batch.take().unwrap(); - // These are the blocks of this batch - batch.downloaded_blocks = downloaded_blocks; - - // double check batches are processed in order TODO: Remove for prod - if batch.start_epoch != self.processing_target { - crit!(self.log, "Batch processed out of order"; - "processed_starting_epoch" => batch.start_epoch, - "expected_epoch" => self.processing_target); - } - - let res = match result { - BatchProcessResult::Success => { - self.processing_target += EPOCHS_PER_BATCH; - - // If the processed batch was not empty, we can validate previous invalidated - // blocks including the current batch. - if !batch.downloaded_blocks.is_empty() { - self.mark_processed_batches_as_valid(network, &batch); + match result { + BatchProcessResult::Success(was_non_empty) => { + let batch = self + .batches + .get_mut(&batch_id) + .expect("Chain was expecting a known batch"); + let _ = batch.processing_completed(true); + // If the processed batch was not empty, we can validate previous unvalidated + // blocks. + if *was_non_empty { + self.advance_chain(network, batch_id); + } else if let Some(epoch) = self.optimistic_start { + // check if this batch corresponds to an optimistic batch. In this case, we + // reject it as an optimistic candidate since the batch was empty + if epoch == batch_id { + if let ProcessingResult::RemoveChain = self.reject_optimistic_batch( + network, + false, /* do not re-request */ + "batch was empty", + ) { + return ProcessingResult::RemoveChain; + }; + } } - // Add the current batch to processed batches to be verified in the future. - self.processed_batches.push(batch); + self.processing_target += EPOCHS_PER_BATCH; // check if the chain has completed syncing if self.current_processed_slot() >= self.target_head_slot { // chain is completed + debug!(self.log, "Chain is complete"); ProcessingResult::RemoveChain } else { // chain is not completed - // attempt to request more batches - self.request_batches(network); - + if let ProcessingResult::RemoveChain = self.request_batches(network) { + return ProcessingResult::RemoveChain; + } // attempt to process more batches - self.process_completed_batches(); - - // keep the chain - ProcessingResult::KeepChain + self.process_completed_batches(network) } } - BatchProcessResult::Partial => { - warn!(self.log, "Batch processing failed but at least one block was imported"; - "batch_epoch" => batch.start_epoch, "peer" => batch.current_peer.to_string() - ); - // At least one block was successfully verified and imported, so we can be sure all - // previous batches are valid and we only need to download the current failed - // batch. 
- self.mark_processed_batches_as_valid(network, &batch); - - // check that we have not exceeded the re-process retry counter - if batch.reprocess_retries > INVALID_BATCH_LOOKUP_ATTEMPTS { + BatchProcessResult::Failed(imported_blocks) => { + let batch = self + .batches + .get_mut(&batch_id) + .expect("Chain was expecting a known batch"); + let peer = batch + .current_peer() + .expect("batch is processing blocks from a peer"); + debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks, + "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer)); + if let BatchState::Failed = batch.processing_completed(false) { + // check that we have not exceeded the re-process retry counter // If a batch has exceeded the invalid batch lookup attempts limit, it means // that it is likely all peers in this chain are are sending invalid batches // repeatedly and are either malicious or faulty. We drop the chain and @@ -370,109 +494,156 @@ impl SyncingChain { let action = PeerAction::LowToleranceError; warn!(self.log, "Batch failed to download. Dropping chain scoring peers"; "score_adjustment" => action.to_string(), - "batch_epoch"=> batch.start_epoch); - for peer_id in self.peer_pool.drain() { - network.report_peer(peer_id, action); + "batch_epoch"=> batch_id); + for (peer, _) in self.peers.drain() { + network.report_peer(peer, action); } ProcessingResult::RemoveChain } else { - // Handle this invalid batch, that is within the re-process retries limit. - self.handle_invalid_batch(network, batch); - ProcessingResult::KeepChain - } - } - BatchProcessResult::Failed => { - debug!(self.log, "Batch processing failed"; - "batch_epoch" => batch.start_epoch, "peer" => batch.current_peer.to_string(), "client" => network.client_type(&batch.current_peer).to_string()); - // The batch processing failed - // This could be because this batch is invalid, or a previous invalidated batch - // is invalid. We need to find out which and downvote the peer that has sent us - // an invalid batch. - - // check that we have not exceeded the re-process retry counter - if batch.reprocess_retries > INVALID_BATCH_LOOKUP_ATTEMPTS { - // If a batch has exceeded the invalid batch lookup attempts limit, it means - // that it is likely all peers in this chain are are sending invalid batches - // repeatedly and are either malicious or faulty. We drop the chain and - // downvote all peers. - let action = PeerAction::LowToleranceError; - warn!(self.log, "Batch failed to download. Dropping chain scoring peers"; - "score_adjustment" => action.to_string(), - "batch_epoch" => batch.start_epoch); - for peer_id in self.peer_pool.drain() { - network.report_peer(peer_id, action); + // chain can continue. Check if it can be moved forward + if *imported_blocks { + // At least one block was successfully verified and imported, so we can be sure all + // previous batches are valid and we only need to download the current failed + // batch. + self.advance_chain(network, batch_id); } - ProcessingResult::RemoveChain - } else { // Handle this invalid batch, that is within the re-process retries limit. - self.handle_invalid_batch(network, batch); - ProcessingResult::KeepChain - } - } - }; - - Some(res) - } - - /// Removes any batches awaiting validation. - /// - /// All blocks in `processed_batches` should be prior batches. As the `last_batch` has been - /// processed with blocks in it, all previous batches are valid. 
- /// - /// If a previous batch has been validated and it had been re-processed, downvote - /// the original peer. - fn mark_processed_batches_as_valid( - &mut self, - network: &mut SyncNetworkContext, - last_batch: &Batch, - ) { - while !self.processed_batches.is_empty() { - let mut processed_batch = self.processed_batches.remove(0); - if processed_batch.start_epoch >= last_batch.start_epoch { - crit!(self.log, "A processed batch had a greater id than the current process id"; - "processed_start_epoch" => processed_batch.start_epoch, - "current_start_epoch" => last_batch.start_epoch); - } - - // Go through passed attempts and downscore peers that returned invalid batches - while !processed_batch.attempts.is_empty() { - let attempt = processed_batch.attempts.remove(0); - // The validated batch has been re-processed - if attempt.hash != processed_batch.hash() { - // The re-downloaded version was different - if processed_batch.current_peer != attempt.peer_id { - // A different peer sent the correct batch, the previous peer did not - // We negatively score the original peer. - let action = PeerAction::LowToleranceError; - debug!( - self.log, "Re-processed batch validated. Scoring original peer"; - "batch_epoch" => processed_batch.start_epoch, - "score_adjustment" => action.to_string(), - "original_peer" => format!("{}",attempt.peer_id), - "new_peer" => format!("{}", processed_batch.current_peer) - ); - network.report_peer(attempt.peer_id, action); - } else { - // The same peer corrected it's previous mistake. There was an error, so we - // negative score the original peer. - let action = PeerAction::MidToleranceError; - debug!( - self.log, "Re-processed batch validated by the same peer."; - "batch_epoch" => processed_batch.start_epoch, - "score_adjustment" => action.to_string(), - "original_peer" => format!("{}",attempt.peer_id), - "new_peer" => format!("{}", processed_batch.current_peer) - ); - network.report_peer(attempt.peer_id, action); - } + self.handle_invalid_batch(network, batch_id) } } } } - /// An invalid batch has been received that could not be processed. + fn reject_optimistic_batch( + &mut self, + network: &mut SyncNetworkContext, + redownload: bool, + reason: &str, + ) -> ProcessingResult { + if let Some(epoch) = self.optimistic_start { + self.optimistic_start = None; + self.failed_optimistic_starts.insert(epoch); + // if this batch is inside the current processing range, keep it, otherwise drop + // it. NOTE: this is done to prevent non-sequential batches coming from optimistic + // starts from filling up the buffer size + if epoch < self.to_be_downloaded { + debug!(self.log, "Rejected optimistic batch left for future use"; "epoch" => %epoch, "reason" => reason); + // this batch is now treated as any other batch, and re-requested for future use + if redownload { + return self.retry_batch_download(network, epoch); + } + } else { + debug!(self.log, "Rejected optimistic batch"; "epoch" => %epoch, "reason" => reason); + self.batches.remove(&epoch); + } + } + + ProcessingResult::KeepChain + } + + /// Removes any batches previous to the given `validating_epoch` and updates the current + /// boundaries of the chain. /// - /// These events occur when a peer as successfully responded with blocks, but the blocks we + /// The `validating_epoch` must align with batch boundaries. + /// + /// If a previous batch has been validated and it had been re-processed, penalize the original + /// peer. 
+    fn advance_chain(
+        &mut self,
+        network: &mut SyncNetworkContext,
+        validating_epoch: Epoch,
+    ) {
+        // make sure this epoch produces an advancement
+        if validating_epoch <= self.start_epoch {
+            return;
+        }
+
+        // safety check for batch boundaries
+        if validating_epoch % EPOCHS_PER_BATCH != self.start_epoch % EPOCHS_PER_BATCH {
+            crit!(self.log, "Validating Epoch is not aligned");
+        }
+
+        // batches in the range [BatchId, ..) (not yet validated)
+        let remaining_batches = self.batches.split_off(&validating_epoch);
+        // batches less than `validating_epoch`
+        let removed_batches = std::mem::replace(&mut self.batches, remaining_batches);
+
+        for (id, batch) in removed_batches.into_iter() {
+            // only for batches awaiting validation can we be sure the last attempt is
+            // right, and thus, that any different attempt is wrong
+            match batch.state() {
+                BatchState::AwaitingValidation(ref processed_attempt) => {
+                    for attempt in batch.attempts() {
+                        // The validated batch has been re-processed
+                        if attempt.hash != processed_attempt.hash {
+                            // The re-downloaded version was different
+                            if processed_attempt.peer_id != attempt.peer_id {
+                                // A different peer sent the correct batch, the previous peer did not
+                                // We negatively score the original peer.
+                                let action = PeerAction::LowToleranceError;
+                                debug!(self.log, "Re-processed batch validated. Scoring original peer";
+                                    "batch_epoch" => id, "score_adjustment" => %action,
+                                    "original_peer" => %attempt.peer_id, "new_peer" => %processed_attempt.peer_id
+                                );
+                                network.report_peer(attempt.peer_id.clone(), action);
+                            } else {
+                                // The same peer corrected its previous mistake. There was an error, so we
+                                // negatively score the original peer.
+                                let action = PeerAction::MidToleranceError;
+                                debug!(self.log, "Re-processed batch validated by the same peer";
+                                    "batch_epoch" => id, "score_adjustment" => %action,
+                                    "original_peer" => %attempt.peer_id, "new_peer" => %processed_attempt.peer_id
+                                );
+                                network.report_peer(attempt.peer_id.clone(), action);
+                            }
+                        }
+                    }
+                }
+                BatchState::Downloading(peer, ..) => {
+                    // remove this batch from the peer's active requests
+                    if let Some(active_batches) = self.peers.get_mut(peer) {
+                        active_batches.remove(&id);
+                    }
+                }
+                BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => {
+                    unreachable!("batch indicates inconsistent chain state while advancing chain")
+                }
+                BatchState::AwaitingProcessing(..) => {
+                    // TODO: can we be sure the old attempts are wrong?
+                }
+                BatchState::Processing(_) => {
+                    assert_eq!(
+                        id,
+                        self.current_processing_batch.expect(
+                            "A batch in a processing state means the chain is processing it"
+                        )
+                    );
+                    self.current_processing_batch = None;
+                }
+            }
+        }
+
+        self.processing_target = self.processing_target.max(validating_epoch);
+        let old_start = self.start_epoch;
+        self.start_epoch = validating_epoch;
+        self.to_be_downloaded = self.to_be_downloaded.max(validating_epoch);
+        if self.batches.contains_key(&self.to_be_downloaded) {
+            // if a chain is advanced by Range beyond the previous `self.to_be_downloaded`, we
+            // won't have this batch, so we need to request it.
+            self.to_be_downloaded += EPOCHS_PER_BATCH;
+        }
+        if let Some(epoch) = self.optimistic_start {
+            if epoch <= validating_epoch {
+                self.optimistic_start = None;
+            }
+        }
+        debug!(self.log, "Chain advanced"; "previous_start" => old_start,
+            "new_start" => self.start_epoch, "processing_target" => self.processing_target);
+    }
+
+    /// An invalid batch has been received that could not be processed, but that can be retried.
+    ///
+    /// These events occur when a peer has successfully responded with blocks, but the blocks we
     /// have received are incorrect or invalid. This indicates the peer has not performed as
     /// intended and can result in downvoting a peer.
     // TODO: Batches could have been partially downloaded due to RPC size-limit restrictions. We
@@ -481,10 +652,10 @@ impl SyncingChain {
     fn handle_invalid_batch(
         &mut self,
         network: &mut SyncNetworkContext,
-        batch: Batch,
-    ) {
+        batch_id: BatchId,
+    ) -> ProcessingResult {
         // The current batch could not be processed, indicating either the current or previous
-        // batches are invalid
+        // batches are invalid.
         // The previous batch could be incomplete due to the block sizes being too large to fit in
         // a single RPC request or there could be consecutive empty batches which are not supposed
@@ -494,70 +665,51 @@ impl SyncingChain {
         // potentially be faulty. If a batch returns a different result than the original and
         // results in successful processing, we downvote the original peer that sent us the batch.
-        // If all batches return the same result, we try this process INVALID_BATCH_LOOKUP_ATTEMPTS
-        // times before considering the entire chain invalid and downvoting all peers.
+        if let Some(epoch) = self.optimistic_start {
+            // If this batch is an optimistic batch, we reject this epoch as an optimistic
+            // candidate and try to re-download it
+            if epoch == batch_id {
+                if let ProcessingResult::RemoveChain =
+                    self.reject_optimistic_batch(network, true, "batch was invalid")
+                {
+                    return ProcessingResult::RemoveChain;
+                } else {
+                    // since this is the optimistic batch, we can't consider previous batches as
+                    // invalid.
+                    return ProcessingResult::KeepChain;
+                }
+            }
+        }
+        // this is our robust `processing_target`. All previous batches must be awaiting
+        // validation
+        let mut redownload_queue = Vec::new();
-        // Find any pre-processed batches awaiting validation
-        while !self.processed_batches.is_empty() {
-            let past_batch = self.processed_batches.remove(0);
-            self.processing_target = std::cmp::min(self.processing_target, past_batch.start_epoch);
-            self.reprocess_batch(network, past_batch);
+        for (id, batch) in self.batches.range_mut(..batch_id) {
+            if let BatchState::Failed = batch.validation_failed() {
+                // remove the chain early
+                return ProcessingResult::RemoveChain;
+            }
+            redownload_queue.push(*id);
         }
-        // re-process the current batch
-        self.reprocess_batch(network, batch);
-    }
+        // no batch maxed out its processing attempts, so now the chain's volatile progress must be
+        // reset
+        self.processing_target = self.start_epoch;
-    /// This re-downloads and marks the batch as being re-processed.
-    ///
-    /// If the re-downloaded batch is different to the original and can be processed, the original
-    /// peer will be downvoted.
- fn reprocess_batch( - &mut self, - network: &mut SyncNetworkContext, - mut batch: Batch, - ) { - // marks the batch as attempting to be reprocessed by hashing the downloaded blocks - let attempt = super::batch::Attempt { - peer_id: batch.current_peer.clone(), - hash: batch.hash(), - }; - - // add this attempt to the batch - batch.attempts.push(attempt); - - // remove previously downloaded blocks - batch.downloaded_blocks.clear(); - - // increment the re-process counter - batch.reprocess_retries += 1; - - // attempt to find another peer to download the batch from (this potentially doubles up - // requests on a single peer) - let current_peer = &batch.current_peer; - let new_peer = self - .peer_pool - .iter() - .find(|peer| *peer != current_peer) - .unwrap_or_else(|| current_peer); - - batch.current_peer = new_peer.clone(); - - debug!(self.log, "Re-requesting batch"; - "start_slot" => batch.start_slot(), - "end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks - "batch_epoch" => batch.start_epoch, - "peer" => batch.current_peer.to_string(), - "retries" => batch.retries, - "re-processes" => batch.reprocess_retries); - self.send_batch(network, batch); + for id in redownload_queue { + if let ProcessingResult::RemoveChain = self.retry_batch_download(network, id) { + return ProcessingResult::RemoveChain; + } + } + // finally, re-request the failed batch. + self.retry_batch_download(network, batch_id) } pub fn stop_syncing(&mut self) { self.state = ChainSyncingState::Stopped; } - // Either a new chain, or an old one with a peer list + /// Either a new chain, or an old one with a peer list /// This chain has been requested to start syncing. /// /// This could be new chain, or an old chain that is being resumed. @@ -565,127 +717,181 @@ impl SyncingChain { &mut self, network: &mut SyncNetworkContext, local_finalized_epoch: Epoch, - ) { - // A local finalized slot is provided as other chains may have made - // progress whilst this chain was Stopped or paused. If so, update the `processed_batch_id` to - // accommodate potentially downloaded batches from other chains. Also prune any old batches - // awaiting processing + optimistic_start_epoch: Epoch, + ) -> ProcessingResult { + // to avoid dropping local progress, we advance the chain wrt its batch boundaries. This + let align = |epoch| { + // start_epoch + (number of batches in between)*length_of_batch + self.start_epoch + ((epoch - self.start_epoch) / EPOCHS_PER_BATCH) * EPOCHS_PER_BATCH + }; + // get the *aligned* epoch that produces a batch containing the `local_finalized_epoch` + let validating_epoch = align(local_finalized_epoch); + // align the optimistic_start too. + let optimistic_epoch = align(optimistic_start_epoch); - // If the local finalized epoch is ahead of our current processed chain, update the chain - // to start from this point and re-index all subsequent batches starting from one - // (effectively creating a new chain). - - let local_finalized_slot = local_finalized_epoch.start_slot(T::EthSpec::slots_per_epoch()); - let current_processed_slot = self.current_processed_slot(); - - if local_finalized_slot > current_processed_slot { - // Advance the chain to account for already downloaded blocks. 
- self.start_epoch = local_finalized_epoch; - - debug!(self.log, "Updating chain's progress"; - "prev_completed_slot" => current_processed_slot, - "new_completed_slot" => self.current_processed_slot()); - // Re-index batches - self.to_be_downloaded = local_finalized_epoch; - self.processing_target = local_finalized_epoch; - - // remove any completed or processed batches - self.completed_batches.clear(); - self.processed_batches.clear(); + // advance the chain to the new validating epoch + self.advance_chain(network, validating_epoch); + if self.optimistic_start.is_none() + && optimistic_epoch > self.start_epoch + && !self.failed_optimistic_starts.contains(&optimistic_epoch) + { + self.optimistic_start = Some(optimistic_epoch); } + // update the state self.state = ChainSyncingState::Syncing; - // start processing batches if needed - self.process_completed_batches(); - // begin requesting blocks from the peer pool, until all peers are exhausted. - self.request_batches(network); + if let ProcessingResult::RemoveChain = self.request_batches(network) { + return ProcessingResult::RemoveChain; + } + + // start processing batches if needed + self.process_completed_batches(network) } /// Add a peer to the chain. /// /// If the chain is active, this starts requesting batches from this peer. - pub fn add_peer(&mut self, network: &mut SyncNetworkContext, peer_id: PeerId) { - self.peer_pool.insert(peer_id.clone()); - // do not request blocks if the chain is not syncing + pub fn add_peer( + &mut self, + network: &mut SyncNetworkContext, + peer_id: PeerId, + ) -> ProcessingResult { if let ChainSyncingState::Stopped = self.state { - debug!(self.log, "Peer added to a non-syncing chain"; - "peer_id" => format!("{}", peer_id)); - return; + debug!(self.log, "Peer added to non-syncing chain"; "peer" => %peer_id) + } + // add the peer without overwriting its active requests + if self.peers.entry(peer_id).or_default().is_empty() { + // Either new or not, this peer is idle, try to request more batches + self.request_batches(network) + } else { + ProcessingResult::KeepChain } - - // find the next batch and request it from any peers if we need to - self.request_batches(network); } /// Sends a STATUS message to all peers in the peer pool. pub fn status_peers(&self, network: &mut SyncNetworkContext) { - for peer_id in self.peer_pool.iter() { - network.status_peer(self.chain.clone(), peer_id.clone()); - } + network.status_peers(self.chain.clone(), self.peers.keys().cloned()); } /// An RPC error has occurred. /// - /// Checks if the request_id is associated with this chain. If so, attempts to re-request the - /// batch. If the batch has exceeded the number of retries, returns - /// Some(`ProcessingResult::RemoveChain)`. Returns `None` if the request isn't related to - /// this chain. + /// If the batch exists it is re-requested. pub fn inject_error( &mut self, network: &mut SyncNetworkContext, - peer_id: &PeerId, - request_id: RequestId, - ) -> Option { - if let Some(batch) = self.pending_batches.remove(request_id) { - debug!(self.log, "Batch failed. RPC Error"; - "batch_epoch" => batch.start_epoch, - "retries" => batch.retries, - "peer" => format!("{:?}", peer_id)); - - Some(self.failed_batch(network, batch)) + batch_id: BatchId, + peer_id: PeerId, + ) -> ProcessingResult { + if let Some(batch) = self.batches.get_mut(&batch_id) { + // A batch could be retried without the peer failing the request (disconnecting/ + // sending an error /timeout) if the peer is removed from the chain for other + // reasons. 
Check that this block belongs to the expected peer + if Some(&peer_id) != batch.current_peer() { + return ProcessingResult::KeepChain; + } + debug!(self.log, "Batch failed. RPC Error"; "batch_epoch" => batch_id); + let failed_peer = batch + .current_peer() + .expect("Batch is downloading from a peer"); + self.peers + .get_mut(failed_peer) + .expect("Peer belongs to the chain") + .remove(&batch_id); + if let BatchState::Failed = batch.download_failed() { + return ProcessingResult::RemoveChain; + } + self.retry_batch_download(network, batch_id) } else { - None + // this could be an error for an old batch, removed when the chain advances + ProcessingResult::KeepChain } } - /// A batch has failed. This occurs when a network timeout happens or the peer didn't respond. - /// These events do not indicate a malicious peer, more likely simple networking issues. - /// - /// Attempts to re-request from another peer in the peer pool (if possible) and returns - /// `ProcessingResult::RemoveChain` if the number of retries on the batch exceeds - /// `MAX_BATCH_RETRIES`. - pub fn failed_batch( + /// Sends and registers the request of a batch awaiting download. + pub fn retry_batch_download( &mut self, network: &mut SyncNetworkContext, - mut batch: Batch, + batch_id: BatchId, ) -> ProcessingResult { - batch.retries += 1; + let batch = match self.batches.get_mut(&batch_id) { + Some(batch) => batch, + None => return ProcessingResult::KeepChain, + }; - if batch.retries > MAX_BATCH_RETRIES || self.peer_pool.is_empty() { - // chain is unrecoverable, remove it - ProcessingResult::RemoveChain - } else { - // try to re-process the request using a different peer, if possible - let current_peer = &batch.current_peer; - let new_peer = self - .peer_pool + // Find a peer to request the batch + let failed_peers = batch.failed_peers(); + + let new_peer = { + let mut priorized_peers = self + .peers .iter() - .find(|peer| *peer != current_peer) - .unwrap_or_else(|| current_peer); + .map(|(peer, requests)| (failed_peers.contains(peer), requests.len(), peer)) + .collect::>(); + // Sort peers prioritizing unrelated peers with less active requests. + priorized_peers.sort_unstable(); + priorized_peers.get(0).map(|&(_, _, peer)| peer.clone()) + }; - batch.current_peer = new_peer.clone(); - debug!(self.log, "Re-Requesting batch"; - "start_slot" => batch.start_slot(), - "end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks - "batch_epoch" => batch.start_epoch, - "peer" => batch.current_peer.to_string()); - self.send_batch(network, batch); - ProcessingResult::KeepChain + if let Some(peer) = new_peer { + self.send_batch(network, batch_id, peer) + } else { + // If we are here the chain has no more peers + ProcessingResult::RemoveChain } } + /// Requests the batch asigned to the given id from a given peer. 
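The peer selection in `retry_batch_download` above relies on lexicographic tuple ordering: sorting `(failed_before, active_requests, peer)` ascending puts peers that have not already failed this batch first, and breaks ties by how many requests they are currently serving. A small illustration of that ordering with made-up peer names (illustrative only, not part of the patch):

    fn main() {
        // (has this peer already failed the batch?, active requests, peer id)
        let mut candidates = vec![
            (true, 0usize, "peer_a"),
            (false, 3usize, "peer_b"),
            (false, 1usize, "peer_c"),
        ];
        // `false` sorts before `true`, so peers unrelated to the failure win;
        // fewer active requests break the remaining ties
        candidates.sort_unstable();
        assert_eq!(candidates.first().map(|&(_, _, peer)| peer), Some("peer_c"));
    }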
+ pub fn send_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + peer: PeerId, + ) -> ProcessingResult { + if let Some(batch) = self.batches.get_mut(&batch_id) { + let request = batch.to_blocks_by_range_request(); + // inform the batch about the new request + batch.start_downloading_from_peer(peer.clone()); + match network.blocks_by_range_request(peer.clone(), request, self.id, batch_id) { + Ok(()) => { + if self + .optimistic_start + .map(|epoch| epoch == batch_id) + .unwrap_or(false) + { + debug!(self.log, "Requesting optimistic batch"; "epoch" => batch_id, &batch); + } else { + debug!(self.log, "Requesting batch"; "epoch" => batch_id, &batch); + } + // register the batch for this peer + self.peers + .get_mut(&peer) + .expect("peer belongs to the peer pool") + .insert(batch_id); + return ProcessingResult::KeepChain; + } + Err(e) => { + // NOTE: under normal conditions this shouldn't happen but we handle it anyway + warn!(self.log, "Could not send batch request"; + "batch_id" => batch_id, "error" => e, &batch); + // register the failed download and check if the batch can be retried + self.peers + .get_mut(&peer) + .expect("peer belongs to the peer pool") + .remove(&batch_id); + if let BatchState::Failed = batch.download_failed() { + return ProcessingResult::RemoveChain; + } else { + return self.retry_batch_download(network, batch_id); + } + } + } + } + + ProcessingResult::KeepChain + } + /// Returns true if this chain is currently syncing. pub fn is_syncing(&self) -> bool { match self.state { @@ -696,119 +902,142 @@ impl SyncingChain { /// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer /// pool and left over batches until the batch buffer is reached or all peers are exhausted. - fn request_batches(&mut self, network: &mut SyncNetworkContext) { - if let ChainSyncingState::Syncing = self.state { - while self.send_range_request(network) {} + fn request_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> ProcessingResult { + if !matches!(self.state, ChainSyncingState::Syncing) { + return ProcessingResult::KeepChain; } - } - /// Requests the next required batch from a peer. Returns true, if there was a peer available - /// to send a request and there are batches to request, false otherwise. - fn send_range_request(&mut self, network: &mut SyncNetworkContext) -> bool { // find the next pending batch and request it from the peer - if let Some(peer_id) = self.get_next_peer() { - if let Some(batch) = self.get_next_batch(peer_id) { - debug!(self.log, "Requesting batch"; - "start_slot" => batch.start_slot(), - "end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks - "batch_epoch" => batch.start_epoch, - "peer" => format!("{}", batch.current_peer)); - // send the batch - self.send_batch(network, batch); - return true; - } - } - false - } - /// Returns a peer if there exists a peer which does not currently have a pending request. - /// - /// This is used to create the next request. - fn get_next_peer(&self) -> Option { - // TODO: Optimize this by combining with above two functions. 
// randomize the peers for load balancing let mut rng = rand::thread_rng(); - let mut peers = self.peer_pool.iter().collect::>(); - peers.shuffle(&mut rng); - for peer in peers { - if self.pending_batches.peer_is_idle(peer) { - return Some(peer.clone()); + let mut idle_peers = self + .peers + .iter() + .filter_map(|(peer, requests)| { + if requests.is_empty() { + Some(peer.clone()) + } else { + None + } + }) + .collect::>(); + idle_peers.shuffle(&mut rng); + + // check if we have the batch for our optimistic start. If not, request it first. + // We wait for this batch before requesting any other batches. + if let Some(epoch) = self.optimistic_start { + if !self.batches.contains_key(&epoch) { + if let Some(peer) = idle_peers.pop() { + let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH); + self.batches.insert(epoch, optimistic_batch); + if let ProcessingResult::RemoveChain = self.send_batch(network, epoch, peer) { + return ProcessingResult::RemoveChain; + } + } + } + return ProcessingResult::KeepChain; + } + + while let Some(peer) = idle_peers.pop() { + if let Some(batch_id) = self.include_next_batch() { + // send the batch + if let ProcessingResult::RemoveChain = self.send_batch(network, batch_id, peer) { + return ProcessingResult::RemoveChain; + } + } else { + // No more batches, simply stop + return ProcessingResult::KeepChain; } } - None + + ProcessingResult::KeepChain } - /// Returns the next required batch from the chain if it exists. If there are no more batches - /// required, `None` is returned. - /// - /// Batches are downloaded excluding the first block of the epoch assuming it has already been - /// downloaded. - /// - /// For example: - /// - /// - /// Epoch boundary | | - /// ... | 30 | 31 | 32 | 33 | 34 | ... | 61 | 62 | 63 | 64 | 65 | - /// Batch 1 | Batch 2 | Batch 3 - fn get_next_batch(&mut self, peer_id: PeerId) -> Option> { - let slots_per_epoch = T::EthSpec::slots_per_epoch(); - let blocks_per_batch = slots_per_epoch * EPOCHS_PER_BATCH; - - // only request batches up to the buffer size limit + /// Creates the next required batch from the chain. If there are no more batches required, + /// `false` is returned. + fn include_next_batch(&mut self) -> Option { + // don't request batches beyond the target head slot if self - .completed_batches - .len() - .saturating_add(self.pending_batches.len()) + .to_be_downloaded + .start_slot(T::EthSpec::slots_per_epoch()) + > self.target_head_slot + { + return None; + } + // only request batches up to the buffer size limit + // NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync + // if the current processing window is contained in a long range of skip slots. + let in_buffer = |batch: &BatchInfo| { + matches!( + batch.state(), + BatchState::Downloading(..) | BatchState::AwaitingProcessing(..) 
+ ) + }; + if self + .batches + .iter() + .filter(|&(_epoch, batch)| in_buffer(batch)) + .count() > BATCH_BUFFER_SIZE as usize { return None; } - // don't request batches beyond the target head slot - if self.to_be_downloaded.start_slot(slots_per_epoch) > self.target_head_slot { - return None; - } - - // truncate the batch to the epoch containing the target head of the chain - let batch_end_slot = std::cmp::min( - // request either a batch containing the max number of blocks per batch - self.to_be_downloaded.start_slot(slots_per_epoch) + blocks_per_batch + 1, - // or a batch of one epoch of blocks, which contains the `target_head_slot` - self.target_head_slot - .saturating_add(slots_per_epoch) - .epoch(slots_per_epoch) - .start_slot(slots_per_epoch), - ); - - let batch = Some(Batch::new(self.to_be_downloaded, batch_end_slot, peer_id)); - self.to_be_downloaded += EPOCHS_PER_BATCH; - batch - } - - /// Requests the provided batch from the provided peer. - fn send_batch( - &mut self, - network: &mut SyncNetworkContext, - batch: Batch, - ) { - let request = batch.to_blocks_by_range_request(); - - match network.blocks_by_range_request(batch.current_peer.clone(), request) { - Ok(request_id) => { - // add the batch to pending list - self.pending_batches.insert(request_id, batch); + let batch_id = self.to_be_downloaded; + // this batch could have been included already being an optimistic batch + match self.batches.entry(batch_id) { + Entry::Occupied(_) => { + // this batch doesn't need downlading, let this same function decide the next batch + self.to_be_downloaded += EPOCHS_PER_BATCH; + self.include_next_batch() } - Err(e) => { - warn!(self.log, "Batch request failed"; - "start_slot" => batch.start_slot(), - "end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks - "start_epoch" => batch.start_epoch, - "peer" => batch.current_peer.to_string(), - "retries" => batch.retries, - "error" => e, - "re-processes" => batch.reprocess_retries); - self.failed_batch(network, batch); + Entry::Vacant(entry) => { + entry.insert(BatchInfo::new(&batch_id, EPOCHS_PER_BATCH)); + self.to_be_downloaded += EPOCHS_PER_BATCH; + Some(batch_id) } } } } + +impl slog::KV for &mut SyncingChain { + fn serialize( + &self, + record: &slog::Record, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + slog::KV::serialize(*self, record, serializer) + } +} + +impl slog::KV for SyncingChain { + fn serialize( + &self, + record: &slog::Record, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + use slog::Value; + serializer.emit_u64("id", self.id)?; + Value::serialize(&self.start_epoch, record, "from", serializer)?; + Value::serialize( + &self.target_head_slot.epoch(T::EthSpec::slots_per_epoch()), + record, + "to", + serializer, + )?; + serializer.emit_str("end_root", &self.target_head_root.to_string())?; + Value::serialize( + &self.processing_target, + record, + "current_target", + serializer, + )?; + serializer.emit_usize("batches", self.batches.len())?; + serializer.emit_usize("peers", self.peers.len())?; + slog::Result::Ok(()) + } +} diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 71e060670..fdfa4b8eb 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -1,15 +1,18 @@ //! This provides the logic for the finalized and head chains. //! -//! Each chain type is stored in it's own vector. 
A variety of helper functions are given along
-//! with this struct to to simplify the logic of the other layers of sync.
+//! Each chain type is stored in its own map. A variety of helper functions are given along with
+//! this struct to simplify the logic of the other layers of sync.
-use super::chain::{ChainSyncingState, SyncingChain};
+use super::chain::{ChainId, ChainSyncingState, ProcessingResult, SyncingChain};
+use super::sync_type::RangeSyncType;
 use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
 use crate::sync::network_context::SyncNetworkContext;
 use crate::sync::PeerSyncInfo;
 use beacon_chain::{BeaconChain, BeaconChainTypes};
 use eth2_libp2p::{types::SyncState, NetworkGlobals, PeerId};
-use slog::{debug, error, info, o};
+use fnv::FnvHashMap;
+use slog::{crit, debug, error, info, trace};
+use std::collections::hash_map::Entry;
 use std::sync::Arc;
 use tokio::sync::mpsc;
 use types::EthSpec;
@@ -83,9 +86,9 @@ pub struct ChainCollection {
     /// A reference to the global network parameters.
     network_globals: Arc>,
     /// The set of finalized chains being synced.
-    finalized_chains: Vec>,
+    finalized_chains: FnvHashMap>,
     /// The set of head chains being synced.
-    head_chains: Vec>,
+    head_chains: FnvHashMap>,
     /// The current sync state of the process.
     state: RangeSyncState,
     /// Logger for the collection.
@@ -101,8 +104,8 @@ impl ChainCollection {
         ChainCollection {
             beacon_chain,
             network_globals,
-            finalized_chains: Vec::new(),
-            head_chains: Vec::new(),
+            finalized_chains: FnvHashMap::default(),
+            head_chains: FnvHashMap::default(),
             state: RangeSyncState::Idle,
             log,
         }
@@ -129,7 +132,7 @@ impl ChainCollection {
             .unwrap_or_else(|| SyncState::Stalled);
         let mut peer_state = self.network_globals.sync_state.write();
         if new_state != *peer_state {
-            info!(self.log, "Sync state updated"; "old_state" => format!("{}",peer_state), "new_state" => format!("{}",new_state));
+            info!(self.log, "Sync state updated"; "old_state" => %peer_state, "new_state" => %new_state);
             if new_state == SyncState::Synced {
                 network.subscribe_core_topics();
             }
@@ -141,7 +144,7 @@ impl ChainCollection {
         let new_state: SyncState = self.state.clone().into();
         if *node_sync_state != new_state {
             // we are updating the state, inform the user
-            info!(self.log, "Sync state updated"; "old_state" => format!("{}",node_sync_state), "new_state" => format!("{}",new_state));
+            info!(self.log, "Sync state updated"; "old_state" => %node_sync_state, "new_state" => %new_state);
         }
         *node_sync_state = new_state;
     }
@@ -182,30 +185,67 @@ impl ChainCollection {
         }
     }
-    /// Finds any finalized chain if it exists.
-    pub fn get_finalized_mut(
-        &mut self,
-        target_head_root: Hash256,
-        target_head_slot: Slot,
-    ) -> Option<&mut SyncingChain> {
-        ChainCollection::get_chain(
-            self.finalized_chains.as_mut(),
-            target_head_root,
-            target_head_slot,
-        )
+    /// Calls `func` on every chain of the collection. If the result is
+    /// `ProcessingResult::RemoveChain`, the chain is removed and returned.
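`call_all`, defined next, follows the usual two-phase pattern for conditionally removing entries while visiting a map: record which keys must go during iteration, then remove them afterwards, since entries cannot be dropped while `iter_mut` still borrows the map. A reduced sketch of the same pattern over a plain `HashMap`, with simplified stand-in value types (illustrative only):

    use std::collections::HashMap;

    fn main() {
        // chain id -> number of available peers, standing in for the real chain maps
        let mut chains: HashMap<u64, usize> = HashMap::new();
        chains.insert(1, 0);
        chains.insert(2, 3);

        // phase 1: visit every chain and note the ones that report "remove me"
        let to_remove: Vec<u64> = chains
            .iter()
            .filter(|(_, peers)| **peers == 0)
            .map(|(id, _)| *id)
            .collect();

        // phase 2: take them out of the collection, keeping the removed values around
        let removed: Vec<usize> = to_remove
            .iter()
            .filter_map(|id| chains.remove(id))
            .collect();

        assert_eq!(removed, vec![0]);
        assert!(!chains.contains_key(&1) && chains.contains_key(&2));
    }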
+ pub fn call_all(&mut self, mut func: F) -> Vec<(SyncingChain, RangeSyncType)> + where + F: FnMut(&mut SyncingChain) -> ProcessingResult, + { + let mut to_remove = Vec::new(); + + for (id, chain) in self.finalized_chains.iter_mut() { + if let ProcessingResult::RemoveChain = func(chain) { + to_remove.push((*id, RangeSyncType::Finalized)); + } + } + + for (id, chain) in self.head_chains.iter_mut() { + if let ProcessingResult::RemoveChain = func(chain) { + to_remove.push((*id, RangeSyncType::Head)); + } + } + + let mut results = Vec::with_capacity(to_remove.len()); + for (id, sync_type) in to_remove.into_iter() { + let chain = match sync_type { + RangeSyncType::Finalized => self.finalized_chains.remove(&id), + RangeSyncType::Head => self.head_chains.remove(&id), + }; + results.push((chain.expect("Chain exits"), sync_type)); + } + results } - /// Finds any finalized chain if it exists. - pub fn get_head_mut( + /// Executes a function on the chain with the given id. + /// + /// If the function returns `ProcessingResult::RemoveChain`, the chain is removed and returned. + /// If the chain is found, its syncing type is returned, or an error otherwise. + pub fn call_by_id( &mut self, - target_head_root: Hash256, - target_head_slot: Slot, - ) -> Option<&mut SyncingChain> { - ChainCollection::get_chain( - self.head_chains.as_mut(), - target_head_root, - target_head_slot, - ) + id: ChainId, + func: F, + ) -> Result<(Option>, RangeSyncType), ()> + where + F: FnOnce(&mut SyncingChain) -> ProcessingResult, + { + if let Entry::Occupied(mut entry) = self.finalized_chains.entry(id) { + // Search in our finalized chains first + if let ProcessingResult::RemoveChain = func(entry.get_mut()) { + Ok((Some(entry.remove()), RangeSyncType::Finalized)) + } else { + Ok((None, RangeSyncType::Finalized)) + } + } else if let Entry::Occupied(mut entry) = self.head_chains.entry(id) { + // Search in our head chains next + if let ProcessingResult::RemoveChain = func(entry.get_mut()) { + Ok((Some(entry.remove()), RangeSyncType::Head)) + } else { + Ok((None, RangeSyncType::Head)) + } + } else { + // Chain was not found in the finalized collection, nor the head collection + Err(()) + } } /// Updates the state of the chain collection. @@ -214,9 +254,8 @@ impl ChainCollection { /// updates the state of the collection. This starts head chains syncing if any are required to /// do so. pub fn update(&mut self, network: &mut SyncNetworkContext) { - let local_epoch = { - let local = match PeerSyncInfo::from_chain(&self.beacon_chain) { - Some(local) => local, + let (local_finalized_epoch, local_head_epoch) = + match PeerSyncInfo::from_chain(&self.beacon_chain) { None => { return error!( self.log, @@ -224,20 +263,21 @@ impl ChainCollection { "msg" => "likely due to head lock contention" ) } + Some(local) => ( + local.finalized_epoch, + local.head_slot.epoch(T::EthSpec::slots_per_epoch()), + ), }; - local.finalized_epoch - }; - // Remove any outdated finalized/head chains self.purge_outdated_chains(network); // Choose the best finalized chain if one needs to be selected. - self.update_finalized_chains(network, local_epoch); + self.update_finalized_chains(network, local_finalized_epoch, local_head_epoch); - if self.finalized_syncing_index().is_none() { + if self.finalized_syncing_chain().is_none() { // Handle head syncing chains if there are no finalized chains left. 
- self.update_head_chains(network, local_epoch); + self.update_head_chains(network, local_finalized_epoch, local_head_epoch); } } @@ -247,53 +287,57 @@ impl ChainCollection { &mut self, network: &mut SyncNetworkContext, local_epoch: Epoch, + local_head_epoch: Epoch, ) { - // Check if any chains become the new syncing chain - if let Some(index) = self.finalized_syncing_index() { - // There is a current finalized chain syncing - let _syncing_chain_peer_count = self.finalized_chains[index].peer_pool.len(); - - // search for a chain with more peers - if let Some((new_index, chain)) = - self.finalized_chains - .iter_mut() - .enumerate() - .find(|(_iter_index, _chain)| { - false - // && *iter_index != index - // && chain.peer_pool.len() > syncing_chain_peer_count - }) - { - // A chain has more peers. Swap the syncing chain - debug!(self.log, "Switching finalized chains to sync"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_epoch"=> local_epoch); - - // update the state to a new finalized state - let state = RangeSyncState::Finalized { - start_slot: chain.start_epoch.start_slot(T::EthSpec::slots_per_epoch()), - head_slot: chain.target_head_slot, - head_root: chain.target_head_root, - }; - self.state = state; - - // Stop the current chain from syncing - self.finalized_chains[index].stop_syncing(); - // Start the new chain - self.finalized_chains[new_index].start_syncing(network, local_epoch); - } - } else if let Some(chain) = self + // Find the chain with most peers and check if it is already syncing + if let Some((new_id, peers)) = self .finalized_chains - .iter_mut() - .max_by_key(|chain| chain.peer_pool.len()) + .iter() + .max_by_key(|(_, chain)| chain.available_peers()) + .map(|(id, chain)| (*id, chain.available_peers())) { - // There is no currently syncing finalization chain, starting the one with the most peers - debug!(self.log, "New finalized chain started syncing"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_epoch"=> chain.start_epoch); - chain.start_syncing(network, local_epoch); + let old_id = self.finalized_syncing_chain().map( + |(currently_syncing_id, currently_syncing_chain)| { + if *currently_syncing_id != new_id + && peers > currently_syncing_chain.available_peers() + { + currently_syncing_chain.stop_syncing(); + // we stop this chain and start syncing the one with more peers + Some(*currently_syncing_id) + } else { + // the best chain is already the syncing chain, advance it if possible + None + } + }, + ); + + let chain = self + .finalized_chains + .get_mut(&new_id) + .expect("Chain exists"); + + match old_id { + Some(Some(old_id)) => debug!(self.log, "Switching finalized chains"; + "old_id" => old_id, &chain), + None => debug!(self.log, "Syncing new chain"; &chain), + Some(None) => trace!(self.log, "Advancing currently syncing chain"), + // this is the same chain. We try to advance it. 
+ } + // update the state to a new finalized state let state = RangeSyncState::Finalized { start_slot: chain.start_epoch.start_slot(T::EthSpec::slots_per_epoch()), head_slot: chain.target_head_slot, head_root: chain.target_head_root, }; self.state = state; + + if let ProcessingResult::RemoveChain = + chain.start_syncing(network, local_epoch, local_head_epoch) + { + // this happens only if sending a batch over the `network` fails a lot + error!(self.log, "Chain removed while switching chains"); + self.finalized_chains.remove(&new_id); + } } } @@ -302,6 +346,7 @@ impl ChainCollection { &mut self, network: &mut SyncNetworkContext, local_epoch: Epoch, + local_head_epoch: Epoch, ) { // There are no finalized chains, update the state. if self.head_chains.is_empty() { @@ -311,42 +356,41 @@ impl ChainCollection { let mut currently_syncing = self .head_chains - .iter() + .values() .filter(|chain| chain.is_syncing()) .count(); let mut not_syncing = self.head_chains.len() - currently_syncing; - // Find all head chains that are not currently syncing ordered by peer count. while currently_syncing <= PARALLEL_HEAD_CHAINS && not_syncing > 0 { // Find the chain with the most peers and start syncing - if let Some((_index, chain)) = self + if let Some((_id, chain)) = self .head_chains .iter_mut() - .filter(|chain| !chain.is_syncing()) - .enumerate() - .max_by_key(|(_index, chain)| chain.peer_pool.len()) + .filter(|(_id, chain)| !chain.is_syncing()) + .max_by_key(|(_id, chain)| chain.available_peers()) { // start syncing this chain - debug!(self.log, "New head chain started syncing"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_epoch"=> chain.start_epoch); - chain.start_syncing(network, local_epoch); + debug!(self.log, "New head chain started syncing"; &chain); + if let ProcessingResult::RemoveChain = + chain.start_syncing(network, local_epoch, local_head_epoch) + { + error!(self.log, "Chain removed while switching head chains") + } } - // update variables currently_syncing = self .head_chains .iter() - .filter(|chain| chain.is_syncing()) + .filter(|(_id, chain)| chain.is_syncing()) .count(); not_syncing = self.head_chains.len() - currently_syncing; } - // Start // for the syncing API, we find the minimal start_slot and the maximum // target_slot of all head chains to report back. - let (min_epoch, max_slot) = self .head_chains - .iter() + .values() .filter(|chain| chain.is_syncing()) .fold( (Epoch::from(0u64), Slot::from(0u64)), @@ -368,10 +412,9 @@ impl ChainCollection { /// chains and re-status their peers. pub fn clear_head_chains(&mut self, network: &mut SyncNetworkContext) { let log_ref = &self.log; - self.head_chains.retain(|chain| { - if !chain.is_syncing() - { - debug!(log_ref, "Removing old head chain"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot); + self.head_chains.retain(|_id, chain| { + if !chain.is_syncing() { + debug!(log_ref, "Removing old head chain"; &chain); chain.status_peers(network); false } else { @@ -380,140 +423,20 @@ impl ChainCollection { }); } - /// Add a new finalized chain to the collection. 
- pub fn new_finalized_chain( - &mut self, - local_finalized_epoch: Epoch, - target_head: Hash256, - target_slot: Slot, - peer_id: PeerId, - beacon_processor_send: mpsc::Sender>, - ) { - let chain_id = rand::random(); - self.finalized_chains.push(SyncingChain::new( - chain_id, - local_finalized_epoch, - target_slot, - target_head, - peer_id, - beacon_processor_send, - self.beacon_chain.clone(), - self.log.new(o!("chain" => chain_id)), - )); - } - - /// Add a new finalized chain to the collection and starts syncing it. - #[allow(clippy::too_many_arguments)] - pub fn new_head_chain( - &mut self, - remote_finalized_epoch: Epoch, - target_head: Hash256, - target_slot: Slot, - peer_id: PeerId, - beacon_processor_send: mpsc::Sender>, - ) { - // remove the peer from any other head chains - - self.head_chains.iter_mut().for_each(|chain| { - chain.peer_pool.remove(&peer_id); - }); - self.head_chains.retain(|chain| !chain.peer_pool.is_empty()); - - let chain_id = rand::random(); - let new_head_chain = SyncingChain::new( - chain_id, - remote_finalized_epoch, - target_slot, - target_head, - peer_id, - beacon_processor_send, - self.beacon_chain.clone(), - self.log.clone(), - ); - self.head_chains.push(new_head_chain); - } - /// Returns if `true` if any finalized chains exist, `false` otherwise. pub fn is_finalizing_sync(&self) -> bool { !self.finalized_chains.is_empty() } - /// Given a chain iterator, runs a given function on each chain until the function returns - /// `Some`. This allows the `RangeSync` struct to loop over chains and optionally remove the - /// chain from the collection if the function results in completing the chain. - fn request_function<'a, F, I, U>(chain: I, mut func: F) -> Option<(usize, U)> - where - I: Iterator>, - F: FnMut(&'a mut SyncingChain) -> Option, - { - chain - .enumerate() - .find_map(|(index, chain)| Some((index, func(chain)?))) - } - - /// Given a chain iterator, runs a given function on each chain and return all `Some` results. - fn request_function_all<'a, F, I, U>(chain: I, mut func: F) -> Vec<(usize, U)> - where - I: Iterator>, - F: FnMut(&'a mut SyncingChain) -> Option, - { - chain - .enumerate() - .filter_map(|(index, chain)| Some((index, func(chain)?))) - .collect() - } - - /// Runs a function on finalized chains until we get the first `Some` result from `F`. - pub fn finalized_request(&mut self, func: F) -> Option<(usize, U)> - where - F: FnMut(&mut SyncingChain) -> Option, - { - ChainCollection::request_function(self.finalized_chains.iter_mut(), func) - } - - /// Runs a function on head chains until we get the first `Some` result from `F`. - pub fn head_request(&mut self, func: F) -> Option<(usize, U)> - where - F: FnMut(&mut SyncingChain) -> Option, - { - ChainCollection::request_function(self.head_chains.iter_mut(), func) - } - - /// Runs a function on finalized and head chains until we get the first `Some` result from `F`. - pub fn head_finalized_request(&mut self, func: F) -> Option<(usize, U)> - where - F: FnMut(&mut SyncingChain) -> Option, - { - ChainCollection::request_function( - self.finalized_chains - .iter_mut() - .chain(self.head_chains.iter_mut()), - func, - ) - } - - /// Runs a function on all finalized and head chains and collects all `Some` results from `F`. 
- pub fn head_finalized_request_all(&mut self, func: F) -> Vec<(usize, U)> - where - F: FnMut(&mut SyncingChain) -> Option, - { - ChainCollection::request_function_all( - self.finalized_chains - .iter_mut() - .chain(self.head_chains.iter_mut()), - func, - ) - } - /// Removes any outdated finalized or head chains. - /// /// This removes chains with no peers, or chains whose start block slot is less than our current /// finalized block slot. pub fn purge_outdated_chains(&mut self, network: &mut SyncNetworkContext) { // Remove any chains that have no peers self.finalized_chains - .retain(|chain| !chain.peer_pool.is_empty()); - self.head_chains.retain(|chain| !chain.peer_pool.is_empty()); + .retain(|_id, chain| chain.available_peers() > 0); + self.head_chains + .retain(|_id, chain| chain.available_peers() > 0); let local_info = match PeerSyncInfo::from_chain(&self.beacon_chain) { Some(local) => local, @@ -533,28 +456,28 @@ impl ChainCollection { let beacon_chain = &self.beacon_chain; let log_ref = &self.log; // Remove chains that are out-dated and re-status their peers - self.finalized_chains.retain(|chain| { + self.finalized_chains.retain(|_id, chain| { if chain.target_head_slot <= local_finalized_slot || beacon_chain .fork_choice .read() .contains_block(&chain.target_head_root) { - debug!(log_ref, "Purging out of finalized chain"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot); + debug!(log_ref, "Purging out of finalized chain"; &chain); chain.status_peers(network); false } else { true } }); - self.head_chains.retain(|chain| { + self.head_chains.retain(|_id, chain| { if chain.target_head_slot <= local_finalized_slot || beacon_chain .fork_choice .read() .contains_block(&chain.target_head_root) { - debug!(log_ref, "Purging out of date head chain"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot); + debug!(log_ref, "Purging out of date head chain"; &chain); chain.status_peers(network); false } else { @@ -563,63 +486,71 @@ impl ChainCollection { }); } - /// Removes and returns a finalized chain from the collection. - pub fn remove_finalized_chain(&mut self, index: usize) -> SyncingChain { - self.finalized_chains.swap_remove(index) - } - - /// Removes and returns a head chain from the collection. - pub fn remove_head_chain(&mut self, index: usize) -> SyncingChain { - self.head_chains.swap_remove(index) - } - - /// Removes a chain from either finalized or head chains based on the index. Using a request - /// iterates of finalized chains before head chains. Thus an index that is greater than the - /// finalized chain length, indicates a head chain. - /// - /// This will re-status the chains peers on removal. The index must exist. - pub fn remove_chain(&mut self, network: &mut SyncNetworkContext, index: usize) { - let chain = if index >= self.finalized_chains.len() { - let index = index - self.finalized_chains.len(); - let chain = self.head_chains.swap_remove(index); - chain.status_peers(network); - chain + /// Adds a peer to a chain with the given target, or creates a new syncing chain if it doesn't + /// exits. 
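`add_peer_or_create_chain`, which follows, hinges on the `HashMap` entry API: an occupied entry means the targeted chain is already tracked and simply gains a peer, while a vacant entry triggers creation of a brand new chain. A stripped-down sketch of that branching, with a vector of peer names standing in for the real `SyncingChain` (purely illustrative):

    use std::collections::hash_map::Entry;
    use std::collections::HashMap;

    fn main() {
        // chain id -> peers, a stand-in for the finalized/head chain maps
        let mut chains: HashMap<u64, Vec<&str>> = HashMap::new();
        let (id, peer) = (7u64, "peer_a");

        match chains.entry(id) {
            Entry::Occupied(mut entry) => {
                // known chain: just register the extra peer with it
                entry.get_mut().push(peer);
            }
            Entry::Vacant(entry) => {
                // unknown chain: create it with this peer as its first member
                entry.insert(vec![peer]);
            }
        }

        assert_eq!(chains.get(&7).map(|peers| peers.len()), Some(1));
    }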
+ #[allow(clippy::too_many_arguments)] + pub fn add_peer_or_create_chain( + &mut self, + start_epoch: Epoch, + target_head_root: Hash256, + target_head_slot: Slot, + peer: PeerId, + sync_type: RangeSyncType, + beacon_processor_send: &mpsc::Sender>, + network: &mut SyncNetworkContext, + ) { + let id = SyncingChain::::id(&target_head_root, &target_head_slot); + let collection = if let RangeSyncType::Finalized = sync_type { + if let Some(chain) = self.head_chains.get(&id) { + // sanity verification for chain duplication / purging issues + crit!(self.log, "Adding known head chain as finalized chain"; chain); + } + &mut self.finalized_chains } else { - let chain = self.finalized_chains.swap_remove(index); - chain.status_peers(network); - chain + if let Some(chain) = self.finalized_chains.get(&id) { + // sanity verification for chain duplication / purging issues + crit!(self.log, "Adding known finalized chain as head chain"; chain); + } + &mut self.head_chains }; - - debug!(self.log, "Chain was removed"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot); - - // update the state - self.update(network); + match collection.entry(id) { + Entry::Occupied(mut entry) => { + let chain = entry.get_mut(); + debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, &chain); + assert_eq!(chain.target_head_root, target_head_root); + assert_eq!(chain.target_head_slot, target_head_slot); + if let ProcessingResult::RemoveChain = chain.add_peer(network, peer) { + debug!(self.log, "Chain removed after adding peer"; "chain" => id); + entry.remove(); + } + } + Entry::Vacant(entry) => { + let peer_rpr = peer.to_string(); + let new_chain = SyncingChain::new( + start_epoch, + target_head_slot, + target_head_root, + peer, + beacon_processor_send.clone(), + self.beacon_chain.clone(), + &self.log, + ); + assert_eq!(new_chain.get_id(), id); + debug!(self.log, "New chain added to sync"; "peer_id" => peer_rpr, "sync_type" => ?sync_type, &new_chain); + entry.insert(new_chain); + } + } } /// Returns the index of finalized chain that is currently syncing. Returns `None` if no /// finalized chain is currently syncing. - fn finalized_syncing_index(&self) -> Option { - self.finalized_chains - .iter() - .enumerate() - .find_map(|(index, chain)| { - if chain.state == ChainSyncingState::Syncing { - Some(index) - } else { - None - } - }) - } - - /// Returns a chain given the target head root and slot. 
- fn get_chain<'a>( - chain: &'a mut [SyncingChain], - target_head_root: Hash256, - target_head_slot: Slot, - ) -> Option<&'a mut SyncingChain> { - chain.iter_mut().find(|iter_chain| { - iter_chain.target_head_root == target_head_root - && iter_chain.target_head_slot == target_head_slot + fn finalized_syncing_chain(&mut self) -> Option<(&ChainId, &mut SyncingChain)> { + self.finalized_chains.iter_mut().find_map(|(id, chain)| { + if chain.state == ChainSyncingState::Syncing { + Some((id, chain)) + } else { + None + } }) } } diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 85db6d378..07939569a 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -7,6 +7,6 @@ mod chain_collection; mod range; mod sync_type; -pub use batch::Batch; -pub use chain::{ChainId, EPOCHS_PER_BATCH}; +pub use batch::BatchInfo; +pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; pub use range::RangeSync; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 66339be06..6847838e0 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -39,7 +39,7 @@ //! Each chain is downloaded in batches of blocks. The batched blocks are processed sequentially //! and further batches are requested as current blocks are being processed. -use super::chain::{ChainId, ProcessingResult}; +use super::chain::ChainId; use super::chain_collection::{ChainCollection, RangeSyncState}; use super::sync_type::RangeSyncType; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; @@ -49,7 +49,7 @@ use crate::sync::PeerSyncInfo; use crate::sync::RequestId; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{NetworkGlobals, PeerId}; -use slog::{debug, error, trace}; +use slog::{debug, error, trace, warn}; use std::collections::HashSet; use std::sync::Arc; use tokio::sync::mpsc; @@ -121,21 +121,15 @@ impl RangeSync { let local_info = match PeerSyncInfo::from_chain(&self.beacon_chain) { Some(local) => local, None => { - return error!( - self.log, - "Failed to get peer sync info"; - "msg" => "likely due to head lock contention" - ) + return error!(self.log, "Failed to get peer sync info"; + "msg" => "likely due to head lock contention") } }; - // convenience variables + // convenience variable let remote_finalized_slot = remote_info .finalized_epoch .start_slot(T::EthSpec::slots_per_epoch()); - let local_finalized_slot = local_info - .finalized_epoch - .start_slot(T::EthSpec::slots_per_epoch()); // NOTE: A peer that has been re-status'd may now exist in multiple finalized chains. @@ -146,7 +140,7 @@ impl RangeSync { match RangeSyncType::new(&self.beacon_chain, &local_info, &remote_info) { RangeSyncType::Finalized => { // Finalized chain search - debug!(self.log, "Finalization sync peer joined"; "peer_id" => format!("{:?}", peer_id)); + debug!(self.log, "Finalization sync peer joined"; "peer_id" => %peer_id); // remove the peer from the awaiting_head_peers list if it exists self.awaiting_head_peers.remove(&peer_id); @@ -154,37 +148,19 @@ impl RangeSync { // Note: We keep current head chains. These can continue syncing whilst we complete // this new finalized chain. - // If a finalized chain already exists that matches, add this peer to the chain's peer - // pool. 
- if let Some(chain) = self - .chains - .get_finalized_mut(remote_info.finalized_root, remote_finalized_slot) - { - debug!(self.log, "Finalized chain exists, adding peer"; "peer_id" => peer_id.to_string(), "target_root" => chain.target_head_root.to_string(), "targe_slot" => chain.target_head_slot); + self.chains.add_peer_or_create_chain( + local_info.finalized_epoch, + remote_info.finalized_root, + remote_finalized_slot, + peer_id, + RangeSyncType::Finalized, + &self.beacon_processor_send, + network, + ); - // add the peer to the chain's peer pool - chain.add_peer(network, peer_id); - - // check if the new peer's addition will favour a new syncing chain. - self.chains.update(network); - // update the global sync state if necessary - self.chains.update_sync_state(network); - } else { - // there is no finalized chain that matches this peer's last finalized target - // create a new finalized chain - debug!(self.log, "New finalized chain added to sync"; "peer_id" => format!("{:?}", peer_id), "start_slot" => local_finalized_slot, "end_slot" => remote_finalized_slot, "finalized_root" => format!("{}", remote_info.finalized_root)); - - self.chains.new_finalized_chain( - local_info.finalized_epoch, - remote_info.finalized_root, - remote_finalized_slot, - peer_id, - self.beacon_processor_send.clone(), - ); - self.chains.update(network); - // update the global sync state - self.chains.update_sync_state(network); - } + self.chains.update(network); + // update the global sync state + self.chains.update_sync_state(network); } RangeSyncType::Head => { // This peer requires a head chain sync @@ -192,7 +168,7 @@ impl RangeSync { if self.chains.is_finalizing_sync() { // If there are finalized chains to sync, finish these first, before syncing head // chains. This allows us to re-sync all known peers - trace!(self.log, "Waiting for finalized sync to complete"; "peer_id" => format!("{:?}", peer_id)); + trace!(self.log, "Waiting for finalized sync to complete"; "peer_id" => %peer_id); // store the peer to re-status after all finalized chains complete self.awaiting_head_peers.insert(peer_id); return; @@ -203,31 +179,18 @@ impl RangeSync { // The new peer has the same finalized (earlier filters should prevent a peer with an // earlier finalized chain from reaching here). 
- debug!(self.log, "New peer added for recent head sync"; "peer_id" => format!("{:?}", peer_id)); - // search if there is a matching head chain, then add the peer to the chain - if let Some(chain) = self - .chains - .get_head_mut(remote_info.head_root, remote_info.head_slot) - { - debug!(self.log, "Adding peer to the existing head chain peer pool"; "head_root" => format!("{}",remote_info.head_root), "head_slot" => remote_info.head_slot, "peer_id" => format!("{:?}", peer_id)); - - // add the peer to the head's pool - chain.add_peer(network, peer_id); - } else { - // There are no other head chains that match this peer's status, create a new one, and - let start_epoch = std::cmp::min(local_info.head_slot, remote_finalized_slot) - .epoch(T::EthSpec::slots_per_epoch()); - debug!(self.log, "Creating a new syncing head chain"; "head_root" => format!("{}",remote_info.head_root), "start_epoch" => start_epoch, "head_slot" => remote_info.head_slot, "peer_id" => format!("{:?}", peer_id)); - - self.chains.new_head_chain( - start_epoch, - remote_info.head_root, - remote_info.head_slot, - peer_id, - self.beacon_processor_send.clone(), - ); - } + let start_epoch = std::cmp::min(local_info.head_slot, remote_finalized_slot) + .epoch(T::EthSpec::slots_per_epoch()); + self.chains.add_peer_or_create_chain( + start_epoch, + remote_info.head_root, + remote_info.head_slot, + peer_id, + RangeSyncType::Head, + &self.beacon_processor_send, + network, + ); self.chains.update(network); self.chains.update_sync_state(network); } @@ -245,23 +208,27 @@ impl RangeSync { request_id: RequestId, beacon_block: Option>, ) { - // Find the request. Most likely the first finalized chain (the syncing chain). If there - // are no finalized chains, then it will be a head chain. At most, there should only be - // `connected_peers` number of head chains, which should be relatively small and this - // lookup should not be very expensive. However, we could add an extra index that maps the - // request id to index of the vector to avoid O(N) searches and O(N) hash lookups. - - let id_not_found = self - .chains - .head_finalized_request(|chain| { - chain.on_block_response(network, request_id, &beacon_block) - }) - .is_none(); - if id_not_found { - // The request didn't exist in any `SyncingChain`. Could have been an old request or - // the chain was purged due to being out of date whilst a request was pending. Log - // and ignore. - debug!(self.log, "Range response without matching request"; "peer" => format!("{:?}", peer_id), "request_id" => request_id); + // get the chain and batch for which this response belongs + if let Some((chain_id, batch_id)) = + network.blocks_by_range_response(request_id, beacon_block.is_none()) + { + // check if this chunk removes the chain + match self.chains.call_by_id(chain_id, |chain| { + chain.on_block_response(network, batch_id, peer_id, beacon_block) + }) { + Ok((removed_chain, sync_type)) => { + if let Some(removed_chain) = removed_chain { + debug!(self.log, "Chain removed after block response"; "sync_type" => ?sync_type, "chain_id" => chain_id); + removed_chain.status_peers(network); + // TODO: update & update_sync_state? 
+                    }
+                }
+                Err(_) => {
+                    debug!(self.log, "BlocksByRange response for removed chain"; "chain" => chain_id)
+                }
+            }
+        } else {
+            warn!(self.log, "Response/Error for non registered request"; "request_id" => request_id)
         }
     }
 
@@ -269,76 +236,57 @@ impl RangeSync {
         &mut self,
         network: &mut SyncNetworkContext,
         chain_id: ChainId,
-        epoch: Epoch,
-        downloaded_blocks: Vec>,
+        batch_id: Epoch,
         result: BatchProcessResult,
     ) {
-        // build an option for passing the downloaded_blocks to each chain
-        let mut downloaded_blocks = Some(downloaded_blocks);
-
-        match self.chains.finalized_request(|chain| {
-            chain.on_batch_process_result(network, chain_id, epoch, &mut downloaded_blocks, &result)
+        // check if this response removes the chain
+        match self.chains.call_by_id(chain_id, |chain| {
+            chain.on_batch_process_result(network, batch_id, &result)
         }) {
-            Some((index, ProcessingResult::RemoveChain)) => {
-                let chain = self.chains.remove_finalized_chain(index);
-                debug!(self.log, "Finalized chain removed"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot);
-                // update the state of the collection
-                self.chains.update(network);
-
-                // the chain is complete, re-status it's peers
-                chain.status_peers(network);
-
-                // set the state to a head sync if there are no finalized chains, to inform the manager that we are awaiting a
-                // head chain.
-                self.chains.set_head_sync();
-                // Update the global variables
-                self.chains.update_sync_state(network);
-
-                // if there are no more finalized chains, re-status all known peers awaiting a head
-                // sync
-                match self.chains.state() {
-                    RangeSyncState::Idle | RangeSyncState::Head { .. } => {
-                        for peer_id in self.awaiting_head_peers.drain() {
-                            network.status_peer(self.beacon_chain.clone(), peer_id);
+            Ok((None, _sync_type)) => {
+                // Chain was found and not removed
+            }
+            Ok((Some(removed_chain), sync_type)) => {
+                debug!(self.log, "Chain removed after processing result"; "chain" => chain_id, "sync_type" => ?sync_type);
+                // Chain ended, re-status its peers
+                removed_chain.status_peers(network);
+                match sync_type {
+                    RangeSyncType::Finalized => {
+                        // update the state of the collection
+                        self.chains.update(network);
+                        // set the state to a head sync if there are no finalized chains, to inform
+                        // the manager that we are awaiting a head chain.
+                        self.chains.set_head_sync();
+                        // Update the global variables
+                        self.chains.update_sync_state(network);
+                        // if there are no more finalized chains, re-status all known peers
+                        // awaiting a head sync
+                        match self.chains.state() {
+                            RangeSyncState::Idle | RangeSyncState::Head { .. } => {
+                                network.status_peers(
+                                    self.beacon_chain.clone(),
+                                    self.awaiting_head_peers.drain(),
+                                );
+                            }
+                            RangeSyncState::Finalized { .. } => {} // Have more finalized chains to complete
                         }
                     }
-                    RangeSyncState::Finalized { .. } => {} // Have more finalized chains to complete
-                }
-            }
-            Some((_, ProcessingResult::KeepChain)) => {}
-            None => {
-                match self.chains.head_request(|chain| {
-                    chain.on_batch_process_result(
-                        network,
-                        chain_id,
-                        epoch,
-                        &mut downloaded_blocks,
-                        &result,
-                    )
-                }) {
-                    Some((index, ProcessingResult::RemoveChain)) => {
-                        let chain = self.chains.remove_head_chain(index);
-                        debug!(self.log, "Head chain completed"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot);
-                        // the chain is complete, re-status it's peers and remove it
-                        chain.status_peers(network);
-
-                        // Remove non-syncing head chains and re-status the peers
-                        // This removes a build-up of potentially duplicate head chains. Any
-                        // legitimate head chains will be re-established
+                    RangeSyncType::Head => {
+                        // Remove non-syncing head chains and re-status the peers. This removes a
+                        // build-up of potentially duplicate head chains. Any legitimate head
+                        // chains will be re-established
                         self.chains.clear_head_chains(network);
                         // update the state of the collection
                         self.chains.update(network);
                         // update the global state and log any change
                         self.chains.update_sync_state(network);
                     }
-                    Some((_, ProcessingResult::KeepChain)) => {}
-                    None => {
-                        // This can happen if a chain gets purged due to being out of date whilst a
-                        // batch process is in progress.
-                        debug!(self.log, "No chains match the block processing id"; "batch_epoch" => epoch, "chain_id" => chain_id);
-                    }
                 }
             }
+
+            Err(_) => {
+                debug!(self.log, "BlocksByRange response for removed chain"; "chain" => chain_id)
+            }
         }
     }
@@ -352,7 +300,7 @@ impl RangeSync {
         // if the peer is in the awaiting head mapping, remove it
         self.awaiting_head_peers.remove(peer_id);
 
-        // remove the peer from any peer pool
+        // remove the peer from any peer pool, failing its batches
        self.remove_peer(network, peer_id);
 
         // update the state of the collection
@@ -361,30 +309,17 @@ impl RangeSync {
         self.chains.update(network);
         self.chains.update_sync_state(network);
     }
 
-    /// When a peer gets removed, both the head and finalized chains need to be searched to check which pool the peer is in. The chain may also have a batch or batches awaiting
+    /// When a peer gets removed, both the head and finalized chains need to be searched to check
+    /// which pool the peer is in. The chain may also have a batch or batches awaiting
     /// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum
     /// retries. In this case, we need to remove the chain and re-status all the peers.
     fn remove_peer(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) {
-        for (index, result) in self.chains.head_finalized_request_all(|chain| {
-            if chain.peer_pool.remove(peer_id) {
-                // this chain contained the peer
-                while let Some(batch) = chain.pending_batches.remove_batch_by_peer(peer_id) {
-                    if let ProcessingResult::RemoveChain = chain.failed_batch(network, batch) {
-                        // a single batch failed, remove the chain
-                        return Some(ProcessingResult::RemoveChain);
-                    }
-                }
-                // peer removed from chain, no batch failed
-                Some(ProcessingResult::KeepChain)
-            } else {
-                None
-            }
-        }) {
-            if result == ProcessingResult::RemoveChain {
-                // the chain needed to be removed
-                debug!(self.log, "Chain being removed due to failed batch");
-                self.chains.remove_chain(network, index);
-            }
+        for (removed_chain, sync_type) in self
+            .chains
+            .call_all(|chain| chain.remove_peer(peer_id, network))
+        {
+            debug!(self.log, "Chain removed after removing peer"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id());
+            // TODO: anything else to do?
         }
     }
@@ -398,17 +333,25 @@ impl RangeSync {
         peer_id: PeerId,
         request_id: RequestId,
     ) {
-        // check that this request is pending
-        match self
-            .chains
-            .head_finalized_request(|chain| chain.inject_error(network, &peer_id, request_id))
-        {
-            Some((_, ProcessingResult::KeepChain)) => {} // error handled chain persists
-            Some((index, ProcessingResult::RemoveChain)) => {
-                debug!(self.log, "Chain being removed due to RPC error");
-                self.chains.remove_chain(network, index)
+        // get the chain and batch for which this response belongs
+        if let Some((chain_id, batch_id)) = network.blocks_by_range_response(request_id, true) {
+            // check that this request is pending
+            match self.chains.call_by_id(chain_id, |chain| {
+                chain.inject_error(network, batch_id, peer_id)
+            }) {
+                Ok((removed_chain, sync_type)) => {
+                    if let Some(removed_chain) = removed_chain {
+                        debug!(self.log, "Chain removed on rpc error"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id());
+                        removed_chain.status_peers(network);
+                        // TODO: update & update_sync_state?
+                    }
+                }
+                Err(_) => {
+                    debug!(self.log, "BlocksByRange response for removed chain"; "chain" => chain_id)
+                }
            }
-            None => {} // request wasn't in the finalized chains, check the head chains
+        } else {
+            warn!(self.log, "Response/Error for non registered request"; "request_id" => request_id)
         }
     }
 }
diff --git a/beacon_node/network/src/sync/range_sync/sync_type.rs b/beacon_node/network/src/sync/range_sync/sync_type.rs
index 103ef77b8..d9f1d3f17 100644
--- a/beacon_node/network/src/sync/range_sync/sync_type.rs
+++ b/beacon_node/network/src/sync/range_sync/sync_type.rs
@@ -6,6 +6,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes};
 use std::sync::Arc;
 
 /// The type of Range sync that should be done relative to our current state.
+#[derive(Debug)]
 pub enum RangeSyncType {
     /// A finalized chain sync should be started with this peer.
     Finalized,