1129 lines
43 KiB
Rust
1129 lines
43 KiB
Rust
use crate::beacon_chain::{CanonicalHead, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY};
|
|
use crate::eth1_chain::{CachingEth1Backend, SszEth1};
|
|
use crate::fork_choice_signal::ForkChoiceSignalTx;
|
|
use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary};
|
|
use crate::head_tracker::HeadTracker;
|
|
use crate::migrate::{BackgroundMigrator, MigratorConfig};
|
|
use crate::persisted_beacon_chain::PersistedBeaconChain;
|
|
use crate::shuffling_cache::ShufflingCache;
|
|
use crate::snapshot_cache::{SnapshotCache, DEFAULT_SNAPSHOT_CACHE_SIZE};
|
|
use crate::timeout_rw_lock::TimeoutRwLock;
|
|
use crate::validator_monitor::ValidatorMonitor;
|
|
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
|
|
use crate::ChainConfig;
|
|
use crate::{
|
|
BeaconChain, BeaconChainTypes, BeaconForkChoiceStore, BeaconSnapshot, Eth1Chain,
|
|
Eth1ChainBackend, ServerSentEventHandler,
|
|
};
|
|
use eth1::Config as Eth1Config;
|
|
use execution_layer::ExecutionLayer;
|
|
use fork_choice::{ForkChoice, ResetPayloadStatuses};
|
|
use futures::channel::mpsc::Sender;
|
|
use operation_pool::{OperationPool, PersistedOperationPool};
|
|
use parking_lot::RwLock;
|
|
use slasher::Slasher;
|
|
use slog::{crit, error, info, Logger};
|
|
use slot_clock::{SlotClock, TestingSlotClock};
|
|
use std::marker::PhantomData;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
|
|
use task_executor::{ShutdownReason, TaskExecutor};
|
|
use types::{
|
|
BeaconBlock, BeaconState, ChainSpec, Checkpoint, EthSpec, Graffiti, Hash256, PublicKeyBytes,
|
|
Signature, SignedBeaconBlock, Slot,
|
|
};
|
|
|
|
/// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing
|
|
/// functionality and only exists to satisfy the type system.
|
|
pub struct Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>(
|
|
PhantomData<(TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore)>,
|
|
);
|
|
|
|
impl<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore> BeaconChainTypes
|
|
for Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>
|
|
where
|
|
THotStore: ItemStore<TEthSpec> + 'static,
|
|
TColdStore: ItemStore<TEthSpec> + 'static,
|
|
TSlotClock: SlotClock + 'static,
|
|
TEth1Backend: Eth1ChainBackend<TEthSpec> + 'static,
|
|
TEthSpec: EthSpec + 'static,
|
|
{
|
|
type HotStore = THotStore;
|
|
type ColdStore = TColdStore;
|
|
type SlotClock = TSlotClock;
|
|
type Eth1Chain = TEth1Backend;
|
|
type EthSpec = TEthSpec;
|
|
}
|
|
|
|
/// Builds a `BeaconChain` by either creating anew from genesis, or, resuming from an existing chain
|
|
/// persisted to `store`.
|
|
///
|
|
/// Types may be elided and the compiler will infer them if all necessary builder methods have been
|
|
/// called. If type inference errors are being raised, it is likely that not all required methods
|
|
/// have been called.
|
|
///
|
|
/// See the tests for an example of a complete working example.
|
|
pub struct BeaconChainBuilder<T: BeaconChainTypes> {
|
|
#[allow(clippy::type_complexity)]
|
|
store: Option<Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>>,
|
|
store_migrator_config: Option<MigratorConfig>,
|
|
pub genesis_time: Option<u64>,
|
|
genesis_block_root: Option<Hash256>,
|
|
genesis_state_root: Option<Hash256>,
|
|
#[allow(clippy::type_complexity)]
|
|
fork_choice: Option<
|
|
ForkChoice<BeaconForkChoiceStore<T::EthSpec, T::HotStore, T::ColdStore>, T::EthSpec>,
|
|
>,
|
|
op_pool: Option<OperationPool<T::EthSpec>>,
|
|
eth1_chain: Option<Eth1Chain<T::Eth1Chain, T::EthSpec>>,
|
|
execution_layer: Option<ExecutionLayer<T::EthSpec>>,
|
|
event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
|
|
slot_clock: Option<T::SlotClock>,
|
|
shutdown_sender: Option<Sender<ShutdownReason>>,
|
|
head_tracker: Option<HeadTracker>,
|
|
validator_pubkey_cache: Option<ValidatorPubkeyCache<T>>,
|
|
spec: ChainSpec,
|
|
chain_config: ChainConfig,
|
|
log: Option<Logger>,
|
|
graffiti: Graffiti,
|
|
slasher: Option<Arc<Slasher<T::EthSpec>>>,
|
|
validator_monitor: Option<ValidatorMonitor<T::EthSpec>>,
|
|
// Pending I/O batch that is constructed during building and should be executed atomically
|
|
// alongside `PersistedBeaconChain` storage when `BeaconChainBuilder::build` is called.
|
|
pending_io_batch: Vec<KeyValueStoreOp>,
|
|
task_executor: Option<TaskExecutor>,
|
|
}
|
|
|
|
impl<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>
|
|
BeaconChainBuilder<Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>>
|
|
where
|
|
THotStore: ItemStore<TEthSpec> + 'static,
|
|
TColdStore: ItemStore<TEthSpec> + 'static,
|
|
TSlotClock: SlotClock + 'static,
|
|
TEth1Backend: Eth1ChainBackend<TEthSpec> + 'static,
|
|
TEthSpec: EthSpec + 'static,
|
|
{
|
|
/// Returns a new builder.
|
|
///
|
|
/// The `_eth_spec_instance` parameter is only supplied to make concrete the `TEthSpec` trait.
|
|
/// This should generally be either the `MinimalEthSpec` or `MainnetEthSpec` types.
|
|
pub fn new(_eth_spec_instance: TEthSpec) -> Self {
|
|
Self {
|
|
store: None,
|
|
store_migrator_config: None,
|
|
genesis_time: None,
|
|
genesis_block_root: None,
|
|
genesis_state_root: None,
|
|
fork_choice: None,
|
|
op_pool: None,
|
|
eth1_chain: None,
|
|
execution_layer: None,
|
|
event_handler: None,
|
|
slot_clock: None,
|
|
shutdown_sender: None,
|
|
head_tracker: None,
|
|
validator_pubkey_cache: None,
|
|
spec: TEthSpec::default_spec(),
|
|
chain_config: ChainConfig::default(),
|
|
log: None,
|
|
graffiti: Graffiti::default(),
|
|
slasher: None,
|
|
validator_monitor: None,
|
|
pending_io_batch: vec![],
|
|
task_executor: None,
|
|
}
|
|
}
|
|
|
|
/// Override the default spec (as defined by `TEthSpec`).
|
|
///
|
|
/// This method should generally be called immediately after `Self::new` to ensure components
|
|
/// are started with a consistent spec.
|
|
pub fn custom_spec(mut self, spec: ChainSpec) -> Self {
|
|
self.spec = spec;
|
|
self
|
|
}
|
|
|
|
/// Get a reference to the builder's spec.
|
|
pub fn get_spec(&self) -> &ChainSpec {
|
|
&self.spec
|
|
}
|
|
|
|
/// Sets the maximum number of blocks that will be skipped when processing
|
|
/// some consensus messages.
|
|
///
|
|
/// Set to `None` for no limit.
|
|
pub fn import_max_skip_slots(mut self, n: Option<u64>) -> Self {
|
|
self.chain_config.import_max_skip_slots = n;
|
|
self
|
|
}
|
|
|
|
/// Sets the store (database).
|
|
///
|
|
/// Should generally be called early in the build chain.
|
|
pub fn store(mut self, store: Arc<HotColdDB<TEthSpec, THotStore, TColdStore>>) -> Self {
|
|
self.store = Some(store);
|
|
self
|
|
}
|
|
|
|
/// Sets the store migrator config (optional).
|
|
pub fn store_migrator_config(mut self, config: MigratorConfig) -> Self {
|
|
self.store_migrator_config = Some(config);
|
|
self
|
|
}
|
|
|
|
/// Sets the slasher.
|
|
pub fn slasher(mut self, slasher: Arc<Slasher<TEthSpec>>) -> Self {
|
|
self.slasher = Some(slasher);
|
|
self
|
|
}
|
|
|
|
/// Sets the logger.
|
|
///
|
|
/// Should generally be called early in the build chain.
|
|
pub fn logger(mut self, log: Logger) -> Self {
|
|
self.log = Some(log);
|
|
self
|
|
}
|
|
|
|
/// Sets the task executor.
|
|
pub fn task_executor(mut self, task_executor: TaskExecutor) -> Self {
|
|
self.task_executor = Some(task_executor);
|
|
self
|
|
}
|
|
|
|
/// Attempt to load an existing eth1 cache from the builder's `Store`.
|
|
pub fn get_persisted_eth1_backend(&self) -> Result<Option<SszEth1>, String> {
|
|
let store = self
|
|
.store
|
|
.clone()
|
|
.ok_or("get_persisted_eth1_backend requires a store.")?;
|
|
|
|
store
|
|
.get_item::<SszEth1>(Ð1_CACHE_DB_KEY)
|
|
.map_err(|e| format!("DB error whilst reading eth1 cache: {:?}", e))
|
|
}
|
|
|
|
/// Returns true if `self.store` contains a persisted beacon chain.
|
|
pub fn store_contains_beacon_chain(&self) -> Result<bool, String> {
|
|
let store = self
|
|
.store
|
|
.clone()
|
|
.ok_or("store_contains_beacon_chain requires a store.")?;
|
|
|
|
Ok(store
|
|
.get_item::<PersistedBeaconChain>(&BEACON_CHAIN_DB_KEY)
|
|
.map_err(|e| format!("DB error when reading persisted beacon chain: {:?}", e))?
|
|
.is_some())
|
|
}
|
|
|
|
/// Attempt to load an existing chain from the builder's `Store`.
|
|
///
|
|
/// May initialize several components; including the op_pool and finalized checkpoints.
|
|
pub fn resume_from_db(mut self) -> Result<Self, String> {
|
|
let log = self.log.as_ref().ok_or("resume_from_db requires a log")?;
|
|
|
|
info!(
|
|
log,
|
|
"Starting beacon chain";
|
|
"method" => "resume"
|
|
);
|
|
|
|
let store = self
|
|
.store
|
|
.clone()
|
|
.ok_or("resume_from_db requires a store.")?;
|
|
|
|
let chain = store
|
|
.get_item::<PersistedBeaconChain>(&BEACON_CHAIN_DB_KEY)
|
|
.map_err(|e| format!("DB error when reading persisted beacon chain: {:?}", e))?
|
|
.ok_or_else(|| {
|
|
"No persisted beacon chain found in store. Try purging the beacon chain database."
|
|
.to_string()
|
|
})?;
|
|
|
|
let fork_choice =
|
|
BeaconChain::<Witness<TSlotClock, TEth1Backend, _, _, _>>::load_fork_choice(
|
|
store.clone(),
|
|
ResetPayloadStatuses::always_reset_conditionally(
|
|
self.chain_config.always_reset_payload_statuses,
|
|
),
|
|
self.chain_config.count_unrealized_full,
|
|
&self.spec,
|
|
log,
|
|
)
|
|
.map_err(|e| format!("Unable to load fork choice from disk: {:?}", e))?
|
|
.ok_or("Fork choice not found in store")?;
|
|
|
|
let genesis_block = store
|
|
.get_blinded_block(&chain.genesis_block_root)
|
|
.map_err(|e| descriptive_db_error("genesis block", &e))?
|
|
.ok_or("Genesis block not found in store")?;
|
|
let genesis_state = store
|
|
.get_state(&genesis_block.state_root(), Some(genesis_block.slot()))
|
|
.map_err(|e| descriptive_db_error("genesis state", &e))?
|
|
.ok_or("Genesis block not found in store")?;
|
|
|
|
self.genesis_time = Some(genesis_state.genesis_time());
|
|
|
|
self.op_pool = Some(
|
|
store
|
|
.get_item::<PersistedOperationPool<TEthSpec>>(&OP_POOL_DB_KEY)
|
|
.map_err(|e| format!("DB error whilst reading persisted op pool: {:?}", e))?
|
|
.map(PersistedOperationPool::into_operation_pool)
|
|
.transpose()
|
|
.map_err(|e| {
|
|
format!(
|
|
"Error while creating the op pool from the persisted op pool: {:?}",
|
|
e
|
|
)
|
|
})?
|
|
.unwrap_or_else(OperationPool::new),
|
|
);
|
|
|
|
let pubkey_cache = ValidatorPubkeyCache::load_from_store(store)
|
|
.map_err(|e| format!("Unable to open persisted pubkey cache: {:?}", e))?;
|
|
|
|
self.genesis_block_root = Some(chain.genesis_block_root);
|
|
self.genesis_state_root = Some(genesis_block.state_root());
|
|
self.head_tracker = Some(
|
|
HeadTracker::from_ssz_container(&chain.ssz_head_tracker)
|
|
.map_err(|e| format!("Failed to decode head tracker for database: {:?}", e))?,
|
|
);
|
|
self.validator_pubkey_cache = Some(pubkey_cache);
|
|
self.fork_choice = Some(fork_choice);
|
|
|
|
Ok(self)
|
|
}
|
|
|
|
/// Store the genesis state & block in the DB.
|
|
///
|
|
/// Do *not* initialize fork choice, or do anything that assumes starting from genesis.
|
|
///
|
|
/// Return the `BeaconSnapshot` representing genesis as well as the mutated builder.
|
|
fn set_genesis_state(
|
|
mut self,
|
|
mut beacon_state: BeaconState<TEthSpec>,
|
|
) -> Result<(BeaconSnapshot<TEthSpec>, Self), String> {
|
|
let store = self
|
|
.store
|
|
.clone()
|
|
.ok_or("set_genesis_state requires a store")?;
|
|
|
|
let beacon_block = genesis_block(&mut beacon_state, &self.spec)?;
|
|
|
|
beacon_state
|
|
.build_all_caches(&self.spec)
|
|
.map_err(|e| format!("Failed to build genesis state caches: {:?}", e))?;
|
|
|
|
let beacon_state_root = beacon_block.message().state_root();
|
|
let beacon_block_root = beacon_block.canonical_root();
|
|
|
|
store
|
|
.put_state(&beacon_state_root, &beacon_state)
|
|
.map_err(|e| format!("Failed to store genesis state: {:?}", e))?;
|
|
store
|
|
.put_block(&beacon_block_root, beacon_block.clone())
|
|
.map_err(|e| format!("Failed to store genesis block: {:?}", e))?;
|
|
|
|
// Store the genesis block under the `ZERO_HASH` key.
|
|
store
|
|
.put_block(&Hash256::zero(), beacon_block.clone())
|
|
.map_err(|e| {
|
|
format!(
|
|
"Failed to store genesis block under 0x00..00 alias: {:?}",
|
|
e
|
|
)
|
|
})?;
|
|
|
|
self.genesis_state_root = Some(beacon_state_root);
|
|
self.genesis_block_root = Some(beacon_block_root);
|
|
self.genesis_time = Some(beacon_state.genesis_time());
|
|
|
|
Ok((
|
|
BeaconSnapshot {
|
|
beacon_block_root,
|
|
beacon_block: Arc::new(beacon_block),
|
|
beacon_state,
|
|
},
|
|
self,
|
|
))
|
|
}
|
|
|
|
/// Starts a new chain from a genesis state.
|
|
pub fn genesis_state(mut self, beacon_state: BeaconState<TEthSpec>) -> Result<Self, String> {
|
|
let store = self.store.clone().ok_or("genesis_state requires a store")?;
|
|
|
|
let (genesis, updated_builder) = self.set_genesis_state(beacon_state)?;
|
|
self = updated_builder;
|
|
|
|
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis);
|
|
let current_slot = None;
|
|
|
|
let fork_choice = ForkChoice::from_anchor(
|
|
fc_store,
|
|
genesis.beacon_block_root,
|
|
&genesis.beacon_block,
|
|
&genesis.beacon_state,
|
|
current_slot,
|
|
self.chain_config.count_unrealized_full,
|
|
&self.spec,
|
|
)
|
|
.map_err(|e| format!("Unable to initialize ForkChoice: {:?}", e))?;
|
|
|
|
self.fork_choice = Some(fork_choice);
|
|
|
|
Ok(self.empty_op_pool())
|
|
}
|
|
|
|
/// Start the chain from a weak subjectivity state.
|
|
pub fn weak_subjectivity_state(
|
|
mut self,
|
|
mut weak_subj_state: BeaconState<TEthSpec>,
|
|
weak_subj_block: SignedBeaconBlock<TEthSpec>,
|
|
genesis_state: BeaconState<TEthSpec>,
|
|
) -> Result<Self, String> {
|
|
let store = self.store.clone().ok_or("genesis_state requires a store")?;
|
|
|
|
let weak_subj_slot = weak_subj_state.slot();
|
|
let weak_subj_block_root = weak_subj_block.canonical_root();
|
|
let weak_subj_state_root = weak_subj_block.state_root();
|
|
|
|
// Check that the given block lies on an epoch boundary. Due to the database only storing
|
|
// full states on epoch boundaries and at restore points it would be difficult to support
|
|
// starting from a mid-epoch state.
|
|
if weak_subj_slot % TEthSpec::slots_per_epoch() != 0 {
|
|
return Err(format!(
|
|
"Checkpoint block at slot {} is not aligned to epoch start. \
|
|
Please supply an aligned checkpoint with block.slot % 32 == 0",
|
|
weak_subj_block.slot(),
|
|
));
|
|
}
|
|
|
|
// Check that the block and state have consistent slots and state roots.
|
|
if weak_subj_state.slot() != weak_subj_block.slot() {
|
|
return Err(format!(
|
|
"Slot of snapshot block ({}) does not match snapshot state ({})",
|
|
weak_subj_block.slot(),
|
|
weak_subj_state.slot(),
|
|
));
|
|
}
|
|
|
|
// Prime all caches before storing the state in the database and computing the tree hash
|
|
// root.
|
|
weak_subj_state
|
|
.build_all_caches(&self.spec)
|
|
.map_err(|e| format!("Error building caches on checkpoint state: {e:?}"))?;
|
|
|
|
let computed_state_root = weak_subj_state
|
|
.update_tree_hash_cache()
|
|
.map_err(|e| format!("Error computing checkpoint state root: {:?}", e))?;
|
|
|
|
if weak_subj_state_root != computed_state_root {
|
|
return Err(format!(
|
|
"Snapshot state root does not match block, expected: {:?}, got: {:?}",
|
|
weak_subj_state_root, computed_state_root
|
|
));
|
|
}
|
|
|
|
// Check that the checkpoint state is for the same network as the genesis state.
|
|
// This check doesn't do much for security but should prevent mistakes.
|
|
if weak_subj_state.genesis_validators_root() != genesis_state.genesis_validators_root() {
|
|
return Err(format!(
|
|
"Snapshot state appears to be from the wrong network. Genesis validators root \
|
|
is {:?} but should be {:?}",
|
|
weak_subj_state.genesis_validators_root(),
|
|
genesis_state.genesis_validators_root()
|
|
));
|
|
}
|
|
|
|
// Set the store's split point *before* storing genesis so that genesis is stored
|
|
// immediately in the freezer DB.
|
|
store.set_split(weak_subj_slot, weak_subj_state_root);
|
|
let (_, updated_builder) = self.set_genesis_state(genesis_state)?;
|
|
self = updated_builder;
|
|
|
|
// Write the state and block non-atomically, it doesn't matter if they're forgotten
|
|
// about on a crash restart.
|
|
store
|
|
.put_state(&weak_subj_state_root, &weak_subj_state)
|
|
.map_err(|e| format!("Failed to store weak subjectivity state: {:?}", e))?;
|
|
store
|
|
.put_block(&weak_subj_block_root, weak_subj_block.clone())
|
|
.map_err(|e| format!("Failed to store weak subjectivity block: {:?}", e))?;
|
|
|
|
// Stage the database's metadata fields for atomic storage when `build` is called.
|
|
// This prevents the database from restarting in an inconsistent state if the anchor
|
|
// info or split point is written before the `PersistedBeaconChain`.
|
|
self.pending_io_batch.push(store.store_split_in_batch());
|
|
self.pending_io_batch.push(
|
|
store
|
|
.init_anchor_info(weak_subj_block.message())
|
|
.map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
|
|
);
|
|
|
|
// Store pruning checkpoint to prevent attempting to prune before the anchor state.
|
|
self.pending_io_batch
|
|
.push(store.pruning_checkpoint_store_op(Checkpoint {
|
|
root: weak_subj_block_root,
|
|
epoch: weak_subj_state.slot().epoch(TEthSpec::slots_per_epoch()),
|
|
}));
|
|
|
|
let snapshot = BeaconSnapshot {
|
|
beacon_block_root: weak_subj_block_root,
|
|
beacon_block: Arc::new(weak_subj_block),
|
|
beacon_state: weak_subj_state,
|
|
};
|
|
|
|
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &snapshot);
|
|
|
|
let current_slot = Some(snapshot.beacon_block.slot());
|
|
let fork_choice = ForkChoice::from_anchor(
|
|
fc_store,
|
|
snapshot.beacon_block_root,
|
|
&snapshot.beacon_block,
|
|
&snapshot.beacon_state,
|
|
current_slot,
|
|
self.chain_config.count_unrealized_full,
|
|
&self.spec,
|
|
)
|
|
.map_err(|e| format!("Unable to initialize ForkChoice: {:?}", e))?;
|
|
|
|
self.fork_choice = Some(fork_choice);
|
|
|
|
Ok(self.empty_op_pool())
|
|
}
|
|
|
|
/// Sets the `BeaconChain` eth1 backend.
|
|
pub fn eth1_backend(mut self, backend: Option<TEth1Backend>) -> Self {
|
|
self.eth1_chain = backend.map(Eth1Chain::new);
|
|
self
|
|
}
|
|
|
|
/// Sets the `BeaconChain` execution layer.
|
|
pub fn execution_layer(mut self, execution_layer: Option<ExecutionLayer<TEthSpec>>) -> Self {
|
|
self.execution_layer = execution_layer;
|
|
self
|
|
}
|
|
|
|
/// Sets the `BeaconChain` event handler backend.
|
|
///
|
|
/// For example, provide `ServerSentEventHandler` as a `handler`.
|
|
pub fn event_handler(mut self, handler: Option<ServerSentEventHandler<TEthSpec>>) -> Self {
|
|
self.event_handler = handler;
|
|
self
|
|
}
|
|
|
|
/// Sets the `BeaconChain` slot clock.
|
|
///
|
|
/// For example, provide `SystemTimeSlotClock` as a `clock`.
|
|
pub fn slot_clock(mut self, clock: TSlotClock) -> Self {
|
|
self.slot_clock = Some(clock);
|
|
self
|
|
}
|
|
|
|
/// Fetch a reference to the slot clock.
|
|
///
|
|
/// Can be used for mutation during testing due to `SlotClock`'s internal mutability.
|
|
pub fn get_slot_clock(&self) -> Option<&TSlotClock> {
|
|
self.slot_clock.as_ref()
|
|
}
|
|
|
|
/// Sets a `Sender` to allow the beacon chain to send shutdown signals.
|
|
pub fn shutdown_sender(mut self, sender: Sender<ShutdownReason>) -> Self {
|
|
self.shutdown_sender = Some(sender);
|
|
self
|
|
}
|
|
|
|
/// Creates a new, empty operation pool.
|
|
fn empty_op_pool(mut self) -> Self {
|
|
self.op_pool = Some(OperationPool::new());
|
|
self
|
|
}
|
|
|
|
/// Sets the `graffiti` field.
|
|
pub fn graffiti(mut self, graffiti: Graffiti) -> Self {
|
|
self.graffiti = graffiti;
|
|
self
|
|
}
|
|
|
|
/// Sets the `ChainConfig` that determines `BeaconChain` runtime behaviour.
|
|
pub fn chain_config(mut self, config: ChainConfig) -> Self {
|
|
self.chain_config = config;
|
|
self
|
|
}
|
|
|
|
/// Register some validators for additional monitoring.
|
|
///
|
|
/// `validators` is a comma-separated string of 0x-formatted BLS pubkeys.
|
|
pub fn monitor_validators(
|
|
mut self,
|
|
auto_register: bool,
|
|
validators: Vec<PublicKeyBytes>,
|
|
log: Logger,
|
|
) -> Self {
|
|
self.validator_monitor = Some(ValidatorMonitor::new(
|
|
validators,
|
|
auto_register,
|
|
log.clone(),
|
|
));
|
|
self
|
|
}
|
|
|
|
/// Consumes `self`, returning a `BeaconChain` if all required parameters have been supplied.
|
|
///
|
|
/// An error will be returned at runtime if all required parameters have not been configured.
|
|
///
|
|
/// Will also raise ambiguous type errors at compile time if some parameters have not been
|
|
/// configured.
|
|
#[allow(clippy::type_complexity)] // I think there's nothing to be gained here from a type alias.
|
|
pub fn build(
|
|
mut self,
|
|
) -> Result<
|
|
BeaconChain<Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>>,
|
|
String,
|
|
> {
|
|
let log = self.log.ok_or("Cannot build without a logger")?;
|
|
let slot_clock = self
|
|
.slot_clock
|
|
.ok_or("Cannot build without a slot_clock.")?;
|
|
let store = self.store.clone().ok_or("Cannot build without a store.")?;
|
|
let mut fork_choice = self
|
|
.fork_choice
|
|
.ok_or("Cannot build without fork choice.")?;
|
|
let genesis_block_root = self
|
|
.genesis_block_root
|
|
.ok_or("Cannot build without a genesis block root")?;
|
|
let genesis_state_root = self
|
|
.genesis_state_root
|
|
.ok_or("Cannot build without a genesis state root")?;
|
|
let mut validator_monitor = self
|
|
.validator_monitor
|
|
.ok_or("Cannot build without a validator monitor")?;
|
|
let head_tracker = Arc::new(self.head_tracker.unwrap_or_default());
|
|
|
|
let current_slot = if slot_clock
|
|
.is_prior_to_genesis()
|
|
.ok_or("Unable to read slot clock")?
|
|
{
|
|
self.spec.genesis_slot
|
|
} else {
|
|
slot_clock.now().ok_or("Unable to read slot")?
|
|
};
|
|
|
|
let initial_head_block_root = fork_choice
|
|
.get_head(current_slot, &self.spec)
|
|
.map_err(|e| format!("Unable to get fork choice head: {:?}", e))?;
|
|
|
|
// Try to decode the head block according to the current fork, if that fails, try
|
|
// to backtrack to before the most recent fork.
|
|
let (head_block_root, head_block, head_reverted) =
|
|
match store.get_full_block(&initial_head_block_root) {
|
|
Ok(Some(block)) => (initial_head_block_root, block, false),
|
|
Ok(None) => return Err("Head block not found in store".into()),
|
|
Err(StoreError::SszDecodeError(_)) => {
|
|
error!(
|
|
log,
|
|
"Error decoding head block";
|
|
"message" => "This node has likely missed a hard fork. \
|
|
It will try to revert the invalid blocks and keep running, \
|
|
but any stray blocks and states will not be deleted. \
|
|
Long-term you should consider re-syncing this node."
|
|
);
|
|
let (block_root, block) = revert_to_fork_boundary(
|
|
current_slot,
|
|
initial_head_block_root,
|
|
store.clone(),
|
|
&self.spec,
|
|
&log,
|
|
)?;
|
|
|
|
// Update head tracker.
|
|
head_tracker.register_block(block_root, block.parent_root(), block.slot());
|
|
(block_root, block, true)
|
|
}
|
|
Err(e) => return Err(descriptive_db_error("head block", &e)),
|
|
};
|
|
|
|
let head_state_root = head_block.state_root();
|
|
let head_state = store
|
|
.get_state(&head_state_root, Some(head_block.slot()))
|
|
.map_err(|e| descriptive_db_error("head state", &e))?
|
|
.ok_or("Head state not found in store")?;
|
|
|
|
// If the head reverted then we need to reset fork choice using the new head's finalized
|
|
// checkpoint.
|
|
if head_reverted {
|
|
fork_choice = reset_fork_choice_to_finalization(
|
|
head_block_root,
|
|
&head_state,
|
|
store.clone(),
|
|
Some(current_slot),
|
|
&self.spec,
|
|
self.chain_config.count_unrealized.into(),
|
|
self.chain_config.count_unrealized_full,
|
|
)?;
|
|
}
|
|
|
|
let mut head_snapshot = BeaconSnapshot {
|
|
beacon_block_root: head_block_root,
|
|
beacon_block: Arc::new(head_block),
|
|
beacon_state: head_state,
|
|
};
|
|
|
|
head_snapshot
|
|
.beacon_state
|
|
.build_all_caches(&self.spec)
|
|
.map_err(|e| format!("Failed to build state caches: {:?}", e))?;
|
|
|
|
// Perform a check to ensure that the finalization points of the head and fork choice are
|
|
// consistent.
|
|
//
|
|
// This is a sanity check to detect database corruption.
|
|
let fc_finalized = fork_choice.finalized_checkpoint();
|
|
let head_finalized = head_snapshot.beacon_state.finalized_checkpoint();
|
|
if fc_finalized.epoch < head_finalized.epoch {
|
|
return Err(format!(
|
|
"Database corrupt: fork choice is finalized at {:?} whilst head is finalized at \
|
|
{:?}",
|
|
fc_finalized, head_finalized
|
|
));
|
|
}
|
|
|
|
let validator_pubkey_cache = self.validator_pubkey_cache.map(Ok).unwrap_or_else(|| {
|
|
ValidatorPubkeyCache::new(&head_snapshot.beacon_state, store.clone())
|
|
.map_err(|e| format!("Unable to init validator pubkey cache: {:?}", e))
|
|
})?;
|
|
|
|
let migrator_config = self.store_migrator_config.unwrap_or_default();
|
|
let store_migrator = BackgroundMigrator::new(
|
|
store.clone(),
|
|
migrator_config,
|
|
genesis_block_root,
|
|
log.clone(),
|
|
);
|
|
|
|
if let Some(slot) = slot_clock.now() {
|
|
validator_monitor.process_valid_state(
|
|
slot.epoch(TEthSpec::slots_per_epoch()),
|
|
&head_snapshot.beacon_state,
|
|
);
|
|
}
|
|
|
|
// If enabled, set up the fork choice signaller.
|
|
let (fork_choice_signal_tx, fork_choice_signal_rx) =
|
|
if self.chain_config.fork_choice_before_proposal_timeout_ms != 0 {
|
|
let tx = ForkChoiceSignalTx::new();
|
|
let rx = tx.get_receiver();
|
|
(Some(tx), Some(rx))
|
|
} else {
|
|
(None, None)
|
|
};
|
|
|
|
// Store the `PersistedBeaconChain` in the database atomically with the metadata so that on
|
|
// restart we can correctly detect the presence of an initialized database.
|
|
//
|
|
// This *must* be stored before constructing the `BeaconChain`, so that its `Drop` instance
|
|
// doesn't write a `PersistedBeaconChain` without the rest of the batch.
|
|
self.pending_io_batch.push(BeaconChain::<
|
|
Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>,
|
|
>::persist_head_in_batch_standalone(
|
|
genesis_block_root, &head_tracker
|
|
));
|
|
self.pending_io_batch.push(BeaconChain::<
|
|
Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>,
|
|
>::persist_fork_choice_in_batch_standalone(
|
|
&fork_choice
|
|
));
|
|
store
|
|
.hot_db
|
|
.do_atomically(self.pending_io_batch)
|
|
.map_err(|e| format!("Error writing chain & metadata to disk: {:?}", e))?;
|
|
|
|
let genesis_validators_root = head_snapshot.beacon_state.genesis_validators_root();
|
|
let genesis_time = head_snapshot.beacon_state.genesis_time();
|
|
let head_for_snapshot_cache = head_snapshot.clone();
|
|
let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot));
|
|
|
|
let beacon_chain = BeaconChain {
|
|
spec: self.spec,
|
|
config: self.chain_config,
|
|
store,
|
|
task_executor: self
|
|
.task_executor
|
|
.ok_or("Cannot build without task executor")?,
|
|
store_migrator,
|
|
slot_clock,
|
|
op_pool: self.op_pool.ok_or("Cannot build without op pool")?,
|
|
// TODO: allow for persisting and loading the pool from disk.
|
|
naive_aggregation_pool: <_>::default(),
|
|
// TODO: allow for persisting and loading the pool from disk.
|
|
naive_sync_aggregation_pool: <_>::default(),
|
|
// TODO: allow for persisting and loading the pool from disk.
|
|
observed_attestations: <_>::default(),
|
|
// TODO: allow for persisting and loading the pool from disk.
|
|
observed_sync_contributions: <_>::default(),
|
|
// TODO: allow for persisting and loading the pool from disk.
|
|
observed_gossip_attesters: <_>::default(),
|
|
// TODO: allow for persisting and loading the pool from disk.
|
|
observed_block_attesters: <_>::default(),
|
|
// TODO: allow for persisting and loading the pool from disk.
|
|
observed_sync_contributors: <_>::default(),
|
|
// TODO: allow for persisting and loading the pool from disk.
|
|
observed_aggregators: <_>::default(),
|
|
// TODO: allow for persisting and loading the pool from disk.
|
|
observed_sync_aggregators: <_>::default(),
|
|
// TODO: allow for persisting and loading the pool from disk.
|
|
observed_block_producers: <_>::default(),
|
|
// TODO: allow for persisting and loading the pool from disk.
|
|
observed_voluntary_exits: <_>::default(),
|
|
observed_proposer_slashings: <_>::default(),
|
|
observed_attester_slashings: <_>::default(),
|
|
eth1_chain: self.eth1_chain,
|
|
execution_layer: self.execution_layer,
|
|
genesis_validators_root,
|
|
genesis_time,
|
|
canonical_head,
|
|
genesis_block_root,
|
|
genesis_state_root,
|
|
fork_choice_signal_tx,
|
|
fork_choice_signal_rx,
|
|
event_handler: self.event_handler,
|
|
head_tracker,
|
|
snapshot_cache: TimeoutRwLock::new(SnapshotCache::new(
|
|
DEFAULT_SNAPSHOT_CACHE_SIZE,
|
|
head_for_snapshot_cache,
|
|
)),
|
|
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()),
|
|
beacon_proposer_cache: <_>::default(),
|
|
block_times_cache: <_>::default(),
|
|
pre_finalization_block_cache: <_>::default(),
|
|
validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache),
|
|
attester_cache: <_>::default(),
|
|
early_attester_cache: <_>::default(),
|
|
shutdown_sender: self
|
|
.shutdown_sender
|
|
.ok_or("Cannot build without a shutdown sender.")?,
|
|
log: log.clone(),
|
|
graffiti: self.graffiti,
|
|
slasher: self.slasher.clone(),
|
|
validator_monitor: RwLock::new(validator_monitor),
|
|
};
|
|
|
|
let head = beacon_chain.head_snapshot();
|
|
|
|
// Prime the attester cache with the head state.
|
|
beacon_chain
|
|
.attester_cache
|
|
.maybe_cache_state(
|
|
&head.beacon_state,
|
|
head.beacon_block_root,
|
|
&beacon_chain.spec,
|
|
)
|
|
.map_err(|e| format!("Failed to prime attester cache: {:?}", e))?;
|
|
|
|
// Only perform the check if it was configured.
|
|
if let Some(wss_checkpoint) = beacon_chain.config.weak_subjectivity_checkpoint {
|
|
if let Err(e) = beacon_chain.verify_weak_subjectivity_checkpoint(
|
|
wss_checkpoint,
|
|
head.beacon_block_root,
|
|
&head.beacon_state,
|
|
) {
|
|
crit!(
|
|
log,
|
|
"Weak subjectivity checkpoint verification failed on startup!";
|
|
"head_block_root" => format!("{}", head.beacon_block_root),
|
|
"head_slot" => format!("{}", head.beacon_block.slot()),
|
|
"finalized_epoch" => format!("{}", head.beacon_state.finalized_checkpoint().epoch),
|
|
"wss_checkpoint_epoch" => format!("{}", wss_checkpoint.epoch),
|
|
"error" => format!("{:?}", e),
|
|
);
|
|
crit!(log, "You must use the `--purge-db` flag to clear the database and restart sync. You may be on a hostile network.");
|
|
return Err(format!("Weak subjectivity verification failed: {:?}", e));
|
|
}
|
|
}
|
|
|
|
info!(
|
|
log,
|
|
"Beacon chain initialized";
|
|
"head_state" => format!("{}", head.beacon_state_root()),
|
|
"head_block" => format!("{}", head.beacon_block_root),
|
|
"head_slot" => format!("{}", head.beacon_block.slot()),
|
|
);
|
|
|
|
// Check for states to reconstruct (in the background).
|
|
if beacon_chain.config.reconstruct_historic_states {
|
|
beacon_chain.store_migrator.process_reconstruction();
|
|
}
|
|
|
|
// Prune finalized execution payloads in the background.
|
|
if beacon_chain.store.get_config().prune_payloads {
|
|
let store = beacon_chain.store.clone();
|
|
let log = log.clone();
|
|
beacon_chain.task_executor.spawn_blocking(
|
|
move || {
|
|
if let Err(e) = store.try_prune_execution_payloads(false) {
|
|
error!(log, "Error pruning payloads in background"; "error" => ?e);
|
|
}
|
|
},
|
|
"prune_payloads_background",
|
|
);
|
|
}
|
|
|
|
Ok(beacon_chain)
|
|
}
|
|
}
|
|
|
|
impl<TSlotClock, TEthSpec, THotStore, TColdStore>
|
|
BeaconChainBuilder<
|
|
Witness<TSlotClock, CachingEth1Backend<TEthSpec>, TEthSpec, THotStore, TColdStore>,
|
|
>
|
|
where
|
|
THotStore: ItemStore<TEthSpec> + 'static,
|
|
TColdStore: ItemStore<TEthSpec> + 'static,
|
|
TSlotClock: SlotClock + 'static,
|
|
TEthSpec: EthSpec + 'static,
|
|
{
|
|
/// Do not use any eth1 backend. The client will not be able to produce beacon blocks.
|
|
pub fn no_eth1_backend(self) -> Self {
|
|
self.eth1_backend(None)
|
|
}
|
|
|
|
/// Sets the `BeaconChain` eth1 back-end to produce predictably junk data when producing blocks.
|
|
pub fn dummy_eth1_backend(mut self) -> Result<Self, String> {
|
|
let log = self
|
|
.log
|
|
.as_ref()
|
|
.ok_or("dummy_eth1_backend requires a log")?;
|
|
|
|
let backend =
|
|
CachingEth1Backend::new(Eth1Config::default(), log.clone(), self.spec.clone());
|
|
|
|
self.eth1_chain = Some(Eth1Chain::new_dummy(backend));
|
|
|
|
Ok(self)
|
|
}
|
|
}
|
|
|
|
impl<TEth1Backend, TEthSpec, THotStore, TColdStore>
|
|
BeaconChainBuilder<Witness<TestingSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>>
|
|
where
|
|
THotStore: ItemStore<TEthSpec> + 'static,
|
|
TColdStore: ItemStore<TEthSpec> + 'static,
|
|
TEth1Backend: Eth1ChainBackend<TEthSpec> + 'static,
|
|
TEthSpec: EthSpec + 'static,
|
|
{
|
|
/// Sets the `BeaconChain` slot clock to `TestingSlotClock`.
|
|
///
|
|
/// Requires the state to be initialized.
|
|
pub fn testing_slot_clock(self, slot_duration: Duration) -> Result<Self, String> {
|
|
let genesis_time = self
|
|
.genesis_time
|
|
.ok_or("testing_slot_clock requires an initialized state")?;
|
|
|
|
let slot_clock = TestingSlotClock::new(
|
|
Slot::new(0),
|
|
Duration::from_secs(genesis_time),
|
|
slot_duration,
|
|
);
|
|
|
|
Ok(self.slot_clock(slot_clock))
|
|
}
|
|
}
|
|
|
|
fn genesis_block<T: EthSpec>(
|
|
genesis_state: &mut BeaconState<T>,
|
|
spec: &ChainSpec,
|
|
) -> Result<SignedBeaconBlock<T>, String> {
|
|
let mut genesis_block = BeaconBlock::empty(spec);
|
|
*genesis_block.state_root_mut() = genesis_state
|
|
.update_tree_hash_cache()
|
|
.map_err(|e| format!("Error hashing genesis state: {:?}", e))?;
|
|
|
|
Ok(SignedBeaconBlock::from_block(
|
|
genesis_block,
|
|
// Empty signature, which should NEVER be read. This isn't to-spec, but makes the genesis
|
|
// block consistent with every other block.
|
|
Signature::empty(),
|
|
))
|
|
}
|
|
|
|
// Helper function to return more useful errors when reading from the database.
|
|
fn descriptive_db_error(item: &str, error: &StoreError) -> String {
|
|
let additional_info = if let StoreError::SszDecodeError(_) = error {
|
|
"Ensure the data directory is not initialized for a different network. The \
|
|
--purge-db flag can be used to permanently delete the existing data directory."
|
|
} else {
|
|
"Database corruption may be present. If the issue persists, use \
|
|
--purge-db to permanently delete the existing data directory."
|
|
};
|
|
format!(
|
|
"DB error when reading {}: {:?}. {}",
|
|
item, error, additional_info
|
|
)
|
|
}
|
|
|
|
#[cfg(not(debug_assertions))]
|
|
#[cfg(test)]
|
|
mod test {
|
|
use super::*;
|
|
use eth2_hashing::hash;
|
|
use genesis::{
|
|
generate_deterministic_keypairs, interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH,
|
|
};
|
|
use sloggers::{null::NullLoggerBuilder, Build};
|
|
use ssz::Encode;
|
|
use std::time::Duration;
|
|
use store::config::StoreConfig;
|
|
use store::{HotColdDB, MemoryStore};
|
|
use task_executor::test_utils::TestRuntime;
|
|
use types::{EthSpec, MinimalEthSpec, Slot};
|
|
|
|
type TestEthSpec = MinimalEthSpec;
|
|
|
|
fn get_logger() -> Logger {
|
|
let builder = NullLoggerBuilder;
|
|
builder.build().expect("should build logger")
|
|
}
|
|
|
|
#[test]
|
|
fn recent_genesis() {
|
|
let validator_count = 1;
|
|
let genesis_time = 13_371_337;
|
|
|
|
let log = get_logger();
|
|
let store: HotColdDB<
|
|
MinimalEthSpec,
|
|
MemoryStore<MinimalEthSpec>,
|
|
MemoryStore<MinimalEthSpec>,
|
|
> = HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log.clone())
|
|
.unwrap();
|
|
let spec = MinimalEthSpec::default_spec();
|
|
|
|
let genesis_state = interop_genesis_state(
|
|
&generate_deterministic_keypairs(validator_count),
|
|
genesis_time,
|
|
Hash256::from_slice(DEFAULT_ETH1_BLOCK_HASH),
|
|
None,
|
|
&spec,
|
|
)
|
|
.expect("should create interop genesis state");
|
|
|
|
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
|
|
let runtime = TestRuntime::default();
|
|
|
|
let chain = BeaconChainBuilder::new(MinimalEthSpec)
|
|
.logger(log.clone())
|
|
.store(Arc::new(store))
|
|
.task_executor(runtime.task_executor.clone())
|
|
.genesis_state(genesis_state)
|
|
.expect("should build state using recent genesis")
|
|
.dummy_eth1_backend()
|
|
.expect("should build the dummy eth1 backend")
|
|
.testing_slot_clock(Duration::from_secs(1))
|
|
.expect("should configure testing slot clock")
|
|
.shutdown_sender(shutdown_tx)
|
|
.monitor_validators(true, vec![], log.clone())
|
|
.build()
|
|
.expect("should build");
|
|
|
|
let head = chain.head_snapshot();
|
|
|
|
let state = &head.beacon_state;
|
|
let block = &head.beacon_block;
|
|
|
|
assert_eq!(state.slot(), Slot::new(0), "should start from genesis");
|
|
assert_eq!(
|
|
state.genesis_time(),
|
|
13_371_337,
|
|
"should have the correct genesis time"
|
|
);
|
|
assert_eq!(
|
|
block.state_root(),
|
|
state.canonical_root(),
|
|
"block should have correct state root"
|
|
);
|
|
assert_eq!(
|
|
chain
|
|
.store
|
|
.get_blinded_block(&Hash256::zero())
|
|
.expect("should read db")
|
|
.expect("should find genesis block"),
|
|
block.clone_as_blinded(),
|
|
"should store genesis block under zero hash alias"
|
|
);
|
|
assert_eq!(
|
|
state.validators().len(),
|
|
validator_count,
|
|
"should have correct validator count"
|
|
);
|
|
assert_eq!(
|
|
chain.genesis_block_root,
|
|
block.canonical_root(),
|
|
"should have correct genesis block root"
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn interop_state() {
|
|
let validator_count = 16;
|
|
let genesis_time = 42;
|
|
let spec = &TestEthSpec::default_spec();
|
|
|
|
let keypairs = generate_deterministic_keypairs(validator_count);
|
|
|
|
let state = interop_genesis_state::<TestEthSpec>(
|
|
&keypairs,
|
|
genesis_time,
|
|
Hash256::from_slice(DEFAULT_ETH1_BLOCK_HASH),
|
|
None,
|
|
spec,
|
|
)
|
|
.expect("should build state");
|
|
|
|
assert_eq!(
|
|
state.eth1_data().block_hash,
|
|
Hash256::from_slice(&[0x42; 32]),
|
|
"eth1 block hash should be co-ordinated junk"
|
|
);
|
|
|
|
assert_eq!(
|
|
state.genesis_time(),
|
|
genesis_time,
|
|
"genesis time should be as specified"
|
|
);
|
|
|
|
for b in state.balances() {
|
|
assert_eq!(
|
|
*b, spec.max_effective_balance,
|
|
"validator balances should be max effective balance"
|
|
);
|
|
}
|
|
|
|
for v in state.validators() {
|
|
let creds = v.withdrawal_credentials.as_bytes();
|
|
assert_eq!(
|
|
creds[0], spec.bls_withdrawal_prefix_byte,
|
|
"first byte of withdrawal creds should be bls prefix"
|
|
);
|
|
assert_eq!(
|
|
&creds[1..],
|
|
&hash(&v.pubkey.as_ssz_bytes())[1..],
|
|
"rest of withdrawal creds should be pubkey hash"
|
|
)
|
|
}
|
|
|
|
assert_eq!(
|
|
state.balances().len(),
|
|
validator_count,
|
|
"validator balances len should be correct"
|
|
);
|
|
|
|
assert_eq!(
|
|
state.validators().len(),
|
|
validator_count,
|
|
"validator count should be correct"
|
|
);
|
|
}
|
|
}
|