diff --git a/beacon_node/network/src/beacon_chain.rs b/beacon_node/network/src/beacon_chain.rs index 7a8efb254..827adeb3c 100644 --- a/beacon_node/network/src/beacon_chain.rs +++ b/beacon_node/network/src/beacon_chain.rs @@ -10,7 +10,7 @@ use beacon_chain::{ use eth2_libp2p::rpc::HelloMessage; use types::{Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; -pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome}; +pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome, InvalidBlock}; /// The network's API to the beacon chain. pub trait BeaconChain: Send + Sync { diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 0efa6b96f..098a5b4bf 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -208,8 +208,9 @@ impl MessageHandler { fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) { match gossip_message { PubsubMessage::Block(message) => { - self.sync - .on_block_gossip(peer_id, message, &mut self.network_context) + let _should_foward_on = + self.sync + .on_block_gossip(peer_id, message, &mut self.network_context); } PubsubMessage::Attestation(message) => { self.sync diff --git a/beacon_node/network/src/sync/import_queue.rs b/beacon_node/network/src/sync/import_queue.rs index b9280440b..0026347eb 100644 --- a/beacon_node/network/src/sync/import_queue.rs +++ b/beacon_node/network/src/sync/import_queue.rs @@ -104,7 +104,7 @@ impl ImportQueue { } /// Returns `true` if `self.chain` has not yet processed this block. - pub fn is_new_block(&self, block_root: &Hash256) -> bool { + pub fn chain_has_not_seen_block(&self, block_root: &Hash256) -> bool { self.chain .is_new_block_root(&block_root) .unwrap_or_else(|_| { @@ -125,7 +125,7 @@ impl ImportQueue { let new_roots: Vec = block_roots .iter() // Ignore any roots already processed by the chain. - .filter(|brs| self.is_new_block(&brs.block_root)) + .filter(|brs| self.chain_has_not_seen_block(&brs.block_root)) // Ignore any roots already stored in the queue. .filter(|brs| !self.partials.iter().any(|p| p.block_root == brs.block_root)) .cloned() @@ -168,7 +168,7 @@ impl ImportQueue { for header in headers { let block_root = Hash256::from_slice(&header.hash_tree_root()[..]); - if self.is_new_block(&block_root) { + if self.chain_has_not_seen_block(&block_root) { self.insert_header(block_root, header, sender.clone()); required_bodies.push(block_root) } @@ -186,6 +186,12 @@ impl ImportQueue { } } + pub fn enqueue_full_blocks(&mut self, blocks: Vec, sender: PeerId) { + for block in blocks { + self.insert_full_block(block, sender.clone()); + } + } + /// Inserts a header to the queue. /// /// If the header already exists, the `inserted` time is set to `now` and not other @@ -239,6 +245,32 @@ impl ImportQueue { } }); } + + /// Updates an existing `partial` with the completed block, or adds a new (complete) partial. + /// + /// If the partial already existed, the `inserted` time is set to `now`. + fn insert_full_block(&mut self, block: BeaconBlock, sender: PeerId) { + let block_root = Hash256::from_slice(&block.hash_tree_root()[..]); + + let partial = PartialBeaconBlock { + slot: block.slot, + block_root, + header: Some(block.block_header()), + body: Some(block.body), + inserted: Instant::now(), + sender, + }; + + if let Some(i) = self + .partials + .iter() + .position(|p| p.block_root == block_root) + { + self.partials[i] = partial; + } else { + self.partials.push(partial) + } + } } /// Individual components of a `BeaconBlock`, potentially all that are required to form a full diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index e8a3da656..6a78dc57d 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -1,10 +1,11 @@ use super::import_queue::ImportQueue; -use crate::beacon_chain::BeaconChain; +use crate::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock}; use crate::message_handler::NetworkContext; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; use slog::{debug, error, info, o, warn}; +use ssz::TreeHash; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -16,6 +17,10 @@ const SLOT_IMPORT_TOLERANCE: u64 = 100; /// The amount of seconds a block (or partial block) may exist in the import queue. const QUEUE_STALE_SECS: u64 = 60; +/// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it. +/// Otherwise we queue it. +const FUTURE_SLOT_TOLERANCE: u64 = 1; + /// Keeps track of syncing information for known connected peers. #[derive(Clone, Copy, Debug)] pub struct PeerSyncInfo { @@ -536,65 +541,130 @@ impl SimpleSync { } /// Process a gossip message declaring a new block. + /// + /// Returns a `bool` which, if `true`, indicates we should forward the block to our peers. pub fn on_block_gossip( &mut self, peer_id: PeerId, block: BeaconBlock, network: &mut NetworkContext, - ) { + ) -> bool { info!( self.log, "NewGossipBlock"; "peer" => format!("{:?}", peer_id), ); - /* // Ignore any block from a finalized slot. - if self.slot_is_finalized(msg.slot) { + if self.slot_is_finalized(block.slot) { warn!( self.log, "NewGossipBlock"; "msg" => "new block slot is finalized.", - "slot" => msg.slot, + "block_slot" => block.slot, ); - return; + return false; } + let block_root = Hash256::from_slice(&block.hash_tree_root()); + // Ignore any block that the chain already knows about. - if self.chain_has_seen_block(&msg.block_root) { - return; + if self.chain_has_seen_block(&block_root) { + println!("this happened"); + // TODO: Age confirm that we shouldn't forward a block if we already know of it. + return false; } - // k - if msg.slot == self.chain.hello_message().best_slot + 1 { - self.request_block_headers( - peer_id, - BeaconBlockHeadersRequest { - start_root: msg.block_root, - start_slot: msg.slot, - max_headers: 1, - skip_slots: 0, - }, - network, - ) + debug!( + self.log, + "NewGossipBlock"; + "peer" => format!("{:?}", peer_id), + "msg" => "processing block", + ); + match self.chain.process_block(block.clone()) { + Ok(BlockProcessingOutcome::InvalidBlock(InvalidBlock::ParentUnknown)) => { + // get the parent. + true + } + Ok(BlockProcessingOutcome::InvalidBlock(InvalidBlock::FutureSlot { + present_slot, + block_slot, + })) => { + if block_slot - present_slot > FUTURE_SLOT_TOLERANCE { + // The block is too far in the future, drop it. + warn!( + self.log, "NewGossipBlock"; + "msg" => "future block rejected", + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, + "peer" => format!("{:?}", peer_id), + ); + // Do not forward the block around to peers. + false + } else { + // The block is in the future, but not too far. + warn!( + self.log, "NewGossipBlock"; + "msg" => "queuing future block", + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, + "peer" => format!("{:?}", peer_id), + ); + // Queue the block for later processing. + self.import_queue.enqueue_full_blocks(vec![block], peer_id); + // Forward the block around to peers. + true + } + } + Ok(outcome) => { + if outcome.is_invalid() { + // The peer has sent a block which is fundamentally invalid. + warn!( + self.log, "NewGossipBlock"; + "msg" => "invalid block from peer", + "outcome" => format!("{:?}", outcome), + "peer" => format!("{:?}", peer_id), + ); + // Disconnect the peer + network.disconnect(peer_id, GoodbyeReason::Fault); + // Do not forward the block to peers. + false + } else if outcome.sucessfully_processed() { + // The block was valid and we processed it successfully. + info!( + self.log, "NewGossipBlock"; + "msg" => "block import successful", + "peer" => format!("{:?}", peer_id), + ); + // Forward the block to peers + true + } else { + // The block wasn't necessarily invalid but we didn't process it successfully. + // This condition shouldn't be reached. + error!( + self.log, "NewGossipBlock"; + "msg" => "unexpected condition in processing block.", + "outcome" => format!("{:?}", outcome), + ); + // Do not forward the block on. + false + } + } + Err(e) => { + // We encountered an error whilst processing the block. + // + // Blocks should not be able to trigger errors, instead they should be flagged as + // invalid. + error!( + self.log, "NewGossipBlock"; + "msg" => "internal error in processing block.", + "error" => format!("{:?}", e), + ); + // Do not forward the block to peers. + false + } } - - // TODO: if the block is a few more slots ahead, try to get all block roots from then until - // now. - // - // Note: only requests the new block -- will fail if we don't have its parents. - if !self.chain_has_seen_block(&msg.block_root) { - self.request_block_headers( - peer_id, - BeaconBlockHeadersRequest { - start_root: msg.block_root, - start_slot: msg.slot, - max_headers: 1, - skip_slots: 0, - }, - network, - ) - } - */ } /// Process a gossip message declaring a new attestation. @@ -724,6 +794,18 @@ impl SimpleSync { network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(req)); } + /// Returns `true` if `self.chain` has not yet processed this block. + pub fn chain_has_seen_block(&self, block_root: &Hash256) -> bool { + !self + .chain + .is_new_block_root(&block_root) + .unwrap_or_else(|_| { + error!(self.log, "Unable to determine if block is new."); + false + }) + } + + /// Returns `true` if the given slot is finalized in our chain. fn slot_is_finalized(&self, slot: Slot) -> bool { slot <= self .chain diff --git a/eth2/types/src/test_utils/testing_beacon_state_builder.rs b/eth2/types/src/test_utils/testing_beacon_state_builder.rs index f437240dc..e25da37e7 100644 --- a/eth2/types/src/test_utils/testing_beacon_state_builder.rs +++ b/eth2/types/src/test_utils/testing_beacon_state_builder.rs @@ -120,7 +120,7 @@ impl TestingBeaconStateBuilder { }) .collect(); - let genesis_time = 1553977336; // arbitrary + let genesis_time = 1554013000; // arbitrary let mut state = BeaconState::genesis( genesis_time,