Add slot timer to beacon node

This commit is contained in:
Paul Hauner 2019-03-27 10:36:20 +11:00
parent b887509607
commit b006586d19
No known key found for this signature in database
GPG Key ID: D362883A9218FCC6
9 changed files with 132 additions and 3 deletions

View File

@ -14,6 +14,7 @@ slog-term = "^2.4.0"
slog-async = "^2.3.0"
ctrlc = { version = "3.1.1", features = ["termination"] }
tokio = "0.1.15"
tokio-timer = "0.2.10"
futures = "0.1.25"
exit-future = "0.1.3"
state_processing = { path = "../eth2/state_processing" }

View File

@ -409,6 +409,20 @@ where
}
}
/// Reads the slot clock (see `self.read_slot_clock()` and returns the number of slots since
/// genesis.
pub fn slots_since_genesis(&self) -> Option<SlotHeight> {
let now = self.read_slot_clock()?;
if now < self.spec.genesis_slot {
None
} else {
Some(SlotHeight::from(
now.as_u64() - self.spec.genesis_slot.as_u64(),
))
}
}
/// Returns slot of the present state.
///
/// This is distinct to `read_slot_clock`, which reads from the actual system clock. If

View File

@ -14,6 +14,7 @@ types = { path = "../../eth2/types" }
slot_clock = { path = "../../eth2/utils/slot_clock" }
error-chain = "0.12.0"
slog = "^2.2.3"
ssz = { path = "../../eth2/utils/ssz" }
tokio = "0.1.15"
clap = "2.32.0"
dirs = "1.0.3"

View File

@ -9,11 +9,17 @@ use beacon_chain::BeaconChain;
pub use client_config::ClientConfig;
pub use client_types::ClientTypes;
use exit_future::Signal;
use futures::{future::Future, Stream};
use network::Service as NetworkService;
use slog::o;
use slog::{error, info, o};
use slot_clock::SlotClock;
use ssz::TreeHash;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::runtime::TaskExecutor;
use tokio::timer::Interval;
use types::Hash256;
/// Main beacon node client service. This provides the connection and initialisation of the clients
/// sub-services in multiple threads.
@ -26,6 +32,8 @@ pub struct Client<T: ClientTypes> {
pub network: Arc<NetworkService>,
/// Signal to terminate the RPC server.
pub rpc_exit_signal: Option<Signal>,
/// Signal to terminate the slot timer.
pub slot_timer_exit_signal: Option<Signal>,
/// The clients logger.
log: slog::Logger,
/// Marker to pin the beacon chain generics.
@ -42,6 +50,17 @@ impl<TClientType: ClientTypes> Client<TClientType> {
// generate a beacon chain
let beacon_chain = TClientType::initialise_beacon_chain(&config);
{
let state = beacon_chain.state.read();
let state_root = Hash256::from_slice(&state.hash_tree_root());
info!(
log,
"ChainInitialized";
"state_root" => format!("{}", state_root),
"genesis_time" => format!("{}", state.genesis_time),
);
}
// Start the network service, libp2p and syncing threads
// TODO: Add beacon_chain reference to network parameters
let network_config = &config.net_conf;
@ -65,10 +84,65 @@ impl<TClientType: ClientTypes> Client<TClientType> {
));
}
let (slot_timer_exit_signal, exit) = exit_future::signal();
if let Ok(Some(duration_to_next_slot)) = beacon_chain.slot_clock.duration_to_next_slot() {
// set up the validator work interval - start at next slot and proceed every slot
let interval = {
// Set the interval to start at the next slot, and every slot after
let slot_duration = Duration::from_secs(config.spec.seconds_per_slot);
//TODO: Handle checked add correctly
Interval::new(Instant::now() + duration_to_next_slot, slot_duration)
};
let chain = beacon_chain.clone();
let log = log.new(o!("Service" => "SlotTimer"));
let state_slot = chain.state.read().slot;
let wall_clock_slot = chain.read_slot_clock().unwrap();
let slots_since_genesis = chain.slots_since_genesis().unwrap();
info!(
log,
"Starting SlotTimer";
"state_slot" => state_slot,
"wall_clock_slot" => wall_clock_slot,
"slots_since_genesis" => slots_since_genesis,
"catchup_distance" => wall_clock_slot - state_slot,
);
executor.spawn(
exit.until(
interval
.for_each(move |_| {
if let Some(genesis_height) = chain.slots_since_genesis() {
match chain.catchup_state() {
Ok(_) => info!(
log,
"NewSlot";
"slot" => chain.state.read().slot,
"slots_since_genesis" => genesis_height,
),
Err(e) => error!(
log,
"StateCatchupFailed";
"state_slot" => chain.state.read().slot,
"slots_since_genesis" => genesis_height,
"error" => format!("{:?}", e),
),
};
}
Ok(())
})
.map_err(|_| ()),
)
.map(|_| ()),
);
}
Ok(Client {
config,
beacon_chain,
rpc_exit_signal,
slot_timer_exit_signal: Some(slot_timer_exit_signal),
log,
network,
phantom: PhantomData,

View File

@ -2,7 +2,7 @@ use crate::Client;
use crate::ClientTypes;
use exit_future::Exit;
use futures::{Future, Stream};
use slog::{debug, info, o};
use slog::{debug, o};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::runtime::TaskExecutor;
@ -22,7 +22,7 @@ pub fn run<T: ClientTypes>(client: &Client<T>, executor: TaskExecutor, exit: Exi
// build heartbeat logic here
let heartbeat = move |_| {
info!(log, "Temp heartbeat output");
debug!(log, "Temp heartbeat output");
//TODO: Remove this logic. Testing only
let mut count = counter.lock().unwrap();
*count += 1;

View File

@ -6,10 +6,12 @@ use futures::Future;
use slog::info;
use std::cell::RefCell;
use tokio::runtime::Builder;
use tokio_timer::clock::Clock;
pub fn run_beacon_node(config: ClientConfig, log: &slog::Logger) -> error::Result<()> {
let mut runtime = Builder::new()
.name_prefix("main-")
.clock(Clock::system())
.build()
.map_err(|e| format!("{:?}", e))?;

View File

@ -3,10 +3,13 @@ mod testing_slot_clock;
pub use crate::system_time_slot_clock::{Error as SystemTimeSlotClockError, SystemTimeSlotClock};
pub use crate::testing_slot_clock::{Error as TestingSlotClockError, TestingSlotClock};
use std::time::Duration;
pub use types::Slot;
pub trait SlotClock: Send + Sync {
type Error;
fn present_slot(&self) -> Result<Option<Slot>, Self::Error>;
fn duration_to_next_slot(&self) -> Result<Option<Duration>, Self::Error>;
}

View File

@ -54,6 +54,10 @@ impl SlotClock for SystemTimeSlotClock {
.and_then(|s| Some(s + self.genesis_slot))),
}
}
fn duration_to_next_slot(&self) -> Result<Option<Duration>, Error> {
duration_to_next_slot(self.genesis_seconds, self.slot_duration_seconds)
}
}
impl From<SystemTimeError> for Error {
@ -67,6 +71,30 @@ fn slot_from_duration(slot_duration_seconds: u64, duration: Duration) -> Option<
duration.as_secs().checked_div(slot_duration_seconds)?,
))
}
// calculate the duration to the next slot
fn duration_to_next_slot(
genesis_time: u64,
seconds_per_slot: u64,
) -> Result<Option<Duration>, Error> {
let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?;
let genesis_time = Duration::from_secs(genesis_time);
if now < genesis_time {
return Ok(None);
}
let since_genesis = now - genesis_time;
let elapsed_slots = since_genesis.as_secs() / seconds_per_slot;
let next_slot_start_seconds = (elapsed_slots + 1)
.checked_mul(seconds_per_slot)
.expect("Next slot time should not overflow u64");
let time_to_next_slot = Duration::from_secs(next_slot_start_seconds) - since_genesis;
Ok(Some(time_to_next_slot))
}
#[cfg(test)]
mod tests {

View File

@ -1,5 +1,6 @@
use super::SlotClock;
use std::sync::RwLock;
use std::time::Duration;
use types::Slot;
#[derive(Debug, PartialEq)]
@ -32,6 +33,11 @@ impl SlotClock for TestingSlotClock {
let slot = *self.slot.read().expect("TestingSlotClock poisoned.");
Ok(Some(Slot::new(slot)))
}
/// Always returns a duration of 1 second.
fn duration_to_next_slot(&self) -> Result<Option<Duration>, Error> {
Ok(Some(Duration::from_secs(1)))
}
}
#[cfg(test)]