Testnet stability (#451)

* Change reduced tree for adding weightless node

* Add more comments for reduced tree fork choice

* Small refactor on reduced tree for readability

* Move test_harness forking logic into itself

* Add new `AncestorIter` trait to store

* Add unfinished tests to fork choice

* Make `beacon_state.genesis_block_root` public

* Add failing lmd_ghost fork choice tests

* Extend fork_choice tests, create failing test

* Implement Debug for generic ReducedTree

* Add lazy_static to fork choice tests

* Add verify_integrity fn to reduced tree

* Fix bugs in reduced tree

* Ensure all reduced tree tests verify integrity

* Slightly alter reduced tree test params

* Add (failing) reduced tree test

* Fix bug in fork choice

Iter ancestors was not working well with skip slots

* Put maximum depth for common ancestor search

Ensures that we don't search back past the finalized root (a short sketch of the idea follows this commit list).

* Add basic finalization tests for reduced tree

* Change fork choice to use beacon_block_root

Previously it was using target_root, which was wrong

* Add network dir CLI flag

* Simplify "NewSlot" log message

* Rename network-dir CLI flag

* Update db dir size for metrics

* Change slog to use `FullFormat` logging

* Update some comments and log formatting

* Add prom gauge for best block root

* Only add known target blocks to fork choice

* Add finalized and justified root prom metrics

* Add CLI flag for setting log level

* Add logger to beacon chain

* Add debug-level CLI flag to validator

* Allow block processing if fork choice fails

* Create warn log when there's low libp2p peer count

* Minor change to logging

* Make ancestor iter return option

* Disable fork choice test when !debug_assertions

* Fix type, remove code fragment

* Tidy some borrow-checker evading

* Lower reduced tree random test iterations
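
The "maximum depth for common ancestor search" change above is easiest to picture as a bounded walk over two ancestor iterators: each chain is followed back at most a fixed number of blocks, so the search can never continue past the finalized root. A rough, self-contained sketch of the idea (the reduced tree and store code are not among the files shown below, and every name here is illustrative rather than taken from the PR):

use std::collections::HashSet;
use std::hash::Hash;

/// Find the first root two ancestor iterators have in common, looking at
/// most `max_depth` blocks back on each chain. Bounding the depth keeps the
/// walk from crossing the finalized root.
fn find_common_ancestor<T, A, B>(a: A, b: B, max_depth: usize) -> Option<T>
where
    T: Eq + Hash + Copy,
    A: Iterator<Item = T>,
    B: Iterator<Item = T>,
{
    let a_roots: HashSet<T> = a.take(max_depth).collect();
    b.take(max_depth).find(|root| a_roots.contains(root))
}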

Paul Hauner, 2019-07-29 13:45:45 +10:00, committed by GitHub
parent 1b26a36ebc, commit 177df12149
13 changed files with 231 additions and 45 deletions

@ -12,6 +12,8 @@ log = "0.4"
operation_pool = { path = "../../eth2/operation_pool" }
serde = "1.0"
serde_derive = "1.0"
slog = { version = "^2.2.3" , features = ["max_level_trace"] }
sloggers = { version = "^0.3" }
slot_clock = { path = "../../eth2/utils/slot_clock" }
eth2_ssz = { path = "../../eth2/utils/ssz" }
eth2_ssz_derive = { path = "../../eth2/utils/ssz_derive" }

@ -8,6 +8,7 @@ use log::trace;
use operation_pool::DepositInsertStatus;
use operation_pool::{OperationPool, PersistedOperationPool};
use parking_lot::{RwLock, RwLockReadGuard};
use slog::{error, info, warn, Logger};
use slot_clock::SlotClock;
use state_processing::per_block_processing::errors::{
AttestationValidationError, AttesterSlashingValidationError, DepositValidationError,
@ -83,6 +84,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub fork_choice: ForkChoice<T>,
/// Stores metrics about this `BeaconChain`.
pub metrics: Metrics,
/// Logging to CLI, etc.
log: Logger,
}
impl<T: BeaconChainTypes> BeaconChain<T> {
@ -93,6 +96,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
mut genesis_state: BeaconState<T::EthSpec>,
genesis_block: BeaconBlock,
spec: ChainSpec,
log: Logger,
) -> Result<Self, Error> {
genesis_state.build_all_caches(&spec)?;
@ -123,6 +127,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
fork_choice: ForkChoice::new(store.clone(), &genesis_block, genesis_block_root),
metrics: Metrics::new()?,
store,
log,
})
}
@ -130,6 +135,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn from_store(
store: Arc<T::Store>,
spec: ChainSpec,
log: Logger,
) -> Result<Option<BeaconChain<T>>, Error> {
let key = Hash256::from_slice(&BEACON_CHAIN_DB_KEY.as_bytes());
let p: PersistedBeaconChain<T> = match store.get(&key) {
@ -159,6 +165,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
genesis_block_root: p.genesis_block_root,
metrics: Metrics::new()?,
store,
log,
}))
}
@ -646,13 +653,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.store.put(&state_root, &state)?;
// Register the new block with the fork choice service.
self.fork_choice.process_block(&state, &block, block_root)?;
if let Err(e) = self.fork_choice.process_block(&state, &block, block_root) {
error!(
self.log,
"fork choice failed to process_block";
"error" => format!("{:?}", e),
"block_root" => format!("{}", block_root),
"block_slot" => format!("{}", block.slot)
)
}
// Execute the fork choice algorithm, enthroning a new head if discovered.
//
// Note: in the future we may choose to run fork-choice less often, potentially based upon
// some heuristic around number of attestations seen for the block.
self.fork_choice()?;
if let Err(e) = self.fork_choice() {
error!(
self.log,
"fork choice failed to find head";
"error" => format!("{:?}", e)
)
};
self.metrics.block_processing_successes.inc();
self.metrics
@ -780,9 +801,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.get(&beacon_state_root)?
.ok_or_else(|| Error::MissingBeaconState(beacon_state_root))?;
let previous_slot = self.head().beacon_block.slot;
let new_slot = beacon_block.slot;
// If we switched to a new chain (instead of building atop the present chain).
if self.head().beacon_block_root != beacon_block.previous_block_root {
self.metrics.fork_choice_reorg_count.inc();
warn!(
self.log,
"Beacon chain re-org";
"previous_slot" => previous_slot,
"new_slot" => new_slot
);
} else {
info!(
self.log,
"new head block";
"justified_root" => format!("{}", beacon_state.current_justified_root),
"finalized_root" => format!("{}", beacon_state.finalized_root),
"root" => format!("{}", beacon_block_root),
"slot" => new_slot,
);
};
let old_finalized_epoch = self.head().beacon_state.finalized_epoch;

@ -18,6 +18,7 @@ pub enum Error {
pub struct ForkChoice<T: BeaconChainTypes> {
backend: T::LmdGhost,
store: Arc<T::Store>,
/// Used for resolving the `0x00..00` alias back to genesis.
///
/// Does not necessarily need to be the _actual_ genesis, it suffices to be the finalized root
@ -36,6 +37,7 @@ impl<T: BeaconChainTypes> ForkChoice<T> {
genesis_block_root: Hash256,
) -> Self {
Self {
store: store.clone(),
backend: T::LmdGhost::new(store, genesis_block, genesis_block_root),
genesis_block_root,
}
@ -125,13 +127,6 @@ impl<T: BeaconChainTypes> ForkChoice<T> {
state: &BeaconState<T::EthSpec>,
attestation: &Attestation,
) -> Result<()> {
let validator_indices = get_attesting_indices_unsorted(
state,
&attestation.data,
&attestation.aggregation_bitfield,
)?;
let block_slot = state.get_attestation_slot(&attestation.data)?;
let block_hash = attestation.data.beacon_block_root;
// Ignore any attestations to the zero hash.
@ -147,7 +142,22 @@ impl<T: BeaconChainTypes> ForkChoice<T> {
// (1) becomes weird once we hit finality and fork choice drops the genesis block. (2) is
// fine because votes to the genesis block are not useful; all validators implicitly attest
// to genesis just by being present in the chain.
if block_hash != Hash256::zero() {
//
// Additionally, don't add any block hash to fork choice unless we have imported the block.
if block_hash != Hash256::zero()
&& self
.store
.exists::<BeaconBlock>(&block_hash)
.unwrap_or(false)
{
let validator_indices = get_attesting_indices_unsorted(
state,
&attestation.data,
&attestation.aggregation_bitfield,
)?;
let block_slot = state.get_attestation_slot(&attestation.data)?;
for validator_index in validator_indices {
self.backend
.process_attestation(validator_index, block_hash, block_slot)?;
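
The intent of the guard above can be restated as a small predicate: fork choice only tracks votes for a target block that is not the zero-hash alias and that has already been imported into the store. A hedged, condensed sketch (the `exists::<BeaconBlock>` call is taken from the diff; the function name and import paths are assumptions):

use store::Store;
use types::{BeaconBlock, Hash256};

/// True when fork choice should count attestations for `block_root`.
fn is_known_target<S: Store>(store: &S, block_root: Hash256) -> bool {
    block_root != Hash256::zero()
        && store.exists::<BeaconBlock>(&block_root).unwrap_or(false)
}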

@ -1,5 +1,6 @@
use crate::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
use lmd_ghost::LmdGhost;
use sloggers::{null::NullLoggerBuilder, Build};
use slot_clock::SlotClock;
use slot_clock::TestingSlotClock;
use state_processing::per_slot_processing;
@ -94,6 +95,9 @@ where
let mut genesis_block = BeaconBlock::empty(&spec);
genesis_block.state_root = Hash256::from_slice(&genesis_state.tree_hash_root());
let builder = NullLoggerBuilder;
let log = builder.build().expect("logger should build");
// Slot clock
let slot_clock = TestingSlotClock::new(
spec.genesis_slot,
@ -107,6 +111,7 @@ where
genesis_state,
genesis_block,
spec.clone(),
log,
)
.expect("Terminate if beacon chain generation fails");

@ -49,7 +49,9 @@ where
T: BeaconChainTypes<Store = U, EthSpec = V>,
T::LmdGhost: LmdGhost<U, V>,
{
if let Ok(Some(beacon_chain)) = BeaconChain::from_store(store.clone(), spec.clone()) {
if let Ok(Some(beacon_chain)) =
BeaconChain::from_store(store.clone(), spec.clone(), log.clone())
{
info!(
log,
"Loaded BeaconChain from store";
@ -78,7 +80,7 @@ where
// Genesis chain
//TODO: Handle error correctly
BeaconChain::from_genesis(store, slot_clock, genesis_state, genesis_block, spec)
BeaconChain::from_genesis(store, slot_clock, genesis_state, genesis_block, spec, log)
.expect("Terminate if beacon chain generation fails")
}
}

@ -190,29 +190,38 @@ impl<T: BeaconChainTypes> Drop for Client<T> {
}
fn do_state_catchup<T: BeaconChainTypes>(chain: &Arc<BeaconChain<T>>, log: &slog::Logger) {
if let Some(genesis_height) = chain.slots_since_genesis() {
let result = chain.catchup_state();
// Only attempt to `catchup_state` if we can read the slot clock.
if let Some(current_slot) = chain.read_slot_clock() {
let state_catchup_result = chain.catchup_state();
let best_slot = chain.head().beacon_block.slot;
let latest_block_root = chain.head().beacon_block_root;
let common = o!(
"best_slot" => chain.head().beacon_block.slot,
"latest_block_root" => format!("{}", chain.head().beacon_block_root),
"wall_clock_slot" => chain.read_slot_clock().unwrap(),
"state_slot" => chain.head().beacon_state.slot,
"slots_since_genesis" => genesis_height,
"skip_slots" => current_slot.saturating_sub(best_slot),
"best_block_root" => format!("{}", latest_block_root),
"best_block_slot" => best_slot,
"slot" => current_slot,
);
match result {
Ok(_) => info!(
if let Err(e) = state_catchup_result {
error!(
log,
"NewSlot";
common
),
Err(e) => error!(
log,
"StateCatchupFailed";
"State catchup failed";
"error" => format!("{:?}", e),
common
),
};
)
} else {
info!(
log,
"Slot start";
common
)
}
} else {
error!(
log,
"Beacon chain running whilst slot clock is unavailable."
);
};
}

@ -2,7 +2,7 @@ use crate::Client;
use beacon_chain::BeaconChainTypes;
use exit_future::Exit;
use futures::{Future, Stream};
use slog::{debug, o};
use slog::{debug, o, warn};
use std::time::{Duration, Instant};
use tokio::runtime::TaskExecutor;
use tokio::timer::Interval;
@ -10,6 +10,9 @@ use tokio::timer::Interval;
/// The interval between heartbeat events.
pub const HEARTBEAT_INTERVAL_SECONDS: u64 = 15;
/// Create a warning log whenever the peer count is at or below this value.
pub const WARN_PEER_COUNT: usize = 1;
/// Spawns a thread that can be used to run code periodically, on `HEARTBEAT_INTERVAL_SECONDS`
/// durations.
///
@ -30,9 +33,16 @@ pub fn run<T: BeaconChainTypes + Send + Sync + 'static>(
let libp2p = client.network.libp2p_service();
let heartbeat = move |_| {
// Notify the number of connected nodes
// Panic if libp2p is poisoned
debug!(log, ""; "Connected Peers" => libp2p.lock().swarm.connected_peers());
// Number of libp2p (not discv5) peers connected.
//
// Panics if libp2p is poisoned.
let connected_peer_count = libp2p.lock().swarm.connected_peers();
debug!(log, "libp2p"; "peer_count" => connected_peer_count);
if connected_peer_count <= WARN_PEER_COUNT {
warn!(log, "Low libp2p peer count"; "peer_count" => connected_peer_count);
}
Ok(())
};
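
The closure above is only half of the picture; the code that drives it is outside this hunk. Presumably it is wired onto a tokio 0.1 Interval so that it fires every HEARTBEAT_INTERVAL_SECONDS. A sketch of that common pattern, under that assumption (the function name is illustrative, the interval is hard-coded to keep the example self-contained, and the exit-future handling used by the real service is omitted):

use futures::Stream;
use std::time::{Duration, Instant};
use tokio::runtime::TaskExecutor;
use tokio::timer::Interval;

fn spawn_heartbeat(executor: &TaskExecutor) {
    let task = Interval::new(Instant::now(), Duration::from_secs(15))
        .map_err(|_| ())
        // Stand-in for the real heartbeat closure, which inspects the libp2p
        // peer count and warns when it is at or below WARN_PEER_COUNT.
        .for_each(|_tick| Ok(()));

    executor.spawn(task);
}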

@ -79,10 +79,16 @@ impl Config {
}
pub fn apply_cli_args(&mut self, args: &ArgMatches) -> Result<(), String> {
// If a `datadir` has been specified, set the network dir to be inside it.
if let Some(dir) = args.value_of("datadir") {
self.network_dir = PathBuf::from(dir).join("network");
};
// If a network dir has been specified, override the `datadir` definition.
if let Some(dir) = args.value_of("network-dir") {
self.network_dir = PathBuf::from(dir);
};
if let Some(listen_address_str) = args.value_of("listen-address") {
let listen_address = listen_address_str
.parse()
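
Taken together, the two blocks above give `--network-dir` precedence over the `<datadir>/network` default. A self-contained restatement of that precedence (the function name and the `current` fallback are illustrative, not from the PR):

use std::path::PathBuf;

/// `--network-dir`, when supplied, overrides the `<datadir>/network` default;
/// with neither flag, the existing value is kept.
fn resolve_network_dir(
    datadir: Option<&str>,
    network_dir: Option<&str>,
    current: PathBuf,
) -> PathBuf {
    match (network_dir, datadir) {
        (Some(dir), _) => PathBuf::from(dir),
        (None, Some(dir)) => PathBuf::from(dir).join("network"),
        (None, None) => current,
    }
}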

@ -1,7 +1,7 @@
use beacon_chain::{BeaconChain, BeaconChainTypes};
use prometheus::{IntGauge, Opts, Registry};
use slot_clock::SlotClock;
use std::fs::File;
use std::fs;
use std::path::PathBuf;
use types::{EthSpec, Slot};
@ -13,6 +13,9 @@ pub struct LocalMetrics {
present_slot: IntGauge,
present_epoch: IntGauge,
best_slot: IntGauge,
best_beacon_block_root: IntGauge,
justified_beacon_block_root: IntGauge,
finalized_beacon_block_root: IntGauge,
validator_count: IntGauge,
justified_epoch: IntGauge,
finalized_epoch: IntGauge,
@ -36,6 +39,24 @@ impl LocalMetrics {
let opts = Opts::new("best_slot", "slot_of_block_at_chain_head");
IntGauge::with_opts(opts)?
},
best_beacon_block_root: {
let opts = Opts::new("best_beacon_block_root", "root_of_block_at_chain_head");
IntGauge::with_opts(opts)?
},
justified_beacon_block_root: {
let opts = Opts::new(
"justified_beacon_block_root",
"root_of_block_at_justified_head",
);
IntGauge::with_opts(opts)?
},
finalized_beacon_block_root: {
let opts = Opts::new(
"finalized_beacon_block_root",
"root_of_block_at_finalized_head",
);
IntGauge::with_opts(opts)?
},
validator_count: {
let opts = Opts::new("validator_count", "number_of_validators");
IntGauge::with_opts(opts)?
@ -64,6 +85,9 @@ impl LocalMetrics {
registry.register(Box::new(self.present_slot.clone()))?;
registry.register(Box::new(self.present_epoch.clone()))?;
registry.register(Box::new(self.best_slot.clone()))?;
registry.register(Box::new(self.best_beacon_block_root.clone()))?;
registry.register(Box::new(self.justified_beacon_block_root.clone()))?;
registry.register(Box::new(self.finalized_beacon_block_root.clone()))?;
registry.register(Box::new(self.validator_count.clone()))?;
registry.register(Box::new(self.finalized_epoch.clone()))?;
registry.register(Box::new(self.justified_epoch.clone()))?;
@ -87,6 +111,22 @@ impl LocalMetrics {
.set(present_slot.epoch(T::EthSpec::slots_per_epoch()).as_u64() as i64);
self.best_slot.set(state.slot.as_u64() as i64);
self.best_beacon_block_root
.set(beacon_chain.head().beacon_block_root.to_low_u64_le() as i64);
self.justified_beacon_block_root.set(
beacon_chain
.head()
.beacon_state
.current_justified_root
.to_low_u64_le() as i64,
);
self.finalized_beacon_block_root.set(
beacon_chain
.head()
.beacon_state
.finalized_root
.to_low_u64_le() as i64,
);
self.validator_count
.set(state.validator_registry.len() as i64);
self.justified_epoch
@ -97,10 +137,17 @@ impl LocalMetrics {
self.validator_balances_sum
.set(state.balances.iter().sum::<u64>() as i64);
}
let db_size = File::open(db_path)
.and_then(|f| f.metadata())
.and_then(|m| Ok(m.len()))
.unwrap_or(0);
let db_size = if let Ok(iter) = fs::read_dir(db_path) {
iter.filter_map(Result::ok)
.map(size_of_dir_entry)
.fold(0_u64, |sum, val| sum + val)
} else {
0
};
self.database_size.set(db_size as i64);
}
}
fn size_of_dir_entry(dir: fs::DirEntry) -> u64 {
dir.metadata().map(|m| m.len()).unwrap_or(0)
}
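
One detail worth noting in the metrics above: a Prometheus IntGauge holds an i64, so the 32-byte best/justified/finalized roots are truncated to their low 64 bits before being exported. The gauge value is a fingerprint for spotting head changes, not a recoverable root, and distinct roots can in principle collide. A minimal illustration, assuming the ethereum-types `H256` that backs `Hash256`:

use ethereum_types::H256;

/// Truncate a 256-bit root to its low 64 bits so it fits an i64 gauge.
fn root_fingerprint(root: &H256) -> i64 {
    root.to_low_u64_le() as i64
}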

@ -296,7 +296,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
.collect();
if roots.len() as u64 != req.count {
warn!(
debug!(
self.log,
"BlockRootsRequest";
"peer" => format!("{:?}", peer_id),

@ -36,6 +36,13 @@ fn main() {
.help("File path where output will be written.")
.takes_value(true),
)
.arg(
Arg::with_name("network-dir")
.long("network-dir")
.value_name("NETWORK-DIR")
.help("Data directory for network keys.")
.takes_value(true)
)
// network related arguments
.arg(
Arg::with_name("listen-address")
@ -145,6 +152,16 @@ fn main() {
.short("r")
.help("When present, genesis will be within 30 minutes prior. Only for testing"),
)
.arg(
Arg::with_name("debug-level")
.long("debug-level")
.value_name("LEVEL")
.short("s")
.help("The verbosity level for emitting logs.")
.takes_value(true)
.possible_values(&["info", "debug", "trace", "warn", "error", "crit"])
.default_value("info"),
)
.arg(
Arg::with_name("verbosity")
.short("v")
@ -156,9 +173,19 @@ fn main() {
// build the initial logger
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::CompactFormat::new(decorator).build().fuse();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build();
let drain = match matches.value_of("debug-level") {
Some("info") => drain.filter_level(Level::Info),
Some("debug") => drain.filter_level(Level::Debug),
Some("trace") => drain.filter_level(Level::Trace),
Some("warn") => drain.filter_level(Level::Warning),
Some("error") => drain.filter_level(Level::Error),
Some("crit") => drain.filter_level(Level::Critical),
_ => unreachable!("guarded by clap"),
};
let drain = match matches.occurrences_of("verbosity") {
0 => drain.filter_level(Level::Info),
1 => drain.filter_level(Level::Debug),
@ -263,6 +290,7 @@ fn main() {
}
};
// Start the node using a `tokio` executor.
match run::run_beacon_node(client_config, eth2_config, &log) {
Ok(_) => {}
Err(e) => crit!(log, "Beacon node failed to start"; "reason" => format!("{:}", e)),

@ -15,6 +15,12 @@ use tokio::runtime::TaskExecutor;
use tokio_timer::clock::Clock;
use types::{MainnetEthSpec, MinimalEthSpec};
/// Reads the configuration and initializes a `BeaconChain` with the required types and parameters.
///
/// Spawns an executor which performs syncing, networking, block production, etc.
///
/// Blocks the current thread, returning after the `BeaconChain` has exited or a `Ctrl+C`
/// signal.
pub fn run_beacon_node(
client_config: ClientConfig,
eth2_config: Eth2Config,
@ -38,7 +44,7 @@ pub fn run_beacon_node(
warn!(
log,
"This software is EXPERIMENTAL and provides no guarantees or warranties."
"Ethereum 2.0 is pre-release. This software is experimental."
);
info!(
@ -46,6 +52,7 @@ pub fn run_beacon_node(
"Starting beacon node";
"p2p_listen_address" => format!("{:?}", &other_client_config.network.listen_address),
"data_dir" => format!("{:?}", other_client_config.data_dir()),
"network_dir" => format!("{:?}", other_client_config.network.network_dir),
"spec_constants" => &spec_constants,
"db_type" => &other_client_config.db_type,
);
@ -92,7 +99,8 @@ pub fn run_beacon_node(
result
}
pub fn run<T>(
/// Performs the type-generic parts of launching a `BeaconChain`.
fn run<T>(
db_path: &Path,
client_config: ClientConfig,
eth2_config: Eth2Config,

@ -11,7 +11,7 @@ use crate::service::Service as ValidatorService;
use clap::{App, Arg};
use eth2_config::{read_from_file, write_to_file, Eth2Config};
use protos::services_grpc::ValidatorServiceClient;
use slog::{crit, error, info, o, Drain};
use slog::{crit, error, info, o, Drain, Level};
use std::fs;
use std::path::PathBuf;
use types::{Keypair, MainnetEthSpec, MinimalEthSpec};
@ -26,7 +26,6 @@ fn main() {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::CompactFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
let mut log = slog::Logger::root(drain, o!());
// CLI
let matches = App::new("Lighthouse Validator Client")
@ -73,8 +72,29 @@ fn main() {
.possible_values(&["mainnet", "minimal"])
.default_value("minimal"),
)
.arg(
Arg::with_name("debug-level")
.long("debug-level")
.value_name("LEVEL")
.short("s")
.help("The verbosity level for emitting logs.")
.takes_value(true)
.possible_values(&["info", "debug", "trace", "warn", "error", "crit"])
.default_value("info"),
)
.get_matches();
let drain = match matches.value_of("debug-level") {
Some("info") => drain.filter_level(Level::Info),
Some("debug") => drain.filter_level(Level::Debug),
Some("trace") => drain.filter_level(Level::Trace),
Some("warn") => drain.filter_level(Level::Warning),
Some("error") => drain.filter_level(Level::Error),
Some("crit") => drain.filter_level(Level::Critical),
_ => unreachable!("guarded by clap"),
};
let mut log = slog::Logger::root(drain.fuse(), o!());
let data_dir = match matches
.value_of("datadir")
.and_then(|v| Some(PathBuf::from(v)))