lighthouse/beacon_node/beacon_chain/src/builder.rs
Paul Hauner 8609cced0e Reset payload statuses when resuming fork choice (#3498)
## Issue Addressed

NA

## Proposed Changes

This PR is motivated by a recent consensus failure in Geth where it returned `INVALID` for an `VALID` block. Without this PR, the only way to recover is by re-syncing Lighthouse. Whilst ELs "shouldn't have consensus failures", in reality it's something that we can expect from time to time due to the complex nature of Ethereum. Being able to recover easily will help the network recover and EL devs to troubleshoot.

The risk introduced with this PR is that genuinely INVALID payloads get a "second chance" at being imported. I believe the DoS risk here is negligible since LH needs to be restarted in order to re-process the payload. Furthermore, there's no reason to think that a well-performing EL will accept a truly invalid payload the second-time-around.

## Additional Info

This implementation has the following intricacies:

1. Instead of just resetting *invalid* payloads to optimistic, we'll also reset *valid* payloads. This is an artifact of our existing implementation.
1. We will only reset payload statuses when we detect an invalid payload present in `proto_array`
    - This helps save us from forgetting that all our blocks are valid in the "best case scenario" where there are no invalid blocks.
1. If we fail to revert the payload statuses we'll log a `CRIT` and just continue with a `proto_array` that *does not* have reverted payload statuses.
    - The code to revert statuses needs to deal with balances and proposer-boost, so it's a failure point. This is a defensive measure to avoid introducing new show-stopping bugs to LH.
2022-08-29 14:34:41 +00:00

1111 lines
42 KiB
Rust

use crate::beacon_chain::{CanonicalHead, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY};
use crate::eth1_chain::{CachingEth1Backend, SszEth1};
use crate::fork_choice_signal::ForkChoiceSignalTx;
use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary};
use crate::head_tracker::HeadTracker;
use crate::migrate::{BackgroundMigrator, MigratorConfig};
use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::shuffling_cache::ShufflingCache;
use crate::snapshot_cache::{SnapshotCache, DEFAULT_SNAPSHOT_CACHE_SIZE};
use crate::timeout_rw_lock::TimeoutRwLock;
use crate::validator_monitor::ValidatorMonitor;
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::ChainConfig;
use crate::{
BeaconChain, BeaconChainTypes, BeaconForkChoiceStore, BeaconSnapshot, Eth1Chain,
Eth1ChainBackend, ServerSentEventHandler,
};
use eth1::Config as Eth1Config;
use execution_layer::ExecutionLayer;
use fork_choice::{ForkChoice, ResetPayloadStatuses};
use futures::channel::mpsc::Sender;
use operation_pool::{OperationPool, PersistedOperationPool};
use parking_lot::RwLock;
use slasher::Slasher;
use slog::{crit, error, info, Logger};
use slot_clock::{SlotClock, TestingSlotClock};
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
use task_executor::{ShutdownReason, TaskExecutor};
use types::{
BeaconBlock, BeaconState, ChainSpec, Checkpoint, EthSpec, Graffiti, Hash256, PublicKeyBytes,
Signature, SignedBeaconBlock, Slot,
};
/// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing
/// functionality and only exists to satisfy the type system.
pub struct Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>(
PhantomData<(TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore)>,
);
impl<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore> BeaconChainTypes
for Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>
where
THotStore: ItemStore<TEthSpec> + 'static,
TColdStore: ItemStore<TEthSpec> + 'static,
TSlotClock: SlotClock + 'static,
TEth1Backend: Eth1ChainBackend<TEthSpec> + 'static,
TEthSpec: EthSpec + 'static,
{
type HotStore = THotStore;
type ColdStore = TColdStore;
type SlotClock = TSlotClock;
type Eth1Chain = TEth1Backend;
type EthSpec = TEthSpec;
}
/// Builds a `BeaconChain` by either creating anew from genesis, or, resuming from an existing chain
/// persisted to `store`.
///
/// Types may be elided and the compiler will infer them if all necessary builder methods have been
/// called. If type inference errors are being raised, it is likely that not all required methods
/// have been called.
///
/// See the tests for an example of a complete working example.
pub struct BeaconChainBuilder<T: BeaconChainTypes> {
#[allow(clippy::type_complexity)]
store: Option<Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>>,
store_migrator_config: Option<MigratorConfig>,
pub genesis_time: Option<u64>,
genesis_block_root: Option<Hash256>,
genesis_state_root: Option<Hash256>,
#[allow(clippy::type_complexity)]
fork_choice: Option<
ForkChoice<BeaconForkChoiceStore<T::EthSpec, T::HotStore, T::ColdStore>, T::EthSpec>,
>,
op_pool: Option<OperationPool<T::EthSpec>>,
eth1_chain: Option<Eth1Chain<T::Eth1Chain, T::EthSpec>>,
execution_layer: Option<ExecutionLayer<T::EthSpec>>,
event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
slot_clock: Option<T::SlotClock>,
shutdown_sender: Option<Sender<ShutdownReason>>,
head_tracker: Option<HeadTracker>,
validator_pubkey_cache: Option<ValidatorPubkeyCache<T>>,
spec: ChainSpec,
chain_config: ChainConfig,
log: Option<Logger>,
graffiti: Graffiti,
slasher: Option<Arc<Slasher<T::EthSpec>>>,
validator_monitor: Option<ValidatorMonitor<T::EthSpec>>,
// Pending I/O batch that is constructed during building and should be executed atomically
// alongside `PersistedBeaconChain` storage when `BeaconChainBuilder::build` is called.
pending_io_batch: Vec<KeyValueStoreOp>,
task_executor: Option<TaskExecutor>,
}
impl<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>
BeaconChainBuilder<Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>>
where
THotStore: ItemStore<TEthSpec> + 'static,
TColdStore: ItemStore<TEthSpec> + 'static,
TSlotClock: SlotClock + 'static,
TEth1Backend: Eth1ChainBackend<TEthSpec> + 'static,
TEthSpec: EthSpec + 'static,
{
/// Returns a new builder.
///
/// The `_eth_spec_instance` parameter is only supplied to make concrete the `TEthSpec` trait.
/// This should generally be either the `MinimalEthSpec` or `MainnetEthSpec` types.
pub fn new(_eth_spec_instance: TEthSpec) -> Self {
Self {
store: None,
store_migrator_config: None,
genesis_time: None,
genesis_block_root: None,
genesis_state_root: None,
fork_choice: None,
op_pool: None,
eth1_chain: None,
execution_layer: None,
event_handler: None,
slot_clock: None,
shutdown_sender: None,
head_tracker: None,
validator_pubkey_cache: None,
spec: TEthSpec::default_spec(),
chain_config: ChainConfig::default(),
log: None,
graffiti: Graffiti::default(),
slasher: None,
validator_monitor: None,
pending_io_batch: vec![],
task_executor: None,
}
}
/// Override the default spec (as defined by `TEthSpec`).
///
/// This method should generally be called immediately after `Self::new` to ensure components
/// are started with a consistent spec.
pub fn custom_spec(mut self, spec: ChainSpec) -> Self {
self.spec = spec;
self
}
/// Get a reference to the builder's spec.
pub fn get_spec(&self) -> &ChainSpec {
&self.spec
}
/// Sets the maximum number of blocks that will be skipped when processing
/// some consensus messages.
///
/// Set to `None` for no limit.
pub fn import_max_skip_slots(mut self, n: Option<u64>) -> Self {
self.chain_config.import_max_skip_slots = n;
self
}
/// Sets the store (database).
///
/// Should generally be called early in the build chain.
pub fn store(mut self, store: Arc<HotColdDB<TEthSpec, THotStore, TColdStore>>) -> Self {
self.store = Some(store);
self
}
/// Sets the store migrator config (optional).
pub fn store_migrator_config(mut self, config: MigratorConfig) -> Self {
self.store_migrator_config = Some(config);
self
}
/// Sets the slasher.
pub fn slasher(mut self, slasher: Arc<Slasher<TEthSpec>>) -> Self {
self.slasher = Some(slasher);
self
}
/// Sets the logger.
///
/// Should generally be called early in the build chain.
pub fn logger(mut self, log: Logger) -> Self {
self.log = Some(log);
self
}
/// Sets the task executor.
pub fn task_executor(mut self, task_executor: TaskExecutor) -> Self {
self.task_executor = Some(task_executor);
self
}
/// Attempt to load an existing eth1 cache from the builder's `Store`.
pub fn get_persisted_eth1_backend(&self) -> Result<Option<SszEth1>, String> {
let store = self
.store
.clone()
.ok_or("get_persisted_eth1_backend requires a store.")?;
store
.get_item::<SszEth1>(&ETH1_CACHE_DB_KEY)
.map_err(|e| format!("DB error whilst reading eth1 cache: {:?}", e))
}
/// Returns true if `self.store` contains a persisted beacon chain.
pub fn store_contains_beacon_chain(&self) -> Result<bool, String> {
let store = self
.store
.clone()
.ok_or("store_contains_beacon_chain requires a store.")?;
Ok(store
.get_item::<PersistedBeaconChain>(&BEACON_CHAIN_DB_KEY)
.map_err(|e| format!("DB error when reading persisted beacon chain: {:?}", e))?
.is_some())
}
/// Attempt to load an existing chain from the builder's `Store`.
///
/// May initialize several components; including the op_pool and finalized checkpoints.
pub fn resume_from_db(mut self) -> Result<Self, String> {
let log = self.log.as_ref().ok_or("resume_from_db requires a log")?;
info!(
log,
"Starting beacon chain";
"method" => "resume"
);
let store = self
.store
.clone()
.ok_or("resume_from_db requires a store.")?;
let chain = store
.get_item::<PersistedBeaconChain>(&BEACON_CHAIN_DB_KEY)
.map_err(|e| format!("DB error when reading persisted beacon chain: {:?}", e))?
.ok_or_else(|| {
"No persisted beacon chain found in store. Try purging the beacon chain database."
.to_string()
})?;
let fork_choice =
BeaconChain::<Witness<TSlotClock, TEth1Backend, _, _, _>>::load_fork_choice(
store.clone(),
ResetPayloadStatuses::always_reset_conditionally(
self.chain_config.always_reset_payload_statuses,
),
&self.spec,
log,
)
.map_err(|e| format!("Unable to load fork choice from disk: {:?}", e))?
.ok_or("Fork choice not found in store")?;
let genesis_block = store
.get_blinded_block(&chain.genesis_block_root)
.map_err(|e| descriptive_db_error("genesis block", &e))?
.ok_or("Genesis block not found in store")?;
let genesis_state = store
.get_state(&genesis_block.state_root(), Some(genesis_block.slot()))
.map_err(|e| descriptive_db_error("genesis state", &e))?
.ok_or("Genesis block not found in store")?;
self.genesis_time = Some(genesis_state.genesis_time());
self.op_pool = Some(
store
.get_item::<PersistedOperationPool<TEthSpec>>(&OP_POOL_DB_KEY)
.map_err(|e| format!("DB error whilst reading persisted op pool: {:?}", e))?
.map(PersistedOperationPool::into_operation_pool)
.transpose()
.map_err(|e| {
format!(
"Error while creating the op pool from the persisted op pool: {:?}",
e
)
})?
.unwrap_or_else(OperationPool::new),
);
let pubkey_cache = ValidatorPubkeyCache::load_from_store(store)
.map_err(|e| format!("Unable to open persisted pubkey cache: {:?}", e))?;
self.genesis_block_root = Some(chain.genesis_block_root);
self.genesis_state_root = Some(genesis_block.state_root());
self.head_tracker = Some(
HeadTracker::from_ssz_container(&chain.ssz_head_tracker)
.map_err(|e| format!("Failed to decode head tracker for database: {:?}", e))?,
);
self.validator_pubkey_cache = Some(pubkey_cache);
self.fork_choice = Some(fork_choice);
Ok(self)
}
/// Store the genesis state & block in the DB.
///
/// Do *not* initialize fork choice, or do anything that assumes starting from genesis.
///
/// Return the `BeaconSnapshot` representing genesis as well as the mutated builder.
fn set_genesis_state(
mut self,
mut beacon_state: BeaconState<TEthSpec>,
) -> Result<(BeaconSnapshot<TEthSpec>, Self), String> {
let store = self
.store
.clone()
.ok_or("set_genesis_state requires a store")?;
let beacon_block = genesis_block(&mut beacon_state, &self.spec)?;
beacon_state
.build_all_caches(&self.spec)
.map_err(|e| format!("Failed to build genesis state caches: {:?}", e))?;
let beacon_state_root = beacon_block.message().state_root();
let beacon_block_root = beacon_block.canonical_root();
store
.put_state(&beacon_state_root, &beacon_state)
.map_err(|e| format!("Failed to store genesis state: {:?}", e))?;
store
.put_block(&beacon_block_root, beacon_block.clone())
.map_err(|e| format!("Failed to store genesis block: {:?}", e))?;
// Store the genesis block under the `ZERO_HASH` key.
store
.put_block(&Hash256::zero(), beacon_block.clone())
.map_err(|e| {
format!(
"Failed to store genesis block under 0x00..00 alias: {:?}",
e
)
})?;
self.genesis_state_root = Some(beacon_state_root);
self.genesis_block_root = Some(beacon_block_root);
self.genesis_time = Some(beacon_state.genesis_time());
Ok((
BeaconSnapshot {
beacon_block_root,
beacon_block: Arc::new(beacon_block),
beacon_state,
},
self,
))
}
/// Starts a new chain from a genesis state.
pub fn genesis_state(mut self, beacon_state: BeaconState<TEthSpec>) -> Result<Self, String> {
let store = self.store.clone().ok_or("genesis_state requires a store")?;
let (genesis, updated_builder) = self.set_genesis_state(beacon_state)?;
self = updated_builder;
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis);
let current_slot = None;
let fork_choice = ForkChoice::from_anchor(
fc_store,
genesis.beacon_block_root,
&genesis.beacon_block,
&genesis.beacon_state,
current_slot,
&self.spec,
)
.map_err(|e| format!("Unable to initialize ForkChoice: {:?}", e))?;
self.fork_choice = Some(fork_choice);
Ok(self.empty_op_pool())
}
/// Start the chain from a weak subjectivity state.
pub fn weak_subjectivity_state(
mut self,
mut weak_subj_state: BeaconState<TEthSpec>,
weak_subj_block: SignedBeaconBlock<TEthSpec>,
genesis_state: BeaconState<TEthSpec>,
) -> Result<Self, String> {
let store = self.store.clone().ok_or("genesis_state requires a store")?;
let weak_subj_slot = weak_subj_state.slot();
let weak_subj_block_root = weak_subj_block.canonical_root();
let weak_subj_state_root = weak_subj_block.state_root();
// Check that the given block lies on an epoch boundary. Due to the database only storing
// full states on epoch boundaries and at restore points it would be difficult to support
// starting from a mid-epoch state.
if weak_subj_slot % TEthSpec::slots_per_epoch() != 0 {
return Err(format!(
"Checkpoint block at slot {} is not aligned to epoch start. \
Please supply an aligned checkpoint with block.slot % 32 == 0",
weak_subj_block.slot(),
));
}
// Check that the block and state have consistent slots and state roots.
if weak_subj_state.slot() != weak_subj_block.slot() {
return Err(format!(
"Slot of snapshot block ({}) does not match snapshot state ({})",
weak_subj_block.slot(),
weak_subj_state.slot(),
));
}
// Prime all caches before storing the state in the database and computing the tree hash
// root.
weak_subj_state
.build_all_caches(&self.spec)
.map_err(|e| format!("Error building caches on checkpoint state: {e:?}"))?;
let computed_state_root = weak_subj_state
.update_tree_hash_cache()
.map_err(|e| format!("Error computing checkpoint state root: {:?}", e))?;
if weak_subj_state_root != computed_state_root {
return Err(format!(
"Snapshot state root does not match block, expected: {:?}, got: {:?}",
weak_subj_state_root, computed_state_root
));
}
// Check that the checkpoint state is for the same network as the genesis state.
// This check doesn't do much for security but should prevent mistakes.
if weak_subj_state.genesis_validators_root() != genesis_state.genesis_validators_root() {
return Err(format!(
"Snapshot state appears to be from the wrong network. Genesis validators root \
is {:?} but should be {:?}",
weak_subj_state.genesis_validators_root(),
genesis_state.genesis_validators_root()
));
}
// Set the store's split point *before* storing genesis so that genesis is stored
// immediately in the freezer DB.
store.set_split(weak_subj_slot, weak_subj_state_root);
let (_, updated_builder) = self.set_genesis_state(genesis_state)?;
self = updated_builder;
// Write the state and block non-atomically, it doesn't matter if they're forgotten
// about on a crash restart.
store
.put_state(&weak_subj_state_root, &weak_subj_state)
.map_err(|e| format!("Failed to store weak subjectivity state: {:?}", e))?;
store
.put_block(&weak_subj_block_root, weak_subj_block.clone())
.map_err(|e| format!("Failed to store weak subjectivity block: {:?}", e))?;
// Stage the database's metadata fields for atomic storage when `build` is called.
// This prevents the database from restarting in an inconsistent state if the anchor
// info or split point is written before the `PersistedBeaconChain`.
self.pending_io_batch.push(store.store_split_in_batch());
self.pending_io_batch.push(
store
.init_anchor_info(weak_subj_block.message())
.map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
);
// Store pruning checkpoint to prevent attempting to prune before the anchor state.
self.pending_io_batch
.push(store.pruning_checkpoint_store_op(Checkpoint {
root: weak_subj_block_root,
epoch: weak_subj_state.slot().epoch(TEthSpec::slots_per_epoch()),
}));
let snapshot = BeaconSnapshot {
beacon_block_root: weak_subj_block_root,
beacon_block: Arc::new(weak_subj_block),
beacon_state: weak_subj_state,
};
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &snapshot);
let current_slot = Some(snapshot.beacon_block.slot());
let fork_choice = ForkChoice::from_anchor(
fc_store,
snapshot.beacon_block_root,
&snapshot.beacon_block,
&snapshot.beacon_state,
current_slot,
&self.spec,
)
.map_err(|e| format!("Unable to initialize ForkChoice: {:?}", e))?;
self.fork_choice = Some(fork_choice);
Ok(self.empty_op_pool())
}
/// Sets the `BeaconChain` eth1 backend.
pub fn eth1_backend(mut self, backend: Option<TEth1Backend>) -> Self {
self.eth1_chain = backend.map(Eth1Chain::new);
self
}
/// Sets the `BeaconChain` execution layer.
pub fn execution_layer(mut self, execution_layer: Option<ExecutionLayer<TEthSpec>>) -> Self {
self.execution_layer = execution_layer;
self
}
/// Sets the `BeaconChain` event handler backend.
///
/// For example, provide `ServerSentEventHandler` as a `handler`.
pub fn event_handler(mut self, handler: Option<ServerSentEventHandler<TEthSpec>>) -> Self {
self.event_handler = handler;
self
}
/// Sets the `BeaconChain` slot clock.
///
/// For example, provide `SystemTimeSlotClock` as a `clock`.
pub fn slot_clock(mut self, clock: TSlotClock) -> Self {
self.slot_clock = Some(clock);
self
}
/// Fetch a reference to the slot clock.
///
/// Can be used for mutation during testing due to `SlotClock`'s internal mutability.
pub fn get_slot_clock(&self) -> Option<&TSlotClock> {
self.slot_clock.as_ref()
}
/// Sets a `Sender` to allow the beacon chain to send shutdown signals.
pub fn shutdown_sender(mut self, sender: Sender<ShutdownReason>) -> Self {
self.shutdown_sender = Some(sender);
self
}
/// Creates a new, empty operation pool.
fn empty_op_pool(mut self) -> Self {
self.op_pool = Some(OperationPool::new());
self
}
/// Sets the `graffiti` field.
pub fn graffiti(mut self, graffiti: Graffiti) -> Self {
self.graffiti = graffiti;
self
}
/// Sets the `ChainConfig` that determines `BeaconChain` runtime behaviour.
pub fn chain_config(mut self, config: ChainConfig) -> Self {
self.chain_config = config;
self
}
/// Register some validators for additional monitoring.
///
/// `validators` is a comma-separated string of 0x-formatted BLS pubkeys.
pub fn monitor_validators(
mut self,
auto_register: bool,
validators: Vec<PublicKeyBytes>,
log: Logger,
) -> Self {
self.validator_monitor = Some(ValidatorMonitor::new(
validators,
auto_register,
log.clone(),
));
self
}
/// Consumes `self`, returning a `BeaconChain` if all required parameters have been supplied.
///
/// An error will be returned at runtime if all required parameters have not been configured.
///
/// Will also raise ambiguous type errors at compile time if some parameters have not been
/// configured.
#[allow(clippy::type_complexity)] // I think there's nothing to be gained here from a type alias.
pub fn build(
mut self,
) -> Result<
BeaconChain<Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>>,
String,
> {
let log = self.log.ok_or("Cannot build without a logger")?;
let slot_clock = self
.slot_clock
.ok_or("Cannot build without a slot_clock.")?;
let store = self.store.clone().ok_or("Cannot build without a store.")?;
let mut fork_choice = self
.fork_choice
.ok_or("Cannot build without fork choice.")?;
let genesis_block_root = self
.genesis_block_root
.ok_or("Cannot build without a genesis block root")?;
let genesis_state_root = self
.genesis_state_root
.ok_or("Cannot build without a genesis state root")?;
let mut validator_monitor = self
.validator_monitor
.ok_or("Cannot build without a validator monitor")?;
let head_tracker = Arc::new(self.head_tracker.unwrap_or_default());
let current_slot = if slot_clock
.is_prior_to_genesis()
.ok_or("Unable to read slot clock")?
{
self.spec.genesis_slot
} else {
slot_clock.now().ok_or("Unable to read slot")?
};
let initial_head_block_root = fork_choice
.get_head(current_slot, &self.spec)
.map_err(|e| format!("Unable to get fork choice head: {:?}", e))?;
// Try to decode the head block according to the current fork, if that fails, try
// to backtrack to before the most recent fork.
let (head_block_root, head_block, head_reverted) =
match store.get_full_block(&initial_head_block_root) {
Ok(Some(block)) => (initial_head_block_root, block, false),
Ok(None) => return Err("Head block not found in store".into()),
Err(StoreError::SszDecodeError(_)) => {
error!(
log,
"Error decoding head block";
"message" => "This node has likely missed a hard fork. \
It will try to revert the invalid blocks and keep running, \
but any stray blocks and states will not be deleted. \
Long-term you should consider re-syncing this node."
);
let (block_root, block) = revert_to_fork_boundary(
current_slot,
initial_head_block_root,
store.clone(),
&self.spec,
&log,
)?;
// Update head tracker.
head_tracker.register_block(block_root, block.parent_root(), block.slot());
(block_root, block, true)
}
Err(e) => return Err(descriptive_db_error("head block", &e)),
};
let head_state_root = head_block.state_root();
let head_state = store
.get_state(&head_state_root, Some(head_block.slot()))
.map_err(|e| descriptive_db_error("head state", &e))?
.ok_or("Head state not found in store")?;
// If the head reverted then we need to reset fork choice using the new head's finalized
// checkpoint.
if head_reverted {
fork_choice = reset_fork_choice_to_finalization(
head_block_root,
&head_state,
store.clone(),
Some(current_slot),
&self.spec,
self.chain_config.count_unrealized.into(),
)?;
}
let mut head_snapshot = BeaconSnapshot {
beacon_block_root: head_block_root,
beacon_block: Arc::new(head_block),
beacon_state: head_state,
};
head_snapshot
.beacon_state
.build_all_caches(&self.spec)
.map_err(|e| format!("Failed to build state caches: {:?}", e))?;
// Perform a check to ensure that the finalization points of the head and fork choice are
// consistent.
//
// This is a sanity check to detect database corruption.
let fc_finalized = fork_choice.finalized_checkpoint();
let head_finalized = head_snapshot.beacon_state.finalized_checkpoint();
if fc_finalized.epoch < head_finalized.epoch {
return Err(format!(
"Database corrupt: fork choice is finalized at {:?} whilst head is finalized at \
{:?}",
fc_finalized, head_finalized
));
}
let validator_pubkey_cache = self.validator_pubkey_cache.map(Ok).unwrap_or_else(|| {
ValidatorPubkeyCache::new(&head_snapshot.beacon_state, store.clone())
.map_err(|e| format!("Unable to init validator pubkey cache: {:?}", e))
})?;
let migrator_config = self.store_migrator_config.unwrap_or_default();
let store_migrator = BackgroundMigrator::new(
store.clone(),
migrator_config,
genesis_block_root,
log.clone(),
);
if let Some(slot) = slot_clock.now() {
validator_monitor.process_valid_state(
slot.epoch(TEthSpec::slots_per_epoch()),
&head_snapshot.beacon_state,
);
}
// If enabled, set up the fork choice signaller.
let (fork_choice_signal_tx, fork_choice_signal_rx) =
if self.chain_config.fork_choice_before_proposal_timeout_ms != 0 {
let tx = ForkChoiceSignalTx::new();
let rx = tx.get_receiver();
(Some(tx), Some(rx))
} else {
(None, None)
};
// Store the `PersistedBeaconChain` in the database atomically with the metadata so that on
// restart we can correctly detect the presence of an initialized database.
//
// This *must* be stored before constructing the `BeaconChain`, so that its `Drop` instance
// doesn't write a `PersistedBeaconChain` without the rest of the batch.
self.pending_io_batch.push(BeaconChain::<
Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>,
>::persist_head_in_batch_standalone(
genesis_block_root, &head_tracker
));
self.pending_io_batch.push(BeaconChain::<
Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>,
>::persist_fork_choice_in_batch_standalone(
&fork_choice
));
store
.hot_db
.do_atomically(self.pending_io_batch)
.map_err(|e| format!("Error writing chain & metadata to disk: {:?}", e))?;
let genesis_validators_root = head_snapshot.beacon_state.genesis_validators_root();
let genesis_time = head_snapshot.beacon_state.genesis_time();
let head_for_snapshot_cache = head_snapshot.clone();
let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot));
let beacon_chain = BeaconChain {
spec: self.spec,
config: self.chain_config,
store,
task_executor: self
.task_executor
.ok_or("Cannot build without task executor")?,
store_migrator,
slot_clock,
op_pool: self.op_pool.ok_or("Cannot build without op pool")?,
// TODO: allow for persisting and loading the pool from disk.
naive_aggregation_pool: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
naive_sync_aggregation_pool: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_attestations: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_sync_contributions: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_gossip_attesters: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_block_attesters: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_sync_contributors: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_aggregators: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_sync_aggregators: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_block_producers: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_voluntary_exits: <_>::default(),
observed_proposer_slashings: <_>::default(),
observed_attester_slashings: <_>::default(),
eth1_chain: self.eth1_chain,
execution_layer: self.execution_layer,
genesis_validators_root,
genesis_time,
canonical_head,
genesis_block_root,
genesis_state_root,
fork_choice_signal_tx,
fork_choice_signal_rx,
event_handler: self.event_handler,
head_tracker,
snapshot_cache: TimeoutRwLock::new(SnapshotCache::new(
DEFAULT_SNAPSHOT_CACHE_SIZE,
head_for_snapshot_cache,
)),
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()),
beacon_proposer_cache: <_>::default(),
block_times_cache: <_>::default(),
pre_finalization_block_cache: <_>::default(),
validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache),
attester_cache: <_>::default(),
early_attester_cache: <_>::default(),
shutdown_sender: self
.shutdown_sender
.ok_or("Cannot build without a shutdown sender.")?,
log: log.clone(),
graffiti: self.graffiti,
slasher: self.slasher.clone(),
validator_monitor: RwLock::new(validator_monitor),
};
let head = beacon_chain.head_snapshot();
// Prime the attester cache with the head state.
beacon_chain
.attester_cache
.maybe_cache_state(
&head.beacon_state,
head.beacon_block_root,
&beacon_chain.spec,
)
.map_err(|e| format!("Failed to prime attester cache: {:?}", e))?;
// Only perform the check if it was configured.
if let Some(wss_checkpoint) = beacon_chain.config.weak_subjectivity_checkpoint {
if let Err(e) = beacon_chain.verify_weak_subjectivity_checkpoint(
wss_checkpoint,
head.beacon_block_root,
&head.beacon_state,
) {
crit!(
log,
"Weak subjectivity checkpoint verification failed on startup!";
"head_block_root" => format!("{}", head.beacon_block_root),
"head_slot" => format!("{}", head.beacon_block.slot()),
"finalized_epoch" => format!("{}", head.beacon_state.finalized_checkpoint().epoch),
"wss_checkpoint_epoch" => format!("{}", wss_checkpoint.epoch),
"error" => format!("{:?}", e),
);
crit!(log, "You must use the `--purge-db` flag to clear the database and restart sync. You may be on a hostile network.");
return Err(format!("Weak subjectivity verification failed: {:?}", e));
}
}
info!(
log,
"Beacon chain initialized";
"head_state" => format!("{}", head.beacon_state_root()),
"head_block" => format!("{}", head.beacon_block_root),
"head_slot" => format!("{}", head.beacon_block.slot()),
);
// Check for states to reconstruct (in the background).
if beacon_chain.config.reconstruct_historic_states {
beacon_chain.store_migrator.process_reconstruction();
}
Ok(beacon_chain)
}
}
impl<TSlotClock, TEthSpec, THotStore, TColdStore>
BeaconChainBuilder<
Witness<TSlotClock, CachingEth1Backend<TEthSpec>, TEthSpec, THotStore, TColdStore>,
>
where
THotStore: ItemStore<TEthSpec> + 'static,
TColdStore: ItemStore<TEthSpec> + 'static,
TSlotClock: SlotClock + 'static,
TEthSpec: EthSpec + 'static,
{
/// Do not use any eth1 backend. The client will not be able to produce beacon blocks.
pub fn no_eth1_backend(self) -> Self {
self.eth1_backend(None)
}
/// Sets the `BeaconChain` eth1 back-end to produce predictably junk data when producing blocks.
pub fn dummy_eth1_backend(mut self) -> Result<Self, String> {
let log = self
.log
.as_ref()
.ok_or("dummy_eth1_backend requires a log")?;
let backend =
CachingEth1Backend::new(Eth1Config::default(), log.clone(), self.spec.clone());
self.eth1_chain = Some(Eth1Chain::new_dummy(backend));
Ok(self)
}
}
impl<TEth1Backend, TEthSpec, THotStore, TColdStore>
BeaconChainBuilder<Witness<TestingSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>>
where
THotStore: ItemStore<TEthSpec> + 'static,
TColdStore: ItemStore<TEthSpec> + 'static,
TEth1Backend: Eth1ChainBackend<TEthSpec> + 'static,
TEthSpec: EthSpec + 'static,
{
/// Sets the `BeaconChain` slot clock to `TestingSlotClock`.
///
/// Requires the state to be initialized.
pub fn testing_slot_clock(self, slot_duration: Duration) -> Result<Self, String> {
let genesis_time = self
.genesis_time
.ok_or("testing_slot_clock requires an initialized state")?;
let slot_clock = TestingSlotClock::new(
Slot::new(0),
Duration::from_secs(genesis_time),
slot_duration,
);
Ok(self.slot_clock(slot_clock))
}
}
fn genesis_block<T: EthSpec>(
genesis_state: &mut BeaconState<T>,
spec: &ChainSpec,
) -> Result<SignedBeaconBlock<T>, String> {
let mut genesis_block = BeaconBlock::empty(spec);
*genesis_block.state_root_mut() = genesis_state
.update_tree_hash_cache()
.map_err(|e| format!("Error hashing genesis state: {:?}", e))?;
Ok(SignedBeaconBlock::from_block(
genesis_block,
// Empty signature, which should NEVER be read. This isn't to-spec, but makes the genesis
// block consistent with every other block.
Signature::empty(),
))
}
// Helper function to return more useful errors when reading from the database.
fn descriptive_db_error(item: &str, error: &StoreError) -> String {
let additional_info = if let StoreError::SszDecodeError(_) = error {
"Ensure the data directory is not initialized for a different network. The \
--purge-db flag can be used to permanently delete the existing data directory."
} else {
"Database corruption may be present. If the issue persists, use \
--purge-db to permanently delete the existing data directory."
};
format!(
"DB error when reading {}: {:?}. {}",
item, error, additional_info
)
}
#[cfg(not(debug_assertions))]
#[cfg(test)]
mod test {
use super::*;
use eth2_hashing::hash;
use genesis::{
generate_deterministic_keypairs, interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH,
};
use sloggers::{null::NullLoggerBuilder, Build};
use ssz::Encode;
use std::time::Duration;
use store::config::StoreConfig;
use store::{HotColdDB, MemoryStore};
use task_executor::test_utils::TestRuntime;
use types::{EthSpec, MinimalEthSpec, Slot};
type TestEthSpec = MinimalEthSpec;
fn get_logger() -> Logger {
let builder = NullLoggerBuilder;
builder.build().expect("should build logger")
}
#[test]
fn recent_genesis() {
let validator_count = 1;
let genesis_time = 13_371_337;
let log = get_logger();
let store: HotColdDB<
MinimalEthSpec,
MemoryStore<MinimalEthSpec>,
MemoryStore<MinimalEthSpec>,
> = HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log.clone())
.unwrap();
let spec = MinimalEthSpec::default_spec();
let genesis_state = interop_genesis_state(
&generate_deterministic_keypairs(validator_count),
genesis_time,
Hash256::from_slice(DEFAULT_ETH1_BLOCK_HASH),
None,
&spec,
)
.expect("should create interop genesis state");
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let runtime = TestRuntime::default();
let chain = BeaconChainBuilder::new(MinimalEthSpec)
.logger(log.clone())
.store(Arc::new(store))
.task_executor(runtime.task_executor.clone())
.genesis_state(genesis_state)
.expect("should build state using recent genesis")
.dummy_eth1_backend()
.expect("should build the dummy eth1 backend")
.testing_slot_clock(Duration::from_secs(1))
.expect("should configure testing slot clock")
.shutdown_sender(shutdown_tx)
.monitor_validators(true, vec![], log.clone())
.build()
.expect("should build");
let head = chain.head_snapshot();
let state = &head.beacon_state;
let block = &head.beacon_block;
assert_eq!(state.slot(), Slot::new(0), "should start from genesis");
assert_eq!(
state.genesis_time(),
13_371_337,
"should have the correct genesis time"
);
assert_eq!(
block.state_root(),
state.canonical_root(),
"block should have correct state root"
);
assert_eq!(
chain
.store
.get_blinded_block(&Hash256::zero())
.expect("should read db")
.expect("should find genesis block"),
block.clone_as_blinded(),
"should store genesis block under zero hash alias"
);
assert_eq!(
state.validators().len(),
validator_count,
"should have correct validator count"
);
assert_eq!(
chain.genesis_block_root,
block.canonical_root(),
"should have correct genesis block root"
);
}
#[test]
fn interop_state() {
let validator_count = 16;
let genesis_time = 42;
let spec = &TestEthSpec::default_spec();
let keypairs = generate_deterministic_keypairs(validator_count);
let state = interop_genesis_state::<TestEthSpec>(
&keypairs,
genesis_time,
Hash256::from_slice(DEFAULT_ETH1_BLOCK_HASH),
None,
spec,
)
.expect("should build state");
assert_eq!(
state.eth1_data().block_hash,
Hash256::from_slice(&[0x42; 32]),
"eth1 block hash should be co-ordinated junk"
);
assert_eq!(
state.genesis_time(),
genesis_time,
"genesis time should be as specified"
);
for b in state.balances() {
assert_eq!(
*b, spec.max_effective_balance,
"validator balances should be max effective balance"
);
}
for v in state.validators() {
let creds = v.withdrawal_credentials.as_bytes();
assert_eq!(
creds[0], spec.bls_withdrawal_prefix_byte,
"first byte of withdrawal creds should be bls prefix"
);
assert_eq!(
&creds[1..],
&hash(&v.pubkey.as_ssz_bytes())[1..],
"rest of withdrawal creds should be pubkey hash"
)
}
assert_eq!(
state.balances().len(),
validator_count,
"validator balances len should be correct"
);
assert_eq!(
state.validators().len(),
validator_count,
"validator count should be correct"
);
}
}