Advanced error handling for syncing (#819)
* Initial block processing thread design
* Correct compilation issues
* Increase logging and request from all given peers
* Patch peer request bug
* Adds fork choice to block processing
* Adds logging for bug isolation
* Patch syncing for chains with skip-slots
* Bump block processing error logs
* Improve logging for attestation processing
* Randomize peer selection during sync
* Resuming chains restarts from local finalized slot
* Downgrades Arc batches to Rc batches
* Add clippy fixes
* Add advanced error handling for invalid/malicious batches
* Downgrade Rc<Batch> to Option<Batch> to pass processed batches to chains
* Squash edge case rpc and syncing bugs
* Process empty batches which could end chains
* Removes last_processed_id concept to account for ending skip-slot batches
* Add logging for chain purges
* Adds retries to re-request batch logging
* Remove bug finding log
* Add reviewers suggestions
* Revert to master modifications
* Line wrapping
* Revert to master
This commit is contained in:
parent 23a35c3767
commit 81b028b805
```diff
@@ -163,12 +163,17 @@ where
 
                 *self = InboundSubstreamState::ResponsePendingSend { substream, closing }
             }
-            InboundSubstreamState::ResponseIdle(substream) => {
+            InboundSubstreamState::ResponseIdle(mut substream) => {
+                // check if the stream is already closed
+                if let Ok(Async::Ready(None)) = substream.poll() {
+                    *self = InboundSubstreamState::Closing(substream);
+                } else {
                     *self = InboundSubstreamState::ResponsePendingSend {
                         substream: substream.send(error),
                         closing: true,
                     };
+                }
             }
             InboundSubstreamState::Closing(substream) => {
                 // let the stream close
                 *self = InboundSubstreamState::Closing(substream);
```
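The change above makes the handler check for a remotely-closed substream before queuing an error response. A minimal, self-contained sketch of that behaviour with hypothetical stand-in types (the real code polls a futures 0.1 substream inside the libp2p RPC handler):

```rust
// Stand-in state machine: not the real `InboundSubstreamState`.
enum SubstreamState {
    /// Idle and able to send a response; `remote_closed` models the
    /// `Ok(Async::Ready(None))` poll result in the diff.
    ResponseIdle { remote_closed: bool },
    /// A response is being flushed; the substream closes afterwards.
    ResponsePendingSend { closing: bool },
    /// Let the stream close.
    Closing,
}

fn on_error_response(state: SubstreamState) -> SubstreamState {
    match state {
        // New behaviour: don't try to send on a stream the remote already closed.
        SubstreamState::ResponseIdle { remote_closed: true } => SubstreamState::Closing,
        SubstreamState::ResponseIdle { remote_closed: false } => {
            SubstreamState::ResponsePendingSend { closing: true }
        }
        other => other,
    }
}
```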
```diff
@@ -314,7 +319,6 @@ where
                     substream: out,
                     request,
                 };
-                debug!(self.log, "Added outbound substream id"; "substream_id" => id);
                 self.outbound_substreams
                     .insert(id, (awaiting_stream, delay_key));
             }
```
```diff
@@ -3,9 +3,11 @@ use eth2_libp2p::rpc::methods::*;
 use eth2_libp2p::rpc::RequestId;
 use eth2_libp2p::PeerId;
 use fnv::FnvHashMap;
+use ssz::Encode;
 use std::cmp::Ordering;
 use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
+use std::hash::{Hash, Hasher};
 use std::ops::Sub;
 use types::{BeaconBlock, EthSpec, Hash256, Slot};
 
```
```diff
@@ -42,11 +44,16 @@ pub struct Batch<T: EthSpec> {
     /// The hash of the chain root to requested from the peer.
     pub head_root: Hash256,
     /// The peer that was originally assigned to the batch.
-    pub _original_peer: PeerId,
+    pub original_peer: PeerId,
     /// The peer that is currently assigned to the batch.
     pub current_peer: PeerId,
-    /// The number of retries this batch has undergone.
+    /// The number of retries this batch has undergone due to a failed request.
     pub retries: u8,
+    /// The number of times this batch has attempted to be re-downloaded and re-processed. This
+    /// occurs when a batch has been received but cannot be processed.
+    pub reprocess_retries: u8,
+    /// Marks the batch as undergoing a re-process, with a hash of the original blocks it received.
+    pub original_hash: Option<u64>,
     /// The blocks that have been downloaded.
     pub downloaded_blocks: Vec<BeaconBlock<T>>,
 }
```
```diff
@@ -66,9 +73,11 @@ impl<T: EthSpec> Batch<T> {
             start_slot,
             end_slot,
             head_root,
-            _original_peer: peer_id.clone(),
+            original_peer: peer_id.clone(),
             current_peer: peer_id,
             retries: 0,
+            reprocess_retries: 0,
+            original_hash: None,
             downloaded_blocks: Vec::new(),
         }
     }
```
```diff
@@ -81,6 +90,15 @@ impl<T: EthSpec> Batch<T> {
             step: 1,
         }
     }
+
+    /// This gets a hash that represents the blocks currently downloaded. This allows comparing a
+    /// previously downloaded batch of blocks with a new downloaded batch of blocks.
+    pub fn hash(&self) -> u64 {
+        // the hash used is the ssz-encoded list of blocks
+        let mut hasher = std::collections::hash_map::DefaultHasher::new();
+        self.downloaded_blocks.as_ssz_bytes().hash(&mut hasher);
+        hasher.finish()
+    }
 }
 
 impl<T: EthSpec> Ord for Batch<T> {
```
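`Batch::hash` is what later allows a re-downloaded batch to be compared against the original. A self-contained illustration of the same fingerprinting idea, with a plain byte slice standing in for the SSZ-encoded block list:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// `ssz_bytes` stands in for `self.downloaded_blocks.as_ssz_bytes()`.
fn batch_fingerprint(ssz_bytes: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    ssz_bytes.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let original = batch_fingerprint(&[1, 2, 3]);
    let redownloaded = batch_fingerprint(&[1, 2, 4]);
    // A different fingerprint means the re-download returned different blocks,
    // so one of the two sending peers must have served a bad batch.
    assert_ne!(original, redownloaded);
}
```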
```diff
@@ -28,7 +28,7 @@ const BATCH_BUFFER_SIZE: u8 = 5;
 /// Invalid batches are attempted to be re-downloaded from other peers. If they cannot be processed
 /// after `INVALID_BATCH_LOOKUP_ATTEMPTS` times, the chain is considered faulty and all peers will
 /// be downvoted.
-const _INVALID_BATCH_LOOKUP_ATTEMPTS: u8 = 3;
+const INVALID_BATCH_LOOKUP_ATTEMPTS: u8 = 3;
 
 /// A return type for functions that act on a `Chain` which informs the caller whether the chain
 /// has been completed and should be removed or to be kept if further processing is
```
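The constant loses its underscore prefix because it is now actually consulted in the `BatchProcessResult::Failed` arm further down. A free-standing sketch of the decision it drives:

```rust
const INVALID_BATCH_LOOKUP_ATTEMPTS: u8 = 3;

enum ProcessingResult {
    KeepChain,
    RemoveChain,
}

// Sketch of the failure path: once a batch has been re-downloaded and
// re-processed more than the allowed number of times, every peer in the
// chain's pool is suspect, so the chain is dropped (and, in the real code,
// all of its peers are downvoted).
fn on_batch_failure(reprocess_retries: u8) -> ProcessingResult {
    if reprocess_retries > INVALID_BATCH_LOOKUP_ATTEMPTS {
        ProcessingResult::RemoveChain
    } else {
        // still within the retry budget: re-request from another peer
        ProcessingResult::KeepChain
    }
}
```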
```diff
@@ -71,9 +71,6 @@ pub struct SyncingChain<T: BeaconChainTypes> {
     /// The next batch id that needs to be processed.
     to_be_processed_id: BatchId,
 
-    /// The last batch id that was processed.
-    last_processed_id: BatchId,
-
     /// The current state of the chain.
     pub state: ChainSyncingState,
 
```
```diff
@@ -122,7 +119,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
             peer_pool,
             to_be_downloaded_id: BatchId(1),
             to_be_processed_id: BatchId(1),
-            last_processed_id: BatchId(0),
             state: ChainSyncingState::Stopped,
             current_processing_id: None,
             sync_send,
```
```diff
@@ -131,6 +127,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
         }
     }
 
+    /// Returns the latest slot number that has been processed.
+    fn current_processed_slot(&self) -> Slot {
+        self.start_slot
+            .saturating_add(self.to_be_processed_id.saturating_sub(1u64) * BLOCKS_PER_BATCH)
+    }
+
     /// A batch of blocks has been received. This function gets run on all chains and should
     /// return Some if the request id matches a pending request on this chain, or None if it does
     /// not.
```
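`current_processed_slot` replaces the removed `last_processed_id` bookkeeping: batch ids start at 1, so the number of fully processed batches is `to_be_processed_id - 1`. A worked example with plain integers (the `BLOCKS_PER_BATCH` value below is illustrative, not necessarily the crate's constant):

```rust
const BLOCKS_PER_BATCH: u64 = 50; // illustrative value only

fn current_processed_slot(start_slot: u64, to_be_processed_id: u64) -> u64 {
    start_slot.saturating_add(to_be_processed_id.saturating_sub(1) * BLOCKS_PER_BATCH)
}

fn main() {
    // A chain starting at slot 100 that is waiting to process batch 3 has
    // fully processed two batches, i.e. 100 slots.
    assert_eq!(current_processed_slot(100, 3), 200);
    // Batch id 1 still pending means nothing has been processed yet.
    assert_eq!(current_processed_slot(100, 1), 100);
}
```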
```diff
@@ -221,16 +223,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
         }
 
         // Check if there is a batch ready to be processed
-        while !self.completed_batches.is_empty()
+        if !self.completed_batches.is_empty()
             && self.completed_batches[0].id == self.to_be_processed_id
         {
             let batch = self.completed_batches.remove(0);
-            if batch.downloaded_blocks.is_empty() {
-                // The batch was empty, consider this processed and move to the next batch
-                self.processed_batches.push(batch);
-                *self.to_be_processed_id += 1;
-                continue;
-            }
+
+            // Note: We now send empty batches to the processor in order to trigger the block
+            // processor result callback. This is done, because an empty batch could end a chain
+            // and the logic for removing chains and checking completion is in the callback.
 
             // send the batch to the batch processor thread
             return self.process_batch(batch);
```
```diff
@@ -283,27 +283,56 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
         let res = match result {
             BatchProcessResult::Success => {
                 *self.to_be_processed_id += 1;
-                // This variable accounts for skip slots and batches that were not actually
-                // processed due to having no blocks.
-                self.last_processed_id = batch.id;
 
-                // Remove any validate batches awaiting validation.
-                // Only batches that have blocks are processed here, therefore all previous batches
-                // have been correct.
-                let last_processed_id = self.last_processed_id;
-                self.processed_batches
-                    .retain(|batch| batch.id.0 >= last_processed_id.0);
+                // If the processed batch was not empty, we can validate previous invalidated
+                // blocks
+                if !batch.downloaded_blocks.is_empty() {
+                    // Remove any batches awaiting validation.
+                    //
+                    // All blocks in processed_batches should be prior batches. As the current
+                    // batch has been processed with blocks in it, all previous batches are valid.
+                    //
+                    // If a previous batch has been validated and it had been re-processed, downvote
+                    // the original peer.
+                    while !self.processed_batches.is_empty() {
+                        let processed_batch = self.processed_batches.remove(0);
+                        if *processed_batch.id >= *batch.id {
+                            crit!(self.log, "A processed batch had a greater id than the current process id";
+                                "processed_id" => *processed_batch.id,
+                                "current_id" => *batch.id);
+                        }
+
+                        if let Some(prev_hash) = processed_batch.original_hash {
+                            // The validated batch has been re-processed
+                            if prev_hash != processed_batch.hash() {
+                                // The re-downloaded version was different
+                                if processed_batch.current_peer != processed_batch.original_peer {
+                                    // A new peer sent the correct batch, the previous peer did not
+                                    // downvote the original peer
+                                    //
+                                    // If the same peer corrected it's mistake, we allow it.... for
+                                    // now.
+                                    debug!(self.log, "Re-processed batch validated. Downvoting original peer";
+                                        "batch_id" => *processed_batch.id,
+                                        "original_peer" => format!("{}",processed_batch.original_peer),
+                                        "new_peer" => format!("{}", processed_batch.current_peer));
+                                    network.downvote_peer(processed_batch.original_peer);
+                                }
+                            }
+                        }
+                    }
+                }
 
-                // add the current batch to processed batches to be verified in the future. We are
+                // Add the current batch to processed batches to be verified in the future. We are
                 // only uncertain about this batch, if it has not returned all blocks.
-                if batch.downloaded_blocks.len() < BLOCKS_PER_BATCH as usize {
+                if batch.downloaded_blocks.last().map(|block| block.slot)
+                    != Some(batch.end_slot.saturating_sub(1u64))
+                {
                     self.processed_batches.push(batch);
                 }
 
                 // check if the chain has completed syncing
-                if self.start_slot + *self.last_processed_id * BLOCKS_PER_BATCH
-                    >= self.target_head_slot
-                {
+                if self.current_processed_slot() >= self.target_head_slot {
                     // chain is completed
                     ProcessingResult::RemoveChain
                 } else {
```
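The old completeness test (`downloaded_blocks.len() < BLOCKS_PER_BATCH`) mis-fires on chains with skip slots, where a fully-served batch legitimately holds fewer blocks. The new test instead asks whether the last received block reaches the batch's final slot. The predicate in isolation, assuming `end_slot` is exclusive as the `saturating_sub(1)` in the diff suggests:

```rust
// A batch stays "uncertain" (kept in processed_batches for later validation)
// when its last downloaded block does not land on the batch's final slot.
fn batch_is_uncertain(last_block_slot: Option<u64>, end_slot: u64) -> bool {
    last_block_slot != Some(end_slot.saturating_sub(1))
}

fn main() {
    // Batch covering slots up to (but excluding) 50: a block at slot 49 fills it.
    assert!(!batch_is_uncertain(Some(49), 50));
    // Trailing skip slots, or an empty batch, leave it uncertain.
    assert!(batch_is_uncertain(Some(47), 50));
    assert!(batch_is_uncertain(None, 50));
}
```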
```diff
@@ -320,19 +349,104 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
                 }
             }
             BatchProcessResult::Failed => {
-                // batch processing failed
-                // this could be because this batch is invalid, or a previous invalidated batch
+                warn!(self.log, "Batch processing failed"; "id" => *batch.id, "peer" => format!("{}", batch.current_peer));
+                // The batch processing failed
+                // This could be because this batch is invalid, or a previous invalidated batch
                 // is invalid. We need to find out which and downvote the peer that has sent us
                 // an invalid batch.
 
-                // firstly remove any validated batches
-                self.handle_invalid_batch(network, batch)
+                // check that we have no exceeded the re-process retry counter
+                if batch.reprocess_retries > INVALID_BATCH_LOOKUP_ATTEMPTS {
+                    // if a batch has exceeded the invalid batch lookup attempts limit, it means
+                    // that it is likely all peers in this chain are are sending invalid batches
+                    // repeatedly and are either malicious or faulty. We drop the chain and
+                    // downvote all peers.
+                    warn!(self.log, "Batch failed to download. Dropping chain and downvoting peers"; "id"=> *batch.id);
+                    for peer_id in self.peer_pool.drain() {
+                        network.downvote_peer(peer_id);
+                    }
+                    ProcessingResult::RemoveChain
+                } else {
+                    // Handle this invalid batch, that is within the re-process retries limit.
+                    self.handle_invalid_batch(network, batch);
+                    ProcessingResult::KeepChain
+                }
             }
         };
 
         Some(res)
     }
 
+    /// An invalid batch has been received that could not be processed.
+    ///
+    /// These events occur when a peer as successfully responded with blocks, but the blocks we
+    /// have received are incorrect or invalid. This indicates the peer has not performed as
+    /// intended and can result in downvoting a peer.
+    // TODO: Batches could have been partially downloaded due to RPC size-limit restrictions. We
+    // need to add logic for partial batch downloads. Potentially, if another peer returns the same
+    // batch, we try a partial download.
+    fn handle_invalid_batch(&mut self, network: &mut SyncNetworkContext, batch: Batch<T::EthSpec>) {
+        // The current batch could not be processed, indicating either the current or previous
+        // batches are invalid
+
+        // The previous batch could be incomplete due to the block sizes being too large to fit in
+        // a single RPC request or there could be consecutive empty batches which are not supposed
+        // to be there
+
+        // The current (sub-optimal) strategy is to simply re-request all batches that could
+        // potentially be faulty. If a batch returns a different result than the original and
+        // results in successful processing, we downvote the original peer that sent us the batch.
+
+        // If all batches return the same result, we try this process INVALID_BATCH_LOOKUP_ATTEMPTS
+        // times before considering the entire chain invalid and downvoting all peers.
+
+        // Find any pre-processed batches awaiting validation
+        while !self.processed_batches.is_empty() {
+            let past_batch = self.processed_batches.remove(0);
+            *self.to_be_processed_id = std::cmp::min(*self.to_be_processed_id, *past_batch.id);
+            self.reprocess_batch(network, past_batch);
+        }
+
+        // re-process the current batch
+        self.reprocess_batch(network, batch);
+    }
+
+    /// This re-downloads and marks the batch as being re-processed.
+    ///
+    /// If the re-downloaded batch is different to the original and can be processed, the original
+    /// peer will be downvoted.
+    fn reprocess_batch(&mut self, network: &mut SyncNetworkContext, mut batch: Batch<T::EthSpec>) {
+        // marks the batch as attempting to be reprocessed by hashing the downloaded blocks
+        batch.original_hash = Some(batch.hash());
+
+        // remove previously downloaded blocks
+        batch.downloaded_blocks.clear();
+
+        // increment the re-process counter
+        batch.reprocess_retries += 1;
+
+        // attempt to find another peer to download the batch from (this potentially doubles up
+        // requests on a single peer)
+        let current_peer = &batch.current_peer;
+        let new_peer = self
+            .peer_pool
+            .iter()
+            .find(|peer| *peer != current_peer)
+            .unwrap_or_else(|| current_peer);
+
+        batch.current_peer = new_peer.clone();
+
+        debug!(self.log, "Re-requesting batch";
+            "start_slot" => batch.start_slot,
+            "end_slot" => batch.end_slot,
+            "id" => *batch.id,
+            "peer" => format!("{}", batch.current_peer),
+            "head_root"=> format!("{}", batch.head_root),
+            "retries" => batch.retries,
+            "re-processes" => batch.reprocess_retries);
+        self.send_batch(network, batch);
+    }
+
     pub fn stop_syncing(&mut self) {
         self.state = ChainSyncingState::Stopped;
     }
```
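`reprocess_batch` deliberately routes the retry through a different peer where possible, so a second opinion can contradict the original sender. The selection logic in isolation, with `String` ids standing in for `PeerId`:

```rust
use std::collections::HashSet;

// Prefer any peer other than the one that served the original download; if
// the pool contains only that peer, fall back to it (doubling up the request).
fn pick_retry_peer<'a>(pool: &'a HashSet<String>, current: &'a String) -> &'a String {
    pool.iter().find(|peer| *peer != current).unwrap_or(current)
}

fn main() {
    let mut pool = HashSet::new();
    pool.insert("peer-a".to_string());
    let current = "peer-a".to_string();
    // Only the original peer is available, so the retry goes back to it.
    assert_eq!(pick_retry_peer(&pool, &current), "peer-a");

    pool.insert("peer-b".to_string());
    // With an alternative available, it is always preferred.
    assert_eq!(pick_retry_peer(&pool, &current), "peer-b");
}
```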
```diff
@@ -351,17 +465,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
         // to start from this point and re-index all subsequent batches starting from one
         // (effectively creating a new chain).
 
-        if local_finalized_slot.as_u64()
-            > self
-                .start_slot
-                .as_u64()
-                .saturating_add(*self.last_processed_id * BLOCKS_PER_BATCH)
-        {
+        if local_finalized_slot > self.current_processed_slot() {
             debug!(self.log, "Updating chain's progress";
-                "prev_completed_slot" => self.start_slot + *self.last_processed_id*BLOCKS_PER_BATCH,
+                "prev_completed_slot" => self.current_processed_slot(),
                 "new_completed_slot" => local_finalized_slot.as_u64());
             // Re-index batches
-            *self.last_processed_id = 0;
             *self.to_be_downloaded_id = 1;
             *self.to_be_processed_id = 1;
 
```
```diff
@@ -386,7 +494,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
         self.peer_pool.insert(peer_id.clone());
         // do not request blocks if the chain is not syncing
         if let ChainSyncingState::Stopped = self.state {
-            debug!(self.log, "Peer added to a non-syncing chain"; "peer_id" => format!("{:?}", peer_id));
+            debug!(self.log, "Peer added to a non-syncing chain"; "peer_id" => format!("{}", peer_id));
             return;
         }
 
```
```diff
@@ -465,47 +573,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
         }
     }
 
-    /// An invalid batch has been received that could not be processed.
-    ///
-    /// These events occur when a peer as successfully responded with blocks, but the blocks we
-    /// have received are incorrect or invalid. This indicates the peer has not performed as
-    /// intended and can result in downvoting a peer.
-    fn handle_invalid_batch(
-        &mut self,
-        network: &mut SyncNetworkContext,
-        _batch: Batch<T::EthSpec>,
-    ) -> ProcessingResult {
-        // The current batch could not be processed, indicating either the current or previous
-        // batches are invalid
-
-        // The previous batch could be incomplete due to the block sizes being too large to fit in
-        // a single RPC request or there could be consecutive empty batches which are not supposed
-        // to be there
-
-        // The current (sub-optimal) strategy is to simply re-request all batches that could
-        // potentially be faulty. If a batch returns a different result than the original and
-        // results in successful processing, we downvote the original peer that sent us the batch.
-
-        // If all batches return the same result, we try this process INVALID_BATCH_LOOKUP_ATTEMPTS
-        // times before considering the entire chain invalid and downvoting all peers.
-
-        // Firstly, check if there are any past batches that could be invalid.
-        if !self.processed_batches.is_empty() {
-            // try and re-download this batch from other peers
-        }
-
-        //TODO: Implement this logic
-        // Currently just fail the chain, and drop all associated peers, removing them from the
-        // peer pool, to prevent re-status
-        for peer_id in self.peer_pool.drain() {
-            network.downvote_peer(peer_id);
-        }
-        ProcessingResult::RemoveChain
-    }
-
-    /// Attempts to request the next required batches from the peer pool if the chain is syncing.
-    /// It will exhaust the peer pool and left over batches until the batch buffer is reached or
-    /// all peers are exhausted.
+    /// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer
+    /// pool and left over batches until the batch buffer is reached or all peers are exhausted.
     fn request_batches(&mut self, network: &mut SyncNetworkContext) {
         if let ChainSyncingState::Syncing = self.state {
             while self.send_range_request(network) {}
```
```diff
@@ -518,7 +587,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
         // find the next pending batch and request it from the peer
         if let Some(peer_id) = self.get_next_peer() {
             if let Some(batch) = self.get_next_batch(peer_id) {
-                debug!(self.log, "Requesting batch"; "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => *batch.id, "peer" => format!("{:?}", batch.current_peer), "head_root"=> format!("{}", batch.head_root));
+                debug!(self.log, "Requesting batch";
+                    "start_slot" => batch.start_slot,
+                    "end_slot" => batch.end_slot,
+                    "id" => *batch.id,
+                    "peer" => format!("{}", batch.current_peer),
+                    "head_root"=> format!("{}", batch.head_root));
                 // send the batch
                 self.send_batch(network, batch);
                 return true;
```
```diff
@@ -531,6 +605,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
     ///
     /// This is used to create the next request.
     fn get_next_peer(&self) -> Option<PeerId> {
+        // TODO: Optimize this by combining with above two functions.
         // randomize the peers for load balancing
         let mut rng = rand::thread_rng();
         let mut peers = self.peer_pool.iter().collect::<Vec<_>>();
```
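Per the commit message, peer selection during sync is randomized for load balancing; `get_next_peer` collects the pool into a `Vec` and randomizes it before picking. A sketch of that approach (assuming the `rand` crate, which the surrounding code already uses):

```rust
use rand::seq::SliceRandom;

// Shuffle the candidates so repeated batch requests spread across the pool
// rather than always hitting whichever peer happens to iterate first.
fn next_peer(peers: &[String]) -> Option<&String> {
    let mut candidates: Vec<&String> = peers.iter().collect();
    candidates.shuffle(&mut rand::thread_rng());
    candidates.first().copied()
}
```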
```diff
@@ -310,6 +310,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
                     .block_root_tree
                     .is_known_block_root(&chain.target_head_root)
                 {
+                    debug!(log, "Purging out of finalized chain"; "start_slot" => chain.start_slot, "end_slot" => chain.target_head_slot);
                     chain.status_peers(network);
                     false
                 } else {
```
```diff
@@ -322,6 +323,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
                     .block_root_tree
                     .is_known_block_root(&chain.target_head_root)
                 {
+                    debug!(log, "Purging out of date head chain"; "start_slot" => chain.start_slot, "end_slot" => chain.target_head_slot);
                     chain.status_peers(network);
                     false
                 } else {
```
```diff
@@ -248,8 +248,9 @@ impl<T: BeaconChainTypes> RangeSync<T> {
             })
             .is_none();
         if id_not_found {
-            // The request didn't exist in any `SyncingChain`. Could have been an old request. Log
-            // and ignore
+            // The request didn't exist in any `SyncingChain`. Could have been an old request or
+            // the chain was purged due to being out of date whilst a request was pending. Log
+            // and ignore.
             debug!(self.log, "Range response without matching request"; "peer" => format!("{:?}", peer_id), "request_id" => request_id);
         }
     }
```
```diff
@@ -307,7 +308,9 @@ impl<T: BeaconChainTypes> RangeSync<T> {
             }
             Some((_, ProcessingResult::KeepChain)) => {}
             None => {
-                warn!(self.log, "No chains match the block processing id"; "id" => processing_id);
+                // This can happen if a chain gets purged due to being out of date whilst a
+                // batch process is in progress.
+                debug!(self.log, "No chains match the block processing id"; "id" => processing_id);
             }
         }
     }
```