From 719dd72de65d2f4f8f9af16279206a7921c190cb Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Fri, 7 Jun 2019 02:55:16 -0400 Subject: [PATCH] Fix recently introduced sync bugs --- beacon_node/network/Cargo.toml | 1 + beacon_node/network/src/sync/import_queue.rs | 4 +- beacon_node/network/src/sync/simple_sync.rs | 299 ++++++++++-------- .../src/per_slot_processing.rs | 3 +- eth2/types/src/beacon_state.rs | 2 +- 5 files changed, 179 insertions(+), 130 deletions(-) diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 9cac12659..ebf71aa4e 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -9,6 +9,7 @@ sloggers = "0.3.2" [dependencies] beacon_chain = { path = "../beacon_chain" } +store = { path = "../store" } eth2-libp2p = { path = "../eth2-libp2p" } version = { path = "../version" } types = { path = "../../eth2/types" } diff --git a/beacon_node/network/src/sync/import_queue.rs b/beacon_node/network/src/sync/import_queue.rs index 5b03f58df..16a277f0b 100644 --- a/beacon_node/network/src/sync/import_queue.rs +++ b/beacon_node/network/src/sync/import_queue.rs @@ -166,7 +166,7 @@ impl ImportQueue { let mut required_bodies: Vec = vec![]; for header in headers { - let block_root = Hash256::from_slice(&header.tree_hash_root()[..]); + let block_root = Hash256::from_slice(&header.canonical_root()[..]); if self.chain_has_not_seen_block(&block_root) { self.insert_header(block_root, header, sender.clone()); @@ -250,7 +250,7 @@ impl ImportQueue { /// /// If the partial already existed, the `inserted` time is set to `now`. fn insert_full_block(&mut self, block: BeaconBlock, sender: PeerId) { - let block_root = Hash256::from_slice(&block.tree_hash_root()[..]); + let block_root = Hash256::from_slice(&block.canonical_root()[..]); let partial = PartialBeaconBlock { slot: block.slot, diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 043f0beda..c3fb03ca1 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -8,8 +8,11 @@ use slog::{debug, error, info, o, warn}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use store::Store; use tree_hash::TreeHash; -use types::{Attestation, BeaconBlock, BeaconBlockHeader, Epoch, EthSpec, Hash256, Slot}; +use types::{ + Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, EthSpec, Hash256, Slot, +}; /// The number of slots that we can import blocks ahead of us, before going into full Sync mode. const SLOT_IMPORT_TOLERANCE: u64 = 100; @@ -160,67 +163,88 @@ impl SimpleSync { let remote = PeerSyncInfo::from(hello); let local = PeerSyncInfo::from(&self.chain); - let network_id_mismatch = local.network_id != remote.network_id; - let on_different_finalized_chain = (local.latest_finalized_epoch - >= remote.latest_finalized_epoch) - && (!self - .chain - .rev_iter_block_roots(local.best_slot) - .any(|root| root == remote.latest_finalized_root)); - - if network_id_mismatch || on_different_finalized_chain { + // Disconnect nodes who are on a different network. + if local.network_id != remote.network_id { info!( self.log, "HandshakeFailure"; "peer" => format!("{:?}", peer_id), "reason" => "network_id" ); network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork); - return; + // Disconnect nodes if our finalized epoch is greater than thieirs, and their finalized + // epoch is not in our chain. Viz., they are on another chain. + // + // If the local or remote have a `latest_finalized_root == ZERO_HASH`, skips checks about + // the finalized_root. The logic is akward and I think we're better without it. + } else if (local.latest_finalized_epoch >= remote.latest_finalized_epoch) + && (!self + .chain + .rev_iter_block_roots(local.best_slot) + .any(|root| root == remote.latest_finalized_root)) + && (local.latest_finalized_root != spec.zero_hash) + && (remote.latest_finalized_root != spec.zero_hash) + { + info!( + self.log, "HandshakeFailure"; + "peer" => format!("{:?}", peer_id), + "reason" => "wrong_finalized_chain" + ); + network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork); + // Process handshakes from peers that seem to be on our chain. } else { info!(self.log, "HandshakeSuccess"; "peer" => format!("{:?}", peer_id)); self.known_peers.insert(peer_id.clone(), remote); - } - // If we have equal or better finalized epochs and best slots, we require nothing else from - // this peer. - if (remote.latest_finalized_epoch <= local.latest_finalized_epoch) - && (remote.best_slot <= local.best_slot) - { - return; - } + // If we have equal or better finalized epochs and best slots, we require nothing else from + // this peer. + // + // We make an exception when our best slot is 0. Best slot does not indicate wether or + // not there is a block at slot zero. + if (remote.latest_finalized_epoch <= local.latest_finalized_epoch) + && (remote.best_slot <= local.best_slot) + && (local.best_slot > 0) + { + debug!(self.log, "Peer is naive"; "peer" => format!("{:?}", peer_id)); + return; + } - // If the remote has a higher finalized epoch, request all block roots from our finalized - // epoch through to its best slot. - if remote.latest_finalized_epoch > local.latest_finalized_epoch { - let start_slot = local - .latest_finalized_epoch - .start_slot(spec.slots_per_epoch); - let required_slots = start_slot - remote.best_slot; + // If the remote has a higher finalized epoch, request all block roots from our finalized + // epoch through to its best slot. + if remote.latest_finalized_epoch > local.latest_finalized_epoch { + debug!(self.log, "Peer has high finalized epoch"; "peer" => format!("{:?}", peer_id)); + let start_slot = local + .latest_finalized_epoch + .start_slot(spec.slots_per_epoch); + let required_slots = remote.best_slot - start_slot; - self.request_block_roots( - peer_id, - BeaconBlockRootsRequest { - start_slot, - count: required_slots.into(), - }, - network, - ); - // If the remote has a greater best slot, request the roots between our best slot and their - // best slot. - } else if remote.best_root > local.best_root { - let start_slot = local - .latest_finalized_epoch - .start_slot(spec.slots_per_epoch); - let required_slots = start_slot - remote.best_slot; + self.request_block_roots( + peer_id, + BeaconBlockRootsRequest { + start_slot, + count: required_slots.into(), + }, + network, + ); + // If the remote has a greater best slot, request the roots between our best slot and their + // best slot. + } else if remote.best_slot > local.best_slot { + debug!(self.log, "Peer has higher best slot"; "peer" => format!("{:?}", peer_id)); + let start_slot = local + .latest_finalized_epoch + .start_slot(spec.slots_per_epoch); + let required_slots = remote.best_slot - start_slot; - self.request_block_roots( - peer_id, - BeaconBlockRootsRequest { - start_slot, - count: required_slots.into(), - }, - network, - ); + self.request_block_roots( + peer_id, + BeaconBlockRootsRequest { + start_slot, + count: required_slots.into(), + }, + network, + ); + } else { + debug!(self.log, "Nothing to request from peer"; "peer" => format!("{:?}", peer_id)); + } } } @@ -237,19 +261,40 @@ impl SimpleSync { "BlockRootsRequest"; "peer" => format!("{:?}", peer_id), "count" => req.count, + "start_slot" => req.start_slot, ); - let roots = self + let mut roots: Vec = self .chain - .rev_iter_block_roots(req.start_slot) + .rev_iter_block_roots(req.start_slot + req.count) + .skip(1) .take(req.count as usize) + .collect(); + + if roots.len() as u64 != req.count { + debug!( + self.log, + "BlockRootsRequest"; + "peer" => format!("{:?}", peer_id), + "msg" => "Failed to return all requested hashes", + "requested" => req.count, + "returned" => roots.len(), + ); + } + + roots.reverse(); + + let mut roots: Vec = roots + .iter() .enumerate() .map(|(i, block_root)| BlockRootSlot { slot: req.start_slot + Slot::from(i), - block_root, + block_root: *block_root, }) .collect(); + roots.dedup_by_key(|brs| brs.block_root); + network.send_rpc_response( peer_id, request_id, @@ -335,12 +380,28 @@ impl SimpleSync { "count" => req.max_headers, ); - let headers = get_block_headers( - &self.chain, - req.start_slot, - req.max_headers as usize, - req.skip_slots as usize, - ); + let count = req.max_headers; + + // Collect the block roots. + // + // Instead of using `chain.rev_iter_blocks` we collect the roots first. This avoids + // unnecessary block deserialization when `req.skip_slots > 0`. + let mut roots: Vec = self + .chain + .rev_iter_block_roots(req.start_slot + (count - 1)) + .take(count as usize) + .collect(); + + roots.reverse(); + + let headers: Vec = roots + .into_iter() + .step_by(req.skip_slots as usize + 1) + .filter_map(|root| { + let block = self.chain.store.get::(&root).ok()?; + Some(block?.block_header()) + }) + .collect(); network.send_rpc_response( peer_id, @@ -388,27 +449,33 @@ impl SimpleSync { req: BeaconBlockBodiesRequest, network: &mut NetworkContext, ) { + let block_bodies: Vec = req + .block_roots + .iter() + .filter_map(|root| { + if let Ok(Some(block)) = self.chain.store.get::(root) { + Some(block.body) + } else { + debug!( + self.log, + "Peer requested unknown block"; + "peer" => format!("{:?}", peer_id), + "request_root" => format!("{:}", root), + ); + + None + } + }) + .collect(); + debug!( self.log, "BlockBodiesRequest"; "peer" => format!("{:?}", peer_id), - "count" => req.block_roots.len(), + "requested" => req.block_roots.len(), + "returned" => block_bodies.len(), ); - let block_bodies = match self.chain.get_block_bodies(&req.block_roots) { - Ok(bodies) => bodies, - Err(e) => { - // TODO: return RPC error. - warn!( - self.log, - "RPCRequest"; "peer" => format!("{:?}", peer_id), - "req" => "BeaconBlockBodies", - "error" => format!("{:?}", e) - ); - return; - } - }; - network.send_rpc_response( peer_id, request_id, @@ -449,18 +516,12 @@ impl SimpleSync { block: BeaconBlock, network: &mut NetworkContext, ) -> bool { - info!( - self.log, - "GossipBlockReceived"; - "peer" => format!("{:?}", peer_id), - "block_slot" => format!("{:?}", block.slot), - ); - // Ignore any block from a finalized slot. if self.slot_is_finalized(block.slot) { - warn!( - self.log, "IgnoredGossipBlock"; - "msg" => "new block slot is finalized.", + debug!( + self.log, "IgnoredFinalizedBlock"; + "source" => "gossip", + "msg" => "chain is finalized at block slot", "block_slot" => block.slot, ); return false; @@ -475,11 +536,11 @@ impl SimpleSync { } match self.chain.process_block(block.clone()) { - Ok(BlockProcessingOutcome::InvalidBlock(InvalidBlock::ParentUnknown)) => { + Ok(BlockProcessingOutcome::InvalidBlock(InvalidBlock::ParentUnknown { .. })) => { // The block was valid and we processed it successfully. debug!( - self.log, "InvalidGossipBlock"; - "msg" => "parent block unknown", + self.log, "ParentBlockUnknown"; + "source" => "gossip", "parent_root" => format!("{}", block.previous_block_root), "peer" => format!("{:?}", peer_id), ); @@ -505,8 +566,9 @@ impl SimpleSync { if block_slot - present_slot > FUTURE_SLOT_TOLERANCE { // The block is too far in the future, drop it. warn!( - self.log, "InvalidGossipBlock"; - "msg" => "future block rejected", + self.log, "FutureBlock"; + "source" => "gossip", + "msg" => "block for future slot rejected, check your time", "present_slot" => present_slot, "block_slot" => block_slot, "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, @@ -517,8 +579,9 @@ impl SimpleSync { } else { // The block is in the future, but not too far. warn!( - self.log, "FutureGossipBlock"; - "msg" => "queuing future block", + self.log, "QueuedFutureBlock"; + "source" => "gossip", + "msg" => "queuing future block, check your time", "present_slot" => present_slot, "block_slot" => block_slot, "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, @@ -534,7 +597,8 @@ impl SimpleSync { if outcome.is_invalid() { // The peer has sent a block which is fundamentally invalid. warn!( - self.log, "InvalidGossipBlock"; + self.log, "InvalidBlock"; + "source" => "gossip", "msg" => "peer sent objectively invalid block", "outcome" => format!("{:?}", outcome), "peer" => format!("{:?}", peer_id), @@ -546,7 +610,8 @@ impl SimpleSync { } else if outcome.sucessfully_processed() { // The block was valid and we processed it successfully. info!( - self.log, "ValidGossipBlock"; + self.log, "ImportedBlock"; + "source" => "gossip", "peer" => format!("{:?}", peer_id), ); // Forward the block to peers @@ -555,7 +620,8 @@ impl SimpleSync { // The block wasn't necessarily invalid but we didn't process it successfully. // This condition shouldn't be reached. error!( - self.log, "InvalidGossipBlock"; + self.log, "BlockProcessingFailure"; + "source" => "gossip", "msg" => "unexpected condition in processing block.", "outcome" => format!("{:?}", outcome), ); @@ -569,8 +635,9 @@ impl SimpleSync { // Blocks should not be able to trigger errors, instead they should be flagged as // invalid. error!( - self.log, "InvalidGossipBlock"; + self.log, "BlockProcessingError"; "msg" => "internal error in processing block.", + "source" => "gossip", "error" => format!("{:?}", e), ); // Do not forward the block to peers. @@ -584,19 +651,15 @@ impl SimpleSync { /// Not currently implemented. pub fn on_attestation_gossip( &mut self, - peer_id: PeerId, + _peer_id: PeerId, msg: Attestation, _network: &mut NetworkContext, ) { - info!( - self.log, - "NewAttestationGossip"; - "peer" => format!("{:?}", peer_id), - ); - match self.chain.process_attestation(msg) { - Ok(()) => info!(self.log, "ImportedAttestation"), - Err(e) => warn!(self.log, "InvalidAttestation"; "error" => format!("{:?}", e)), + Ok(()) => info!(self.log, "ImportedAttestation"; "source" => "gossip"), + Err(e) => { + warn!(self.log, "InvalidAttestation"; "source" => "gossip", "error" => format!("{:?}", e)) + } } } @@ -611,6 +674,9 @@ impl SimpleSync { // Loop through all of the complete blocks in the queue. for (block_root, block, sender) in self.import_queue.complete_blocks() { + let slot = block.slot; + let parent_root = block.previous_block_root; + match self.chain.process_block(block) { Ok(outcome) => { if outcome.is_invalid() { @@ -618,15 +684,12 @@ impl SimpleSync { warn!( self.log, "InvalidBlock"; - "sender_peer_id" => format!("{:?}", sender), + "sender_peer_id" => format!("{:?}", sender.clone()), + "block_root" => format!("{}", block_root), "reason" => format!("{:?}", outcome), ); network.disconnect(sender, GoodbyeReason::Fault); - break; - } - - // If this results to true, the item will be removed from the queue. - if outcome.sucessfully_processed() { + } else if outcome.sucessfully_processed() { successful += 1; self.import_queue.remove(block_root); } else { @@ -635,6 +698,8 @@ impl SimpleSync { "ProcessImportQueue"; "msg" => "Block not imported", "outcome" => format!("{:?}", outcome), + "block_slot" => format!("{:?}", slot), + "parent_root" => format!("{}", parent_root), "peer" => format!("{:?}", sender), ); } @@ -752,19 +817,3 @@ fn hello_message(beacon_chain: &BeaconChain) -> HelloMes best_slot: state.slot, } } - -/// Return a list of `BeaconBlockHeader` from the given `BeaconChain`, starting at `start_slot` and -/// returning `count` headers with a gap of `skip` slots between each. -fn get_block_headers( - beacon_chain: &BeaconChain, - start_slot: Slot, - count: usize, - skip: usize, -) -> Vec { - beacon_chain - .rev_iter_blocks(start_slot) - .step_by(skip + 1) - .take(count) - .map(|block| block.block_header()) - .collect() -} diff --git a/eth2/state_processing/src/per_slot_processing.rs b/eth2/state_processing/src/per_slot_processing.rs index 97645ab8a..0fc074262 100644 --- a/eth2/state_processing/src/per_slot_processing.rs +++ b/eth2/state_processing/src/per_slot_processing.rs @@ -1,5 +1,4 @@ use crate::*; -use tree_hash::SignedRoot; use types::*; #[derive(Debug, PartialEq)] @@ -44,7 +43,7 @@ fn cache_state(state: &mut BeaconState, spec: &ChainSpec) -> Resu // Store the previous slot's post state transition root. state.set_state_root(previous_slot, previous_slot_state_root)?; - let latest_block_root = Hash256::from_slice(&state.latest_block_header.signed_root()[..]); + let latest_block_root = state.latest_block_header.canonical_root(); state.set_block_root(previous_slot, latest_block_root)?; // Set the state slot back to what it should be. diff --git a/eth2/types/src/beacon_state.rs b/eth2/types/src/beacon_state.rs index 471b82b20..b93738328 100644 --- a/eth2/types/src/beacon_state.rs +++ b/eth2/types/src/beacon_state.rs @@ -111,7 +111,7 @@ where pub current_crosslinks: FixedLenVec, pub previous_crosslinks: FixedLenVec, pub latest_block_roots: FixedLenVec, - latest_state_roots: FixedLenVec, + pub latest_state_roots: FixedLenVec, latest_active_index_roots: FixedLenVec, latest_slashed_balances: FixedLenVec, pub latest_block_header: BeaconBlockHeader,