do not count sync batch attempts when peer is not at fault (#3245)

## Issue Addressed
Currently we count a failed attempt for a syncing chain even if the peer is not at fault. This makes us do more work if the chain fails and heavily penalizes peers, when we could simply retry. Inspired by a proposal I made in #3094.

## Proposed Changes
If a batch fails but the peer is not at fault, do not count the attempt toward the batch's retry limit.
This also removes some noisy debug logs.

## Additional Info
We still keep a counter of ignored attempts, just in case.
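
In short: a batch failure now reports whether a peer was actually penalized, and only penalized failures consume one of the batch's `max_batch_processing_attempts`. A minimal sketch of the idea (the `Batch` struct and `main` here are hypothetical stand-ins; the real `BatchInfo::processing_completed` in the diff below also drives the batch state machine and returns a `Result`):

```rust
/// Outcome of processing a batch, as reported by the sync layer.
pub enum BatchProcessingResult {
    Success,
    /// `count_attempt` is true only when a peer was penalized for the
    /// failure, i.e. `peer_action.is_some()` at the call site.
    Failed { count_attempt: bool },
}

/// Hypothetical, simplified stand-in for `BatchInfo`.
struct Batch {
    /// Attempts that count toward `max_batch_processing_attempts`.
    failed_processing_attempts: usize,
    /// Failures we retry without blame; surfaced in logs as `processed_no_penalty`.
    other_failed_processing_attempts: u8,
}

impl Batch {
    fn processing_completed(&mut self, result: BatchProcessingResult) {
        match result {
            // Successful batches move on to validation (elided here).
            BatchProcessingResult::Success => {}
            BatchProcessingResult::Failed { count_attempt: true } => {
                // The peer was at fault: this counts toward the retry limit.
                self.failed_processing_attempts += 1;
            }
            BatchProcessingResult::Failed { count_attempt: false } => {
                // Not the peer's fault: retry freely, but keep a counter for logging.
                self.other_failed_processing_attempts =
                    self.other_failed_processing_attempts.saturating_add(1);
            }
        }
    }
}

fn main() {
    let mut batch = Batch {
        failed_processing_attempts: 0,
        other_failed_processing_attempts: 0,
    };
    // A failure where no peer was penalized does not count as an attempt...
    batch.processing_completed(BatchProcessingResult::Failed { count_attempt: false });
    assert_eq!(batch.failed_processing_attempts, 0);
    // ...but it is still recorded in the "just in case" counter.
    assert_eq!(batch.other_failed_processing_attempts, 1);
}
```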
Divma 2022-06-07 02:35:56 +00:00
parent 58e223e429
commit cfd26d25e0
5 changed files with 49 additions and 26 deletions

beacon_node/network/src/sync/backfill_sync/mod.rs

@@ -11,7 +11,7 @@
 use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
 use crate::sync::manager::{BatchProcessResult, Id};
 use crate::sync::network_context::SyncNetworkContext;
-use crate::sync::range_sync::{BatchConfig, BatchId, BatchInfo, BatchState};
+use crate::sync::range_sync::{BatchConfig, BatchId, BatchInfo, BatchProcessingResult, BatchState};
 use beacon_chain::{BeaconChain, BeaconChainTypes};
 use lighthouse_network::types::{BackFillState, NetworkGlobals};
 use lighthouse_network::{PeerAction, PeerId};
@@ -606,7 +606,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
             }
         };
 
-        if let Err(e) = batch.processing_completed(true) {
+        if let Err(e) = batch.processing_completed(BatchProcessingResult::Success) {
            self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
         }
         // If the processed batch was not empty, we can validate previous unvalidated
@@ -664,7 +664,9 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         };
         debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks,
             "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer));
-        match batch.processing_completed(false) {
+        match batch.processing_completed(BatchProcessingResult::Failed {
+            count_attempt: peer_action.is_some(),
+        }) {
             Err(e) => {
                 // Batch was in the wrong state
                 self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))

beacon_node/network/src/sync/range_sync/batch.rs

@@ -72,6 +72,11 @@ pub struct WrongState(pub(crate) String);
 /// Auxiliary type alias for readability.
 type IsFailed = bool;
 
+pub enum BatchProcessingResult {
+    Success,
+    Failed { count_attempt: bool },
+}
+
 /// A segment of a chain.
 pub struct BatchInfo<T: EthSpec, B: BatchConfig = RangeSyncBatchConfig> {
     /// Start slot of the batch.
@@ -80,6 +85,8 @@ pub struct BatchInfo<T: EthSpec, B: BatchConfig = RangeSyncBatchConfig> {
     end_slot: Slot,
     /// The `Attempts` that have been made and failed to send us this batch.
     failed_processing_attempts: Vec<Attempt>,
+    /// Number of processing attempts that have failed but we do not count.
+    other_failed_processing_attempts: u8,
     /// The number of download retries this batch has undergone due to a failed request.
     failed_download_attempts: Vec<PeerId>,
     /// State of the batch.
@@ -143,6 +150,7 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
             end_slot,
             failed_processing_attempts: Vec::new(),
             failed_download_attempts: Vec::new(),
+            other_failed_processing_attempts: 0,
             state: BatchState::AwaitingDownload,
             marker: std::marker::PhantomData,
         }
@@ -348,23 +356,33 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
     }
 
     #[must_use = "Batch may have failed"]
-    pub fn processing_completed(&mut self, was_sucessful: bool) -> Result<IsFailed, WrongState> {
+    pub fn processing_completed(
+        &mut self,
+        procesing_result: BatchProcessingResult,
+    ) -> Result<IsFailed, WrongState> {
         match self.state.poison() {
             BatchState::Processing(attempt) => {
-                self.state = if !was_sucessful {
-                    // register the failed attempt
-                    self.failed_processing_attempts.push(attempt);
-
-                    // check if the batch can be downloaded again
-                    if self.failed_processing_attempts.len()
-                        >= B::max_batch_processing_attempts() as usize
-                    {
-                        BatchState::Failed
-                    } else {
-                        BatchState::AwaitingDownload
-                    }
-                } else {
-                    BatchState::AwaitingValidation(attempt)
+                self.state = match procesing_result {
+                    BatchProcessingResult::Success => BatchState::AwaitingValidation(attempt),
+                    BatchProcessingResult::Failed { count_attempt } => {
+                        if count_attempt {
+                            // register the failed attempt
+                            self.failed_processing_attempts.push(attempt);
+
+                            // check if the batch can be downloaded again
+                            if self.failed_processing_attempts.len()
+                                >= B::max_batch_processing_attempts() as usize
+                            {
+                                BatchState::Failed
+                            } else {
+                                BatchState::AwaitingDownload
+                            }
+                        } else {
+                            self.other_failed_processing_attempts =
+                                self.other_failed_processing_attempts.saturating_add(1);
+                            BatchState::AwaitingDownload
+                        }
+                    }
                 };
                 Ok(self.state.is_failed())
             }
@@ -451,6 +469,10 @@ impl<T: EthSpec, B: BatchConfig> slog::KV for BatchInfo<T, B> {
         )?;
         serializer.emit_usize("downloaded", self.failed_download_attempts.len())?;
         serializer.emit_usize("processed", self.failed_processing_attempts.len())?;
+        serializer.emit_u8(
+            "processed_no_penalty",
+            self.other_failed_processing_attempts,
+        )?;
         serializer.emit_arguments("state", &format_args!("{:?}", self.state))?;
         slog::Result::Ok(())
     }

beacon_node/network/src/sync/range_sync/chain.rs

@@ -1,4 +1,4 @@
-use super::batch::{BatchInfo, BatchState};
+use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
 use crate::beacon_processor::ChainSegmentProcessId;
 use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
 use crate::sync::{manager::Id, network_context::SyncNetworkContext, BatchProcessResult};
@@ -463,7 +463,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
             ))
         })?;
 
-        batch.processing_completed(true)?;
+        batch.processing_completed(BatchProcessingResult::Success)?;
         // If the processed batch was not empty, we can validate previous unvalidated
         // blocks.
         if *was_non_empty {
@@ -512,9 +512,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
                 batch.state(),
             ))
         })?;
-        debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks,
+        debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks, "peer_penalty" => ?peer_action,
             "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer));
-        if batch.processing_completed(false)? {
+
+        if batch.processing_completed(BatchProcessingResult::Failed {
+            count_attempt: peer_action.is_some(),
+        })? {
             // check that we have not exceeded the re-process retry counter
             // If a batch has exceeded the invalid batch lookup attempts limit, it means
             // that it is likely all peers in this chain are are sending invalid batches

beacon_node/network/src/sync/range_sync/chain_collection.rs

@@ -407,7 +407,6 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
         local_info: &SyncInfo,
         awaiting_head_peers: &mut HashMap<PeerId, SyncInfo>,
     ) {
-        debug!(self.log, "Purging chains");
         let local_finalized_slot = local_info
             .finalized_epoch
             .start_slot(T::EthSpec::slots_per_epoch());
@@ -416,10 +415,7 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
 
         let log_ref = &self.log;
         let is_outdated = |target_slot: &Slot, target_root: &Hash256| {
-            let is =
-                target_slot <= &local_finalized_slot || beacon_chain.is_block_known(target_root);
-            debug!(log_ref, "Chain is outdated {}", is);
-            is
+            target_slot <= &local_finalized_slot || beacon_chain.is_block_known(target_root)
         };
 
         // Retain only head peers that remain relevant

beacon_node/network/src/sync/range_sync/mod.rs

@@ -8,7 +8,7 @@ mod chain_collection;
 mod range;
 mod sync_type;
 
-pub use batch::{BatchConfig, BatchInfo, BatchState};
+pub use batch::{BatchConfig, BatchInfo, BatchProcessingResult, BatchState};
 pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH};
 pub use range::RangeSync;
 pub use sync_type::RangeSyncType;