From 81b028b805e4d5b9329f2b28b3fa19ac1997f54b Mon Sep 17 00:00:00 2001
From: Age Manning <Age@AgeManning.com>
Date: Thu, 23 Jan 2020 19:25:13 +1100
Subject: [PATCH] Advanced error handling for syncing (#819)

* Initial block processing thread design
* Correct compilation issues
* Increase logging and request from all given peers
* Patch peer request bug
* Adds fork choice to block processing
* Adds logging for bug isolation
* Patch syncing for chains with skip-slots
* Bump block processing error logs
* Improve logging for attestation processing
* Randomize peer selection during sync
* Resuming chains restarts from local finalized slot
* Downgrades Arc batches to Rc batches
* Add clippy fixes
* Add advanced error handling for invalid/malicious batches
* Downgrade Rc to Option to pass processed batches to chains
* Squash edge case rpc and syncing bugs
* Process empty batches which could end chains
* Removes last_processed_id concept to account for ending skip-slot batches
* Add logging for chain purges
* Adds retries to re-request batch logging
* Remove bug finding log
* Add reviewers suggestions
* Revert to master modifications
* Line wrapping
* Revert to master
---
 beacon_node/eth2-libp2p/src/rpc/handler.rs    |  16 +-
 .../network/src/sync/range_sync/batch.rs      |  24 +-
 .../network/src/sync/range_sync/chain.rs      | 237 ++++++++++++------
 .../src/sync/range_sync/chain_collection.rs   |   2 +
 .../network/src/sync/range_sync/range.rs      |   9 +-
 5 files changed, 195 insertions(+), 93 deletions(-)

diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs
index 9d9020386..d6424f0af 100644
--- a/beacon_node/eth2-libp2p/src/rpc/handler.rs
+++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs
@@ -163,11 +163,16 @@ where
                 *self = InboundSubstreamState::ResponsePendingSend { substream, closing }
             }
-            InboundSubstreamState::ResponseIdle(substream) => {
-                *self = InboundSubstreamState::ResponsePendingSend {
-                    substream: substream.send(error),
-                    closing: true,
-                };
+            InboundSubstreamState::ResponseIdle(mut substream) => {
+                // check if the stream is already closed
+                if let Ok(Async::Ready(None)) = substream.poll() {
+                    *self = InboundSubstreamState::Closing(substream);
+                } else {
+                    *self = InboundSubstreamState::ResponsePendingSend {
+                        substream: substream.send(error),
+                        closing: true,
+                    };
+                }
             }
             InboundSubstreamState::Closing(substream) => {
                 // let the stream close
@@ -314,7 +319,6 @@ where
                     substream: out,
                     request,
                 };
-                debug!(self.log, "Added outbound substream id"; "substream_id" => id);
                 self.outbound_substreams
                     .insert(id, (awaiting_stream, delay_key));
            }
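The handler.rs hunk above avoids queueing an error response on an inbound substream the remote has already closed. A minimal sketch of that state transition, using stand-in types rather than the real `eth2-libp2p` handler (`remote_closed` stands in for `substream.poll()` returning `Ok(Async::Ready(None))`):

```rust
// Stand-ins for the real handler types; not the eth2-libp2p API.
enum InboundSubstreamState<S> {
    ResponsePendingSend { substream: S, closing: bool },
    Closing(S),
}

trait PollClosed {
    /// Stand-in for polling the substream and seeing `Ok(Async::Ready(None))`.
    fn remote_closed(&mut self) -> bool;
}

fn on_error_response<S: PollClosed>(mut substream: S) -> InboundSubstreamState<S> {
    if substream.remote_closed() {
        // The remote already closed the stream: skip the send and go straight
        // to Closing rather than writing into a dead substream.
        InboundSubstreamState::Closing(substream)
    } else {
        // Stream still open: queue the error response and close afterwards.
        InboundSubstreamState::ResponsePendingSend { substream, closing: true }
    }
}
```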
diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs
index e487e795b..a60614969 100644
--- a/beacon_node/network/src/sync/range_sync/batch.rs
+++ b/beacon_node/network/src/sync/range_sync/batch.rs
@@ -3,9 +3,11 @@
 use eth2_libp2p::rpc::methods::*;
 use eth2_libp2p::rpc::RequestId;
 use eth2_libp2p::PeerId;
 use fnv::FnvHashMap;
+use ssz::Encode;
 use std::cmp::Ordering;
 use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
+use std::hash::{Hash, Hasher};
 use std::ops::Sub;
 use types::{BeaconBlock, EthSpec, Hash256, Slot};
@@ -42,11 +44,16 @@ pub struct Batch<T: EthSpec> {
     /// The hash of the chain root to requested from the peer.
     pub head_root: Hash256,
     /// The peer that was originally assigned to the batch.
-    pub _original_peer: PeerId,
+    pub original_peer: PeerId,
     /// The peer that is currently assigned to the batch.
     pub current_peer: PeerId,
-    /// The number of retries this batch has undergone.
+    /// The number of retries this batch has undergone due to a failed request.
     pub retries: u8,
+    /// The number of times this batch has attempted to be re-downloaded and re-processed. This
+    /// occurs when a batch has been received but cannot be processed.
+    pub reprocess_retries: u8,
+    /// Marks the batch as undergoing a re-process, with a hash of the original blocks it received.
+    pub original_hash: Option<u64>,
     /// The blocks that have been downloaded.
     pub downloaded_blocks: Vec<BeaconBlock<T>>,
 }
@@ -66,9 +73,11 @@ impl<T: EthSpec> Batch<T> {
             start_slot,
             end_slot,
             head_root,
-            _original_peer: peer_id.clone(),
+            original_peer: peer_id.clone(),
             current_peer: peer_id,
             retries: 0,
+            reprocess_retries: 0,
+            original_hash: None,
             downloaded_blocks: Vec::new(),
         }
     }
@@ -81,6 +90,15 @@ impl<T: EthSpec> Batch<T> {
             step: 1,
         }
     }
+
+    /// This gets a hash that represents the blocks currently downloaded. This allows comparing a
+    /// previously downloaded batch of blocks with a new downloaded batch of blocks.
+    pub fn hash(&self) -> u64 {
+        // the hash used is the ssz-encoded list of blocks
+        let mut hasher = std::collections::hash_map::DefaultHasher::new();
+        self.downloaded_blocks.as_ssz_bytes().hash(&mut hasher);
+        hasher.finish()
+    }
 }
 
 impl<T: EthSpec> Ord for Batch<T> {
diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs
index 165d27ab2..d378184c9 100644
--- a/beacon_node/network/src/sync/range_sync/chain.rs
+++ b/beacon_node/network/src/sync/range_sync/chain.rs
@@ -28,7 +28,7 @@
 const BATCH_BUFFER_SIZE: u8 = 5;
 
 /// Invalid batches are attempted to be re-downloaded from other peers. If they cannot be processed
 /// after `INVALID_BATCH_LOOKUP_ATTEMPTS` times, the chain is considered faulty and all peers will
 /// be downvoted.
-const _INVALID_BATCH_LOOKUP_ATTEMPTS: u8 = 3;
+const INVALID_BATCH_LOOKUP_ATTEMPTS: u8 = 3;
 
 /// A return type for functions that act on a `Chain` which informs the caller whether the chain
 /// has been completed and should be removed or to be kept if further processing is
@@ -71,9 +71,6 @@ pub struct SyncingChain<T: BeaconChainTypes> {
     /// The next batch id that needs to be processed.
     to_be_processed_id: BatchId,
 
-    /// The last batch id that was processed.
-    last_processed_id: BatchId,
-
     /// The current state of the chain.
     pub state: ChainSyncingState,
 
@@ -122,7 +119,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
             peer_pool,
             to_be_downloaded_id: BatchId(1),
             to_be_processed_id: BatchId(1),
-            last_processed_id: BatchId(0),
             state: ChainSyncingState::Stopped,
             current_processing_id: None,
             sync_send,
@@ -131,6 +127,12 @@
         }
     }
 
+    /// Returns the latest slot number that has been processed.
+    fn current_processed_slot(&self) -> Slot {
+        self.start_slot
+            .saturating_add(self.to_be_processed_id.saturating_sub(1u64) * BLOCKS_PER_BATCH)
+    }
+
     /// A batch of blocks has been received. This function gets run on all chains and should
     /// return Some if the request id matches a pending request on this chain, or None if it does
     /// not.
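The new `Batch::hash` above is what later lets a chain tell whether a re-downloaded batch differs from the version that failed to process. A self-contained sketch of the same trick, with raw bytes standing in for the SSZ-encoded block list:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hash the (stand-in) SSZ bytes of a batch's blocks, as `Batch::hash` does.
fn batch_hash(ssz_bytes: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    ssz_bytes.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Before re-requesting, the failing batch's hash is kept in `original_hash`.
    let original = batch_hash(&[1, 2, 3]);
    // A re-download that hashes differently means the two peers disagreed, and
    // the peer whose version fails to process can be downvoted.
    assert_ne!(original, batch_hash(&[1, 2, 4]));
}
```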
@@ -221,16 +223,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
         }
 
         // Check if there is a batch ready to be processed
-        while !self.completed_batches.is_empty()
+        if !self.completed_batches.is_empty()
             && self.completed_batches[0].id == self.to_be_processed_id
         {
             let batch = self.completed_batches.remove(0);
-            if batch.downloaded_blocks.is_empty() {
-                // The batch was empty, consider this processed and move to the next batch
-                self.processed_batches.push(batch);
-                *self.to_be_processed_id += 1;
-                continue;
-            }
+
+            // Note: We now send empty batches to the processor in order to trigger the block
+            // processor result callback. This is done because an empty batch could end a chain
+            // and the logic for removing chains and checking completion is in the callback.
 
             // send the batch to the batch processor thread
             return self.process_batch(batch);
@@ -283,27 +283,56 @@
         let res = match result {
             BatchProcessResult::Success => {
                 *self.to_be_processed_id += 1;
 
-                // This variable accounts for skip slots and batches that were not actually
-                // processed due to having no blocks.
-                self.last_processed_id = batch.id;
-                // Remove any validate batches awaiting validation.
-                // Only batches that have blocks are processed here, therefore all previous batches
-                // have been correct.
-                let last_processed_id = self.last_processed_id;
-                self.processed_batches
-                    .retain(|batch| batch.id.0 >= last_processed_id.0);
+                // If the processed batch was not empty, we can validate previously invalidated
+                // blocks
+                if !batch.downloaded_blocks.is_empty() {
+                    // Remove any batches awaiting validation.
+                    //
+                    // All blocks in processed_batches should be prior batches. As the current
+                    // batch has been processed with blocks in it, all previous batches are valid.
+                    //
+                    // If a previous batch has been validated and had been re-processed, downvote
+                    // the original peer.
+                    while !self.processed_batches.is_empty() {
+                        let processed_batch = self.processed_batches.remove(0);
+                        if *processed_batch.id >= *batch.id {
+                            crit!(self.log, "A processed batch had a greater id than the current process id";
+                                "processed_id" => *processed_batch.id,
+                                "current_id" => *batch.id);
+                        }
 
+                        if let Some(prev_hash) = processed_batch.original_hash {
+                            // The validated batch has been re-processed
+                            if prev_hash != processed_batch.hash() {
+                                // The re-downloaded version was different
+                                if processed_batch.current_peer != processed_batch.original_peer {
+                                    // A new peer sent the correct batch, the previous peer did not.
+                                    // Downvote the original peer.
+                                    //
+                                    // If the same peer corrected its mistake, we allow it.... for
+                                    // now.
+                                    debug!(self.log, "Re-processed batch validated. Downvoting original peer";
+                                        "batch_id" => *processed_batch.id,
+                                        "original_peer" => format!("{}",processed_batch.original_peer),
+                                        "new_peer" => format!("{}", processed_batch.current_peer));
+                                    network.downvote_peer(processed_batch.original_peer);
+                                }
+                            }
+                        }
+                    }
+                }
+
+                // Add the current batch to processed batches to be verified in the future. We are
                 // only uncertain about this batch, if it has not returned all blocks.
-                if batch.downloaded_blocks.len() < BLOCKS_PER_BATCH as usize {
+                if batch.downloaded_blocks.last().map(|block| block.slot)
+                    != Some(batch.end_slot.saturating_sub(1u64))
+                {
                     self.processed_batches.push(batch);
                 }
 
                 // check if the chain has completed syncing
-                if self.start_slot + *self.last_processed_id * BLOCKS_PER_BATCH
-                    >= self.target_head_slot
-                {
+                if self.current_processed_slot() >= self.target_head_slot {
                     // chain is completed
                     ProcessingResult::RemoveChain
                 } else {
@@ -320,19 +349,104 @@
                 }
             }
             BatchProcessResult::Failed => {
-                // batch processing failed
-                // this could be because this batch is invalid, or a previous invalidated batch
+                warn!(self.log, "Batch processing failed"; "id" => *batch.id, "peer" => format!("{}", batch.current_peer));
+                // The batch processing failed
+                // This could be because this batch is invalid, or a previous invalidated batch
                 // is invalid. We need to find out which and downvote the peer that has sent us
                 // an invalid batch.
-                // firstly remove any validated batches
-                self.handle_invalid_batch(network, batch)
+
+                // check that we have not exceeded the re-process retry counter
+                if batch.reprocess_retries > INVALID_BATCH_LOOKUP_ATTEMPTS {
+                    // if a batch has exceeded the invalid batch lookup attempts limit, it means
+                    // that it is likely all peers in this chain are sending invalid batches
+                    // repeatedly and are either malicious or faulty. We drop the chain and
+                    // downvote all peers.
+                    warn!(self.log, "Batch failed to download. Dropping chain and downvoting peers"; "id"=> *batch.id);
+                    for peer_id in self.peer_pool.drain() {
+                        network.downvote_peer(peer_id);
+                    }
+                    ProcessingResult::RemoveChain
+                } else {
+                    // Handle this invalid batch, which is within the re-process retries limit.
+                    self.handle_invalid_batch(network, batch);
+                    ProcessingResult::KeepChain
+                }
             }
         };
 
         Some(res)
     }
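The `Success` arm above treats every earlier processed batch as validated once a non-empty batch processes cleanly, and downvotes the original peer wherever a re-download (marked by `original_hash`) turned out to differ. A simplified stand-alone sketch of that decision, with `String` in place of `PeerId` and illustrative field names:

```rust
struct ProcessedBatch {
    original_peer: String,
    current_peer: String,
    original_hash: Option<u64>, // set only if the batch was re-downloaded
    current_hash: u64,
}

/// Returns the peers to downvote once a later batch has validated these.
fn peers_to_downvote(validated: &[ProcessedBatch]) -> Vec<&String> {
    validated
        .iter()
        .filter(|b| {
            // A re-downloaded batch whose contents changed, served by a
            // different peer: the original peer sent bad data.
            b.original_hash.map_or(false, |hash| hash != b.current_hash)
                && b.current_peer != b.original_peer
        })
        .map(|b| &b.original_peer)
        .collect()
}
```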
 
+    /// An invalid batch has been received that could not be processed.
+    ///
+    /// These events occur when a peer has successfully responded with blocks, but the blocks we
+    /// have received are incorrect or invalid. This indicates the peer has not performed as
+    /// intended and can result in downvoting a peer.
+    // TODO: Batches could have been partially downloaded due to RPC size-limit restrictions. We
+    // need to add logic for partial batch downloads. Potentially, if another peer returns the same
+    // batch, we try a partial download.
+    fn handle_invalid_batch(&mut self, network: &mut SyncNetworkContext, batch: Batch<T::EthSpec>) {
+        // The current batch could not be processed, indicating either the current or previous
+        // batches are invalid
+
+        // The previous batch could be incomplete due to the block sizes being too large to fit in
+        // a single RPC request or there could be consecutive empty batches which are not supposed
+        // to be there
+
+        // The current (sub-optimal) strategy is to simply re-request all batches that could
+        // potentially be faulty. If a batch returns a different result than the original and
+        // results in successful processing, we downvote the original peer that sent us the batch.
+
+        // If all batches return the same result, we try this process INVALID_BATCH_LOOKUP_ATTEMPTS
+        // times before considering the entire chain invalid and downvoting all peers.
+
+        // Find any pre-processed batches awaiting validation
+        while !self.processed_batches.is_empty() {
+            let past_batch = self.processed_batches.remove(0);
+            *self.to_be_processed_id = std::cmp::min(*self.to_be_processed_id, *past_batch.id);
+            self.reprocess_batch(network, past_batch);
+        }
+
+        // re-process the current batch
+        self.reprocess_batch(network, batch);
+    }
+
+    /// This re-downloads and marks the batch as being re-processed.
+    ///
+    /// If the re-downloaded batch is different to the original and can be processed, the original
+    /// peer will be downvoted.
+    fn reprocess_batch(&mut self, network: &mut SyncNetworkContext, mut batch: Batch<T::EthSpec>) {
+        // marks the batch as attempting to be reprocessed by hashing the downloaded blocks
+        batch.original_hash = Some(batch.hash());
+
+        // remove previously downloaded blocks
+        batch.downloaded_blocks.clear();
+
+        // increment the re-process counter
+        batch.reprocess_retries += 1;
+
+        // attempt to find another peer to download the batch from (this potentially doubles up
+        // requests on a single peer)
+        let current_peer = &batch.current_peer;
+        let new_peer = self
+            .peer_pool
+            .iter()
+            .find(|peer| *peer != current_peer)
+            .unwrap_or_else(|| current_peer);
+
+        batch.current_peer = new_peer.clone();
+
+        debug!(self.log, "Re-requesting batch";
+            "start_slot" => batch.start_slot,
+            "end_slot" => batch.end_slot,
+            "id" => *batch.id,
+            "peer" => format!("{}", batch.current_peer),
+            "head_root"=> format!("{}", batch.head_root),
+            "retries" => batch.retries,
+            "re-processes" => batch.reprocess_retries);
+        self.send_batch(network, batch);
+    }
+
     pub fn stop_syncing(&mut self) {
         self.state = ChainSyncingState::Stopped;
     }
@@ -351,17 +465,11 @@
         // to start from this point and re-index all subsequent batches starting from one
         // (effectively creating a new chain).
 
-        if local_finalized_slot.as_u64()
-            > self
-                .start_slot
-                .as_u64()
-                .saturating_add(*self.last_processed_id * BLOCKS_PER_BATCH)
-        {
+        if local_finalized_slot > self.current_processed_slot() {
             debug!(self.log, "Updating chain's progress";
-                "prev_completed_slot" => self.start_slot + *self.last_processed_id*BLOCKS_PER_BATCH,
+                "prev_completed_slot" => self.current_processed_slot(),
                 "new_completed_slot" => local_finalized_slot.as_u64());
             // Re-index batches
-            *self.last_processed_id = 0;
             *self.to_be_downloaded_id = 1;
             *self.to_be_processed_id = 1;
 
@@ -386,7 +494,7 @@
         self.peer_pool.insert(peer_id.clone());
         // do not request blocks if the chain is not syncing
         if let ChainSyncingState::Stopped = self.state {
-            debug!(self.log, "Peer added to a non-syncing chain"; "peer_id" => format!("{:?}", peer_id));
+            debug!(self.log, "Peer added to a non-syncing chain"; "peer_id" => format!("{}", peer_id));
             return;
         }
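`reprocess_batch` above records the original hash, clears the downloaded blocks and prefers a different peer for the retry, falling back to the same peer when the pool holds no other (doubling up requests, as the comment notes). The re-selection in isolation, again with `String` in place of `PeerId`:

```rust
use std::collections::HashSet;

fn pick_retry_peer(pool: &HashSet<String>, current: &String) -> String {
    // Prefer any peer other than the one that served the failing batch.
    pool.iter().find(|p| *p != current).unwrap_or(current).clone()
}

fn main() {
    let pool: HashSet<String> = vec!["peer_a".to_string(), "peer_b".to_string()]
        .into_iter()
        .collect();
    let current = "peer_a".to_string();
    // With more than one peer the retry goes elsewhere; a single-peer pool
    // re-uses the same peer.
    assert_ne!(pick_retry_peer(&pool, &current), current);
}
```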
@@ -465,47 +573,8 @@
         }
     }
 
-    /// An invalid batch has been received that could not be processed.
-    ///
-    /// These events occur when a peer as successfully responded with blocks, but the blocks we
-    /// have received are incorrect or invalid. This indicates the peer has not performed as
-    /// intended and can result in downvoting a peer.
-    fn handle_invalid_batch(
-        &mut self,
-        network: &mut SyncNetworkContext,
-        _batch: Batch<T::EthSpec>,
-    ) -> ProcessingResult {
-        // The current batch could not be processed, indicating either the current or previous
-        // batches are invalid
-
-        // The previous batch could be incomplete due to the block sizes being too large to fit in
-        // a single RPC request or there could be consecutive empty batches which are not supposed
-        // to be there
-
-        // The current (sub-optimal) strategy is to simply re-request all batches that could
-        // potentially be faulty. If a batch returns a different result than the original and
-        // results in successful processing, we downvote the original peer that sent us the batch.
-
-        // If all batches return the same result, we try this process INVALID_BATCH_LOOKUP_ATTEMPTS
-        // times before considering the entire chain invalid and downvoting all peers.
-
-        // Firstly, check if there are any past batches that could be invalid.
-        if !self.processed_batches.is_empty() {
-            // try and re-download this batch from other peers
-        }
-
-        //TODO: Implement this logic
-        // Currently just fail the chain, and drop all associated peers, removing them from the
-        // peer pool, to prevent re-status
-        for peer_id in self.peer_pool.drain() {
-            network.downvote_peer(peer_id);
-        }
-        ProcessingResult::RemoveChain
-    }
-
-    /// Attempts to request the next required batches from the peer pool if the chain is syncing.
-    /// It will exhaust the peer pool and left over batches until the batch buffer is reached or
-    /// all peers are exhausted.
+    /// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer
+    /// pool and left over batches until the batch buffer is reached or all peers are exhausted.
     fn request_batches(&mut self, network: &mut SyncNetworkContext) {
         if let ChainSyncingState::Syncing = self.state {
             while self.send_range_request(network) {}
@@ -518,7 +587,12 @@
         // find the next pending batch and request it from the peer
         if let Some(peer_id) = self.get_next_peer() {
            if let Some(batch) = self.get_next_batch(peer_id) {
-                debug!(self.log, "Requesting batch"; "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => *batch.id, "peer" => format!("{:?}", batch.current_peer), "head_root"=> format!("{}", batch.head_root));
+                debug!(self.log, "Requesting batch";
+                    "start_slot" => batch.start_slot,
+                    "end_slot" => batch.end_slot,
+                    "id" => *batch.id,
+                    "peer" => format!("{}", batch.current_peer),
+                    "head_root"=> format!("{}", batch.head_root));
                 // send the batch
                 self.send_batch(network, batch);
                 return true;
             }
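`request_batches` above simply drains `send_range_request` until no further request can be issued. A toy model of the loop's stopping conditions (names are illustrative; the constant mirrors `BATCH_BUFFER_SIZE`):

```rust
const BATCH_BUFFER_SIZE: usize = 5;

struct ChainModel {
    in_flight: usize,  // batches currently requested
    pending: usize,    // batches still to be downloaded
    free_peers: usize, // peers without an active request
}

impl ChainModel {
    /// Mirrors `send_range_request`: true while another request was sent.
    fn send_range_request(&mut self) -> bool {
        if self.in_flight >= BATCH_BUFFER_SIZE || self.pending == 0 || self.free_peers == 0 {
            return false;
        }
        self.in_flight += 1;
        self.pending -= 1;
        self.free_peers -= 1;
        true
    }

    fn request_batches(&mut self) {
        while self.send_range_request() {}
    }
}

fn main() {
    let mut chain = ChainModel { in_flight: 0, pending: 10, free_peers: 3 };
    chain.request_batches();
    assert_eq!(chain.in_flight, 3); // stopped by peer exhaustion, not the buffer
}
```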
@@ -531,6 +605,7 @@
     ///
     /// This is used to create the next request.
     fn get_next_peer(&self) -> Option<PeerId> {
+        // TODO: Optimize this by combining with above two functions.
         // randomize the peers for load balancing
         let mut rng = rand::thread_rng();
         let mut peers = self.peer_pool.iter().collect::<Vec<_>>();
diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs
index a42589e9d..2964ba0b6 100644
--- a/beacon_node/network/src/sync/range_sync/chain_collection.rs
+++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs
@@ -310,6 +310,7 @@
                     .block_root_tree
                     .is_known_block_root(&chain.target_head_root)
                 {
+                    debug!(log, "Purging out of finalized chain"; "start_slot" => chain.start_slot, "end_slot" => chain.target_head_slot);
                     chain.status_peers(network);
                     false
                 } else {
@@ -322,6 +323,7 @@
                     .block_root_tree
                     .is_known_block_root(&chain.target_head_root)
                 {
+                    debug!(log, "Purging out of date head chain"; "start_slot" => chain.start_slot, "end_slot" => chain.target_head_slot);
                     chain.status_peers(network);
                     false
                 } else {
diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs
index 60b9ea18b..ee7fb8ae7 100644
--- a/beacon_node/network/src/sync/range_sync/range.rs
+++ b/beacon_node/network/src/sync/range_sync/range.rs
@@ -248,8 +248,9 @@ impl<T: BeaconChainTypes> RangeSync<T> {
             })
             .is_none();
         if id_not_found {
-            // The request didn't exist in any `SyncingChain`. Could have been an old request. Log
-            // and ignore
+            // The request didn't exist in any `SyncingChain`. Could have been an old request or
+            // the chain was purged due to being out of date whilst a request was pending. Log
+            // and ignore.
             debug!(self.log, "Range response without matching request"; "peer" => format!("{:?}", peer_id), "request_id" => request_id);
         }
     }
@@ -307,7 +308,9 @@
             }
             Some((_, ProcessingResult::KeepChain)) => {}
             None => {
-                warn!(self.log, "No chains match the block processing id"; "id" => processing_id);
+                // This can happen if a chain gets purged due to being out of date whilst a
+                // batch process is in progress.
+                debug!(self.log, "No chains match the block processing id"; "id" => processing_id);
             }
         }
     }
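Finally, the `get_next_peer` randomization shown in chain.rs above shuffles the pool so requests spread across peers. A sketch assuming the `rand` crate's `SliceRandom` (the 0.7-era API), with `String` in place of `PeerId`:

```rust
use rand::seq::SliceRandom; // assumes the `rand` crate as a dependency
use std::collections::HashSet;

fn get_next_peer(pool: &HashSet<String>, pending: &HashSet<String>) -> Option<String> {
    let mut rng = rand::thread_rng();
    let mut peers: Vec<&String> = pool.iter().collect();
    // randomize the peers for load balancing
    peers.shuffle(&mut rng);
    // pick the first shuffled peer without a request already in flight
    peers.into_iter().find(|p| !pending.contains(*p)).cloned()
}
```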