From 96ba1c8f77064a24c3a8ab1b03954d80d333c21b Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Fri, 22 Mar 2019 14:20:49 +1100 Subject: [PATCH] Implement `get_block_roots` for syncing --- beacon_node/beacon_chain/src/beacon_chain.rs | 64 ++++++++++++++++++++ beacon_node/network/src/beacon_chain.rs | 16 ++++- beacon_node/network/src/message_handler.rs | 13 +++- beacon_node/network/src/sync/simple_sync.rs | 12 ++++ beacon_node/network/tests/tests.rs | 61 +++++++++++++------ 5 files changed, 146 insertions(+), 20 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 4b151d70b..dccd9842e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -122,6 +122,70 @@ where }) } + /// Returns beacon block roots for `count` slots, starting from `start_slot`. + /// + /// ## Errors: + /// + /// - `SlotOutOfBounds`: Unable to return the full specified range. + /// - `SlotOutOfBounds`: Unable to load a state from the DB. + /// - `SlotOutOfBounds`: Start slot is higher than the first slot. + /// - Other: BeaconState` is inconsistent. + pub fn get_block_roots( + &self, + start_slot: Slot, + count: Slot, + ) -> Result, BeaconStateError> { + let spec = &self.spec; + + let mut roots: Vec = vec![]; + let mut state = self.state.read().clone(); + let mut slot = start_slot + count - 1; + + loop { + // Return if the slot required is greater than the current state. + if slot >= state.slot { + return Err(BeaconStateError::SlotOutOfBounds); + } + + // If the slot is within the range of the current state's block roots, append the root + // to the output vec. + // + // If we get `SlotOutOfBounds` error, load the oldest known state to the present state + // from the DB. + match state.get_block_root(slot, spec) { + Ok(root) => { + roots.push(*root); + + if slot == start_slot { + break; + } else { + slot -= 1; + } + } + Err(BeaconStateError::SlotOutOfBounds) => { + // Read the earliest historic state in the current slot. + let earliest_historic_slot = + state.slot - Slot::from(spec.slots_per_historical_root); + // Load the earlier state from disk. + let new_state_root = state.get_state_root(earliest_historic_slot, spec)?; + + // Break if the DB is unable to load the state. + state = match self.state_store.get_deserialized(&new_state_root) { + Ok(Some(state)) => state, + _ => break, + } + } + Err(e) => return Err(e), + }; + } + + if (slot == start_slot) && (roots.len() == count.as_usize()) { + Ok(roots) + } else { + Err(BeaconStateError::SlotOutOfBounds) + } + } + /// Update the canonical head to some new values. pub fn update_canonical_head( &self, diff --git a/beacon_node/network/src/beacon_chain.rs b/beacon_node/network/src/beacon_chain.rs index 5246c87c2..ba429e688 100644 --- a/beacon_node/network/src/beacon_chain.rs +++ b/beacon_node/network/src/beacon_chain.rs @@ -8,7 +8,7 @@ use beacon_chain::{ CheckPoint, }; use eth2_libp2p::HelloMessage; -use types::{Epoch, Hash256, Slot}; +use types::{BeaconStateError, Epoch, Hash256, Slot}; /// The network's API to the beacon chain. pub trait BeaconChain: Send + Sync { @@ -29,6 +29,12 @@ pub trait BeaconChain: Send + Sync { fn finalized_epoch(&self) -> Epoch; fn hello_message(&self) -> HelloMessage; + + fn get_block_roots( + &self, + start_slot: Slot, + count: Slot, + ) -> Result, BeaconStateError>; } impl BeaconChain for RawBeaconChain @@ -81,4 +87,12 @@ where best_slot: self.best_slot(), } } + + fn get_block_roots( + &self, + start_slot: Slot, + count: Slot, + ) -> Result, BeaconStateError> { + self.get_block_roots(start_slot, count) + } } diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 5b39de997..99a263ed8 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -139,8 +139,19 @@ impl MessageHandler { self.sync .on_hello(peer_id, hello_message, &mut self.network_context); } + RPCResponse::BeaconBlockRoots(response) => { + debug!( + self.log, + "BeaconBlockRoots response received from peer: {:?}", peer_id + ); + self.sync.on_beacon_block_roots_response( + peer_id, + response, + &mut self.network_context, + ) + } // TODO: Handle all responses - _ => {} + _ => panic!("Unknown response: {:?}", response), } } } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 0b2f736c1..ab29d0db5 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -165,6 +165,15 @@ impl SimpleSync { } } + pub fn on_beacon_block_roots_response( + &mut self, + peer_id: PeerId, + reponse: BeaconBlockRootsResponse, + network: &mut NetworkContext, + ) { + // + } + fn request_block_roots( &mut self, peer_id: PeerId, @@ -174,9 +183,12 @@ impl SimpleSync { ) { // Potentially set state to sync. if self.state == SyncState::Idle && count > SLOT_IMPORT_TOLERANCE { + debug!(self.log, "Entering downloading sync state."); self.state = SyncState::Downloading; } + debug!(self.log, "Requesting {} blocks from {:?}.", count, &peer_id); + // TODO: handle count > max count. network.send_rpc_request( peer_id.clone(), diff --git a/beacon_node/network/tests/tests.rs b/beacon_node/network/tests/tests.rs index dea57982e..fbfb827f2 100644 --- a/beacon_node/network/tests/tests.rs +++ b/beacon_node/network/tests/tests.rs @@ -63,6 +63,15 @@ impl SyncNode { self.harness.beacon_chain.hello_message() } + pub fn get_block_root_request(&self) -> BeaconBlockRootsRequest { + let request = self.recv_rpc_request().expect("No block root request"); + + match request { + RPCRequest::BeaconBlockRoots(response) => response, + _ => panic!("Did not get block root request"), + } + } + fn _recv_rpc_response(&self) -> Result { let network_message = self.recv()?; Ok(match network_message { @@ -146,6 +155,34 @@ impl SyncMaster { } } + pub fn respond_to_block_roots_request( + &mut self, + node: &SyncNode, + request: BeaconBlockRootsRequest, + ) { + let roots = self + .harness + .beacon_chain + .get_block_roots(request.start_slot, Slot::from(request.count)) + .expect("Beacon chain did not give blocks"); + + let roots = roots + .iter() + .enumerate() + .map(|(i, root)| BlockRootSlot { + block_root: *root, + slot: Slot::from(i) + request.start_slot, + }) + .collect(); + + let response = RPCResponse::BeaconBlockRoots(BeaconBlockRootsResponse { roots }); + self.send_rpc_response(node, response) + } + + fn send_rpc_response(&mut self, node: &SyncNode, rpc_response: RPCResponse) { + node.send(self.rpc_response(node, rpc_response)); + } + fn rpc_response(&mut self, node: &SyncNode, rpc_response: RPCResponse) -> HandlerMessage { HandlerMessage::RPC( self.peer_id.clone(), @@ -158,17 +195,6 @@ impl SyncMaster { } } -fn assert_sent_block_root_request(node: &SyncNode, expected: BeaconBlockRootsRequest) { - let request = node.recv_rpc_request().expect("No block root request"); - - match request { - RPCRequest::BeaconBlockRoots(response) => { - assert_eq!(expected, response, "Bad block roots response"); - } - _ => assert!(false, "Did not get block root request"), - } -} - fn test_setup( state_builder: TestingBeaconStateBuilder, node_count: usize, @@ -223,13 +249,12 @@ fn first_test() { master.do_hello_with(&nodes[0]); - assert_sent_block_root_request( - &nodes[0], - BeaconBlockRootsRequest { - start_slot: original_node_slot, - count: 2, - }, - ); + let request = nodes[0].get_block_root_request(); + assert_eq!(request.start_slot, original_node_slot); + assert_eq!(request.count, 2); + master.respond_to_block_roots_request(&nodes[0], request); + + std::thread::sleep(Duration::from_millis(500)); runtime.shutdown_now(); }