diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index ad3233be7..f9adb93c1 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -179,6 +179,19 @@ pub struct BeaconBlockRootsResponse { pub roots: Vec, } +impl BeaconBlockRootsResponse { + /// Returns `true` if each `self.roots.slot[i]` is higher than the preceeding `i`. + pub fn slots_are_ascending(&self) -> bool { + for i in 1..self.roots.len() { + if self.roots[i - 1].slot >= self.roots[i].slot { + return false; + } + } + + true + } +} + /// Contains a block root and associated slot. #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BlockRootSlot { diff --git a/beacon_node/network/src/sync/import_queue.rs b/beacon_node/network/src/sync/import_queue.rs index 17cbd2f12..8680993aa 100644 --- a/beacon_node/network/src/sync/import_queue.rs +++ b/beacon_node/network/src/sync/import_queue.rs @@ -5,7 +5,7 @@ use slog::{debug, error}; use ssz::TreeHash; use std::sync::Arc; use std::time::{Duration, Instant}; -use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Hash256}; +use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Hash256, Slot}; /// Provides a queue for fully and partially built `BeaconBlock`s. /// @@ -120,6 +120,38 @@ impl ImportQueue { .position(|brs| self.is_new_block(&brs.block_root)) } + /// Adds the `block_roots` to the partials queue. + /// + /// If a `block_root` is not in the queue and has not been processed by the chain it is added + /// to the queue and it's block root is included in the output. + pub fn enqueue_block_roots( + &mut self, + block_roots: &[BlockRootSlot], + sender: PeerId, + ) -> Vec { + let new_roots: Vec = block_roots + .iter() + // Ignore any roots already processed by the chain. + .filter(|brs| self.is_new_block(&brs.block_root)) + // Ignore any roots already stored in the queue. + .filter(|brs| !self.partials.iter().any(|p| p.block_root == brs.block_root)) + .cloned() + .collect(); + + new_roots.iter().for_each(|brs| { + self.partials.push(PartialBeaconBlock { + slot: brs.slot, + block_root: brs.block_root, + sender: sender.clone(), + header: None, + body: None, + inserted: Instant::now(), + }) + }); + + new_roots + } + /// Adds the `headers` to the `partials` queue. Returns a list of `Hash256` block roots for /// which we should use to request `BeaconBlockBodies`. /// @@ -174,8 +206,9 @@ impl ImportQueue { self.partials[i].inserted = Instant::now(); } else { self.partials.push(PartialBeaconBlock { + slot: header.slot, block_root, - header, + header: Some(header), body: None, inserted: Instant::now(), sender, @@ -192,12 +225,14 @@ impl ImportQueue { let body_root = Hash256::from_slice(&body.hash_tree_root()[..]); self.partials.iter_mut().for_each(|mut p| { - if body_root == p.header.block_body_root { - p.inserted = Instant::now(); + if let Some(header) = &mut p.header { + if body_root == header.block_body_root { + p.inserted = Instant::now(); - if p.body.is_none() { - p.body = Some(body.clone()); - p.sender = sender.clone(); + if p.body.is_none() { + p.body = Some(body.clone()); + p.sender = sender.clone(); + } } } }); @@ -208,9 +243,10 @@ impl ImportQueue { /// `BeaconBlock`. #[derive(Clone, Debug)] pub struct PartialBeaconBlock { + pub slot: Slot, /// `BeaconBlock` root. pub block_root: Hash256, - pub header: BeaconBlockHeader, + pub header: Option, pub body: Option, /// The instant at which this record was created or last meaningfully modified. Used to /// determine if an entry is stale and should be removed. @@ -225,7 +261,7 @@ impl PartialBeaconBlock { pub fn complete(self) -> Option<(Hash256, BeaconBlock, PeerId)> { Some(( self.block_root, - self.header.into_block(self.body?), + self.header?.into_block(self.body?), self.sender, )) } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 1afba018d..21b261268 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -358,31 +358,48 @@ impl SimpleSync { if res.roots.is_empty() { warn!( self.log, - "Peer returned empty block roots response. PeerId: {:?}", peer_id + "Peer returned empty block roots response"; + "peer_id" => format!("{:?}", peer_id) ); return; } - let new_root_index = self.import_queue.first_new_root(&res.roots); - - // If a new block root is found, request it and all the headers following it. - // - // We make an assumption here that if we don't know a block then we don't know of all - // it's parents. This might not be the case if syncing becomes more sophisticated. - if let Some(i) = new_root_index { - let new = &res.roots[i]; - - self.request_block_headers( - peer_id, - BeaconBlockHeadersRequest { - start_root: new.block_root, - start_slot: new.slot, - max_headers: (res.roots.len() - i) as u64, - skip_slots: 0, - }, - network, - ) + // The wire protocol specifies that slots must be in ascending order. + if !res.slots_are_ascending() { + warn!( + self.log, + "Peer returned block roots response with bad slot ordering"; + "peer_id" => format!("{:?}", peer_id) + ); + return; } + + let new_roots = self.import_queue.enqueue_block_roots(&res.roots, peer_id.clone()); + + // No new roots means nothing to do. + // + // This check protects against future panics. + if new_roots.is_empty() { + return; + } + + // Determine the first (earliest) and last (latest) `BlockRootSlot` items. + // + // This logic relies upon slots to be in ascending order, which is enforced earlier. + let first = new_roots.first().expect("Non-empty list must have first"); + let last = new_roots.last().expect("Non-empty list must have last"); + + // Request all headers between the earliest and latest new `BlockRootSlot` items. + self.request_block_headers( + peer_id, + BeaconBlockHeadersRequest { + start_root: first.block_root, + start_slot: first.slot, + max_headers: (last.slot - first.slot + 1).as_u64(), + skip_slots: 0, + }, + network, + ) } /// Handle a `BeaconBlockHeaders` request from the peer. @@ -528,8 +545,17 @@ impl SimpleSync { "NewGossipBlock"; "peer" => format!("{:?}", peer_id), ); - // TODO: filter out messages that a prior to the finalized slot. - // + + // Ignore any block from a finalized slot. + if self.slot_is_finalized(msg.slot) { + warn!( + self.log, "NewGossipBlock"; + "msg" => "new block slot is finalized.", + "slot" => msg.slot, + ); + return; + } + // TODO: if the block is a few more slots ahead, try to get all block roots from then until // now. // @@ -675,6 +701,14 @@ impl SimpleSync { network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(req)); } + fn slot_is_finalized(&self, slot: Slot) -> bool { + slot <= self + .chain + .hello_message() + .latest_finalized_epoch + .start_slot(self.chain.get_spec().slots_per_epoch) + } + /// Generates our current state in the form of a HELLO RPC message. pub fn generate_hello(&self) -> HelloMessage { self.chain.hello_message() diff --git a/eth2/types/src/test_utils/testing_beacon_state_builder.rs b/eth2/types/src/test_utils/testing_beacon_state_builder.rs index b38e8b527..f437240dc 100644 --- a/eth2/types/src/test_utils/testing_beacon_state_builder.rs +++ b/eth2/types/src/test_utils/testing_beacon_state_builder.rs @@ -120,7 +120,7 @@ impl TestingBeaconStateBuilder { }) .collect(); - let genesis_time = 1553753928; // arbitrary + let genesis_time = 1553977336; // arbitrary let mut state = BeaconState::genesis( genesis_time,