diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 6a75c2990..27e0a6711 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -7,7 +7,7 @@ use crate::sync::{BatchProcessResult, ChainId}; use beacon_chain::{ BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, }; -use lighthouse_network::PeerId; +use lighthouse_network::{PeerAction, PeerId}; use slog::{crit, debug, error, info, trace, warn}; use tokio::sync::mpsc; use types::{Epoch, Hash256, SignedBeaconBlock}; @@ -23,6 +23,14 @@ pub enum ProcessId { ParentLookup(PeerId, Hash256), } +/// Returned when a chain segment import fails. +struct ChainSegmentFailed { + /// To be displayed in logs. + message: String, + /// Used to penalize peers. + peer_action: Option<PeerAction>, +} + impl Worker { /// Attempt to process a block received from a direct RPC request, returning the processing /// result on the `result_tx` channel.
@@ -123,9 +131,13 @@ impl Worker { "chain" => chain_id, "last_block_slot" => end_slot, "imported_blocks" => imported_blocks, - "error" => e, + "error" => %e.message, "service" => "sync"); - BatchProcessResult::Failed(imported_blocks > 0) + + BatchProcessResult::Failed { + imported_blocks: imported_blocks > 0, + peer_action: e.peer_action, + } } }; @@ -154,9 +166,12 @@ impl Worker { "batch_epoch" => epoch, "first_block_slot" => start_slot, "last_block_slot" => end_slot, - "error" => e, + "error" => %e.message, "service" => "sync"); - BatchProcessResult::Failed(false) + BatchProcessResult::Failed { + imported_blocks: false, + peer_action: e.peer_action, + } } }; @@ -175,7 +190,7 @@ impl Worker { // reverse match self.process_blocks(downloaded_blocks.iter().rev()) { (_, Err(e)) => { - debug!(self.log, "Parent lookup failed"; "last_peer_id" => %peer_id, "error" => e); + debug!(self.log, "Parent lookup failed"; "last_peer_id" => %peer_id, "error" => %e.message); self.send_sync_message(SyncMessage::ParentLookupFailed { peer_id, chain_head, @@ -193,7 +208,7 @@ impl Worker { fn process_blocks<'a>( &self, downloaded_blocks: impl Iterator>, - ) -> (usize, Result<(), String>) { + ) -> (usize, Result<(), ChainSegmentFailed>) { let blocks = downloaded_blocks.cloned().collect::>(); match self.chain.process_chain_segment(blocks) { ChainSegmentResult::Successful { imported_blocks } => { @@ -223,7 +238,7 @@ impl Worker { fn process_backfill_blocks( &self, blocks: &[SignedBeaconBlock], - ) -> (usize, Result<(), String>) { + ) -> (usize, Result<(), ChainSegmentFailed>) { match self.chain.import_historical_block_batch(blocks) { Ok(imported_blocks) => { metrics::inc_counter( @@ -250,7 +265,12 @@ impl Worker { "block_root" => ?block_root, "expected_root" => ?expected_block_root ); - String::from("mismatched_block_root") + + ChainSegmentFailed { + message: String::from("mismatched_block_root"), + // The peer is faulty if they send blocks with bad roots. 
+ peer_action: Some(PeerAction::LowToleranceError), + } } HistoricalBlockError::InvalidSignature | HistoricalBlockError::SignatureSet(_) => { @@ -259,7 +279,12 @@ impl Worker { "Backfill batch processing error"; "error" => ?e ); - "invalid_signature".into() + + ChainSegmentFailed { + message: "invalid_signature".into(), + // The peer is faulty if they send bad signatures. + peer_action: Some(PeerAction::LowToleranceError), + } } HistoricalBlockError::ValidatorPubkeyCacheTimeout => { warn!( @@ -267,25 +292,55 @@ impl Worker { "Backfill batch processing error"; "error" => "pubkey_cache_timeout" ); - "pubkey_cache_timeout".into() + + ChainSegmentFailed { + message: "pubkey_cache_timeout".into(), + // This is an internal error, do not penalize the peer. + peer_action: None, + } } HistoricalBlockError::NoAnchorInfo => { warn!(self.log, "Backfill not required"); - String::from("no_anchor_info") + + ChainSegmentFailed { + message: String::from("no_anchor_info"), + // There is no need to do a historical sync, this is not a fault of + // the peer. + peer_action: None, + } } - HistoricalBlockError::IndexOutOfBounds - | HistoricalBlockError::BlockOutOfRange { .. } => { + HistoricalBlockError::IndexOutOfBounds => { error!( self.log, - "Backfill batch processing error"; + "Backfill batch OOB error"; "error" => ?e, ); - String::from("logic_error") + ChainSegmentFailed { + message: String::from("logic_error"), + // This should never occur, don't penalize the peer. + peer_action: None, + } + } + HistoricalBlockError::BlockOutOfRange { .. } => { + error!( + self.log, + "Backfill batch error"; + "error" => ?e, + ); + ChainSegmentFailed { + message: String::from("unexpected_error"), + // This should never occur, don't penalize the peer. + peer_action: None, + } } }, other => { warn!(self.log, "Backfill batch processing error"; "error" => ?other); - format!("{:?}", other) + ChainSegmentFailed { + message: format!("{:?}", other), + // This is an internal error, don't penalize the peer.
+ peer_action: None, + } } }; (0, Err(err)) @@ -312,15 +367,18 @@ impl Worker { } /// Helper function to handle a `BlockError` from `process_chain_segment` - fn handle_failed_chain_segment(&self, error: BlockError) -> Result<(), String> { + fn handle_failed_chain_segment( + &self, + error: BlockError, + ) -> Result<(), ChainSegmentFailed> { match error { BlockError::ParentUnknown(block) => { // blocks should be sequential and all parents should exist - - Err(format!( - "Block has an unknown parent: {}", - block.parent_root() - )) + Err(ChainSegmentFailed { + message: format!("Block has an unknown parent: {}", block.parent_root()), + // Peers are faulty if they send non-sequential blocks. + peer_action: Some(PeerAction::LowToleranceError), + }) } BlockError::BlockIsAlreadyKnown => { // This can happen for many reasons. Head sync's can download multiples and parent @@ -350,10 +408,14 @@ impl Worker { ); } - Err(format!( - "Block with slot {} is higher than the current slot {}", - block_slot, present_slot - )) + Err(ChainSegmentFailed { + message: format!( + "Block with slot {} is higher than the current slot {}", + block_slot, present_slot + ), + // Peers are faulty if they send blocks from the future. + peer_action: Some(PeerAction::LowToleranceError), + }) } BlockError::WouldRevertFinalizedSlot { .. } => { debug!(self.log, "Finalized or earlier block processed";); @@ -370,7 +432,11 @@ impl Worker { "outcome" => ?e, ); - Err(format!("Internal error whilst processing block: {:?}", e)) + Err(ChainSegmentFailed { + message: format!("Internal error whilst processing block: {:?}", e), + // Do not penalize peers for internal errors. + peer_action: None, + }) } other => { debug!( @@ -379,7 +445,11 @@ impl Worker { "outcome" => %other, ); - Err(format!("Peer sent invalid block. Reason: {:?}", other)) + Err(ChainSegmentFailed { + message: format!("Peer sent invalid block. Reason: {:?}", other), + // NOTE(review): this arm reports an invalid block sent by the peer, not an internal error; the peer is not penalized here, confirm that this is intended.
+ peer_action: None, + }) } } } diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index b9016b9fd..fc94eaca0 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -541,7 +541,15 @@ impl BackFillSync { // blocks to continue, and the chain is expecting a processing result that won't // arrive. To mitigate this, (fake) fail this processing so that the batch is // re-downloaded. - self.on_batch_process_result(network, batch_id, &BatchProcessResult::Failed(false)) + self.on_batch_process_result( + network, + batch_id, + &BatchProcessResult::Failed { + imported_blocks: false, + // The beacon processor queue is full, no need to penalize the peer. + peer_action: None, + }, + ) } else { Ok(ProcessResult::Successful) } @@ -621,7 +629,10 @@ impl BackFillSync { self.process_completed_batches(network) } } - BatchProcessResult::Failed(imported_blocks) => { + BatchProcessResult::Failed { + imported_blocks, + peer_action, + } => { let batch = match self.batches.get_mut(&batch_id) { Some(v) => v, None => { @@ -659,12 +670,20 @@ impl BackFillSync { // that it is likely all peers are sending invalid batches // repeatedly and are either malicious or faulty. We stop the backfill sync and // report all synced peers that have participated. - let action = PeerAction::LowToleranceError; - warn!(self.log, "Backfill batch failed to download. Penalizing peers"; - "score_adjustment" => %action, - "batch_epoch"=> batch_id); - for peer in self.participating_peers.drain() { - network.report_peer(peer, action); + warn!( + self.log, + "Backfill batch failed to download. 
Penalizing peers"; + "score_adjustment" => %peer_action + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| "None".into()), + "batch_epoch"=> batch_id + ); + + if let Some(peer_action) = peer_action { + for peer in self.participating_peers.drain() { + network.report_peer(peer, *peer_action); + } } self.fail_sync(BackFillError::BatchProcessingFailed(batch_id)) .map(|_| ProcessResult::Successful) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index f0726ca94..f9055665c 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -137,7 +137,10 @@ pub enum BatchProcessResult { /// The batch was completed successfully. It carries whether the sent batch contained blocks. Success(bool), /// The batch processing failed. It carries whether the processing imported any block. - Failed(bool), + Failed { + imported_blocks: bool, + peer_action: Option<PeerAction>, + }, } /// Maintains a sequential list of parents to lookup and the lookup's current state. diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index a1acac614..4b8980899 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -313,7 +313,14 @@ impl SyncingChain { // blocks to continue, and the chain is expecting a processing result that won't // arrive. To mitigate this, (fake) fail this processing so that the batch is // re-downloaded.
- self.on_batch_process_result(network, batch_id, &BatchProcessResult::Failed(false)) + self.on_batch_process_result( + network, + batch_id, + &BatchProcessResult::Failed { + imported_blocks: false, + peer_action: None, + }, + ) } else { Ok(KeepChain) } @@ -488,7 +495,10 @@ impl SyncingChain { self.process_completed_batches(network) } } - BatchProcessResult::Failed(imported_blocks) => { + BatchProcessResult::Failed { + imported_blocks, + peer_action, + } => { let batch = self.batches.get_mut(&batch_id).ok_or_else(|| { RemoveChain::WrongChainState(format!( "Batch not found for current processing target {}", @@ -511,12 +521,20 @@ impl SyncingChain { // report all peers. // There are some edge cases with forks that could land us in this situation. // This should be unlikely, so we tolerate these errors, but not often. - let action = PeerAction::LowToleranceError; - warn!(self.log, "Batch failed to download. Dropping chain scoring peers"; - "score_adjustment" => %action, - "batch_epoch"=> batch_id); - for (peer, _) in self.peers.drain() { - network.report_peer(peer, action); + warn!( + self.log, + "Batch failed to download. Dropping chain scoring peers"; + "score_adjustment" => %peer_action + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| "None".into()), + "batch_epoch"=> batch_id + ); + + if let Some(peer_action) = peer_action { + for (peer, _) in self.peers.drain() { + network.report_peer(peer, *peer_action); + } } Err(RemoveChain::ChainFailed(batch_id)) } else {