diff --git a/beacon_node/network/src/sync/import_queue.rs b/beacon_node/network/src/sync/import_queue.rs index 90c354cfd..8cc3dd65d 100644 --- a/beacon_node/network/src/sync/import_queue.rs +++ b/beacon_node/network/src/sync/import_queue.rs @@ -1,7 +1,8 @@ use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::PeerId; -use slog::{debug, error}; +use slog::error; +use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; use tree_hash::TreeHash; @@ -22,7 +23,7 @@ use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Hash256, Slot}; pub struct ImportQueue { pub chain: Arc>, /// Partially imported blocks, keyed by the root of `BeaconBlockBody`. - pub partials: Vec, + partials: HashMap, /// Time before a queue entry is considered state. pub stale_time: Duration, /// Logging @@ -34,7 +35,7 @@ impl ImportQueue { pub fn new(chain: Arc>, stale_time: Duration, log: slog::Logger) -> Self { Self { chain, - partials: vec![], + partials: HashMap::new(), stale_time, log, } @@ -52,7 +53,7 @@ impl ImportQueue { let mut complete: Vec<(Hash256, BeaconBlock, PeerId)> = self .partials .iter() - .filter_map(|partial| partial.clone().complete()) + .filter_map(|(_, partial)| partial.clone().complete()) .collect(); // Sort the completable partials to be in ascending slot order. @@ -61,14 +62,14 @@ impl ImportQueue { complete } + pub fn contains_block_root(&self, block_root: Hash256) -> bool { + self.partials.contains_key(&block_root) + } + /// Removes the first `PartialBeaconBlock` with a matching `block_root`, returning the partial /// if it exists. pub fn remove(&mut self, block_root: Hash256) -> Option { - let position = self - .partials - .iter() - .position(|p| p.block_root == block_root)?; - Some(self.partials.remove(position)) + self.partials.remove(&block_root) } /// Flushes all stale entries from the queue. @@ -76,31 +77,10 @@ impl ImportQueue { /// An entry is stale if it has as a `inserted` time that is more than `self.stale_time` in the /// past. pub fn remove_stale(&mut self) { - let stale_indices: Vec = self - .partials - .iter() - .enumerate() - .filter_map(|(i, partial)| { - if partial.inserted + self.stale_time <= Instant::now() { - Some(i) - } else { - None - } - }) - .collect(); + let stale_time = self.stale_time; - if !stale_indices.is_empty() { - debug!( - self.log, - "ImportQueue removing stale entries"; - "stale_items" => stale_indices.len(), - "stale_time_seconds" => self.stale_time.as_secs() - ); - } - - stale_indices.iter().for_each(|&i| { - self.partials.remove(i); - }); + self.partials + .retain(|_, partial| partial.inserted + stale_time > Instant::now()) } /// Returns `true` if `self.chain` has not yet processed this block. @@ -122,27 +102,30 @@ impl ImportQueue { block_roots: &[BlockRootSlot], sender: PeerId, ) -> Vec { - let new_roots: Vec = block_roots + let new_block_root_slots: Vec = block_roots .iter() + // Ignore any roots already stored in the queue. + .filter(|brs| !self.contains_block_root(brs.block_root)) // Ignore any roots already processed by the chain. .filter(|brs| self.chain_has_not_seen_block(&brs.block_root)) - // Ignore any roots already stored in the queue. - .filter(|brs| !self.partials.iter().any(|p| p.block_root == brs.block_root)) .cloned() .collect(); - new_roots.iter().for_each(|brs| { - self.partials.push(PartialBeaconBlock { - slot: brs.slot, - block_root: brs.block_root, - sender: sender.clone(), - header: None, - body: None, - inserted: Instant::now(), - }) - }); + self.partials.extend( + new_block_root_slots + .iter() + .map(|brs| PartialBeaconBlock { + slot: brs.slot, + block_root: brs.block_root, + sender: sender.clone(), + header: None, + body: None, + inserted: Instant::now(), + }) + .map(|partial| (partial.block_root, partial)), + ); - new_roots + new_block_root_slots } /// Adds the `headers` to the `partials` queue. Returns a list of `Hash256` block roots for @@ -170,7 +153,7 @@ impl ImportQueue { if self.chain_has_not_seen_block(&block_root) { self.insert_header(block_root, header, sender.clone()); - required_bodies.push(block_root) + required_bodies.push(block_root); } } @@ -197,31 +180,20 @@ impl ImportQueue { /// If the header already exists, the `inserted` time is set to `now` and not other /// modifications are made. fn insert_header(&mut self, block_root: Hash256, header: BeaconBlockHeader, sender: PeerId) { - if let Some(i) = self - .partials - .iter() - .position(|p| p.block_root == block_root) - { - // Case 1: there already exists a partial with a matching block root. - // - // The `inserted` time is set to now and the header is replaced, regardless of whether - // it existed or not. - self.partials[i].header = Some(header); - self.partials[i].inserted = Instant::now(); - } else { - // Case 2: there was no partial with a matching block root. - // - // A new partial is added. This case permits adding a header without already known the - // root. - self.partials.push(PartialBeaconBlock { + self.partials + .entry(block_root) + .and_modify(|partial| { + partial.header = Some(header.clone()); + partial.inserted = Instant::now(); + }) + .or_insert_with(|| PartialBeaconBlock { slot: header.slot, block_root, header: Some(header), body: None, inserted: Instant::now(), sender, - }) - } + }); } /// Updates an existing partial with the `body`. @@ -232,7 +204,7 @@ impl ImportQueue { fn insert_body(&mut self, body: BeaconBlockBody, sender: PeerId) { let body_root = Hash256::from_slice(&body.tree_hash_root()[..]); - self.partials.iter_mut().for_each(|mut p| { + self.partials.iter_mut().for_each(|(_, mut p)| { if let Some(header) = &mut p.header { if body_root == header.block_body_root { p.inserted = Instant::now(); @@ -261,15 +233,10 @@ impl ImportQueue { sender, }; - if let Some(i) = self - .partials - .iter() - .position(|p| p.block_root == block_root) - { - self.partials[i] = partial; - } else { - self.partials.push(partial) - } + self.partials + .entry(block_root) + .and_modify(|existing_partial| *existing_partial = partial.clone()) + .or_insert(partial); } } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 0a082afcf..2382e47a4 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -17,7 +17,7 @@ use types::{ const SLOT_IMPORT_TOLERANCE: u64 = 100; /// The amount of seconds a block (or partial block) may exist in the import queue. -const QUEUE_STALE_SECS: u64 = 600; +const QUEUE_STALE_SECS: u64 = 6; /// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it. /// Otherwise we queue it. @@ -72,7 +72,6 @@ pub struct SimpleSync { import_queue: ImportQueue, /// The current state of the syncing protocol. state: SyncState, - /// Sync logger. log: slog::Logger, } @@ -160,96 +159,100 @@ impl SimpleSync { hello: HelloMessage, network: &mut NetworkContext, ) { - let spec = &self.chain.spec; - let remote = PeerSyncInfo::from(hello); let local = PeerSyncInfo::from(&self.chain); - // Disconnect nodes who are on a different network. + let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); + if local.network_id != remote.network_id { + // The node is on a different network, disconnect them. info!( self.log, "HandshakeFailure"; "peer" => format!("{:?}", peer_id), "reason" => "network_id" ); + network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork); - // Disconnect nodes if our finalized epoch is greater than thieirs, and their finalized - // epoch is not in our chain. Viz., they are on another chain. - // - // If the local or remote have a `latest_finalized_root == ZERO_HASH`, skips checks about - // the finalized_root. The logic is akward and I think we're better without it. - } else if (local.latest_finalized_epoch >= remote.latest_finalized_epoch) - && (!self - .chain - .rev_iter_block_roots(local.best_slot) - .any(|(root, _slot)| root == remote.latest_finalized_root)) - && (local.latest_finalized_root != spec.zero_hash) - && (remote.latest_finalized_root != spec.zero_hash) + } else if remote.latest_finalized_epoch <= local.latest_finalized_epoch + && remote.latest_finalized_root != self.chain.spec.zero_hash + && local.latest_finalized_root != self.chain.spec.zero_hash + && (self.root_at_slot(start_slot(remote.latest_finalized_epoch)) + != Some(remote.latest_finalized_root)) { + // The remotes finalized epoch is less than or greater than ours, but the block root is + // different to the one in our chain. + // + // Therefore, the node is on a different chain and we should not communicate with them. info!( self.log, "HandshakeFailure"; "peer" => format!("{:?}", peer_id), - "reason" => "wrong_finalized_chain" + "reason" => "different finalized chain" ); network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork); - // Process handshakes from peers that seem to be on our chain. - } else { - info!(self.log, "HandshakeSuccess"; "peer" => format!("{:?}", peer_id)); - self.known_peers.insert(peer_id.clone(), remote); - - // If we have equal or better finalized epochs and best slots, we require nothing else from - // this peer. + } else if remote.latest_finalized_epoch < local.latest_finalized_epoch { + // The node has a lower finalized epoch, their chain is not useful to us. There are two + // cases where a node can have a lower finalized epoch: // - // We make an exception when our best slot is 0. Best slot does not indicate wether or - // not there is a block at slot zero. - if (remote.latest_finalized_epoch <= local.latest_finalized_epoch) - && (remote.best_slot <= local.best_slot) - && (local.best_slot > 0) - { - debug!(self.log, "Peer is naive"; "peer" => format!("{:?}", peer_id)); - return; - } + // ## The node is on the same chain + // + // If a node is on the same chain but has a lower finalized epoch, their head must be + // lower than ours. Therefore, we have nothing to request from them. + // + // ## The node is on a fork + // + // If a node is on a fork that has a lower finalized epoch, switching to that fork would + // cause us to revert a finalized block. This is not permitted, therefore we have no + // interest in their blocks. + debug!( + self.log, + "NaivePeer"; + "peer" => format!("{:?}", peer_id), + "reason" => "lower finalized epoch" + ); + } else if self + .chain + .store + .exists::(&remote.best_root) + .unwrap_or_else(|_| false) + { + // If the node's best-block is already known to us, we have nothing to request. + debug!( + self.log, + "NaivePeer"; + "peer" => format!("{:?}", peer_id), + "reason" => "best block is known" + ); + } else { + // The remote node has an equal or great finalized epoch and we don't know it's head. + // + // Therefore, there are some blocks between the local finalized epoch and the remote + // head that are worth downloading. + debug!(self.log, "UsefulPeer"; "peer" => format!("{:?}", peer_id)); - // If the remote has a higher finalized epoch, request all block roots from our finalized - // epoch through to its best slot. - if remote.latest_finalized_epoch > local.latest_finalized_epoch { - debug!(self.log, "Peer has high finalized epoch"; "peer" => format!("{:?}", peer_id)); - let start_slot = local - .latest_finalized_epoch - .start_slot(T::EthSpec::slots_per_epoch()); - let required_slots = remote.best_slot - start_slot; + let start_slot = local + .latest_finalized_epoch + .start_slot(T::EthSpec::slots_per_epoch()); + let required_slots = remote.best_slot - start_slot; - self.request_block_roots( - peer_id, - BeaconBlockRootsRequest { - start_slot, - count: required_slots.into(), - }, - network, - ); - // If the remote has a greater best slot, request the roots between our best slot and their - // best slot. - } else if remote.best_slot > local.best_slot { - debug!(self.log, "Peer has higher best slot"; "peer" => format!("{:?}", peer_id)); - let start_slot = local - .latest_finalized_epoch - .start_slot(T::EthSpec::slots_per_epoch()); - let required_slots = remote.best_slot - start_slot; - - self.request_block_roots( - peer_id, - BeaconBlockRootsRequest { - start_slot, - count: required_slots.into(), - }, - network, - ); - } else { - debug!(self.log, "Nothing to request from peer"; "peer" => format!("{:?}", peer_id)); - } + self.request_block_roots( + peer_id, + BeaconBlockRootsRequest { + start_slot, + count: required_slots.into(), + }, + network, + ); } } + fn root_at_slot(&self, target_slot: Slot) -> Option { + self.chain + .rev_iter_block_roots(target_slot) + .take(1) + .find(|(_root, slot)| *slot == target_slot) + .map(|(root, _slot)| root) + } + /// Handle a `BeaconBlockRoots` request from the peer. pub fn on_beacon_block_roots_request( &mut self, @@ -275,11 +278,13 @@ impl SimpleSync { .collect(); if roots.len() as u64 != req.count { - debug!( + warn!( self.log, "BlockRootsRequest"; "peer" => format!("{:?}", peer_id), "msg" => "Failed to return all requested hashes", + "start_slot" => req.start_slot, + "current_slot" => self.chain.current_state().slot, "requested" => req.count, "returned" => roots.len(), ); @@ -351,7 +356,7 @@ impl SimpleSync { BeaconBlockHeadersRequest { start_root: first.block_root, start_slot: first.slot, - max_headers: (last.slot - first.slot + 1).as_u64(), + max_headers: (last.slot - first.slot).as_u64(), skip_slots: 0, }, network, @@ -433,7 +438,9 @@ impl SimpleSync { .import_queue .enqueue_headers(res.headers, peer_id.clone()); - self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network); + if !block_roots.is_empty() { + self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network); + } } /// Handle a `BeaconBlockBodies` request from the peer. @@ -518,10 +525,28 @@ impl SimpleSync { { match outcome { BlockProcessingOutcome::Processed { .. } => SHOULD_FORWARD_GOSSIP_BLOCK, - BlockProcessingOutcome::ParentUnknown { .. } => { + BlockProcessingOutcome::ParentUnknown { parent } => { + // Clean the stale entries from the queue. + self.import_queue.remove_stale(); + + // Add this block to the queue self.import_queue .enqueue_full_blocks(vec![block], peer_id.clone()); + // Unless the parent is in the queue, request the parent block from the peer. + // + // It is likely that this is duplicate work, given we already send a hello + // request. However, I believe there are some edge-cases where the hello + // message doesn't suffice, so we perform this request as well. + if !self.import_queue.contains_block_root(parent) { + // Send a hello to learn of the clients best slot so we can then sync the required + // parent(s). + network.send_rpc_request( + peer_id.clone(), + RPCRequest::Hello(hello_message(&self.chain)), + ); + } + SHOULD_FORWARD_GOSSIP_BLOCK } BlockProcessingOutcome::FutureSlot { @@ -696,7 +721,7 @@ impl SimpleSync { if let Ok(outcome) = processing_result { match outcome { BlockProcessingOutcome::Processed { block_root } => { - info!( + debug!( self.log, "Imported block from network"; "source" => source, "slot" => block.slot, @@ -713,28 +738,19 @@ impl SimpleSync { "peer" => format!("{:?}", peer_id), ); - // Send a hello to learn of the clients best slot so we can then sync the require - // parent(s). - network.send_rpc_request( - peer_id.clone(), - RPCRequest::Hello(hello_message(&self.chain)), - ); - - // Explicitly request the parent block from the peer. + // Unless the parent is in the queue, request the parent block from the peer. // // It is likely that this is duplicate work, given we already send a hello // request. However, I believe there are some edge-cases where the hello // message doesn't suffice, so we perform this request as well. - self.request_block_headers( - peer_id, - BeaconBlockHeadersRequest { - start_root: parent, - start_slot: block.slot - 1, - max_headers: 1, - skip_slots: 0, - }, - network, - ) + if !self.import_queue.contains_block_root(parent) { + // Send a hello to learn of the clients best slot so we can then sync the require + // parent(s). + network.send_rpc_request( + peer_id.clone(), + RPCRequest::Hello(hello_message(&self.chain)), + ); + } } BlockProcessingOutcome::FutureSlot { present_slot, diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index cf50d671b..76807ce8f 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -139,8 +139,7 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> { Err(BeaconStateError::SlotOutOfBounds) => { // Read a `BeaconState` from the store that has access to prior historical root. let beacon_state: BeaconState = { - // Load the earlier state from disk. Skip forward one slot, because a state - // doesn't return it's own state root. + // Load the earliest state from disk. let new_state_root = self.beacon_state.get_oldest_state_root().ok()?; self.store.get(&new_state_root).ok()?