diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index b9dc04ccd..580a8d4df 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -1,7 +1,6 @@ use crate::sync::RequestId; use eth2_libp2p::rpc::methods::BlocksByRangeRequest; use eth2_libp2p::PeerId; -use slog::{crit, warn, Logger}; use ssz::Encode; use std::collections::HashSet; use std::hash::{Hash, Hasher}; @@ -15,6 +14,13 @@ const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; /// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty. const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3; +/// Error type of a batch in a wrong state. +// Such errors should never be encountered. +pub struct WrongState(pub(super) String); + +/// Auxiliary type alias for readability. +type IsFailed = bool; + /// A segment of a chain. pub struct BatchInfo { /// Start slot of the batch. @@ -57,6 +63,14 @@ impl BatchState { pub fn poison(&mut self) -> BatchState { std::mem::replace(self, BatchState::Poisoned) } + + pub fn is_failed(&self) -> IsFailed { + match self { + BatchState::Failed => true, + BatchState::Poisoned => unreachable!("Poisoned batch"), + _ => false, + } + } } impl BatchInfo { @@ -134,16 +148,20 @@ impl BatchInfo { } /// Adds a block to a downloading batch. - pub fn add_block(&mut self, block: SignedBeaconBlock, logger: &Logger) { + pub fn add_block(&mut self, block: SignedBeaconBlock) -> Result<(), WrongState> { match self.state.poison() { BatchState::Downloading(peer, mut blocks, req_id) => { blocks.push(block); - self.state = BatchState::Downloading(peer, blocks, req_id) + self.state = BatchState::Downloading(peer, blocks, req_id); + Ok(()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { - crit!(logger, "Add block for batch in wrong state"; "state" => ?other); - self.state = other + self.state = other; + Err(WrongState(format!( + "Add block for batch in wrong state {:?}", + self.state + ))) } } } @@ -153,8 +171,7 @@ impl BatchInfo { #[must_use = "Batch may have failed"] pub fn download_completed( &mut self, - logger: &Logger, - ) -> Result> { + ) -> Result> { match self.state.poison() { BatchState::Downloading(peer, blocks, _request_id) => { // verify that blocks are in range @@ -182,9 +199,8 @@ impl BatchInfo { // drop the blocks BatchState::AwaitingDownload }; - warn!(logger, "Batch received out of range blocks"; - &self, "expected" => expected, "received" => received); - return Err(&self.state); + + return Err(Ok((expected, received, self.state.is_failed()))); } } @@ -194,15 +210,17 @@ impl BatchInfo { } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { - crit!(logger, "Download completed for batch in wrong state"; "state" => ?other); self.state = other; - Err(&self.state) + Err(Err(WrongState(format!( + "Download completed for batch in wrong state {:?}", + self.state + )))) } } } #[must_use = "Batch may have failed"] - pub fn download_failed(&mut self, logger: &Logger) -> &BatchState { + pub fn download_failed(&mut self) -> Result { match self.state.poison() { BatchState::Downloading(peer, _, _request_id) => { // register the attempt and check if the batch can be tried again @@ -215,13 +233,15 @@ impl BatchInfo { // drop the blocks BatchState::AwaitingDownload }; - &self.state + Ok(self.state.is_failed()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { - crit!(logger, "Download failed for batch in wrong state"; "state" => ?other); self.state = other; - &self.state + Err(WrongState(format!( + "Download failed for batch in wrong state {:?}", + self.state + ))) } } } @@ -230,37 +250,42 @@ impl BatchInfo { &mut self, peer: PeerId, request_id: RequestId, - logger: &Logger, - ) { + ) -> Result<(), WrongState> { match self.state.poison() { BatchState::AwaitingDownload => { self.state = BatchState::Downloading(peer, Vec::new(), request_id); + Ok(()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { - crit!(logger, "Starting download for batch in wrong state"; "state" => ?other); - self.state = other + self.state = other; + Err(WrongState(format!( + "Starting download for batch in wrong state {:?}", + self.state + ))) } } } - pub fn start_processing(&mut self, logger: &Logger) -> Vec> { + pub fn start_processing(&mut self) -> Result>, WrongState> { match self.state.poison() { BatchState::AwaitingProcessing(peer, blocks) => { self.state = BatchState::Processing(Attempt::new(peer, &blocks)); - blocks + Ok(blocks) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { - crit!(logger, "Starting procesing batch in wrong state"; "state" => ?other); self.state = other; - vec![] + Err(WrongState(format!( + "Starting procesing batch in wrong state {:?}", + self.state + ))) } } } #[must_use = "Batch may have failed"] - pub fn processing_completed(&mut self, was_sucessful: bool, logger: &Logger) -> &BatchState { + pub fn processing_completed(&mut self, was_sucessful: bool) -> Result { match self.state.poison() { BatchState::Processing(attempt) => { self.state = if !was_sucessful { @@ -278,19 +303,21 @@ impl BatchInfo { } else { BatchState::AwaitingValidation(attempt) }; - &self.state + Ok(self.state.is_failed()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { - crit!(logger, "Procesing completed for batch in wrong state"; "state" => ?other); self.state = other; - &self.state + Err(WrongState(format!( + "Procesing completed for batch in wrong state: {:?}", + self.state + ))) } } } #[must_use = "Batch may have failed"] - pub fn validation_failed(&mut self, logger: &Logger) -> &BatchState { + pub fn validation_failed(&mut self) -> Result { match self.state.poison() { BatchState::AwaitingValidation(attempt) => { self.failed_processing_attempts.push(attempt); @@ -303,13 +330,15 @@ impl BatchInfo { } else { BatchState::AwaitingDownload }; - &self.state + Ok(self.state.is_failed()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { - crit!(logger, "Validation failed for batch in wrong state"; "state" => ?other); self.state = other; - &self.state + Err(WrongState(format!( + "Validation failed for batch in wrong state: {:?}", + self.state + ))) } } } @@ -370,8 +399,14 @@ impl slog::KV for BatchInfo { impl std::fmt::Debug for BatchState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - BatchState::Processing(_) => f.write_str("Processing"), - BatchState::AwaitingValidation(_) => f.write_str("AwaitingValidation"), + BatchState::Processing(Attempt { + ref peer_id, + hash: _, + }) => write!(f, "Processing({})", peer_id), + BatchState::AwaitingValidation(Attempt { + ref peer_id, + hash: _, + }) => write!(f, "AwaitingValidation({})", peer_id), BatchState::AwaitingDownload => f.write_str("AwaitingDownload"), BatchState::Failed => f.write_str("Failed"), BatchState::AwaitingProcessing(ref peer, ref blocks) => { diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index aace878c8..295991365 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -26,14 +26,22 @@ const BATCH_BUFFER_SIZE: u8 = 5; /// A return type for functions that act on a `Chain` which informs the caller whether the chain /// has been completed and should be removed or to be kept if further processing is /// required. -#[derive(PartialEq)] #[must_use = "Should be checked, since a failed chain must be removed. A chain that requested being removed and continued is now in an inconsistent state"] -pub enum ProcessingResult { - KeepChain, - RemoveChain, +pub type ProcessingResult = Result; + +/// Reasons for removing a chain +pub enum RemoveChain { + EmptyPeerPool, + ChainCompleted, + ChainFailed(BatchId), + WrongBatchState(String), + WrongChainState(String), } +#[derive(Debug)] +pub struct KeepChain; + /// A chain identifier pub type ChainId = u64; pub type BatchId = Epoch; @@ -158,24 +166,21 @@ impl SyncingChain { // fail the batches for id in batch_ids { if let Some(batch) = self.batches.get_mut(&id) { - if let BatchState::Failed = batch.download_failed(&self.log) { - return ProcessingResult::RemoveChain; - } - if let ProcessingResult::RemoveChain = self.retry_batch_download(network, id) { - // drop the chain early - return ProcessingResult::RemoveChain; + if batch.download_failed()? { + return Err(RemoveChain::ChainFailed(id)); } + self.retry_batch_download(network, id)?; } else { debug!(self.log, "Batch not found while removing peer"; - "peer" => %peer_id, "batch" => "id") + "peer" => %peer_id, "batch" => id) } } } if self.peers.is_empty() { - ProcessingResult::RemoveChain + Err(RemoveChain::EmptyPeerPool) } else { - ProcessingResult::KeepChain + Ok(KeepChain) } } @@ -202,7 +207,7 @@ impl SyncingChain { None => { debug!(self.log, "Received a block for unknown batch"; "epoch" => batch_id); // A batch might get removed when the chain advances, so this is non fatal. - return ProcessingResult::KeepChain; + return Ok(KeepChain); } Some(batch) => { // A batch could be retried without the peer failing the request (disconnecting/ @@ -210,7 +215,7 @@ impl SyncingChain { // reasons. Check that this block belongs to the expected peer, and that the // request_id matches if !batch.is_expecting_block(peer_id, &request_id) { - return ProcessingResult::KeepChain; + return Ok(KeepChain); } batch } @@ -218,17 +223,16 @@ impl SyncingChain { if let Some(block) = beacon_block { // This is not a stream termination, simply add the block to the request - batch.add_block(block, &self.log); - ProcessingResult::KeepChain + batch.add_block(block)?; + Ok(KeepChain) } else { // A stream termination has been sent. This batch has ended. Process a completed batch. // Remove the request from the peer's active batches self.peers .get_mut(peer_id) - .unwrap_or_else(|| panic!("Batch is registered for the peer")) - .remove(&batch_id); + .map(|active_requests| active_requests.remove(&batch_id)); - match batch.download_completed(&self.log) { + match batch.download_completed() { Ok(received) => { let awaiting_batches = batch_id.saturating_sub( self.optimistic_start @@ -237,14 +241,16 @@ impl SyncingChain { debug!(self.log, "Completed batch received"; "epoch" => batch_id, "blocks" => received, "awaiting_batches" => awaiting_batches); // pre-emptively request more blocks from peers whilst we process current blocks, - if let ProcessingResult::RemoveChain = self.request_batches(network) { - return ProcessingResult::RemoveChain; - } + self.request_batches(network)?; self.process_completed_batches(network) } - Err(state) => { - if let BatchState::Failed = state { - return ProcessingResult::RemoveChain; + Err(result) => { + let (expected_boundary, received_boundary, is_failed) = result?; + warn!(self.log, "Batch received out of range blocks"; "expected_boundary" => expected_boundary, "received_boundary" => received_boundary, + "peer_id" => %peer_id, batch); + + if is_failed { + return Err(RemoveChain::ChainFailed(batch_id)); } // this batch can't be used, so we need to request it again. self.retry_batch_download(network, batch_id) @@ -262,14 +268,16 @@ impl SyncingChain { ) -> ProcessingResult { // Only process batches if this chain is Syncing, and only one at a time if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() { - return ProcessingResult::KeepChain; + return Ok(KeepChain); } let batch = match self.batches.get_mut(&batch_id) { Some(batch) => batch, None => { - debug!(self.log, "Processing unknown batch"; "batch" => %batch_id); - return ProcessingResult::RemoveChain; + return Err(RemoveChain::WrongChainState(format!( + "Trying to process a batch that does not exist: {}", + batch_id + ))); } }; @@ -277,7 +285,7 @@ impl SyncingChain { // result callback. This is done, because an empty batch could end a chain and the logic // for removing chains and checking completion is in the callback. - let blocks = batch.start_processing(&self.log); + let blocks = batch.start_processing()?; let process_id = ProcessId::RangeBatchId(self.id, batch_id); self.current_processing_batch = Some(batch_id); @@ -293,7 +301,7 @@ impl SyncingChain { // re-downloaded. self.on_batch_process_result(network, batch_id, &BatchProcessResult::Failed(false)) } else { - ProcessingResult::KeepChain + Ok(KeepChain) } } @@ -304,101 +312,96 @@ impl SyncingChain { ) -> ProcessingResult { // Only process batches if this chain is Syncing and only process one batch at a time if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() { - return ProcessingResult::KeepChain; + return Ok(KeepChain); } // Find the id of the batch we are going to process. // // First try our optimistic start, if any. If this batch is ready, we process it. If the // batch has not already been completed, check the current chain target. - let optimistic_id = if let Some(epoch) = self.optimistic_start { + if let Some(epoch) = self.optimistic_start { if let Some(batch) = self.batches.get(&epoch) { let state = batch.state(); match state { BatchState::AwaitingProcessing(..) => { // this batch is ready debug!(self.log, "Processing optimistic start"; "epoch" => epoch); - Some(epoch) + return self.process_batch(network, epoch); } BatchState::Downloading(..) => { // The optimistic batch is being downloaded. We wait for this before // attempting to process other batches. - return ProcessingResult::KeepChain; + return Ok(KeepChain); } + BatchState::Poisoned => unreachable!("Poisoned batch"), BatchState::Processing(_) | BatchState::AwaitingDownload - | BatchState::Failed - | BatchState::Poisoned => { + | BatchState::Failed => { // these are all inconsistent states: // - Processing -> `self.current_processing_batch` is None // - Failed -> non recoverable batch. For an optimistic batch, it should // have been removed - // - Poisoned -> this is an intermediate state that should never be reached // - AwaitingDownload -> A recoverable failed batch should have been // re-requested. - crit!(self.log, "Optimistic batch indicates inconsistent chain state"; "state" => ?state); - return ProcessingResult::RemoveChain; + return Err(RemoveChain::WrongChainState(format!( + "Optimistic batch indicates inconsistent chain state: {:?}", + state + ))); } BatchState::AwaitingValidation(_) => { - // This is possible due to race conditions, and tho it would be considered - // an inconsistent state, the chain can continue. If an optimistic batch - // is successfully processed it is no longer considered an optimistic - // candidate. If the batch was empty the chain rejects it; if it was non - // empty the chain is advanced to this point (so that the old optimistic - // batch is now the processing target) - debug!(self.log, "Optimistic batch should never be Awaiting Validation"; "batch" => epoch); - None + // If an optimistic start is given to the chain after the corresponding + // batch has been requested and processed we can land here. We drop the + // optimistic candidate since we can't conclude whether the batch included + // blocks or not at this point + debug!(self.log, "Dropping optimistic candidate"; "batch" => epoch); + self.optimistic_start = None; } } - } else { - None } - } else { - None - }; + } // if the optimistic target can't be processed, check the processing target - let id = optimistic_id.or_else(|| { - if let Some(batch) = self.batches.get(&self.processing_target) { - let state = batch.state(); - match state { - BatchState::AwaitingProcessing(..) => Some(self.processing_target), - BatchState::Downloading(..) => { - // Batch is not ready, nothing to process - None - } - BatchState::Failed - | BatchState::AwaitingDownload - | BatchState::AwaitingValidation(_) - | BatchState::Processing(_) - | BatchState::Poisoned => { - // these are all inconsistent states: - // - Failed -> non recoverable batch. Chain should have beee removed - // - AwaitingDownload -> A recoverable failed batch should have been - // re-requested. - // - AwaitingValidation -> self.processing_target should have been moved - // forward - // - Processing -> `self.current_processing_batch` is None - // - Poisoned -> Intermediate state that should never be reached - unreachable!( - "Robust target batch indicates inconsistent chain state: {:?}", - state - ) + if let Some(batch) = self.batches.get(&self.processing_target) { + let state = batch.state(); + match state { + BatchState::AwaitingProcessing(..) => { + return self.process_batch(network, self.processing_target); + } + BatchState::Downloading(..) => { + // Batch is not ready, nothing to process + } + BatchState::Poisoned => unreachable!("Poisoned batch"), + BatchState::Failed | BatchState::AwaitingDownload | BatchState::Processing(_) => { + // these are all inconsistent states: + // - Failed -> non recoverable batch. Chain should have beee removed + // - AwaitingDownload -> A recoverable failed batch should have been + // re-requested. + // - Processing -> `self.current_processing_batch` is None + return Err(RemoveChain::WrongChainState(format!( + "Robust target batch indicates inconsistent chain state: {:?}", + state + ))); + } + BatchState::AwaitingValidation(_) => { + // we can land here if an empty optimistic batch succeeds processing and is + // inside the download buffer (between `self.processing_target` and + // `self.to_be_downloaded`). In this case, eventually the chain advances to the + // batch (`self.processing_target` reaches this point). + debug!(self.log, "Chain encountered a robust batch awaiting validation"; "batch" => self.processing_target); + + self.processing_target += EPOCHS_PER_BATCH; + if self.to_be_downloaded <= self.processing_target { + self.to_be_downloaded = self.processing_target + EPOCHS_PER_BATCH; } } - } else { - crit!(self.log, "Batch not found for current processing target"; - "epoch" => self.processing_target); - None } - }); - - // we found a batch to process - if let Some(id) = id { - self.process_batch(network, id) } else { - ProcessingResult::KeepChain + return Err(RemoveChain::WrongChainState(format!( + "Batch not found for current processing target {}", + self.processing_target + ))); } + Ok(KeepChain) } /// The block processor has completed processing a batch. This function handles the result @@ -415,12 +418,12 @@ impl SyncingChain { Some(processing_id) if *processing_id != batch_id => { debug!(self.log, "Unexpected batch result"; "batch_epoch" => batch_id, "expected_batch_epoch" => processing_id); - return ProcessingResult::KeepChain; + return Ok(KeepChain); } None => { debug!(self.log, "Chain was not expecting a batch result"; "batch_epoch" => batch_id); - return ProcessingResult::KeepChain; + return Ok(KeepChain); } _ => { // batch_id matches, continue @@ -430,14 +433,14 @@ impl SyncingChain { match result { BatchProcessResult::Success(was_non_empty) => { - let batch = match self.batches.get_mut(&batch_id) { - Some(batch) => batch, - None => { - debug!(self.log, "Current processing batch not found"; "batch" => batch_id); - return ProcessingResult::RemoveChain; - } - }; - let _ = batch.processing_completed(true, &self.log); + let batch = self.batches.get_mut(&batch_id).ok_or_else(|| { + RemoveChain::WrongChainState(format!( + "Current processing batch not found: {}", + batch_id + )) + })?; + + batch.processing_completed(true)?; // If the processed batch was not empty, we can validate previous unvalidated // blocks. if *was_non_empty { @@ -448,13 +451,11 @@ impl SyncingChain { // check if this batch corresponds to an optimistic batch. In this case, we // reject it as an optimistic candidate since the batch was empty if epoch == batch_id { - if let ProcessingResult::RemoveChain = self.reject_optimistic_batch( + self.reject_optimistic_batch( network, false, /* do not re-request */ "batch was empty", - ) { - return ProcessingResult::RemoveChain; - }; + )?; } } @@ -463,35 +464,31 @@ impl SyncingChain { // check if the chain has completed syncing if self.current_processed_slot() >= self.target_head_slot { // chain is completed - debug!(self.log, "Chain is complete"); - ProcessingResult::RemoveChain + Err(RemoveChain::ChainCompleted) } else { // chain is not completed // attempt to request more batches - if let ProcessingResult::RemoveChain = self.request_batches(network) { - return ProcessingResult::RemoveChain; - } + self.request_batches(network)?; // attempt to process more batches self.process_completed_batches(network) } } BatchProcessResult::Failed(imported_blocks) => { - let (batch, peer) = match self.batches.get_mut(&batch_id) { - Some(batch) => match batch.current_peer().cloned() { - Some(peer) => (batch, peer), - None => { - debug!(self.log, "Current processing has no peer"; "batch" => batch_id); - return ProcessingResult::RemoveChain; - } - }, - None => { - debug!(self.log, "Current processing batch not found"; "batch" => batch_id); - return ProcessingResult::RemoveChain; - } - }; + let batch = self.batches.get_mut(&batch_id).ok_or_else(|| { + RemoveChain::WrongChainState(format!( + "Batch not found for current processing target {}", + batch_id + )) + })?; + let peer = batch.current_peer().cloned().ok_or_else(|| { + RemoveChain::WrongBatchState(format!( + "Processing target is in wrong state: {:?}", + batch.state(), + )) + })?; debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks, "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer)); - if let BatchState::Failed = batch.processing_completed(false, &self.log) { + if batch.processing_completed(false)? { // check that we have not exceeded the re-process retry counter // If a batch has exceeded the invalid batch lookup attempts limit, it means // that it is likely all peers in this chain are are sending invalid batches @@ -506,7 +503,7 @@ impl SyncingChain { for (peer, _) in self.peers.drain() { network.report_peer(peer, action); } - ProcessingResult::RemoveChain + Err(RemoveChain::ChainFailed(batch_id)) } else { // chain can continue. Check if it can be moved forward if *imported_blocks { @@ -545,7 +542,7 @@ impl SyncingChain { } } - ProcessingResult::KeepChain + Ok(KeepChain) } /// Removes any batches previous to the given `validating_epoch` and updates the current @@ -619,13 +616,12 @@ impl SyncingChain { ), BatchState::AwaitingProcessing(..) => {} BatchState::Processing(_) => { - debug_assert_eq!( - id, - self.current_processing_batch.expect( - "A batch in a processing state means the chain is processing it" - ) - ); - self.current_processing_batch = None; + debug!(self.log, "Advancing chain while processing a batch"; "batch" => id, batch); + if let Some(processing_id) = self.current_processing_batch { + if id <= processing_id { + self.current_processing_batch = None; + } + } } } } @@ -673,15 +669,9 @@ impl SyncingChain { // If this batch is an optimistic batch, we reject this epoch as an optimistic // candidate and try to re download it if epoch == batch_id { - if let ProcessingResult::RemoveChain = - self.reject_optimistic_batch(network, true, "batch was invalid") - { - return ProcessingResult::RemoveChain; - } else { - // since this is the optimistic batch, we can't consider previous batches as - // invalid. - return ProcessingResult::KeepChain; - } + return self.reject_optimistic_batch(network, true, "batch was invalid"); + // since this is the optimistic batch, we can't consider previous batches as + // invalid. } } // this is our robust `processing_target`. All previous batches must be awaiting @@ -689,9 +679,9 @@ impl SyncingChain { let mut redownload_queue = Vec::new(); for (id, batch) in self.batches.range_mut(..batch_id) { - if let BatchState::Failed = batch.validation_failed(&self.log) { + if batch.validation_failed()? { // remove the chain early - return ProcessingResult::RemoveChain; + return Err(RemoveChain::ChainFailed(batch_id)); } redownload_queue.push(*id); } @@ -701,9 +691,7 @@ impl SyncingChain { self.processing_target = self.start_epoch; for id in redownload_queue { - if let ProcessingResult::RemoveChain = self.retry_batch_download(network, id) { - return ProcessingResult::RemoveChain; - } + self.retry_batch_download(network, id)?; } // finally, re-request the failed batch. self.retry_batch_download(network, batch_id) @@ -746,9 +734,7 @@ impl SyncingChain { self.state = ChainSyncingState::Syncing; // begin requesting blocks from the peer pool, until all peers are exhausted. - if let ProcessingResult::RemoveChain = self.request_batches(network) { - return ProcessingResult::RemoveChain; - } + self.request_batches(network)?; // start processing batches if needed self.process_completed_batches(network) @@ -770,7 +756,7 @@ impl SyncingChain { // Either new or not, this peer is idle, try to request more batches self.request_batches(network) } else { - ProcessingResult::KeepChain + Ok(KeepChain) } } @@ -789,19 +775,19 @@ impl SyncingChain { // sending an error /timeout) if the peer is removed from the chain for other // reasons. Check that this block belongs to the expected peer if !batch.is_expecting_block(peer_id, &request_id) { - return ProcessingResult::KeepChain; + return Ok(KeepChain); } debug!(self.log, "Batch failed. RPC Error"; "batch_epoch" => batch_id); if let Some(active_requests) = self.peers.get_mut(peer_id) { active_requests.remove(&batch_id); } - if let BatchState::Failed = batch.download_failed(&self.log) { - return ProcessingResult::RemoveChain; + if batch.download_failed()? { + return Err(RemoveChain::ChainFailed(batch_id)); } self.retry_batch_download(network, batch_id) } else { // this could be an error for an old batch, removed when the chain advances - ProcessingResult::KeepChain + Ok(KeepChain) } } @@ -813,7 +799,7 @@ impl SyncingChain { ) -> ProcessingResult { let batch = match self.batches.get_mut(&batch_id) { Some(batch) => batch, - None => return ProcessingResult::KeepChain, + None => return Ok(KeepChain), }; // Find a peer to request the batch @@ -834,7 +820,7 @@ impl SyncingChain { self.send_batch(network, batch_id, peer) } else { // If we are here the chain has no more peers - ProcessingResult::RemoveChain + Err(RemoveChain::EmptyPeerPool) } } @@ -850,7 +836,7 @@ impl SyncingChain { match network.blocks_by_range_request(peer.clone(), request, self.id, batch_id) { Ok(request_id) => { // inform the batch about the new request - batch.start_downloading_from_peer(peer.clone(), request_id, &self.log); + batch.start_downloading_from_peer(peer.clone(), request_id)?; if self .optimistic_start .map(|epoch| epoch == batch_id) @@ -866,21 +852,26 @@ impl SyncingChain { .get_mut(&peer) .map(|requests| { requests.insert(batch_id); - ProcessingResult::KeepChain + Ok(KeepChain) }) - .unwrap_or(ProcessingResult::RemoveChain); + .unwrap_or_else(|| { + Err(RemoveChain::WrongChainState(format!( + "Sending batch to a peer that is not in the chain: {}", + peer + ))) + }); } Err(e) => { // NOTE: under normal conditions this shouldn't happen but we handle it anyway warn!(self.log, "Could not send batch request"; "batch_id" => batch_id, "error" => e, &batch); // register the failed download and check if the batch can be retried - batch.start_downloading_from_peer(peer.clone(), 1, &self.log); // fake request_id is not relevant + batch.start_downloading_from_peer(peer.clone(), 1)?; // fake request_id is not relevant self.peers .get_mut(&peer) .map(|request| request.remove(&batch_id)); - if let BatchState::Failed = batch.download_failed(&self.log) { - return ProcessingResult::RemoveChain; + if batch.download_failed()? { + return Err(RemoveChain::ChainFailed(batch_id)); } else { return self.retry_batch_download(network, batch_id); } @@ -888,7 +879,7 @@ impl SyncingChain { } } - ProcessingResult::KeepChain + Ok(KeepChain) } /// Returns true if this chain is currently syncing. @@ -906,7 +897,7 @@ impl SyncingChain { network: &mut SyncNetworkContext, ) -> ProcessingResult { if !matches!(self.state, ChainSyncingState::Syncing) { - return ProcessingResult::KeepChain; + return Ok(KeepChain); } // find the next pending batch and request it from the peer @@ -933,27 +924,23 @@ impl SyncingChain { if let Some(peer) = idle_peers.pop() { let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH); self.batches.insert(epoch, optimistic_batch); - if let ProcessingResult::RemoveChain = self.send_batch(network, epoch, peer) { - return ProcessingResult::RemoveChain; - } + self.send_batch(network, epoch, peer)?; } } - return ProcessingResult::KeepChain; + return Ok(KeepChain); } while let Some(peer) = idle_peers.pop() { if let Some(batch_id) = self.include_next_batch() { // send the batch - if let ProcessingResult::RemoveChain = self.send_batch(network, batch_id, peer) { - return ProcessingResult::RemoveChain; - } + self.send_batch(network, batch_id, peer)?; } else { // No more batches, simply stop - return ProcessingResult::KeepChain; + return Ok(KeepChain); } } - ProcessingResult::KeepChain + Ok(KeepChain) } /// Creates the next required batch from the chain. If there are no more batches required, @@ -1040,3 +1027,23 @@ impl slog::KV for SyncingChain { slog::Result::Ok(()) } } + +use super::batch::WrongState as WrongBatchState; +impl From for RemoveChain { + fn from(err: WrongBatchState) -> Self { + RemoveChain::WrongBatchState(err.0) + } +} + +impl std::fmt::Debug for RemoveChain { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // needed to avoid Debuggins Strings + match self { + RemoveChain::ChainCompleted => f.write_str("ChainCompleted"), + RemoveChain::EmptyPeerPool => f.write_str("EmptyPeerPool"), + RemoveChain::ChainFailed(batch) => write!(f, "ChainFailed(batch: {} )", batch), + RemoveChain::WrongBatchState(reason) => write!(f, "WrongBatchState: {}", reason), + RemoveChain::WrongChainState(reason) => write!(f, "WrongChainState: {}", reason), + } + } +} diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 4af972ceb..c458cc847 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -3,7 +3,7 @@ //! Each chain type is stored in it's own map. A variety of helper functions are given along with //! this struct to simplify the logic of the other layers of sync. -use super::chain::{ChainId, ProcessingResult, SyncingChain}; +use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain}; use super::sync_type::RangeSyncType; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::sync::network_context::SyncNetworkContext; @@ -108,33 +108,33 @@ impl ChainCollection { /// Calls `func` on every chain of the collection. If the result is /// `ProcessingResult::RemoveChain`, the chain is removed and returned. /// NOTE: `func` must not change the syncing state of a chain. - pub fn call_all(&mut self, mut func: F) -> Vec<(SyncingChain, RangeSyncType)> + pub fn call_all(&mut self, mut func: F) -> Vec<(SyncingChain, RangeSyncType, RemoveChain)> where F: FnMut(&mut SyncingChain) -> ProcessingResult, { let mut to_remove = Vec::new(); for (id, chain) in self.finalized_chains.iter_mut() { - if let ProcessingResult::RemoveChain = func(chain) { - to_remove.push((*id, RangeSyncType::Finalized)); + if let Err(remove_reason) = func(chain) { + to_remove.push((*id, RangeSyncType::Finalized, remove_reason)); } } for (id, chain) in self.head_chains.iter_mut() { - if let ProcessingResult::RemoveChain = func(chain) { - to_remove.push((*id, RangeSyncType::Head)); + if let Err(remove_reason) = func(chain) { + to_remove.push((*id, RangeSyncType::Head, remove_reason)); } } let mut results = Vec::with_capacity(to_remove.len()); - for (id, sync_type) in to_remove.into_iter() { + for (id, sync_type, reason) in to_remove.into_iter() { let chain = match sync_type { RangeSyncType::Finalized => self.finalized_chains.remove(&id), RangeSyncType::Head => self.head_chains.remove(&id), }; let chain = chain.expect("Chain exists"); self.on_chain_removed(&id, chain.is_syncing()); - results.push((chain, sync_type)); + results.push((chain, sync_type, reason)); } results } @@ -144,29 +144,30 @@ impl ChainCollection { /// If the function returns `ProcessingResult::RemoveChain`, the chain is removed and returned. /// If the chain is found, its syncing type is returned, or an error otherwise. /// NOTE: `func` should not change the sync state of a chain. + #[allow(clippy::type_complexity)] pub fn call_by_id( &mut self, id: ChainId, func: F, - ) -> Result<(Option>, RangeSyncType), ()> + ) -> Result<(Option<(SyncingChain, RemoveChain)>, RangeSyncType), ()> where F: FnOnce(&mut SyncingChain) -> ProcessingResult, { if let Entry::Occupied(mut entry) = self.finalized_chains.entry(id) { // Search in our finalized chains first - if let ProcessingResult::RemoveChain = func(entry.get_mut()) { + if let Err(remove_reason) = func(entry.get_mut()) { let chain = entry.remove(); self.on_chain_removed(&id, chain.is_syncing()); - Ok((Some(chain), RangeSyncType::Finalized)) + Ok((Some((chain, remove_reason)), RangeSyncType::Finalized)) } else { Ok((None, RangeSyncType::Finalized)) } } else if let Entry::Occupied(mut entry) = self.head_chains.entry(id) { // Search in our head chains next - if let ProcessingResult::RemoveChain = func(entry.get_mut()) { + if let Err(remove_reason) = func(entry.get_mut()) { let chain = entry.remove(); self.on_chain_removed(&id, chain.is_syncing()); - Ok((Some(chain), RangeSyncType::Head)) + Ok((Some((chain, remove_reason)), RangeSyncType::Head)) } else { Ok((None, RangeSyncType::Head)) } @@ -308,11 +309,10 @@ impl ChainCollection { // update the state to a new finalized state self.state = RangeSyncState::Finalized(new_id); - if let ProcessingResult::RemoveChain = - chain.start_syncing(network, local_epoch, local_head_epoch) + if let Err(remove_reason) = chain.start_syncing(network, local_epoch, local_head_epoch) { // this happens only if sending a batch over the `network` fails a lot - error!(self.log, "Chain removed while switching chains"); + error!(self.log, "Chain removed while switching chains"; "chain" => new_id, "reason" => ?remove_reason); self.finalized_chains.remove(&new_id); self.on_chain_removed(&new_id, true); } @@ -364,11 +364,11 @@ impl ChainCollection { if !chain.is_syncing() { debug!(self.log, "New head chain started syncing"; &chain); } - if let ProcessingResult::RemoveChain = + if let Err(remove_reason) = chain.start_syncing(network, local_epoch, local_head_epoch) { self.head_chains.remove(&id); - error!(self.log, "Chain removed while switching head chains"; "id" => id); + error!(self.log, "Chain removed while switching head chains"; "chain" => id, "reason" => ?remove_reason); } else { syncing_chains.push(id); } @@ -481,8 +481,8 @@ impl ChainCollection { debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, &chain); debug_assert_eq!(chain.target_head_root, target_head_root); debug_assert_eq!(chain.target_head_slot, target_head_slot); - if let ProcessingResult::RemoveChain = chain.add_peer(network, peer) { - debug!(self.log, "Chain removed after adding peer"; "chain" => id); + if let Err(remove_reason) = chain.add_peer(network, peer) { + debug!(self.log, "Chain removed after adding peer"; "chain" => id, "reason" => ?remove_reason); let chain = entry.remove(); self.on_chain_removed(&id, chain.is_syncing()); } diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 824a38daa..d816cfa84 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -275,11 +275,11 @@ impl RangeSync { /// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum /// retries. In this case, we need to remove the chain. fn remove_peer(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) { - for (removed_chain, sync_type) in self + for (removed_chain, sync_type, remove_reason) in self .chains .call_all(|chain| chain.remove_peer(peer_id, network)) { - debug!(self.log, "Chain removed after removing peer"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id()); + debug!(self.log, "Chain removed after removing peer"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id(), "reason" => ?remove_reason); // update the state of the collection } self.chains.update( @@ -306,8 +306,8 @@ impl RangeSync { chain.inject_error(network, batch_id, &peer_id, request_id) }) { Ok((removed_chain, sync_type)) => { - if let Some(removed_chain) = removed_chain { - debug!(self.log, "Chain removed on rpc error"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id()); + if let Some((removed_chain, remove_reason)) = removed_chain { + debug!(self.log, "Chain removed on rpc error"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id(), "reason" => ?remove_reason); // update the state of the collection self.chains.update( network,