Avoid peer penalties on internal errors for batch block import (#2898)

## Issue Addressed

NA

## Proposed Changes

I've observed some Prater nodes (and potentially some mainnet nodes) banning peers due to validator pubkey cache lock timeouts. For the `BeaconChainError`-type of errors, they're caused by internal faults and we can't necessarily tell if the peer is bad or not. I think this is causing us to ban peers unnecessarily when running on under-resourced machines.

## Additional Info

NA
This commit is contained in:
Paul Hauner 2022-01-11 05:33:28 +00:00
parent 6976796162
commit 4848e53155
4 changed files with 156 additions and 46 deletions

View File

@ -7,7 +7,7 @@ use crate::sync::{BatchProcessResult, ChainId};
use beacon_chain::{
BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError,
};
use lighthouse_network::PeerId;
use lighthouse_network::{PeerAction, PeerId};
use slog::{crit, debug, error, info, trace, warn};
use tokio::sync::mpsc;
use types::{Epoch, Hash256, SignedBeaconBlock};
@ -23,6 +23,14 @@ pub enum ProcessId {
ParentLookup(PeerId, Hash256),
}
/// Returned when a chain segment import fails.
struct ChainSegmentFailed {
/// To be displayed in logs.
message: String,
/// Used to penalize peers.
peer_action: Option<PeerAction>,
}
impl<T: BeaconChainTypes> Worker<T> {
/// Attempt to process a block received from a direct RPC request, returning the processing
/// result on the `result_tx` channel.
@ -123,9 +131,13 @@ impl<T: BeaconChainTypes> Worker<T> {
"chain" => chain_id,
"last_block_slot" => end_slot,
"imported_blocks" => imported_blocks,
"error" => e,
"error" => %e.message,
"service" => "sync");
BatchProcessResult::Failed(imported_blocks > 0)
BatchProcessResult::Failed {
imported_blocks: imported_blocks > 0,
peer_action: e.peer_action,
}
}
};
@ -154,9 +166,12 @@ impl<T: BeaconChainTypes> Worker<T> {
"batch_epoch" => epoch,
"first_block_slot" => start_slot,
"last_block_slot" => end_slot,
"error" => e,
"error" => %e.message,
"service" => "sync");
BatchProcessResult::Failed(false)
BatchProcessResult::Failed {
imported_blocks: false,
peer_action: e.peer_action,
}
}
};
@ -175,7 +190,7 @@ impl<T: BeaconChainTypes> Worker<T> {
// reverse
match self.process_blocks(downloaded_blocks.iter().rev()) {
(_, Err(e)) => {
debug!(self.log, "Parent lookup failed"; "last_peer_id" => %peer_id, "error" => e);
debug!(self.log, "Parent lookup failed"; "last_peer_id" => %peer_id, "error" => %e.message);
self.send_sync_message(SyncMessage::ParentLookupFailed {
peer_id,
chain_head,
@ -193,7 +208,7 @@ impl<T: BeaconChainTypes> Worker<T> {
fn process_blocks<'a>(
&self,
downloaded_blocks: impl Iterator<Item = &'a SignedBeaconBlock<T::EthSpec>>,
) -> (usize, Result<(), String>) {
) -> (usize, Result<(), ChainSegmentFailed>) {
let blocks = downloaded_blocks.cloned().collect::<Vec<_>>();
match self.chain.process_chain_segment(blocks) {
ChainSegmentResult::Successful { imported_blocks } => {
@ -223,7 +238,7 @@ impl<T: BeaconChainTypes> Worker<T> {
fn process_backfill_blocks(
&self,
blocks: &[SignedBeaconBlock<T::EthSpec>],
) -> (usize, Result<(), String>) {
) -> (usize, Result<(), ChainSegmentFailed>) {
match self.chain.import_historical_block_batch(blocks) {
Ok(imported_blocks) => {
metrics::inc_counter(
@ -250,7 +265,12 @@ impl<T: BeaconChainTypes> Worker<T> {
"block_root" => ?block_root,
"expected_root" => ?expected_block_root
);
String::from("mismatched_block_root")
ChainSegmentFailed {
message: String::from("mismatched_block_root"),
// The peer is faulty if they send blocks with bad roots.
peer_action: Some(PeerAction::LowToleranceError),
}
}
HistoricalBlockError::InvalidSignature
| HistoricalBlockError::SignatureSet(_) => {
@ -259,7 +279,12 @@ impl<T: BeaconChainTypes> Worker<T> {
"Backfill batch processing error";
"error" => ?e
);
"invalid_signature".into()
ChainSegmentFailed {
message: "invalid_signature".into(),
// The peer is faulty if they send bad signatures.
peer_action: Some(PeerAction::LowToleranceError),
}
}
HistoricalBlockError::ValidatorPubkeyCacheTimeout => {
warn!(
@ -267,25 +292,55 @@ impl<T: BeaconChainTypes> Worker<T> {
"Backfill batch processing error";
"error" => "pubkey_cache_timeout"
);
"pubkey_cache_timeout".into()
ChainSegmentFailed {
message: "pubkey_cache_timeout".into(),
// This is an internal error, do not penalize the peer.
peer_action: None,
}
}
HistoricalBlockError::NoAnchorInfo => {
warn!(self.log, "Backfill not required");
String::from("no_anchor_info")
ChainSegmentFailed {
message: String::from("no_anchor_info"),
// There is no need to do a historical sync, this is not a fault of
// the peer.
peer_action: None,
}
}
HistoricalBlockError::IndexOutOfBounds
| HistoricalBlockError::BlockOutOfRange { .. } => {
HistoricalBlockError::IndexOutOfBounds => {
error!(
self.log,
"Backfill batch processing error";
"Backfill batch OOB error";
"error" => ?e,
);
String::from("logic_error")
ChainSegmentFailed {
message: String::from("logic_error"),
// This should never occur, don't penalize the peer.
peer_action: None,
}
}
HistoricalBlockError::BlockOutOfRange { .. } => {
error!(
self.log,
"Backfill batch error";
"error" => ?e,
);
ChainSegmentFailed {
message: String::from("unexpected_error"),
// This should never occur, don't penalize the peer.
peer_action: None,
}
}
},
other => {
warn!(self.log, "Backfill batch processing error"; "error" => ?other);
format!("{:?}", other)
ChainSegmentFailed {
message: format!("{:?}", other),
// This is an internal error, don't penalize the peer.
peer_action: None,
}
}
};
(0, Err(err))
@ -312,15 +367,18 @@ impl<T: BeaconChainTypes> Worker<T> {
}
/// Helper function to handle a `BlockError` from `process_chain_segment`
fn handle_failed_chain_segment(&self, error: BlockError<T::EthSpec>) -> Result<(), String> {
fn handle_failed_chain_segment(
&self,
error: BlockError<T::EthSpec>,
) -> Result<(), ChainSegmentFailed> {
match error {
BlockError::ParentUnknown(block) => {
// blocks should be sequential and all parents should exist
Err(format!(
"Block has an unknown parent: {}",
block.parent_root()
))
Err(ChainSegmentFailed {
message: format!("Block has an unknown parent: {}", block.parent_root()),
// Peers are faulty if they send non-sequential blocks.
peer_action: Some(PeerAction::LowToleranceError),
})
}
BlockError::BlockIsAlreadyKnown => {
// This can happen for many reasons. Head sync's can download multiples and parent
@ -350,10 +408,14 @@ impl<T: BeaconChainTypes> Worker<T> {
);
}
Err(format!(
"Block with slot {} is higher than the current slot {}",
block_slot, present_slot
))
Err(ChainSegmentFailed {
message: format!(
"Block with slot {} is higher than the current slot {}",
block_slot, present_slot
),
// Peers are faulty if they send blocks from the future.
peer_action: Some(PeerAction::LowToleranceError),
})
}
BlockError::WouldRevertFinalizedSlot { .. } => {
debug!(self.log, "Finalized or earlier block processed";);
@ -370,7 +432,11 @@ impl<T: BeaconChainTypes> Worker<T> {
"outcome" => ?e,
);
Err(format!("Internal error whilst processing block: {:?}", e))
Err(ChainSegmentFailed {
message: format!("Internal error whilst processing block: {:?}", e),
// Do not penalize peers for internal errors.
peer_action: None,
})
}
other => {
debug!(
@ -379,7 +445,11 @@ impl<T: BeaconChainTypes> Worker<T> {
"outcome" => %other,
);
Err(format!("Peer sent invalid block. Reason: {:?}", other))
Err(ChainSegmentFailed {
message: format!("Peer sent invalid block. Reason: {:?}", other),
// Do not penalize peers for internal errors.
peer_action: None,
})
}
}
}

View File

@ -541,7 +541,15 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// blocks to continue, and the chain is expecting a processing result that won't
// arrive. To mitigate this, (fake) fail this processing so that the batch is
// re-downloaded.
self.on_batch_process_result(network, batch_id, &BatchProcessResult::Failed(false))
self.on_batch_process_result(
network,
batch_id,
&BatchProcessResult::Failed {
imported_blocks: false,
// The beacon processor queue is full, no need to penalize the peer.
peer_action: None,
},
)
} else {
Ok(ProcessResult::Successful)
}
@ -621,7 +629,10 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
self.process_completed_batches(network)
}
}
BatchProcessResult::Failed(imported_blocks) => {
BatchProcessResult::Failed {
imported_blocks,
peer_action,
} => {
let batch = match self.batches.get_mut(&batch_id) {
Some(v) => v,
None => {
@ -659,12 +670,20 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// that it is likely all peers are sending invalid batches
// repeatedly and are either malicious or faulty. We stop the backfill sync and
// report all synced peers that have participated.
let action = PeerAction::LowToleranceError;
warn!(self.log, "Backfill batch failed to download. Penalizing peers";
"score_adjustment" => %action,
"batch_epoch"=> batch_id);
for peer in self.participating_peers.drain() {
network.report_peer(peer, action);
warn!(
self.log,
"Backfill batch failed to download. Penalizing peers";
"score_adjustment" => %peer_action
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| "None".into()),
"batch_epoch"=> batch_id
);
if let Some(peer_action) = peer_action {
for peer in self.participating_peers.drain() {
network.report_peer(peer, *peer_action);
}
}
self.fail_sync(BackFillError::BatchProcessingFailed(batch_id))
.map(|_| ProcessResult::Successful)

View File

@ -137,7 +137,10 @@ pub enum BatchProcessResult {
/// The batch was completed successfully. It carries whether the sent batch contained blocks.
Success(bool),
/// The batch processing failed. It carries whether the processing imported any block.
Failed(bool),
Failed {
imported_blocks: bool,
peer_action: Option<PeerAction>,
},
}
/// Maintains a sequential list of parents to lookup and the lookup's current state.

View File

@ -313,7 +313,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// blocks to continue, and the chain is expecting a processing result that won't
// arrive. To mitigate this, (fake) fail this processing so that the batch is
// re-downloaded.
self.on_batch_process_result(network, batch_id, &BatchProcessResult::Failed(false))
self.on_batch_process_result(
network,
batch_id,
&BatchProcessResult::Failed {
imported_blocks: false,
peer_action: None,
},
)
} else {
Ok(KeepChain)
}
@ -488,7 +495,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.process_completed_batches(network)
}
}
BatchProcessResult::Failed(imported_blocks) => {
BatchProcessResult::Failed {
imported_blocks,
peer_action,
} => {
let batch = self.batches.get_mut(&batch_id).ok_or_else(|| {
RemoveChain::WrongChainState(format!(
"Batch not found for current processing target {}",
@ -511,12 +521,20 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// report all peers.
// There are some edge cases with forks that could land us in this situation.
// This should be unlikely, so we tolerate these errors, but not often.
let action = PeerAction::LowToleranceError;
warn!(self.log, "Batch failed to download. Dropping chain scoring peers";
"score_adjustment" => %action,
"batch_epoch"=> batch_id);
for (peer, _) in self.peers.drain() {
network.report_peer(peer, action);
warn!(
self.log,
"Batch failed to download. Dropping chain scoring peers";
"score_adjustment" => %peer_action
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| "None".into()),
"batch_epoch"=> batch_id
);
if let Some(peer_action) = peer_action {
for (peer, _) in self.peers.drain() {
network.report_peer(peer, *peer_action);
}
}
Err(RemoveChain::ChainFailed(batch_id))
} else {