From panic to crit (#1726)

## Issue Addressed
Downgrade inconsistent chain segment states from `panic` to `crit`. I don't love this solution but since range can always bounce back from any of those, we don't panic.

Co-authored-by: Age Manning <Age@AgeManning.com>
This commit is contained in:
divma 2020-10-05 04:02:09 +00:00
parent 39bd511838
commit b1c121b880
2 changed files with 85 additions and 50 deletions

View File

@ -1,6 +1,7 @@
use crate::sync::RequestId;
use eth2_libp2p::rpc::methods::BlocksByRangeRequest;
use eth2_libp2p::PeerId;
use slog::{crit, warn, Logger};
use ssz::Encode;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
@ -125,13 +126,17 @@ impl<T: EthSpec> BatchInfo<T> {
}
/// Adds a block to a downloading batch.
pub fn add_block(&mut self, block: SignedBeaconBlock<T>) {
pub fn add_block(&mut self, block: SignedBeaconBlock<T>, logger: &Logger) {
match self.state.poison() {
BatchState::Downloading(peer, mut blocks, req_id) => {
blocks.push(block);
self.state = BatchState::Downloading(peer, blocks, req_id)
}
other => unreachable!("Add block for batch in wrong state: {:?}", other),
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
crit!(logger, "Add block for batch in wrong state"; "state" => ?other);
self.state = other
}
}
}
@ -140,14 +145,8 @@ impl<T: EthSpec> BatchInfo<T> {
#[must_use = "Batch may have failed"]
pub fn download_completed(
&mut self,
) -> Result<
usize, /* Received blocks */
(
Slot, /* expected slot */
Slot, /* received slot */
&BatchState<T>,
),
> {
logger: &Logger,
) -> Result<usize /* Received blocks */, &BatchState<T>> {
match self.state.poison() {
BatchState::Downloading(peer, blocks, _request_id) => {
// verify that blocks are in range
@ -163,7 +162,7 @@ impl<T: EthSpec> BatchInfo<T> {
None
};
if let Some(range) = failed_range {
if let Some((expected, received)) = failed_range {
// this is a failed download, register the attempt and check if the batch
// can be tried again
self.failed_download_attempts.push(peer);
@ -175,7 +174,9 @@ impl<T: EthSpec> BatchInfo<T> {
// drop the blocks
BatchState::AwaitingDownload
};
return Err((range.0, range.1, &self.state));
warn!(logger, "Batch received out of range blocks";
&self, "expected" => expected, "received" => received);
return Err(&self.state);
}
}
@ -183,12 +184,17 @@ impl<T: EthSpec> BatchInfo<T> {
self.state = BatchState::AwaitingProcessing(peer, blocks);
Ok(received)
}
other => unreachable!("Download completed for batch in wrong state: {:?}", other),
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
crit!(logger, "Download completed for batch in wrong state"; "state" => ?other);
self.state = other;
Err(&self.state)
}
}
}
#[must_use = "Batch may have failed"]
pub fn download_failed(&mut self) -> &BatchState<T> {
pub fn download_failed(&mut self, logger: &Logger) -> &BatchState<T> {
match self.state.poison() {
BatchState::Downloading(peer, _, _request_id) => {
// register the attempt and check if the batch can be tried again
@ -203,31 +209,50 @@ impl<T: EthSpec> BatchInfo<T> {
};
&self.state
}
other => unreachable!("Download failed for batch in wrong state: {:?}", other),
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
crit!(logger, "Download failed for batch in wrong state"; "state" => ?other);
self.state = other;
&self.state
}
}
}
pub fn start_downloading_from_peer(&mut self, peer: PeerId, request_id: RequestId) {
pub fn start_downloading_from_peer(
&mut self,
peer: PeerId,
request_id: RequestId,
logger: &Logger,
) {
match self.state.poison() {
BatchState::AwaitingDownload => {
self.state = BatchState::Downloading(peer, Vec::new(), request_id);
}
other => unreachable!("Starting download for batch in wrong state: {:?}", other),
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
crit!(logger, "Starting download for batch in wrong state"; "state" => ?other);
self.state = other
}
}
}
pub fn start_processing(&mut self) -> Vec<SignedBeaconBlock<T>> {
pub fn start_processing(&mut self, logger: &Logger) -> Vec<SignedBeaconBlock<T>> {
match self.state.poison() {
BatchState::AwaitingProcessing(peer, blocks) => {
self.state = BatchState::Processing(Attempt::new(peer, &blocks));
blocks
}
other => unreachable!("Start processing for batch in wrong state: {:?}", other),
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
crit!(logger, "Starting procesing batch in wrong state"; "state" => ?other);
self.state = other;
vec![]
}
}
}
#[must_use = "Batch may have failed"]
pub fn processing_completed(&mut self, was_sucessful: bool) -> &BatchState<T> {
pub fn processing_completed(&mut self, was_sucessful: bool, logger: &Logger) -> &BatchState<T> {
match self.state.poison() {
BatchState::Processing(attempt) => {
self.state = if !was_sucessful {
@ -247,12 +272,17 @@ impl<T: EthSpec> BatchInfo<T> {
};
&self.state
}
other => unreachable!("Processing completed for batch in wrong state: {:?}", other),
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
crit!(logger, "Procesing completed for batch in wrong state"; "state" => ?other);
self.state = other;
&self.state
}
}
}
#[must_use = "Batch may have failed"]
pub fn validation_failed(&mut self) -> &BatchState<T> {
pub fn validation_failed(&mut self, logger: &Logger) -> &BatchState<T> {
match self.state.poison() {
BatchState::AwaitingValidation(attempt) => {
self.failed_processing_attempts.push(attempt);
@ -267,7 +297,12 @@ impl<T: EthSpec> BatchInfo<T> {
};
&self.state
}
other => unreachable!("Validation failed for batch in wrong state: {:?}", other),
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
crit!(logger, "Validation failed for batch in wrong state"; "state" => ?other);
self.state = other;
&self.state
}
}
}
}

View File

@ -30,7 +30,6 @@ const BATCH_BUFFER_SIZE: u8 = 5;
#[derive(PartialEq)]
#[must_use = "Should be checked, since a failed chain must be removed. A chain that requested
being removed and continued is now in an inconsistent state"]
pub enum ProcessingResult {
KeepChain,
RemoveChain,
@ -168,7 +167,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.batches
.get_mut(&id)
.expect("registered batch exists")
.download_failed()
.download_failed(&self.log)
{
return ProcessingResult::RemoveChain;
}
@ -229,18 +228,17 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
if let Some(block) = beacon_block {
// This is not a stream termination, simply add the block to the request
batch.add_block(block);
batch.add_block(block, &self.log);
ProcessingResult::KeepChain
} else {
// A stream termination has been sent. This batch has ended. Process a completed batch.
// Remove the request from the peer's active batches
self.peers
.get_mut(peer_id)
.unwrap_or_else(|| panic!("Batch is registered for the peer"))
.remove(&batch_id);
match batch.download_completed() {
match batch.download_completed(&self.log) {
Ok(received) => {
let awaiting_batches = batch_id.saturating_sub(
self.optimistic_start
@ -254,9 +252,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
self.process_completed_batches(network)
}
Err((expected, received, state)) => {
warn!(self.log, "Batch received out of range blocks";
"epoch" => batch_id, "expected" => expected, "received" => received);
Err(state) => {
if let BatchState::Failed = state {
return ProcessingResult::RemoveChain;
}
@ -285,7 +281,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// result callback. This is done, because an empty batch could end a chain and the logic
// for removing chains and checking completion is in the callback.
let blocks = batch.start_processing();
let blocks = batch.start_processing(&self.log);
let process_id = ProcessId::RangeBatchId(self.id, batch_id);
self.current_processing_batch = Some(batch_id);
@ -299,7 +295,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// blocks to continue, and the chain is expecting a processing result that won't
// arrive. To mitigate this, (fake) fail this processing so that the batch is
// re-downloaded.
// TODO: needs better handling
self.on_batch_process_result(network, batch_id, &BatchProcessResult::Failed(false))
} else {
ProcessingResult::KeepChain
@ -337,25 +332,29 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
BatchState::Processing(_)
| BatchState::AwaitingDownload
| BatchState::Failed
| BatchState::Poisoned
| BatchState::AwaitingValidation(_) => {
| BatchState::Poisoned => {
// these are all inconsistent states:
// - Processing -> `self.current_processing_batch` is Some
// - Failed -> non recoverable batch. For a optimistic batch, it should
// - Processing -> `self.current_processing_batch` is None
// - Failed -> non recoverable batch. For an optimistic batch, it should
// have been removed
// - Poisoned -> this is an intermediate state that should never be reached
// - AwaitingDownload -> A recoverable failed batch should have been
// re-requested.
// - AwaitingValidation -> If an optimistic batch is successfully processed
// it is no longer considered an optimistic candidate. If the batch was
// empty the chain rejects it; if it was non empty the chain is advanced
// to this point (so that the old optimistic batch is now the processing
// target)
unreachable!(
"Optimistic batch indicates inconsistent chain state: {:?}",
state
)
}
BatchState::AwaitingValidation(_) => {
// This is possible due to race conditions, and tho it would be considered
// an inconsistent state, the chain can continue. If an optimistic batch
// is successfully processed it is no longer considered an optimistic
// candidate. If the batch was empty the chain rejects it; if it was non
// empty the chain is advanced to this point (so that the old optimistic
// batch is now the processing target)
crit!(self.log, "Optimistic batch should never be Awaiting Validation"; "batch" => epoch);
None
}
}
} else {
None
@ -385,7 +384,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// re-requested.
// - AwaitingValidation -> self.processing_target should have been moved
// forward
// - Processing -> `self.current_processing_batch` is Some
// - Processing -> `self.current_processing_batch` is None
// - Poisoned -> Intermediate state that should never be reached
unreachable!(
"Robust target batch indicates inconsistent chain state: {:?}",
@ -441,7 +440,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.batches
.get_mut(&batch_id)
.expect("Chain was expecting a known batch");
let _ = batch.processing_completed(true);
let _ = batch.processing_completed(true, &self.log);
// If the processed batch was not empty, we can validate previous unvalidated
// blocks.
if *was_non_empty {
@ -489,7 +488,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.expect("batch is processing blocks from a peer");
debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks,
"batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer));
if let BatchState::Failed = batch.processing_completed(false) {
if let BatchState::Failed = batch.processing_completed(false, &self.log) {
// check that we have not exceeded the re-process retry counter
// If a batch has exceeded the invalid batch lookup attempts limit, it means
// that it is likely all peers in this chain are are sending invalid batches
@ -566,6 +565,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// safety check for batch boundaries
if validating_epoch % EPOCHS_PER_BATCH != self.start_epoch % EPOCHS_PER_BATCH {
crit!(self.log, "Validating Epoch is not aligned");
return;
}
// batches in the range [BatchId, ..) (not yet validated)
@ -690,7 +690,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let mut redownload_queue = Vec::new();
for (id, batch) in self.batches.range_mut(..batch_id) {
if let BatchState::Failed = batch.validation_failed() {
if let BatchState::Failed = batch.validation_failed(&self.log) {
// remove the chain early
return ProcessingResult::RemoveChain;
}
@ -804,7 +804,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.get_mut(peer_id)
.expect("Peer belongs to the chain")
.remove(&batch_id);
if let BatchState::Failed = batch.download_failed() {
if let BatchState::Failed = batch.download_failed(&self.log) {
return ProcessingResult::RemoveChain;
}
self.retry_batch_download(network, batch_id)
@ -859,7 +859,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
match network.blocks_by_range_request(peer.clone(), request, self.id, batch_id) {
Ok(request_id) => {
// inform the batch about the new request
batch.start_downloading_from_peer(peer.clone(), request_id);
batch.start_downloading_from_peer(peer.clone(), request_id, &self.log);
if self
.optimistic_start
.map(|epoch| epoch == batch_id)
@ -881,12 +881,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
warn!(self.log, "Could not send batch request";
"batch_id" => batch_id, "error" => e, &batch);
// register the failed download and check if the batch can be retried
batch.start_downloading_from_peer(peer.clone(), 1); // fake request_id is not relevant
batch.start_downloading_from_peer(peer.clone(), 1, &self.log); // fake request_id is not relevant
self.peers
.get_mut(&peer)
.expect("peer belongs to the peer pool")
.remove(&batch_id);
if let BatchState::Failed = batch.download_failed() {
if let BatchState::Failed = batch.download_failed(&self.log) {
return ProcessingResult::RemoveChain;
} else {
return self.retry_batch_download(network, batch_id);