diff --git a/Cargo.lock b/Cargo.lock index 13b02ce2f..c7a3ffbe3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4317,7 +4317,6 @@ dependencies = [ "store", "strum", "task_executor", - "tempfile", "tokio", "tokio-stream", "tokio-util", diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index d6524cabe..925d6f5da 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1,5 +1,6 @@ use crate::beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY}; use crate::eth1_chain::{CachingEth1Backend, SszEth1}; +use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary}; use crate::head_tracker::HeadTracker; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::persisted_beacon_chain::PersistedBeaconChain; @@ -19,7 +20,7 @@ use futures::channel::mpsc::Sender; use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::RwLock; use slasher::Slasher; -use slog::{crit, info, Logger}; +use slog::{crit, error, info, Logger}; use slot_clock::{SlotClock, TestingSlotClock}; use std::marker::PhantomData; use std::sync::Arc; @@ -135,6 +136,11 @@ where self } + /// Get a reference to the builder's spec. + pub fn get_spec(&self) -> &ChainSpec { + &self.spec + } + /// Sets the maximum number of blocks that will be skipped when processing /// some consensus messages. /// @@ -320,10 +326,10 @@ where let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis); - let fork_choice = ForkChoice::from_genesis( + let fork_choice = ForkChoice::from_anchor( fc_store, genesis.beacon_block_root, - &genesis.beacon_block.deconstruct().0, + &genesis.beacon_block, &genesis.beacon_state, ) .map_err(|e| format!("Unable to build initialize ForkChoice: {:?}", e))?; @@ -356,6 +362,13 @@ where self } + /// Fetch a reference to the slot clock. + /// + /// Can be used for mutation during testing due to `SlotClock`'s internal mutability. 
+ pub fn get_slot_clock(&self) -> Option<&TSlotClock> { + self.slot_clock.as_ref() + } + /// Sets a `Sender` to allow the beacon chain to send shutdown signals. pub fn shutdown_sender(mut self, sender: Sender) -> Self { self.shutdown_sender = Some(sender); @@ -427,6 +440,7 @@ where let mut validator_monitor = self .validator_monitor .ok_or("Cannot build without a validator monitor")?; + let head_tracker = Arc::new(self.head_tracker.unwrap_or_default()); let current_slot = if slot_clock .is_prior_to_genesis() @@ -437,20 +451,57 @@ where slot_clock.now().ok_or("Unable to read slot")? }; - let head_block_root = fork_choice + let initial_head_block_root = fork_choice .get_head(current_slot) .map_err(|e| format!("Unable to get fork choice head: {:?}", e))?; - let head_block = store - .get_block(&head_block_root) - .map_err(|e| descriptive_db_error("head block", &e))? - .ok_or("Head block not found in store")?; + // Try to decode the head block according to the current fork, if that fails, try + // to backtrack to before the most recent fork. + let (head_block_root, head_block, head_reverted) = + match store.get_block(&initial_head_block_root) { + Ok(Some(block)) => (initial_head_block_root, block, false), + Ok(None) => return Err("Head block not found in store".into()), + Err(StoreError::SszDecodeError(_)) => { + error!( + log, + "Error decoding head block"; + "message" => "This node has likely missed a hard fork. \ + It will try to revert the invalid blocks and keep running, \ + but any stray blocks and states will not be deleted. \ + Long-term you should consider re-syncing this node." + ); + let (block_root, block) = revert_to_fork_boundary( + current_slot, + initial_head_block_root, + store.clone(), + &self.spec, + &log, + )?; + + // Update head tracker. 
+ head_tracker.register_block(block_root, block.parent_root(), block.slot()); + (block_root, block, true) + } + Err(e) => return Err(descriptive_db_error("head block", &e)), + }; + let head_state_root = head_block.state_root(); let head_state = store .get_state(&head_state_root, Some(head_block.slot())) .map_err(|e| descriptive_db_error("head state", &e))? .ok_or("Head state not found in store")?; + // If the head reverted then we need to reset fork choice using the new head's finalized + // checkpoint. + if head_reverted { + fork_choice = reset_fork_choice_to_finalization( + head_block_root, + &head_state, + store.clone(), + &self.spec, + )?; + } + let mut canonical_head = BeaconSnapshot { beacon_block_root: head_block_root, beacon_block: head_block, @@ -541,7 +592,7 @@ where genesis_state_root, fork_choice: RwLock::new(fork_choice), event_handler: self.event_handler, - head_tracker: Arc::new(self.head_tracker.unwrap_or_default()), + head_tracker, snapshot_cache: TimeoutRwLock::new(SnapshotCache::new( DEFAULT_SNAPSHOT_CACHE_SIZE, canonical_head, diff --git a/beacon_node/beacon_chain/src/fork_revert.rs b/beacon_node/beacon_chain/src/fork_revert.rs new file mode 100644 index 000000000..31678580a --- /dev/null +++ b/beacon_node/beacon_chain/src/fork_revert.rs @@ -0,0 +1,174 @@ +use crate::{BeaconForkChoiceStore, BeaconSnapshot}; +use fork_choice::ForkChoice; +use itertools::process_results; +use slog::{info, warn, Logger}; +use state_processing::state_advance::complete_state_advance; +use state_processing::{per_block_processing, per_block_processing::BlockSignatureStrategy}; +use std::sync::Arc; +use store::{iter::ParentRootBlockIterator, HotColdDB, ItemStore}; +use types::{BeaconState, ChainSpec, EthSpec, ForkName, Hash256, SignedBeaconBlock, Slot}; + +const CORRUPT_DB_MESSAGE: &str = "The database could be corrupt. 
Check its file permissions or \ + consider deleting it by running with the --purge-db flag."; + +/// Revert the head to the last block before the most recent hard fork. +/// +/// This function is destructive and should only be used if there is no viable alternative. It will +/// cause the reverted blocks and states to be completely forgotten, lying dormant in the database +/// forever. +/// +/// Return the `(head_block_root, head_block)` that should be used post-reversion. +pub fn revert_to_fork_boundary, Cold: ItemStore>( + current_slot: Slot, + head_block_root: Hash256, + store: Arc>, + spec: &ChainSpec, + log: &Logger, +) -> Result<(Hash256, SignedBeaconBlock), String> { + let current_fork = spec.fork_name_at_slot::(current_slot); + let fork_epoch = spec + .fork_epoch(current_fork) + .ok_or_else(|| format!("Current fork '{}' never activates", current_fork))?; + + if current_fork == ForkName::Base { + return Err(format!( + "Cannot revert to before phase0 hard fork. {}", + CORRUPT_DB_MESSAGE + )); + } + + warn!( + log, + "Reverting invalid head block"; + "target_fork" => %current_fork, + "fork_epoch" => fork_epoch, + ); + let block_iter = ParentRootBlockIterator::fork_tolerant(&store, head_block_root); + + process_results(block_iter, |mut iter| { + iter.find_map(|(block_root, block)| { + if block.slot() < fork_epoch.start_slot(E::slots_per_epoch()) { + Some((block_root, block)) + } else { + info!( + log, + "Reverting block"; + "block_root" => ?block_root, + "slot" => block.slot(), + ); + None + } + }) + }) + .map_err(|e| { + format!( + "Error fetching blocks to revert: {:?}. {}", + e, CORRUPT_DB_MESSAGE + ) + })? + .ok_or_else(|| format!("No pre-fork blocks found. {}", CORRUPT_DB_MESSAGE)) +} + +/// Reset fork choice to the finalized checkpoint of the supplied head state. +/// +/// The supplied `head_block_root` should correspond to the most recently applied block on +/// `head_state`. 
+/// +/// This function avoids quirks of fork choice initialization by replaying all of the blocks from +/// the checkpoint to the head. +/// +/// See this issue for details: https://github.com/ethereum/consensus-specs/issues/2566 +/// +/// It will fail if the finalized state or any of the blocks to replay are unavailable. +/// +/// WARNING: this function is destructive and causes fork choice to permanently forget all +/// chains other than the chain leading to `head_block_root`. It should only be used in extreme +/// circumstances when there is no better alternative. +pub fn reset_fork_choice_to_finalization, Cold: ItemStore>( + head_block_root: Hash256, + head_state: &BeaconState, + store: Arc>, + spec: &ChainSpec, +) -> Result, E>, String> { + // Fetch finalized block. + let finalized_checkpoint = head_state.finalized_checkpoint(); + let finalized_block_root = finalized_checkpoint.root; + let finalized_block = store + .get_block(&finalized_block_root) + .map_err(|e| format!("Error loading finalized block: {:?}", e))? + .ok_or_else(|| { + format!( + "Finalized block missing for revert: {:?}", + finalized_block_root + ) + })?; + + // Advance finalized state to finalized epoch (to handle skipped slots). + let finalized_state_root = finalized_block.state_root(); + let mut finalized_state = store + .get_state(&finalized_state_root, Some(finalized_block.slot())) + .map_err(|e| format!("Error loading finalized state: {:?}", e))? 
+ .ok_or_else(|| { + format!( + "Finalized block state missing from database: {:?}", + finalized_state_root + ) + })?; + let finalized_slot = finalized_checkpoint.epoch.start_slot(E::slots_per_epoch()); + complete_state_advance( + &mut finalized_state, + Some(finalized_state_root), + finalized_slot, + spec, + ) + .map_err(|e| { + format!( + "Error advancing finalized state to finalized epoch: {:?}", + e + ) + })?; + let finalized_snapshot = BeaconSnapshot { + beacon_block_root: finalized_block_root, + beacon_block: finalized_block, + beacon_state: finalized_state, + }; + + let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store.clone(), &finalized_snapshot); + + let mut fork_choice = ForkChoice::from_anchor( + fc_store, + finalized_block_root, + &finalized_snapshot.beacon_block, + &finalized_snapshot.beacon_state, + ) + .map_err(|e| format!("Unable to reset fork choice for revert: {:?}", e))?; + + // Replay blocks from finalized checkpoint back to head. + // We do not replay attestations presently, relying on the absence of other blocks + // to guarantee `head_block_root` as the head. 
+ let blocks = store + .load_blocks_to_replay(finalized_slot + 1, head_state.slot(), head_block_root) + .map_err(|e| format!("Error loading blocks to replay for fork choice: {:?}", e))?; + + let mut state = finalized_snapshot.beacon_state; + for block in blocks { + complete_state_advance(&mut state, None, block.slot(), spec) + .map_err(|e| format!("State advance failed: {:?}", e))?; + + per_block_processing( + &mut state, + &block, + None, + BlockSignatureStrategy::NoVerification, + spec, + ) + .map_err(|e| format!("Error replaying block: {:?}", e))?; + + let (block, _) = block.deconstruct(); + fork_choice + .on_block(block.slot(), &block, block.canonical_root(), &state) + .map_err(|e| format!("Error applying replayed block to fork choice: {:?}", e))?; + } + + Ok(fork_choice) +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 973564c69..5efcc3400 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -11,6 +11,7 @@ pub mod chain_config; mod errors; pub mod eth1_chain; pub mod events; +pub mod fork_revert; mod head_tracker; mod metrics; pub mod migrate; diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 5a599586c..cf2fb6484 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -334,16 +334,33 @@ impl, Cold: ItemStore> BackgroundMigrator block.state_root(), + Ok(None) => { + return Err(BeaconStateError::MissingBeaconBlock(head_hash.into()).into()) + } + Err(Error::SszDecodeError(e)) => { + warn!( + log, + "Forgetting invalid head block"; + "block_root" => ?head_hash, + "error" => ?e, + ); + abandoned_heads.insert(head_hash); + continue; + } + Err(e) => return Err(e.into()), + }; + let mut potentially_abandoned_head = Some(head_hash); let mut potentially_abandoned_blocks = vec![]; - let head_state_hash = store - .get_block(&head_hash)? 
- .ok_or_else(|| BeaconStateError::MissingBeaconBlock(head_hash.into()))? - .state_root(); - // Iterate backwards from this head, staging blocks and states for deletion. - let iter = std::iter::once(Ok((head_hash, head_state_hash, head_slot))) + let iter = std::iter::once(Ok((head_hash, head_state_root, head_slot))) .chain(RootsIterator::from_block(store.clone(), head_hash)?); for maybe_tuple in iter { diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 67a53b904..41747c3c0 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -29,7 +29,6 @@ use std::sync::Arc; use std::time::Duration; use store::{config::StoreConfig, BlockReplay, HotColdDB, ItemStore, LevelDB, MemoryStore}; use task_executor::ShutdownReason; -use tempfile::{tempdir, TempDir}; use tree_hash::TreeHash; use types::sync_selection_proof::SyncSelectionProof; pub use types::test_utils::generate_deterministic_keypairs; @@ -50,6 +49,12 @@ pub const HARNESS_SLOT_TIME: Duration = Duration::from_secs(1); // Environment variable to read if `fork_from_env` feature is enabled. const FORK_NAME_ENV_VAR: &str = "FORK_NAME"; +// Default target aggregators to set during testing, this ensures an aggregator at each slot. +// +// You should mutate the `ChainSpec` prior to initialising the harness if you would like to use +// a different value. +pub const DEFAULT_TARGET_AGGREGATORS: u64 = u64::max_value(); + pub type BaseHarnessType = Witness, TEthSpec, THotStore, TColdStore>; @@ -126,7 +131,7 @@ pub fn test_logger() -> Logger { /// If the `fork_from_env` feature is enabled, read the fork to use from the FORK_NAME environment /// variable. Otherwise use the default spec. 
pub fn test_spec() -> ChainSpec { - if cfg!(feature = "fork_from_env") { + let mut spec = if cfg!(feature = "fork_from_env") { let fork_name = std::env::var(FORK_NAME_ENV_VAR).unwrap_or_else(|e| { panic!( "{} env var must be defined when using fork_from_env: {:?}", @@ -141,7 +146,11 @@ pub fn test_spec() -> ChainSpec { fork.make_genesis_spec(E::default_spec()) } else { E::default_spec() - } + }; + + // Set target aggregators to a high value by default. + spec.target_aggregators_per_committee = DEFAULT_TARGET_AGGREGATORS; + spec } /// A testing harness which can instantiate a `BeaconChain` and populate it with blocks and @@ -153,7 +162,6 @@ pub struct BeaconChainHarness { pub chain: Arc>, pub spec: ChainSpec, - pub data_dir: TempDir, pub shutdown_receiver: Receiver, pub rng: Mutex, @@ -188,200 +196,123 @@ impl BeaconChainHarness> { spec: Option, validator_keypairs: Vec, config: StoreConfig, - ) -> Self { - // Setting the target aggregators to really high means that _all_ validators in the - // committee are required to produce an aggregate. This is overkill, however with small - // validator counts it's the only way to be certain there is _at least one_ aggregator per - // committee. 
- Self::new_with_target_aggregators( - eth_spec_instance, - spec, - validator_keypairs, - 1 << 32, - config, - ) - } - - /// Instantiate a new harness with a custom `target_aggregators_per_committee` spec value - pub fn new_with_target_aggregators( - eth_spec_instance: E, - spec: Option, - validator_keypairs: Vec, - target_aggregators_per_committee: u64, - store_config: StoreConfig, ) -> Self { Self::new_with_chain_config( eth_spec_instance, spec, validator_keypairs, - target_aggregators_per_committee, - store_config, + config, ChainConfig::default(), ) } - /// Instantiate a new harness with `validator_count` initial validators, a custom - /// `target_aggregators_per_committee` spec value, and a `ChainConfig` pub fn new_with_chain_config( eth_spec_instance: E, spec: Option, validator_keypairs: Vec, - target_aggregators_per_committee: u64, store_config: StoreConfig, chain_config: ChainConfig, ) -> Self { - Self::new_with_mutator( + Self::ephemeral_with_mutator( eth_spec_instance, spec, validator_keypairs, - target_aggregators_per_committee, store_config, chain_config, |x| x, ) } - /// Apply a function to beacon chain builder before building. 
- pub fn new_with_mutator( + pub fn ephemeral_with_mutator( eth_spec_instance: E, spec: Option, validator_keypairs: Vec, - target_aggregators_per_committee: u64, store_config: StoreConfig, chain_config: ChainConfig, mutator: impl FnOnce( BeaconChainBuilder>, ) -> BeaconChainBuilder>, ) -> Self { - let data_dir = tempdir().expect("should create temporary data_dir"); - let mut spec = spec.unwrap_or_else(test_spec::); - - spec.target_aggregators_per_committee = target_aggregators_per_committee; - - let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1); - + let spec = spec.unwrap_or_else(test_spec::); let log = test_logger(); - - let store = HotColdDB::open_ephemeral(store_config, spec.clone(), log.clone()).unwrap(); - let builder = BeaconChainBuilder::new(eth_spec_instance) - .logger(log.clone()) - .custom_spec(spec.clone()) - .store(Arc::new(store)) - .store_migrator_config(MigratorConfig::default().blocking()) - .genesis_state( - interop_genesis_state::(&validator_keypairs, HARNESS_GENESIS_TIME, &spec) - .expect("should generate interop state"), - ) - .expect("should build state using recent genesis") - .dummy_eth1_backend() - .expect("should build dummy backend") - .testing_slot_clock(HARNESS_SLOT_TIME) - .expect("should configure testing slot clock") - .shutdown_sender(shutdown_tx) - .chain_config(chain_config) - .event_handler(Some(ServerSentEventHandler::new_with_capacity( - log.clone(), - 1, - ))) - .monitor_validators(true, vec![], log); - - let chain = mutator(builder).build().expect("should build"); - - Self { - spec: chain.spec.clone(), - chain: Arc::new(chain), - validator_keypairs, - data_dir, - shutdown_receiver, - rng: make_rng(), - } + let store = Arc::new(HotColdDB::open_ephemeral(store_config, spec.clone(), log).unwrap()); + Self::new_with_mutator( + eth_spec_instance, + spec, + store, + validator_keypairs.clone(), + chain_config, + |mut builder| { + builder = mutator(builder); + let genesis_state = interop_genesis_state::( + 
&validator_keypairs, + HARNESS_GENESIS_TIME, + builder.get_spec(), + ) + .expect("should generate interop state"); + builder + .genesis_state(genesis_state) + .expect("should build state using recent genesis") + }, + ) } } impl BeaconChainHarness> { - /// Instantiate a new harness with `validator_count` initial validators. + /// Disk store, start from genesis. pub fn new_with_disk_store( eth_spec_instance: E, spec: Option, store: Arc, LevelDB>>, validator_keypairs: Vec, ) -> Self { - let data_dir = tempdir().expect("should create temporary data_dir"); let spec = spec.unwrap_or_else(test_spec::); - let log = test_logger(); - let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1); + let chain_config = ChainConfig::default(); - let chain = BeaconChainBuilder::new(eth_spec_instance) - .logger(log.clone()) - .custom_spec(spec.clone()) - .import_max_skip_slots(None) - .store(store) - .store_migrator_config(MigratorConfig::default().blocking()) - .genesis_state( - interop_genesis_state::(&validator_keypairs, HARNESS_GENESIS_TIME, &spec) - .expect("should generate interop state"), - ) - .expect("should build state using recent genesis") - .dummy_eth1_backend() - .expect("should build dummy backend") - .testing_slot_clock(HARNESS_SLOT_TIME) - .expect("should configure testing slot clock") - .shutdown_sender(shutdown_tx) - .monitor_validators(true, vec![], log) - .build() - .expect("should build"); - - Self { - spec: chain.spec.clone(), - chain: Arc::new(chain), - validator_keypairs, - data_dir, - shutdown_receiver, - rng: make_rng(), - } + Self::new_with_mutator( + eth_spec_instance, + spec, + store, + validator_keypairs.clone(), + chain_config, + |builder| { + let genesis_state = interop_genesis_state::( + &validator_keypairs, + HARNESS_GENESIS_TIME, + builder.get_spec(), + ) + .expect("should generate interop state"); + builder + .genesis_state(genesis_state) + .expect("should build state using recent genesis") + }, + ) } -} -impl BeaconChainHarness> { 
- /// Instantiate a new harness with `validator_count` initial validators. + /// Disk store, resume. pub fn resume_from_disk_store( eth_spec_instance: E, spec: Option, store: Arc, LevelDB>>, validator_keypairs: Vec, - data_dir: TempDir, ) -> Self { let spec = spec.unwrap_or_else(test_spec::); - let log = test_logger(); - let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1); + let chain_config = ChainConfig::default(); - let chain = BeaconChainBuilder::new(eth_spec_instance) - .logger(log.clone()) - .custom_spec(spec) - .import_max_skip_slots(None) - .store(store) - .store_migrator_config(MigratorConfig::default().blocking()) - .resume_from_db() - .expect("should resume beacon chain from db") - .dummy_eth1_backend() - .expect("should build dummy backend") - .testing_slot_clock(Duration::from_secs(1)) - .expect("should configure testing slot clock") - .shutdown_sender(shutdown_tx) - .monitor_validators(true, vec![], log) - .build() - .expect("should build"); - - Self { - spec: chain.spec.clone(), - chain: Arc::new(chain), + Self::new_with_mutator( + eth_spec_instance, + spec, + store, validator_keypairs, - data_dir, - shutdown_receiver, - rng: make_rng(), - } + chain_config, + |builder| { + builder + .resume_from_db() + .expect("should resume from database") + }, + ) } } @@ -391,6 +322,62 @@ where Hot: ItemStore, Cold: ItemStore, { + /// Generic initializer. + /// + /// This initializer should be able to handle almost any configuration via arguments and the + /// provided `mutator` function. Please do not copy and paste this function. 
+ pub fn new_with_mutator( + eth_spec_instance: E, + spec: ChainSpec, + store: Arc>, + validator_keypairs: Vec, + chain_config: ChainConfig, + mutator: impl FnOnce( + BeaconChainBuilder>, + ) -> BeaconChainBuilder>, + ) -> Self { + let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1); + + let log = test_logger(); + + let mut builder = BeaconChainBuilder::new(eth_spec_instance) + .logger(log.clone()) + .custom_spec(spec) + .store(store) + .store_migrator_config(MigratorConfig::default().blocking()) + .dummy_eth1_backend() + .expect("should build dummy backend") + .shutdown_sender(shutdown_tx) + .chain_config(chain_config) + .event_handler(Some(ServerSentEventHandler::new_with_capacity( + log.clone(), + 1, + ))) + .monitor_validators(true, vec![], log); + + // Caller must initialize genesis state. + builder = mutator(builder); + + // Initialize the slot clock only if it hasn't already been initialized. + builder = if builder.get_slot_clock().is_none() { + builder + .testing_slot_clock(HARNESS_SLOT_TIME) + .expect("should configure testing slot clock") + } else { + builder + }; + + let chain = builder.build().expect("should build"); + + Self { + spec: chain.spec.clone(), + chain: Arc::new(chain), + validator_keypairs, + shutdown_receiver, + rng: make_rng(), + } + } + pub fn logger(&self) -> &slog::Logger { &self.chain.log } @@ -574,6 +561,9 @@ where attestation_slot: Slot, ) -> Vec, SubnetId)>> { let committee_count = state.get_committee_count_at_slot(state.slot()).unwrap(); + let fork = self + .spec + .fork_at_epoch(attestation_slot.epoch(E::slots_per_epoch())); state .get_beacon_committees_at_slot(attestation_slot) @@ -604,7 +594,7 @@ where let domain = self.spec.get_domain( attestation.data.target.epoch, Domain::BeaconAttester, - &state.fork(), + &fork, state.genesis_validators_root(), ); @@ -651,6 +641,9 @@ where .expect("should be called on altair beacon state") .clone(), }; + let fork = self + .spec + 
.fork_at_epoch(message_slot.epoch(E::slots_per_epoch())); sync_committee .pubkeys @@ -672,7 +665,7 @@ where head_block_root, validator_index as u64, &self.validator_keypairs[validator_index].sk, - &state.fork(), + &fork, state.genesis_validators_root(), &self.spec, ); @@ -727,6 +720,7 @@ where block_hash, slot, ); + let fork = self.spec.fork_at_epoch(slot.epoch(E::slots_per_epoch())); let aggregated_attestations: Vec>> = unaggregated_attestations @@ -749,9 +743,9 @@ where } let selection_proof = SelectionProof::new::( - state.slot(), + slot, &self.validator_keypairs[*validator_index].sk, - &state.fork(), + &fork, state.genesis_validators_root(), &self.spec, ); @@ -782,7 +776,7 @@ where aggregate, None, &self.validator_keypairs[aggregator_index].sk, - &state.fork(), + &fork, state.genesis_validators_root(), &self.spec, ); diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index 30cde98d6..dbba5e318 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -5,7 +5,9 @@ extern crate lazy_static; use beacon_chain::{ attestation_verification::Error as AttnError, - test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, + test_utils::{ + test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, + }, BeaconChain, BeaconChainTypes, WhenSlotSkipped, }; use int_to_bytes::int_to_bytes32; @@ -33,13 +35,16 @@ lazy_static! { /// Returns a beacon chain harness. fn get_harness(validator_count: usize) -> BeaconChainHarness> { - let harness = BeaconChainHarness::new_with_target_aggregators( + let mut spec = test_spec::(); + + // A kind-of arbitrary number that ensures that _some_ validators are aggregators, but + // not all. 
+ spec.target_aggregators_per_committee = 4; + + let harness = BeaconChainHarness::new_with_store_config( MainnetEthSpec, - None, + Some(spec), KEYPAIRS[0..validator_count].to_vec(), - // A kind-of arbitrary number that ensures that _some_ validators are aggregators, but - // not all. - 4, StoreConfig::default(), ); diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index e02b80231..b03a106c9 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -840,11 +840,10 @@ fn verify_block_for_gossip_slashing_detection() { .unwrap(), ); - let harness = BeaconChainHarness::new_with_mutator( + let harness = BeaconChainHarness::ephemeral_with_mutator( MainnetEthSpec, None, KEYPAIRS.to_vec(), - 1 << 32, StoreConfig::default(), ChainConfig::default(), |builder| builder.slasher(slasher.clone()), @@ -927,7 +926,6 @@ fn add_base_block_to_altair_chain() { MainnetEthSpec, Some(spec), KEYPAIRS[..].to_vec(), - 1 << 32, StoreConfig::default(), ChainConfig::default(), ); @@ -1047,7 +1045,6 @@ fn add_altair_block_to_base_chain() { MainnetEthSpec, Some(spec), KEYPAIRS[..].to_vec(), - 1 << 32, StoreConfig::default(), ChainConfig::default(), ); diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 4b687fa04..bff962736 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2,9 +2,10 @@ use beacon_chain::attestation_verification::Error as AttnError; use beacon_chain::test_utils::{ - test_logger, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, DiskHarnessType, + test_logger, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, + DiskHarnessType, HARNESS_SLOT_TIME, }; -use beacon_chain::{BeaconChain, BeaconChainTypes, BeaconSnapshot}; +use beacon_chain::{BeaconChain, BeaconChainTypes, BeaconSnapshot, 
ChainConfig}; use lazy_static::lazy_static; use maplit::hashset; use rand::Rng; @@ -34,7 +35,13 @@ type E = MinimalEthSpec; type TestHarness = BeaconChainHarness>; fn get_store(db_path: &TempDir) -> Arc, LevelDB>> { - let spec = test_spec::(); + get_store_with_spec(db_path, test_spec::()) +} + +fn get_store_with_spec( + db_path: &TempDir, + spec: ChainSpec, +) -> Arc, LevelDB>> { let hot_path = db_path.path().join("hot_db"); let cold_path = db_path.path().join("cold_db"); let config = StoreConfig::default(); @@ -1785,7 +1792,6 @@ fn finalizes_after_resuming_from_db() { .persist_eth1_cache() .expect("should persist the eth1 cache"); - let data_dir = harness.data_dir; let original_chain = harness.chain; let resumed_harness = BeaconChainHarness::resume_from_disk_store( @@ -1793,7 +1799,6 @@ fn finalizes_after_resuming_from_db() { None, store, KEYPAIRS[0..validator_count].to_vec(), - data_dir, ); assert_chains_pretty_much_the_same(&original_chain, &resumed_harness.chain); @@ -1839,6 +1844,170 @@ fn finalizes_after_resuming_from_db() { ); } +#[test] +fn revert_minority_fork_on_resume() { + let validator_count = 16; + let slots_per_epoch = MinimalEthSpec::slots_per_epoch(); + + let fork_epoch = Epoch::new(4); + let fork_slot = fork_epoch.start_slot(slots_per_epoch); + let initial_blocks = slots_per_epoch * fork_epoch.as_u64() - 1; + let post_fork_blocks = slots_per_epoch * 3; + + let mut spec1 = MinimalEthSpec::default_spec(); + spec1.altair_fork_epoch = None; + let mut spec2 = MinimalEthSpec::default_spec(); + spec2.altair_fork_epoch = Some(fork_epoch); + + let all_validators = (0..validator_count).collect::>(); + + // Chain with no fork epoch configured. + let db_path1 = tempdir().unwrap(); + let store1 = get_store_with_spec(&db_path1, spec1.clone()); + let harness1 = BeaconChainHarness::new_with_disk_store( + MinimalEthSpec, + Some(spec1), + store1, + KEYPAIRS[0..validator_count].to_vec(), + ); + + // Chain with fork epoch configured. 
+ let db_path2 = tempdir().unwrap(); + let store2 = get_store_with_spec(&db_path2, spec2.clone()); + let harness2 = BeaconChainHarness::new_with_disk_store( + MinimalEthSpec, + Some(spec2.clone()), + store2, + KEYPAIRS[0..validator_count].to_vec(), + ); + + // Apply the same blocks to both chains initially. + let mut state = harness1.get_current_state(); + let mut block_root = harness1.chain.genesis_block_root; + for slot in (1..=initial_blocks).map(Slot::new) { + let state_root = state.update_tree_hash_cache().unwrap(); + + let attestations = harness1.make_attestations( + &all_validators, + &state, + state_root, + block_root.into(), + slot, + ); + harness1.set_current_slot(slot); + harness2.set_current_slot(slot); + harness1.process_attestations(attestations.clone()); + harness2.process_attestations(attestations); + + let (block, new_state) = harness1.make_block(state, slot); + + harness1.process_block(slot, block.clone()).unwrap(); + harness2.process_block(slot, block.clone()).unwrap(); + + state = new_state; + block_root = block.canonical_root(); + } + + assert_eq!(harness1.chain.head_info().unwrap().slot, fork_slot - 1); + assert_eq!(harness2.chain.head_info().unwrap().slot, fork_slot - 1); + + // Fork the two chains. + let mut state1 = state.clone(); + let mut state2 = state.clone(); + + let mut majority_blocks = vec![]; + + for i in 0..post_fork_blocks { + let slot = fork_slot + i; + + // Attestations on majority chain. + let state_root = state.update_tree_hash_cache().unwrap(); + + let attestations = harness2.make_attestations( + &all_validators, + &state2, + state_root, + block_root.into(), + slot, + ); + harness2.set_current_slot(slot); + harness2.process_attestations(attestations); + + // Minority chain block (no attesters). + let (block1, new_state1) = harness1.make_block(state1, slot); + harness1.process_block(slot, block1).unwrap(); + state1 = new_state1; + + // Majority chain block (all attesters). 
+ let (block2, new_state2) = harness2.make_block(state2, slot); + harness2.process_block(slot, block2.clone()).unwrap(); + + state2 = new_state2; + block_root = block2.canonical_root(); + + majority_blocks.push(block2); + } + + let end_slot = fork_slot + post_fork_blocks - 1; + assert_eq!(harness1.chain.head_info().unwrap().slot, end_slot); + assert_eq!(harness2.chain.head_info().unwrap().slot, end_slot); + + // Resume from disk with the hard-fork activated: this should revert the post-fork blocks. + // We have to do some hackery with the `slot_clock` so that the correct slot is set when + // the beacon chain builder loads the head block. + drop(harness1); + let resume_store = get_store_with_spec(&db_path1, spec2.clone()); + let resumed_harness = BeaconChainHarness::new_with_mutator( + MinimalEthSpec, + spec2, + resume_store, + KEYPAIRS[0..validator_count].to_vec(), + ChainConfig::default(), + |mut builder| { + builder = builder + .resume_from_db() + .unwrap() + .testing_slot_clock(HARNESS_SLOT_TIME) + .unwrap(); + builder + .get_slot_clock() + .unwrap() + .set_slot(end_slot.as_u64()); + builder + }, + ); + + // Head should now be just before the fork. + resumed_harness.chain.fork_choice().unwrap(); + let head = resumed_harness.chain.head_info().unwrap(); + assert_eq!(head.slot, fork_slot - 1); + + // Head track should know the canonical head and the rogue head. + assert_eq!(resumed_harness.chain.heads().len(), 2); + assert!(resumed_harness.chain.knows_head(&head.block_root.into())); + + // Apply blocks from the majority chain and trigger finalization. + let initial_split_slot = resumed_harness.chain.store.get_split_slot(); + for block in &majority_blocks { + resumed_harness.process_block_result(block.clone()).unwrap(); + + // The canonical head should be the block from the majority chain. 
+ resumed_harness.chain.fork_choice().unwrap(); + let head_info = resumed_harness.chain.head_info().unwrap(); + assert_eq!(head_info.slot, block.slot()); + assert_eq!(head_info.block_root, block.canonical_root()); + } + let advanced_split_slot = resumed_harness.chain.store.get_split_slot(); + + // Check that the migration ran successfully. + assert!(advanced_split_slot > initial_split_slot); + + // Check that there is only a single head now matching harness2 (the minority chain is gone). + let heads = resumed_harness.chain.heads(); + assert_eq!(heads, harness2.chain.heads()); + assert_eq!(heads.len(), 1); +} + /// Checks that two chains are the same, for the purpose of these tests. /// /// Several fields that are hard/impossible to check are ignored (e.g., the store). diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 9329c3847..e1483c939 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -9,7 +9,6 @@ sloggers = "1.0.1" genesis = { path = "../genesis" } lazy_static = "1.4.0" matches = "0.1.8" -tempfile = "3.1.0" exit-future = "0.2.0" slog-term = "2.6.0" slog-async = "2.5.0" diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 793194d26..43f1626e1 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -262,22 +262,40 @@ impl, Cold: ItemStore> HotColdDB return Ok(Some(block.clone())); } - // Fetch from database. - match self - .hot_db - .get_bytes(DBColumn::BeaconBlock.into(), block_root.as_bytes())? - { - Some(block_bytes) => { - // Deserialize. - let block = SignedBeaconBlock::from_ssz_bytes(&block_bytes, &self.spec)?; + let block = self.get_block_with(block_root, |bytes| { + SignedBeaconBlock::from_ssz_bytes(bytes, &self.spec) + })?; - // Add to cache. - self.block_cache.lock().put(*block_root, block.clone()); - - Ok(Some(block)) - } - None => Ok(None), + // Add to cache. 
+ if let Some(ref block) = block { + self.block_cache.lock().put(*block_root, block.clone()); } + + Ok(block) + } + + /// Fetch a block from the store, ignoring which fork variant it *should* be for. + pub fn get_block_any_variant( + &self, + block_root: &Hash256, + ) -> Result>, Error> { + self.get_block_with(block_root, SignedBeaconBlock::any_from_ssz_bytes) + } + + /// Fetch a block from the store using a custom decode function. + /// + /// This is useful for e.g. ignoring the slot-indicated fork to forcefully load a block as if it + /// were for a different fork. + pub fn get_block_with( + &self, + block_root: &Hash256, + decoder: impl FnOnce(&[u8]) -> Result, ssz::DecodeError>, + ) -> Result>, Error> { + self.hot_db + .get_bytes(DBColumn::BeaconBlock.into(), block_root.as_bytes())? + .map(|block_bytes| decoder(&block_bytes)) + .transpose() + .map_err(|e| e.into()) } /// Determine whether a block exists in the database. @@ -766,7 +784,7 @@ impl, Cold: ItemStore> HotColdDB /// /// Blocks are returned in slot-ascending order, suitable for replaying on a state with slot /// equal to `start_slot`, to reach a state with slot equal to `end_slot`. - fn load_blocks_to_replay( + pub fn load_blocks_to_replay( &self, start_slot: Slot, end_slot: Slot, diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index 014565cb2..f32b531ad 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -232,6 +232,7 @@ impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> Iterator pub struct ParentRootBlockIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { store: &'a HotColdDB, next_block_root: Hash256, + decode_any_variant: bool, _phantom: PhantomData, } @@ -242,6 +243,17 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> Self { store, next_block_root: start_block_root, + decode_any_variant: false, + _phantom: PhantomData, + } + } + + /// Block iterator that is tolerant of blocks that have the wrong fork for their slot. 
+ pub fn fork_tolerant(store: &'a HotColdDB, start_block_root: Hash256) -> Self { + Self { + store, + next_block_root: start_block_root, + decode_any_variant: true, _phantom: PhantomData, } } @@ -253,10 +265,12 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> Ok(None) } else { let block_root = self.next_block_root; - let block = self - .store - .get_block(&block_root)? - .ok_or(Error::BlockNotFound(block_root))?; + let block = if self.decode_any_variant { + self.store.get_block_any_variant(&block_root) + } else { + self.store.get_block(&block_root) + }? + .ok_or(Error::BlockNotFound(block_root))?; self.next_block_root = block.message().parent_root(); Ok(Some((block_root, block))) } diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 2c672d870..a943e11e0 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -4,7 +4,7 @@ use proto_array::{Block as ProtoBlock, ProtoArrayForkChoice}; use ssz_derive::{Decode, Encode}; use types::{ AttestationShufflingId, BeaconBlock, BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec, - Hash256, IndexedAttestation, RelativeEpoch, Slot, + Hash256, IndexedAttestation, RelativeEpoch, SignedBeaconBlock, Slot, }; use crate::ForkChoiceStore; @@ -38,6 +38,10 @@ pub enum Error { ForkChoiceStoreError(T), UnableToSetJustifiedCheckpoint(T), AfterBlockFailed(T), + InvalidAnchor { + block_slot: Slot, + state_slot: Slot, + }, } impl From for Error { @@ -237,20 +241,28 @@ where T: ForkChoiceStore, E: EthSpec, { - /// Instantiates `Self` from the genesis parameters. - pub fn from_genesis( + /// Instantiates `Self` from an anchor (genesis or another finalized checkpoint). 
+ pub fn from_anchor( fc_store: T, - genesis_block_root: Hash256, - genesis_block: &BeaconBlock, - genesis_state: &BeaconState, + anchor_block_root: Hash256, + anchor_block: &SignedBeaconBlock, + anchor_state: &BeaconState, ) -> Result> { - let finalized_block_slot = genesis_block.slot(); - let finalized_block_state_root = genesis_block.state_root(); + // Sanity check: the anchor must lie on an epoch boundary. + if anchor_block.slot() % E::slots_per_epoch() != 0 { + return Err(Error::InvalidAnchor { + block_slot: anchor_block.slot(), + state_slot: anchor_state.slot(), + }); + } + + let finalized_block_slot = anchor_block.slot(); + let finalized_block_state_root = anchor_block.state_root(); let current_epoch_shuffling_id = - AttestationShufflingId::new(genesis_block_root, genesis_state, RelativeEpoch::Current) + AttestationShufflingId::new(anchor_block_root, anchor_state, RelativeEpoch::Current) .map_err(Error::BeaconStateError)?; let next_epoch_shuffling_id = - AttestationShufflingId::new(genesis_block_root, genesis_state, RelativeEpoch::Next) + AttestationShufflingId::new(anchor_block_root, anchor_state, RelativeEpoch::Next) .map_err(Error::BeaconStateError)?; let proto_array = ProtoArrayForkChoice::new( diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 21bb63418..5e78bde28 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -48,12 +48,10 @@ impl fmt::Debug for ForkChoiceTest { impl ForkChoiceTest { /// Creates a new tester. pub fn new() -> Self { - let harness = BeaconChainHarness::new_with_target_aggregators( + let harness = BeaconChainHarness::new_with_store_config( MainnetEthSpec, None, generate_deterministic_keypairs(VALIDATOR_COUNT), - // Ensure we always have an aggregator for each slot. 
- u64::max_value(), StoreConfig::default(), ); @@ -66,8 +64,6 @@ impl ForkChoiceTest { MainnetEthSpec, None, generate_deterministic_keypairs(VALIDATOR_COUNT), - // Ensure we always have an aggregator for each slot. - u64::max_value(), StoreConfig::default(), chain_config, ); diff --git a/consensus/types/src/beacon_block.rs b/consensus/types/src/beacon_block.rs index 27bed6f49..962e75cc2 100644 --- a/consensus/types/src/beacon_block.rs +++ b/consensus/types/src/beacon_block.rs @@ -88,6 +88,17 @@ impl BeaconBlock { } } + /// Try decoding each beacon block variant in sequence. + /// + /// This is *not* recommended unless you really have no idea what variant the block should be. + /// Usually it's better to prefer `from_ssz_bytes` which will decode the correct variant based + /// on the fork slot. + pub fn any_from_ssz_bytes(bytes: &[u8]) -> Result { + BeaconBlockAltair::from_ssz_bytes(bytes) + .map(BeaconBlock::Altair) + .or_else(|_| BeaconBlockBase::from_ssz_bytes(bytes).map(BeaconBlock::Base)) + } + /// Convenience accessor for the `body` as a `BeaconBlockBodyRef`. pub fn body(&self) -> BeaconBlockBodyRef<'_, T> { self.to_ref().body() diff --git a/consensus/types/src/signed_beacon_block.rs b/consensus/types/src/signed_beacon_block.rs index 14c1ffb8c..2d1fed18a 100644 --- a/consensus/types/src/signed_beacon_block.rs +++ b/consensus/types/src/signed_beacon_block.rs @@ -74,10 +74,23 @@ impl SignedBeaconBlock { self.message().fork_name(spec) } - /// SSZ decode. + /// SSZ decode with fork variant determined by slot. pub fn from_ssz_bytes(bytes: &[u8], spec: &ChainSpec) -> Result { - // We need to use the slot-switching `from_ssz_bytes` of `BeaconBlock`, which doesn't - // compose with the other SSZ utils, so we duplicate some parts of `ssz_derive` here. + Self::from_ssz_bytes_with(bytes, |bytes| BeaconBlock::from_ssz_bytes(bytes, spec)) + } + + /// SSZ decode which attempts to decode all variants (slow). 
+ pub fn any_from_ssz_bytes(bytes: &[u8]) -> Result { + Self::from_ssz_bytes_with(bytes, BeaconBlock::any_from_ssz_bytes) + } + + /// SSZ decode with custom decode function. + pub fn from_ssz_bytes_with( + bytes: &[u8], + block_decoder: impl FnOnce(&[u8]) -> Result, ssz::DecodeError>, + ) -> Result { + // We need the custom decoder for `BeaconBlock`, which doesn't compose with the other + // SSZ utils, so we duplicate some parts of `ssz_derive` here. let mut builder = ssz::SszDecoderBuilder::new(bytes); builder.register_anonymous_variable_length_item()?; @@ -86,7 +99,7 @@ impl SignedBeaconBlock { let mut decoder = builder.build()?; // Read the first item as a `BeaconBlock`. - let message = decoder.decode_next_with(|bytes| BeaconBlock::from_ssz_bytes(bytes, spec))?; + let message = decoder.decode_next_with(block_decoder)?; let signature = decoder.decode_next()?; Ok(Self::from_block(message, signature))