From 9f45ac2f5e1e0f61da690fa2d6ef4d146c6bcb0d Mon Sep 17 00:00:00 2001
From: divma
Date: Thu, 29 Oct 2020 02:29:24 +0000
Subject: [PATCH] More sync edge cases + prettify range (#1834)

## Issue Addressed

Sync edge case: we receive an empty optimistic batch that passes validation and sits inside the download buffer. Eventually the chain would reach this batch and treat it as an inconsistent state.

## Proposed Changes

- Handle the edge case by advancing the chain's target + code clarification
- Some fairly large changes for readability + ergonomics, since Rust has the try operator (`?`)
- Better handling of bad batch and chain states

---
 .../network/src/sync/range_sync/batch.rs | 103 +++--
 .../network/src/sync/range_sync/chain.rs | 351 +++++++++---------
 .../src/sync/range_sync/chain_collection.rs | 40 +-
 .../network/src/sync/range_sync/range.rs | 8 +-
 4 files changed, 272 insertions(+), 230 deletions(-)

diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs
index b9dc04ccd..580a8d4df 100644
--- a/beacon_node/network/src/sync/range_sync/batch.rs
+++ b/beacon_node/network/src/sync/range_sync/batch.rs
@@ -1,7 +1,6 @@ use crate::sync::RequestId; use eth2_libp2p::rpc::methods::BlocksByRangeRequest; use eth2_libp2p::PeerId; -use slog::{crit, warn, Logger}; use ssz::Encode; use std::collections::HashSet; use std::hash::{Hash, Hasher}; @@ -15,6 +14,13 @@ const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; /// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty. const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3; +/// Error type of a batch in a wrong state. // Such errors should never be encountered. +pub struct WrongState(pub(super) String); + +/// Auxiliary type alias for readability. +type IsFailed = bool; + /// A segment of a chain. pub struct BatchInfo { /// Start slot of the batch. @@ -57,6 +63,14 @@ impl BatchState { pub fn poison(&mut self) -> BatchState { std::mem::replace(self, BatchState::Poisoned) } + + pub fn is_failed(&self) -> IsFailed { + match self { + BatchState::Failed => true, + BatchState::Poisoned => unreachable!("Poisoned batch"), + _ => false, + } + } } impl BatchInfo { @@ -134,16 +148,20 @@ impl BatchInfo { } /// Adds a block to a downloading batch.
- pub fn add_block(&mut self, block: SignedBeaconBlock, logger: &Logger) { + pub fn add_block(&mut self, block: SignedBeaconBlock) -> Result<(), WrongState> { match self.state.poison() { BatchState::Downloading(peer, mut blocks, req_id) => { blocks.push(block); - self.state = BatchState::Downloading(peer, blocks, req_id) + self.state = BatchState::Downloading(peer, blocks, req_id); + Ok(()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { - crit!(logger, "Add block for batch in wrong state"; "state" => ?other); - self.state = other + self.state = other; + Err(WrongState(format!( + "Add block for batch in wrong state {:?}", + self.state + ))) } } } @@ -153,8 +171,7 @@ impl BatchInfo { #[must_use = "Batch may have failed"] pub fn download_completed( &mut self, - logger: &Logger, - ) -> Result> { + ) -> Result> { match self.state.poison() { BatchState::Downloading(peer, blocks, _request_id) => { // verify that blocks are in range @@ -182,9 +199,8 @@ impl BatchInfo { // drop the blocks BatchState::AwaitingDownload }; - warn!(logger, "Batch received out of range blocks"; - &self, "expected" => expected, "received" => received); - return Err(&self.state); + + return Err(Ok((expected, received, self.state.is_failed()))); } } @@ -194,15 +210,17 @@ impl BatchInfo { } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { - crit!(logger, "Download completed for batch in wrong state"; "state" => ?other); self.state = other; - Err(&self.state) + Err(Err(WrongState(format!( + "Download completed for batch in wrong state {:?}", + self.state + )))) } } } #[must_use = "Batch may have failed"] - pub fn download_failed(&mut self, logger: &Logger) -> &BatchState { + pub fn download_failed(&mut self) -> Result { match self.state.poison() { BatchState::Downloading(peer, _, _request_id) => { // register the attempt and check if the batch can be tried again @@ -215,13 +233,15 @@ impl BatchInfo { // drop the blocks BatchState::AwaitingDownload }; - &self.state + Ok(self.state.is_failed()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { - crit!(logger, "Download failed for batch in wrong state"; "state" => ?other); self.state = other; - &self.state + Err(WrongState(format!( + "Download failed for batch in wrong state {:?}", + self.state + ))) } } } @@ -230,37 +250,42 @@ impl BatchInfo { &mut self, peer: PeerId, request_id: RequestId, - logger: &Logger, - ) { + ) -> Result<(), WrongState> { match self.state.poison() { BatchState::AwaitingDownload => { self.state = BatchState::Downloading(peer, Vec::new(), request_id); + Ok(()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { - crit!(logger, "Starting download for batch in wrong state"; "state" => ?other); - self.state = other + self.state = other; + Err(WrongState(format!( + "Starting download for batch in wrong state {:?}", + self.state + ))) } } } - pub fn start_processing(&mut self, logger: &Logger) -> Vec> { + pub fn start_processing(&mut self) -> Result>, WrongState> { match self.state.poison() { BatchState::AwaitingProcessing(peer, blocks) => { self.state = BatchState::Processing(Attempt::new(peer, &blocks)); - blocks + Ok(blocks) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { - crit!(logger, "Starting procesing batch in wrong state"; "state" => ?other); self.state = other; - vec![] + Err(WrongState(format!( + "Starting procesing batch in wrong state {:?}", + self.state + ))) } } } #[must_use = "Batch may have failed"] - pub fn processing_completed(&mut self, 
was_sucessful: bool, logger: &Logger) -> &BatchState { + pub fn processing_completed(&mut self, was_sucessful: bool) -> Result { match self.state.poison() { BatchState::Processing(attempt) => { self.state = if !was_sucessful { @@ -278,19 +303,21 @@ impl BatchInfo { } else { BatchState::AwaitingValidation(attempt) }; - &self.state + Ok(self.state.is_failed()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { - crit!(logger, "Procesing completed for batch in wrong state"; "state" => ?other); self.state = other; - &self.state + Err(WrongState(format!( + "Procesing completed for batch in wrong state: {:?}", + self.state + ))) } } } #[must_use = "Batch may have failed"] - pub fn validation_failed(&mut self, logger: &Logger) -> &BatchState { + pub fn validation_failed(&mut self) -> Result { match self.state.poison() { BatchState::AwaitingValidation(attempt) => { self.failed_processing_attempts.push(attempt); @@ -303,13 +330,15 @@ impl BatchInfo { } else { BatchState::AwaitingDownload }; - &self.state + Ok(self.state.is_failed()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { - crit!(logger, "Validation failed for batch in wrong state"; "state" => ?other); self.state = other; - &self.state + Err(WrongState(format!( + "Validation failed for batch in wrong state: {:?}", + self.state + ))) } } } @@ -370,8 +399,14 @@ impl slog::KV for BatchInfo { impl std::fmt::Debug for BatchState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - BatchState::Processing(_) => f.write_str("Processing"), - BatchState::AwaitingValidation(_) => f.write_str("AwaitingValidation"), + BatchState::Processing(Attempt { + ref peer_id, + hash: _, + }) => write!(f, "Processing({})", peer_id), + BatchState::AwaitingValidation(Attempt { + ref peer_id, + hash: _, + }) => write!(f, "AwaitingValidation({})", peer_id), BatchState::AwaitingDownload => f.write_str("AwaitingDownload"), BatchState::Failed => f.write_str("Failed"), BatchState::AwaitingProcessing(ref peer, ref blocks) => { diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index aace878c8..295991365 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -26,14 +26,22 @@ const BATCH_BUFFER_SIZE: u8 = 5; /// A return type for functions that act on a `Chain` which informs the caller whether the chain /// has been completed and should be removed or to be kept if further processing is /// required. -#[derive(PartialEq)] #[must_use = "Should be checked, since a failed chain must be removed. A chain that requested being removed and continued is now in an inconsistent state"] -pub enum ProcessingResult { - KeepChain, - RemoveChain, +pub type ProcessingResult = Result; + +/// Reasons for removing a chain +pub enum RemoveChain { + EmptyPeerPool, + ChainCompleted, + ChainFailed(BatchId), + WrongBatchState(String), + WrongChainState(String), } +#[derive(Debug)] +pub struct KeepChain; + /// A chain identifier pub type ChainId = u64; pub type BatchId = Epoch; @@ -158,24 +166,21 @@ impl SyncingChain { // fail the batches for id in batch_ids { if let Some(batch) = self.batches.get_mut(&id) { - if let BatchState::Failed = batch.download_failed(&self.log) { - return ProcessingResult::RemoveChain; - } - if let ProcessingResult::RemoveChain = self.retry_batch_download(network, id) { - // drop the chain early - return ProcessingResult::RemoveChain; + if batch.download_failed()? 
{ + return Err(RemoveChain::ChainFailed(id)); } + self.retry_batch_download(network, id)?; } else { debug!(self.log, "Batch not found while removing peer"; - "peer" => %peer_id, "batch" => "id") + "peer" => %peer_id, "batch" => id) } } } if self.peers.is_empty() { - ProcessingResult::RemoveChain + Err(RemoveChain::EmptyPeerPool) } else { - ProcessingResult::KeepChain + Ok(KeepChain) } } @@ -202,7 +207,7 @@ impl SyncingChain { None => { debug!(self.log, "Received a block for unknown batch"; "epoch" => batch_id); // A batch might get removed when the chain advances, so this is non fatal. - return ProcessingResult::KeepChain; + return Ok(KeepChain); } Some(batch) => { // A batch could be retried without the peer failing the request (disconnecting/ @@ -210,7 +215,7 @@ impl SyncingChain { // reasons. Check that this block belongs to the expected peer, and that the // request_id matches if !batch.is_expecting_block(peer_id, &request_id) { - return ProcessingResult::KeepChain; + return Ok(KeepChain); } batch } @@ -218,17 +223,16 @@ impl SyncingChain { if let Some(block) = beacon_block { // This is not a stream termination, simply add the block to the request - batch.add_block(block, &self.log); - ProcessingResult::KeepChain + batch.add_block(block)?; + Ok(KeepChain) } else { // A stream termination has been sent. This batch has ended. Process a completed batch. // Remove the request from the peer's active batches self.peers .get_mut(peer_id) - .unwrap_or_else(|| panic!("Batch is registered for the peer")) - .remove(&batch_id); + .map(|active_requests| active_requests.remove(&batch_id)); - match batch.download_completed(&self.log) { + match batch.download_completed() { Ok(received) => { let awaiting_batches = batch_id.saturating_sub( self.optimistic_start @@ -237,14 +241,16 @@ impl SyncingChain { debug!(self.log, "Completed batch received"; "epoch" => batch_id, "blocks" => received, "awaiting_batches" => awaiting_batches); // pre-emptively request more blocks from peers whilst we process current blocks, - if let ProcessingResult::RemoveChain = self.request_batches(network) { - return ProcessingResult::RemoveChain; - } + self.request_batches(network)?; self.process_completed_batches(network) } - Err(state) => { - if let BatchState::Failed = state { - return ProcessingResult::RemoveChain; + Err(result) => { + let (expected_boundary, received_boundary, is_failed) = result?; + warn!(self.log, "Batch received out of range blocks"; "expected_boundary" => expected_boundary, "received_boundary" => received_boundary, + "peer_id" => %peer_id, batch); + + if is_failed { + return Err(RemoveChain::ChainFailed(batch_id)); } // this batch can't be used, so we need to request it again. self.retry_batch_download(network, batch_id) @@ -262,14 +268,16 @@ impl SyncingChain { ) -> ProcessingResult { // Only process batches if this chain is Syncing, and only one at a time if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() { - return ProcessingResult::KeepChain; + return Ok(KeepChain); } let batch = match self.batches.get_mut(&batch_id) { Some(batch) => batch, None => { - debug!(self.log, "Processing unknown batch"; "batch" => %batch_id); - return ProcessingResult::RemoveChain; + return Err(RemoveChain::WrongChainState(format!( + "Trying to process a batch that does not exist: {}", + batch_id + ))); } }; @@ -277,7 +285,7 @@ impl SyncingChain { // result callback. 
This is done, because an empty batch could end a chain and the logic // for removing chains and checking completion is in the callback. - let blocks = batch.start_processing(&self.log); + let blocks = batch.start_processing()?; let process_id = ProcessId::RangeBatchId(self.id, batch_id); self.current_processing_batch = Some(batch_id); @@ -293,7 +301,7 @@ impl SyncingChain { // re-downloaded. self.on_batch_process_result(network, batch_id, &BatchProcessResult::Failed(false)) } else { - ProcessingResult::KeepChain + Ok(KeepChain) } } @@ -304,101 +312,96 @@ impl SyncingChain { ) -> ProcessingResult { // Only process batches if this chain is Syncing and only process one batch at a time if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() { - return ProcessingResult::KeepChain; + return Ok(KeepChain); } // Find the id of the batch we are going to process. // // First try our optimistic start, if any. If this batch is ready, we process it. If the // batch has not already been completed, check the current chain target. - let optimistic_id = if let Some(epoch) = self.optimistic_start { + if let Some(epoch) = self.optimistic_start { if let Some(batch) = self.batches.get(&epoch) { let state = batch.state(); match state { BatchState::AwaitingProcessing(..) => { // this batch is ready debug!(self.log, "Processing optimistic start"; "epoch" => epoch); - Some(epoch) + return self.process_batch(network, epoch); } BatchState::Downloading(..) => { // The optimistic batch is being downloaded. We wait for this before // attempting to process other batches. - return ProcessingResult::KeepChain; + return Ok(KeepChain); } + BatchState::Poisoned => unreachable!("Poisoned batch"), BatchState::Processing(_) | BatchState::AwaitingDownload - | BatchState::Failed - | BatchState::Poisoned => { + | BatchState::Failed => { // these are all inconsistent states: // - Processing -> `self.current_processing_batch` is None // - Failed -> non recoverable batch. For an optimistic batch, it should // have been removed - // - Poisoned -> this is an intermediate state that should never be reached // - AwaitingDownload -> A recoverable failed batch should have been // re-requested. - crit!(self.log, "Optimistic batch indicates inconsistent chain state"; "state" => ?state); - return ProcessingResult::RemoveChain; + return Err(RemoveChain::WrongChainState(format!( + "Optimistic batch indicates inconsistent chain state: {:?}", + state + ))); } BatchState::AwaitingValidation(_) => { - // This is possible due to race conditions, and tho it would be considered - // an inconsistent state, the chain can continue. If an optimistic batch - // is successfully processed it is no longer considered an optimistic - // candidate. If the batch was empty the chain rejects it; if it was non - // empty the chain is advanced to this point (so that the old optimistic - // batch is now the processing target) - debug!(self.log, "Optimistic batch should never be Awaiting Validation"; "batch" => epoch); - None + // If an optimistic start is given to the chain after the corresponding + // batch has been requested and processed we can land here. 
We drop the + // optimistic candidate since we can't conclude whether the batch included + // blocks or not at this point + debug!(self.log, "Dropping optimistic candidate"; "batch" => epoch); + self.optimistic_start = None; } } - } else { - None } - } else { - None - }; + } // if the optimistic target can't be processed, check the processing target - let id = optimistic_id.or_else(|| { - if let Some(batch) = self.batches.get(&self.processing_target) { - let state = batch.state(); - match state { - BatchState::AwaitingProcessing(..) => Some(self.processing_target), - BatchState::Downloading(..) => { - // Batch is not ready, nothing to process - None - } - BatchState::Failed - | BatchState::AwaitingDownload - | BatchState::AwaitingValidation(_) - | BatchState::Processing(_) - | BatchState::Poisoned => { - // these are all inconsistent states: - // - Failed -> non recoverable batch. Chain should have beee removed - // - AwaitingDownload -> A recoverable failed batch should have been - // re-requested. - // - AwaitingValidation -> self.processing_target should have been moved - // forward - // - Processing -> `self.current_processing_batch` is None - // - Poisoned -> Intermediate state that should never be reached - unreachable!( - "Robust target batch indicates inconsistent chain state: {:?}", - state - ) + if let Some(batch) = self.batches.get(&self.processing_target) { + let state = batch.state(); + match state { + BatchState::AwaitingProcessing(..) => { + return self.process_batch(network, self.processing_target); + } + BatchState::Downloading(..) => { + // Batch is not ready, nothing to process + } + BatchState::Poisoned => unreachable!("Poisoned batch"), + BatchState::Failed | BatchState::AwaitingDownload | BatchState::Processing(_) => { + // these are all inconsistent states: + // - Failed -> non recoverable batch. Chain should have beee removed + // - AwaitingDownload -> A recoverable failed batch should have been + // re-requested. + // - Processing -> `self.current_processing_batch` is None + return Err(RemoveChain::WrongChainState(format!( + "Robust target batch indicates inconsistent chain state: {:?}", + state + ))); + } + BatchState::AwaitingValidation(_) => { + // we can land here if an empty optimistic batch succeeds processing and is + // inside the download buffer (between `self.processing_target` and + // `self.to_be_downloaded`). In this case, eventually the chain advances to the + // batch (`self.processing_target` reaches this point). + debug!(self.log, "Chain encountered a robust batch awaiting validation"; "batch" => self.processing_target); + + self.processing_target += EPOCHS_PER_BATCH; + if self.to_be_downloaded <= self.processing_target { + self.to_be_downloaded = self.processing_target + EPOCHS_PER_BATCH; } } - } else { - crit!(self.log, "Batch not found for current processing target"; - "epoch" => self.processing_target); - None } - }); - - // we found a batch to process - if let Some(id) = id { - self.process_batch(network, id) } else { - ProcessingResult::KeepChain + return Err(RemoveChain::WrongChainState(format!( + "Batch not found for current processing target {}", + self.processing_target + ))); } + Ok(KeepChain) } /// The block processor has completed processing a batch. 
This function handles the result @@ -415,12 +418,12 @@ impl SyncingChain { Some(processing_id) if *processing_id != batch_id => { debug!(self.log, "Unexpected batch result"; "batch_epoch" => batch_id, "expected_batch_epoch" => processing_id); - return ProcessingResult::KeepChain; + return Ok(KeepChain); } None => { debug!(self.log, "Chain was not expecting a batch result"; "batch_epoch" => batch_id); - return ProcessingResult::KeepChain; + return Ok(KeepChain); } _ => { // batch_id matches, continue @@ -430,14 +433,14 @@ impl SyncingChain { match result { BatchProcessResult::Success(was_non_empty) => { - let batch = match self.batches.get_mut(&batch_id) { - Some(batch) => batch, - None => { - debug!(self.log, "Current processing batch not found"; "batch" => batch_id); - return ProcessingResult::RemoveChain; - } - }; - let _ = batch.processing_completed(true, &self.log); + let batch = self.batches.get_mut(&batch_id).ok_or_else(|| { + RemoveChain::WrongChainState(format!( + "Current processing batch not found: {}", + batch_id + )) + })?; + + batch.processing_completed(true)?; // If the processed batch was not empty, we can validate previous unvalidated // blocks. if *was_non_empty { @@ -448,13 +451,11 @@ impl SyncingChain { // check if this batch corresponds to an optimistic batch. In this case, we // reject it as an optimistic candidate since the batch was empty if epoch == batch_id { - if let ProcessingResult::RemoveChain = self.reject_optimistic_batch( + self.reject_optimistic_batch( network, false, /* do not re-request */ "batch was empty", - ) { - return ProcessingResult::RemoveChain; - }; + )?; } } @@ -463,35 +464,31 @@ impl SyncingChain { // check if the chain has completed syncing if self.current_processed_slot() >= self.target_head_slot { // chain is completed - debug!(self.log, "Chain is complete"); - ProcessingResult::RemoveChain + Err(RemoveChain::ChainCompleted) } else { // chain is not completed // attempt to request more batches - if let ProcessingResult::RemoveChain = self.request_batches(network) { - return ProcessingResult::RemoveChain; - } + self.request_batches(network)?; // attempt to process more batches self.process_completed_batches(network) } } BatchProcessResult::Failed(imported_blocks) => { - let (batch, peer) = match self.batches.get_mut(&batch_id) { - Some(batch) => match batch.current_peer().cloned() { - Some(peer) => (batch, peer), - None => { - debug!(self.log, "Current processing has no peer"; "batch" => batch_id); - return ProcessingResult::RemoveChain; - } - }, - None => { - debug!(self.log, "Current processing batch not found"; "batch" => batch_id); - return ProcessingResult::RemoveChain; - } - }; + let batch = self.batches.get_mut(&batch_id).ok_or_else(|| { + RemoveChain::WrongChainState(format!( + "Batch not found for current processing target {}", + batch_id + )) + })?; + let peer = batch.current_peer().cloned().ok_or_else(|| { + RemoveChain::WrongBatchState(format!( + "Processing target is in wrong state: {:?}", + batch.state(), + )) + })?; debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks, "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer)); - if let BatchState::Failed = batch.processing_completed(false, &self.log) { + if batch.processing_completed(false)? 
{ // check that we have not exceeded the re-process retry counter // If a batch has exceeded the invalid batch lookup attempts limit, it means // that it is likely all peers in this chain are are sending invalid batches @@ -506,7 +503,7 @@ impl SyncingChain { for (peer, _) in self.peers.drain() { network.report_peer(peer, action); } - ProcessingResult::RemoveChain + Err(RemoveChain::ChainFailed(batch_id)) } else { // chain can continue. Check if it can be moved forward if *imported_blocks { @@ -545,7 +542,7 @@ impl SyncingChain { } } - ProcessingResult::KeepChain + Ok(KeepChain) } /// Removes any batches previous to the given `validating_epoch` and updates the current @@ -619,13 +616,12 @@ impl SyncingChain { ), BatchState::AwaitingProcessing(..) => {} BatchState::Processing(_) => { - debug_assert_eq!( - id, - self.current_processing_batch.expect( - "A batch in a processing state means the chain is processing it" - ) - ); - self.current_processing_batch = None; + debug!(self.log, "Advancing chain while processing a batch"; "batch" => id, batch); + if let Some(processing_id) = self.current_processing_batch { + if id <= processing_id { + self.current_processing_batch = None; + } + } } } } @@ -673,15 +669,9 @@ impl SyncingChain { // If this batch is an optimistic batch, we reject this epoch as an optimistic // candidate and try to re download it if epoch == batch_id { - if let ProcessingResult::RemoveChain = - self.reject_optimistic_batch(network, true, "batch was invalid") - { - return ProcessingResult::RemoveChain; - } else { - // since this is the optimistic batch, we can't consider previous batches as - // invalid. - return ProcessingResult::KeepChain; - } + return self.reject_optimistic_batch(network, true, "batch was invalid"); + // since this is the optimistic batch, we can't consider previous batches as + // invalid. } } // this is our robust `processing_target`. All previous batches must be awaiting @@ -689,9 +679,9 @@ impl SyncingChain { let mut redownload_queue = Vec::new(); for (id, batch) in self.batches.range_mut(..batch_id) { - if let BatchState::Failed = batch.validation_failed(&self.log) { + if batch.validation_failed()? { // remove the chain early - return ProcessingResult::RemoveChain; + return Err(RemoveChain::ChainFailed(batch_id)); } redownload_queue.push(*id); } @@ -701,9 +691,7 @@ impl SyncingChain { self.processing_target = self.start_epoch; for id in redownload_queue { - if let ProcessingResult::RemoveChain = self.retry_batch_download(network, id) { - return ProcessingResult::RemoveChain; - } + self.retry_batch_download(network, id)?; } // finally, re-request the failed batch. self.retry_batch_download(network, batch_id) @@ -746,9 +734,7 @@ impl SyncingChain { self.state = ChainSyncingState::Syncing; // begin requesting blocks from the peer pool, until all peers are exhausted. - if let ProcessingResult::RemoveChain = self.request_batches(network) { - return ProcessingResult::RemoveChain; - } + self.request_batches(network)?; // start processing batches if needed self.process_completed_batches(network) @@ -770,7 +756,7 @@ impl SyncingChain { // Either new or not, this peer is idle, try to request more batches self.request_batches(network) } else { - ProcessingResult::KeepChain + Ok(KeepChain) } } @@ -789,19 +775,19 @@ impl SyncingChain { // sending an error /timeout) if the peer is removed from the chain for other // reasons. 
Check that this block belongs to the expected peer if !batch.is_expecting_block(peer_id, &request_id) { - return ProcessingResult::KeepChain; + return Ok(KeepChain); } debug!(self.log, "Batch failed. RPC Error"; "batch_epoch" => batch_id); if let Some(active_requests) = self.peers.get_mut(peer_id) { active_requests.remove(&batch_id); } - if let BatchState::Failed = batch.download_failed(&self.log) { - return ProcessingResult::RemoveChain; + if batch.download_failed()? { + return Err(RemoveChain::ChainFailed(batch_id)); } self.retry_batch_download(network, batch_id) } else { // this could be an error for an old batch, removed when the chain advances - ProcessingResult::KeepChain + Ok(KeepChain) } } @@ -813,7 +799,7 @@ impl SyncingChain { ) -> ProcessingResult { let batch = match self.batches.get_mut(&batch_id) { Some(batch) => batch, - None => return ProcessingResult::KeepChain, + None => return Ok(KeepChain), }; // Find a peer to request the batch @@ -834,7 +820,7 @@ impl SyncingChain { self.send_batch(network, batch_id, peer) } else { // If we are here the chain has no more peers - ProcessingResult::RemoveChain + Err(RemoveChain::EmptyPeerPool) } } @@ -850,7 +836,7 @@ impl SyncingChain { match network.blocks_by_range_request(peer.clone(), request, self.id, batch_id) { Ok(request_id) => { // inform the batch about the new request - batch.start_downloading_from_peer(peer.clone(), request_id, &self.log); + batch.start_downloading_from_peer(peer.clone(), request_id)?; if self .optimistic_start .map(|epoch| epoch == batch_id) @@ -866,21 +852,26 @@ impl SyncingChain { .get_mut(&peer) .map(|requests| { requests.insert(batch_id); - ProcessingResult::KeepChain + Ok(KeepChain) }) - .unwrap_or(ProcessingResult::RemoveChain); + .unwrap_or_else(|| { + Err(RemoveChain::WrongChainState(format!( + "Sending batch to a peer that is not in the chain: {}", + peer + ))) + }); } Err(e) => { // NOTE: under normal conditions this shouldn't happen but we handle it anyway warn!(self.log, "Could not send batch request"; "batch_id" => batch_id, "error" => e, &batch); // register the failed download and check if the batch can be retried - batch.start_downloading_from_peer(peer.clone(), 1, &self.log); // fake request_id is not relevant + batch.start_downloading_from_peer(peer.clone(), 1)?; // fake request_id is not relevant self.peers .get_mut(&peer) .map(|request| request.remove(&batch_id)); - if let BatchState::Failed = batch.download_failed(&self.log) { - return ProcessingResult::RemoveChain; + if batch.download_failed()? { + return Err(RemoveChain::ChainFailed(batch_id)); } else { return self.retry_batch_download(network, batch_id); } @@ -888,7 +879,7 @@ impl SyncingChain { } } - ProcessingResult::KeepChain + Ok(KeepChain) } /// Returns true if this chain is currently syncing. 
@@ -906,7 +897,7 @@ impl SyncingChain { network: &mut SyncNetworkContext, ) -> ProcessingResult { if !matches!(self.state, ChainSyncingState::Syncing) { - return ProcessingResult::KeepChain; + return Ok(KeepChain); } // find the next pending batch and request it from the peer @@ -933,27 +924,23 @@ impl SyncingChain { if let Some(peer) = idle_peers.pop() { let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH); self.batches.insert(epoch, optimistic_batch); - if let ProcessingResult::RemoveChain = self.send_batch(network, epoch, peer) { - return ProcessingResult::RemoveChain; - } + self.send_batch(network, epoch, peer)?; } } - return ProcessingResult::KeepChain; + return Ok(KeepChain); } while let Some(peer) = idle_peers.pop() { if let Some(batch_id) = self.include_next_batch() { // send the batch - if let ProcessingResult::RemoveChain = self.send_batch(network, batch_id, peer) { - return ProcessingResult::RemoveChain; - } + self.send_batch(network, batch_id, peer)?; } else { // No more batches, simply stop - return ProcessingResult::KeepChain; + return Ok(KeepChain); } } - ProcessingResult::KeepChain + Ok(KeepChain) } /// Creates the next required batch from the chain. If there are no more batches required, @@ -1040,3 +1027,23 @@ impl slog::KV for SyncingChain { slog::Result::Ok(()) } } + +use super::batch::WrongState as WrongBatchState; +impl From for RemoveChain { + fn from(err: WrongBatchState) -> Self { + RemoveChain::WrongBatchState(err.0) + } +} + +impl std::fmt::Debug for RemoveChain { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // needed to avoid Debuggins Strings + match self { + RemoveChain::ChainCompleted => f.write_str("ChainCompleted"), + RemoveChain::EmptyPeerPool => f.write_str("EmptyPeerPool"), + RemoveChain::ChainFailed(batch) => write!(f, "ChainFailed(batch: {} )", batch), + RemoveChain::WrongBatchState(reason) => write!(f, "WrongBatchState: {}", reason), + RemoveChain::WrongChainState(reason) => write!(f, "WrongChainState: {}", reason), + } + } +} diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 4af972ceb..c458cc847 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -3,7 +3,7 @@ //! Each chain type is stored in it's own map. A variety of helper functions are given along with //! this struct to simplify the logic of the other layers of sync. -use super::chain::{ChainId, ProcessingResult, SyncingChain}; +use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain}; use super::sync_type::RangeSyncType; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::sync::network_context::SyncNetworkContext; @@ -108,33 +108,33 @@ impl ChainCollection { /// Calls `func` on every chain of the collection. If the result is /// `ProcessingResult::RemoveChain`, the chain is removed and returned. /// NOTE: `func` must not change the syncing state of a chain. 
- pub fn call_all(&mut self, mut func: F) -> Vec<(SyncingChain, RangeSyncType)> + pub fn call_all(&mut self, mut func: F) -> Vec<(SyncingChain, RangeSyncType, RemoveChain)> where F: FnMut(&mut SyncingChain) -> ProcessingResult, { let mut to_remove = Vec::new(); for (id, chain) in self.finalized_chains.iter_mut() { - if let ProcessingResult::RemoveChain = func(chain) { - to_remove.push((*id, RangeSyncType::Finalized)); + if let Err(remove_reason) = func(chain) { + to_remove.push((*id, RangeSyncType::Finalized, remove_reason)); } } for (id, chain) in self.head_chains.iter_mut() { - if let ProcessingResult::RemoveChain = func(chain) { - to_remove.push((*id, RangeSyncType::Head)); + if let Err(remove_reason) = func(chain) { + to_remove.push((*id, RangeSyncType::Head, remove_reason)); } } let mut results = Vec::with_capacity(to_remove.len()); - for (id, sync_type) in to_remove.into_iter() { + for (id, sync_type, reason) in to_remove.into_iter() { let chain = match sync_type { RangeSyncType::Finalized => self.finalized_chains.remove(&id), RangeSyncType::Head => self.head_chains.remove(&id), }; let chain = chain.expect("Chain exists"); self.on_chain_removed(&id, chain.is_syncing()); - results.push((chain, sync_type)); + results.push((chain, sync_type, reason)); } results } @@ -144,29 +144,30 @@ impl ChainCollection { /// If the function returns `ProcessingResult::RemoveChain`, the chain is removed and returned. /// If the chain is found, its syncing type is returned, or an error otherwise. /// NOTE: `func` should not change the sync state of a chain. + #[allow(clippy::type_complexity)] pub fn call_by_id( &mut self, id: ChainId, func: F, - ) -> Result<(Option>, RangeSyncType), ()> + ) -> Result<(Option<(SyncingChain, RemoveChain)>, RangeSyncType), ()> where F: FnOnce(&mut SyncingChain) -> ProcessingResult, { if let Entry::Occupied(mut entry) = self.finalized_chains.entry(id) { // Search in our finalized chains first - if let ProcessingResult::RemoveChain = func(entry.get_mut()) { + if let Err(remove_reason) = func(entry.get_mut()) { let chain = entry.remove(); self.on_chain_removed(&id, chain.is_syncing()); - Ok((Some(chain), RangeSyncType::Finalized)) + Ok((Some((chain, remove_reason)), RangeSyncType::Finalized)) } else { Ok((None, RangeSyncType::Finalized)) } } else if let Entry::Occupied(mut entry) = self.head_chains.entry(id) { // Search in our head chains next - if let ProcessingResult::RemoveChain = func(entry.get_mut()) { + if let Err(remove_reason) = func(entry.get_mut()) { let chain = entry.remove(); self.on_chain_removed(&id, chain.is_syncing()); - Ok((Some(chain), RangeSyncType::Head)) + Ok((Some((chain, remove_reason)), RangeSyncType::Head)) } else { Ok((None, RangeSyncType::Head)) } @@ -308,11 +309,10 @@ impl ChainCollection { // update the state to a new finalized state self.state = RangeSyncState::Finalized(new_id); - if let ProcessingResult::RemoveChain = - chain.start_syncing(network, local_epoch, local_head_epoch) + if let Err(remove_reason) = chain.start_syncing(network, local_epoch, local_head_epoch) { // this happens only if sending a batch over the `network` fails a lot - error!(self.log, "Chain removed while switching chains"); + error!(self.log, "Chain removed while switching chains"; "chain" => new_id, "reason" => ?remove_reason); self.finalized_chains.remove(&new_id); self.on_chain_removed(&new_id, true); } @@ -364,11 +364,11 @@ impl ChainCollection { if !chain.is_syncing() { debug!(self.log, "New head chain started syncing"; &chain); } - if let 
ProcessingResult::RemoveChain = + if let Err(remove_reason) = chain.start_syncing(network, local_epoch, local_head_epoch) { self.head_chains.remove(&id); - error!(self.log, "Chain removed while switching head chains"; "id" => id); + error!(self.log, "Chain removed while switching head chains"; "chain" => id, "reason" => ?remove_reason); } else { syncing_chains.push(id); } @@ -481,8 +481,8 @@ impl ChainCollection { debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, &chain); debug_assert_eq!(chain.target_head_root, target_head_root); debug_assert_eq!(chain.target_head_slot, target_head_slot); - if let ProcessingResult::RemoveChain = chain.add_peer(network, peer) { - debug!(self.log, "Chain removed after adding peer"; "chain" => id); + if let Err(remove_reason) = chain.add_peer(network, peer) { + debug!(self.log, "Chain removed after adding peer"; "chain" => id, "reason" => ?remove_reason); let chain = entry.remove(); self.on_chain_removed(&id, chain.is_syncing()); } diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 824a38daa..d816cfa84 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -275,11 +275,11 @@ impl RangeSync { /// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum /// retries. In this case, we need to remove the chain. fn remove_peer(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) { - for (removed_chain, sync_type) in self + for (removed_chain, sync_type, remove_reason) in self .chains .call_all(|chain| chain.remove_peer(peer_id, network)) { - debug!(self.log, "Chain removed after removing peer"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id()); + debug!(self.log, "Chain removed after removing peer"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id(), "reason" => ?remove_reason); // update the state of the collection } self.chains.update( @@ -306,8 +306,8 @@ impl RangeSync { chain.inject_error(network, batch_id, &peer_id, request_id) }) { Ok((removed_chain, sync_type)) => { - if let Some(removed_chain) = removed_chain { - debug!(self.log, "Chain removed on rpc error"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id()); + if let Some((removed_chain, remove_reason)) = removed_chain { + debug!(self.log, "Chain removed on rpc error"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id(), "reason" => ?remove_reason); // update the state of the collection self.chains.update( network,
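A quick illustration of the main refactor in this patch: `ProcessingResult` stops being an enum and becomes `Result<KeepChain, RemoveChain>`, batch operations attempted in the wrong state return a `WrongState` error, and a `From<WrongState> for RemoveChain` impl lets chain-level code bubble those errors up with `?` instead of the repeated `if let ProcessingResult::RemoveChain = ...` blocks. The sketch below is a minimal, self-contained stand-in, not the Lighthouse code itself: `BatchId`/`Epoch` is simplified to `u64`, and `download_failed`/`remove_peer` are toy versions of the real methods.

```rust
/// Error for a batch operation attempted in the wrong state (should never happen).
pub struct WrongState(pub String);

/// Marker type: the chain should be kept.
#[derive(Debug)]
pub struct KeepChain;

/// Reasons for removing a chain (BatchId simplified to u64 in this sketch).
#[derive(Debug)]
pub enum RemoveChain {
    EmptyPeerPool,
    ChainCompleted,
    ChainFailed(u64),
    WrongBatchState(String),
    WrongChainState(String),
}

/// Batch-state errors are promoted to chain-removal reasons, so `?` works across layers.
impl From<WrongState> for RemoveChain {
    fn from(err: WrongState) -> Self {
        RemoveChain::WrongBatchState(err.0)
    }
}

pub type ProcessingResult = Result<KeepChain, RemoveChain>;

// Stand-in for a batch operation that can only run in a particular state.
// Returns Ok(is_failed) on success, or WrongState if called in the wrong state.
fn download_failed(wrong_state: bool) -> Result<bool, WrongState> {
    if wrong_state {
        Err(WrongState("download failed for batch in wrong state".into()))
    } else {
        Ok(false) // batch is not permanently failed
    }
}

// Chain-level function: `?` converts a WrongState into RemoveChain::WrongBatchState
// and early-returns, replacing manual match/if-let chains on the old enum.
fn remove_peer(batch_in_wrong_state: bool) -> ProcessingResult {
    if download_failed(batch_in_wrong_state)? {
        return Err(RemoveChain::ChainFailed(0));
    }
    Ok(KeepChain)
}

fn main() {
    match remove_peer(true) {
        Ok(KeepChain) => println!("chain kept"),
        Err(reason) => println!("removing chain: {:?}", reason),
    }
}
```

With this shape, callers that need to act on a removal simply match on the `Err` variant, which is what the `chain_collection.rs` changes do: `call_all` and `call_by_id` now hand back the `RemoveChain` reason alongside the removed chain so it can be logged.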
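On the batch side, the diff relies on the `poison()` helper in `batch.rs`: the current state is swapped out with `std::mem::replace`, matched by value, and on a wrong-state call the previous state is put back and a `WrongState` error is returned instead of logging `crit!`. Below is a minimal sketch of that pattern with an assumed, heavily simplified `BatchState` (the real variants carry peers, blocks and request ids).

```rust
use std::mem;

pub struct WrongState(pub String);

#[derive(Debug)]
enum BatchState {
    AwaitingDownload,
    Downloading(Vec<u64>), // stand-in for (peer, blocks, request_id)
    Poisoned,              // transient marker while the state is moved out
}

struct Batch {
    state: BatchState,
}

impl Batch {
    // Temporarily take ownership of the state, leaving a Poisoned marker behind.
    fn poison(&mut self) -> BatchState {
        mem::replace(&mut self.state, BatchState::Poisoned)
    }

    /// Add a block only if the batch is currently downloading; otherwise restore
    /// the previous state and report the wrong-state call as an error.
    fn add_block(&mut self, block: u64) -> Result<(), WrongState> {
        match self.poison() {
            BatchState::Downloading(mut blocks) => {
                blocks.push(block);
                self.state = BatchState::Downloading(blocks);
                Ok(())
            }
            BatchState::Poisoned => unreachable!("Poisoned batch"),
            other => {
                self.state = other;
                Err(WrongState(format!(
                    "Add block for batch in wrong state {:?}",
                    self.state
                )))
            }
        }
    }
}

fn main() {
    let mut downloading = Batch { state: BatchState::Downloading(Vec::new()) };
    assert!(downloading.add_block(1).is_ok());

    let mut idle = Batch { state: BatchState::AwaitingDownload };
    assert!(idle.add_block(2).is_err()); // wrong state: error returned, state restored
}
```

The `Poisoned` variant only exists while the state has been temporarily moved out, which is why every match arm in the real code treats it as unreachable.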