From 4b5b5851a64a4cd2920bb7f4ba0a10b6a5d1ac0b Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 23 Mar 2019 13:23:44 +1100 Subject: [PATCH] Allow sync to to request block bodies. --- beacon_node/beacon_chain/src/beacon_chain.rs | 30 ++- beacon_node/eth2-libp2p/src/rpc/methods.rs | 2 +- beacon_node/network/Cargo.toml | 1 + beacon_node/network/src/beacon_chain.rs | 16 +- beacon_node/network/src/message_handler.rs | 17 +- beacon_node/network/src/sync/simple_sync.rs | 200 +++++++++++++++++-- beacon_node/network/tests/tests.rs | 72 ++++++- 7 files changed, 305 insertions(+), 33 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index dccd9842e..33198f0a3 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -142,8 +142,14 @@ where let mut slot = start_slot + count - 1; loop { - // Return if the slot required is greater than the current state. - if slot >= state.slot { + // If the highest slot requested is that of the current state insert the root of the + // head block, unless the head block's slot is not matching. + if slot == state.slot && self.head().beacon_block.slot == slot { + roots.push(self.head().beacon_block_root); + + slot -= 1; + continue; + } else if slot >= state.slot { return Err(BeaconStateError::SlotOutOfBounds); } @@ -180,12 +186,25 @@ where } if (slot == start_slot) && (roots.len() == count.as_usize()) { - Ok(roots) + // Reverse the ordering of the roots. We extracted them in reverse order to make it + // simpler to lookup historic states. + // + // This is a potential optimisation target. + Ok(roots.iter().rev().cloned().collect()) } else { Err(BeaconStateError::SlotOutOfBounds) } } + /// Returns the block at the given root, if any. + /// + /// ## Errors + /// + /// May return a database error. + pub fn get_block(&self, block_root: &Hash256) -> Result, Error> { + Ok(self.block_store.get_deserialized(block_root)?) + } + /// Update the canonical head to some new values. pub fn update_canonical_head( &self, @@ -622,6 +641,11 @@ where } } + /// Returns `true` if the given block root has not been processed. + pub fn is_new_block_root(&self, beacon_block_root: &Hash256) -> Result { + Ok(!self.block_store.exists(beacon_block_root)?) + } + /// Accept some block and attempt to add it to block DAG. /// /// Will accept blocks from prior slots, however it will reject any block from a future slot. diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index 381fc8b01..f6a5f2829 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -162,7 +162,7 @@ pub struct BeaconBlockHeadersResponse { #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BeaconBlockBodiesRequest { /// The list of beacon block bodies being requested. - pub block_roots: Hash256, + pub block_roots: Vec, } /// Response containing the list of requested beacon block bodies. diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index a53097159..c6411a020 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -14,6 +14,7 @@ eth2-libp2p = { path = "../eth2-libp2p" } version = { path = "../version" } types = { path = "../../eth2/types" } slog = "2.4.1" +ssz = { path = "../../eth2/utils/ssz" } futures = "0.1.25" error-chain = "0.12.0" crossbeam-channel = "0.3.8" diff --git a/beacon_node/network/src/beacon_chain.rs b/beacon_node/network/src/beacon_chain.rs index ba429e688..e2829cfa6 100644 --- a/beacon_node/network/src/beacon_chain.rs +++ b/beacon_node/network/src/beacon_chain.rs @@ -8,7 +8,9 @@ use beacon_chain::{ CheckPoint, }; use eth2_libp2p::HelloMessage; -use types::{BeaconStateError, Epoch, Hash256, Slot}; +use types::{BeaconBlock, BeaconStateError, Epoch, Hash256, Slot}; + +pub use beacon_chain::BeaconChainError; /// The network's API to the beacon chain. pub trait BeaconChain: Send + Sync { @@ -20,6 +22,8 @@ pub trait BeaconChain: Send + Sync { fn head(&self) -> RwLockReadGuard; + fn get_block(&self, block_root: &Hash256) -> Result, BeaconChainError>; + fn best_slot(&self) -> Slot; fn best_block_root(&self) -> Hash256; @@ -35,6 +39,8 @@ pub trait BeaconChain: Send + Sync { start_slot: Slot, count: Slot, ) -> Result, BeaconStateError>; + + fn is_new_block_root(&self, beacon_block_root: &Hash256) -> Result; } impl BeaconChain for RawBeaconChain @@ -59,6 +65,10 @@ where self.head() } + fn get_block(&self, block_root: &Hash256) -> Result, BeaconChainError> { + self.get_block(block_root) + } + fn finalized_epoch(&self) -> Epoch { self.get_state().finalized_epoch } @@ -95,4 +105,8 @@ where ) -> Result, BeaconStateError> { self.get_block_roots(start_slot, count) } + + fn is_new_block_root(&self, beacon_block_root: &Hash256) -> Result { + self.is_new_block_root(beacon_block_root) + } } diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 99a263ed8..1a790eee1 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -142,7 +142,7 @@ impl MessageHandler { RPCResponse::BeaconBlockRoots(response) => { debug!( self.log, - "BeaconBlockRoots response received from peer: {:?}", peer_id + "BeaconBlockRoots response received"; "peer" => format!("{:?}", peer_id) ); self.sync.on_beacon_block_roots_response( peer_id, @@ -150,6 +150,17 @@ impl MessageHandler { &mut self.network_context, ) } + RPCResponse::BeaconBlockHeaders(response) => { + debug!( + self.log, + "BeaconBlockHeaders response received"; "peer" => format!("{:?}", peer_id) + ); + self.sync.on_beacon_block_headers_response( + peer_id, + response, + &mut self.network_context, + ) + } // TODO: Handle all responses _ => panic!("Unknown response: {:?}", response), } @@ -233,10 +244,6 @@ impl NetworkContext { }; // register RPC request self.requests.insert((peer_id.clone(), id), Instant::now()); - debug!( - self.log, - "Hello request registered with peer: {:?}", peer_id - ); id } } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index ab29d0db5..ee0646dbb 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -3,10 +3,12 @@ use crate::message_handler::NetworkContext; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCRequest, RPCResponse}; use eth2_libp2p::PeerId; -use slog::{debug, o}; +use slog::{debug, error, o, warn}; +use ssz::TreeHash; use std::collections::HashMap; use std::sync::Arc; -use types::{Epoch, Hash256, Slot}; +use std::time::Instant; +use types::{BeaconBlockHeader, Epoch, Hash256, Slot}; /// The number of slots that we can import blocks ahead of us, before going into full Sync mode. const SLOT_IMPORT_TOLERANCE: u64 = 100; @@ -78,7 +80,7 @@ impl From<&Arc> for PeerSyncInfo { pub enum SyncState { Idle, Downloading, - Stopped, + _Stopped, } /// Simple Syncing protocol. @@ -89,6 +91,8 @@ pub struct SimpleSync { chain: Arc, /// A mapping of Peers to their respective PeerSyncInfo. known_peers: HashMap, + /// A queue to allow importing of blocks + import_queue: ImportQueue, /// The current state of the syncing protocol. state: SyncState, /// Sync logger. @@ -97,11 +101,12 @@ pub struct SimpleSync { impl SimpleSync { pub fn new(beacon_chain: Arc, log: &slog::Logger) -> Self { - let state = beacon_chain.get_state(); let sync_logger = log.new(o!("Service"=> "Sync")); + let import_queue = ImportQueue::new(beacon_chain.clone(), log.clone()); SimpleSync { chain: beacon_chain.clone(), known_peers: HashMap::new(), + import_queue, state: SyncState::Idle, log: sync_logger, } @@ -149,15 +154,24 @@ impl SimpleSync { .start_slot(spec.slots_per_epoch); let required_slots = start_slot - local.best_slot; - self.request_block_roots(peer_id, start_slot, required_slots.as_u64(), network); + self.request_block_roots( + peer_id, + BeaconBlockRootsRequest { + start_slot, + count: required_slots.into(), + }, + network, + ); } PeerStatus::HigherBestSlot => { let required_slots = remote.best_slot - local.best_slot; self.request_block_roots( peer_id, - local.best_slot, - required_slots.as_u64(), + BeaconBlockRootsRequest { + start_slot: local.best_slot + 1, + count: required_slots.into(), + }, network, ); } @@ -168,32 +182,109 @@ impl SimpleSync { pub fn on_beacon_block_roots_response( &mut self, peer_id: PeerId, - reponse: BeaconBlockRootsResponse, + response: BeaconBlockRootsResponse, network: &mut NetworkContext, ) { + if response.roots.is_empty() { + warn!( + self.log, + "Peer returned empty block roots response. PeerId: {:?}", peer_id + ); + return; + } + + let new_root_index = self.import_queue.first_new_root(&response.roots); + + // If a new block root is found, request it and all the headers following it. // + // We make an assumption here that if we don't know a block then we don't know of all + // it's parents. This might not be the case if syncing becomes more sophisticated. + if let Some(i) = new_root_index { + let new = &response.roots[i]; + + self.request_block_headers( + peer_id, + BeaconBlockHeadersRequest { + start_root: new.block_root, + start_slot: new.slot, + max_headers: (response.roots.len() - i) as u64, + skip_slots: 0, + }, + network, + ) + } + } + + pub fn on_beacon_block_headers_response( + &mut self, + peer_id: PeerId, + response: BeaconBlockHeadersResponse, + network: &mut NetworkContext, + ) { + if response.headers.is_empty() { + warn!( + self.log, + "Peer returned empty block headers response. PeerId: {:?}", peer_id + ); + return; + } + + let block_roots = self.import_queue.enqueue_headers(response.headers); + + if !block_roots.is_empty() { + self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network); + } } fn request_block_roots( &mut self, peer_id: PeerId, - start_slot: Slot, - count: u64, + request: BeaconBlockRootsRequest, network: &mut NetworkContext, ) { // Potentially set state to sync. - if self.state == SyncState::Idle && count > SLOT_IMPORT_TOLERANCE { + if self.state == SyncState::Idle && request.count > SLOT_IMPORT_TOLERANCE { debug!(self.log, "Entering downloading sync state."); self.state = SyncState::Downloading; } - debug!(self.log, "Requesting {} blocks from {:?}.", count, &peer_id); + debug!( + self.log, + "Requesting {} block roots from {:?}.", request.count, &peer_id + ); // TODO: handle count > max count. - network.send_rpc_request( - peer_id.clone(), - RPCRequest::BeaconBlockRoots(BeaconBlockRootsRequest { start_slot, count }), + network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockRoots(request)); + } + + fn request_block_headers( + &mut self, + peer_id: PeerId, + request: BeaconBlockHeadersRequest, + network: &mut NetworkContext, + ) { + debug!( + self.log, + "Requesting {} headers from {:?}.", request.max_headers, &peer_id ); + + network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockHeaders(request)); + } + + fn request_block_bodies( + &mut self, + peer_id: PeerId, + request: BeaconBlockBodiesRequest, + network: &mut NetworkContext, + ) { + debug!( + self.log, + "Requesting {} bodies from {:?}.", + request.block_roots.len(), + &peer_id + ); + + network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(request)); } /// Generates our current state in the form of a HELLO RPC message. @@ -201,3 +292,82 @@ impl SimpleSync { self.chain.hello_message() } } + +pub struct ImportQueue { + /// BeaconChain + pub chain: Arc, + /// Partially imported blocks, keyed by the root of `BeaconBlockBody`. + pub partials: HashMap, + /// Logging + log: slog::Logger, +} + +impl ImportQueue { + pub fn new(chain: Arc, log: slog::Logger) -> Self { + Self { + chain, + partials: HashMap::new(), + log, + } + } + + fn is_new_block(&self, block_root: &Hash256) -> bool { + self.chain + .is_new_block_root(&block_root) + .unwrap_or_else(|_| { + error!(self.log, "Unable to determine if block is new."); + true + }) + } + + /// Returns the index of the first new root in the list of block roots. + pub fn first_new_root(&mut self, roots: &[BlockRootSlot]) -> Option { + for root in roots { + println!("root {}", root.block_root); + } + roots + .iter() + .position(|brs| self.is_new_block(&brs.block_root)) + } + + /// Adds the `headers` to the `partials` queue. Returns a list of `Hash256` block roots for + /// which we should use to request `BeaconBlockBodies`. + /// + /// If a `header` is not in the queue and has not been processed by the chain it is added to + /// the queue and it's block root is included in the output. + /// + /// If a `header` is already in the queue, but not yet processed by the chain the block root is + /// included in the output and the `inserted` time for the partial record is set to + /// `Instant::now()`. Updating the `inserted` time stops the partial from becoming stale. + pub fn enqueue_headers(&mut self, headers: Vec) -> Vec { + let mut required_bodies: Vec = vec![]; + + for header in headers { + let block_root = Hash256::from_slice(&header.hash_tree_root()[..]); + + if self.is_new_block(&block_root) { + self.insert_partial(block_root, header); + required_bodies.push(block_root) + } + } + + required_bodies + } + + fn insert_partial(&mut self, block_root: Hash256, header: BeaconBlockHeader) { + self.partials.insert( + header.block_body_root, + PartialBeaconBlock { + block_root, + header, + inserted: Instant::now(), + }, + ); + } +} + +pub struct PartialBeaconBlock { + pub block_root: Hash256, + pub header: BeaconBlockHeader, + pub inserted: Instant, +} diff --git a/beacon_node/network/tests/tests.rs b/beacon_node/network/tests/tests.rs index fbfb827f2..076a3f529 100644 --- a/beacon_node/network/tests/tests.rs +++ b/beacon_node/network/tests/tests.rs @@ -67,7 +67,16 @@ impl SyncNode { let request = self.recv_rpc_request().expect("No block root request"); match request { - RPCRequest::BeaconBlockRoots(response) => response, + RPCRequest::BeaconBlockRoots(request) => request, + _ => panic!("Did not get block root request"), + } + } + + pub fn get_block_headers_request(&self) -> BeaconBlockHeadersRequest { + let request = self.recv_rpc_request().expect("No block headers request"); + + match request { + RPCRequest::BeaconBlockHeaders(request) => request, _ => panic!("Did not get block root request"), } } @@ -164,9 +173,7 @@ impl SyncMaster { .harness .beacon_chain .get_block_roots(request.start_slot, Slot::from(request.count)) - .expect("Beacon chain did not give blocks"); - - let roots = roots + .expect("Beacon chain did not give block roots") .iter() .enumerate() .map(|(i, root)| BlockRootSlot { @@ -179,6 +186,43 @@ impl SyncMaster { self.send_rpc_response(node, response) } + pub fn respond_to_block_headers_request( + &mut self, + node: &SyncNode, + request: BeaconBlockHeadersRequest, + ) { + let roots = self + .harness + .beacon_chain + .get_block_roots(request.start_slot, Slot::from(request.max_headers)) + .expect("Beacon chain did not give blocks"); + + if roots.is_empty() { + panic!("Roots was empty when trying to get headers.") + } + + assert_eq!( + roots[0], request.start_root, + "Got the wrong start root when getting headers" + ); + + let headers: Vec = roots + .iter() + .map(|root| { + let block = self + .harness + .beacon_chain + .get_block(root) + .expect("Failed to load block") + .expect("Block did not exist"); + block.block_header() + }) + .collect(); + + let response = RPCResponse::BeaconBlockHeaders(BeaconBlockHeadersResponse { headers }); + self.send_rpc_response(node, response) + } + fn send_rpc_response(&mut self, node: &SyncNode, rpc_response: RPCResponse) { node.send(self.rpc_response(node, rpc_response)); } @@ -228,6 +272,11 @@ pub fn build_blocks(blocks: usize, master: &mut SyncMaster, nodes: &mut Vec