From 051355925299862ea2298c6f5d24fdf1dfc6fbd0 Mon Sep 17 00:00:00 2001 From: Kirk Baird Date: Tue, 16 Jul 2019 17:28:15 +1000 Subject: [PATCH] =?UTF-8?q?Fix=20syncing=20bugs=20by=20recursively=20attem?= =?UTF-8?q?pting=20to=20process=20parents=20in=20the=20=E2=80=A6=20(#429)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix syncing bugs by recursively attempting to process parents in the import queue, change BlockRootsIterator * Swap from crossbeam channel to tokio mpsc * Recursion fix * Remove exess block processing * Fix network lag, correct attestation topic * Correct network poll logic * Overhaul of SimpleSync and modify BlockRootsIterator to return start_slot * Fix bug in tests relating to StateRootsIterator * Remove old, commented-out heartbeat code. * Tidy docs on import queue enum * Change source logging msg in simple sync * Rename function parameter in simple sync * Use `BestBlockRootsIterator` in `reduced_tree` * Update comments for `BestBlockRootsIterator` * Fix duplicate dep in cargo.toml --- beacon_node/Cargo.toml | 2 +- beacon_node/beacon_chain/src/beacon_chain.rs | 15 +- beacon_node/beacon_chain/src/test_utils.rs | 2 +- beacon_node/client/Cargo.toml | 2 +- beacon_node/client/src/notifier.rs | 33 ++- beacon_node/eth2-libp2p/Cargo.toml | 2 +- beacon_node/eth2-libp2p/src/service.rs | 2 - beacon_node/http_server/Cargo.toml | 3 +- beacon_node/http_server/src/lib.rs | 3 +- beacon_node/network/Cargo.toml | 3 +- beacon_node/network/src/message_handler.rs | 33 +-- beacon_node/network/src/service.rs | 105 ++++---- beacon_node/network/src/sync/import_queue.rs | 127 ++++++---- beacon_node/network/src/sync/simple_sync.rs | 237 ++++++++++++------- beacon_node/rpc/Cargo.toml | 3 +- beacon_node/rpc/src/attestation.rs | 9 +- beacon_node/rpc/src/beacon_block.rs | 6 +- beacon_node/rpc/src/lib.rs | 3 +- beacon_node/src/main.rs | 2 +- beacon_node/store/src/iter.rs | 167 ++++++++++++- eth2/lmd_ghost/src/reduced_tree.rs | 8 +- 21 files changed, 515 insertions(+), 252 deletions(-) diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 9e96f8484..5c8786c70 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -13,7 +13,7 @@ client = { path = "client" } version = { path = "version" } clap = "2.32.0" serde = "1.0" -slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_debug"] } +slog = { version = "^2.2.3" , features = ["max_level_trace"] } slog-term = "^2.4.0" slog-async = "^2.3.0" ctrlc = { version = "3.1.1", features = ["termination"] } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 2d8282270..96ebe4b41 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -18,7 +18,7 @@ use state_processing::{ per_slot_processing, BlockProcessingError, }; use std::sync::Arc; -use store::iter::{BlockIterator, BlockRootsIterator, StateRootsIterator}; +use store::iter::{BestBlockRootsIterator, BlockIterator, BlockRootsIterator, StateRootsIterator}; use store::{Error as DBError, Store}; use tree_hash::TreeHash; use types::*; @@ -226,6 +226,19 @@ impl BeaconChain { BlockRootsIterator::owned(self.store.clone(), self.state.read().clone(), slot) } + /// Iterates in reverse (highest to lowest slot) through all block roots from largest + /// `slot <= beacon_state.slot` through to genesis. + /// + /// Returns `None` for roots prior to genesis or when there is an error reading from `Store`. + /// + /// Contains duplicate roots when skip slots are encountered. + pub fn rev_iter_best_block_roots( + &self, + slot: Slot, + ) -> BestBlockRootsIterator { + BestBlockRootsIterator::owned(self.store.clone(), self.state.read().clone(), slot) + } + /// Iterates in reverse (highest to lowest slot) through all state roots from `slot` through to /// genesis. /// diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 9b3f7c1cb..991d29418 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -191,7 +191,7 @@ where fn get_state_at_slot(&self, state_slot: Slot) -> BeaconState { let state_root = self .chain - .rev_iter_state_roots(self.chain.current_state().slot) + .rev_iter_state_roots(self.chain.current_state().slot - 1) .find(|(_hash, slot)| *slot == state_slot) .map(|(hash, _slot)| hash) .expect("could not find state root"); diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index d3b1e6294..7afaf92ac 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -20,7 +20,7 @@ serde = "1.0.93" serde_derive = "1.0" error-chain = "0.12.0" eth2_ssz = { path = "../../eth2/utils/ssz" } -slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_debug"] } +slog = { version = "^2.2.3" , features = ["max_level_trace"] } slog-async = "^2.3.0" slog-json = "^2.3" slog-term = "^2.4.0" diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index 977342b1a..987f064a4 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -3,39 +3,34 @@ use beacon_chain::BeaconChainTypes; use exit_future::Exit; use futures::{Future, Stream}; use slog::{debug, o}; -use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use tokio::runtime::TaskExecutor; use tokio::timer::Interval; -/// Thread that monitors the client and reports useful statistics to the user. +/// The interval between heartbeat events. +pub const HEARTBEAT_INTERVAL_SECONDS: u64 = 5; +/// Spawns a thread that can be used to run code periodically, on `HEARTBEAT_INTERVAL_SECONDS` +/// durations. +/// +/// Presently unused, but remains for future use. pub fn run( client: &Client, executor: TaskExecutor, exit: Exit, ) { // notification heartbeat - let interval = Interval::new(Instant::now(), Duration::from_secs(5)); + let interval = Interval::new( + Instant::now(), + Duration::from_secs(HEARTBEAT_INTERVAL_SECONDS), + ); let _log = client.log.new(o!("Service" => "Notifier")); - // TODO: Debugging only - let counter = Arc::new(Mutex::new(0)); - let network = client.network.clone(); - - // build heartbeat logic here - let heartbeat = move |_| { - //debug!(log, "Temp heartbeat output"); - //TODO: Remove this logic. Testing only - let mut count = counter.lock().unwrap(); - *count += 1; - - if *count % 5 == 0 { - // debug!(log, "Sending Message"); - network.send_message(); - } - + let heartbeat = |_| { + // There is not presently any heartbeat logic. + // + // We leave this function empty for future use. Ok(()) }; diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 1fbd30872..2fbedf780 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -15,7 +15,7 @@ serde = "1.0" serde_derive = "1.0" eth2_ssz = { path = "../../eth2/utils/ssz" } eth2_ssz_derive = { path = "../../eth2/utils/ssz_derive" } -slog = { version = "^2.4.1" , features = ["max_level_trace", "release_max_level_trace"] } +slog = { version = "^2.4.1" , features = ["max_level_trace"] } version = { path = "../version" } tokio = "0.1.16" futures = "0.1.25" diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 69f8a1ca5..2eecfac97 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -109,8 +109,6 @@ impl Stream for Service { fn poll(&mut self) -> Poll, Self::Error> { loop { - // TODO: Currently only gossipsub events passed here. - // Build a type for more generic events match self.swarm.poll() { //Behaviour events Ok(Async::Ready(Some(event))) => match event { diff --git a/beacon_node/http_server/Cargo.toml b/beacon_node/http_server/Cargo.toml index 9a7a4b0a5..3e428357d 100644 --- a/beacon_node/http_server/Cargo.toml +++ b/beacon_node/http_server/Cargo.toml @@ -27,9 +27,8 @@ futures = "0.1.23" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" -slog = "^2.2.3" +slog = { version = "^2.2.3" , features = ["max_level_trace"] } slog-term = "^2.4.0" slog-async = "^2.3.0" tokio = "0.1.17" exit-future = "0.1.4" -crossbeam-channel = "0.3.8" diff --git a/beacon_node/http_server/src/lib.rs b/beacon_node/http_server/src/lib.rs index ab1176d61..f1d006a5b 100644 --- a/beacon_node/http_server/src/lib.rs +++ b/beacon_node/http_server/src/lib.rs @@ -14,6 +14,7 @@ use slog::{info, o, warn}; use std::path::PathBuf; use std::sync::Arc; use tokio::runtime::TaskExecutor; +use tokio::sync::mpsc; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct HttpServerConfig { @@ -75,7 +76,7 @@ pub fn create_iron_http_server( pub fn start_service( config: &HttpServerConfig, executor: &TaskExecutor, - _network_chan: crossbeam_channel::Sender, + _network_chan: mpsc::UnboundedSender, beacon_chain: Arc>, db_path: PathBuf, metrics_registry: Registry, diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 23fbdd7d9..1499ac580 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -13,10 +13,9 @@ store = { path = "../store" } eth2-libp2p = { path = "../eth2-libp2p" } version = { path = "../version" } types = { path = "../../eth2/types" } -slog = { version = "^2.2.3" } +slog = { version = "^2.2.3" , features = ["max_level_trace"] } eth2_ssz = { path = "../../eth2/utils/ssz" } tree_hash = { path = "../../eth2/utils/tree_hash" } futures = "0.1.25" error-chain = "0.12.0" -crossbeam-channel = "0.3.8" tokio = "0.1.16" diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 40a396c3b..40538798a 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -2,17 +2,18 @@ use crate::error; use crate::service::{NetworkMessage, OutgoingMessage}; use crate::sync::SimpleSync; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use crossbeam_channel::{unbounded as channel, Sender}; use eth2_libp2p::{ behaviour::PubsubMessage, rpc::{methods::GoodbyeReason, RPCRequest, RPCResponse, RequestId}, PeerId, RPCEvent, }; -use futures::future; +use futures::future::Future; +use futures::stream::Stream; use slog::{debug, warn}; use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; +use tokio::sync::mpsc; /// Timeout for RPC requests. // const REQUEST_TIMEOUT: Duration = Duration::from_secs(30); @@ -48,13 +49,13 @@ impl MessageHandler { /// Initializes and runs the MessageHandler. pub fn spawn( beacon_chain: Arc>, - network_send: crossbeam_channel::Sender, + network_send: mpsc::UnboundedSender, executor: &tokio::runtime::TaskExecutor, log: slog::Logger, - ) -> error::Result> { + ) -> error::Result> { debug!(log, "Service starting"); - let (handler_send, handler_recv) = channel(); + let (handler_send, handler_recv) = mpsc::unbounded_channel(); // Initialise sync and begin processing in thread // generate the Message handler @@ -69,13 +70,13 @@ impl MessageHandler { // spawn handler task // TODO: Handle manual termination of thread - executor.spawn(future::poll_fn(move || -> Result<_, _> { - loop { - handler.handle_message(handler_recv.recv().map_err(|_| { + executor.spawn( + handler_recv + .for_each(move |msg| Ok(handler.handle_message(msg))) + .map_err(move |_| { debug!(log, "Network message handler terminated."); - })?); - } - })); + }), + ); Ok(handler_send) } @@ -222,7 +223,7 @@ impl MessageHandler { pub struct NetworkContext { /// The network channel to relay messages to the Network service. - network_send: crossbeam_channel::Sender, + network_send: mpsc::UnboundedSender, /// A mapping of peers and the RPC id we have sent an RPC request to. outstanding_outgoing_request_ids: HashMap<(PeerId, RequestId), Instant>, /// Stores the next `RequestId` we should include on an outgoing `RPCRequest` to a `PeerId`. @@ -232,7 +233,7 @@ pub struct NetworkContext { } impl NetworkContext { - pub fn new(network_send: crossbeam_channel::Sender, log: slog::Logger) -> Self { + pub fn new(network_send: mpsc::UnboundedSender, log: slog::Logger) -> Self { Self { network_send, outstanding_outgoing_request_ids: HashMap::new(), @@ -278,13 +279,13 @@ impl NetworkContext { ); } - fn send_rpc_event(&self, peer_id: PeerId, rpc_event: RPCEvent) { + fn send_rpc_event(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { self.send(peer_id, OutgoingMessage::RPC(rpc_event)) } - fn send(&self, peer_id: PeerId, outgoing_message: OutgoingMessage) { + fn send(&mut self, peer_id: PeerId, outgoing_message: OutgoingMessage) { self.network_send - .send(NetworkMessage::Send(peer_id, outgoing_message)) + .try_send(NetworkMessage::Send(peer_id, outgoing_message)) .unwrap_or_else(|_| { warn!( self.log, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index b2ecc1a0b..a2265bb8e 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -2,24 +2,23 @@ use crate::error; use crate::message_handler::{HandlerMessage, MessageHandler}; use crate::NetworkConfig; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use crossbeam_channel::{unbounded as channel, Sender, TryRecvError}; use eth2_libp2p::Service as LibP2PService; use eth2_libp2p::Topic; use eth2_libp2p::{Libp2pEvent, PeerId}; use eth2_libp2p::{PubsubMessage, RPCEvent}; use futures::prelude::*; -use futures::sync::oneshot; use futures::Stream; use slog::{debug, info, o, trace}; use std::marker::PhantomData; use std::sync::Arc; use tokio::runtime::TaskExecutor; +use tokio::sync::{mpsc, oneshot}; /// Service that handles communication between internal services and the eth2_libp2p network service. pub struct Service { //libp2p_service: Arc>, _libp2p_exit: oneshot::Sender<()>, - network_send: crossbeam_channel::Sender, + network_send: mpsc::UnboundedSender, _phantom: PhantomData, //message_handler: MessageHandler, //message_handler_send: Sender } @@ -30,9 +29,9 @@ impl Service { config: &NetworkConfig, executor: &TaskExecutor, log: slog::Logger, - ) -> error::Result<(Arc, Sender)> { + ) -> error::Result<(Arc, mpsc::UnboundedSender)> { // build the network channel - let (network_send, network_recv) = channel::(); + let (network_send, network_recv) = mpsc::unbounded_channel::(); // launch message handler thread let message_handler_log = log.new(o!("Service" => "MessageHandler")); let message_handler_send = MessageHandler::spawn( @@ -64,9 +63,9 @@ impl Service { } // TODO: Testing only - pub fn send_message(&self) { + pub fn send_message(&mut self) { self.network_send - .send(NetworkMessage::Send( + .try_send(NetworkMessage::Send( PeerId::random(), OutgoingMessage::NotifierTest, )) @@ -76,12 +75,12 @@ impl Service { fn spawn_service( libp2p_service: LibP2PService, - network_recv: crossbeam_channel::Receiver, - message_handler_send: crossbeam_channel::Sender, + network_recv: mpsc::UnboundedReceiver, + message_handler_send: mpsc::UnboundedSender, executor: &TaskExecutor, log: slog::Logger, -) -> error::Result> { - let (network_exit, exit_rx) = oneshot::channel(); +) -> error::Result> { + let (network_exit, exit_rx) = tokio::sync::oneshot::channel(); // spawn on the current executor executor.spawn( @@ -105,25 +104,61 @@ fn spawn_service( //TODO: Potentially handle channel errors fn network_service( mut libp2p_service: LibP2PService, - network_recv: crossbeam_channel::Receiver, - message_handler_send: crossbeam_channel::Sender, + mut network_recv: mpsc::UnboundedReceiver, + mut message_handler_send: mpsc::UnboundedSender, log: slog::Logger, ) -> impl futures::Future { futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> { - // poll the swarm - loop { + // only end the loop once both major polls are not ready. + let mut not_ready_count = 0; + while not_ready_count < 2 { + not_ready_count = 0; + // poll the network channel + match network_recv.poll() { + Ok(Async::Ready(Some(message))) => { + match message { + // TODO: Testing message - remove + NetworkMessage::Send(peer_id, outgoing_message) => { + match outgoing_message { + OutgoingMessage::RPC(rpc_event) => { + trace!(log, "Sending RPC Event: {:?}", rpc_event); + //TODO: Make swarm private + //TODO: Implement correct peer id topic message handling + libp2p_service.swarm.send_rpc(peer_id, rpc_event); + } + OutgoingMessage::NotifierTest => { + // debug!(log, "Received message from notifier"); + } + }; + } + NetworkMessage::Publish { topics, message } => { + debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics)); + libp2p_service.swarm.publish(topics, *message); + } + } + } + Ok(Async::NotReady) => not_ready_count += 1, + Ok(Async::Ready(None)) => { + return Err(eth2_libp2p::error::Error::from("Network channel closed")); + } + Err(_) => { + return Err(eth2_libp2p::error::Error::from("Network channel error")); + } + } + + // poll the swarm match libp2p_service.poll() { Ok(Async::Ready(Some(event))) => match event { Libp2pEvent::RPC(peer_id, rpc_event) => { trace!(log, "RPC Event: RPC message received: {:?}", rpc_event); message_handler_send - .send(HandlerMessage::RPC(peer_id, rpc_event)) + .try_send(HandlerMessage::RPC(peer_id, rpc_event)) .map_err(|_| "failed to send rpc to handler")?; } Libp2pEvent::PeerDialed(peer_id) => { debug!(log, "Peer Dialed: {:?}", peer_id); message_handler_send - .send(HandlerMessage::PeerDialed(peer_id)) + .try_send(HandlerMessage::PeerDialed(peer_id)) .map_err(|_| "failed to send rpc to handler")?; } Libp2pEvent::PubsubMessage { @@ -132,43 +167,13 @@ fn network_service( //TODO: Decide if we need to propagate the topic upwards. (Potentially for //attestations) message_handler_send - .send(HandlerMessage::PubsubMessage(source, message)) + .try_send(HandlerMessage::PubsubMessage(source, message)) .map_err(|_| " failed to send pubsub message to handler")?; } }, Ok(Async::Ready(None)) => unreachable!("Stream never ends"), - Ok(Async::NotReady) => break, - Err(_) => break, - } - } - // poll the network channel - // TODO: refactor - combine poll_fn's? - loop { - match network_recv.try_recv() { - // TODO: Testing message - remove - Ok(NetworkMessage::Send(peer_id, outgoing_message)) => { - match outgoing_message { - OutgoingMessage::RPC(rpc_event) => { - trace!(log, "Sending RPC Event: {:?}", rpc_event); - //TODO: Make swarm private - //TODO: Implement correct peer id topic message handling - libp2p_service.swarm.send_rpc(peer_id, rpc_event); - } - OutgoingMessage::NotifierTest => { - // debug!(log, "Received message from notifier"); - } - }; - } - Ok(NetworkMessage::Publish { topics, message }) => { - debug!(log, "Sending pubsub message on topics {:?}", topics); - libp2p_service.swarm.publish(topics, *message); - } - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => { - return Err(eth2_libp2p::error::Error::from( - "Network channel disconnected", - )); - } + Ok(Async::NotReady) => not_ready_count += 1, + Err(_) => not_ready_count += 1, } } Ok(Async::NotReady) diff --git a/beacon_node/network/src/sync/import_queue.rs b/beacon_node/network/src/sync/import_queue.rs index 8cc3dd65d..fe640aaa0 100644 --- a/beacon_node/network/src/sync/import_queue.rs +++ b/beacon_node/network/src/sync/import_queue.rs @@ -41,31 +41,23 @@ impl ImportQueue { } } - /// Completes all possible partials into `BeaconBlock` and returns them, sorted by increasing - /// slot number. Does not delete the partials from the queue, this must be done manually. - /// - /// Returns `(queue_index, block, sender)`: - /// - /// - `block_root`: may be used to remove the entry if it is successfully processed. - /// - `block`: the completed block. - /// - `sender`: the `PeerId` the provided the `BeaconBlockBody` which completed the partial. - pub fn complete_blocks(&self) -> Vec<(Hash256, BeaconBlock, PeerId)> { - let mut complete: Vec<(Hash256, BeaconBlock, PeerId)> = self - .partials - .iter() - .filter_map(|(_, partial)| partial.clone().complete()) - .collect(); - - // Sort the completable partials to be in ascending slot order. - complete.sort_unstable_by(|a, b| a.1.slot.partial_cmp(&b.1.slot).unwrap()); - - complete - } - + /// Returns true of the if the `BlockRoot` is found in the `import_queue`. pub fn contains_block_root(&self, block_root: Hash256) -> bool { self.partials.contains_key(&block_root) } + /// Attempts to complete the `BlockRoot` if it is found in the `import_queue`. + /// + /// Returns an Enum with a `PartialBeaconBlockCompletion`. + /// Does not remove the `block_root` from the `import_queue`. + pub fn attempt_complete_block(&self, block_root: Hash256) -> PartialBeaconBlockCompletion { + if let Some(partial) = self.partials.get(&block_root) { + partial.attempt_complete() + } else { + PartialBeaconBlockCompletion::MissingRoot + } + } + /// Removes the first `PartialBeaconBlock` with a matching `block_root`, returning the partial /// if it exists. pub fn remove(&mut self, block_root: Hash256) -> Option { @@ -102,6 +94,8 @@ impl ImportQueue { block_roots: &[BlockRootSlot], sender: PeerId, ) -> Vec { + // TODO: This will currently not return a `BlockRootSlot` if this root exists but there is no header. + // It would be more robust if it did. let new_block_root_slots: Vec = block_roots .iter() // Ignore any roots already stored in the queue. @@ -135,12 +129,8 @@ impl ImportQueue { /// the queue and it's block root is included in the output. /// /// If a `header` is already in the queue, but not yet processed by the chain the block root is - /// included in the output and the `inserted` time for the partial record is set to + /// not included in the output and the `inserted` time for the partial record is set to /// `Instant::now()`. Updating the `inserted` time stops the partial from becoming stale. - /// - /// Presently the queue enforces that a `BeaconBlockHeader` _must_ be received before its - /// `BeaconBlockBody`. This is not a natural requirement and we could enhance the queue to lift - /// this restraint. pub fn enqueue_headers( &mut self, headers: Vec, @@ -152,8 +142,10 @@ impl ImportQueue { let block_root = Hash256::from_slice(&header.canonical_root()[..]); if self.chain_has_not_seen_block(&block_root) { - self.insert_header(block_root, header, sender.clone()); - required_bodies.push(block_root); + if !self.insert_header(block_root, header, sender.clone()) { + // If a body is empty + required_bodies.push(block_root); + } } } @@ -163,10 +155,17 @@ impl ImportQueue { /// If there is a matching `header` for this `body`, adds it to the queue. /// /// If there is no `header` for the `body`, the body is simply discarded. - pub fn enqueue_bodies(&mut self, bodies: Vec, sender: PeerId) { + pub fn enqueue_bodies( + &mut self, + bodies: Vec, + sender: PeerId, + ) -> Option { + let mut last_block_hash = None; for body in bodies { - self.insert_body(body, sender.clone()); + last_block_hash = self.insert_body(body, sender.clone()); } + + last_block_hash } pub fn enqueue_full_blocks(&mut self, blocks: Vec, sender: PeerId) { @@ -179,12 +178,22 @@ impl ImportQueue { /// /// If the header already exists, the `inserted` time is set to `now` and not other /// modifications are made. - fn insert_header(&mut self, block_root: Hash256, header: BeaconBlockHeader, sender: PeerId) { + /// Returns true is `body` exists. + fn insert_header( + &mut self, + block_root: Hash256, + header: BeaconBlockHeader, + sender: PeerId, + ) -> bool { + let mut exists = false; self.partials .entry(block_root) .and_modify(|partial| { partial.header = Some(header.clone()); partial.inserted = Instant::now(); + if partial.body.is_some() { + exists = true; + } }) .or_insert_with(|| PartialBeaconBlock { slot: header.slot, @@ -194,28 +203,30 @@ impl ImportQueue { inserted: Instant::now(), sender, }); + exists } /// Updates an existing partial with the `body`. /// - /// If there is no header for the `body`, the body is simply discarded. - /// /// If the body already existed, the `inserted` time is set to `now`. - fn insert_body(&mut self, body: BeaconBlockBody, sender: PeerId) { + /// + /// Returns the block hash of the inserted body + fn insert_body(&mut self, body: BeaconBlockBody, sender: PeerId) -> Option { let body_root = Hash256::from_slice(&body.tree_hash_root()[..]); + let mut last_root = None; - self.partials.iter_mut().for_each(|(_, mut p)| { + self.partials.iter_mut().for_each(|(root, mut p)| { if let Some(header) = &mut p.header { if body_root == header.block_body_root { p.inserted = Instant::now(); - - if p.body.is_none() { - p.body = Some(body.clone()); - p.sender = sender.clone(); - } + p.body = Some(body.clone()); + p.sender = sender.clone(); + last_root = Some(*root); } } }); + + last_root } /// Updates an existing `partial` with the completed block, or adds a new (complete) partial. @@ -257,13 +268,33 @@ pub struct PartialBeaconBlock { } impl PartialBeaconBlock { - /// Consumes `self` and returns a full built `BeaconBlock`, it's root and the `sender` - /// `PeerId`, if enough information exists to complete the block. Otherwise, returns `None`. - pub fn complete(self) -> Option<(Hash256, BeaconBlock, PeerId)> { - Some(( - self.block_root, - self.header?.into_block(self.body?), - self.sender, - )) + /// Attempts to build a block. + /// + /// Does not comsume the `PartialBeaconBlock`. + pub fn attempt_complete(&self) -> PartialBeaconBlockCompletion { + if self.header.is_none() { + PartialBeaconBlockCompletion::MissingHeader(self.slot) + } else if self.body.is_none() { + PartialBeaconBlockCompletion::MissingBody + } else { + PartialBeaconBlockCompletion::Complete( + self.header + .clone() + .unwrap() + .into_block(self.body.clone().unwrap()), + ) + } } } + +/// The result of trying to convert a `BeaconBlock` into a `PartialBeaconBlock`. +pub enum PartialBeaconBlockCompletion { + /// The partial contains a valid BeaconBlock. + Complete(BeaconBlock), + /// The partial does not exist. + MissingRoot, + /// The partial contains a `BeaconBlockRoot` but no `BeaconBlockHeader`. + MissingHeader(Slot), + /// The partial contains a `BeaconBlockRoot` and `BeaconBlockHeader` but no `BeaconBlockBody`. + MissingBody, +} diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 5899e5aea..5ce921057 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -1,4 +1,4 @@ -use super::import_queue::ImportQueue; +use super::import_queue::{ImportQueue, PartialBeaconBlockCompletion}; use crate::message_handler::NetworkContext; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use eth2_libp2p::rpc::methods::*; @@ -17,7 +17,7 @@ use types::{ const SLOT_IMPORT_TOLERANCE: u64 = 100; /// The amount of seconds a block (or partial block) may exist in the import queue. -const QUEUE_STALE_SECS: u64 = 6; +const QUEUE_STALE_SECS: u64 = 100; /// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it. /// Otherwise we queue it. @@ -227,7 +227,12 @@ impl SimpleSync { // // Therefore, there are some blocks between the local finalized epoch and the remote // head that are worth downloading. - debug!(self.log, "UsefulPeer"; "peer" => format!("{:?}", peer_id)); + debug!( + self.log, "UsefulPeer"; + "peer" => format!("{:?}", peer_id), + "local_finalized_epoch" => local.latest_finalized_epoch, + "remote_latest_finalized_epoch" => remote.latest_finalized_epoch, + ); let start_slot = local .latest_finalized_epoch @@ -238,7 +243,7 @@ impl SimpleSync { peer_id, BeaconBlockRootsRequest { start_slot, - count: required_slots.into(), + count: required_slots.as_u64(), }, network, ); @@ -247,7 +252,7 @@ impl SimpleSync { fn root_at_slot(&self, target_slot: Slot) -> Option { self.chain - .rev_iter_block_roots(target_slot) + .rev_iter_best_block_roots(target_slot) .take(1) .find(|(_root, slot)| *slot == target_slot) .map(|(root, _slot)| root) @@ -271,8 +276,7 @@ impl SimpleSync { let mut roots: Vec = self .chain - .rev_iter_block_roots(req.start_slot + req.count) - .skip(1) + .rev_iter_best_block_roots(req.start_slot + req.count) .take(req.count as usize) .map(|(block_root, slot)| BlockRootSlot { slot, block_root }) .collect(); @@ -356,7 +360,7 @@ impl SimpleSync { BeaconBlockHeadersRequest { start_root: first.block_root, start_slot: first.slot, - max_headers: (last.slot - first.slot).as_u64(), + max_headers: (last.slot - first.slot + 1).as_u64(), skip_slots: 0, }, network, @@ -386,7 +390,7 @@ impl SimpleSync { // unnecessary block deserialization when `req.skip_slots > 0`. let mut roots: Vec = self .chain - .rev_iter_block_roots(req.start_slot + (count - 1)) + .rev_iter_best_block_roots(req.start_slot + count) .take(count as usize) .map(|(root, _slot)| root) .collect(); @@ -499,14 +503,26 @@ impl SimpleSync { "count" => res.block_bodies.len(), ); - self.import_queue - .enqueue_bodies(res.block_bodies, peer_id.clone()); + if !res.block_bodies.is_empty() { + // Import all blocks to queue + let last_root = self + .import_queue + .enqueue_bodies(res.block_bodies, peer_id.clone()); + + // Attempt to process all recieved bodies by recursively processing the latest block + if let Some(root) = last_root { + match self.attempt_process_partial_block(peer_id, root, network, &"rpc") { + Some(BlockProcessingOutcome::Processed { block_root: _ }) => { + // If processing is successful remove from `import_queue` + self.import_queue.remove(root); + } + _ => {} + } + } + } // Clear out old entries self.import_queue.remove_stale(); - - // Import blocks, if possible. - self.process_import_queue(network); } /// Process a gossip message declaring a new block. @@ -526,31 +542,35 @@ impl SimpleSync { match outcome { BlockProcessingOutcome::Processed { .. } => SHOULD_FORWARD_GOSSIP_BLOCK, BlockProcessingOutcome::ParentUnknown { parent } => { - // Clean the stale entries from the queue. - self.import_queue.remove_stale(); - // Add this block to the queue self.import_queue - .enqueue_full_blocks(vec![block], peer_id.clone()); - trace!( - self.log, - "NewGossipBlock"; + .enqueue_full_blocks(vec![block.clone()], peer_id.clone()); + debug!( + self.log, "RequestParentBlock"; + "parent_root" => format!("{}", parent), + "parent_slot" => block.slot - 1, "peer" => format!("{:?}", peer_id), ); - // Unless the parent is in the queue, request the parent block from the peer. - // - // It is likely that this is duplicate work, given we already send a hello - // request. However, I believe there are some edge-cases where the hello - // message doesn't suffice, so we perform this request as well. - if !self.import_queue.contains_block_root(parent) { - // Send a hello to learn of the clients best slot so we can then sync the required - // parent(s). - network.send_rpc_request( - peer_id.clone(), - RPCRequest::Hello(hello_message(&self.chain)), - ); - } + // Request roots between parent and start of finality from peer. + let start_slot = self + .chain + .head() + .beacon_state + .finalized_epoch + .start_slot(T::EthSpec::slots_per_epoch()); + self.request_block_roots( + peer_id, + BeaconBlockRootsRequest { + // Request blocks between `latest_finalized_slot` and the `block` + start_slot, + count: block.slot.as_u64() - start_slot.as_u64(), + }, + network, + ); + + // Clean the stale entries from the queue. + self.import_queue.remove_stale(); SHOULD_FORWARD_GOSSIP_BLOCK } @@ -592,40 +612,6 @@ impl SimpleSync { } } - /// Iterate through the `import_queue` and process any complete blocks. - /// - /// If a block is successfully processed it is removed from the queue, otherwise it remains in - /// the queue. - pub fn process_import_queue(&mut self, network: &mut NetworkContext) { - let mut successful = 0; - - // Loop through all of the complete blocks in the queue. - for (block_root, block, sender) in self.import_queue.complete_blocks() { - let processing_result = self.process_block(sender, block.clone(), network, &"gossip"); - - let should_dequeue = match processing_result { - Some(BlockProcessingOutcome::ParentUnknown { .. }) => false, - Some(BlockProcessingOutcome::FutureSlot { - present_slot, - block_slot, - }) if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot => false, - _ => true, - }; - - if processing_result == Some(BlockProcessingOutcome::Processed { block_root }) { - successful += 1; - } - - if should_dequeue { - self.import_queue.remove(block_root); - } - } - - if successful > 0 { - info!(self.log, "Imported {} blocks", successful) - } - } - /// Request some `BeaconBlockRoots` from the remote peer. fn request_block_roots( &mut self, @@ -700,6 +686,89 @@ impl SimpleSync { hello_message(&self.chain) } + /// Helper function to attempt to process a partial block. + /// + /// If the block can be completed recursively call `process_block` + /// else request missing parts. + fn attempt_process_partial_block( + &mut self, + peer_id: PeerId, + block_root: Hash256, + network: &mut NetworkContext, + source: &str, + ) -> Option { + match self.import_queue.attempt_complete_block(block_root) { + PartialBeaconBlockCompletion::MissingBody => { + // Unable to complete the block because the block body is missing. + debug!( + self.log, "RequestParentBody"; + "source" => source, + "block_root" => format!("{}", block_root), + "peer" => format!("{:?}", peer_id), + ); + + // Request the block body from the peer. + self.request_block_bodies( + peer_id, + BeaconBlockBodiesRequest { + block_roots: vec![block_root], + }, + network, + ); + + None + } + PartialBeaconBlockCompletion::MissingHeader(slot) => { + // Unable to complete the block because the block header is missing. + debug!( + self.log, "RequestParentHeader"; + "source" => source, + "block_root" => format!("{}", block_root), + "peer" => format!("{:?}", peer_id), + ); + + // Request the block header from the peer. + self.request_block_headers( + peer_id, + BeaconBlockHeadersRequest { + start_root: block_root, + start_slot: slot, + max_headers: 1, + skip_slots: 0, + }, + network, + ); + + None + } + PartialBeaconBlockCompletion::MissingRoot => { + // The `block_root` is not known to the queue. + debug!( + self.log, "MissingParentRoot"; + "source" => source, + "block_root" => format!("{}", block_root), + "peer" => format!("{:?}", peer_id), + ); + + // Do nothing. + + None + } + PartialBeaconBlockCompletion::Complete(block) => { + // The block exists in the queue, attempt to process it + trace!( + self.log, "AttemptProcessParent"; + "source" => source, + "block_root" => format!("{}", block_root), + "parent_slot" => block.slot, + "peer" => format!("{:?}", peer_id), + ); + + self.process_block(peer_id.clone(), block, network, source) + } + } + } + /// Processes the `block` that was received from `peer_id`. /// /// If the block was submitted to the beacon chain without internal error, `Some(outcome)` is @@ -726,6 +795,7 @@ impl SimpleSync { if let Ok(outcome) = processing_result { match outcome { BlockProcessingOutcome::Processed { block_root } => { + // The block was valid and we processed it successfully. debug!( self.log, "Imported block from network"; "source" => source, @@ -735,26 +805,29 @@ impl SimpleSync { ); } BlockProcessingOutcome::ParentUnknown { parent } => { - // The block was valid and we processed it successfully. - debug!( + // The parent has not been processed + trace!( self.log, "ParentBlockUnknown"; "source" => source, "parent_root" => format!("{}", parent), + "baby_block_slot" => block.slot, "peer" => format!("{:?}", peer_id), ); - // Unless the parent is in the queue, request the parent block from the peer. - // - // It is likely that this is duplicate work, given we already send a hello - // request. However, I believe there are some edge-cases where the hello - // message doesn't suffice, so we perform this request as well. - if !self.import_queue.contains_block_root(parent) { - // Send a hello to learn of the clients best slot so we can then sync the require - // parent(s). - network.send_rpc_request( - peer_id.clone(), - RPCRequest::Hello(hello_message(&self.chain)), - ); + // If the parent is in the `import_queue` attempt to complete it then process it. + match self.attempt_process_partial_block(peer_id, parent, network, source) { + // If processing parent is sucessful, re-process block and remove parent from queue + Some(BlockProcessingOutcome::Processed { block_root: _ }) => { + self.import_queue.remove(parent); + + // Attempt to process `block` again + match self.chain.process_block(block) { + Ok(outcome) => return Some(outcome), + Err(_) => return None, + } + } + // All other cases leave `parent` in `import_queue` and return original outcome. + _ => {} } } BlockProcessingOutcome::FutureSlot { diff --git a/beacon_node/rpc/Cargo.toml b/beacon_node/rpc/Cargo.toml index a37492796..99cd78e6a 100644 --- a/beacon_node/rpc/Cargo.toml +++ b/beacon_node/rpc/Cargo.toml @@ -22,9 +22,8 @@ dirs = "1.0.3" futures = "0.1.23" serde = "1.0" serde_derive = "1.0" -slog = "^2.2.3" +slog = { version = "^2.2.3" , features = ["max_level_trace"] } slog-term = "^2.4.0" slog-async = "^2.3.0" tokio = "0.1.17" exit-future = "0.1.4" -crossbeam-channel = "0.3.8" diff --git a/beacon_node/rpc/src/attestation.rs b/beacon_node/rpc/src/attestation.rs index 86f4331f1..b85d4e947 100644 --- a/beacon_node/rpc/src/attestation.rs +++ b/beacon_node/rpc/src/attestation.rs @@ -1,7 +1,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::PubsubMessage; use eth2_libp2p::TopicBuilder; -use eth2_libp2p::SHARD_TOPIC_PREFIX; +use eth2_libp2p::BEACON_ATTESTATION_TOPIC; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use network::NetworkMessage; @@ -13,12 +13,13 @@ use protos::services_grpc::AttestationService; use slog::{error, info, trace, warn}; use ssz::{ssz_encode, Decode}; use std::sync::Arc; +use tokio::sync::mpsc; use types::Attestation; #[derive(Clone)] pub struct AttestationServiceInstance { pub chain: Arc>, - pub network_chan: crossbeam_channel::Sender, + pub network_chan: mpsc::UnboundedSender, pub log: slog::Logger, } @@ -139,11 +140,11 @@ impl AttestationService for AttestationServiceInstance { ); // valid attestation, propagate to the network - let topic = TopicBuilder::new(SHARD_TOPIC_PREFIX).build(); + let topic = TopicBuilder::new(BEACON_ATTESTATION_TOPIC).build(); let message = PubsubMessage::Attestation(attestation); self.network_chan - .send(NetworkMessage::Publish { + .try_send(NetworkMessage::Publish { topics: vec![topic], message: Box::new(message), }) diff --git a/beacon_node/rpc/src/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs index cdf46a1ab..faaf2232a 100644 --- a/beacon_node/rpc/src/beacon_block.rs +++ b/beacon_node/rpc/src/beacon_block.rs @@ -1,5 +1,4 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; -use crossbeam_channel; use eth2_libp2p::BEACON_PUBSUB_TOPIC; use eth2_libp2p::{PubsubMessage, TopicBuilder}; use futures::Future; @@ -14,12 +13,13 @@ use slog::Logger; use slog::{error, info, trace, warn}; use ssz::{ssz_encode, Decode}; use std::sync::Arc; +use tokio::sync::mpsc; use types::{BeaconBlock, Signature, Slot}; #[derive(Clone)] pub struct BeaconBlockServiceInstance { pub chain: Arc>, - pub network_chan: crossbeam_channel::Sender, + pub network_chan: mpsc::UnboundedSender, pub log: Logger, } @@ -111,7 +111,7 @@ impl BeaconBlockService for BeaconBlockServiceInstance { // Publish the block to the p2p network via gossipsub. self.network_chan - .send(NetworkMessage::Publish { + .try_send(NetworkMessage::Publish { topics: vec![topic], message: Box::new(message), }) diff --git a/beacon_node/rpc/src/lib.rs b/beacon_node/rpc/src/lib.rs index 3e6fd3e73..eef009292 100644 --- a/beacon_node/rpc/src/lib.rs +++ b/beacon_node/rpc/src/lib.rs @@ -20,11 +20,12 @@ use protos::services_grpc::{ use slog::{info, o, warn}; use std::sync::Arc; use tokio::runtime::TaskExecutor; +use tokio::sync::mpsc; pub fn start_server( config: &RPCConfig, executor: &TaskExecutor, - network_chan: crossbeam_channel::Sender, + network_chan: mpsc::UnboundedSender, beacon_chain: Arc>, log: &slog::Logger, ) -> exit_future::Signal { diff --git a/beacon_node/src/main.rs b/beacon_node/src/main.rs index 6beb0fd64..55c86672a 100644 --- a/beacon_node/src/main.rs +++ b/beacon_node/src/main.rs @@ -163,7 +163,7 @@ fn main() { 0 => drain.filter_level(Level::Info), 1 => drain.filter_level(Level::Debug), 2 => drain.filter_level(Level::Trace), - _ => drain.filter_level(Level::Info), + _ => drain.filter_level(Level::Trace), }; let mut log = slog::Logger::root(drain.fuse(), o!()); diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index 76807ce8f..d545cf2fe 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -15,15 +15,15 @@ impl<'a, T: EthSpec, U: Store> StateRootsIterator<'a, T, U> { Self { store, beacon_state: Cow::Borrowed(beacon_state), - slot: start_slot, + slot: start_slot + 1, } } pub fn owned(store: Arc, beacon_state: BeaconState, start_slot: Slot) -> Self { Self { - slot: start_slot, - beacon_state: Cow::Owned(beacon_state), store, + beacon_state: Cow::Owned(beacon_state), + slot: start_slot + 1, } } } @@ -90,13 +90,19 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockIterator<'a, T, U> { } } -/// Iterates backwards through block roots. +/// Iterates backwards through block roots. If any specified slot is unable to be retrieved, the +/// iterator returns `None` indefinitely. /// /// Uses the `latest_block_roots` field of `BeaconState` to as the source of block roots and will /// perform a lookup on the `Store` for a prior `BeaconState` if `latest_block_roots` has been /// exhausted. /// /// Returns `None` for roots prior to genesis or when there is an error reading from `Store`. +/// +/// ## Notes +/// +/// See [`BestBlockRootsIterator`](struct.BestBlockRootsIterator.html), which has different +/// `start_slot` logic. #[derive(Clone)] pub struct BlockRootsIterator<'a, T: EthSpec, U> { store: Arc, @@ -108,18 +114,18 @@ impl<'a, T: EthSpec, U: Store> BlockRootsIterator<'a, T, U> { /// Create a new iterator over all block roots in the given `beacon_state` and prior states. pub fn new(store: Arc, beacon_state: &'a BeaconState, start_slot: Slot) -> Self { Self { - slot: start_slot, - beacon_state: Cow::Borrowed(beacon_state), store, + beacon_state: Cow::Borrowed(beacon_state), + slot: start_slot + 1, } } /// Create a new iterator over all block roots in the given `beacon_state` and prior states. pub fn owned(store: Arc, beacon_state: BeaconState, start_slot: Slot) -> Self { Self { - slot: start_slot, - beacon_state: Cow::Owned(beacon_state), store, + beacon_state: Cow::Owned(beacon_state), + slot: start_slot + 1, } } } @@ -156,6 +162,104 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> { } } +/// Iterates backwards through block roots with `start_slot` highest possible value +/// `<= beacon_state.slot`. +/// +/// The distinction between `BestBlockRootsIterator` and `BlockRootsIterator` is: +/// +/// - `BestBlockRootsIterator` uses best-effort slot. When `start_slot` is greater than the latest available block root +/// on `beacon_state`, returns `Some(root, slot)` where `slot` is the latest available block +/// root. +/// - `BlockRootsIterator` is strict about `start_slot`. When `start_slot` is greater than the latest available block root +/// on `beacon_state`, returns `None`. +/// +/// This is distinct from `BestBlockRootsIterator`. +/// +/// Uses the `latest_block_roots` field of `BeaconState` to as the source of block roots and will +/// perform a lookup on the `Store` for a prior `BeaconState` if `latest_block_roots` has been +/// exhausted. +/// +/// Returns `None` for roots prior to genesis or when there is an error reading from `Store`. +#[derive(Clone)] +pub struct BestBlockRootsIterator<'a, T: EthSpec, U> { + store: Arc, + beacon_state: Cow<'a, BeaconState>, + slot: Slot, +} + +impl<'a, T: EthSpec, U: Store> BestBlockRootsIterator<'a, T, U> { + /// Create a new iterator over all block roots in the given `beacon_state` and prior states. + pub fn new(store: Arc, beacon_state: &'a BeaconState, start_slot: Slot) -> Self { + let mut slot = start_slot; + if slot >= beacon_state.slot { + // Slot may be too high. + slot = beacon_state.slot; + if beacon_state.get_block_root(slot).is_err() { + slot -= 1; + } + } + + Self { + store, + beacon_state: Cow::Borrowed(beacon_state), + slot: slot + 1, + } + } + + /// Create a new iterator over all block roots in the given `beacon_state` and prior states. + pub fn owned(store: Arc, beacon_state: BeaconState, start_slot: Slot) -> Self { + let mut slot = start_slot; + if slot >= beacon_state.slot { + // Slot may be too high. + slot = beacon_state.slot; + // TODO: Use a function other than `get_block_root` as this will always return `Err()` + // for slot = state.slot. + if beacon_state.get_block_root(slot).is_err() { + slot -= 1; + } + } + + Self { + store, + beacon_state: Cow::Owned(beacon_state), + slot: slot + 1, + } + } +} + +impl<'a, T: EthSpec, U: Store> Iterator for BestBlockRootsIterator<'a, T, U> { + type Item = (Hash256, Slot); + + fn next(&mut self) -> Option { + if self.slot == 0 { + // End of Iterator + return None; + } + + self.slot -= 1; + + match self.beacon_state.get_block_root(self.slot) { + Ok(root) => Some((*root, self.slot)), + Err(BeaconStateError::SlotOutOfBounds) => { + // Read a `BeaconState` from the store that has access to prior historical root. + let beacon_state: BeaconState = { + // Load the earliest state from disk. + let new_state_root = self.beacon_state.get_oldest_state_root().ok()?; + + self.store.get(&new_state_root).ok()? + }?; + + self.beacon_state = Cow::Owned(beacon_state); + + let root = self.beacon_state.get_block_root(self.slot).ok()?; + + Some((*root, self.slot)) + } + _ => None, + } + } +} + #[cfg(test)] mod test { use super::*; @@ -206,7 +310,50 @@ mod test { let mut collected: Vec<(Hash256, Slot)> = iter.collect(); collected.reverse(); - let expected_len = 2 * MainnetEthSpec::slots_per_historical_root() - 1; + let expected_len = 2 * MainnetEthSpec::slots_per_historical_root(); + + assert_eq!(collected.len(), expected_len); + + for i in 0..expected_len { + assert_eq!(collected[i].0, Hash256::from(i as u64)); + } + } + + #[test] + fn best_block_root_iter() { + let store = Arc::new(MemoryStore::open()); + let slots_per_historical_root = MainnetEthSpec::slots_per_historical_root(); + + let mut state_a: BeaconState = get_state(); + let mut state_b: BeaconState = get_state(); + + state_a.slot = Slot::from(slots_per_historical_root); + state_b.slot = Slot::from(slots_per_historical_root * 2); + + let mut hashes = (0..).into_iter().map(|i| Hash256::from(i)); + + for root in &mut state_a.latest_block_roots[..] { + *root = hashes.next().unwrap() + } + for root in &mut state_b.latest_block_roots[..] { + *root = hashes.next().unwrap() + } + + let state_a_root = hashes.next().unwrap(); + state_b.latest_state_roots[0] = state_a_root; + store.put(&state_a_root, &state_a).unwrap(); + + let iter = BestBlockRootsIterator::new(store.clone(), &state_b, state_b.slot); + + assert!( + iter.clone().find(|(_root, slot)| *slot == 0).is_some(), + "iter should contain zero slot" + ); + + let mut collected: Vec<(Hash256, Slot)> = iter.collect(); + collected.reverse(); + + let expected_len = 2 * MainnetEthSpec::slots_per_historical_root(); assert_eq!(collected.len(), expected_len); @@ -255,7 +402,7 @@ mod test { let mut collected: Vec<(Hash256, Slot)> = iter.collect(); collected.reverse(); - let expected_len = MainnetEthSpec::slots_per_historical_root() * 2 - 1; + let expected_len = MainnetEthSpec::slots_per_historical_root() * 2; assert_eq!(collected.len(), expected_len, "collection length incorrect"); diff --git a/eth2/lmd_ghost/src/reduced_tree.rs b/eth2/lmd_ghost/src/reduced_tree.rs index 49e900076..dace2bda6 100644 --- a/eth2/lmd_ghost/src/reduced_tree.rs +++ b/eth2/lmd_ghost/src/reduced_tree.rs @@ -8,7 +8,7 @@ use parking_lot::RwLock; use std::collections::HashMap; use std::marker::PhantomData; use std::sync::Arc; -use store::{iter::BlockRootsIterator, Error as StoreError, Store}; +use store::{iter::BestBlockRootsIterator, Error as StoreError, Store}; use types::{BeaconBlock, BeaconState, EthSpec, Hash256, Slot}; type Result = std::result::Result; @@ -530,14 +530,14 @@ where Ok(a_root) } - fn iter_ancestors(&self, child: Hash256) -> Result> { + fn iter_ancestors(&self, child: Hash256) -> Result> { let block = self.get_block(child)?; let state = self.get_state(block.state_root)?; - Ok(BlockRootsIterator::owned( + Ok(BestBlockRootsIterator::owned( self.store.clone(), state, - block.slot, + block.slot - 1, )) }