From f4ffa9e0b4acbe3cc3b50f9eeeb6b3d87e58a1a5 Mon Sep 17 00:00:00 2001 From: Divma Date: Fri, 12 Aug 2022 00:56:38 +0000 Subject: [PATCH] Handle processing results of non faulty batches (#3439) ## Issue Addressed Solves #3390 So after checking some logs @pawanjay176 got, we conclude that this happened because we blacklisted a chain after trying it "too much". Now here, in all occurrences it seems that "too much" means we got too many download failures. This happened very slowly, exactly because the batch is allowed to stay alive for very long times after not counting penalties when the ee is offline. The error here then was not that the batch failed because of offline ee errors, but that we blacklisted a chain because of download errors, which we can't pin on the chain but on the peer. This PR fixes that. ## Proposed Changes Adds a missing piece of logic so that if a chain fails for errors that can't be attributed to an objectively bad behavior from the peer, it is not blacklisted. The issue at hand occurred when new peers arrived claiming a head that had wrongfully blacklisted, even if the original peers participating in the chain were not penalized. Another notable change is that we need to consider a batch invalid if it processed correctly but its next non empty batch fails processing. Now since a batch can fail processing in non empty ways, there is no need to mark as invalid previous batches. Improves some logging as well. ## Additional Info We should do this regardless of pausing sync on ee offline/unsynced state. This is because I think it's almost impossible to ensure a processing result will reach in a predictable order with a synced notification from the ee. Doing this handles what I think are inevitable data races when we actually pause sync This also fixes a return that reports which batch failed and caused us some confusion checking the logs --- .../network/src/beacon_processor/mod.rs | 4 +- .../src/beacon_processor/worker/mod.rs | 2 +- .../beacon_processor/worker/sync_methods.rs | 65 +++--- .../network/src/sync/backfill_sync/mod.rs | 131 ++++++------ .../network/src/sync/block_lookups/mod.rs | 31 +-- .../network/src/sync/block_lookups/tests.rs | 20 +- beacon_node/network/src/sync/manager.rs | 16 +- beacon_node/network/src/sync/mod.rs | 2 +- .../network/src/sync/range_sync/batch.rs | 101 +++++---- .../network/src/sync/range_sync/chain.rs | 194 ++++++++++-------- .../network/src/sync/range_sync/mod.rs | 2 +- .../network/src/sync/range_sync/range.rs | 4 +- 12 files changed, 298 insertions(+), 274 deletions(-) diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index a08f34f70..e9a115904 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -76,9 +76,7 @@ mod work_reprocessing_queue; mod worker; use crate::beacon_processor::work_reprocessing_queue::QueuedGossipBlock; -pub use worker::{ - ChainSegmentProcessId, FailureMode, GossipAggregatePackage, GossipAttestationPackage, -}; +pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage}; /// The maximum size of the channel for work events to the `BeaconProcessor`. /// diff --git a/beacon_node/network/src/beacon_processor/worker/mod.rs b/beacon_node/network/src/beacon_processor/worker/mod.rs index 04147245e..f907c49b7 100644 --- a/beacon_node/network/src/beacon_processor/worker/mod.rs +++ b/beacon_node/network/src/beacon_processor/worker/mod.rs @@ -10,7 +10,7 @@ mod rpc_methods; mod sync_methods; pub use gossip_methods::{GossipAggregatePackage, GossipAttestationPackage}; -pub use sync_methods::{ChainSegmentProcessId, FailureMode}; +pub use sync_methods::ChainSegmentProcessId; pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 3b2429ee9..760896e0e 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -34,15 +34,6 @@ struct ChainSegmentFailed { message: String, /// Used to penalize peers. peer_action: Option, - /// Failure mode - mode: FailureMode, -} - -/// Represents if a block processing failure was on the consensus or execution side. -#[derive(Debug)] -pub enum FailureMode { - ExecutionLayer { pause_sync: bool }, - ConsensusLayer, } impl Worker { @@ -150,7 +141,9 @@ impl Worker { "last_block_slot" => end_slot, "processed_blocks" => sent_blocks, "service"=> "sync"); - BatchProcessResult::Success(sent_blocks > 0) + BatchProcessResult::Success { + was_non_empty: sent_blocks > 0, + } } (imported_blocks, Err(e)) => { debug!(self.log, "Batch processing failed"; @@ -161,11 +154,12 @@ impl Worker { "imported_blocks" => imported_blocks, "error" => %e.message, "service" => "sync"); - - BatchProcessResult::Failed { - imported_blocks: imported_blocks > 0, - peer_action: e.peer_action, - mode: e.mode, + match e.peer_action { + Some(penalty) => BatchProcessResult::FaultyFailure { + imported_blocks: imported_blocks > 0, + penalty, + }, + None => BatchProcessResult::NonFaultyFailure, } } } @@ -184,7 +178,9 @@ impl Worker { "last_block_slot" => end_slot, "processed_blocks" => sent_blocks, "service"=> "sync"); - BatchProcessResult::Success(sent_blocks > 0) + BatchProcessResult::Success { + was_non_empty: sent_blocks > 0, + } } (_, Err(e)) => { debug!(self.log, "Backfill batch processing failed"; @@ -193,10 +189,12 @@ impl Worker { "last_block_slot" => end_slot, "error" => %e.message, "service" => "sync"); - BatchProcessResult::Failed { - imported_blocks: false, - peer_action: e.peer_action, - mode: e.mode, + match e.peer_action { + Some(penalty) => BatchProcessResult::FaultyFailure { + imported_blocks: false, + penalty, + }, + None => BatchProcessResult::NonFaultyFailure, } } } @@ -216,15 +214,19 @@ impl Worker { { (imported_blocks, Err(e)) => { debug!(self.log, "Parent lookup failed"; "error" => %e.message); - BatchProcessResult::Failed { - imported_blocks: imported_blocks > 0, - peer_action: e.peer_action, - mode: e.mode, + match e.peer_action { + Some(penalty) => BatchProcessResult::FaultyFailure { + imported_blocks: imported_blocks > 0, + penalty, + }, + None => BatchProcessResult::NonFaultyFailure, } } (imported_blocks, Ok(_)) => { debug!(self.log, "Parent lookup processed successfully"); - BatchProcessResult::Success(imported_blocks > 0) + BatchProcessResult::Success { + was_non_empty: imported_blocks > 0, + } } } } @@ -307,7 +309,6 @@ impl Worker { message: String::from("mismatched_block_root"), // The peer is faulty if they send blocks with bad roots. peer_action: Some(PeerAction::LowToleranceError), - mode: FailureMode::ConsensusLayer, } } HistoricalBlockError::InvalidSignature @@ -322,7 +323,6 @@ impl Worker { message: "invalid_signature".into(), // The peer is faulty if they bad signatures. peer_action: Some(PeerAction::LowToleranceError), - mode: FailureMode::ConsensusLayer, } } HistoricalBlockError::ValidatorPubkeyCacheTimeout => { @@ -336,7 +336,6 @@ impl Worker { message: "pubkey_cache_timeout".into(), // This is an internal error, do not penalize the peer. peer_action: None, - mode: FailureMode::ConsensusLayer, } } HistoricalBlockError::NoAnchorInfo => { @@ -347,7 +346,6 @@ impl Worker { // There is no need to do a historical sync, this is not a fault of // the peer. peer_action: None, - mode: FailureMode::ConsensusLayer, } } HistoricalBlockError::IndexOutOfBounds => { @@ -360,7 +358,6 @@ impl Worker { message: String::from("logic_error"), // This should never occur, don't penalize the peer. peer_action: None, - mode: FailureMode::ConsensusLayer, } } HistoricalBlockError::BlockOutOfRange { .. } => { @@ -373,7 +370,6 @@ impl Worker { message: String::from("unexpected_error"), // This should never occur, don't penalize the peer. peer_action: None, - mode: FailureMode::ConsensusLayer, } } }, @@ -383,7 +379,6 @@ impl Worker { message: format!("{:?}", other), // This is an internal error, don't penalize the peer. peer_action: None, - mode: FailureMode::ConsensusLayer, } } }; @@ -404,7 +399,6 @@ impl Worker { message: format!("Block has an unknown parent: {}", block.parent_root()), // Peers are faulty if they send non-sequential blocks. peer_action: Some(PeerAction::LowToleranceError), - mode: FailureMode::ConsensusLayer, }) } BlockError::BlockIsAlreadyKnown => { @@ -442,7 +436,6 @@ impl Worker { ), // Peers are faulty if they send blocks from the future. peer_action: Some(PeerAction::LowToleranceError), - mode: FailureMode::ConsensusLayer, }) } BlockError::WouldRevertFinalizedSlot { .. } => { @@ -464,7 +457,6 @@ impl Worker { message: format!("Internal error whilst processing block: {:?}", e), // Do not penalize peers for internal errors. peer_action: None, - mode: FailureMode::ConsensusLayer, }) } ref err @ BlockError::ExecutionPayloadError(ref epe) => { @@ -480,7 +472,6 @@ impl Worker { message: format!("Execution layer offline. Reason: {:?}", err), // Do not penalize peers for internal errors. peer_action: None, - mode: FailureMode::ExecutionLayer { pause_sync: true }, }) } else { debug!(self.log, @@ -493,7 +484,6 @@ impl Worker { err ), peer_action: Some(PeerAction::LowToleranceError), - mode: FailureMode::ExecutionLayer { pause_sync: false }, }) } } @@ -508,7 +498,6 @@ impl Worker { message: format!("Peer sent invalid block. Reason: {:?}", other), // Do not penalize peers for internal errors. peer_action: None, - mode: FailureMode::ConsensusLayer, }) } } diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 7ff640065..6767350ce 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -8,10 +8,12 @@ //! If a batch fails, the backfill sync cannot progress. In this scenario, we mark the backfill //! sync as failed, log an error and attempt to retry once a new peer joins the node. -use crate::beacon_processor::{ChainSegmentProcessId, FailureMode, WorkEvent as BeaconWorkEvent}; +use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::sync::manager::{BatchProcessResult, Id}; use crate::sync::network_context::SyncNetworkContext; -use crate::sync::range_sync::{BatchConfig, BatchId, BatchInfo, BatchProcessingResult, BatchState}; +use crate::sync::range_sync::{ + BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, +}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::types::{BackFillState, NetworkGlobals}; use lighthouse_network::{PeerAction, PeerId}; @@ -324,10 +326,10 @@ impl BackFillSync { for id in batch_ids { if let Some(batch) = self.batches.get_mut(&id) { match batch.download_failed(false) { - Ok(true) => { + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { self.fail_sync(BackFillError::BatchDownloadFailed(id))?; } - Ok(false) => {} + Ok(BatchOperationOutcome::Continue) => {} Err(e) => { self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?; } @@ -371,8 +373,10 @@ impl BackFillSync { } match batch.download_failed(true) { Err(e) => self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)), - Ok(true) => self.fail_sync(BackFillError::BatchDownloadFailed(batch_id)), - Ok(false) => self.retry_batch_download(network, batch_id), + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + self.fail_sync(BackFillError::BatchDownloadFailed(batch_id)) + } + Ok(BatchOperationOutcome::Continue) => self.retry_batch_download(network, batch_id), } } else { // this could be an error for an old batch, removed when the chain advances @@ -439,7 +443,7 @@ impl BackFillSync { self.process_completed_batches(network) } Err(result) => { - let (expected_boundary, received_boundary, is_failed) = match result { + let (expected_boundary, received_boundary, outcome) = match result { Err(e) => { return self .fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)) @@ -450,7 +454,7 @@ impl BackFillSync { warn!(self.log, "Batch received out of range blocks"; "expected_boundary" => expected_boundary, "received_boundary" => received_boundary, "peer_id" => %peer_id, batch); - if is_failed { + if let BatchOperationOutcome::Failed { blacklist: _ } = outcome { error!(self.log, "Backfill failed"; "epoch" => batch_id, "received_boundary" => received_boundary, "expected_boundary" => expected_boundary); return self .fail_sync(BackFillError::BatchDownloadFailed(batch_id)) @@ -547,16 +551,7 @@ impl BackFillSync { // blocks to continue, and the chain is expecting a processing result that won't // arrive. To mitigate this, (fake) fail this processing so that the batch is // re-downloaded. - self.on_batch_process_result( - network, - batch_id, - &BatchProcessResult::Failed { - imported_blocks: false, - // The beacon processor queue is full, no need to penalize the peer. - peer_action: None, - mode: FailureMode::ConsensusLayer, - }, - ) + self.on_batch_process_result(network, batch_id, &BatchProcessResult::NonFaultyFailure) } else { Ok(ProcessResult::Successful) } @@ -575,7 +570,7 @@ impl BackFillSync { // The first two cases are possible in regular sync, should not occur in backfill, but we // keep this logic for handling potential processing race conditions. // result - match &self.current_processing_batch { + let batch = match &self.current_processing_batch { Some(processing_id) if *processing_id != batch_id => { debug!(self.log, "Unexpected batch result"; "batch_epoch" => batch_id, "expected_batch_epoch" => processing_id); @@ -589,13 +584,9 @@ impl BackFillSync { _ => { // batch_id matches, continue self.current_processing_batch = None; - } - } - match result { - BatchProcessResult::Success(was_non_empty) => { - let batch = match self.batches.get_mut(&batch_id) { - Some(v) => v, + match self.batches.get_mut(&batch_id) { + Some(batch) => batch, None => { // This is an error. Fail the sync algorithm. return self @@ -605,8 +596,27 @@ impl BackFillSync { ))) .map(|_| ProcessResult::Successful); } - }; + } + } + }; + let peer = match batch.current_peer() { + Some(v) => *v, + None => { + return self + .fail_sync(BackFillError::BatchInvalidState( + batch_id, + String::from("Peer does not exist"), + )) + .map(|_| ProcessResult::Successful) + } + }; + + debug!(self.log, "Backfill batch processed"; "result" => ?result, &batch, + "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer)); + + match result { + BatchProcessResult::Success { was_non_empty } => { if let Err(e) = batch.processing_completed(BatchProcessingResult::Success) { self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?; } @@ -636,45 +646,17 @@ impl BackFillSync { self.process_completed_batches(network) } } - BatchProcessResult::Failed { + BatchProcessResult::FaultyFailure { imported_blocks, - peer_action, - mode: _, + penalty, } => { - let batch = match self.batches.get_mut(&batch_id) { - Some(v) => v, - None => { - return self - .fail_sync(BackFillError::InvalidSyncState(format!( - "Batch not found for current processing target {}", - batch_id - ))) - .map(|_| ProcessResult::Successful) - } - }; - - let peer = match batch.current_peer() { - Some(v) => *v, - None => { - return self - .fail_sync(BackFillError::BatchInvalidState( - batch_id, - String::from("Peer does not exist"), - )) - .map(|_| ProcessResult::Successful) - } - }; - debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks, - "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer)); - match batch.processing_completed(BatchProcessingResult::Failed { - count_attempt: peer_action.is_some(), - }) { + match batch.processing_completed(BatchProcessingResult::FaultyFailure) { Err(e) => { // Batch was in the wrong state self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)) .map(|_| ProcessResult::Successful) } - Ok(true) => { + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { // check that we have not exceeded the re-process retry counter // If a batch has exceeded the invalid batch lookup attempts limit, it means // that it is likely all peers are sending invalid batches @@ -683,23 +665,18 @@ impl BackFillSync { warn!( self.log, "Backfill batch failed to download. Penalizing peers"; - "score_adjustment" => %peer_action - .as_ref() - .map(ToString::to_string) - .unwrap_or_else(|| "None".into()), + "score_adjustment" => %penalty, "batch_epoch"=> batch_id ); - if let Some(peer_action) = peer_action { - for peer in self.participating_peers.drain() { - network.report_peer(peer, *peer_action, "backfill_batch_failed"); - } + for peer in self.participating_peers.drain() { + network.report_peer(peer, *penalty, "backfill_batch_failed"); } self.fail_sync(BackFillError::BatchProcessingFailed(batch_id)) .map(|_| ProcessResult::Successful) } - Ok(false) => { + Ok(BatchOperationOutcome::Continue) => { // chain can continue. Check if it can be progressed if *imported_blocks { // At least one block was successfully verified and imported, then we can be sure all @@ -713,6 +690,14 @@ impl BackFillSync { } } } + BatchProcessResult::NonFaultyFailure => { + if let Err(e) = batch.processing_completed(BatchProcessingResult::NonFaultyFailure) + { + self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?; + } + self.retry_batch_download(network, batch_id) + .map(|_| ProcessResult::Successful) + } } } @@ -905,11 +890,11 @@ impl BackFillSync { .validation_failed() .map_err(|e| BackFillError::BatchInvalidState(batch_id, e.0))? { - true => { + BatchOperationOutcome::Failed { blacklist: _ } => { // Batch has failed and cannot be redownloaded. return self.fail_sync(BackFillError::BatchProcessingFailed(batch_id)); } - false => { + BatchOperationOutcome::Continue => { redownload_queue.push(*id); } } @@ -1010,8 +995,12 @@ impl BackFillSync { Err(e) => { self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))? } - Ok(true) => self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))?, - Ok(false) => return self.retry_batch_download(network, batch_id), + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))? + } + Ok(BatchOperationOutcome::Continue) => { + return self.retry_batch_download(network, batch_id) + } } } } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 2aa4acdb5..9f2a5fdce 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use store::{Hash256, SignedBeaconBlock}; use tokio::sync::mpsc; -use crate::beacon_processor::{ChainSegmentProcessId, FailureMode, WorkEvent}; +use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::metrics; use self::{ @@ -610,35 +610,26 @@ impl BlockLookups { chain_hash ); #[cfg(not(debug_assertions))] - return crit!(self.log, "Chain process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash); + return debug!(self.log, "Chain process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash); }; debug!(self.log, "Parent chain processed"; "chain_hash" => %chain_hash, "result" => ?result); match result { - BatchProcessResult::Success(_) => { + BatchProcessResult::Success { .. } => { // nothing to do. } - BatchProcessResult::Failed { + BatchProcessResult::FaultyFailure { imported_blocks: _, - peer_action, - mode, + penalty, } => { - if let FailureMode::ExecutionLayer { pause_sync: _ } = mode { - debug!( - self.log, - "Chain segment processing failed. Execution layer is offline"; - "chain_hash" => %chain_hash, - "error" => ?mode - ); - } else { - self.failed_chains.insert(parent_lookup.chain_hash()); - if let Some(peer_action) = peer_action { - for &peer_id in parent_lookup.used_peers() { - cx.report_peer(peer_id, peer_action, "parent_chain_failure") - } - } + self.failed_chains.insert(parent_lookup.chain_hash()); + for &peer_id in parent_lookup.used_peers() { + cx.report_peer(peer_id, penalty, "parent_chain_failure") } } + BatchProcessResult::NonFaultyFailure => { + // We might request this chain again if there is need but otherwise, don't try again + } } metrics::set_gauge( diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index b3afadda2..2f2720fd1 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -284,7 +284,10 @@ fn test_parent_lookup_happy_path() { // Processing succeeds, now the rest of the chain should be sent for processing. bl.parent_block_processed(chain_hash, BlockError::BlockIsAlreadyKnown.into(), &mut cx); rig.expect_parent_chain_process(); - bl.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut cx); + let process_result = BatchProcessResult::Success { + was_non_empty: true, + }; + bl.parent_chain_processed(chain_hash, process_result, &mut cx); assert_eq!(bl.parent_queue.len(), 0); } @@ -318,7 +321,10 @@ fn test_parent_lookup_wrong_response() { // Processing succeeds, now the rest of the chain should be sent for processing. bl.parent_block_processed(chain_hash, Ok(()).into(), &mut cx); rig.expect_parent_chain_process(); - bl.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut cx); + let process_result = BatchProcessResult::Success { + was_non_empty: true, + }; + bl.parent_chain_processed(chain_hash, process_result, &mut cx); assert_eq!(bl.parent_queue.len(), 0); } @@ -347,7 +353,10 @@ fn test_parent_lookup_empty_response() { // Processing succeeds, now the rest of the chain should be sent for processing. bl.parent_block_processed(chain_hash, Ok(()).into(), &mut cx); rig.expect_parent_chain_process(); - bl.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut cx); + let process_result = BatchProcessResult::Success { + was_non_empty: true, + }; + bl.parent_chain_processed(chain_hash, process_result, &mut cx); assert_eq!(bl.parent_queue.len(), 0); } @@ -375,7 +384,10 @@ fn test_parent_lookup_rpc_failure() { // Processing succeeds, now the rest of the chain should be sent for processing. bl.parent_block_processed(chain_hash, Ok(()).into(), &mut cx); rig.expect_parent_chain_process(); - bl.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut cx); + let process_result = BatchProcessResult::Success { + was_non_empty: true, + }; + bl.parent_chain_processed(chain_hash, process_result, &mut cx); assert_eq!(bl.parent_queue.len(), 0); } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index fe27a33c5..64755300c 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -38,7 +38,7 @@ use super::block_lookups::BlockLookups; use super::network_context::SyncNetworkContext; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; -use crate::beacon_processor::{ChainSegmentProcessId, FailureMode, WorkEvent as BeaconWorkEvent}; +use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; @@ -139,13 +139,15 @@ pub enum BlockProcessResult { #[derive(Debug)] pub enum BatchProcessResult { /// The batch was completed successfully. It carries whether the sent batch contained blocks. - Success(bool), - /// The batch processing failed. It carries whether the processing imported any block. - Failed { - imported_blocks: bool, - peer_action: Option, - mode: FailureMode, + Success { + was_non_empty: bool, }, + /// The batch processing failed. It carries whether the processing imported any block. + FaultyFailure { + imported_blocks: bool, + penalty: PeerAction, + }, + NonFaultyFailure, } /// The primary object for handling and driving all the current syncing logic. It maintains the diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 7a891de72..dc18a5c98 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -9,4 +9,4 @@ mod peer_sync_info; mod range_sync; pub use manager::{BatchProcessResult, SyncMessage}; -pub use range_sync::ChainId; +pub use range_sync::{BatchOperationOutcome, ChainId}; diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index c642d81db..3eee7223d 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -70,12 +70,16 @@ impl BatchConfig for RangeSyncBatchConfig { // Such errors should never be encountered. pub struct WrongState(pub(crate) String); -/// Auxiliary type alias for readability. -type IsFailed = bool; +/// After batch operations, we use this to communicate whether a batch can continue or not +pub enum BatchOperationOutcome { + Continue, + Failed { blacklist: bool }, +} pub enum BatchProcessingResult { Success, - Failed { count_attempt: bool }, + FaultyFailure, + NonFaultyFailure, } /// A segment of a chain. @@ -87,7 +91,7 @@ pub struct BatchInfo { /// The `Attempts` that have been made and failed to send us this batch. failed_processing_attempts: Vec, /// Number of processing attempts that have failed but we do not count. - other_failed_processing_attempts: u8, + non_faulty_processing_attempts: u8, /// The number of download retries this batch has undergone due to a failed request. failed_download_attempts: Vec, /// State of the batch. @@ -124,14 +128,6 @@ impl BatchState { pub fn poison(&mut self) -> BatchState { std::mem::replace(self, BatchState::Poisoned) } - - pub fn is_failed(&self) -> IsFailed { - match self { - BatchState::Failed => true, - BatchState::Poisoned => unreachable!("Poisoned batch"), - _ => false, - } - } } impl BatchInfo { @@ -151,7 +147,7 @@ impl BatchInfo { end_slot, failed_processing_attempts: Vec::new(), failed_download_attempts: Vec::new(), - other_failed_processing_attempts: 0, + non_faulty_processing_attempts: 0, state: BatchState::AwaitingDownload, marker: std::marker::PhantomData, } @@ -175,7 +171,16 @@ impl BatchInfo { peers } - /// Verifies if an incomming block belongs to this batch. + /// Return the number of times this batch has failed downloading and failed processing, in this + /// order. + pub fn failed_attempts(&self) -> (usize, usize) { + ( + self.failed_download_attempts.len(), + self.failed_processing_attempts.len(), + ) + } + + /// Verifies if an incoming block belongs to this batch. pub fn is_expecting_block(&self, peer_id: &PeerId, request_id: &Id) -> bool { if let BatchState::Downloading(expected_peer, _, expected_id) = &self.state { return peer_id == expected_peer && expected_id == request_id; @@ -203,6 +208,20 @@ impl BatchInfo { } } + /// After different operations over a batch, this could be in a state that allows it to + /// continue, or in failed state. When the batch has failed, we check if it did mainly due to + /// processing failures. In this case the batch is considered failed and faulty. + pub fn outcome(&self) -> BatchOperationOutcome { + match self.state { + BatchState::Poisoned => unreachable!("Poisoned batch"), + BatchState::Failed => BatchOperationOutcome::Failed { + blacklist: self.failed_processing_attempts.len() + > self.failed_download_attempts.len(), + }, + _ => BatchOperationOutcome::Continue, + } + } + pub fn state(&self) -> &BatchState { &self.state } @@ -235,7 +254,10 @@ impl BatchInfo { #[must_use = "Batch may have failed"] pub fn download_completed( &mut self, - ) -> Result> { + ) -> Result< + usize, /* Received blocks */ + Result<(Slot, Slot, BatchOperationOutcome), WrongState>, + > { match self.state.poison() { BatchState::Downloading(peer, blocks, _request_id) => { // verify that blocks are in range @@ -264,7 +286,7 @@ impl BatchInfo { BatchState::AwaitingDownload }; - return Err(Ok((expected, received, self.state.is_failed()))); + return Err(Ok((expected, received, self.outcome()))); } } @@ -289,7 +311,10 @@ impl BatchInfo { /// THe `mark_failed` parameter, when set to false, does not increment the failed attempts of /// this batch and register the peer, rather attempts a re-download. #[must_use = "Batch may have failed"] - pub fn download_failed(&mut self, mark_failed: bool) -> Result { + pub fn download_failed( + &mut self, + mark_failed: bool, + ) -> Result { match self.state.poison() { BatchState::Downloading(peer, _, _request_id) => { // register the attempt and check if the batch can be tried again @@ -304,7 +329,7 @@ impl BatchInfo { // drop the blocks BatchState::AwaitingDownload }; - Ok(self.state.is_failed()) + Ok(self.outcome()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { @@ -359,32 +384,31 @@ impl BatchInfo { pub fn processing_completed( &mut self, procesing_result: BatchProcessingResult, - ) -> Result { + ) -> Result { match self.state.poison() { BatchState::Processing(attempt) => { self.state = match procesing_result { BatchProcessingResult::Success => BatchState::AwaitingValidation(attempt), - BatchProcessingResult::Failed { count_attempt } => { - if count_attempt { - // register the failed attempt - self.failed_processing_attempts.push(attempt); + BatchProcessingResult::FaultyFailure => { + // register the failed attempt + self.failed_processing_attempts.push(attempt); - // check if the batch can be downloaded again - if self.failed_processing_attempts.len() - >= B::max_batch_processing_attempts() as usize - { - BatchState::Failed - } else { - BatchState::AwaitingDownload - } + // check if the batch can be downloaded again + if self.failed_processing_attempts.len() + >= B::max_batch_processing_attempts() as usize + { + BatchState::Failed } else { - self.other_failed_processing_attempts = - self.other_failed_processing_attempts.saturating_add(1); BatchState::AwaitingDownload } } + BatchProcessingResult::NonFaultyFailure => { + self.non_faulty_processing_attempts = + self.non_faulty_processing_attempts.saturating_add(1); + BatchState::AwaitingDownload + } }; - Ok(self.state.is_failed()) + Ok(self.outcome()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { @@ -398,7 +422,7 @@ impl BatchInfo { } #[must_use = "Batch may have failed"] - pub fn validation_failed(&mut self) -> Result { + pub fn validation_failed(&mut self) -> Result { match self.state.poison() { BatchState::AwaitingValidation(attempt) => { self.failed_processing_attempts.push(attempt); @@ -411,7 +435,7 @@ impl BatchInfo { } else { BatchState::AwaitingDownload }; - Ok(self.state.is_failed()) + Ok(self.outcome()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { @@ -472,10 +496,7 @@ impl slog::KV for BatchInfo { )?; serializer.emit_usize("downloaded", self.failed_download_attempts.len())?; serializer.emit_usize("processed", self.failed_processing_attempts.len())?; - serializer.emit_u8( - "processed_no_penalty", - self.other_failed_processing_attempts, - )?; + serializer.emit_u8("processed_no_penalty", self.non_faulty_processing_attempts)?; serializer.emit_arguments("state", &format_args!("{:?}", self.state))?; slog::Result::Ok(()) } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index caa08165a..a54105f5c 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,7 +1,8 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; -use crate::beacon_processor::WorkEvent as BeaconWorkEvent; -use crate::beacon_processor::{ChainSegmentProcessId, FailureMode}; -use crate::sync::{manager::Id, network_context::SyncNetworkContext, BatchProcessResult}; +use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; +use crate::sync::{ + manager::Id, network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult, +}; use beacon_chain::{BeaconChainTypes, CountUnrealized}; use fnv::FnvHashMap; use lighthouse_network::{PeerAction, PeerId}; @@ -37,7 +38,11 @@ pub type ProcessingResult = Result; pub enum RemoveChain { EmptyPeerPool, ChainCompleted, - ChainFailed(BatchId), + /// A chain has failed. This boolean signals whether the chain should be blacklisted. + ChainFailed { + blacklist: bool, + failing_batch: BatchId, + }, WrongBatchState(String), WrongChainState(String), } @@ -187,8 +192,13 @@ impl SyncingChain { // fail the batches for id in batch_ids { if let Some(batch) = self.batches.get_mut(&id) { - if batch.download_failed(true)? { - return Err(RemoveChain::ChainFailed(id)); + if let BatchOperationOutcome::Failed { blacklist } = + batch.download_failed(true)? + { + return Err(RemoveChain::ChainFailed { + blacklist, + failing_batch: id, + }); } self.retry_batch_download(network, id)?; } else { @@ -265,12 +275,15 @@ impl SyncingChain { self.process_completed_batches(network) } Err(result) => { - let (expected_boundary, received_boundary, is_failed) = result?; + let (expected_boundary, received_boundary, outcome) = result?; warn!(self.log, "Batch received out of range blocks"; "expected_boundary" => expected_boundary, "received_boundary" => received_boundary, "peer_id" => %peer_id, batch); - if is_failed { - return Err(RemoveChain::ChainFailed(batch_id)); + if let BatchOperationOutcome::Failed { blacklist } = outcome { + return Err(RemoveChain::ChainFailed { + blacklist, + failing_batch: batch_id, + }); } // this batch can't be used, so we need to request it again. self.retry_batch_download(network, batch_id) @@ -324,15 +337,7 @@ impl SyncingChain { // blocks to continue, and the chain is expecting a processing result that won't // arrive. To mitigate this, (fake) fail this processing so that the batch is // re-downloaded. - self.on_batch_process_result( - network, - batch_id, - &BatchProcessResult::Failed { - imported_blocks: false, - peer_action: None, - mode: FailureMode::ConsensusLayer, - }, - ) + self.on_batch_process_result(network, batch_id, &BatchProcessResult::NonFaultyFailure) } else { Ok(KeepChain) } @@ -448,7 +453,7 @@ impl SyncingChain { ) -> ProcessingResult { // the first two cases are possible if the chain advances while waiting for a processing // result - match &self.current_processing_batch { + let batch = match &self.current_processing_batch { Some(processing_id) if *processing_id != batch_id => { debug!(self.log, "Unexpected batch result"; "batch_epoch" => batch_id, "expected_batch_epoch" => processing_id); @@ -462,22 +467,35 @@ impl SyncingChain { _ => { // batch_id matches, continue self.current_processing_batch = None; - } - } - - match result { - BatchProcessResult::Success(was_non_empty) => { - let batch = self.batches.get_mut(&batch_id).ok_or_else(|| { + self.batches.get_mut(&batch_id).ok_or_else(|| { RemoveChain::WrongChainState(format!( "Current processing batch not found: {}", batch_id )) - })?; + })? + } + }; + let peer = batch.current_peer().cloned().ok_or_else(|| { + RemoveChain::WrongBatchState(format!( + "Processing target is in wrong state: {:?}", + batch.state(), + )) + })?; + + // Log the process result and the batch for debugging purposes. + debug!(self.log, "Batch processing result"; "result" => ?result, &batch, + "batch_epoch" => batch_id, "client" => %network.client_type(&peer)); + + // We consider three cases. Batch was successfully processed, Batch failed processing due + // to a faulty peer, or batch failed processing but the peer can't be deemed faulty. + match result { + BatchProcessResult::Success { was_non_empty } => { batch.processing_completed(BatchProcessingResult::Success)?; - // If the processed batch was not empty, we can validate previous unvalidated - // blocks. + if *was_non_empty { + // If the processed batch was not empty, we can validate previous unvalidated + // blocks. self.advance_chain(network, batch_id); // we register so that on chain switching we don't try it again self.attempted_optimistic_starts.insert(batch_id); @@ -507,64 +525,56 @@ impl SyncingChain { self.process_completed_batches(network) } } - BatchProcessResult::Failed { + BatchProcessResult::FaultyFailure { imported_blocks, - peer_action, - mode: _, + penalty, } => { - let batch = self.batches.get_mut(&batch_id).ok_or_else(|| { - RemoveChain::WrongChainState(format!( - "Batch not found for current processing target {}", - batch_id - )) - })?; - let peer = batch.current_peer().cloned().ok_or_else(|| { - RemoveChain::WrongBatchState(format!( - "Processing target is in wrong state: {:?}", - batch.state(), - )) - })?; - debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks, "peer_penalty" => ?peer_action, - "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer)); + // Penalize the peer appropiately. + network.report_peer(peer, *penalty, "faulty_batch"); - if batch.processing_completed(BatchProcessingResult::Failed { - count_attempt: peer_action.is_some(), - })? { - // check that we have not exceeded the re-process retry counter - // If a batch has exceeded the invalid batch lookup attempts limit, it means - // that it is likely all peers in this chain are are sending invalid batches - // repeatedly and are either malicious or faulty. We drop the chain and - // report all peers. - // There are some edge cases with forks that could land us in this situation. - // This should be unlikely, so we tolerate these errors, but not often. - warn!( - self.log, - "Batch failed to download. Dropping chain scoring peers"; - "score_adjustment" => %peer_action - .as_ref() - .map(ToString::to_string) - .unwrap_or_else(|| "None".into()), - "batch_epoch"=> batch_id - ); - - if let Some(peer_action) = peer_action { - for (peer, _) in self.peers.drain() { - network.report_peer(peer, *peer_action, "batch_failed"); + // Check if this batch is allowed to continue + match batch.processing_completed(BatchProcessingResult::FaultyFailure)? { + BatchOperationOutcome::Continue => { + // Chain can continue. Check if it can be moved forward. + if *imported_blocks { + // At least one block was successfully verified and imported, so we can be sure all + // previous batches are valid and we only need to download the current failed + // batch. + self.advance_chain(network, batch_id); } + // Handle this invalid batch, that is within the re-process retries limit. + self.handle_invalid_batch(network, batch_id) } - Err(RemoveChain::ChainFailed(batch_id)) - } else { - // chain can continue. Check if it can be moved forward - if *imported_blocks { - // At least one block was successfully verified and imported, so we can be sure all - // previous batches are valid and we only need to download the current failed - // batch. - self.advance_chain(network, batch_id); + BatchOperationOutcome::Failed { blacklist } => { + // Check that we have not exceeded the re-process retry counter, + // If a batch has exceeded the invalid batch lookup attempts limit, it means + // that it is likely all peers in this chain are are sending invalid batches + // repeatedly and are either malicious or faulty. We drop the chain and + // report all peers. + // There are some edge cases with forks that could land us in this situation. + // This should be unlikely, so we tolerate these errors, but not often. + warn!( + self.log, + "Batch failed to download. Dropping chain scoring peers"; + "score_adjustment" => %penalty, + "batch_epoch"=> batch_id, + ); + + for (peer, _) in self.peers.drain() { + network.report_peer(peer, *penalty, "faulty_chain"); + } + Err(RemoveChain::ChainFailed { + blacklist, + failing_batch: batch_id, + }) } - // Handle this invalid batch, that is within the re-process retries limit. - self.handle_invalid_batch(network, batch_id) } } + BatchProcessResult::NonFaultyFailure => { + batch.processing_completed(BatchProcessingResult::NonFaultyFailure)?; + // Simply redownload the batch. + self.retry_batch_download(network, batch_id) + } } } @@ -737,9 +747,12 @@ impl SyncingChain { let mut redownload_queue = Vec::new(); for (id, batch) in self.batches.range_mut(..batch_id) { - if batch.validation_failed()? { + if let BatchOperationOutcome::Failed { blacklist } = batch.validation_failed()? { // remove the chain early - return Err(RemoveChain::ChainFailed(batch_id)); + return Err(RemoveChain::ChainFailed { + blacklist, + failing_batch: *id, + }); } redownload_queue.push(*id); } @@ -836,8 +849,11 @@ impl SyncingChain { if let Some(active_requests) = self.peers.get_mut(peer_id) { active_requests.remove(&batch_id); } - if batch.download_failed(true)? { - return Err(RemoveChain::ChainFailed(batch_id)); + if let BatchOperationOutcome::Failed { blacklist } = batch.download_failed(true)? { + return Err(RemoveChain::ChainFailed { + blacklist, + failing_batch: batch_id, + }); } self.retry_batch_download(network, batch_id) } else { @@ -925,10 +941,16 @@ impl SyncingChain { self.peers .get_mut(&peer) .map(|request| request.remove(&batch_id)); - if batch.download_failed(true)? { - return Err(RemoveChain::ChainFailed(batch_id)); - } else { - return self.retry_batch_download(network, batch_id); + match batch.download_failed(true)? { + BatchOperationOutcome::Failed { blacklist } => { + return Err(RemoveChain::ChainFailed { + blacklist, + failing_batch: batch_id, + }) + } + BatchOperationOutcome::Continue => { + return self.retry_batch_download(network, batch_id) + } } } } diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 31122d59a..f4db32bc9 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -8,7 +8,7 @@ mod chain_collection; mod range; mod sync_type; -pub use batch::{BatchConfig, BatchInfo, BatchProcessingResult, BatchState}; +pub use batch::{BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState}; pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; pub use range::RangeSync; pub use sync_type::RangeSyncType; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index f08f8eb82..4b29d3129 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -356,8 +356,8 @@ where debug!(self.log, "Chain removed"; "sync_type" => ?sync_type, &chain, "reason" => ?remove_reason, "op" => op); } - if let RemoveChain::ChainFailed(_) = remove_reason { - if RangeSyncType::Finalized == sync_type { + if let RemoveChain::ChainFailed { blacklist, .. } = remove_reason { + if RangeSyncType::Finalized == sync_type && blacklist { warn!(self.log, "Chain failed! Syncing to its head won't be retried for at least the next {} seconds", FAILED_CHAINS_EXPIRY_SECONDS; &chain); self.failed_chains.insert(chain.target_head_root); }