Fix syncing bugs by recursively attempting to process parents in the … (#429)
* Fix syncing bugs by recursively attempting to process parents in the import queue, change BlockRootsIterator * Swap from crossbeam channel to tokio mpsc * Recursion fix * Remove exess block processing * Fix network lag, correct attestation topic * Correct network poll logic * Overhaul of SimpleSync and modify BlockRootsIterator to return start_slot * Fix bug in tests relating to StateRootsIterator * Remove old, commented-out heartbeat code. * Tidy docs on import queue enum * Change source logging msg in simple sync * Rename function parameter in simple sync * Use `BestBlockRootsIterator` in `reduced_tree` * Update comments for `BestBlockRootsIterator` * Fix duplicate dep in cargo.toml
This commit is contained in:
parent
88c6d15c32
commit
0513559252
@ -13,7 +13,7 @@ client = { path = "client" }
|
||||
version = { path = "version" }
|
||||
clap = "2.32.0"
|
||||
serde = "1.0"
|
||||
slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_debug"] }
|
||||
slog = { version = "^2.2.3" , features = ["max_level_trace"] }
|
||||
slog-term = "^2.4.0"
|
||||
slog-async = "^2.3.0"
|
||||
ctrlc = { version = "3.1.1", features = ["termination"] }
|
||||
|
@ -18,7 +18,7 @@ use state_processing::{
|
||||
per_slot_processing, BlockProcessingError,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use store::iter::{BlockIterator, BlockRootsIterator, StateRootsIterator};
|
||||
use store::iter::{BestBlockRootsIterator, BlockIterator, BlockRootsIterator, StateRootsIterator};
|
||||
use store::{Error as DBError, Store};
|
||||
use tree_hash::TreeHash;
|
||||
use types::*;
|
||||
@ -226,6 +226,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
BlockRootsIterator::owned(self.store.clone(), self.state.read().clone(), slot)
|
||||
}
|
||||
|
||||
/// Iterates in reverse (highest to lowest slot) through all block roots from largest
|
||||
/// `slot <= beacon_state.slot` through to genesis.
|
||||
///
|
||||
/// Returns `None` for roots prior to genesis or when there is an error reading from `Store`.
|
||||
///
|
||||
/// Contains duplicate roots when skip slots are encountered.
|
||||
pub fn rev_iter_best_block_roots(
|
||||
&self,
|
||||
slot: Slot,
|
||||
) -> BestBlockRootsIterator<T::EthSpec, T::Store> {
|
||||
BestBlockRootsIterator::owned(self.store.clone(), self.state.read().clone(), slot)
|
||||
}
|
||||
|
||||
/// Iterates in reverse (highest to lowest slot) through all state roots from `slot` through to
|
||||
/// genesis.
|
||||
///
|
||||
|
@ -191,7 +191,7 @@ where
|
||||
fn get_state_at_slot(&self, state_slot: Slot) -> BeaconState<E> {
|
||||
let state_root = self
|
||||
.chain
|
||||
.rev_iter_state_roots(self.chain.current_state().slot)
|
||||
.rev_iter_state_roots(self.chain.current_state().slot - 1)
|
||||
.find(|(_hash, slot)| *slot == state_slot)
|
||||
.map(|(hash, _slot)| hash)
|
||||
.expect("could not find state root");
|
||||
|
@ -20,7 +20,7 @@ serde = "1.0.93"
|
||||
serde_derive = "1.0"
|
||||
error-chain = "0.12.0"
|
||||
eth2_ssz = { path = "../../eth2/utils/ssz" }
|
||||
slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_debug"] }
|
||||
slog = { version = "^2.2.3" , features = ["max_level_trace"] }
|
||||
slog-async = "^2.3.0"
|
||||
slog-json = "^2.3"
|
||||
slog-term = "^2.4.0"
|
||||
|
@ -3,39 +3,34 @@ use beacon_chain::BeaconChainTypes;
|
||||
use exit_future::Exit;
|
||||
use futures::{Future, Stream};
|
||||
use slog::{debug, o};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::runtime::TaskExecutor;
|
||||
use tokio::timer::Interval;
|
||||
|
||||
/// Thread that monitors the client and reports useful statistics to the user.
|
||||
/// The interval between heartbeat events.
|
||||
pub const HEARTBEAT_INTERVAL_SECONDS: u64 = 5;
|
||||
|
||||
/// Spawns a thread that can be used to run code periodically, on `HEARTBEAT_INTERVAL_SECONDS`
|
||||
/// durations.
|
||||
///
|
||||
/// Presently unused, but remains for future use.
|
||||
pub fn run<T: BeaconChainTypes + Send + Sync + 'static>(
|
||||
client: &Client<T>,
|
||||
executor: TaskExecutor,
|
||||
exit: Exit,
|
||||
) {
|
||||
// notification heartbeat
|
||||
let interval = Interval::new(Instant::now(), Duration::from_secs(5));
|
||||
let interval = Interval::new(
|
||||
Instant::now(),
|
||||
Duration::from_secs(HEARTBEAT_INTERVAL_SECONDS),
|
||||
);
|
||||
|
||||
let _log = client.log.new(o!("Service" => "Notifier"));
|
||||
|
||||
// TODO: Debugging only
|
||||
let counter = Arc::new(Mutex::new(0));
|
||||
let network = client.network.clone();
|
||||
|
||||
// build heartbeat logic here
|
||||
let heartbeat = move |_| {
|
||||
//debug!(log, "Temp heartbeat output");
|
||||
//TODO: Remove this logic. Testing only
|
||||
let mut count = counter.lock().unwrap();
|
||||
*count += 1;
|
||||
|
||||
if *count % 5 == 0 {
|
||||
// debug!(log, "Sending Message");
|
||||
network.send_message();
|
||||
}
|
||||
|
||||
let heartbeat = |_| {
|
||||
// There is not presently any heartbeat logic.
|
||||
//
|
||||
// We leave this function empty for future use.
|
||||
Ok(())
|
||||
};
|
||||
|
||||
|
@ -15,7 +15,7 @@ serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
eth2_ssz = { path = "../../eth2/utils/ssz" }
|
||||
eth2_ssz_derive = { path = "../../eth2/utils/ssz_derive" }
|
||||
slog = { version = "^2.4.1" , features = ["max_level_trace", "release_max_level_trace"] }
|
||||
slog = { version = "^2.4.1" , features = ["max_level_trace"] }
|
||||
version = { path = "../version" }
|
||||
tokio = "0.1.16"
|
||||
futures = "0.1.25"
|
||||
|
@ -109,8 +109,6 @@ impl Stream for Service {
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
loop {
|
||||
// TODO: Currently only gossipsub events passed here.
|
||||
// Build a type for more generic events
|
||||
match self.swarm.poll() {
|
||||
//Behaviour events
|
||||
Ok(Async::Ready(Some(event))) => match event {
|
||||
|
@ -27,9 +27,8 @@ futures = "0.1.23"
|
||||
serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
serde_json = "1.0"
|
||||
slog = "^2.2.3"
|
||||
slog = { version = "^2.2.3" , features = ["max_level_trace"] }
|
||||
slog-term = "^2.4.0"
|
||||
slog-async = "^2.3.0"
|
||||
tokio = "0.1.17"
|
||||
exit-future = "0.1.4"
|
||||
crossbeam-channel = "0.3.8"
|
||||
|
@ -14,6 +14,7 @@ use slog::{info, o, warn};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio::runtime::TaskExecutor;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct HttpServerConfig {
|
||||
@ -75,7 +76,7 @@ pub fn create_iron_http_server<T: BeaconChainTypes + 'static>(
|
||||
pub fn start_service<T: BeaconChainTypes + 'static>(
|
||||
config: &HttpServerConfig,
|
||||
executor: &TaskExecutor,
|
||||
_network_chan: crossbeam_channel::Sender<NetworkMessage>,
|
||||
_network_chan: mpsc::UnboundedSender<NetworkMessage>,
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
db_path: PathBuf,
|
||||
metrics_registry: Registry,
|
||||
|
@ -13,10 +13,9 @@ store = { path = "../store" }
|
||||
eth2-libp2p = { path = "../eth2-libp2p" }
|
||||
version = { path = "../version" }
|
||||
types = { path = "../../eth2/types" }
|
||||
slog = { version = "^2.2.3" }
|
||||
slog = { version = "^2.2.3" , features = ["max_level_trace"] }
|
||||
eth2_ssz = { path = "../../eth2/utils/ssz" }
|
||||
tree_hash = { path = "../../eth2/utils/tree_hash" }
|
||||
futures = "0.1.25"
|
||||
error-chain = "0.12.0"
|
||||
crossbeam-channel = "0.3.8"
|
||||
tokio = "0.1.16"
|
||||
|
@ -2,17 +2,18 @@ use crate::error;
|
||||
use crate::service::{NetworkMessage, OutgoingMessage};
|
||||
use crate::sync::SimpleSync;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use crossbeam_channel::{unbounded as channel, Sender};
|
||||
use eth2_libp2p::{
|
||||
behaviour::PubsubMessage,
|
||||
rpc::{methods::GoodbyeReason, RPCRequest, RPCResponse, RequestId},
|
||||
PeerId, RPCEvent,
|
||||
};
|
||||
use futures::future;
|
||||
use futures::future::Future;
|
||||
use futures::stream::Stream;
|
||||
use slog::{debug, warn};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// Timeout for RPC requests.
|
||||
// const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
@ -48,13 +49,13 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
|
||||
/// Initializes and runs the MessageHandler.
|
||||
pub fn spawn(
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
network_send: crossbeam_channel::Sender<NetworkMessage>,
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
||||
executor: &tokio::runtime::TaskExecutor,
|
||||
log: slog::Logger,
|
||||
) -> error::Result<Sender<HandlerMessage>> {
|
||||
) -> error::Result<mpsc::UnboundedSender<HandlerMessage>> {
|
||||
debug!(log, "Service starting");
|
||||
|
||||
let (handler_send, handler_recv) = channel();
|
||||
let (handler_send, handler_recv) = mpsc::unbounded_channel();
|
||||
|
||||
// Initialise sync and begin processing in thread
|
||||
// generate the Message handler
|
||||
@ -69,13 +70,13 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
|
||||
|
||||
// spawn handler task
|
||||
// TODO: Handle manual termination of thread
|
||||
executor.spawn(future::poll_fn(move || -> Result<_, _> {
|
||||
loop {
|
||||
handler.handle_message(handler_recv.recv().map_err(|_| {
|
||||
executor.spawn(
|
||||
handler_recv
|
||||
.for_each(move |msg| Ok(handler.handle_message(msg)))
|
||||
.map_err(move |_| {
|
||||
debug!(log, "Network message handler terminated.");
|
||||
})?);
|
||||
}
|
||||
}));
|
||||
}),
|
||||
);
|
||||
|
||||
Ok(handler_send)
|
||||
}
|
||||
@ -222,7 +223,7 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
|
||||
|
||||
pub struct NetworkContext {
|
||||
/// The network channel to relay messages to the Network service.
|
||||
network_send: crossbeam_channel::Sender<NetworkMessage>,
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
||||
/// A mapping of peers and the RPC id we have sent an RPC request to.
|
||||
outstanding_outgoing_request_ids: HashMap<(PeerId, RequestId), Instant>,
|
||||
/// Stores the next `RequestId` we should include on an outgoing `RPCRequest` to a `PeerId`.
|
||||
@ -232,7 +233,7 @@ pub struct NetworkContext {
|
||||
}
|
||||
|
||||
impl NetworkContext {
|
||||
pub fn new(network_send: crossbeam_channel::Sender<NetworkMessage>, log: slog::Logger) -> Self {
|
||||
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage>, log: slog::Logger) -> Self {
|
||||
Self {
|
||||
network_send,
|
||||
outstanding_outgoing_request_ids: HashMap::new(),
|
||||
@ -278,13 +279,13 @@ impl NetworkContext {
|
||||
);
|
||||
}
|
||||
|
||||
fn send_rpc_event(&self, peer_id: PeerId, rpc_event: RPCEvent) {
|
||||
fn send_rpc_event(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
|
||||
self.send(peer_id, OutgoingMessage::RPC(rpc_event))
|
||||
}
|
||||
|
||||
fn send(&self, peer_id: PeerId, outgoing_message: OutgoingMessage) {
|
||||
fn send(&mut self, peer_id: PeerId, outgoing_message: OutgoingMessage) {
|
||||
self.network_send
|
||||
.send(NetworkMessage::Send(peer_id, outgoing_message))
|
||||
.try_send(NetworkMessage::Send(peer_id, outgoing_message))
|
||||
.unwrap_or_else(|_| {
|
||||
warn!(
|
||||
self.log,
|
||||
|
@ -2,24 +2,23 @@ use crate::error;
|
||||
use crate::message_handler::{HandlerMessage, MessageHandler};
|
||||
use crate::NetworkConfig;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use crossbeam_channel::{unbounded as channel, Sender, TryRecvError};
|
||||
use eth2_libp2p::Service as LibP2PService;
|
||||
use eth2_libp2p::Topic;
|
||||
use eth2_libp2p::{Libp2pEvent, PeerId};
|
||||
use eth2_libp2p::{PubsubMessage, RPCEvent};
|
||||
use futures::prelude::*;
|
||||
use futures::sync::oneshot;
|
||||
use futures::Stream;
|
||||
use slog::{debug, info, o, trace};
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use tokio::runtime::TaskExecutor;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
/// Service that handles communication between internal services and the eth2_libp2p network service.
|
||||
pub struct Service<T: BeaconChainTypes> {
|
||||
//libp2p_service: Arc<Mutex<LibP2PService>>,
|
||||
_libp2p_exit: oneshot::Sender<()>,
|
||||
network_send: crossbeam_channel::Sender<NetworkMessage>,
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
||||
_phantom: PhantomData<T>, //message_handler: MessageHandler,
|
||||
//message_handler_send: Sender<HandlerMessage>
|
||||
}
|
||||
@ -30,9 +29,9 @@ impl<T: BeaconChainTypes + 'static> Service<T> {
|
||||
config: &NetworkConfig,
|
||||
executor: &TaskExecutor,
|
||||
log: slog::Logger,
|
||||
) -> error::Result<(Arc<Self>, Sender<NetworkMessage>)> {
|
||||
) -> error::Result<(Arc<Self>, mpsc::UnboundedSender<NetworkMessage>)> {
|
||||
// build the network channel
|
||||
let (network_send, network_recv) = channel::<NetworkMessage>();
|
||||
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
|
||||
// launch message handler thread
|
||||
let message_handler_log = log.new(o!("Service" => "MessageHandler"));
|
||||
let message_handler_send = MessageHandler::spawn(
|
||||
@ -64,9 +63,9 @@ impl<T: BeaconChainTypes + 'static> Service<T> {
|
||||
}
|
||||
|
||||
// TODO: Testing only
|
||||
pub fn send_message(&self) {
|
||||
pub fn send_message(&mut self) {
|
||||
self.network_send
|
||||
.send(NetworkMessage::Send(
|
||||
.try_send(NetworkMessage::Send(
|
||||
PeerId::random(),
|
||||
OutgoingMessage::NotifierTest,
|
||||
))
|
||||
@ -76,12 +75,12 @@ impl<T: BeaconChainTypes + 'static> Service<T> {
|
||||
|
||||
fn spawn_service(
|
||||
libp2p_service: LibP2PService,
|
||||
network_recv: crossbeam_channel::Receiver<NetworkMessage>,
|
||||
message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
|
||||
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
|
||||
message_handler_send: mpsc::UnboundedSender<HandlerMessage>,
|
||||
executor: &TaskExecutor,
|
||||
log: slog::Logger,
|
||||
) -> error::Result<oneshot::Sender<()>> {
|
||||
let (network_exit, exit_rx) = oneshot::channel();
|
||||
) -> error::Result<tokio::sync::oneshot::Sender<()>> {
|
||||
let (network_exit, exit_rx) = tokio::sync::oneshot::channel();
|
||||
|
||||
// spawn on the current executor
|
||||
executor.spawn(
|
||||
@ -105,25 +104,61 @@ fn spawn_service(
|
||||
//TODO: Potentially handle channel errors
|
||||
fn network_service(
|
||||
mut libp2p_service: LibP2PService,
|
||||
network_recv: crossbeam_channel::Receiver<NetworkMessage>,
|
||||
message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
|
||||
mut network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
|
||||
mut message_handler_send: mpsc::UnboundedSender<HandlerMessage>,
|
||||
log: slog::Logger,
|
||||
) -> impl futures::Future<Item = (), Error = eth2_libp2p::error::Error> {
|
||||
futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> {
|
||||
// poll the swarm
|
||||
loop {
|
||||
// only end the loop once both major polls are not ready.
|
||||
let mut not_ready_count = 0;
|
||||
while not_ready_count < 2 {
|
||||
not_ready_count = 0;
|
||||
// poll the network channel
|
||||
match network_recv.poll() {
|
||||
Ok(Async::Ready(Some(message))) => {
|
||||
match message {
|
||||
// TODO: Testing message - remove
|
||||
NetworkMessage::Send(peer_id, outgoing_message) => {
|
||||
match outgoing_message {
|
||||
OutgoingMessage::RPC(rpc_event) => {
|
||||
trace!(log, "Sending RPC Event: {:?}", rpc_event);
|
||||
//TODO: Make swarm private
|
||||
//TODO: Implement correct peer id topic message handling
|
||||
libp2p_service.swarm.send_rpc(peer_id, rpc_event);
|
||||
}
|
||||
OutgoingMessage::NotifierTest => {
|
||||
// debug!(log, "Received message from notifier");
|
||||
}
|
||||
};
|
||||
}
|
||||
NetworkMessage::Publish { topics, message } => {
|
||||
debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics));
|
||||
libp2p_service.swarm.publish(topics, *message);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Async::NotReady) => not_ready_count += 1,
|
||||
Ok(Async::Ready(None)) => {
|
||||
return Err(eth2_libp2p::error::Error::from("Network channel closed"));
|
||||
}
|
||||
Err(_) => {
|
||||
return Err(eth2_libp2p::error::Error::from("Network channel error"));
|
||||
}
|
||||
}
|
||||
|
||||
// poll the swarm
|
||||
match libp2p_service.poll() {
|
||||
Ok(Async::Ready(Some(event))) => match event {
|
||||
Libp2pEvent::RPC(peer_id, rpc_event) => {
|
||||
trace!(log, "RPC Event: RPC message received: {:?}", rpc_event);
|
||||
message_handler_send
|
||||
.send(HandlerMessage::RPC(peer_id, rpc_event))
|
||||
.try_send(HandlerMessage::RPC(peer_id, rpc_event))
|
||||
.map_err(|_| "failed to send rpc to handler")?;
|
||||
}
|
||||
Libp2pEvent::PeerDialed(peer_id) => {
|
||||
debug!(log, "Peer Dialed: {:?}", peer_id);
|
||||
message_handler_send
|
||||
.send(HandlerMessage::PeerDialed(peer_id))
|
||||
.try_send(HandlerMessage::PeerDialed(peer_id))
|
||||
.map_err(|_| "failed to send rpc to handler")?;
|
||||
}
|
||||
Libp2pEvent::PubsubMessage {
|
||||
@ -132,43 +167,13 @@ fn network_service(
|
||||
//TODO: Decide if we need to propagate the topic upwards. (Potentially for
|
||||
//attestations)
|
||||
message_handler_send
|
||||
.send(HandlerMessage::PubsubMessage(source, message))
|
||||
.try_send(HandlerMessage::PubsubMessage(source, message))
|
||||
.map_err(|_| " failed to send pubsub message to handler")?;
|
||||
}
|
||||
},
|
||||
Ok(Async::Ready(None)) => unreachable!("Stream never ends"),
|
||||
Ok(Async::NotReady) => break,
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
// poll the network channel
|
||||
// TODO: refactor - combine poll_fn's?
|
||||
loop {
|
||||
match network_recv.try_recv() {
|
||||
// TODO: Testing message - remove
|
||||
Ok(NetworkMessage::Send(peer_id, outgoing_message)) => {
|
||||
match outgoing_message {
|
||||
OutgoingMessage::RPC(rpc_event) => {
|
||||
trace!(log, "Sending RPC Event: {:?}", rpc_event);
|
||||
//TODO: Make swarm private
|
||||
//TODO: Implement correct peer id topic message handling
|
||||
libp2p_service.swarm.send_rpc(peer_id, rpc_event);
|
||||
}
|
||||
OutgoingMessage::NotifierTest => {
|
||||
// debug!(log, "Received message from notifier");
|
||||
}
|
||||
};
|
||||
}
|
||||
Ok(NetworkMessage::Publish { topics, message }) => {
|
||||
debug!(log, "Sending pubsub message on topics {:?}", topics);
|
||||
libp2p_service.swarm.publish(topics, *message);
|
||||
}
|
||||
Err(TryRecvError::Empty) => break,
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
return Err(eth2_libp2p::error::Error::from(
|
||||
"Network channel disconnected",
|
||||
));
|
||||
}
|
||||
Ok(Async::NotReady) => not_ready_count += 1,
|
||||
Err(_) => not_ready_count += 1,
|
||||
}
|
||||
}
|
||||
Ok(Async::NotReady)
|
||||
|
@ -41,31 +41,23 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Completes all possible partials into `BeaconBlock` and returns them, sorted by increasing
|
||||
/// slot number. Does not delete the partials from the queue, this must be done manually.
|
||||
///
|
||||
/// Returns `(queue_index, block, sender)`:
|
||||
///
|
||||
/// - `block_root`: may be used to remove the entry if it is successfully processed.
|
||||
/// - `block`: the completed block.
|
||||
/// - `sender`: the `PeerId` the provided the `BeaconBlockBody` which completed the partial.
|
||||
pub fn complete_blocks(&self) -> Vec<(Hash256, BeaconBlock, PeerId)> {
|
||||
let mut complete: Vec<(Hash256, BeaconBlock, PeerId)> = self
|
||||
.partials
|
||||
.iter()
|
||||
.filter_map(|(_, partial)| partial.clone().complete())
|
||||
.collect();
|
||||
|
||||
// Sort the completable partials to be in ascending slot order.
|
||||
complete.sort_unstable_by(|a, b| a.1.slot.partial_cmp(&b.1.slot).unwrap());
|
||||
|
||||
complete
|
||||
}
|
||||
|
||||
/// Returns true of the if the `BlockRoot` is found in the `import_queue`.
|
||||
pub fn contains_block_root(&self, block_root: Hash256) -> bool {
|
||||
self.partials.contains_key(&block_root)
|
||||
}
|
||||
|
||||
/// Attempts to complete the `BlockRoot` if it is found in the `import_queue`.
|
||||
///
|
||||
/// Returns an Enum with a `PartialBeaconBlockCompletion`.
|
||||
/// Does not remove the `block_root` from the `import_queue`.
|
||||
pub fn attempt_complete_block(&self, block_root: Hash256) -> PartialBeaconBlockCompletion {
|
||||
if let Some(partial) = self.partials.get(&block_root) {
|
||||
partial.attempt_complete()
|
||||
} else {
|
||||
PartialBeaconBlockCompletion::MissingRoot
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes the first `PartialBeaconBlock` with a matching `block_root`, returning the partial
|
||||
/// if it exists.
|
||||
pub fn remove(&mut self, block_root: Hash256) -> Option<PartialBeaconBlock> {
|
||||
@ -102,6 +94,8 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
|
||||
block_roots: &[BlockRootSlot],
|
||||
sender: PeerId,
|
||||
) -> Vec<BlockRootSlot> {
|
||||
// TODO: This will currently not return a `BlockRootSlot` if this root exists but there is no header.
|
||||
// It would be more robust if it did.
|
||||
let new_block_root_slots: Vec<BlockRootSlot> = block_roots
|
||||
.iter()
|
||||
// Ignore any roots already stored in the queue.
|
||||
@ -135,12 +129,8 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
|
||||
/// the queue and it's block root is included in the output.
|
||||
///
|
||||
/// If a `header` is already in the queue, but not yet processed by the chain the block root is
|
||||
/// included in the output and the `inserted` time for the partial record is set to
|
||||
/// not included in the output and the `inserted` time for the partial record is set to
|
||||
/// `Instant::now()`. Updating the `inserted` time stops the partial from becoming stale.
|
||||
///
|
||||
/// Presently the queue enforces that a `BeaconBlockHeader` _must_ be received before its
|
||||
/// `BeaconBlockBody`. This is not a natural requirement and we could enhance the queue to lift
|
||||
/// this restraint.
|
||||
pub fn enqueue_headers(
|
||||
&mut self,
|
||||
headers: Vec<BeaconBlockHeader>,
|
||||
@ -152,8 +142,10 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
|
||||
let block_root = Hash256::from_slice(&header.canonical_root()[..]);
|
||||
|
||||
if self.chain_has_not_seen_block(&block_root) {
|
||||
self.insert_header(block_root, header, sender.clone());
|
||||
required_bodies.push(block_root);
|
||||
if !self.insert_header(block_root, header, sender.clone()) {
|
||||
// If a body is empty
|
||||
required_bodies.push(block_root);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -163,10 +155,17 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
|
||||
/// If there is a matching `header` for this `body`, adds it to the queue.
|
||||
///
|
||||
/// If there is no `header` for the `body`, the body is simply discarded.
|
||||
pub fn enqueue_bodies(&mut self, bodies: Vec<BeaconBlockBody>, sender: PeerId) {
|
||||
pub fn enqueue_bodies(
|
||||
&mut self,
|
||||
bodies: Vec<BeaconBlockBody>,
|
||||
sender: PeerId,
|
||||
) -> Option<Hash256> {
|
||||
let mut last_block_hash = None;
|
||||
for body in bodies {
|
||||
self.insert_body(body, sender.clone());
|
||||
last_block_hash = self.insert_body(body, sender.clone());
|
||||
}
|
||||
|
||||
last_block_hash
|
||||
}
|
||||
|
||||
pub fn enqueue_full_blocks(&mut self, blocks: Vec<BeaconBlock>, sender: PeerId) {
|
||||
@ -179,12 +178,22 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
|
||||
///
|
||||
/// If the header already exists, the `inserted` time is set to `now` and not other
|
||||
/// modifications are made.
|
||||
fn insert_header(&mut self, block_root: Hash256, header: BeaconBlockHeader, sender: PeerId) {
|
||||
/// Returns true is `body` exists.
|
||||
fn insert_header(
|
||||
&mut self,
|
||||
block_root: Hash256,
|
||||
header: BeaconBlockHeader,
|
||||
sender: PeerId,
|
||||
) -> bool {
|
||||
let mut exists = false;
|
||||
self.partials
|
||||
.entry(block_root)
|
||||
.and_modify(|partial| {
|
||||
partial.header = Some(header.clone());
|
||||
partial.inserted = Instant::now();
|
||||
if partial.body.is_some() {
|
||||
exists = true;
|
||||
}
|
||||
})
|
||||
.or_insert_with(|| PartialBeaconBlock {
|
||||
slot: header.slot,
|
||||
@ -194,28 +203,30 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
|
||||
inserted: Instant::now(),
|
||||
sender,
|
||||
});
|
||||
exists
|
||||
}
|
||||
|
||||
/// Updates an existing partial with the `body`.
|
||||
///
|
||||
/// If there is no header for the `body`, the body is simply discarded.
|
||||
///
|
||||
/// If the body already existed, the `inserted` time is set to `now`.
|
||||
fn insert_body(&mut self, body: BeaconBlockBody, sender: PeerId) {
|
||||
///
|
||||
/// Returns the block hash of the inserted body
|
||||
fn insert_body(&mut self, body: BeaconBlockBody, sender: PeerId) -> Option<Hash256> {
|
||||
let body_root = Hash256::from_slice(&body.tree_hash_root()[..]);
|
||||
let mut last_root = None;
|
||||
|
||||
self.partials.iter_mut().for_each(|(_, mut p)| {
|
||||
self.partials.iter_mut().for_each(|(root, mut p)| {
|
||||
if let Some(header) = &mut p.header {
|
||||
if body_root == header.block_body_root {
|
||||
p.inserted = Instant::now();
|
||||
|
||||
if p.body.is_none() {
|
||||
p.body = Some(body.clone());
|
||||
p.sender = sender.clone();
|
||||
}
|
||||
p.body = Some(body.clone());
|
||||
p.sender = sender.clone();
|
||||
last_root = Some(*root);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
last_root
|
||||
}
|
||||
|
||||
/// Updates an existing `partial` with the completed block, or adds a new (complete) partial.
|
||||
@ -257,13 +268,33 @@ pub struct PartialBeaconBlock {
|
||||
}
|
||||
|
||||
impl PartialBeaconBlock {
|
||||
/// Consumes `self` and returns a full built `BeaconBlock`, it's root and the `sender`
|
||||
/// `PeerId`, if enough information exists to complete the block. Otherwise, returns `None`.
|
||||
pub fn complete(self) -> Option<(Hash256, BeaconBlock, PeerId)> {
|
||||
Some((
|
||||
self.block_root,
|
||||
self.header?.into_block(self.body?),
|
||||
self.sender,
|
||||
))
|
||||
/// Attempts to build a block.
|
||||
///
|
||||
/// Does not comsume the `PartialBeaconBlock`.
|
||||
pub fn attempt_complete(&self) -> PartialBeaconBlockCompletion {
|
||||
if self.header.is_none() {
|
||||
PartialBeaconBlockCompletion::MissingHeader(self.slot)
|
||||
} else if self.body.is_none() {
|
||||
PartialBeaconBlockCompletion::MissingBody
|
||||
} else {
|
||||
PartialBeaconBlockCompletion::Complete(
|
||||
self.header
|
||||
.clone()
|
||||
.unwrap()
|
||||
.into_block(self.body.clone().unwrap()),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The result of trying to convert a `BeaconBlock` into a `PartialBeaconBlock`.
|
||||
pub enum PartialBeaconBlockCompletion {
|
||||
/// The partial contains a valid BeaconBlock.
|
||||
Complete(BeaconBlock),
|
||||
/// The partial does not exist.
|
||||
MissingRoot,
|
||||
/// The partial contains a `BeaconBlockRoot` but no `BeaconBlockHeader`.
|
||||
MissingHeader(Slot),
|
||||
/// The partial contains a `BeaconBlockRoot` and `BeaconBlockHeader` but no `BeaconBlockBody`.
|
||||
MissingBody,
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
use super::import_queue::ImportQueue;
|
||||
use super::import_queue::{ImportQueue, PartialBeaconBlockCompletion};
|
||||
use crate::message_handler::NetworkContext;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
|
||||
use eth2_libp2p::rpc::methods::*;
|
||||
@ -17,7 +17,7 @@ use types::{
|
||||
const SLOT_IMPORT_TOLERANCE: u64 = 100;
|
||||
|
||||
/// The amount of seconds a block (or partial block) may exist in the import queue.
|
||||
const QUEUE_STALE_SECS: u64 = 6;
|
||||
const QUEUE_STALE_SECS: u64 = 100;
|
||||
|
||||
/// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it.
|
||||
/// Otherwise we queue it.
|
||||
@ -227,7 +227,12 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
||||
//
|
||||
// Therefore, there are some blocks between the local finalized epoch and the remote
|
||||
// head that are worth downloading.
|
||||
debug!(self.log, "UsefulPeer"; "peer" => format!("{:?}", peer_id));
|
||||
debug!(
|
||||
self.log, "UsefulPeer";
|
||||
"peer" => format!("{:?}", peer_id),
|
||||
"local_finalized_epoch" => local.latest_finalized_epoch,
|
||||
"remote_latest_finalized_epoch" => remote.latest_finalized_epoch,
|
||||
);
|
||||
|
||||
let start_slot = local
|
||||
.latest_finalized_epoch
|
||||
@ -238,7 +243,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
||||
peer_id,
|
||||
BeaconBlockRootsRequest {
|
||||
start_slot,
|
||||
count: required_slots.into(),
|
||||
count: required_slots.as_u64(),
|
||||
},
|
||||
network,
|
||||
);
|
||||
@ -247,7 +252,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
||||
|
||||
fn root_at_slot(&self, target_slot: Slot) -> Option<Hash256> {
|
||||
self.chain
|
||||
.rev_iter_block_roots(target_slot)
|
||||
.rev_iter_best_block_roots(target_slot)
|
||||
.take(1)
|
||||
.find(|(_root, slot)| *slot == target_slot)
|
||||
.map(|(root, _slot)| root)
|
||||
@ -271,8 +276,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
||||
|
||||
let mut roots: Vec<BlockRootSlot> = self
|
||||
.chain
|
||||
.rev_iter_block_roots(req.start_slot + req.count)
|
||||
.skip(1)
|
||||
.rev_iter_best_block_roots(req.start_slot + req.count)
|
||||
.take(req.count as usize)
|
||||
.map(|(block_root, slot)| BlockRootSlot { slot, block_root })
|
||||
.collect();
|
||||
@ -356,7 +360,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
||||
BeaconBlockHeadersRequest {
|
||||
start_root: first.block_root,
|
||||
start_slot: first.slot,
|
||||
max_headers: (last.slot - first.slot).as_u64(),
|
||||
max_headers: (last.slot - first.slot + 1).as_u64(),
|
||||
skip_slots: 0,
|
||||
},
|
||||
network,
|
||||
@ -386,7 +390,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
||||
// unnecessary block deserialization when `req.skip_slots > 0`.
|
||||
let mut roots: Vec<Hash256> = self
|
||||
.chain
|
||||
.rev_iter_block_roots(req.start_slot + (count - 1))
|
||||
.rev_iter_best_block_roots(req.start_slot + count)
|
||||
.take(count as usize)
|
||||
.map(|(root, _slot)| root)
|
||||
.collect();
|
||||
@ -499,14 +503,26 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
||||
"count" => res.block_bodies.len(),
|
||||
);
|
||||
|
||||
self.import_queue
|
||||
.enqueue_bodies(res.block_bodies, peer_id.clone());
|
||||
if !res.block_bodies.is_empty() {
|
||||
// Import all blocks to queue
|
||||
let last_root = self
|
||||
.import_queue
|
||||
.enqueue_bodies(res.block_bodies, peer_id.clone());
|
||||
|
||||
// Attempt to process all recieved bodies by recursively processing the latest block
|
||||
if let Some(root) = last_root {
|
||||
match self.attempt_process_partial_block(peer_id, root, network, &"rpc") {
|
||||
Some(BlockProcessingOutcome::Processed { block_root: _ }) => {
|
||||
// If processing is successful remove from `import_queue`
|
||||
self.import_queue.remove(root);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clear out old entries
|
||||
self.import_queue.remove_stale();
|
||||
|
||||
// Import blocks, if possible.
|
||||
self.process_import_queue(network);
|
||||
}
|
||||
|
||||
/// Process a gossip message declaring a new block.
|
||||
@ -526,31 +542,35 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
||||
match outcome {
|
||||
BlockProcessingOutcome::Processed { .. } => SHOULD_FORWARD_GOSSIP_BLOCK,
|
||||
BlockProcessingOutcome::ParentUnknown { parent } => {
|
||||
// Clean the stale entries from the queue.
|
||||
self.import_queue.remove_stale();
|
||||
|
||||
// Add this block to the queue
|
||||
self.import_queue
|
||||
.enqueue_full_blocks(vec![block], peer_id.clone());
|
||||
trace!(
|
||||
self.log,
|
||||
"NewGossipBlock";
|
||||
.enqueue_full_blocks(vec![block.clone()], peer_id.clone());
|
||||
debug!(
|
||||
self.log, "RequestParentBlock";
|
||||
"parent_root" => format!("{}", parent),
|
||||
"parent_slot" => block.slot - 1,
|
||||
"peer" => format!("{:?}", peer_id),
|
||||
);
|
||||
|
||||
// Unless the parent is in the queue, request the parent block from the peer.
|
||||
//
|
||||
// It is likely that this is duplicate work, given we already send a hello
|
||||
// request. However, I believe there are some edge-cases where the hello
|
||||
// message doesn't suffice, so we perform this request as well.
|
||||
if !self.import_queue.contains_block_root(parent) {
|
||||
// Send a hello to learn of the clients best slot so we can then sync the required
|
||||
// parent(s).
|
||||
network.send_rpc_request(
|
||||
peer_id.clone(),
|
||||
RPCRequest::Hello(hello_message(&self.chain)),
|
||||
);
|
||||
}
|
||||
// Request roots between parent and start of finality from peer.
|
||||
let start_slot = self
|
||||
.chain
|
||||
.head()
|
||||
.beacon_state
|
||||
.finalized_epoch
|
||||
.start_slot(T::EthSpec::slots_per_epoch());
|
||||
self.request_block_roots(
|
||||
peer_id,
|
||||
BeaconBlockRootsRequest {
|
||||
// Request blocks between `latest_finalized_slot` and the `block`
|
||||
start_slot,
|
||||
count: block.slot.as_u64() - start_slot.as_u64(),
|
||||
},
|
||||
network,
|
||||
);
|
||||
|
||||
// Clean the stale entries from the queue.
|
||||
self.import_queue.remove_stale();
|
||||
|
||||
SHOULD_FORWARD_GOSSIP_BLOCK
|
||||
}
|
||||
@ -592,40 +612,6 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterate through the `import_queue` and process any complete blocks.
|
||||
///
|
||||
/// If a block is successfully processed it is removed from the queue, otherwise it remains in
|
||||
/// the queue.
|
||||
pub fn process_import_queue(&mut self, network: &mut NetworkContext) {
|
||||
let mut successful = 0;
|
||||
|
||||
// Loop through all of the complete blocks in the queue.
|
||||
for (block_root, block, sender) in self.import_queue.complete_blocks() {
|
||||
let processing_result = self.process_block(sender, block.clone(), network, &"gossip");
|
||||
|
||||
let should_dequeue = match processing_result {
|
||||
Some(BlockProcessingOutcome::ParentUnknown { .. }) => false,
|
||||
Some(BlockProcessingOutcome::FutureSlot {
|
||||
present_slot,
|
||||
block_slot,
|
||||
}) if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot => false,
|
||||
_ => true,
|
||||
};
|
||||
|
||||
if processing_result == Some(BlockProcessingOutcome::Processed { block_root }) {
|
||||
successful += 1;
|
||||
}
|
||||
|
||||
if should_dequeue {
|
||||
self.import_queue.remove(block_root);
|
||||
}
|
||||
}
|
||||
|
||||
if successful > 0 {
|
||||
info!(self.log, "Imported {} blocks", successful)
|
||||
}
|
||||
}
|
||||
|
||||
/// Request some `BeaconBlockRoots` from the remote peer.
|
||||
fn request_block_roots(
|
||||
&mut self,
|
||||
@ -700,6 +686,89 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
||||
hello_message(&self.chain)
|
||||
}
|
||||
|
||||
/// Helper function to attempt to process a partial block.
|
||||
///
|
||||
/// If the block can be completed recursively call `process_block`
|
||||
/// else request missing parts.
|
||||
fn attempt_process_partial_block(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
block_root: Hash256,
|
||||
network: &mut NetworkContext,
|
||||
source: &str,
|
||||
) -> Option<BlockProcessingOutcome> {
|
||||
match self.import_queue.attempt_complete_block(block_root) {
|
||||
PartialBeaconBlockCompletion::MissingBody => {
|
||||
// Unable to complete the block because the block body is missing.
|
||||
debug!(
|
||||
self.log, "RequestParentBody";
|
||||
"source" => source,
|
||||
"block_root" => format!("{}", block_root),
|
||||
"peer" => format!("{:?}", peer_id),
|
||||
);
|
||||
|
||||
// Request the block body from the peer.
|
||||
self.request_block_bodies(
|
||||
peer_id,
|
||||
BeaconBlockBodiesRequest {
|
||||
block_roots: vec![block_root],
|
||||
},
|
||||
network,
|
||||
);
|
||||
|
||||
None
|
||||
}
|
||||
PartialBeaconBlockCompletion::MissingHeader(slot) => {
|
||||
// Unable to complete the block because the block header is missing.
|
||||
debug!(
|
||||
self.log, "RequestParentHeader";
|
||||
"source" => source,
|
||||
"block_root" => format!("{}", block_root),
|
||||
"peer" => format!("{:?}", peer_id),
|
||||
);
|
||||
|
||||
// Request the block header from the peer.
|
||||
self.request_block_headers(
|
||||
peer_id,
|
||||
BeaconBlockHeadersRequest {
|
||||
start_root: block_root,
|
||||
start_slot: slot,
|
||||
max_headers: 1,
|
||||
skip_slots: 0,
|
||||
},
|
||||
network,
|
||||
);
|
||||
|
||||
None
|
||||
}
|
||||
PartialBeaconBlockCompletion::MissingRoot => {
|
||||
// The `block_root` is not known to the queue.
|
||||
debug!(
|
||||
self.log, "MissingParentRoot";
|
||||
"source" => source,
|
||||
"block_root" => format!("{}", block_root),
|
||||
"peer" => format!("{:?}", peer_id),
|
||||
);
|
||||
|
||||
// Do nothing.
|
||||
|
||||
None
|
||||
}
|
||||
PartialBeaconBlockCompletion::Complete(block) => {
|
||||
// The block exists in the queue, attempt to process it
|
||||
trace!(
|
||||
self.log, "AttemptProcessParent";
|
||||
"source" => source,
|
||||
"block_root" => format!("{}", block_root),
|
||||
"parent_slot" => block.slot,
|
||||
"peer" => format!("{:?}", peer_id),
|
||||
);
|
||||
|
||||
self.process_block(peer_id.clone(), block, network, source)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Processes the `block` that was received from `peer_id`.
|
||||
///
|
||||
/// If the block was submitted to the beacon chain without internal error, `Some(outcome)` is
|
||||
@ -726,6 +795,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
||||
if let Ok(outcome) = processing_result {
|
||||
match outcome {
|
||||
BlockProcessingOutcome::Processed { block_root } => {
|
||||
// The block was valid and we processed it successfully.
|
||||
debug!(
|
||||
self.log, "Imported block from network";
|
||||
"source" => source,
|
||||
@ -735,26 +805,29 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
||||
);
|
||||
}
|
||||
BlockProcessingOutcome::ParentUnknown { parent } => {
|
||||
// The block was valid and we processed it successfully.
|
||||
debug!(
|
||||
// The parent has not been processed
|
||||
trace!(
|
||||
self.log, "ParentBlockUnknown";
|
||||
"source" => source,
|
||||
"parent_root" => format!("{}", parent),
|
||||
"baby_block_slot" => block.slot,
|
||||
"peer" => format!("{:?}", peer_id),
|
||||
);
|
||||
|
||||
// Unless the parent is in the queue, request the parent block from the peer.
|
||||
//
|
||||
// It is likely that this is duplicate work, given we already send a hello
|
||||
// request. However, I believe there are some edge-cases where the hello
|
||||
// message doesn't suffice, so we perform this request as well.
|
||||
if !self.import_queue.contains_block_root(parent) {
|
||||
// Send a hello to learn of the clients best slot so we can then sync the require
|
||||
// parent(s).
|
||||
network.send_rpc_request(
|
||||
peer_id.clone(),
|
||||
RPCRequest::Hello(hello_message(&self.chain)),
|
||||
);
|
||||
// If the parent is in the `import_queue` attempt to complete it then process it.
|
||||
match self.attempt_process_partial_block(peer_id, parent, network, source) {
|
||||
// If processing parent is sucessful, re-process block and remove parent from queue
|
||||
Some(BlockProcessingOutcome::Processed { block_root: _ }) => {
|
||||
self.import_queue.remove(parent);
|
||||
|
||||
// Attempt to process `block` again
|
||||
match self.chain.process_block(block) {
|
||||
Ok(outcome) => return Some(outcome),
|
||||
Err(_) => return None,
|
||||
}
|
||||
}
|
||||
// All other cases leave `parent` in `import_queue` and return original outcome.
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
BlockProcessingOutcome::FutureSlot {
|
||||
|
@ -22,9 +22,8 @@ dirs = "1.0.3"
|
||||
futures = "0.1.23"
|
||||
serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
slog = "^2.2.3"
|
||||
slog = { version = "^2.2.3" , features = ["max_level_trace"] }
|
||||
slog-term = "^2.4.0"
|
||||
slog-async = "^2.3.0"
|
||||
tokio = "0.1.17"
|
||||
exit-future = "0.1.4"
|
||||
crossbeam-channel = "0.3.8"
|
||||
|
@ -1,7 +1,7 @@
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use eth2_libp2p::PubsubMessage;
|
||||
use eth2_libp2p::TopicBuilder;
|
||||
use eth2_libp2p::SHARD_TOPIC_PREFIX;
|
||||
use eth2_libp2p::BEACON_ATTESTATION_TOPIC;
|
||||
use futures::Future;
|
||||
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
|
||||
use network::NetworkMessage;
|
||||
@ -13,12 +13,13 @@ use protos::services_grpc::AttestationService;
|
||||
use slog::{error, info, trace, warn};
|
||||
use ssz::{ssz_encode, Decode};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use types::Attestation;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AttestationServiceInstance<T: BeaconChainTypes> {
|
||||
pub chain: Arc<BeaconChain<T>>,
|
||||
pub network_chan: crossbeam_channel::Sender<NetworkMessage>,
|
||||
pub network_chan: mpsc::UnboundedSender<NetworkMessage>,
|
||||
pub log: slog::Logger,
|
||||
}
|
||||
|
||||
@ -139,11 +140,11 @@ impl<T: BeaconChainTypes> AttestationService for AttestationServiceInstance<T> {
|
||||
);
|
||||
|
||||
// valid attestation, propagate to the network
|
||||
let topic = TopicBuilder::new(SHARD_TOPIC_PREFIX).build();
|
||||
let topic = TopicBuilder::new(BEACON_ATTESTATION_TOPIC).build();
|
||||
let message = PubsubMessage::Attestation(attestation);
|
||||
|
||||
self.network_chan
|
||||
.send(NetworkMessage::Publish {
|
||||
.try_send(NetworkMessage::Publish {
|
||||
topics: vec![topic],
|
||||
message: Box::new(message),
|
||||
})
|
||||
|
@ -1,5 +1,4 @@
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
|
||||
use crossbeam_channel;
|
||||
use eth2_libp2p::BEACON_PUBSUB_TOPIC;
|
||||
use eth2_libp2p::{PubsubMessage, TopicBuilder};
|
||||
use futures::Future;
|
||||
@ -14,12 +13,13 @@ use slog::Logger;
|
||||
use slog::{error, info, trace, warn};
|
||||
use ssz::{ssz_encode, Decode};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use types::{BeaconBlock, Signature, Slot};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct BeaconBlockServiceInstance<T: BeaconChainTypes> {
|
||||
pub chain: Arc<BeaconChain<T>>,
|
||||
pub network_chan: crossbeam_channel::Sender<NetworkMessage>,
|
||||
pub network_chan: mpsc::UnboundedSender<NetworkMessage>,
|
||||
pub log: Logger,
|
||||
}
|
||||
|
||||
@ -111,7 +111,7 @@ impl<T: BeaconChainTypes> BeaconBlockService for BeaconBlockServiceInstance<T> {
|
||||
|
||||
// Publish the block to the p2p network via gossipsub.
|
||||
self.network_chan
|
||||
.send(NetworkMessage::Publish {
|
||||
.try_send(NetworkMessage::Publish {
|
||||
topics: vec![topic],
|
||||
message: Box::new(message),
|
||||
})
|
||||
|
@ -20,11 +20,12 @@ use protos::services_grpc::{
|
||||
use slog::{info, o, warn};
|
||||
use std::sync::Arc;
|
||||
use tokio::runtime::TaskExecutor;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub fn start_server<T: BeaconChainTypes + Clone + 'static>(
|
||||
config: &RPCConfig,
|
||||
executor: &TaskExecutor,
|
||||
network_chan: crossbeam_channel::Sender<NetworkMessage>,
|
||||
network_chan: mpsc::UnboundedSender<NetworkMessage>,
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
log: &slog::Logger,
|
||||
) -> exit_future::Signal {
|
||||
|
@ -163,7 +163,7 @@ fn main() {
|
||||
0 => drain.filter_level(Level::Info),
|
||||
1 => drain.filter_level(Level::Debug),
|
||||
2 => drain.filter_level(Level::Trace),
|
||||
_ => drain.filter_level(Level::Info),
|
||||
_ => drain.filter_level(Level::Trace),
|
||||
};
|
||||
|
||||
let mut log = slog::Logger::root(drain.fuse(), o!());
|
||||
|
@ -15,15 +15,15 @@ impl<'a, T: EthSpec, U: Store> StateRootsIterator<'a, T, U> {
|
||||
Self {
|
||||
store,
|
||||
beacon_state: Cow::Borrowed(beacon_state),
|
||||
slot: start_slot,
|
||||
slot: start_slot + 1,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn owned(store: Arc<U>, beacon_state: BeaconState<T>, start_slot: Slot) -> Self {
|
||||
Self {
|
||||
slot: start_slot,
|
||||
beacon_state: Cow::Owned(beacon_state),
|
||||
store,
|
||||
beacon_state: Cow::Owned(beacon_state),
|
||||
slot: start_slot + 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -90,13 +90,19 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockIterator<'a, T, U> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterates backwards through block roots.
|
||||
/// Iterates backwards through block roots. If any specified slot is unable to be retrieved, the
|
||||
/// iterator returns `None` indefinitely.
|
||||
///
|
||||
/// Uses the `latest_block_roots` field of `BeaconState` to as the source of block roots and will
|
||||
/// perform a lookup on the `Store` for a prior `BeaconState` if `latest_block_roots` has been
|
||||
/// exhausted.
|
||||
///
|
||||
/// Returns `None` for roots prior to genesis or when there is an error reading from `Store`.
|
||||
///
|
||||
/// ## Notes
|
||||
///
|
||||
/// See [`BestBlockRootsIterator`](struct.BestBlockRootsIterator.html), which has different
|
||||
/// `start_slot` logic.
|
||||
#[derive(Clone)]
|
||||
pub struct BlockRootsIterator<'a, T: EthSpec, U> {
|
||||
store: Arc<U>,
|
||||
@ -108,18 +114,18 @@ impl<'a, T: EthSpec, U: Store> BlockRootsIterator<'a, T, U> {
|
||||
/// Create a new iterator over all block roots in the given `beacon_state` and prior states.
|
||||
pub fn new(store: Arc<U>, beacon_state: &'a BeaconState<T>, start_slot: Slot) -> Self {
|
||||
Self {
|
||||
slot: start_slot,
|
||||
beacon_state: Cow::Borrowed(beacon_state),
|
||||
store,
|
||||
beacon_state: Cow::Borrowed(beacon_state),
|
||||
slot: start_slot + 1,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new iterator over all block roots in the given `beacon_state` and prior states.
|
||||
pub fn owned(store: Arc<U>, beacon_state: BeaconState<T>, start_slot: Slot) -> Self {
|
||||
Self {
|
||||
slot: start_slot,
|
||||
beacon_state: Cow::Owned(beacon_state),
|
||||
store,
|
||||
beacon_state: Cow::Owned(beacon_state),
|
||||
slot: start_slot + 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -156,6 +162,104 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterates backwards through block roots with `start_slot` highest possible value
|
||||
/// `<= beacon_state.slot`.
|
||||
///
|
||||
/// The distinction between `BestBlockRootsIterator` and `BlockRootsIterator` is:
|
||||
///
|
||||
/// - `BestBlockRootsIterator` uses best-effort slot. When `start_slot` is greater than the latest available block root
|
||||
/// on `beacon_state`, returns `Some(root, slot)` where `slot` is the latest available block
|
||||
/// root.
|
||||
/// - `BlockRootsIterator` is strict about `start_slot`. When `start_slot` is greater than the latest available block root
|
||||
/// on `beacon_state`, returns `None`.
|
||||
///
|
||||
/// This is distinct from `BestBlockRootsIterator`.
|
||||
///
|
||||
/// Uses the `latest_block_roots` field of `BeaconState` to as the source of block roots and will
|
||||
/// perform a lookup on the `Store` for a prior `BeaconState` if `latest_block_roots` has been
|
||||
/// exhausted.
|
||||
///
|
||||
/// Returns `None` for roots prior to genesis or when there is an error reading from `Store`.
|
||||
#[derive(Clone)]
|
||||
pub struct BestBlockRootsIterator<'a, T: EthSpec, U> {
|
||||
store: Arc<U>,
|
||||
beacon_state: Cow<'a, BeaconState<T>>,
|
||||
slot: Slot,
|
||||
}
|
||||
|
||||
impl<'a, T: EthSpec, U: Store> BestBlockRootsIterator<'a, T, U> {
|
||||
/// Create a new iterator over all block roots in the given `beacon_state` and prior states.
|
||||
pub fn new(store: Arc<U>, beacon_state: &'a BeaconState<T>, start_slot: Slot) -> Self {
|
||||
let mut slot = start_slot;
|
||||
if slot >= beacon_state.slot {
|
||||
// Slot may be too high.
|
||||
slot = beacon_state.slot;
|
||||
if beacon_state.get_block_root(slot).is_err() {
|
||||
slot -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
Self {
|
||||
store,
|
||||
beacon_state: Cow::Borrowed(beacon_state),
|
||||
slot: slot + 1,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new iterator over all block roots in the given `beacon_state` and prior states.
|
||||
pub fn owned(store: Arc<U>, beacon_state: BeaconState<T>, start_slot: Slot) -> Self {
|
||||
let mut slot = start_slot;
|
||||
if slot >= beacon_state.slot {
|
||||
// Slot may be too high.
|
||||
slot = beacon_state.slot;
|
||||
// TODO: Use a function other than `get_block_root` as this will always return `Err()`
|
||||
// for slot = state.slot.
|
||||
if beacon_state.get_block_root(slot).is_err() {
|
||||
slot -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
Self {
|
||||
store,
|
||||
beacon_state: Cow::Owned(beacon_state),
|
||||
slot: slot + 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: EthSpec, U: Store> Iterator for BestBlockRootsIterator<'a, T, U> {
|
||||
type Item = (Hash256, Slot);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.slot == 0 {
|
||||
// End of Iterator
|
||||
return None;
|
||||
}
|
||||
|
||||
self.slot -= 1;
|
||||
|
||||
match self.beacon_state.get_block_root(self.slot) {
|
||||
Ok(root) => Some((*root, self.slot)),
|
||||
Err(BeaconStateError::SlotOutOfBounds) => {
|
||||
// Read a `BeaconState` from the store that has access to prior historical root.
|
||||
let beacon_state: BeaconState<T> = {
|
||||
// Load the earliest state from disk.
|
||||
let new_state_root = self.beacon_state.get_oldest_state_root().ok()?;
|
||||
|
||||
self.store.get(&new_state_root).ok()?
|
||||
}?;
|
||||
|
||||
self.beacon_state = Cow::Owned(beacon_state);
|
||||
|
||||
let root = self.beacon_state.get_block_root(self.slot).ok()?;
|
||||
|
||||
Some((*root, self.slot))
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
@ -206,7 +310,50 @@ mod test {
|
||||
let mut collected: Vec<(Hash256, Slot)> = iter.collect();
|
||||
collected.reverse();
|
||||
|
||||
let expected_len = 2 * MainnetEthSpec::slots_per_historical_root() - 1;
|
||||
let expected_len = 2 * MainnetEthSpec::slots_per_historical_root();
|
||||
|
||||
assert_eq!(collected.len(), expected_len);
|
||||
|
||||
for i in 0..expected_len {
|
||||
assert_eq!(collected[i].0, Hash256::from(i as u64));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn best_block_root_iter() {
|
||||
let store = Arc::new(MemoryStore::open());
|
||||
let slots_per_historical_root = MainnetEthSpec::slots_per_historical_root();
|
||||
|
||||
let mut state_a: BeaconState<MainnetEthSpec> = get_state();
|
||||
let mut state_b: BeaconState<MainnetEthSpec> = get_state();
|
||||
|
||||
state_a.slot = Slot::from(slots_per_historical_root);
|
||||
state_b.slot = Slot::from(slots_per_historical_root * 2);
|
||||
|
||||
let mut hashes = (0..).into_iter().map(|i| Hash256::from(i));
|
||||
|
||||
for root in &mut state_a.latest_block_roots[..] {
|
||||
*root = hashes.next().unwrap()
|
||||
}
|
||||
for root in &mut state_b.latest_block_roots[..] {
|
||||
*root = hashes.next().unwrap()
|
||||
}
|
||||
|
||||
let state_a_root = hashes.next().unwrap();
|
||||
state_b.latest_state_roots[0] = state_a_root;
|
||||
store.put(&state_a_root, &state_a).unwrap();
|
||||
|
||||
let iter = BestBlockRootsIterator::new(store.clone(), &state_b, state_b.slot);
|
||||
|
||||
assert!(
|
||||
iter.clone().find(|(_root, slot)| *slot == 0).is_some(),
|
||||
"iter should contain zero slot"
|
||||
);
|
||||
|
||||
let mut collected: Vec<(Hash256, Slot)> = iter.collect();
|
||||
collected.reverse();
|
||||
|
||||
let expected_len = 2 * MainnetEthSpec::slots_per_historical_root();
|
||||
|
||||
assert_eq!(collected.len(), expected_len);
|
||||
|
||||
@ -255,7 +402,7 @@ mod test {
|
||||
let mut collected: Vec<(Hash256, Slot)> = iter.collect();
|
||||
collected.reverse();
|
||||
|
||||
let expected_len = MainnetEthSpec::slots_per_historical_root() * 2 - 1;
|
||||
let expected_len = MainnetEthSpec::slots_per_historical_root() * 2;
|
||||
|
||||
assert_eq!(collected.len(), expected_len, "collection length incorrect");
|
||||
|
||||
|
@ -8,7 +8,7 @@ use parking_lot::RwLock;
|
||||
use std::collections::HashMap;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use store::{iter::BlockRootsIterator, Error as StoreError, Store};
|
||||
use store::{iter::BestBlockRootsIterator, Error as StoreError, Store};
|
||||
use types::{BeaconBlock, BeaconState, EthSpec, Hash256, Slot};
|
||||
|
||||
type Result<T> = std::result::Result<T, Error>;
|
||||
@ -530,14 +530,14 @@ where
|
||||
Ok(a_root)
|
||||
}
|
||||
|
||||
fn iter_ancestors(&self, child: Hash256) -> Result<BlockRootsIterator<E, T>> {
|
||||
fn iter_ancestors(&self, child: Hash256) -> Result<BestBlockRootsIterator<E, T>> {
|
||||
let block = self.get_block(child)?;
|
||||
let state = self.get_state(block.state_root)?;
|
||||
|
||||
Ok(BlockRootsIterator::owned(
|
||||
Ok(BestBlockRootsIterator::owned(
|
||||
self.store.clone(),
|
||||
state,
|
||||
block.slot,
|
||||
block.slot - 1,
|
||||
))
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user