From 9d2d6239cdbf3a6d2dc398f433a809189ca6923e Mon Sep 17 00:00:00 2001 From: realbigsean Date: Thu, 1 Oct 2020 01:41:58 +0000 Subject: [PATCH] Weak subjectivity start from genesis (#1675) ## Issue Addressed Solution 2 proposed here: https://github.com/sigp/lighthouse/issues/1435#issuecomment-692317639 ## Proposed Changes - Adds an optional `--wss-checkpoint` flag that takes a string `root:epoch` - Verify that the given checkpoint exists in the chain, or that the the chain syncs through this checkpoint. If not, shutdown and prompt the user to purge state before restarting. ## Additional Info Co-authored-by: Paul Hauner --- Cargo.lock | 3 + beacon_node/Cargo.toml | 1 + beacon_node/beacon_chain/Cargo.toml | 1 + beacon_node/beacon_chain/src/beacon_chain.rs | 127 ++++++- .../beacon_chain/src/block_verification.rs | 7 + beacon_node/beacon_chain/src/builder.rs | 36 ++ beacon_node/beacon_chain/src/chain_config.rs | 6 + beacon_node/beacon_chain/src/errors.rs | 3 + beacon_node/beacon_chain/src/test_utils.rs | 50 ++- beacon_node/client/src/builder.rs | 7 + .../src/attestation_service/tests/mod.rs | 4 + .../network/src/beacon_processor/worker.rs | 1 + beacon_node/src/cli.rs | 10 + beacon_node/src/config.rs | 37 ++- consensus/fork_choice/Cargo.toml | 1 + consensus/fork_choice/tests/tests.rs | 309 +++++++++++++++++- 16 files changed, 584 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6049d2e7f..2468994ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -330,6 +330,7 @@ dependencies = [ "eth2_ssz", "eth2_ssz_derive", "eth2_ssz_types", + "exit-future", "fork_choice", "futures 0.3.5", "genesis", @@ -386,6 +387,7 @@ dependencies = [ "exit-future", "futures 0.3.5", "genesis", + "hex 0.4.2", "hyper 0.13.8", "lighthouse_version", "logging", @@ -1781,6 +1783,7 @@ dependencies = [ "beacon_chain", "eth2_ssz", "eth2_ssz_derive", + "hex 0.4.2", "proto_array", "slot_clock", "state_processing", diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 0351b1cb4..0fb55b6f8 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -40,3 +40,4 @@ serde = "1.0.110" clap_utils = { path = "../common/clap_utils" } hyper = "0.13.5" lighthouse_version = { path = "../common/lighthouse_version" } +hex = "0.4.2" diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 05ae819c4..9694e8fb3 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -59,3 +59,4 @@ bus = "2.2.3" derivative = "2.1.1" itertools = "0.9.0" regex = "1.3.9" +exit-future = "0.2.0" diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 1caaec5fe..63aaada9c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -12,7 +12,6 @@ use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; use crate::events::{EventHandler, EventKind}; use crate::head_tracker::HeadTracker; -use crate::metrics; use crate::migrate::Migrate; use crate::naive_aggregation_pool::{Error as NaiveAggregationError, NaiveAggregationPool}; use crate::observed_attestations::{Error as AttestationObservationError, ObservedAttestations}; @@ -27,7 +26,9 @@ use crate::timeout_rw_lock::TimeoutRwLock; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::BeaconForkChoiceStore; use crate::BeaconSnapshot; +use crate::{metrics, BeaconChainError}; use fork_choice::ForkChoice; +use futures::channel::mpsc::Sender; use itertools::process_results; use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::RwLock; @@ -222,6 +223,9 @@ pub struct BeaconChain { pub(crate) validator_pubkey_cache: TimeoutRwLock, /// A list of any hard-coded forks that have been disabled. pub disabled_forks: Vec, + /// Sender given to tasks, so that if they encounter a state in which execution cannot + /// continue they can request that everything shuts down. + pub shutdown_sender: Sender<&'static str>, /// Logging to CLI, etc. pub(crate) log: Logger, /// Arbitrary bytes included in the blocks. @@ -680,7 +684,7 @@ impl BeaconChain { /// Returns the block canonical root of the current canonical chain at a given slot. /// - /// Returns None if a block doesn't exist at the slot. + /// Returns `None` if the given slot doesn't exist in the chain. pub fn root_at_slot(&self, target_slot: Slot) -> Result, Error> { process_results(self.rev_iter_block_roots()?, |mut iter| { iter.find(|(_, slot)| *slot == target_slot) @@ -688,6 +692,26 @@ impl BeaconChain { }) } + /// Returns the block canonical root of the current canonical chain at a given slot, starting from the given state. + /// + /// Returns `None` if the given slot doesn't exist in the chain. + pub fn root_at_slot_from_state( + &self, + target_slot: Slot, + beacon_block_root: Hash256, + state: &BeaconState, + ) -> Result, Error> { + let iter = BlockRootsIterator::new(self.store.clone(), state); + let iter_with_head = std::iter::once(Ok((beacon_block_root, state.slot))) + .chain(iter) + .map(|result| result.map_err(|e| e.into())); + + process_results(iter_with_head, |mut iter| { + iter.find(|(_, slot)| *slot == target_slot) + .map(|(root, _)| root) + }) + } + /// Returns the block proposer for a given slot. /// /// Information is read from the present `beacon_state` shuffling, only information from the @@ -1242,7 +1266,7 @@ impl BeaconChain { return ChainSegmentResult::Failed { imported_blocks, error: BlockError::NotFinalizedDescendant { block_parent_root }, - } + }; } // If there was an error whilst determining if the block was invalid, return that // error. @@ -1250,7 +1274,7 @@ impl BeaconChain { return ChainSegmentResult::Failed { imported_blocks, error: BlockError::BeaconChainError(e), - } + }; } // If the block was decided to be irrelevant for any other reason, don't include // this block or any of it's children in the filtered chain segment. @@ -1284,7 +1308,7 @@ impl BeaconChain { return ChainSegmentResult::Failed { imported_blocks, error, - } + }; } }; @@ -1296,7 +1320,7 @@ impl BeaconChain { return ChainSegmentResult::Failed { imported_blocks, error, - } + }; } } } @@ -1514,6 +1538,38 @@ impl BeaconChain { check_block_is_finalized_descendant::(signed_block, &fork_choice, &self.store)?; let block = &signed_block.message; + // compare the existing finalized checkpoint with the incoming block's finalized checkpoint + let old_finalized_checkpoint = fork_choice.finalized_checkpoint(); + let new_finalized_checkpoint = state.finalized_checkpoint; + + // Only perform the weak subjectivity check if it was configured. + if let Some(wss_checkpoint) = self.config.weak_subjectivity_checkpoint { + // This ensures we only perform the check once. + if (old_finalized_checkpoint.epoch < wss_checkpoint.epoch) + && (wss_checkpoint.epoch <= new_finalized_checkpoint.epoch) + { + if let Err(e) = + self.verify_weak_subjectivity_checkpoint(wss_checkpoint, block_root, &state) + { + let mut shutdown_sender = self.shutdown_sender(); + crit!( + self.log, + "Weak subjectivity checkpoint verification failed while importing block!"; + "block_root" => format!("{:?}", block_root), + "parent_root" => format!("{:?}", block.parent_root), + "old_finalized_epoch" => format!("{:?}", old_finalized_checkpoint.epoch), + "new_finalized_epoch" => format!("{:?}", new_finalized_checkpoint.epoch), + "weak_subjectivity_epoch" => format!("{:?}", wss_checkpoint.epoch), + "error" => format!("{:?}", e), + ); + crit!(self.log, "You must use the `--purge-db` flag to clear the database and restart sync. You may be on a hostile network."); + shutdown_sender.try_send("Weak subjectivity checkpoint verification failed. Provided block root is not a checkpoint.") + .map_err(|err|BlockError::BeaconChainError(BeaconChainError::WeakSubjectivtyShutdownError(err)))?; + return Err(BlockError::WeakSubjectivityConflict); + } + } + } + // Register the new block with the fork choice service. { let _fork_choice_block_timer = @@ -1928,6 +1984,60 @@ impl BeaconChain { Ok(()) } + /// This function takes a configured weak subjectivity `Checkpoint` and the latest finalized `Checkpoint`. + /// If the weak subjectivity checkpoint and finalized checkpoint share the same epoch, we compare + /// roots. If we the weak subjectivity checkpoint is from an older epoch, we iterate back through + /// roots in the canonical chain until we reach the finalized checkpoint from the correct epoch, and + /// compare roots. This must called on startup and during verification of any block which causes a finality + /// change affecting the weak subjectivity checkpoint. + pub fn verify_weak_subjectivity_checkpoint( + &self, + wss_checkpoint: Checkpoint, + beacon_block_root: Hash256, + state: &BeaconState, + ) -> Result<(), BeaconChainError> { + let finalized_checkpoint = state.finalized_checkpoint; + info!(self.log, "Verifying the configured weak subjectivity checkpoint"; "weak_subjectivity_epoch" => wss_checkpoint.epoch, "weak_subjectivity_root" => format!("{:?}", wss_checkpoint.root)); + // If epochs match, simply compare roots. + if wss_checkpoint.epoch == finalized_checkpoint.epoch + && wss_checkpoint.root != finalized_checkpoint.root + { + crit!( + self.log, + "Root found at the specified checkpoint differs"; + "weak_subjectivity_root" => format!("{:?}", wss_checkpoint.root), + "finalized_checkpoint_root" => format!("{:?}", finalized_checkpoint.root) + ); + return Err(BeaconChainError::WeakSubjectivtyVerificationFailure); + } else if wss_checkpoint.epoch < finalized_checkpoint.epoch { + let slot = wss_checkpoint + .epoch + .start_slot(T::EthSpec::slots_per_epoch()); + + // Iterate backwards through block roots from the given state. If first slot of the epoch is a skip-slot, + // this will return the root of the closest prior non-skipped slot. + match self.root_at_slot_from_state(slot, beacon_block_root, state)? { + Some(root) => { + if root != wss_checkpoint.root { + crit!( + self.log, + "Root found at the specified checkpoint differs"; + "weak_subjectivity_root" => format!("{:?}", wss_checkpoint.root), + "finalized_checkpoint_root" => format!("{:?}", finalized_checkpoint.root) + ); + return Err(BeaconChainError::WeakSubjectivtyVerificationFailure); + } + } + None => { + crit!(self.log, "The root at the start slot of the given epoch could not be found"; + "wss_checkpoint_slot" => format!("{:?}", slot)); + return Err(BeaconChainError::WeakSubjectivtyVerificationFailure); + } + } + } + Ok(()) + } + /// Called by the timer on every slot. /// /// Performs slot-based pruning. @@ -2163,6 +2273,11 @@ impl BeaconChain { writeln!(output, "}}").unwrap(); } + /// Get a channel to request shutting down. + pub fn shutdown_sender(&self) -> Sender<&'static str> { + self.shutdown_sender.clone() + } + // Used for debugging #[allow(dead_code)] pub fn dump_dot_file(&self, file_name: &str) { diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index c6d8ab02b..0e3e7db7d 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -207,6 +207,13 @@ pub enum BlockError { /// We were unable to process this block due to an internal error. It's unclear if the block is /// valid. BeaconChainError(BeaconChainError), + /// There was an error whilst verifying weak subjectivity. This block conflicts with the + /// configured weak subjectivity checkpoint and was not imported. + /// + /// ## Peer scoring + /// + /// The block is invalid and the peer is faulty. + WeakSubjectivityConflict, } impl std::fmt::Display for BlockError { diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index ac9d1c4b1..f83f0d6cb 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -18,6 +18,7 @@ use crate::{ }; use eth1::Config as Eth1Config; use fork_choice::ForkChoice; +use futures::channel::mpsc::Sender; use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::RwLock; use slog::{info, Logger}; @@ -107,6 +108,7 @@ pub struct BeaconChainBuilder { eth1_chain: Option>, event_handler: Option, slot_clock: Option, + shutdown_sender: Option>, head_tracker: Option, data_dir: Option, pubkey_cache_path: Option, @@ -154,6 +156,7 @@ where eth1_chain: None, event_handler: None, slot_clock: None, + shutdown_sender: None, head_tracker: None, pubkey_cache_path: None, data_dir: None, @@ -405,6 +408,12 @@ where self } + /// Sets a `Sender` to allow the beacon chain to send shutdown signals. + pub fn shutdown_sender(mut self, sender: Sender<&'static str>) -> Self { + self.shutdown_sender = Some(sender); + self + } + /// Creates a new, empty operation pool. fn empty_op_pool(mut self) -> Self { self.op_pool = Some(OperationPool::new()); @@ -575,6 +584,9 @@ where shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()), validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache), disabled_forks: self.disabled_forks, + shutdown_sender: self + .shutdown_sender + .ok_or_else(|| "Cannot build without a shutdown sender.".to_string())?, log: log.clone(), graffiti: self.graffiti, }; @@ -583,6 +595,27 @@ where .head() .map_err(|e| format!("Failed to get head: {:?}", e))?; + // Only perform the check if it was configured. + if let Some(wss_checkpoint) = beacon_chain.config.weak_subjectivity_checkpoint { + if let Err(e) = beacon_chain.verify_weak_subjectivity_checkpoint( + wss_checkpoint, + head.beacon_block_root, + &head.beacon_state, + ) { + crit!( + log, + "Weak subjectivity checkpoint verification failed on startup!"; + "head_block_root" => format!("{}", head.beacon_block_root), + "head_slot" => format!("{}", head.beacon_block.slot()), + "finalized_epoch" => format!("{}", head.beacon_state.finalized_checkpoint.epoch), + "wss_checkpoint_epoch" => format!("{}", wss_checkpoint.epoch), + "error" => format!("{:?}", e), + ); + crit!(log, "You must use the `--purge-db` flag to clear the database and restart sync. You may be on a hostile network."); + return Err(format!("Weak subjectivity verification failed: {:?}", e)); + } + } + info!( log, "Beacon chain initialized"; @@ -761,6 +794,8 @@ mod test { ) .expect("should create interop genesis state"); + let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + let chain = BeaconChainBuilder::new(MinimalEthSpec) .logger(log.clone()) .store(Arc::new(store)) @@ -773,6 +808,7 @@ mod test { .null_event_handler() .testing_slot_clock(Duration::from_secs(1)) .expect("should configure testing slot clock") + .shutdown_sender(shutdown_tx) .build() .expect("should build"); diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 5c12bb40f..d84e62665 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -1,4 +1,5 @@ use serde_derive::{Deserialize, Serialize}; +use types::Checkpoint; /// There is a 693 block skip in the current canonical Medalla chain, we use 700 to be safe. pub const DEFAULT_IMPORT_BLOCK_MAX_SKIP_SLOTS: u64 = 700; @@ -10,12 +11,17 @@ pub struct ChainConfig { /// /// If `None`, there is no limit. pub import_max_skip_slots: Option, + /// A user-input `Checkpoint` that must exist in the beacon chain's sync path. + /// + /// If `None`, there is no weak subjectivity verification. + pub weak_subjectivity_checkpoint: Option, } impl Default for ChainConfig { fn default() -> Self { Self { import_max_skip_slots: Some(DEFAULT_IMPORT_BLOCK_MAX_SKIP_SLOTS), + weak_subjectivity_checkpoint: None, } } } diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 96f1c9a84..869250d62 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -5,6 +5,7 @@ use crate::naive_aggregation_pool::Error as NaiveAggregationError; use crate::observed_attestations::Error as ObservedAttestationsError; use crate::observed_attesters::Error as ObservedAttestersError; use crate::observed_block_producers::Error as ObservedBlockProducersError; +use futures::channel::mpsc::TrySendError; use operation_pool::OpPoolError; use safe_arith::ArithError; use ssz_types::Error as SszTypesError; @@ -83,6 +84,8 @@ pub enum BeaconChainError { ObservedBlockProducersError(ObservedBlockProducersError), PruningError(PruningError), ArithError(ArithError), + WeakSubjectivtyVerificationFailure, + WeakSubjectivtyShutdownError(TrySendError<&'static str>), } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 8690c2e8d..abf233027 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -8,8 +8,9 @@ use crate::{ builder::{BeaconChainBuilder, Witness}, eth1_chain::CachingEth1Backend, events::NullEventHandler, - BeaconChain, BeaconChainTypes, StateSkipConfig, + BeaconChain, BeaconChainTypes, BlockError, ChainConfig, StateSkipConfig, }; +use futures::channel::mpsc::Receiver; use genesis::interop_genesis_state; use rand::rngs::StdRng; use rand::Rng; @@ -107,6 +108,7 @@ pub struct BeaconChainHarness { pub chain: BeaconChain, pub spec: ChainSpec, pub data_dir: TempDir, + pub shutdown_receiver: Receiver<&'static str>, pub rng: StdRng, } @@ -134,6 +136,7 @@ impl BeaconChainHarness> { let config = StoreConfig::default(); let store = Arc::new(HotColdDB::open_ephemeral(config, spec.clone(), log.clone()).unwrap()); + let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1); let chain = BeaconChainBuilder::new(eth_spec_instance) .logger(log.clone()) @@ -151,6 +154,7 @@ impl BeaconChainHarness> { .null_event_handler() .testing_slot_clock(HARNESS_SLOT_TIME) .unwrap() + .shutdown_sender(shutdown_tx) .build() .unwrap(); @@ -159,6 +163,7 @@ impl BeaconChainHarness> { chain, validators_keypairs, data_dir, + shutdown_receiver, rng: make_rng(), } } @@ -184,7 +189,25 @@ impl BeaconChainHarness> { eth_spec_instance: E, validators_keypairs: Vec, target_aggregators_per_committee: u64, - config: StoreConfig, + store_config: StoreConfig, + ) -> Self { + Self::new_with_chain_config( + eth_spec_instance, + validators_keypairs, + target_aggregators_per_committee, + store_config, + ChainConfig::default(), + ) + } + + /// Instantiate a new harness with `validator_count` initial validators, a custom + /// `target_aggregators_per_committee` spec value, and a `ChainConfig` + pub fn new_with_chain_config( + eth_spec_instance: E, + validators_keypairs: Vec, + target_aggregators_per_committee: u64, + store_config: StoreConfig, + chain_config: ChainConfig, ) -> Self { let data_dir = tempdir().expect("should create temporary data_dir"); let mut spec = E::default_spec(); @@ -195,8 +218,9 @@ impl BeaconChainHarness> { let drain = slog_term::FullFormat::new(decorator).build(); let debug_level = slog::LevelFilter::new(drain, slog::Level::Debug); let log = slog::Logger::root(std::sync::Mutex::new(debug_level).fuse(), o!()); + let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1); - let store = HotColdDB::open_ephemeral(config, spec.clone(), log.clone()).unwrap(); + let store = HotColdDB::open_ephemeral(store_config, spec.clone(), log.clone()).unwrap(); let chain = BeaconChainBuilder::new(eth_spec_instance) .logger(log) .custom_spec(spec.clone()) @@ -213,6 +237,8 @@ impl BeaconChainHarness> { .null_event_handler() .testing_slot_clock(HARNESS_SLOT_TIME) .expect("should configure testing slot clock") + .shutdown_sender(shutdown_tx) + .chain_config(chain_config) .build() .expect("should build"); @@ -221,6 +247,7 @@ impl BeaconChainHarness> { chain, validators_keypairs, data_dir, + shutdown_receiver, rng: make_rng(), } } @@ -240,6 +267,7 @@ impl BeaconChainHarness> { let drain = slog_term::FullFormat::new(decorator).build(); let debug_level = slog::LevelFilter::new(drain, slog::Level::Debug); let log = slog::Logger::root(std::sync::Mutex::new(debug_level).fuse(), o!()); + let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1); let chain = BeaconChainBuilder::new(eth_spec_instance) .logger(log.clone()) @@ -258,6 +286,7 @@ impl BeaconChainHarness> { .null_event_handler() .testing_slot_clock(HARNESS_SLOT_TIME) .expect("should configure testing slot clock") + .shutdown_sender(shutdown_tx) .build() .expect("should build"); @@ -266,6 +295,7 @@ impl BeaconChainHarness> { chain, validators_keypairs, data_dir, + shutdown_receiver, rng: make_rng(), } } @@ -282,6 +312,7 @@ impl BeaconChainHarness> { let spec = E::default_spec(); let log = NullLoggerBuilder.build().expect("logger should build"); + let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1); let chain = BeaconChainBuilder::new(eth_spec_instance) .logger(log.clone()) @@ -300,6 +331,7 @@ impl BeaconChainHarness> { .null_event_handler() .testing_slot_clock(Duration::from_secs(1)) .expect("should configure testing slot clock") + .shutdown_sender(shutdown_tx) .build() .expect("should build"); @@ -308,6 +340,7 @@ impl BeaconChainHarness> { chain, validators_keypairs, data_dir, + shutdown_receiver, rng: make_rng(), } } @@ -608,6 +641,17 @@ where block_hash } + pub fn process_block_result( + &self, + slot: Slot, + block: SignedBeaconBlock, + ) -> Result> { + assert_eq!(self.chain.slot().unwrap(), slot); + let block_hash: SignedBeaconBlockHash = self.chain.process_block(block)?.into(); + self.chain.fork_choice().unwrap(); + Ok(block_hash) + } + pub fn process_attestations(&self, attestations: HarnessAttestations) { for (unaggregated_attestations, maybe_signed_aggregate) in attestations.into_iter() { for (attestation, subnet_id) in unaggregated_attestations { diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 15cd97ea8..3c8026f5b 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -412,6 +412,12 @@ where { /// Consumes the internal `BeaconChainBuilder`, attaching the resulting `BeaconChain` to self. pub fn build_beacon_chain(mut self) -> Result { + let context = self + .runtime_context + .as_ref() + .ok_or_else(|| "beacon_chain requires a runtime context")? + .clone(); + let chain = self .beacon_chain_builder .ok_or_else(|| "beacon_chain requires a beacon_chain_builder")? @@ -424,6 +430,7 @@ where .clone() .ok_or_else(|| "beacon_chain requires a slot clock")?, ) + .shutdown_sender(context.executor.shutdown_sender()) .build() .map_err(|e| format!("Failed to build beacon chain: {}", e))?; diff --git a/beacon_node/network/src/attestation_service/tests/mod.rs b/beacon_node/network/src/attestation_service/tests/mod.rs index 0a392727a..9ae897304 100644 --- a/beacon_node/network/src/attestation_service/tests/mod.rs +++ b/beacon_node/network/src/attestation_service/tests/mod.rs @@ -47,6 +47,9 @@ mod tests { let store = HotColdDB::open_ephemeral(StoreConfig::default(), spec.clone(), log.clone()) .unwrap(); + + let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + let chain = Arc::new( BeaconChainBuilder::new(MinimalEthSpec) .logger(log.clone()) @@ -67,6 +70,7 @@ mod tests { Duration::from_secs(recent_genesis_time()), Duration::from_millis(SLOT_DURATION_MILLIS), )) + .shutdown_sender(shutdown_tx) .build() .expect("should build"), ); diff --git a/beacon_node/network/src/beacon_processor/worker.rs b/beacon_node/network/src/beacon_processor/worker.rs index 653922dfe..cef373f17 100644 --- a/beacon_node/network/src/beacon_processor/worker.rs +++ b/beacon_node/network/src/beacon_processor/worker.rs @@ -231,6 +231,7 @@ impl Worker { | Err(e @ BlockError::BlockIsNotLaterThanParent { .. }) | Err(e @ BlockError::InvalidSignature) | Err(e @ BlockError::TooManySkippedSlots { .. }) + | Err(e @ BlockError::WeakSubjectivityConflict) | Err(e @ BlockError::GenesisBlock) => { warn!(self.log, "Could not verify block for gossip, rejecting the block"; "error" => e.to_string()); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 9f6ee79b1..987f151ad 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -281,4 +281,14 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true) .default_value("700") ) + .arg( + Arg::with_name("wss-checkpoint") + .long("wss-checkpoint") + .help( + "Used to input a Weak Subjectivity State Checkpoint in `block_root:epoch_number` format,\ + where block_root is an '0x' prefixed 32-byte hex string and epoch_number is an integer." + ) + .value_name("WSS_CHECKPOINT") + .takes_value(true) + ) } diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 42b3b8277..ccd965381 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -11,7 +11,7 @@ use std::fs; use std::net::{IpAddr, Ipv4Addr, ToSocketAddrs}; use std::net::{TcpListener, UdpSocket}; use std::path::PathBuf; -use types::{ChainSpec, EthSpec, GRAFFITI_BYTES_LEN}; +use types::{ChainSpec, Checkpoint, Epoch, EthSpec, Hash256, GRAFFITI_BYTES_LEN}; pub const BEACON_NODE_DIR: &str = "beacon"; pub const NETWORK_DIR: &str = "network"; @@ -270,6 +270,41 @@ pub fn get_config( client_config.graffiti[..trimmed_graffiti_len] .copy_from_slice(&raw_graffiti[..trimmed_graffiti_len]); + if let Some(wss_checkpoint) = cli_args.value_of("wss-checkpoint") { + let mut split = wss_checkpoint.split(':'); + let root_str = split + .next() + .ok_or_else(|| "Improperly formatted weak subjectivity checkpoint".to_string())?; + let epoch_str = split + .next() + .ok_or_else(|| "Improperly formatted weak subjectivity checkpoint".to_string())?; + + if !root_str.starts_with("0x") { + return Err( + "Unable to parse weak subjectivity checkpoint root, must have 0x prefix" + .to_string(), + ); + } + + if !root_str.chars().count() == 66 { + return Err( + "Unable to parse weak subjectivity checkpoint root, must have 32 bytes".to_string(), + ); + } + + let root = + Hash256::from_slice(&hex::decode(&root_str[2..]).map_err(|e| { + format!("Unable to parse weak subjectivity checkpoint root: {:?}", e) + })?); + let epoch = Epoch::new( + epoch_str + .parse() + .map_err(|_| "Invalid weak subjectivity checkpoint epoch".to_string())?, + ); + + client_config.chain.weak_subjectivity_checkpoint = Some(Checkpoint { epoch, root }) + } + if let Some(max_skip_slots) = cli_args.value_of("max-skip-slots") { client_config.chain.import_max_skip_slots = match max_skip_slots { "none" => None, diff --git a/consensus/fork_choice/Cargo.toml b/consensus/fork_choice/Cargo.toml index b398364c4..e07111407 100644 --- a/consensus/fork_choice/Cargo.toml +++ b/consensus/fork_choice/Cargo.toml @@ -18,3 +18,4 @@ beacon_chain = { path = "../../beacon_node/beacon_chain" } store = { path = "../../beacon_node/store" } tree_hash = { path = "../../consensus/tree_hash" } slot_clock = { path = "../../common/slot_clock" } +hex = "0.4.2" diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index ffa9cbe6b..712b653b7 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -4,17 +4,19 @@ use beacon_chain::{ test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, NullMigratorEphemeralHarnessType, }, - BeaconChain, BeaconChainError, BeaconForkChoiceStore, ForkChoiceError, StateSkipConfig, + BeaconChain, BeaconChainError, BeaconForkChoiceStore, ChainConfig, ForkChoiceError, + StateSkipConfig, }; use fork_choice::{ ForkChoiceStore, InvalidAttestation, InvalidBlock, QueuedAttestation, SAFE_SLOTS_TO_UPDATE_JUSTIFIED, }; +use std::fmt; use std::sync::Mutex; use store::{MemoryStore, StoreConfig}; use types::{ test_utils::{generate_deterministic_keypair, generate_deterministic_keypairs}, - Epoch, EthSpec, IndexedAttestation, MainnetEthSpec, Slot, SubnetId, + Checkpoint, Epoch, EthSpec, IndexedAttestation, MainnetEthSpec, Slot, SubnetId, }; use types::{BeaconBlock, BeaconState, Hash256, SignedBeaconBlock}; @@ -35,6 +37,13 @@ struct ForkChoiceTest { harness: BeaconChainHarness>, } +/// Allows us to use `unwrap` in some cases. +impl fmt::Debug for ForkChoiceTest { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ForkChoiceTest").finish() + } +} + impl ForkChoiceTest { /// Creates a new tester. pub fn new() -> Self { @@ -49,6 +58,20 @@ impl ForkChoiceTest { Self { harness } } + /// Creates a new tester with a custom chain config. + pub fn new_with_chain_config(chain_config: ChainConfig) -> Self { + let harness = BeaconChainHarness::new_with_chain_config( + MainnetEthSpec, + generate_deterministic_keypairs(VALIDATOR_COUNT), + // Ensure we always have an aggregator for each slot. + u64::max_value(), + StoreConfig::default(), + chain_config, + ); + + Self { harness } + } + /// Get a value from the `ForkChoice` instantiation. fn get(&self, func: T) -> U where @@ -87,6 +110,36 @@ impl ForkChoiceTest { self } + /// Assert the given slot is greater than the head slot. + pub fn assert_finalized_epoch_is_less_than(self, epoch: Epoch) -> Self { + assert!( + self.harness + .chain + .head_info() + .unwrap() + .finalized_checkpoint + .epoch + < epoch + ); + self + } + + /// Assert there was a shutdown signal sent by the beacon chain. + pub fn assert_shutdown_signal_sent(mut self) -> Self { + self.harness.shutdown_receiver.close(); + let msg = self.harness.shutdown_receiver.try_next().unwrap(); + assert!(msg.is_some()); + self + } + + /// Assert no shutdown was signal sent by the beacon chain. + pub fn assert_shutdown_signal_not_sent(mut self) -> Self { + self.harness.shutdown_receiver.close(); + let msg = self.harness.shutdown_receiver.try_next().unwrap(); + assert!(msg.is_none()); + self + } + /// Inspect the queued attestations in fork choice. pub fn inspect_queued_attestations(self, mut func: F) -> Self where @@ -116,8 +169,8 @@ impl ForkChoiceTest { self } - /// Build the chain whilst `predicate` returns `true`. - pub fn apply_blocks_while(mut self, mut predicate: F) -> Self + /// Build the chain whilst `predicate` returns `true` and `process_block_result` does not error. + pub fn apply_blocks_while(mut self, mut predicate: F) -> Result where F: FnMut(&BeaconBlock, &BeaconState) -> bool, { @@ -131,13 +184,16 @@ impl ForkChoiceTest { if !predicate(&block.message, &state) { break; } - let block_hash = self.harness.process_block(slot, block.clone()); - self.harness - .attest_block(&state, block_hash, &block, &validators); - self.harness.advance_slot(); + if let Ok(block_hash) = self.harness.process_block_result(slot, block.clone()) { + self.harness + .attest_block(&state, block_hash, &block, &validators); + self.harness.advance_slot(); + } else { + return Err(self); + } } - self + Ok(self) } /// Apply `count` blocks to the chain (with attestations). @@ -409,6 +465,7 @@ fn is_safe_to_update(slot: Slot) -> bool { fn justified_checkpoint_updates_with_descendent_inside_safe_slots() { ForkChoiceTest::new() .apply_blocks_while(|_, state| state.current_justified_checkpoint.epoch == 0) + .unwrap() .move_inside_safe_to_update() .assert_justified_epoch(0) .apply_blocks(1) @@ -422,6 +479,7 @@ fn justified_checkpoint_updates_with_descendent_inside_safe_slots() { fn justified_checkpoint_updates_with_descendent_outside_safe_slots() { ForkChoiceTest::new() .apply_blocks_while(|_, state| state.current_justified_checkpoint.epoch <= 2) + .unwrap() .move_outside_safe_to_update() .assert_justified_epoch(2) .assert_best_justified_epoch(2) @@ -436,6 +494,7 @@ fn justified_checkpoint_updates_with_descendent_outside_safe_slots() { fn justified_checkpoint_updates_first_justification_outside_safe_to_update() { ForkChoiceTest::new() .apply_blocks_while(|_, state| state.current_justified_checkpoint.epoch == 0) + .unwrap() .move_to_next_unsafe_period() .assert_justified_epoch(0) .assert_best_justified_epoch(0) @@ -451,6 +510,7 @@ fn justified_checkpoint_updates_first_justification_outside_safe_to_update() { fn justified_checkpoint_updates_with_non_descendent_inside_safe_slots_without_finality() { ForkChoiceTest::new() .apply_blocks_while(|_, state| state.current_justified_checkpoint.epoch == 0) + .unwrap() .apply_blocks(1) .move_inside_safe_to_update() .assert_justified_epoch(2) @@ -476,6 +536,7 @@ fn justified_checkpoint_updates_with_non_descendent_inside_safe_slots_without_fi fn justified_checkpoint_updates_with_non_descendent_outside_safe_slots_without_finality() { ForkChoiceTest::new() .apply_blocks_while(|_, state| state.current_justified_checkpoint.epoch == 0) + .unwrap() .apply_blocks(1) .move_to_next_unsafe_period() .assert_justified_epoch(2) @@ -501,6 +562,7 @@ fn justified_checkpoint_updates_with_non_descendent_outside_safe_slots_without_f fn justified_checkpoint_updates_with_non_descendent_outside_safe_slots_with_finality() { ForkChoiceTest::new() .apply_blocks_while(|_, state| state.current_justified_checkpoint.epoch == 0) + .unwrap() .apply_blocks(1) .move_to_next_unsafe_period() .assert_justified_epoch(2) @@ -524,6 +586,7 @@ fn justified_checkpoint_updates_with_non_descendent_outside_safe_slots_with_fina fn justified_balances() { ForkChoiceTest::new() .apply_blocks_while(|_, state| state.current_justified_checkpoint.epoch == 0) + .unwrap() .apply_blocks(1) .assert_justified_epoch(2) .check_justified_balances() @@ -590,6 +653,7 @@ fn invalid_block_future_slot() { fn invalid_block_finalized_slot() { ForkChoiceTest::new() .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch == 0) + .unwrap() .apply_blocks(1) .apply_invalid_block_directly_to_fork_choice( |block, _| { @@ -619,6 +683,7 @@ fn invalid_block_finalized_descendant() { ForkChoiceTest::new() .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch == 0) + .unwrap() .apply_blocks(1) .assert_finalized_epoch(2) .apply_invalid_block_directly_to_fork_choice( @@ -904,6 +969,232 @@ fn valid_attestation_skip_across_epoch() { fn can_read_finalized_block() { ForkChoiceTest::new() .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch == 0) + .unwrap() .apply_blocks(1) .check_finalized_block_is_accessible(); } + +#[test] +#[should_panic] +fn weak_subjectivity_fail_on_startup() { + let epoch = Epoch::new(0); + let root = Hash256::from_low_u64_le(1); + + let chain_config = ChainConfig { + weak_subjectivity_checkpoint: Some(Checkpoint { epoch, root }), + import_max_skip_slots: None, + }; + + ForkChoiceTest::new_with_chain_config(chain_config); +} + +#[test] +fn weak_subjectivity_pass_on_startup() { + let epoch = Epoch::new(0); + let root = Hash256::zero(); + + let chain_config = ChainConfig { + weak_subjectivity_checkpoint: Some(Checkpoint { epoch, root }), + import_max_skip_slots: None, + }; + + ForkChoiceTest::new_with_chain_config(chain_config) + .apply_blocks(E::slots_per_epoch() as usize) + .assert_shutdown_signal_not_sent(); +} + +#[test] +fn weak_subjectivity_check_passes() { + let setup_harness = ForkChoiceTest::new() + .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch == 0) + .unwrap() + .apply_blocks(1) + .assert_finalized_epoch(2); + + let checkpoint = setup_harness + .harness + .chain + .head_info() + .unwrap() + .finalized_checkpoint; + + let chain_config = ChainConfig { + weak_subjectivity_checkpoint: Some(checkpoint), + import_max_skip_slots: None, + }; + + ForkChoiceTest::new_with_chain_config(chain_config.clone()) + .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch == 0) + .unwrap() + .apply_blocks(1) + .assert_finalized_epoch(2) + .assert_shutdown_signal_not_sent(); +} + +#[test] +fn weak_subjectivity_check_fails_early_epoch() { + let setup_harness = ForkChoiceTest::new() + .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch == 0) + .unwrap() + .apply_blocks(1) + .assert_finalized_epoch(2); + + let mut checkpoint = setup_harness + .harness + .chain + .head_info() + .unwrap() + .finalized_checkpoint; + + checkpoint.epoch = checkpoint.epoch - 1; + + let chain_config = ChainConfig { + weak_subjectivity_checkpoint: Some(checkpoint), + import_max_skip_slots: None, + }; + + ForkChoiceTest::new_with_chain_config(chain_config.clone()) + .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch < 3) + .unwrap_err() + .assert_finalized_epoch_is_less_than(checkpoint.epoch) + .assert_shutdown_signal_sent(); +} + +#[test] +fn weak_subjectivity_check_fails_late_epoch() { + let setup_harness = ForkChoiceTest::new() + .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch == 0) + .unwrap() + .apply_blocks(1) + .assert_finalized_epoch(2); + + let mut checkpoint = setup_harness + .harness + .chain + .head_info() + .unwrap() + .finalized_checkpoint; + + checkpoint.epoch = checkpoint.epoch + 1; + + let chain_config = ChainConfig { + weak_subjectivity_checkpoint: Some(checkpoint), + import_max_skip_slots: None, + }; + + ForkChoiceTest::new_with_chain_config(chain_config.clone()) + .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch < 4) + .unwrap_err() + .assert_finalized_epoch_is_less_than(checkpoint.epoch) + .assert_shutdown_signal_sent(); +} + +#[test] +fn weak_subjectivity_check_fails_incorrect_root() { + let setup_harness = ForkChoiceTest::new() + .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch == 0) + .unwrap() + .apply_blocks(1) + .assert_finalized_epoch(2); + + let mut checkpoint = setup_harness + .harness + .chain + .head_info() + .unwrap() + .finalized_checkpoint; + + checkpoint.root = Hash256::zero(); + + let chain_config = ChainConfig { + weak_subjectivity_checkpoint: Some(checkpoint), + import_max_skip_slots: None, + }; + + ForkChoiceTest::new_with_chain_config(chain_config.clone()) + .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch < 3) + .unwrap_err() + .assert_finalized_epoch_is_less_than(checkpoint.epoch) + .assert_shutdown_signal_sent(); +} + +#[test] +fn weak_subjectivity_check_epoch_boundary_is_skip_slot() { + let setup_harness = ForkChoiceTest::new() + // first two epochs + .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch == 0) + .unwrap(); + + // get the head, it will become the finalized root of epoch 4 + let checkpoint_root = setup_harness.harness.chain.head_info().unwrap().block_root; + + setup_harness + // epoch 3 will be entirely skip slots + .skip_slots(E::slots_per_epoch() as usize) + .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch < 5) + .unwrap() + .apply_blocks(1) + .assert_finalized_epoch(5); + + // the checkpoint at epoch 4 should become the root of last block of epoch 2 + let checkpoint = Checkpoint { + epoch: Epoch::new(4), + root: checkpoint_root, + }; + + let chain_config = ChainConfig { + weak_subjectivity_checkpoint: Some(checkpoint), + import_max_skip_slots: None, + }; + + // recreate the chain exactly + ForkChoiceTest::new_with_chain_config(chain_config.clone()) + .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch == 0) + .unwrap() + .skip_slots(E::slots_per_epoch() as usize) + .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch < 5) + .unwrap() + .apply_blocks(1) + .assert_finalized_epoch(5) + .assert_shutdown_signal_not_sent(); +} + +#[test] +fn weak_subjectivity_check_epoch_boundary_is_skip_slot_failure() { + let setup_harness = ForkChoiceTest::new() + // first two epochs + .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch == 0) + .unwrap(); + + // get the head, it will become the finalized root of epoch 4 + let checkpoint_root = setup_harness.harness.chain.head_info().unwrap().block_root; + + setup_harness + // epoch 3 will be entirely skip slots + .skip_slots(E::slots_per_epoch() as usize) + .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch < 5) + .unwrap() + .apply_blocks(1) + .assert_finalized_epoch(5); + + // Invalid checkpoint (epoch too early) + let checkpoint = Checkpoint { + epoch: Epoch::new(1), + root: checkpoint_root, + }; + + let chain_config = ChainConfig { + weak_subjectivity_checkpoint: Some(checkpoint), + import_max_skip_slots: None, + }; + + // recreate the chain exactly + ForkChoiceTest::new_with_chain_config(chain_config.clone()) + .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch == 0) + .unwrap() + .skip_slots(E::slots_per_epoch() as usize) + .apply_blocks_while(|_, state| state.finalized_checkpoint.epoch < 6) + .unwrap_err() + .assert_finalized_epoch_is_less_than(checkpoint.epoch) + .assert_shutdown_signal_sent(); +}