diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index e32770c59..49e1eb290 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -69,6 +69,8 @@ impl BlockLookups { /* Lookup requests */ + /// Searches for a single block hash. If the block's parent is unknown, a chain of blocks is + /// constructed. pub fn search_block( &mut self, hash: Hash256, @@ -105,6 +107,8 @@ impl BlockLookups { } } + /// If a block is attempted to be processed but we do not know its parent, this function is + /// called in order to find the block's parent. pub fn search_parent( &mut self, block: Arc>, @@ -201,6 +205,7 @@ impl BlockLookups { ); } + /// Process a response received from a parent lookup request. pub fn parent_lookup_response( &mut self, id: Id, @@ -258,7 +263,6 @@ impl BlockLookups { self.request_parent(parent_lookup, cx); } VerifyError::PreviousFailure { parent_root } => { - self.failed_chains.insert(parent_lookup.chain_hash()); debug!( self.log, "Parent chain ignored due to past failure"; @@ -336,6 +340,7 @@ impl BlockLookups { } } + /// An RPC error has occurred during a parent lookup. This function handles this case. pub fn parent_lookup_failed( &mut self, id: Id, @@ -362,7 +367,7 @@ impl BlockLookups { pub fn single_block_lookup_failed(&mut self, id: Id, cx: &mut SyncNetworkContext) { if let Some(mut request) = self.single_block_lookups.remove(&id) { - request.register_failure(); + request.register_failure_downloading(); trace!(self.log, "Single block lookup failed"; "block" => %request.hash); if let Ok((peer_id, block_request)) = request.request_block() { if let Ok(request_id) = cx.single_block_lookup_request(peer_id, block_request) { @@ -453,7 +458,7 @@ impl BlockLookups { "single_block_failure", ); // Try it again if possible. 
- req.register_failure(); + req.register_failure_processing(); if let Ok((peer_id, request)) = req.request_block() { if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) { @@ -569,12 +574,13 @@ impl BlockLookups { "last_peer" => %peer_id, ); - // Add this chain to cache of failed chains - self.failed_chains.insert(chain_hash); - // This currently can be a host of errors. We permit this due to the partial // ambiguity. cx.report_peer(peer_id, PeerAction::MidToleranceError, "parent_request_err"); + + // Try again if possible + parent_lookup.processing_failed(); + self.request_parent(parent_lookup, cx); } BlockProcessResult::Ignored => { // Beacon processor signalled to ignore the block processing result. @@ -683,14 +689,26 @@ impl BlockLookups { parent_lookup::RequestError::SendFailed(_) => { // Probably shutting down, nothing to do here. Drop the request } - parent_lookup::RequestError::ChainTooLong - | parent_lookup::RequestError::TooManyAttempts => { + parent_lookup::RequestError::ChainTooLong => { self.failed_chains.insert(parent_lookup.chain_hash()); // This indicates faulty peers. for &peer_id in parent_lookup.used_peers() { cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static()) } } + parent_lookup::RequestError::TooManyAttempts { cannot_process } => { + // We only consider the chain failed if we were unable to process it. + // We could have failed because one peer continually failed to send us + // bad blocks. We still allow other peers to send us this chain. Note + // that peers that do this, still get penalised. + if cannot_process { + self.failed_chains.insert(parent_lookup.chain_hash()); + } + // This indicates faulty peers. + for &peer_id in parent_lookup.used_peers() { + cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static()) + } + } parent_lookup::RequestError::NoPeers => { // This happens if the peer disconnects while the block is being // processed. 
Drop the request without extra penalty diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 62503353a..bf5a1b259 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -10,7 +10,7 @@ use crate::sync::{ use super::single_block_lookup::{self, SingleBlockRequest}; -/// How many attempts we try to find a parent of a block before we give up trying . +/// How many attempts we try to find a parent of a block before we give up trying. pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5; /// The maximum depth we will search for a parent block. In principle we should have sync'd any /// canonical chain to its head once the peer connects. A chain should not appear where it's depth @@ -41,7 +41,12 @@ pub enum VerifyError { pub enum RequestError { SendFailed(&'static str), ChainTooLong, - TooManyAttempts, + /// We witnessed too many failures trying to complete this parent lookup. + TooManyAttempts { + /// We received more failures trying to process the blocks than downloading them + /// from peers. + cannot_process: bool, + }, NoPeers, } @@ -105,7 +110,12 @@ impl ParentLookup { } pub fn download_failed(&mut self) { - self.current_parent_request.register_failure(); + self.current_parent_request.register_failure_downloading(); + self.current_parent_request_id = None; + } + + pub fn processing_failed(&mut self) { + self.current_parent_request.register_failure_processing(); self.current_parent_request_id = None; } @@ -126,7 +136,7 @@ impl ParentLookup { // be dropped and the peer downscored. 
if let Some(parent_root) = block.as_ref().map(|block| block.parent_root()) { if failed_chains.contains(&parent_root) { - self.current_parent_request.register_failure(); + self.current_parent_request.register_failure_downloading(); self.current_parent_request_id = None; return Err(VerifyError::PreviousFailure { parent_root }); } @@ -144,7 +154,7 @@ impl ParentLookup { #[cfg(test)] pub fn failed_attempts(&self) -> u8 { - self.current_parent_request.failed_attempts + self.current_parent_request.failed_attempts() } pub fn add_peer(&mut self, block_root: &Hash256, peer_id: &PeerId) -> bool { @@ -171,7 +181,9 @@ impl From for RequestError { fn from(e: super::single_block_lookup::LookupRequestError) -> Self { use super::single_block_lookup::LookupRequestError as E; match e { - E::TooManyAttempts => RequestError::TooManyAttempts, + E::TooManyAttempts { cannot_process } => { + RequestError::TooManyAttempts { cannot_process } + } E::NoPeers => RequestError::NoPeers, } } @@ -195,7 +207,10 @@ impl RequestError { match self { RequestError::SendFailed(e) => e, RequestError::ChainTooLong => "chain_too_long", - RequestError::TooManyAttempts => "too_many_attempts", + RequestError::TooManyAttempts { cannot_process } if *cannot_process => { + "too_many_processing_attempts" + } + RequestError::TooManyAttempts { cannot_process: _ } => "too_many_downloading_attempts", RequestError::NoPeers => "no_peers", } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index debf3de8d..8ba5b17bf 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -18,8 +18,10 @@ pub struct SingleBlockRequest { pub available_peers: HashSet, /// Peers from which we have requested this block. pub used_peers: HashSet, - /// How many times have we attempted this block. 
- pub failed_attempts: u8, + /// How many times have we attempted to process this block. + failed_processing: u8, + /// How many times have we attempted to download this block. + failed_downloading: u8, } #[derive(Debug, PartialEq, Eq)] @@ -38,7 +40,11 @@ pub enum VerifyError { #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupRequestError { - TooManyAttempts, + /// Too many failed attempts + TooManyAttempts { + /// The failed attempts were primarily due to processing failures. + cannot_process: bool, + }, NoPeers, } @@ -49,15 +55,29 @@ impl SingleBlockRequest { state: State::AwaitingDownload, available_peers: HashSet::from([peer_id]), used_peers: HashSet::default(), - failed_attempts: 0, + failed_processing: 0, + failed_downloading: 0, } } - pub fn register_failure(&mut self) { - self.failed_attempts += 1; + /// Registers a failure in processing a block. + pub fn register_failure_processing(&mut self) { + self.failed_processing = self.failed_processing.saturating_add(1); self.state = State::AwaitingDownload; } + /// Registers a failure in downloading a block. This might be a peer disconnection or a wrong + /// block. + pub fn register_failure_downloading(&mut self) { + self.failed_downloading = self.failed_downloading.saturating_add(1); + self.state = State::AwaitingDownload; + } + + /// The total number of failures, whether it be processing or downloading. 
+ pub fn failed_attempts(&self) -> u8 { + self.failed_processing + self.failed_downloading + } + pub fn add_peer(&mut self, hash: &Hash256, peer_id: &PeerId) -> bool { let is_useful = &self.hash == hash; if is_useful { @@ -72,7 +92,7 @@ impl SingleBlockRequest { if let State::Downloading { peer_id } = &self.state { if peer_id == dc_peer_id { // Peer disconnected before providing a block - self.register_failure(); + self.register_failure_downloading(); return Err(()); } } @@ -87,14 +107,16 @@ impl SingleBlockRequest { ) -> Result>>, VerifyError> { match self.state { State::AwaitingDownload => { - self.register_failure(); + self.register_failure_downloading(); Err(VerifyError::ExtraBlocksReturned) } State::Downloading { peer_id } => match block { Some(block) => { if block.canonical_root() != self.hash { // return an error and drop the block - self.register_failure(); + // NOTE: we take this as a download failure to prevent counting the + // attempt as a chain failure, but simply a peer failure. + self.register_failure_downloading(); Err(VerifyError::RootMismatch) } else { // Return the block for processing. @@ -103,14 +125,14 @@ impl SingleBlockRequest { } } None => { - self.register_failure(); + self.register_failure_downloading(); Err(VerifyError::NoBlockReturned) } }, State::Processing { peer_id: _ } => match block { Some(_) => { // We sent the block for processing and received an extra block. 
- self.register_failure(); + self.register_failure_downloading(); Err(VerifyError::ExtraBlocksReturned) } None => { @@ -124,19 +146,19 @@ impl SingleBlockRequest { pub fn request_block(&mut self) -> Result<(PeerId, BlocksByRootRequest), LookupRequestError> { debug_assert!(matches!(self.state, State::AwaitingDownload)); - if self.failed_attempts <= MAX_ATTEMPTS { - if let Some(&peer_id) = self.available_peers.iter().choose(&mut rand::thread_rng()) { - let request = BlocksByRootRequest { - block_roots: VariableList::from(vec![self.hash]), - }; - self.state = State::Downloading { peer_id }; - self.used_peers.insert(peer_id); - Ok((peer_id, request)) - } else { - Err(LookupRequestError::NoPeers) - } + if self.failed_attempts() >= MAX_ATTEMPTS { + Err(LookupRequestError::TooManyAttempts { + cannot_process: self.failed_processing >= self.failed_downloading, + }) + } else if let Some(&peer_id) = self.available_peers.iter().choose(&mut rand::thread_rng()) { + let request = BlocksByRootRequest { + block_roots: VariableList::from(vec![self.hash]), + }; + self.state = State::Downloading { peer_id }; + self.used_peers.insert(peer_id); + Ok((peer_id, request)) } else { - Err(LookupRequestError::TooManyAttempts) + Err(LookupRequestError::NoPeers) } } @@ -169,6 +191,8 @@ impl slog::Value for SingleBlockRequest { serializer.emit_arguments("processing_peer", &format_args!("{}", peer_id))? 
} } + serializer.emit_u8("failed_downloads", self.failed_downloading)?; + serializer.emit_u8("failed_processing", self.failed_processing)?; slog::Result::Ok(()) } } @@ -200,11 +224,28 @@ mod tests { } #[test] - fn test_max_attempts() { + fn test_block_lookup_failures() { + const FAILURES: u8 = 3; let peer_id = PeerId::random(); let block = rand_block(); - let mut sl = SingleBlockRequest::<4>::new(block.canonical_root(), peer_id); - sl.register_failure(); + let mut sl = SingleBlockRequest::::new(block.canonical_root(), peer_id); + for _ in 1..FAILURES { + sl.request_block().unwrap(); + sl.register_failure_downloading(); + } + + // Now we receive the block and send it for processing + sl.request_block().unwrap(); + sl.verify_block(Some(Arc::new(block))).unwrap().unwrap(); + + // One processing failure maxes the available attempts + sl.register_failure_processing(); + assert_eq!( + sl.request_block(), + Err(LookupRequestError::TooManyAttempts { + cannot_process: false + }) + ) } } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 352de4e09..b3afadda2 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -385,12 +385,11 @@ fn test_parent_lookup_too_many_attempts() { let parent = rig.rand_block(); let block = rig.block_with_parent(parent.canonical_root()); - let chain_hash = block.canonical_root(); let peer_id = PeerId::random(); // Trigger the request bl.search_parent(Arc::new(block), peer_id, &mut cx); - for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE + 1 { + for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE { let id = rig.expect_parent_request(); match i % 2 { // make sure every error is accounted for @@ -402,6 +401,8 @@ fn test_parent_lookup_too_many_attempts() { // Send a bad block this time. It should be tried again. 
let bad_block = rig.rand_block(); bl.parent_lookup_response(id, peer_id, Some(Arc::new(bad_block)), D, &mut cx); + // Send the stream termination + bl.parent_lookup_response(id, peer_id, None, D, &mut cx); rig.expect_penalty(); } } @@ -411,7 +412,74 @@ fn test_parent_lookup_too_many_attempts() { } assert_eq!(bl.parent_queue.len(), 0); - assert!(bl.failed_chains.contains(&chain_hash)); +} + +#[test] +fn test_parent_lookup_too_many_download_attempts_no_blacklist() { + let (mut bl, mut cx, mut rig) = TestRig::test_setup(None); + + let parent = rig.rand_block(); + let block = rig.block_with_parent(parent.canonical_root()); + let block_hash = block.canonical_root(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_parent(Arc::new(block), peer_id, &mut cx); + for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE { + assert!(!bl.failed_chains.contains(&block_hash)); + let id = rig.expect_parent_request(); + if i % 2 != 0 { + // The request fails. It should be tried again. + bl.parent_lookup_failed(id, peer_id, &mut cx); + } else { + // Send a bad block this time. It should be tried again. 
+ let bad_block = rig.rand_block(); + bl.parent_lookup_response(id, peer_id, Some(Arc::new(bad_block)), D, &mut cx); + rig.expect_penalty(); + } + if i < parent_lookup::PARENT_FAIL_TOLERANCE { + assert_eq!(bl.parent_queue[0].failed_attempts(), dbg!(i)); + } + } + + assert_eq!(bl.parent_queue.len(), 0); + assert!(!bl.failed_chains.contains(&block_hash)); + assert!(!bl.failed_chains.contains(&parent.canonical_root())); +} + +#[test] +fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { + const PROCESSING_FAILURES: u8 = parent_lookup::PARENT_FAIL_TOLERANCE / 2 + 1; + let (mut bl, mut cx, mut rig) = TestRig::test_setup(None); + + let parent = Arc::new(rig.rand_block()); + let block = rig.block_with_parent(parent.canonical_root()); + let block_hash = block.canonical_root(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_parent(Arc::new(block), peer_id, &mut cx); + + // Fail downloading the block + for _ in 0..(parent_lookup::PARENT_FAIL_TOLERANCE - PROCESSING_FAILURES) { + let id = rig.expect_parent_request(); + // The request fails. It should be tried again. + bl.parent_lookup_failed(id, peer_id, &mut cx); + } + + // Now fail processing a block in the parent request + for _ in 0..PROCESSING_FAILURES { + let id = dbg!(rig.expect_parent_request()); + assert!(!bl.failed_chains.contains(&block_hash)); + // send the right parent but fail processing + bl.parent_lookup_response(id, peer_id, Some(parent.clone()), D, &mut cx); + bl.parent_block_processed(block_hash, BlockError::InvalidSignature.into(), &mut cx); + bl.parent_lookup_response(id, peer_id, None, D, &mut cx); + rig.expect_penalty(); + } + + assert!(bl.failed_chains.contains(&block_hash)); + assert_eq!(bl.parent_queue.len(), 0); } #[test]