From c259d6c00637e6372cc75afd1c6cd2debe009424 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Sun, 18 Aug 2019 03:36:13 +1000 Subject: [PATCH] First draft sync re-write. WIP --- beacon_node/network/src/message_handler.rs | 10 +- beacon_node/network/src/sync/import_queue.rs | 307 ------- beacon_node/network/src/sync/manager.rs | 810 +++++++++++++------ beacon_node/network/src/sync/simple_sync.rs | 409 ++-------- 4 files changed, 661 insertions(+), 875 deletions(-) delete mode 100644 beacon_node/network/src/sync/import_queue.rs diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 6a9a40369..fd10c5aea 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -118,7 +118,14 @@ impl MessageHandler { hello_message, &mut self.network_context, ), - RPCRequest::Goodbye(goodbye_reason) => self.sync.on_goodbye(peer_id, goodbye_reason), + RPCRequest::Goodbye(goodbye_reason) => { + debug!( + self.log, "PeerGoodbye"; + "peer" => format!("{:?}", peer_id), + "reason" => format!("{:?}", reason), + ); + self.sync.on_disconnect(peer_id), + }, RPCRequest::BeaconBlocks(request) => self.sync.on_beacon_blocks_request( peer_id, request_id, @@ -167,6 +174,7 @@ impl MessageHandler { Ok(beacon_blocks) => { self.sync.on_beacon_blocks_response( peer_id, + request_id, beacon_blocks, &mut self.network_context, ); diff --git a/beacon_node/network/src/sync/import_queue.rs b/beacon_node/network/src/sync/import_queue.rs deleted file mode 100644 index 5503ed64f..000000000 --- a/beacon_node/network/src/sync/import_queue.rs +++ /dev/null @@ -1,307 +0,0 @@ -use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::rpc::methods::*; -use eth2_libp2p::PeerId; -use slog::error; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use tree_hash::TreeHash; -use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, EthSpec, Hash256, Slot}; - -/// Provides a queue for fully and partially built `BeaconBlock`s. -/// -/// The queue is fundamentally a `Vec` where no two items have the same -/// `item.block_root`. This struct it backed by a `Vec` not a `HashMap` for the following two -/// reasons: -/// -/// - When we receive a `BeaconBlockBody`, the only way we can find it's matching -/// `BeaconBlockHeader` is to find a header such that `header.beacon_block_body == -/// tree_hash_root(body)`. Therefore, if we used a `HashMap` we would need to use the root of -/// `BeaconBlockBody` as the key. -/// - It is possible for multiple distinct blocks to have identical `BeaconBlockBodies`. Therefore -/// we cannot use a `HashMap` keyed by the root of `BeaconBlockBody`. -pub struct ImportQueue { - pub chain: Arc>, - /// Partially imported blocks, keyed by the root of `BeaconBlockBody`. - partials: HashMap>, - /// Time before a queue entry is considered state. - pub stale_time: Duration, - /// Logging - log: slog::Logger, -} - -impl ImportQueue { - /// Return a new, empty queue. - pub fn new(chain: Arc>, stale_time: Duration, log: slog::Logger) -> Self { - Self { - chain, - partials: HashMap::new(), - stale_time, - log, - } - } - - /// Returns true of the if the `BlockRoot` is found in the `import_queue`. - pub fn contains_block_root(&self, block_root: Hash256) -> bool { - self.partials.contains_key(&block_root) - } - - /// Attempts to complete the `BlockRoot` if it is found in the `import_queue`. - /// - /// Returns an Enum with a `PartialBeaconBlockCompletion`. - /// Does not remove the `block_root` from the `import_queue`. - pub fn attempt_complete_block( - &self, - block_root: Hash256, - ) -> PartialBeaconBlockCompletion { - if let Some(partial) = self.partials.get(&block_root) { - partial.attempt_complete() - } else { - PartialBeaconBlockCompletion::MissingRoot - } - } - - /// Removes the first `PartialBeaconBlock` with a matching `block_root`, returning the partial - /// if it exists. - pub fn remove(&mut self, block_root: Hash256) -> Option> { - self.partials.remove(&block_root) - } - - /// Flushes all stale entries from the queue. - /// - /// An entry is stale if it has as a `inserted` time that is more than `self.stale_time` in the - /// past. - pub fn remove_stale(&mut self) { - let stale_time = self.stale_time; - - self.partials - .retain(|_, partial| partial.inserted + stale_time > Instant::now()) - } - - /// Returns `true` if `self.chain` has not yet processed this block. - pub fn chain_has_not_seen_block(&self, block_root: &Hash256) -> bool { - self.chain - .is_new_block_root(&block_root) - .unwrap_or_else(|_| { - error!(self.log, "Unable to determine if block is new."); - true - }) - } - - /// Adds the `block_roots` to the partials queue. - /// - /// If a `block_root` is not in the queue and has not been processed by the chain it is added - /// to the queue and it's block root is included in the output. - pub fn enqueue_block_roots( - &mut self, - block_roots: &[BlockRootSlot], - sender: PeerId, - ) -> Vec { - // TODO: This will currently not return a `BlockRootSlot` if this root exists but there is no header. - // It would be more robust if it did. - let new_block_root_slots: Vec = block_roots - .iter() - // Ignore any roots already stored in the queue. - .filter(|brs| !self.contains_block_root(brs.block_root)) - // Ignore any roots already processed by the chain. - .filter(|brs| self.chain_has_not_seen_block(&brs.block_root)) - .cloned() - .collect(); - - self.partials.extend( - new_block_root_slots - .iter() - .map(|brs| PartialBeaconBlock { - slot: brs.slot, - block_root: brs.block_root, - sender: sender.clone(), - header: None, - body: None, - inserted: Instant::now(), - }) - .map(|partial| (partial.block_root, partial)), - ); - - new_block_root_slots - } - - /// Adds the `headers` to the `partials` queue. Returns a list of `Hash256` block roots for - /// which we should use to request `BeaconBlockBodies`. - /// - /// If a `header` is not in the queue and has not been processed by the chain it is added to - /// the queue and it's block root is included in the output. - /// - /// If a `header` is already in the queue, but not yet processed by the chain the block root is - /// not included in the output and the `inserted` time for the partial record is set to - /// `Instant::now()`. Updating the `inserted` time stops the partial from becoming stale. - pub fn enqueue_headers( - &mut self, - headers: Vec, - sender: PeerId, - ) -> Vec { - let mut required_bodies: Vec = vec![]; - - for header in headers { - let block_root = Hash256::from_slice(&header.canonical_root()[..]); - - if self.chain_has_not_seen_block(&block_root) - && !self.insert_header(block_root, header, sender.clone()) - { - // If a body is empty - required_bodies.push(block_root); - } - } - - required_bodies - } - - /// If there is a matching `header` for this `body`, adds it to the queue. - /// - /// If there is no `header` for the `body`, the body is simply discarded. - pub fn enqueue_bodies( - &mut self, - bodies: Vec>, - sender: PeerId, - ) -> Option { - let mut last_block_hash = None; - for body in bodies { - last_block_hash = self.insert_body(body, sender.clone()); - } - - last_block_hash - } - - pub fn enqueue_full_blocks(&mut self, blocks: Vec>, sender: PeerId) { - for block in blocks { - self.insert_full_block(block, sender.clone()); - } - } - - /// Inserts a header to the queue. - /// - /// If the header already exists, the `inserted` time is set to `now` and not other - /// modifications are made. - /// Returns true is `body` exists. - fn insert_header( - &mut self, - block_root: Hash256, - header: BeaconBlockHeader, - sender: PeerId, - ) -> bool { - let mut exists = false; - self.partials - .entry(block_root) - .and_modify(|partial| { - partial.header = Some(header.clone()); - partial.inserted = Instant::now(); - if partial.body.is_some() { - exists = true; - } - }) - .or_insert_with(|| PartialBeaconBlock { - slot: header.slot, - block_root, - header: Some(header), - body: None, - inserted: Instant::now(), - sender, - }); - exists - } - - /// Updates an existing partial with the `body`. - /// - /// If the body already existed, the `inserted` time is set to `now`. - /// - /// Returns the block hash of the inserted body - fn insert_body( - &mut self, - body: BeaconBlockBody, - sender: PeerId, - ) -> Option { - let body_root = Hash256::from_slice(&body.tree_hash_root()[..]); - let mut last_root = None; - - self.partials.iter_mut().for_each(|(root, mut p)| { - if let Some(header) = &mut p.header { - if body_root == header.body_root { - p.inserted = Instant::now(); - p.body = Some(body.clone()); - p.sender = sender.clone(); - last_root = Some(*root); - } - } - }); - - last_root - } - - /// Updates an existing `partial` with the completed block, or adds a new (complete) partial. - /// - /// If the partial already existed, the `inserted` time is set to `now`. - fn insert_full_block(&mut self, block: BeaconBlock, sender: PeerId) { - let block_root = Hash256::from_slice(&block.canonical_root()[..]); - - let partial = PartialBeaconBlock { - slot: block.slot, - block_root, - header: Some(block.block_header()), - body: Some(block.body), - inserted: Instant::now(), - sender, - }; - - self.partials - .entry(block_root) - .and_modify(|existing_partial| *existing_partial = partial.clone()) - .or_insert(partial); - } -} - -/// Individual components of a `BeaconBlock`, potentially all that are required to form a full -/// `BeaconBlock`. -#[derive(Clone, Debug)] -pub struct PartialBeaconBlock { - pub slot: Slot, - /// `BeaconBlock` root. - pub block_root: Hash256, - pub header: Option, - pub body: Option>, - /// The instant at which this record was created or last meaningfully modified. Used to - /// determine if an entry is stale and should be removed. - pub inserted: Instant, - /// The `PeerId` that last meaningfully contributed to this item. - pub sender: PeerId, -} - -impl PartialBeaconBlock { - /// Attempts to build a block. - /// - /// Does not comsume the `PartialBeaconBlock`. - pub fn attempt_complete(&self) -> PartialBeaconBlockCompletion { - if self.header.is_none() { - PartialBeaconBlockCompletion::MissingHeader(self.slot) - } else if self.body.is_none() { - PartialBeaconBlockCompletion::MissingBody - } else { - PartialBeaconBlockCompletion::Complete( - self.header - .clone() - .unwrap() - .into_block(self.body.clone().unwrap()), - ) - } - } -} - -/// The result of trying to convert a `BeaconBlock` into a `PartialBeaconBlock`. -pub enum PartialBeaconBlockCompletion { - /// The partial contains a valid BeaconBlock. - Complete(BeaconBlock), - /// The partial does not exist. - MissingRoot, - /// The partial contains a `BeaconBlockRoot` but no `BeaconBlockHeader`. - MissingHeader(Slot), - /// The partial contains a `BeaconBlockRoot` and `BeaconBlockHeader` but no `BeaconBlockBody`. - MissingBody, -} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 52c1a72c6..a4ce544ec 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -1,283 +1,639 @@ +const MAX_BLOCKS_PER_REQUEST: usize = 10; -const MAXIMUM_BLOCKS_PER_REQUEST: usize = 10; -const SIMULTANEOUS_REQUESTS: usize = 10; -use super::simple_sync::FUTURE_SLOT_TOLERANCE; +/// The number of slots that we can import blocks ahead of us, before going into full Sync mode. +const SLOT_IMPORT_TOLERANCE: u64 = 10; -struct Chunk { - id: usize, - start_slot: Slot, - end_slot: Slot, - } +const PARENT_FAIL_TOLERANCE: usize = 3; +const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE*2; - -struct CompletedChunk { - peer_id: PeerId, - chunk: Chunk, - blocks: Vec, +enum BlockRequestsState { + QueuedForward, + QueuedBackward, + Pending(RequestId), + Complete, } -struct ProcessedChunk { - peer_id: PeerId, - chunk: Chunk, +struct BlockRequests { + target_head_slot: Slot + target_head_root: Hash256, + downloaded_blocks: Vec, + state: State, } -#[derive(PartialEq)] -pub enum SyncState { - Idle, - Downloading, - ColdSync { - max_wanted_slot: Slot, - max_wanted_hash: Hash256, +struct ParentRequests { + downloaded_blocks: Vec, + attempts: usize, + last_submitted_peer: PeerId, // to downvote the submitting peer. + state: BlockRequestsState, +} + +impl BlockRequests { + + // gets the start slot for next batch + // last block slot downloaded plus 1 + fn next_start_slot(&self) -> Option { + if !self.downloaded_blocks.is_empty() { + match self.state { + BlockRequestsState::QueuedForward => { + let last_element_index = self.downloaded_blocks.len() -1; + Some(downloaded_blocks[last_element_index].slot.add(1)) + } + BlockRequestsState::QueuedBackward => { + let earliest_known_slot = self.downloaded_blocks[0].slot; + Some(earliest_known_slot.add(1).sub(MAX_BLOCKS_PER_REQUEST)) + } + } + } + else { + None + } } } -pub enum SyncManagerState { - RequestBlocks(peer_id, BeaconBlockRequest), +enum ManagerState { + Syncing, + Regular, Stalled, - Idle, } -pub struct PeerSyncInfo { - peer_id: PeerId, - fork_version: [u8,4], - finalized_root: Hash256, - finalized_epoch: Epoch, - head_root: Hash256, - head_slot: Slot, - requested_slot_skip: Option<(Slot, usize)>, +enum ImportManagerOutcome { + Idle, + RequestBlocks{ + peer_id: PeerId, + request_id: RequestId, + request: BeaconBlocksRequest, + }, + RecentRequest(PeerId, RecentBeaconBlocksRequest), + DownvotePeer(PeerId), } -pub(crate) struct SyncManager { + +pub struct ImportManager { /// A reference to the underlying beacon chain. chain: Arc>, - /// A mapping of Peers to their respective PeerSyncInfo. - available_peers: HashMap, - wanted_chunks: Vec, - pending_chunks: HashMap, - completed_chunks: Vec, - processed_chunks: Vec, // ordered - multi_peer_sections: HashMap - - current_requests: usize, - latest_wanted_slot: Option, - sync_status: SyncStatus, - to_process_chunk_id: usize, + state: MangerState, + import_queue: HashMap, + parent_queue: Vec, + full_peers: Hashset, + current_req_id: usize, log: Logger, - } -impl SyncManager { - /// Adds a sync-able peer and determines which blocks to download given the current state of - /// the chain, known peers and currently requested blocks. - fn add_sync_peer(&mut self, peer_id: PeerId, remote: PeerSyncInfo, network &mut NetworkContext) { +impl ImportManager { + pub fn add_peer(&mut self, peer_id, remote: PeerSyncInfo) { + // TODO: Improve comments. + // initially try to download blocks from our current head + // then backwards search all the way back to our finalized epoch until we match on a chain + // has to be done sequentially to find next slot to start the batch from + let local = PeerSyncInfo::from(&self.chain); - let remote_finalized_slot = remote.finalized_epoch.start_slot(T::EthSpec::slots_per_epoch()); - let local_finalized_slot = local.finalized_epoch.start_slot(T::EthSpec::slots_per_epoch()); - // cold sync - if remote_finalized_slot > local.head_slot { - if let SyncState::Idle || SyncState::Downloading = self.sync_state { - info!(self.log, "Cold Sync Started", "start_slot" => local.head_slot, "latest_known_finalized" => remote_finalized_slot); - self.sync_state = SyncState::ColdSync{Slot::from(0), remote.finalized_hash} - } - - if let SyncState::ColdSync{max_wanted_slot, max_wanted_hjash } = self.sync_state { - - // We don't assume that our current head is the canonical chain. So we request blocks from - // our last finalized slot to ensure we are on the finalized chain. - if max_wanted_slot < remote_finalized_slot { - let remaining_blocks = remote_finalized_slot - max_wanted_slot; - for chunk in (0..remaining_blocks/MAXIMUM_BLOCKS_PER_REQUEST) { - self.wanted_chunks.push( - Chunk { - id: self.current_chunk_id, - previous_chunk: self.curent_chunk_id.saturating_sub(1), - start_slot: chunk*MAXIMUM_BLOCKS_PER_REQUEST + self.last_wanted_slot, - end_slot: (section+1)*MAXIMUM_BLOCKS_PER_REQUEST +self.last_wanted_slot, - }) - self.current_chunk_id +=1; - } - - // add any extra partial chunks - self.pending_section.push( Section { - start_slot: (remaining_blocks/MAXIMUM_BLOCKS_PER_REQUEST) + 1, - end_slot: remote_finalized_slot, - }) - self.current_chunk_id +=1; - - info!(self.log, "Cold Sync Updated", "start_slot" => local.head_slot, "latest_known_finalized" => remote_finalized_slot); - - self.sync_state = SyncState::ColdSync{remote_finalized_slot, remote.finalized_hash} - } + // If a peer is within SLOT_IMPORT_TOLERANCE from out head slot, ignore a batch sync + if remote.head_slot.sub(local.head_slot) < SLOT_IMPORT_TOLERANCE { + trace!(self.log, "Ignoring full sync with peer"; + "peer" => peer_id, + "peer_head_slot" => remote.head_slot, + "local_head_slot" => local.head_slot, + ); + // remove the peer from the queue if it exists + self.import_queue.remove(&peer_id); + return; } - else { // hot sync - if remote_head_slot > self.chain.head().beacon_state.slot { - if let SyncState::Idle = self.sync_state { - self.sync_state = SyncState::Downloading - info!(self.log, "Sync Started", "start_slot" => local.head_slot, "latest_known_head" => remote.head_slot.as_u64()); + if let Some(block_requests) = self.import_queue.get_mut(&peer_id) { + // update the target head slot + if remote.head_slot > requested_block.target_head_slot { + block_requests.target_head_slot = remote.head_slot; } - self.latest_known_slot = remote_head_slot; - //TODO Build requests. + } else { + let block_requests = BlockRequests { + target_head_slot: remote.head_slot, // this should be larger than the current head. It is checked in the SyncManager before add_peer is called + target_head_root: remote.head_root, + downloaded_blocks: Vec::new(), + state: RequestedBlockState::Queued } + self.import_queue.insert(peer_id, block_requests); } - available_peers.push(remote); - } - pub fn add_blocks(&mut self, chunk_id: RequestId, peer_id: PeerId, blocks: Vec) { - - if SyncState::ColdSync{max_wanted_slot, max_wanted_hash} = self.sync_state { - - let chunk = match self.pending_chunks.remove(&peer_id) { - Some(chunks) => { - match chunks.find(|chunk| chunk.id == chunk_id) { - Some(chunk) => chunk, - None => { - warn!(self.log, "Received blocks for an unknown chunk"; - "peer"=> peer_id); - return; - } - } - }, - None => { - warn!(self.log, "Received blocks without a request"; - "peer"=> peer_id); + pub fn beacon_blocks_response(peer_id: PeerId, request_id: RequestId, blocks: Vec) { + + // find the request + let block_requests = match self.import_queue.get_mut(&peer_id) { + Some(req) if req.state = RequestedBlockState::Pending(request_id) => req, + None => { + // No pending request, invalid request_id or coding error + warn!(self.log, "BeaconBlocks response unknown"; "request_id" => request_id); return; - } - }; + } + }; - // add to completed - self.current_requests -= 1; - self.completed_chunks.push(CompletedChunk(peer_id, Chunk)); + // The response should contain at least one block. + // + // If we are syncing up to a target head block, at least the target head block should be + // returned. If we are syncing back to our last finalized block the request should return + // at least the last block we received (last known block). In diagram form: + // + // unknown blocks requested blocks downloaded blocks + // |-------------------|------------------------|------------------------| + // ^finalized slot ^ requested start slot ^ last known block ^ remote head + + if blocks.is_empty() { + warn!(self.log, "BeaconBlocks response was empty"; "request_id" => request_id); + block_requests.state = RequestedBlockState::Failed; + return; + } + + // Add the newly downloaded blocks to the current list of downloaded blocks. This also + // determines if we are syncing forward or backward. + let syncing_forwards = { + if block_requests.blocks.is_empty() { + block_requests.blocks.push(blocks); + true + } + else if block_requests.blocks[0].slot < blocks[0].slot { // syncing forwards + // verify the peer hasn't sent overlapping blocks - ensuring the strictly + // increasing blocks in a batch will be verified during the processing + if block_requests.next_slot() > blocks[0].slot { + warn!(self.log, "BeaconBlocks response returned duplicate blocks", "request_id" => request_id, "response_initial_slot" => blocks[0].slot, "requested_initial_slot" => block_requests.next_slot()); + block_requests.state = RequestedBlockState::Failed; + return; + } + + block_requests.blocks.push(blocks); + true + } + else { false } + }; + + + // Determine if more blocks need to be downloaded. There are a few cases: + // - We have downloaded a batch from our head_slot, which has not reached the remotes head + // (target head). Therefore we need to download another sequential batch. + // - The latest batch includes blocks that greater than or equal to the target_head slot, + // which means we have caught up to their head. We then check to see if the first + // block downloaded matches our head. If so, we are on the same chain and can process + // the blocks. If not we need to sync back further until we are on the same chain. So + // request more blocks. + // - We are syncing backwards (from our head slot) and need to check if we are on the same + // chain. If so, process the blocks, if not, request more blocks all the way up to + // our last finalized slot. + + if syncing_forwards { + // does the batch contain the target_head_slot + let last_element_index = block_requests.blocks.len()-1; + if block_requests[last_element_index].slot >= block_requests.target_slot { + // if the batch is on our chain, this is complete and we can then process. + // Otherwise start backwards syncing until we reach a common chain. + let earliest_slot = block_requests_blocks[0].slot + if block_requests.blocks[0] == self.chain.get_block_by_slot(earliest_slot) { + block_requests.state = RequestedBlockState::Complete; + return; + } + + // not on the same chain, request blocks backwards + // binary search, request half the distance between the earliest block and our + // finalized slot + let state = &beacon_chain.head().beacon_state; + let local_finalized_slot = state.finalized_checkpoint.epoch; //TODO: Convert to slot + // check that the request hasn't failed by having no common chain + if local_finalized_slot >= block_requests.blocks[0] { + warn!(self.log, "Peer returned an unknown chain."; "request_id" => request_id); + block_requests.state = RequestedBlockState::Failed; + return; + } + + // Start a backwards sync by requesting earlier blocks + // There can be duplication in downloaded blocks here if there are a large number + // of skip slots. In all cases we at least re-download the earliest known block. + // It is unlikely that a backwards sync in required, so we accept this duplication + // for now. + block_requests.state = RequestedBlockState::QueuedBackward; + } + else { + // batch doesn't contain the head slot, request the next batch + block_requests.state = RequestedBlockState::QueuedForward; + } + } + else { + // syncing backwards + // if the batch is on our chain, this is complete and we can then process. + // Otherwise continue backwards + let earliest_slot = block_requests_blocks[0].slot + if block_requests.blocks[0] == self.chain.get_block_by_slot(earliest_slot) { + block_requests.state = RequestedBlockState::Complete; + return; + } + block_requests.state = RequestedBlockState::QueuedBackward; + } } - pub fn inject_error(id: RequestId, peer_id) { - if let SyncState::ColdSync{ _max_wanted_slot, _max_wanted_hash } { - match self.pending_chunks.get(&peer_id) { - Some(chunks) => { - if let Some(pos) = chunks.iter().position(|c| c.id == id) { - chunks.remove(pos); - } - }, - None => { - debug!(self.log, - "Received an error for an unknown request"; - "request_id" => id, - "peer" => peer_id - ); + pub fn recent_blocks_response(peer_id: PeerId, request_id: RequestId, blocks: Vec) { + + // find the request + let parent_request = match self.parent_queue.get_mut(&peer_id) { + Some(req) if req.state = RequestedBlockState::Pending(request_id) => req, + None => { + // No pending request, invalid request_id or coding error + warn!(self.log, "RecentBeaconBlocks response unknown"; "request_id" => request_id); + return; + } + }; + + // if an empty response is given, the peer didn't have the requested block, try again + if blocks.is_empty() { + parent_request.attempts += 1; + parent_request.state = RequestedBlockState::QueuedForward; + parent_request.last_submitted_peer = peer_id; + return; + } + + // currently only support a single block lookup. Reject any response that has more than 1 + // block + if blocks.len() != 1 { + //TODO: Potentially downvote the peer + debug!(self.log, "Peer sent more than 1 parent. Ignoring"; + "peer_id" => peer_id, + "no_parents" => blocks.len() + ); + return; + } + + + // queue for processing + parent_request.state = RequestedBlockState::Complete; + } + + + pub fn inject_error(peer_id: PeerId, id: RequestId) { + //TODO: Remove block state from pending + } + + pub fn peer_disconnect(peer_id: PeerId) { + self.import_queue.remove(&peer_id); + self.full_peers.remove(&peer_id); + self.update_state(); + } + + pub fn add_full_peer(peer_id: PeerId) { + debug!( + self.log, "Fully synced peer added"; + "peer" => format!("{:?}", peer_id), + ); + self.full_peers.insert(peer_id); + self.update_state(); + } + + pub fn add_unknown_block(&mut self,block: BeaconBlock) { + // if we are not in regular sync mode, ignore this block + if self.state == ManagerState::Regular { + return; + } + + // make sure this block is not already being searched for + // TODO: Potentially store a hashset of blocks for O(1) lookups + for parent_req in self.parent_queue.iter() { + if let Some(_) = parent_req.downloaded_blocks.iter().find(|d_block| d_block == block) { + // we are already searching for this block, ignore it + return; + } + } + + let req = ParentRequests { + downloaded_blocks: vec![block], + failed_attempts: 0, + state: RequestedBlockState::QueuedBackward + } + + self.parent_queue.push(req); + } + + pub fn poll() -> ImportManagerOutcome { + + loop { + // update the state of the manager + self.update_state(); + + // process potential block requests + if let Some(outcome) = self.process_potential_block_requests() { + return outcome; + } + + // process any complete long-range batches + if let Some(outcome) = self.process_complete_batches() { + return outcome; + } + + // process any parent block lookup-requests + if let Some(outcome) = self.process_parent_requests() { + return outcome; + } + + // process any complete parent lookups + if let (re_run, outcome) = self.process_complete_parent_requests() { + if let Some(outcome) = outcome { + return outcome; + } + else if !re_run { + break; } } } + + return ImportManagerOutcome::Idle; + } - pub fn poll(&mut self) -> SyncManagerState { - // if cold sync - if let SyncState::ColdSync(waiting_slot, max_wanted_slot, max_wanted_hash) = self.sync_state { + fn update_state(&mut self) { + let previous_state = self.state; + self.state = { + if !self.import_queue.is_empty() { + ManagerState::Syncing + } + else if !self.full_peers.is_empty() { + ManagerState::Regualar + } + else { + ManagerState::Stalled } + }; + if self.state != previous_state { + info!(self.log, "Syncing state updated", + "old_state" => format!("{:?}", previous_state) + "new_state" => format!("{:?}", self.state) + ); + } + } - // Try to process completed chunks - for completed_chunk in self.completed_chunks { - let chunk = completed_chunk.1; - let last_chunk_id = { - let no_processed_chunks = self.processed_chunks.len(); - if elements == 0 { 0 } else { self.processed_chunks[no_processed_chunks].id } - }; - if chunk.id == last_chunk_id + 1 { - // try and process the chunk - for block in chunk.blocks { - let processing_result = self.chain.process_block(block.clone()); - if let Ok(outcome) = processing_result { - match outcome { - BlockProcessingOutCome::Processed { block_root} => { - // block successfully processed - }, - BlockProcessingOutcome::BlockIsAlreadyKnown => { - warn!( - self.log, "Block Already Known"; - "source" => source, - "sync" => "Cold Sync", - "parent_root" => format!("{}", parent), - "baby_block_slot" => block.slot, - "peer" => format!("{:?}", chunk.0), - ); - }, - _ => { - // An error has occurred - // This could be due to the previous chunk or the current chunk. - // Re-issue both. - warn!( - self.log, "Faulty Chunk"; - "source" => source, - "sync" => "Cold Sync", - "parent_root" => format!("{}", parent), - "baby_block_slot" => block.slot, - "peer" => format!("{:?}", chunk.0), - "outcome" => format!("{:?}", outcome), - ); - // re-issue both chunks - // if both are the same peer. Downgrade the peer. - let past_chunk = self.processed_chunks.pop() - self.wanted_chunks.insert(0, chunk.clone()); - self.wanted_chunks.insert(0, past_chunk.clone()); - if chunk.0 == past_chunk.peer_id { - // downgrade peer - return SyncManagerState::DowngradePeer(chunk.0); - } - break; - } - } - } - } - // chunk successfully processed - debug!(self.log, - "Chunk Processed"; - "id" => chunk.id - "start_slot" => chunk.start_slot, - "end_slot" => chunk.end_slot, + fn process_potential_block_requests(&mut self) -> Option { + // check if an outbound request is required + // Managing a fixed number of outbound requests is maintained at the RPC protocol libp2p + // layer and not needed here. + // If any in queued state we submit a request. + + + // remove any failed batches + self.import_queue.retain(|peer_id, block_request| { + if block_request.state == RequestedBlockState::Failed { + debug!(self.log, "Block import from peer failed", + "peer_id" => peer_id, + "downloaded_blocks" => block_request.downloaded.blocks.len() ); - self.processed_chunks.push(chunk); - } + false } + else { true } + }); - // chunks completed, update the state - self.sync_state = SyncState::ColdSync{waiting_slot, max_wanted_slot, max_wanted_hash}; - // Remove stales + for (peer_id, block_requests) in self.import_queue.iter_mut() { + if let Some(request) = requests.iter().find(|req| req.state == RequestedBlockState::QueuedForward || req.state == RequestedBlockState::QueuedBackward) { - // Spawn requests - if self.current_requests <= SIMULTANEOUS_REQUESTS { - if !self.wanted_chunks.is_empty() { - let chunk = self.wanted_chunks.remove(0); - for n in (0..self.peers.len()).rev() { - let peer = self.peers.swap_remove(n); - let peer_finalized_slot = peer.finalized_epoch.start_slot(T::EthSpec::slots_per_epoch()); - if peer_finalized_slot >= chunk.end_slot { - *self.pending.chunks.entry(&peer_id).or_insert_with(|| Vec::new).push(chunk); - self.active_peers.push(peer); - self.current_requests +=1; - let block_request = BeaconBlockRequest { - head_block_root, - start_slot: chunk.start_slot, - count: chunk.end_slot - chunk.start_slot - step: 1 - } - return SyncManagerState::BlockRequest(peer, block_request); - } - } - // no peers for this chunk - self.wanted_chunks.push(chunk); - return SyncManagerState::Stalled + let request.state = RequestedBlockState::Pending(self.current_req_id); + self.current_req_id +=1; + + let req = BeaconBlocksRequest { + head_block_root: request.target_root, + start_slot: request.next_start_slot().unwrap_or_else(|| self.chain.head().slot), + count: MAX_BLOCKS_PER_REQUEST, + step: 0 } + return Some(ImportManagerOutCome::RequestBlocks{ peer_id, req }); } } - // if hot sync - return SyncManagerState::Idle + None + } + + fn process_complete_batches(&mut self) -> Option { + + let completed_batches = self.import_queue.iter().filter(|_peer, block_requests| block_requests.state == RequestedState::Complete).map(|peer, _| peer).collect::>(); + for peer_id in completed_batches { + let block_requests = self.import_queue.remove(&peer_id).unwrap("key exists"); + match self.process_blocks(block_requests.downloaded_blocks) { + Ok(()) => { + //TODO: Verify it's impossible to have empty downloaded_blocks + last_element = block_requests.downloaded_blocks.len() -1 + debug!(self.log, "Blocks processed successfully"; + "peer" => peer_id, + "start_slot" => block_requests.downloaded_blocks[0].slot, + "end_slot" => block_requests.downloaded_blocks[last_element].slot, + "no_blocks" => last_element + 1, + ); + // Re-HELLO to ensure we are up to the latest head + return Some(ImportManagerOutcome::Hello(peer_id)); + } + Err(e) => { + last_element = block_requests.downloaded_blocks.len() -1 + warn!(self.log, "Block processing failed"; + "peer" => peer_id, + "start_slot" => block_requests.downloaded_blocks[0].slot, + "end_slot" => block_requests.downloaded_blocks[last_element].slot, + "no_blocks" => last_element + 1, + "error" => format!("{:?}", e), + ); + return Some(ImportManagerOutcome::DownvotePeer(peer_id)); + } + } + } + None + } + + + fn process_parent_requests(&mut self) -> Option { + + // remove any failed requests + self.parent_queue.retain(|parent_request| { + if parent_request.state == RequestedBlockState::Failed { + debug!(self.log, "Parent import failed", + "block" => parent_request.downloaded_blocks[0].hash, + "siblings found" => parent_request.len() + ); + false + } + else { true } + }); + + // check to make sure there are peers to search for the parent from + if self.full_peers.is_empty() { + return; + } + + // check if parents need to be searched for + for parent_request in self.parent_queue.iter_mut() { + if parent_request.failed_attempts >= PARENT_FAIL_TOLERANCE { + parent_request.state == BlockRequestsState::Failed + continue; + } + else if parent_request.state == BlockRequestsState::QueuedForward { + parent_request.state = BlockRequestsState::Pending(self.current_req_id); + self.current_req_id +=1; + let parent_hash = + let req = RecentBeaconBlocksRequest { + block_roots: vec![parent_hash], + }; + + // select a random fully synced peer to attempt to download the parent block + let peer_id = self.full_peers.iter().next().expect("List is not empty"); + + return Some(ImportManagerOutcome::RecentRequest(peer_id, req); + } + } + + None + } + + + fn process_complete_parent_requests(&mut self) => (bool, Option) { + + // flag to determine if there is more process to drive or if the manager can be switched to + // an idle state + let mut re_run = false; + + // verify the last added block is the parent of the last requested block + let last_index = parent_requests.downloaded_blocks.len() -1; + let expected_hash = parent_requests.downloaded_blocks[last_index].parent ; + let block_hash = parent_requests.downloaded_blocks[0].tree_hash_root(); + if block_hash != expected_hash { + //TODO: Potentially downvote the peer + debug!(self.log, "Peer sent invalid parent. Ignoring"; + "peer_id" => peer_id, + "received_block" => block_hash, + "expected_parent" => expected_hash, + ); + return; + } + + // Find any parent_requests ready to be processed + for completed_request in self.parent_queue.iter_mut().filter(|req| req.state == BlockRequestsState::Complete) { + // try and process the list of blocks up to the requested block + while !completed_request.downloaded_blocks.is_empty() { + let block = completed_request.downloaded_blocks.pop(); + match self.chain_process_block(block.clone()) { + Ok(BlockProcessingOutcome::ParentUnknown { parent } => { + // need to keep looking for parents + completed_request.downloaded_blocks.push(block); + completed_request.state == BlockRequestsState::QueuedForward; + re_run = true; + break; + } + Ok(BlockProcessingOutcome::Processed { _ } => { } + Ok(outcome) => { // it's a future slot or an invalid block, remove it and try again + completed_request.failed_attempts +=1; + trace!( + self.log, "Invalid parent block"; + "outcome" => format!("{:?}", outcome); + "peer" => format!("{:?}", completed_request.last_submitted_peer), + ); + completed_request.state == BlockRequestsState::QueuedForward; + re_run = true; + return (re_run, Some(ImportManagerOutcome::DownvotePeer(completed_request.last_submitted_peer))); + } + Err(e) => { + completed_request.failed_attempts +=1; + warn!( + self.log, "Parent processing error"; + "error" => format!("{:?}", e); + ); + completed_request.state == BlockRequestsState::QueuedForward; + re_run = true; + return (re_run, Some(ImportManagerOutcome::DownvotePeer(completed_request.last_submitted_peer))); + } + } + } + } + + // remove any full completed and processed parent chains + self.parent_queue.retain(|req| if req.state == BlockRequestsState::Complete { false } else { true }); + (re_run, None) } + + + fn process_blocks( + &mut self, + blocks: Vec>, + ) -> Result<(), String> { + + for block in blocks { + let processing_result = self.chain.process_block(block.clone()); + + if let Ok(outcome) = processing_result { + match outcome { + BlockProcessingOutcome::Processed { block_root } => { + // The block was valid and we processed it successfully. + trace!( + self.log, "Imported block from network"; + "source" => source, + "slot" => block.slot, + "block_root" => format!("{}", block_root), + "peer" => format!("{:?}", peer_id), + ); + } + BlockProcessingOutcome::ParentUnknown { parent } => { + // blocks should be sequential and all parents should exist + trace!( + self.log, "ParentBlockUnknown"; + "source" => source, + "parent_root" => format!("{}", parent), + "baby_block_slot" => block.slot, + ); + return Err(format!("Block at slot {} has an unknown parent.", block.slot)); + } + BlockProcessingOutcome::FutureSlot { + present_slot, + block_slot, + } => { + if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { + // The block is too far in the future, drop it. + trace!( + self.log, "FutureBlock"; + "source" => source, + "msg" => "block for future slot rejected, check your time", + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, + "peer" => format!("{:?}", peer_id), + ); + return Err(format!("Block at slot {} is too far in the future", block.slot)); + } else { + // The block is in the future, but not too far. + trace!( + self.log, "QueuedFutureBlock"; + "source" => source, + "msg" => "queuing future block, check your time", + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, + "peer" => format!("{:?}", peer_id), + ); + } + } + _ => { + trace!( + self.log, "InvalidBlock"; + "source" => source, + "msg" => "peer sent invalid block", + "outcome" => format!("{:?}", outcome), + "peer" => format!("{:?}", peer_id), + ); + return Err(format!("Invalid block at slot {}", block.slot)); + } + } + Ok(()) + } else { + trace!( + self.log, "BlockProcessingFailure"; + "source" => source, + "msg" => "unexpected condition in processing block.", + "outcome" => format!("{:?}", processing_result) + ); + return Err(format!("Unexpected block processing error: {:?}", processing_result)); + } + } + } +} diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 6e5cada23..a7f5ced40 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -14,11 +14,6 @@ use types::{ Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, EthSpec, Hash256, Slot, }; -/// The number of slots that we can import blocks ahead of us, before going into full Sync mode. -const SLOT_IMPORT_TOLERANCE: u64 = 100; - -/// The amount of seconds a block may exist in the import queue. -const QUEUE_STALE_SECS: u64 = 100; /// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it. /// Otherwise we queue it. @@ -35,9 +30,11 @@ pub struct PeerSyncInfo { finalized_epoch: Epoch, head_root: Hash256, head_slot: Slot, - requested_slot_skip: Option<(Slot, usize)>, } + + + impl From for PeerSyncInfo { fn from(hello: HelloMessage) -> PeerSyncInfo { PeerSyncInfo { @@ -69,10 +66,7 @@ pub enum SyncState { pub struct SimpleSync { /// A reference to the underlying beacon chain. chain: Arc>, - /// A mapping of Peers to their respective PeerSyncInfo. - known_peers: HashMap, - /// The current state of the syncing protocol. - state: SyncState, + manager: ImportManager, log: slog::Logger, } @@ -81,49 +75,24 @@ impl SimpleSync { pub fn new(beacon_chain: Arc>, log: &slog::Logger) -> Self { let sync_logger = log.new(o!("Service"=> "Sync")); - let queue_item_stale_time = Duration::from_secs(QUEUE_STALE_SECS); - - let import_queue = - ImportQueue::new(beacon_chain.clone(), queue_item_stale_time, log.clone()); SimpleSync { chain: beacon_chain.clone(), - known_peers: HashMap::new(), - import_queue, - state: SyncState::Idle, + manager: ImportManager::new(), log: sync_logger, } } - /// Handle a `Goodbye` message from a peer. - /// - /// Removes the peer from `known_peers`. - pub fn on_goodbye(&mut self, peer_id: PeerId, reason: GoodbyeReason) { - info!( - self.log, "PeerGoodbye"; - "peer" => format!("{:?}", peer_id), - "reason" => format!("{:?}", reason), - ); - - self.known_peers.remove(&peer_id); - } - /// Handle a peer disconnect. /// - /// Removes the peer from `known_peers`. + /// Removes the peer from the manager. pub fn on_disconnect(&mut self, peer_id: PeerId) { - info!( - self.log, "Peer Disconnected"; - "peer" => format!("{:?}", peer_id), - ); - self.known_peers.remove(&peer_id); + self.manager.peer_disconnect(&peer_id); } /// Handle the connection of a new peer. /// /// Sends a `Hello` message to the peer. pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) { - info!(self.log, "PeerConnected"; "peer" => format!("{:?}", peer_id)); - network.send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain))); } @@ -137,7 +106,7 @@ impl SimpleSync { hello: HelloMessage, network: &mut NetworkContext, ) { - debug!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id)); + trace!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id)); // Say hello back. network.send_rpc_response( @@ -156,7 +125,7 @@ impl SimpleSync { hello: HelloMessage, network: &mut NetworkContext, ) { - debug!(self.log, "HelloResponse"; "peer" => format!("{:?}", peer_id)); + trace!(self.log, "HelloResponse"; "peer" => format!("{:?}", peer_id)); // Process the hello message, without sending back another hello. self.process_hello(peer_id, hello, network); @@ -178,7 +147,7 @@ impl SimpleSync { if local.fork_version != remote.fork_version { // The node is on a different network/fork, disconnect them. - info!( + debug!( self.log, "HandshakeFailure"; "peer" => format!("{:?}", peer_id), "reason" => "network_id" @@ -195,7 +164,7 @@ impl SimpleSync { // different to the one in our chain. // // Therefore, the node is on a different chain and we should not communicate with them. - info!( + debug!( self.log, "HandshakeFailure"; "peer" => format!("{:?}", peer_id), "reason" => "different finalized chain" @@ -227,13 +196,10 @@ impl SimpleSync { .exists::>(&remote.best_root) .unwrap_or_else(|_| false) { - // If the node's best-block is already known to us, we have nothing to request. - debug!( - self.log, - "NaivePeer"; - "peer" => format!("{:?}", peer_id), - "reason" => "best block is known" - ); + // If the node's best-block is already known to us and they are close to our current + // head, treat them as a fully sync'd peer. + self.import_manager.add_full_peer(peer_id); + self.process_sync(); } else { // The remote node has an equal or great finalized epoch and we don't know it's head. // @@ -246,43 +212,60 @@ impl SimpleSync { "remote_latest_finalized_epoch" => remote.latest_finalized_epoch, ); - + self.import_manager.add_peer(peer_id, remote); self.process_sync(); } } self.proess_sync(&mut self) { loop { - match self.sync_manager.poll() { - SyncManagerState::RequestBlocks(peer_id, req) { - debug!( + match self.import_manager.poll() { + ImportManagerOutcome::RequestBlocks(peer_id, req) { + trace!( self.log, - "RPCRequest(BeaconBlockBodies)"; - "count" => req.block_roots.len(), + "RPC Request"; + "method" => "BeaconBlocks", + "count" => req.count, "peer" => format!("{:?}", peer_id) ); network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlocks(req)); }, - SyncManagerState::Stalled { - // need more peers to continue sync - warn!(self.log, "No useable peers for sync"); - break; + ImportManagerOutcome::RecentRequest(peer_id, req) { + trace!( + self.log, + "RPC Request"; + "method" => "RecentBeaconBlocks", + "count" => req.block_roots.len(), + "peer" => format!("{:?}", peer_id) + ); + network.send_rpc_request(peer_id.clone(), RPCRequest::RecentBeaconBlocks(req)); + }, + ImportManagerOutcome::DownvotePeer(peer_id) { + trace!( + self.log, + "Peer downvoted"; + "peer" => format!("{:?}", peer_id) + ); + // TODO: Implement reputation + network.disconnect(peer_id.clone(), GoodbyeReason::Fault); }, SyncManagerState::Idle { // nothing to do - break; + return; } } } } + /* fn root_at_slot(&self, target_slot: Slot) -> Option { self.chain .rev_iter_block_roots() .find(|(_root, slot)| *slot == target_slot) .map(|(root, _slot)| root) } + */ /// Handle a `BeaconBlocks` request from the peer. pub fn on_beacon_blocks_request( @@ -346,8 +329,8 @@ impl SimpleSync { pub fn on_beacon_blocks_response( &mut self, peer_id: PeerId, + request_id: RequestId, res: Vec>, - network: &mut NetworkContext, ) { debug!( self.log, @@ -356,9 +339,26 @@ impl SimpleSync { "count" => res.block_bodies.len(), ); - if !res.is_empty() { - self.sync_manager.add_blocks(peer_id, blocks); - } + self.import_manager.beacon_blocks_response(peer_id, request_id, blocks); + + self.process_sync(); + } + + /// Handle a `RecentBeaconBlocks` response from the peer. + pub fn on_recent_beacon_blocks_response( + &mut self, + peer_id: PeerId, + request_id: RequestId, + res: Vec>, + ) { + debug!( + self.log, + "BeaconBlocksResponse"; + "peer" => format!("{:?}", peer_id), + "count" => res.block_bodies.len(), + ); + + self.import_manager.recent_blocks_response(peer_id, request_id, blocks); self.process_sync(); } @@ -372,7 +372,6 @@ impl SimpleSync { &mut self, peer_id: PeerId, block: BeaconBlock, - network: &mut NetworkContext, ) -> bool { if let Some(outcome) = self.process_block(peer_id.clone(), block.clone(), network, &"gossip") @@ -380,53 +379,17 @@ impl SimpleSync { match outcome { BlockProcessingOutcome::Processed { .. } => SHOULD_FORWARD_GOSSIP_BLOCK, BlockProcessingOutcome::ParentUnknown { parent } => { - // Add this block to the queue - self.import_queue - .enqueue_full_blocks(vec![block.clone()], peer_id.clone()); - debug!( - self.log, "RequestParentBlock"; - "parent_root" => format!("{}", parent), - "parent_slot" => block.slot - 1, - "peer" => format!("{:?}", peer_id), - ); - - // Request roots between parent and start of finality from peer. - let start_slot = self - .chain - .head() - .beacon_state - .finalized_checkpoint - .epoch - .start_slot(T::EthSpec::slots_per_epoch()); - self.request_block_roots( - peer_id, - BeaconBlockRootsRequest { - // Request blocks between `latest_finalized_slot` and the `block` - start_slot, - count: block.slot.as_u64() - start_slot.as_u64(), - }, - network, - ); - - // Clean the stale entries from the queue. - self.import_queue.remove_stale(); - + // Inform the sync manager to find parents for this block + self.import_manager.add_unknown_block(block.clone()); SHOULD_FORWARD_GOSSIP_BLOCK } - BlockProcessingOutcome::FutureSlot { present_slot, block_slot, } if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot => { - self.import_queue - .enqueue_full_blocks(vec![block], peer_id.clone()); - + //TODO: Decide the logic here SHOULD_FORWARD_GOSSIP_BLOCK } - // Note: known blocks are forwarded on the gossip network. - // - // We rely upon the lower layers (libp2p) to stop loops occurring from re-gossiped - // blocks. BlockProcessingOutcome::BlockIsAlreadyKnown => SHOULD_FORWARD_GOSSIP_BLOCK, _ => SHOULD_NOT_FORWARD_GOSSIP_BLOCK, } @@ -457,48 +420,8 @@ impl SimpleSync { } } - /// Request some `BeaconBlockRoots` from the remote peer. - fn request_block_roots( - &mut self, - peer_id: PeerId, - req: BeaconBlockRootsRequest, - network: &mut NetworkContext, - ) { - // Potentially set state to sync. - if self.state == SyncState::Idle && req.count > SLOT_IMPORT_TOLERANCE { - debug!(self.log, "Entering downloading sync state."); - self.state = SyncState::Downloading; - } - - debug!( - self.log, - "RPCRequest(BeaconBlockRoots)"; - "count" => req.count, - "peer" => format!("{:?}", peer_id) - ); - - // TODO: handle count > max count. - network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockRoots(req)); - } - - /// Request some `BeaconBlockHeaders` from the remote peer. - fn request_block_headers( - &mut self, - peer_id: PeerId, - req: BeaconBlockHeadersRequest, - network: &mut NetworkContext, - ) { - debug!( - self.log, - "RPCRequest(BeaconBlockHeaders)"; - "max_headers" => req.max_headers, - "peer" => format!("{:?}", peer_id) - ); - - network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockHeaders(req)); - } - +/* /// Returns `true` if `self.chain` has not yet processed this block. pub fn chain_has_seen_block(&self, block_root: &Hash256) -> bool { !self @@ -509,207 +432,13 @@ impl SimpleSync { false }) } + */ /// Generates our current state in the form of a HELLO RPC message. pub fn generate_hello(&self) -> HelloMessage { hello_message(&self.chain) } - /// Helper function to attempt to process a partial block. - /// - /// If the block can be completed recursively call `process_block` - /// else request missing parts. - fn attempt_process_partial_block( - &mut self, - peer_id: PeerId, - block_root: Hash256, - network: &mut NetworkContext, - source: &str, - ) -> Option { - match self.import_queue.attempt_complete_block(block_root) { - PartialBeaconBlockCompletion::MissingBody => { - // Unable to complete the block because the block body is missing. - debug!( - self.log, "RequestParentBody"; - "source" => source, - "block_root" => format!("{}", block_root), - "peer" => format!("{:?}", peer_id), - ); - - // Request the block body from the peer. - self.request_block_bodies( - peer_id, - BeaconBlockBodiesRequest { - block_roots: vec![block_root], - }, - network, - ); - - None - } - PartialBeaconBlockCompletion::MissingHeader(slot) => { - // Unable to complete the block because the block header is missing. - debug!( - self.log, "RequestParentHeader"; - "source" => source, - "block_root" => format!("{}", block_root), - "peer" => format!("{:?}", peer_id), - ); - - // Request the block header from the peer. - self.request_block_headers( - peer_id, - BeaconBlockHeadersRequest { - start_root: block_root, - start_slot: slot, - max_headers: 1, - skip_slots: 0, - }, - network, - ); - - None - } - PartialBeaconBlockCompletion::MissingRoot => { - // The `block_root` is not known to the queue. - debug!( - self.log, "MissingParentRoot"; - "source" => source, - "block_root" => format!("{}", block_root), - "peer" => format!("{:?}", peer_id), - ); - - // Do nothing. - - None - } - PartialBeaconBlockCompletion::Complete(block) => { - // The block exists in the queue, attempt to process it - trace!( - self.log, "AttemptProcessParent"; - "source" => source, - "block_root" => format!("{}", block_root), - "parent_slot" => block.slot, - "peer" => format!("{:?}", peer_id), - ); - - self.process_block(peer_id.clone(), block, network, source) - } - } - } - - /// Processes the `block` that was received from `peer_id`. - /// - /// If the block was submitted to the beacon chain without internal error, `Some(outcome)` is - /// returned, otherwise `None` is returned. Note: `Some(_)` does not necessarily indicate that - /// the block was successfully processed or valid. - /// - /// This function performs the following duties: - /// - /// - Attempting to import the block into the beacon chain. - /// - Logging - /// - Requesting unavailable blocks (e.g., if parent is unknown). - /// - Disconnecting faulty nodes. - /// - /// This function does not remove processed blocks from the import queue. - fn process_block( - &mut self, - peer_id: PeerId, - block: BeaconBlock, - network: &mut NetworkContext, - source: &str, - ) -> Option { - let processing_result = self.chain.process_block(block.clone()); - - if let Ok(outcome) = processing_result { - match outcome { - BlockProcessingOutcome::Processed { block_root } => { - // The block was valid and we processed it successfully. - debug!( - self.log, "Imported block from network"; - "source" => source, - "slot" => block.slot, - "block_root" => format!("{}", block_root), - "peer" => format!("{:?}", peer_id), - ); - } - BlockProcessingOutcome::ParentUnknown { parent } => { - // The parent has not been processed - trace!( - self.log, "ParentBlockUnknown"; - "source" => source, - "parent_root" => format!("{}", parent), - "baby_block_slot" => block.slot, - "peer" => format!("{:?}", peer_id), - ); - - // If the parent is in the `import_queue` attempt to complete it then process it. - // All other cases leave `parent` in `import_queue` and return original outcome. - if let Some(BlockProcessingOutcome::Processed { .. }) = - self.attempt_process_partial_block(peer_id, parent, network, source) - { - // If processing parent is successful, re-process block and remove parent from queue - self.import_queue.remove(parent); - - // Attempt to process `block` again - match self.chain.process_block(block) { - Ok(outcome) => return Some(outcome), - Err(_) => return None, - } - } - } - BlockProcessingOutcome::FutureSlot { - present_slot, - block_slot, - } => { - if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { - // The block is too far in the future, drop it. - warn!( - self.log, "FutureBlock"; - "source" => source, - "msg" => "block for future slot rejected, check your time", - "present_slot" => present_slot, - "block_slot" => block_slot, - "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, - "peer" => format!("{:?}", peer_id), - ); - network.disconnect(peer_id, GoodbyeReason::Fault); - } else { - // The block is in the future, but not too far. - debug!( - self.log, "QueuedFutureBlock"; - "source" => source, - "msg" => "queuing future block, check your time", - "present_slot" => present_slot, - "block_slot" => block_slot, - "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, - "peer" => format!("{:?}", peer_id), - ); - } - } - _ => { - debug!( - self.log, "InvalidBlock"; - "source" => source, - "msg" => "peer sent invalid block", - "outcome" => format!("{:?}", outcome), - "peer" => format!("{:?}", peer_id), - ); - } - } - - Some(outcome) - } else { - error!( - self.log, "BlockProcessingFailure"; - "source" => source, - "msg" => "unexpected condition in processing block.", - "outcome" => format!("{:?}", processing_result) - ); - - None - } - } } /// Build a `HelloMessage` representing the state of the given `beacon_chain`.