Improve genesis service (#1103)
* Update for latest master * Shift delay inside loop * Clean up genesis service * Tidy * Tidy logs * Address Michael's comments * Add pre-genesis logging * Remove est time till genesis * Fix time formatting * Tidy
This commit is contained in:
parent
e889c2eb22
commit
3c52b5c58d
120
Cargo.lock
generated
120
Cargo.lock
generated
@ -232,6 +232,12 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "base-x"
|
||||
version = "0.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1b20b618342cf9891c292c4f5ac2cde7287cc5c87e87e9c769d617793607dec1"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.9.3"
|
||||
@ -560,7 +566,7 @@ checksum = "80094f509cf8b5ae86a4966a39b3ff66cd7e2a3e594accec3743ff3fabeab5b2"
|
||||
dependencies = [
|
||||
"num-integer",
|
||||
"num-traits",
|
||||
"time",
|
||||
"time 0.1.43",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -628,6 +634,7 @@ dependencies = [
|
||||
"sloggers",
|
||||
"slot_clock",
|
||||
"store",
|
||||
"time 0.2.16",
|
||||
"timer",
|
||||
"tokio 0.2.20",
|
||||
"toml",
|
||||
@ -998,6 +1005,12 @@ dependencies = [
|
||||
"winapi 0.3.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "discard"
|
||||
version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0"
|
||||
|
||||
[[package]]
|
||||
name = "discv5"
|
||||
version = "0.1.0-alpha.2"
|
||||
@ -1915,7 +1928,7 @@ dependencies = [
|
||||
"log 0.3.9",
|
||||
"mime 0.2.6",
|
||||
"num_cpus",
|
||||
"time",
|
||||
"time 0.1.43",
|
||||
"traitobject",
|
||||
"typeable",
|
||||
"unicase 1.4.2",
|
||||
@ -1940,7 +1953,7 @@ dependencies = [
|
||||
"log 0.4.8",
|
||||
"net2",
|
||||
"rustc_version",
|
||||
"time",
|
||||
"time 0.1.43",
|
||||
"tokio 0.1.22",
|
||||
"tokio-buf",
|
||||
"tokio-executor",
|
||||
@ -1970,7 +1983,7 @@ dependencies = [
|
||||
"log 0.4.8",
|
||||
"net2",
|
||||
"pin-project",
|
||||
"time",
|
||||
"time 0.1.43",
|
||||
"tokio 0.2.20",
|
||||
"tower-service",
|
||||
"want 0.3.0",
|
||||
@ -3681,7 +3694,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_urlencoded",
|
||||
"time",
|
||||
"time 0.1.43",
|
||||
"tokio 0.2.20",
|
||||
"tokio-tls 0.3.1",
|
||||
"url 2.1.1",
|
||||
@ -3788,7 +3801,7 @@ dependencies = [
|
||||
"libsqlite3-sys",
|
||||
"lru-cache",
|
||||
"memchr",
|
||||
"time",
|
||||
"time 0.1.43",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -3813,7 +3826,7 @@ dependencies = [
|
||||
"libc",
|
||||
"rand 0.3.23",
|
||||
"rustc-serialize",
|
||||
"time",
|
||||
"time 0.1.43",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -4371,6 +4384,12 @@ version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
|
||||
|
||||
[[package]]
|
||||
name = "standback"
|
||||
version = "0.2.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "47e4b8c631c998468961a9ea159f064c5c8499b95b5e4a34b77849d45949d540"
|
||||
|
||||
[[package]]
|
||||
name = "state_processing"
|
||||
version = "0.2.0"
|
||||
@ -4415,6 +4434,55 @@ version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
|
||||
|
||||
[[package]]
|
||||
name = "stdweb"
|
||||
version = "0.4.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d022496b16281348b52d0e30ae99e01a73d737b2f45d38fed4edf79f9325a1d5"
|
||||
dependencies = [
|
||||
"discard",
|
||||
"rustc_version",
|
||||
"stdweb-derive",
|
||||
"stdweb-internal-macros",
|
||||
"stdweb-internal-runtime",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "stdweb-derive"
|
||||
version = "0.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c87a60a40fccc84bef0652345bbbbbe20a605bf5d0ce81719fc476f5c03b50ef"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "stdweb-internal-macros"
|
||||
version = "0.2.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "58fa5ff6ad0d98d1ffa8cb115892b6e69d67799f6763e162a1c9db421dc22e11"
|
||||
dependencies = [
|
||||
"base-x",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"serde_json",
|
||||
"sha1",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "stdweb-internal-runtime"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0"
|
||||
|
||||
[[package]]
|
||||
name = "store"
|
||||
version = "0.1.2"
|
||||
@ -4621,6 +4689,44 @@ dependencies = [
|
||||
"winapi 0.3.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "time"
|
||||
version = "0.2.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3a51cadc5b1eec673a685ff7c33192ff7b7603d0b75446fb354939ee615acb15"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"standback",
|
||||
"stdweb",
|
||||
"time-macros",
|
||||
"version_check 0.9.1",
|
||||
"winapi 0.3.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "time-macros"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9ae9b6e9f095bc105e183e3cd493d72579be3181ad4004fceb01adbe9eecab2d"
|
||||
dependencies = [
|
||||
"proc-macro-hack",
|
||||
"time-macros-impl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "time-macros-impl"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e5c3be1edfad6027c69f5491cf4cb310d1a71ecd6af742788c6ff8bced86b8fa"
|
||||
dependencies = [
|
||||
"proc-macro-hack",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"standback",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "timer"
|
||||
version = "0.1.2"
|
||||
|
@ -39,3 +39,4 @@ environment = { path = "../../lighthouse/environment" }
|
||||
eth2_ssz = "0.1.2"
|
||||
lazy_static = "1.4.0"
|
||||
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
|
||||
time = "0.2.16"
|
||||
|
@ -7,18 +7,16 @@ use slog::{debug, error, info, warn};
|
||||
use slot_clock::SlotClock;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use time;
|
||||
use tokio::time::delay_for;
|
||||
use types::{EthSpec, Slot};
|
||||
|
||||
/// Create a warning log whenever the peer count is at or below this value.
|
||||
pub const WARN_PEER_COUNT: usize = 1;
|
||||
|
||||
const SECS_PER_MINUTE: f64 = 60.0;
|
||||
const SECS_PER_HOUR: f64 = 3600.0;
|
||||
const SECS_PER_DAY: f64 = 86400.0; // non-leap
|
||||
const SECS_PER_WEEK: f64 = 604_800.0; // non-leap
|
||||
const DAYS_PER_WEEK: f64 = 7.0;
|
||||
const HOURS_PER_DAY: f64 = 24.0;
|
||||
const MINUTES_PER_HOUR: f64 = 60.0;
|
||||
const DAYS_PER_WEEK: i64 = 7;
|
||||
const HOURS_PER_DAY: i64 = 24;
|
||||
const MINUTES_PER_HOUR: i64 = 60;
|
||||
|
||||
/// The number of historical observations that should be used to determine the average sync time.
|
||||
const SPEEDO_OBSERVATIONS: usize = 4;
|
||||
@ -46,6 +44,25 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
|
||||
let mut interval = tokio::time::interval_at(start_instant, interval_duration);
|
||||
|
||||
let interval_future = async move {
|
||||
// Perform pre-genesis logging.
|
||||
loop {
|
||||
match beacon_chain.slot_clock.duration_to_next_slot() {
|
||||
// If the duration to the next slot is greater than the slot duration, then we are
|
||||
// waiting for genesis.
|
||||
Some(next_slot) if next_slot > slot_duration => {
|
||||
info!(
|
||||
log,
|
||||
"Waiting for genesis";
|
||||
"peers" => peer_count_pretty(network.connected_peers()),
|
||||
"wait_time" => estimated_time_pretty(Some(next_slot.as_secs() as f64)),
|
||||
);
|
||||
delay_for(slot_duration).await;
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
|
||||
// Perform post-genesis logging.
|
||||
while let Some(_) = interval.next().await {
|
||||
let connected_peer_count = network.connected_peers();
|
||||
let sync_state = network.sync_state();
|
||||
@ -200,31 +217,21 @@ fn seconds_pretty(secs: f64) -> String {
|
||||
return "--".into();
|
||||
}
|
||||
|
||||
let weeks = secs / SECS_PER_WEEK;
|
||||
let days = secs / SECS_PER_DAY;
|
||||
let hours = secs / SECS_PER_HOUR;
|
||||
let minutes = secs / SECS_PER_MINUTE;
|
||||
let d = time::Duration::seconds_f64(secs);
|
||||
|
||||
if weeks.floor() > 0.0 {
|
||||
format!(
|
||||
"{:.0} weeks {:.0} days",
|
||||
weeks,
|
||||
(days % DAYS_PER_WEEK).round()
|
||||
)
|
||||
} else if days.floor() > 0.0 {
|
||||
format!(
|
||||
"{:.0} days {:.0} hrs",
|
||||
days,
|
||||
(hours % HOURS_PER_DAY).round()
|
||||
)
|
||||
} else if hours.floor() > 0.0 {
|
||||
format!(
|
||||
"{:.0} hrs {:.0} mins",
|
||||
hours,
|
||||
(minutes % MINUTES_PER_HOUR).round()
|
||||
)
|
||||
let weeks = d.whole_weeks();
|
||||
let days = d.whole_days();
|
||||
let hours = d.whole_hours();
|
||||
let minutes = d.whole_minutes();
|
||||
|
||||
if weeks > 0 {
|
||||
format!("{:.0} weeks {:.0} days", weeks, days % DAYS_PER_WEEK)
|
||||
} else if days > 0 {
|
||||
format!("{:.0} days {:.0} hrs", days, hours % HOURS_PER_DAY)
|
||||
} else if hours > 0 {
|
||||
format!("{:.0} hrs {:.0} mins", hours, minutes % MINUTES_PER_HOUR)
|
||||
} else {
|
||||
format!("{:.0} mins", minutes.round())
|
||||
format!("{:.0} mins", minutes)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -167,6 +167,13 @@ impl Service {
|
||||
&self.inner.deposit_cache
|
||||
}
|
||||
|
||||
/// Removes all blocks from the cache, except for the latest block.
|
||||
///
|
||||
/// We don't remove the latest blocks so we don't lose track of the latest block.
|
||||
pub fn clear_block_cache(&self) {
|
||||
self.inner.block_cache.write().truncate(1)
|
||||
}
|
||||
|
||||
/// Drop the block cache, replacing it with an empty one.
|
||||
pub fn drop_block_cache(&self) {
|
||||
*(self.inner.block_cache.write()) = BlockCache::default();
|
||||
|
@ -1,33 +1,42 @@
|
||||
pub use crate::{common::genesis_deposits, interop::interop_genesis_state};
|
||||
pub use eth1::Config as Eth1Config;
|
||||
|
||||
use eth1::{DepositLog, Eth1Block, Service};
|
||||
use parking_lot::Mutex;
|
||||
use eth1::{DepositLog, Eth1Block, Service as Eth1Service};
|
||||
use slog::{debug, error, info, trace, Logger};
|
||||
use state_processing::{
|
||||
eth2_genesis_time, initialize_beacon_state_from_eth1, is_valid_genesis_state,
|
||||
per_block_processing::process_deposit, process_activations,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use std::sync::{
|
||||
atomic::{AtomicU64, AtomicUsize, Ordering},
|
||||
Arc,
|
||||
};
|
||||
use std::time::Duration;
|
||||
use tokio::time::delay_for;
|
||||
use types::{BeaconState, ChainSpec, Deposit, Eth1Data, EthSpec, Hash256};
|
||||
|
||||
/// Provides a service that connects to some Eth1 HTTP JSON-RPC endpoint and maintains a cache of eth1
|
||||
/// blocks and deposits, listening for the eth1 block that triggers eth2 genesis and returning the
|
||||
/// genesis `BeaconState`.
|
||||
/// The number of blocks that are pulled per request whilst waiting for genesis.
|
||||
const BLOCKS_PER_GENESIS_POLL: usize = 99;
|
||||
|
||||
/// Stats about the eth1 genesis process.
|
||||
pub struct Statistics {
|
||||
highest_processed_block: AtomicU64,
|
||||
active_validator_count: AtomicUsize,
|
||||
total_deposit_count: AtomicUsize,
|
||||
latest_timestamp: AtomicU64,
|
||||
}
|
||||
|
||||
/// Provides a service that connects to some Eth1 HTTP JSON-RPC endpoint and maintains a cache of
|
||||
/// eth1 blocks and deposits, listening for the eth1 block that triggers eth2 genesis and returning
|
||||
/// the genesis `BeaconState`.
|
||||
///
|
||||
/// Is a wrapper around the `Service` struct of the `eth1` crate.
|
||||
#[derive(Clone)]
|
||||
pub struct Eth1GenesisService {
|
||||
/// The underlying service. Access to this object is only required for testing and diagnosis.
|
||||
pub core: Service,
|
||||
/// The highest block number we've processed and determined it does not trigger genesis.
|
||||
highest_processed_block: Arc<Mutex<Option<u64>>>,
|
||||
/// Enabled when the genesis service should start downloading blocks.
|
||||
///
|
||||
/// It is disabled until there are enough deposit logs to start syncing.
|
||||
sync_blocks: Arc<Mutex<bool>>,
|
||||
pub eth1_service: Eth1Service,
|
||||
/// Statistics about genesis progress.
|
||||
stats: Arc<Statistics>,
|
||||
}
|
||||
|
||||
impl Eth1GenesisService {
|
||||
@ -51,22 +60,28 @@ impl Eth1GenesisService {
|
||||
// For small testnets, this is much faster as they do not have
|
||||
// a `MIN_GENESIS_SECONDS`, so after `MIN_GENESIS_VALIDATOR_COUNT`
|
||||
// has been reached only a single block needs to be read.
|
||||
max_blocks_per_update: Some(5),
|
||||
max_blocks_per_update: Some(BLOCKS_PER_GENESIS_POLL),
|
||||
..config
|
||||
};
|
||||
|
||||
Self {
|
||||
core: Service::new(config, log),
|
||||
highest_processed_block: Arc::new(Mutex::new(None)),
|
||||
sync_blocks: Arc::new(Mutex::new(false)),
|
||||
eth1_service: Eth1Service::new(config, log),
|
||||
stats: Arc::new(Statistics {
|
||||
highest_processed_block: AtomicU64::new(0),
|
||||
active_validator_count: AtomicUsize::new(0),
|
||||
total_deposit_count: AtomicUsize::new(0),
|
||||
latest_timestamp: AtomicU64::new(0),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn first_viable_eth1_block(&self, min_genesis_active_validator_count: usize) -> Option<u64> {
|
||||
if self.core.deposit_cache_len() < min_genesis_active_validator_count {
|
||||
/// Returns the first eth1 block that has enough deposits that it's a (potentially invalid)
|
||||
/// candidate for genesis.
|
||||
fn first_candidate_eth1_block(&self, min_genesis_active_validator_count: usize) -> Option<u64> {
|
||||
if self.eth1_service.deposit_cache_len() < min_genesis_active_validator_count {
|
||||
None
|
||||
} else {
|
||||
self.core
|
||||
self.eth1_service
|
||||
.deposits()
|
||||
.read()
|
||||
.cache
|
||||
@ -75,8 +90,7 @@ impl Eth1GenesisService {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a future that will keep updating the cache and resolve once it has discovered the
|
||||
/// first Eth1 block that triggers an Eth2 genesis.
|
||||
/// Scans the Eth1 chain, returning a genesis state once it has been discovered.
|
||||
///
|
||||
/// ## Returns
|
||||
///
|
||||
@ -87,14 +101,19 @@ impl Eth1GenesisService {
|
||||
update_interval: Duration,
|
||||
spec: ChainSpec,
|
||||
) -> Result<BeaconState<E>, String> {
|
||||
let service = self.clone();
|
||||
let log = service.core.log.clone();
|
||||
let min_genesis_active_validator_count = spec.min_genesis_active_validator_count;
|
||||
let min_genesis_time = spec.min_genesis_time;
|
||||
let eth1_service = &self.eth1_service;
|
||||
let log = ð1_service.log;
|
||||
|
||||
let mut sync_blocks = false;
|
||||
let mut highest_processed_block = None;
|
||||
|
||||
info!(
|
||||
log,
|
||||
"Importing eth1 deposit logs";
|
||||
);
|
||||
|
||||
loop {
|
||||
// **WARNING** `delay_for` panics on error
|
||||
delay_for(update_interval).await;
|
||||
let update_result = Service::update_deposit_cache(self.core.clone())
|
||||
let update_result = Eth1Service::update_deposit_cache(eth1_service.clone())
|
||||
.await
|
||||
.map_err(|e| format!("{:?}", e));
|
||||
|
||||
@ -106,67 +125,114 @@ impl Eth1GenesisService {
|
||||
)
|
||||
}
|
||||
|
||||
// Do not exit the loop if there is an error whilst updating.
|
||||
// Only enable the `sync_blocks` flag if there are enough deposits to feasibly
|
||||
// trigger genesis.
|
||||
//
|
||||
// Note: genesis is triggered by the _active_ validator count, not just the
|
||||
// deposit count, so it's possible that block downloads are started too early.
|
||||
// This is just wasteful, not erroneous.
|
||||
let mut sync_blocks = self.sync_blocks.lock();
|
||||
self.stats
|
||||
.total_deposit_count
|
||||
.store(eth1_service.deposit_cache_len(), Ordering::Relaxed);
|
||||
|
||||
if !(*sync_blocks) {
|
||||
if let Some(viable_eth1_block) =
|
||||
self.first_viable_eth1_block(min_genesis_active_validator_count as usize)
|
||||
if !sync_blocks {
|
||||
if let Some(viable_eth1_block) = self
|
||||
.first_candidate_eth1_block(spec.min_genesis_active_validator_count as usize)
|
||||
{
|
||||
info!(
|
||||
log,
|
||||
"Minimum genesis deposit count met";
|
||||
"deposit_count" => min_genesis_active_validator_count,
|
||||
"block_number" => viable_eth1_block,
|
||||
"Importing eth1 blocks";
|
||||
);
|
||||
self.core.set_lowest_cached_block(viable_eth1_block);
|
||||
*sync_blocks = true
|
||||
self.eth1_service.set_lowest_cached_block(viable_eth1_block);
|
||||
sync_blocks = true
|
||||
} else {
|
||||
info!(
|
||||
log,
|
||||
"Waiting for more deposits";
|
||||
"min_genesis_active_validators" => spec.min_genesis_active_validator_count,
|
||||
"total_deposits" => eth1_service.deposit_cache_len(),
|
||||
);
|
||||
|
||||
delay_for(update_interval).await;
|
||||
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
let should_update_block_cache = *sync_blocks;
|
||||
if should_update_block_cache {
|
||||
let update_result = Service::update_block_cache(self.core.clone()).await;
|
||||
if let Err(e) = update_result {
|
||||
// Download new eth1 blocks into the cache.
|
||||
let blocks_imported = match Eth1Service::update_block_cache(eth1_service.clone()).await
|
||||
{
|
||||
Ok(outcome) => {
|
||||
debug!(
|
||||
log,
|
||||
"Imported eth1 blocks";
|
||||
"latest_block_timestamp" => eth1_service.latest_block_timestamp(),
|
||||
"cache_head" => self.highest_safe_block(),
|
||||
"count" => outcome.blocks_imported,
|
||||
);
|
||||
outcome.blocks_imported
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
log,
|
||||
"Failed to update eth1 block cache";
|
||||
"error" => format!("{:?}", e)
|
||||
);
|
||||
0
|
||||
}
|
||||
};
|
||||
if let Some(genesis_state) = self
|
||||
.scan_new_blocks::<E>(&spec)
|
||||
.map_err(|e| format!("Failed to scan for new blocks: {}", e))?
|
||||
|
||||
// Scan the new eth1 blocks, searching for genesis.
|
||||
if let Some(genesis_state) =
|
||||
self.scan_new_blocks::<E>(&mut highest_processed_block, &spec)?
|
||||
{
|
||||
break Ok(genesis_state);
|
||||
} else {
|
||||
debug!(
|
||||
info!(
|
||||
log,
|
||||
"No eth1 genesis block found";
|
||||
"latest_block_timestamp" => self.core.latest_block_timestamp(),
|
||||
"min_genesis_time" => min_genesis_time,
|
||||
"min_validator_count" => min_genesis_active_validator_count,
|
||||
"cached_blocks" => self.core.block_cache_len(),
|
||||
"cached_deposits" => self.core.deposit_cache_len(),
|
||||
"cache_head" => self.highest_known_block(),
|
||||
"Genesis ceremony complete";
|
||||
"genesis_validators" => genesis_state.get_active_validator_indices(E::genesis_epoch()).len(),
|
||||
"genesis_time" => genesis_state.genesis_time,
|
||||
);
|
||||
break Ok(genesis_state);
|
||||
}
|
||||
|
||||
// Drop all the scanned blocks as they are no longer required.
|
||||
eth1_service.clear_block_cache();
|
||||
|
||||
// Load some statistics from the atomics.
|
||||
let active_validator_count = self.stats.active_validator_count.load(Ordering::Relaxed);
|
||||
let total_deposit_count = self.stats.total_deposit_count.load(Ordering::Relaxed);
|
||||
let latest_timestamp = self.stats.latest_timestamp.load(Ordering::Relaxed);
|
||||
|
||||
// Perform some logging.
|
||||
if timestamp_can_trigger_genesis(latest_timestamp, &spec)? {
|
||||
// Indicate that we are awaiting adequate active validators.
|
||||
if (active_validator_count as u64) < spec.min_genesis_active_validator_count {
|
||||
info!(
|
||||
log,
|
||||
"Waiting for more validators";
|
||||
"min_genesis_active_validators" => spec.min_genesis_active_validator_count,
|
||||
"total_deposits" => total_deposit_count,
|
||||
"active_validators" => active_validator_count,
|
||||
);
|
||||
}
|
||||
} else {
|
||||
info!(
|
||||
log,
|
||||
"Waiting for adequate eth1 timestamp";
|
||||
"ming_genesis_delay" => spec.min_genesis_delay,
|
||||
"genesis_time" => spec.min_genesis_time,
|
||||
"latest_eth1_timestamp" => latest_timestamp,
|
||||
);
|
||||
}
|
||||
|
||||
// If we imported the full number of blocks, poll again in a short amount of time.
|
||||
//
|
||||
// We assume that if we imported a large chunk of blocks then we're some distance from
|
||||
// the head and we should sync faster.
|
||||
if blocks_imported >= BLOCKS_PER_GENESIS_POLL {
|
||||
delay_for(Duration::from_millis(50)).await;
|
||||
} else {
|
||||
delay_for(update_interval).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Processes any new blocks that have appeared since this function was last run.
|
||||
///
|
||||
/// A `highest_processed_block` value is stored in `self`. This function will find any blocks
|
||||
/// in it's caches that have a higher block number than `highest_processed_block` and check to
|
||||
/// see if they would trigger an Eth2 genesis.
|
||||
///
|
||||
/// Blocks are always tested in increasing order, starting with the lowest unknown block
|
||||
/// number in the cache.
|
||||
///
|
||||
@ -177,98 +243,89 @@ impl Eth1GenesisService {
|
||||
/// - `Err(_)` if there was some internal error.
|
||||
fn scan_new_blocks<E: EthSpec>(
|
||||
&self,
|
||||
highest_processed_block: &mut Option<u64>,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<Option<BeaconState<E>>, String> {
|
||||
let genesis_trigger_eth1_block = self
|
||||
.core
|
||||
.blocks()
|
||||
.read()
|
||||
.iter()
|
||||
// Filter out any blocks that would result in a genesis time that is earlier than
|
||||
// `MIN_GENESIS_TIME`.
|
||||
let eth1_service = &self.eth1_service;
|
||||
let log = ð1_service.log;
|
||||
|
||||
for block in eth1_service.blocks().read().iter() {
|
||||
// It's possible that the block and deposit caches aren't synced. Ignore any blocks
|
||||
// which are not safe for both caches.
|
||||
//
|
||||
// Note: any `SafeArith` errors are suppressed here; we simply skip blocks that cause
|
||||
// overflow/div-by-zero.
|
||||
.filter(|block| {
|
||||
eth2_genesis_time(block.timestamp, spec)
|
||||
.map_or(false, |t| t >= spec.min_genesis_time)
|
||||
})
|
||||
// The block cache might be more recently updated than deposit cache. Restrict any
|
||||
// block numbers that are not known by all caches.
|
||||
.filter(|block| {
|
||||
self.highest_known_block()
|
||||
.map(|n| block.number <= n)
|
||||
.unwrap_or_else(|| true)
|
||||
})
|
||||
.find(|block| {
|
||||
let mut highest_processed_block = self.highest_processed_block.lock();
|
||||
let block_number = block.number;
|
||||
// Don't update the highest processed block since we want to come back and process this
|
||||
// again later.
|
||||
if self.highest_safe_block().map_or(true, |n| block.number > n) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let next_new_block_number =
|
||||
highest_processed_block.map(|n| n + 1).unwrap_or_else(|| 0);
|
||||
// Ignore any block that has already been processed or update the highest processed
|
||||
// block.
|
||||
if highest_processed_block.map_or(false, |highest| highest >= block.number) {
|
||||
continue;
|
||||
} else {
|
||||
self.stats
|
||||
.highest_processed_block
|
||||
.store(block.number, Ordering::Relaxed);
|
||||
self.stats
|
||||
.latest_timestamp
|
||||
.store(block.timestamp, Ordering::Relaxed);
|
||||
|
||||
if block_number < next_new_block_number {
|
||||
return false;
|
||||
}
|
||||
*highest_processed_block = Some(block.number)
|
||||
}
|
||||
|
||||
self.is_valid_genesis_eth1_block::<E>(block, &spec, &self.core.log)
|
||||
.and_then(|val| {
|
||||
*highest_processed_block = Some(block.number);
|
||||
Ok(val)
|
||||
})
|
||||
.map(|is_valid| {
|
||||
if !is_valid {
|
||||
info!(
|
||||
self.core.log,
|
||||
"Inspected new eth1 block";
|
||||
"msg" => "did not trigger genesis",
|
||||
"block_number" => block_number
|
||||
);
|
||||
};
|
||||
is_valid
|
||||
})
|
||||
.unwrap_or_else(|_| {
|
||||
error!(
|
||||
self.core.log,
|
||||
"Failed to detect if eth1 block triggers genesis";
|
||||
"eth1_block_number" => block.number,
|
||||
"eth1_block_hash" => format!("{}", block.hash),
|
||||
);
|
||||
false
|
||||
})
|
||||
})
|
||||
.cloned();
|
||||
// Ignore any block with an insufficient timestamp.
|
||||
if !timestamp_can_trigger_genesis(block.timestamp, spec)? {
|
||||
trace!(
|
||||
log,
|
||||
"Insufficient block timestamp";
|
||||
"min_genesis_delay" => spec.min_genesis_delay,
|
||||
"min_genesis_time" => spec.min_genesis_time,
|
||||
"eth1_block_timestamp" => block.timestamp,
|
||||
"eth1_block_number" => block.number,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(eth1_block) = genesis_trigger_eth1_block {
|
||||
debug!(
|
||||
self.core.log,
|
||||
"All genesis conditions met";
|
||||
"eth1_block_height" => eth1_block.number,
|
||||
);
|
||||
// Generate a potential beacon state for this eth1 block.
|
||||
//
|
||||
// Note: this state is fully valid, some fields have been bypassed to make verification
|
||||
// faster.
|
||||
let state = self.cheap_state_at_eth1_block::<E>(block, &spec)?;
|
||||
let active_validator_count =
|
||||
state.get_active_validator_indices(E::genesis_epoch()).len();
|
||||
|
||||
let genesis_state = self
|
||||
.genesis_from_eth1_block(eth1_block.clone(), &spec)
|
||||
.map_err(|e| format!("Failed to generate valid genesis state : {}", e))?;
|
||||
self.stats
|
||||
.active_validator_count
|
||||
.store(active_validator_count, Ordering::Relaxed);
|
||||
|
||||
info!(
|
||||
self.core.log,
|
||||
"Deposit contract genesis complete";
|
||||
"eth1_block_height" => eth1_block.number,
|
||||
"validator_count" => genesis_state.validators.len(),
|
||||
);
|
||||
if is_valid_genesis_state(&state, spec) {
|
||||
let genesis_state = self
|
||||
.genesis_from_eth1_block(block.clone(), &spec)
|
||||
.map_err(|e| format!("Failed to generate valid genesis state : {}", e))?;
|
||||
|
||||
Ok(Some(genesis_state))
|
||||
} else {
|
||||
Ok(None)
|
||||
return Ok(Some(genesis_state));
|
||||
} else {
|
||||
trace!(
|
||||
log,
|
||||
"Insufficient active validators";
|
||||
"min_genesis_active_validator_count" => format!("{}", spec.min_genesis_active_validator_count),
|
||||
"active_validators" => active_validator_count,
|
||||
"eth1_block_number" => block.number,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Produces an eth2 genesis `BeaconState` from the given `eth1_block`.
|
||||
/// Produces an eth2 genesis `BeaconState` from the given `eth1_block`. The caller should have
|
||||
/// verified that `eth1_block` produces a valid genesis state.
|
||||
///
|
||||
/// ## Returns
|
||||
///
|
||||
/// - Ok(genesis_state) if all went well.
|
||||
/// - Err(e) if the given `eth1_block` was not a viable block to trigger genesis or there was
|
||||
/// - `Ok(genesis_state)`: if all went well.
|
||||
/// - `Err(e)`: if the given `eth1_block` was not a viable block to trigger genesis or there was
|
||||
/// an internal error.
|
||||
fn genesis_from_eth1_block<E: EthSpec>(
|
||||
&self,
|
||||
@ -276,7 +333,7 @@ impl Eth1GenesisService {
|
||||
spec: &ChainSpec,
|
||||
) -> Result<BeaconState<E>, String> {
|
||||
let deposit_logs = self
|
||||
.core
|
||||
.eth1_service
|
||||
.deposits()
|
||||
.read()
|
||||
.cache
|
||||
@ -300,84 +357,76 @@ impl Eth1GenesisService {
|
||||
}
|
||||
}
|
||||
|
||||
/// A cheap (compared to using `initialize_beacon_state_from_eth1) method for determining if some
|
||||
/// `target_block` will trigger genesis.
|
||||
fn is_valid_genesis_eth1_block<E: EthSpec>(
|
||||
/// Generates an incomplete `BeaconState` for some `eth1_block` that can be used for checking
|
||||
/// to see if that `eth1_block` triggers eth2 genesis.
|
||||
///
|
||||
/// ## Notes
|
||||
///
|
||||
/// The returned `BeaconState` should **not** be used as the genesis state, it is
|
||||
/// incomplete.
|
||||
fn cheap_state_at_eth1_block<E: EthSpec>(
|
||||
&self,
|
||||
target_block: &Eth1Block,
|
||||
eth1_block: &Eth1Block,
|
||||
spec: &ChainSpec,
|
||||
log: &Logger,
|
||||
) -> Result<bool, String> {
|
||||
if target_block.timestamp < spec.min_genesis_time {
|
||||
Ok(false)
|
||||
} else {
|
||||
let mut local_state: BeaconState<E> = BeaconState::new(
|
||||
0,
|
||||
Eth1Data {
|
||||
block_hash: Hash256::zero(),
|
||||
deposit_root: Hash256::zero(),
|
||||
deposit_count: 0,
|
||||
},
|
||||
&spec,
|
||||
);
|
||||
) -> Result<BeaconState<E>, String> {
|
||||
let genesis_time = eth2_genesis_time(eth1_block.timestamp, spec)
|
||||
.map_err(|e| format!("Unable to set genesis time: {:?}", e))?;
|
||||
|
||||
local_state.genesis_time = target_block.timestamp;
|
||||
let mut state: BeaconState<E> = BeaconState::new(
|
||||
genesis_time,
|
||||
Eth1Data {
|
||||
block_hash: Hash256::zero(),
|
||||
deposit_root: Hash256::zero(),
|
||||
deposit_count: 0,
|
||||
},
|
||||
&spec,
|
||||
);
|
||||
|
||||
self.deposit_logs_at_block(target_block.number)
|
||||
.iter()
|
||||
// TODO: add the signature field back.
|
||||
//.filter(|deposit_log| deposit_log.signature_is_valid)
|
||||
.map(|deposit_log| Deposit {
|
||||
proof: vec![Hash256::zero(); spec.deposit_contract_tree_depth as usize].into(),
|
||||
data: deposit_log.deposit_data.clone(),
|
||||
})
|
||||
.try_for_each(|deposit| {
|
||||
// No need to verify proofs in order to test if some block will trigger genesis.
|
||||
const PROOF_VERIFICATION: bool = false;
|
||||
self.deposit_logs_at_block(eth1_block.number)
|
||||
.iter()
|
||||
.map(|deposit_log| Deposit {
|
||||
// Generate a bogus proof.
|
||||
//
|
||||
// The deposits are coming directly from our own deposit tree to there's no need to
|
||||
// make proofs about their inclusion in it.
|
||||
proof: vec![Hash256::zero(); spec.deposit_contract_tree_depth as usize].into(),
|
||||
data: deposit_log.deposit_data.clone(),
|
||||
})
|
||||
.try_for_each(|deposit| {
|
||||
// Skip proof verification (see comment about bogus proof generation).
|
||||
const PROOF_VERIFICATION: bool = false;
|
||||
|
||||
// Note: presently all the signatures are verified each time this function is
|
||||
// run.
|
||||
//
|
||||
// It would be more efficient to pre-verify signatures, filter out the invalid
|
||||
// ones and disable verification for `process_deposit`.
|
||||
//
|
||||
// This is only more efficient in scenarios where `min_genesis_time` occurs
|
||||
// _before_ `min_validator_count` is met. We're unlikely to see this scenario
|
||||
// in testnets (`min_genesis_time` is usually `0`) and I'm not certain it will
|
||||
// happen for the real, production deposit contract.
|
||||
// Note: presently all the signatures are verified each time this function is
|
||||
// run.
|
||||
//
|
||||
// It would be more efficient to pre-verify signatures, filter out the invalid
|
||||
// ones and disable verification for `process_deposit`.
|
||||
//
|
||||
// Such an optimization would only be useful in a scenario where `MIN_GENESIS_TIME`
|
||||
// is reached _prior_ to `MIN_ACTIVE_VALIDATOR_COUNT`. I suspect this won't be the
|
||||
// case for mainnet, so we defer this optimization.
|
||||
|
||||
process_deposit(&mut local_state, &deposit, spec, PROOF_VERIFICATION)
|
||||
.map_err(|e| format!("Error whilst processing deposit: {:?}", e))
|
||||
})?;
|
||||
process_deposit(&mut state, &deposit, spec, PROOF_VERIFICATION)
|
||||
.map_err(|e| format!("Error whilst processing deposit: {:?}", e))
|
||||
})?;
|
||||
|
||||
process_activations(&mut local_state, spec)
|
||||
.map_err(|e| format!("Error whilst processing activations: {:?}", e))?;
|
||||
let is_valid = is_valid_genesis_state(&local_state, spec);
|
||||
process_activations(&mut state, spec)
|
||||
.map_err(|e| format!("Error whilst processing activations: {:?}", e))?;
|
||||
|
||||
trace!(
|
||||
log,
|
||||
"Eth1 block inspected for genesis";
|
||||
"active_validators" => local_state.get_active_validator_indices(local_state.current_epoch()).len(),
|
||||
"validators" => local_state.validators.len()
|
||||
);
|
||||
|
||||
Ok(is_valid)
|
||||
}
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
/// Returns the `block_number` of the highest (by block number) block in the cache.
|
||||
///
|
||||
/// Takes the lower block number of the deposit and block caches to ensure this number is safe.
|
||||
fn highest_known_block(&self) -> Option<u64> {
|
||||
let block_cache = self.core.blocks().read().highest_block_number()?;
|
||||
let deposit_cache = self.core.deposits().read().last_processed_block?;
|
||||
/// Returns the highest block that is present in both the deposit and block caches.
|
||||
fn highest_safe_block(&self) -> Option<u64> {
|
||||
let block_cache = self.eth1_service.blocks().read().highest_block_number()?;
|
||||
let deposit_cache = self.eth1_service.deposits().read().last_processed_block?;
|
||||
|
||||
Some(std::cmp::min(block_cache, deposit_cache))
|
||||
}
|
||||
|
||||
/// Returns all deposit logs included in `block_number` and all prior blocks.
|
||||
fn deposit_logs_at_block(&self, block_number: u64) -> Vec<DepositLog> {
|
||||
self.core
|
||||
self.eth1_service
|
||||
.deposits()
|
||||
.read()
|
||||
.cache
|
||||
@ -387,8 +436,21 @@ impl Eth1GenesisService {
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Returns statistics about eth1 genesis.
|
||||
pub fn statistics(&self) -> &Statistics {
|
||||
&self.stats
|
||||
}
|
||||
|
||||
/// Returns the `Service` contained in `self`.
|
||||
pub fn into_core_service(self) -> Service {
|
||||
self.core
|
||||
pub fn into_core_service(self) -> Eth1Service {
|
||||
self.eth1_service
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `false` for a timestamp that would result in a genesis time that is earlier than
|
||||
/// `MIN_GENESIS_TIME`.
|
||||
fn timestamp_can_trigger_genesis(timestamp: u64, spec: &ChainSpec) -> Result<bool, String> {
|
||||
eth2_genesis_time(timestamp, spec)
|
||||
.map(|t| t >= spec.min_genesis_time)
|
||||
.map_err(|e| format!("Arith error when during genesis calculation: {:?}", e))
|
||||
}
|
||||
|
@ -3,6 +3,6 @@ mod eth1_genesis_service;
|
||||
mod interop;
|
||||
|
||||
pub use eth1::Config as Eth1Config;
|
||||
pub use eth1_genesis_service::Eth1GenesisService;
|
||||
pub use eth1_genesis_service::{Eth1GenesisService, Statistics};
|
||||
pub use interop::interop_genesis_state;
|
||||
pub use types::test_utils::generate_deterministic_keypairs;
|
||||
|
Loading…
Reference in New Issue
Block a user