From 5d8a0858804684714f043de55cbc5c91c1b38d12 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Fri, 13 Mar 2020 15:51:03 +1100 Subject: [PATCH] Upgrade the parent lookup logic (#895) * Upgrade the parent lookup logic --- beacon_node/network/src/sync/manager.rs | 282 ++++++++++++++++-------- 1 file changed, 188 insertions(+), 94 deletions(-) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 032490554..ae542b324 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -43,6 +43,7 @@ use eth2_libp2p::rpc::RequestId; use eth2_libp2p::PeerId; use fnv::FnvHashMap; use futures::prelude::*; +use rand::seq::SliceRandom; use slog::{crit, debug, error, info, trace, warn, Logger}; use smallvec::SmallVec; use std::boxed::Box; @@ -160,8 +161,11 @@ pub struct SyncManager { /// A collection of parent block lookups. parent_queue: SmallVec<[ParentRequests; 3]>, - /// A collection of block hashes being searched for - single_block_lookups: FnvHashMap, + /// A collection of block hashes being searched for and a flag indicating if a result has been + /// received or not. + /// + /// The flag allows us to determine if the peer returned data or sent us nothing. + single_block_lookups: FnvHashMap, /// The collection of known, connected, fully-sync'd peers. full_peers: HashSet, @@ -292,44 +296,74 @@ impl SyncManager { request_id: RequestId, block: Option>, ) { - // check if this is a single block lookup - i.e we were searching for a specific hash - if block.is_some() { - if let Some(block_hash) = self.single_block_lookups.remove(&request_id) { - self.single_block_lookup_response( - peer_id, - block.expect("block exists"), - block_hash, - ); - return; - } - } - - // this should be a response to a parent request search - // find the request - let mut parent_request = match self - .parent_queue - .iter() - .position(|request| request.pending == Some(request_id)) - { - Some(pos) => self.parent_queue.remove(pos), - None => { - if block.is_some() { - // No pending request, invalid request_id or coding error - warn!(self.log, "BlocksByRoot response unknown"; "request_id" => request_id); - } - // it could be a stream termination None, in which case we just ignore it - return; - } - }; - match block { Some(block) => { + // data was returned, not just a stream termination + + // check if this is a single block lookup - i.e we were searching for a specific hash + let mut single_block_hash = None; + if let Some((block_hash, data_received)) = + self.single_block_lookups.get_mut(&request_id) + { + // update the state of the lookup indicating a block was received from the peer + *data_received = true; + single_block_hash = Some(block_hash.clone()); + } + if let Some(block_hash) = single_block_hash { + self.single_block_lookup_response(peer_id, block, block_hash); + return; + } + + // This wasn't a single block lookup request, it must be a response to a parent request search + // find the request + let mut parent_request = match self + .parent_queue + .iter() + .position(|request| request.pending == Some(request_id)) + { + // we remove from the queue and process it. It will get re-added if required + Some(pos) => self.parent_queue.remove(pos), + None => { + // No pending request, invalid request_id or coding error + warn!(self.log, "BlocksByRoot response unknown"; "request_id" => request_id); + return; + } + }; // add the block to response parent_request.downloaded_blocks.push(block); // queue for processing self.process_parent_request(parent_request); } None => { + // this is a stream termination + + // stream termination for a single block lookup, remove the key + if let Some((block_hash, data_received)) = + self.single_block_lookups.remove(&request_id) + { + // the peer didn't respond with a block that it referenced + if !data_received { + warn!(self.log, "Peer didn't respond with a block it referenced"; "referenced_block_hash" => format!("{}", block_hash), "peer_id" => format!("{}", peer_id)); + self.network.downvote_peer(peer_id); + } + return; + } + + // This wasn't a single block lookup request, it must be a response to a parent request search + // find the request and remove it + let mut parent_request = match self + .parent_queue + .iter() + .position(|request| request.pending == Some(request_id)) + { + Some(pos) => self.parent_queue.remove(pos), + None => { + // No pending request, the parent request has been processed and this is + // the resulting stream termination. + return; + } + }; + // An empty response has been returned to a parent request // if an empty response is given, the peer didn't have the requested block, try again parent_request.failed_attempts += 1; parent_request.last_submitted_peer = peer_id; @@ -350,6 +384,7 @@ impl SyncManager { // verify the hash is correct and try and process the block if expected_block_hash != block.canonical_root() { // the peer that sent this, sent us the wrong block + warn!(self.log, "Peer sent incorrect block for single block lookup"; "peer_id" => format!("{}", peer_id)); self.network.downvote_peer(peer_id); return; } @@ -405,7 +440,7 @@ impl SyncManager { } // Make sure this block is not already being searched for - // TODO: Potentially store a hashset of blocks for O(1) lookups + // NOTE: Potentially store a hashset of blocks for O(1) lookups for parent_req in self.parent_queue.iter() { if parent_req .downloaded_blocks @@ -440,18 +475,18 @@ impl SyncManager { }; if let Ok(request_id) = self.network.blocks_by_root_request(peer_id, request) { - self.single_block_lookups.insert(request_id, block_hash); + self.single_block_lookups + .insert(request_id, (block_hash, false)); } } fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId) { trace!(self.log, "Sync manager received a failed RPC"); // remove any single block lookups - self.single_block_lookups.remove(&request_id); - - // notify the range sync - self.range_sync - .inject_error(&mut self.network, peer_id.clone(), request_id); + if self.single_block_lookups.remove(&request_id).is_some() { + // this was a single block request lookup, look no further + return; + } // increment the failure of a parent lookup if the request matches a parent search if let Some(pos) = self @@ -463,7 +498,12 @@ impl SyncManager { parent_request.failed_attempts += 1; parent_request.last_submitted_peer = peer_id; self.request_parent(parent_request); + return; } + + // otherwise, this is a range sync issue, notify the range sync + self.range_sync + .inject_error(&mut self.network, peer_id.clone(), request_id); } fn peer_disconnect(&mut self, peer_id: &PeerId) { @@ -504,6 +544,7 @@ impl SyncManager { } } + /// A new block has been received for a parent lookup query, process it. fn process_parent_request(&mut self, mut parent_request: ParentRequests) { // verify the last added block is the parent of the last requested block @@ -512,7 +553,7 @@ impl SyncManager { self.log, "There must be at least two blocks in a parent request lookup at all times" ); - panic!("There must be at least two blocks in parent request lookup at all time"); + panic!("There must be at least two blocks in parent request lookup at all times"); // fail loudly } let previous_index = parent_request.downloaded_blocks.len() - 2; @@ -530,87 +571,135 @@ impl SyncManager { let _ = parent_request.downloaded_blocks.pop(); let peer = parent_request.last_submitted_peer.clone(); - debug!(self.log, "Peer sent invalid parent."; - "peer_id" => format!("{:?}",peer), - "received_block" => format!("{}", block_hash), - "expected_parent" => format!("{}", expected_hash), + warn!(self.log, "Peer sent invalid parent."; + "peer_id" => format!("{:?}",peer), + "received_block" => format!("{}", block_hash), + "expected_parent" => format!("{}", expected_hash), ); self.request_parent(parent_request); self.network.downvote_peer(peer); } else { - let mut successes = 0; + // The last block in the queue is the only one that has not attempted to be processed yet. + // + // The logic here attempts to process the last block. If it can be processed, the rest + // of the blocks must have known parents. If any of them cannot be processed, we + // consider the entire chain corrupt and drop it, notifying the user. + // + // If the last block in the queue cannot be processed, we also drop the entire queue. + // If the last block in the queue has an unknown parent, we continue the parent + // lookup-search. - // try and process the list of blocks up to the requested block + let total_blocks_to_process = parent_request.downloaded_blocks.len(); + + if let Some(chain) = self.chain.upgrade() { + let newest_block = parent_request + .downloaded_blocks + .pop() + .expect("There is always at least one block in the queue"); + match chain.process_block(newest_block.clone()) { + Ok(BlockProcessingOutcome::ParentUnknown { .. }) => { + // need to keep looking for parents + // add the block back to the queue and continue the search + parent_request.downloaded_blocks.push(newest_block); + self.request_parent(parent_request); + return; + } + Ok(BlockProcessingOutcome::Processed { .. }) + | Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => {} + Ok(outcome) => { + // all else we consider the chain a failure and downvote the peer that sent + // us the last block + warn!( + self.log, "Invalid parent chain. Downvoting peer"; + "outcome" => format!("{:?}", outcome), + "last_peer" => format!("{:?}", parent_request.last_submitted_peer), + ); + self.network + .downvote_peer(parent_request.last_submitted_peer.clone()); + return; + } + Err(e) => { + warn!( + self.log, "Parent chain processing error. Downvoting peer"; + "error" => format!("{:?}", e), + "last_peer" => format!("{:?}", parent_request.last_submitted_peer), + ); + self.network + .downvote_peer(parent_request.last_submitted_peer.clone()); + return; + } + } + } else { + // chain doesn't exist, drop the parent queue and return + return; + } + + //TODO: Shift this to a block processing thread + + // the last received block has been successfully processed, process all other blocks in the + // chain while let Some(block) = parent_request.downloaded_blocks.pop() { // check if the chain exists if let Some(chain) = self.chain.upgrade() { - match chain.process_block(block.clone()) { - Ok(BlockProcessingOutcome::ParentUnknown { .. }) => { - // need to keep looking for parents - parent_request.downloaded_blocks.push(block); - self.request_parent(parent_request); - break; - } - Ok(BlockProcessingOutcome::Processed { .. }) => successes += 1, - Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => {} + match chain.process_block(block) { + Ok(BlockProcessingOutcome::Processed { .. }) + | Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => {} // continue to the next block + + // all else is considered a failure Ok(outcome) => { - // it's a future slot or an invalid block, remove it and try again - parent_request.failed_attempts += 1; + // the previous blocks have failed, notify the user the chain lookup has + // failed and drop the parent queue debug!( - self.log, "Invalid parent block"; + self.log, "Invalid parent chain. Past blocks failure"; "outcome" => format!("{:?}", outcome), "peer" => format!("{:?}", parent_request.last_submitted_peer), ); self.network .downvote_peer(parent_request.last_submitted_peer.clone()); - self.request_parent(parent_request); break; } Err(e) => { - parent_request.failed_attempts += 1; warn!( - self.log, "Parent processing error"; + self.log, "Parent chain processing error."; "error" => format!("{:?}", e) ); self.network .downvote_peer(parent_request.last_submitted_peer.clone()); - self.request_parent(parent_request); break; } } } else { + // chain doesn't exist, end the processing break; } } - if successes > 0 { - if let Some(chain) = self.chain.upgrade() { - match chain.fork_choice() { - Ok(()) => trace!( - self.log, - "Fork choice success"; - "block_imports" => successes, - "location" => "parent request" - ), - Err(e) => error!( - self.log, - "Fork choice failed"; - "error" => format!("{:?}", e), - "location" => "parent request" - ), - }; - } + // at least one block has been processed, run fork-choice + if let Some(chain) = self.chain.upgrade() { + match chain.fork_choice() { + Ok(()) => trace!( + self.log, + "Fork choice success"; + "block_imports" => total_blocks_to_process - parent_request.downloaded_blocks.len(), + "location" => "parent request" + ), + Err(e) => error!( + self.log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "parent request" + ), + }; } } } + /// Progresses a parent request query. + /// + /// This checks to ensure there a peers to progress the query, checks for failures and + /// initiates requests. fn request_parent(&mut self, mut parent_request: ParentRequests) { - // check to make sure there are peers to search for the parent from - if self.full_peers.is_empty() { - return; - } - // check to make sure this request hasn't failed if parent_request.failed_attempts >= PARENT_FAIL_TOLERANCE || parent_request.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE @@ -622,21 +711,26 @@ impl SyncManager { return; // drop the request } - let parent_hash = parent_request - .downloaded_blocks - .last() - .expect("The parent queue should never be empty") - .parent_root(); + let parent_hash = if let Some(block) = parent_request.downloaded_blocks.last() { + block.parent_root() + } else { + crit!(self.log, "Parent queue is empty. This should never happen"); + return; + }; + let request = BlocksByRootRequest { block_roots: vec![parent_hash], }; // select a random fully synced peer to attempt to download the parent block - let peer_id = self.full_peers.iter().next().expect("List is not empty"); + let available_peers = self.full_peers.iter().collect::>(); + let peer_id = if let Some(peer_id) = available_peers.choose(&mut rand::thread_rng()) { + (**peer_id).clone() + } else { + // there were no peers to choose from. We drop the lookup request + return; + }; - if let Ok(request_id) = self - .network - .blocks_by_root_request(peer_id.clone(), request) - { + if let Ok(request_id) = self.network.blocks_by_root_request(peer_id, request) { // if the request was successful add the queue back into self parent_request.pending = Some(request_id); self.parent_queue.push(parent_request);