From 20ebf1f3c111197b92e8f065880b03623645a1a4 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 25 Jul 2022 23:53:26 +0000 Subject: [PATCH] Realized unrealized experimentation (#3322) ## Issue Addressed Add a flag that optionally enables unrealized vote tracking. Would like to test out on testnets and benchmark differences in methods of vote tracking. This PR includes a DB schema upgrade to enable to new vote tracking style. Co-authored-by: realbigsean Co-authored-by: Paul Hauner Co-authored-by: sean Co-authored-by: Mac L --- Cargo.lock | 2 + beacon_node/beacon_chain/src/beacon_chain.rs | 16 +- .../src/beacon_fork_choice_store.rs | 36 +- .../beacon_chain/src/block_verification.rs | 4 + beacon_node/beacon_chain/src/builder.rs | 1 + beacon_node/beacon_chain/src/chain_config.rs | 2 + beacon_node/beacon_chain/src/fork_revert.rs | 16 +- beacon_node/beacon_chain/src/lib.rs | 2 +- .../beacon_chain/src/persisted_fork_choice.rs | 10 +- beacon_node/beacon_chain/src/schema_change.rs | 31 +- .../src/schema_change/migration_schema_v10.rs | 97 ++++ .../src/schema_change/migration_schema_v7.rs | 14 +- .../beacon_chain/src/schema_change/types.rs | 147 +++++- beacon_node/beacon_chain/src/test_utils.rs | 15 +- .../beacon_chain/tests/block_verification.rs | 70 ++- .../tests/payload_invalidation.rs | 13 +- beacon_node/beacon_chain/tests/store_tests.rs | 3 +- beacon_node/beacon_chain/tests/tests.rs | 10 +- beacon_node/http_api/src/lib.rs | 9 +- .../beacon_processor/worker/gossip_methods.rs | 9 +- .../beacon_processor/worker/sync_methods.rs | 25 +- beacon_node/network/src/sync/manager.rs | 2 +- .../network/src/sync/range_sync/chain.rs | 13 +- .../src/sync/range_sync/chain_collection.rs | 7 +- beacon_node/src/cli.rs | 8 + beacon_node/src/config.rs | 4 + beacon_node/store/src/metadata.rs | 2 +- consensus/fork_choice/Cargo.toml | 1 + consensus/fork_choice/src/fork_choice.rs | 423 +++++++++++++----- .../fork_choice/src/fork_choice_store.rs | 12 + consensus/fork_choice/src/lib.rs | 6 +- consensus/fork_choice/tests/tests.rs | 7 +- .../src/fork_choice_test_definition.rs | 21 +- consensus/proto_array/src/proto_array.rs | 168 ++++--- .../src/proto_array_fork_choice.rs | 83 ++-- .../src/per_epoch_processing.rs | 2 + .../src/per_epoch_processing/altair.rs | 4 +- .../altair/justification_and_finalization.rs | 14 +- .../src/per_epoch_processing/base.rs | 4 +- .../base/justification_and_finalization.rs | 14 +- .../justification_and_finalization_state.rs | 115 +++++ .../weigh_justification_and_finalization.rs | 14 +- consensus/types/src/test_utils/test_random.rs | 1 + lcli/Cargo.toml | 1 + lcli/src/parse_ssz.rs | 19 +- .../ef_tests/src/cases/epoch_processing.rs | 24 +- testing/ef_tests/src/cases/fork_choice.rs | 91 +++- 47 files changed, 1254 insertions(+), 338 deletions(-) create mode 100644 beacon_node/beacon_chain/src/schema_change/migration_schema_v10.rs create mode 100644 consensus/state_processing/src/per_epoch_processing/justification_and_finalization_state.rs diff --git a/Cargo.lock b/Cargo.lock index ab75fe2ae..adffa23f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2125,6 +2125,7 @@ dependencies = [ "eth2_ssz", "eth2_ssz_derive", "proto_array", + "state_processing", "store", "tokio", "types", @@ -3008,6 +3009,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "snap", "state_processing", "tree_hash", "types", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index c18f4a737..b9f9727e4 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -93,6 +93,7 @@ use types::beacon_state::CloneConfig; use types::*; pub use crate::canonical_head::{CanonicalHead, CanonicalHeadRwLock}; +pub use fork_choice::CountUnrealized; pub type ForkChoiceError = fork_choice::Error; @@ -1740,6 +1741,7 @@ impl BeaconChain { self.slot()?, verified.indexed_attestation(), AttestationFromBlock::False, + &self.spec, ) .map_err(Into::into) } @@ -2220,6 +2222,7 @@ impl BeaconChain { pub async fn process_chain_segment( self: &Arc, chain_segment: Vec>>, + count_unrealized: CountUnrealized, ) -> ChainSegmentResult { let mut imported_blocks = 0; @@ -2284,7 +2287,10 @@ impl BeaconChain { // Import the blocks into the chain. for signature_verified_block in signature_verified_blocks { - match self.process_block(signature_verified_block).await { + match self + .process_block(signature_verified_block, count_unrealized) + .await + { Ok(_) => imported_blocks += 1, Err(error) => { return ChainSegmentResult::Failed { @@ -2368,6 +2374,7 @@ impl BeaconChain { pub async fn process_block>( self: &Arc, unverified_block: B, + count_unrealized: CountUnrealized, ) -> Result> { // Start the Prometheus timer. let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); @@ -2383,7 +2390,7 @@ impl BeaconChain { let import_block = async move { let execution_pending = unverified_block.into_execution_pending_block(&chain)?; chain - .import_execution_pending_block(execution_pending) + .import_execution_pending_block(execution_pending, count_unrealized) .await }; @@ -2441,6 +2448,7 @@ impl BeaconChain { async fn import_execution_pending_block( self: Arc, execution_pending_block: ExecutionPendingBlock, + count_unrealized: CountUnrealized, ) -> Result> { let ExecutionPendingBlock { block, @@ -2499,6 +2507,7 @@ impl BeaconChain { state, confirmed_state_roots, payload_verification_status, + count_unrealized, ) }, "payload_verification_handle", @@ -2520,6 +2529,7 @@ impl BeaconChain { mut state: BeaconState, confirmed_state_roots: Vec, payload_verification_status: PayloadVerificationStatus, + count_unrealized: CountUnrealized, ) -> Result> { let current_slot = self.slot()?; let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); @@ -2665,6 +2675,7 @@ impl BeaconChain { &state, payload_verification_status, &self.spec, + count_unrealized.and(self.config.count_unrealized.into()), ) .map_err(|e| BlockError::BeaconChainError(e.into()))?; } @@ -2690,6 +2701,7 @@ impl BeaconChain { current_slot, &indexed_attestation, AttestationFromBlock::True, + &self.spec, ) { Ok(()) => Ok(()), // Ignore invalid attestations whilst importing attestations from a block. The diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index c7663c77c..0d65b8aa6 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -155,6 +155,8 @@ pub struct BeaconForkChoiceStore, Cold: ItemStore< justified_checkpoint: Checkpoint, justified_balances: Vec, best_justified_checkpoint: Checkpoint, + unrealized_justified_checkpoint: Checkpoint, + unrealized_finalized_checkpoint: Checkpoint, proposer_boost_root: Hash256, _phantom: PhantomData, } @@ -201,6 +203,8 @@ where justified_balances: anchor_state.balances().clone().into(), finalized_checkpoint, best_justified_checkpoint: justified_checkpoint, + unrealized_justified_checkpoint: justified_checkpoint, + unrealized_finalized_checkpoint: finalized_checkpoint, proposer_boost_root: Hash256::zero(), _phantom: PhantomData, } @@ -216,6 +220,8 @@ where justified_checkpoint: self.justified_checkpoint, justified_balances: self.justified_balances.clone(), best_justified_checkpoint: self.best_justified_checkpoint, + unrealized_justified_checkpoint: self.unrealized_justified_checkpoint, + unrealized_finalized_checkpoint: self.unrealized_finalized_checkpoint, proposer_boost_root: self.proposer_boost_root, } } @@ -233,6 +239,8 @@ where justified_checkpoint: persisted.justified_checkpoint, justified_balances: persisted.justified_balances, best_justified_checkpoint: persisted.best_justified_checkpoint, + unrealized_justified_checkpoint: persisted.unrealized_justified_checkpoint, + unrealized_finalized_checkpoint: persisted.unrealized_finalized_checkpoint, proposer_boost_root: persisted.proposer_boost_root, _phantom: PhantomData, }) @@ -280,6 +288,14 @@ where &self.finalized_checkpoint } + fn unrealized_justified_checkpoint(&self) -> &Checkpoint { + &self.unrealized_justified_checkpoint + } + + fn unrealized_finalized_checkpoint(&self) -> &Checkpoint { + &self.unrealized_finalized_checkpoint + } + fn proposer_boost_root(&self) -> Hash256 { self.proposer_boost_root } @@ -323,6 +339,14 @@ where self.best_justified_checkpoint = checkpoint } + fn set_unrealized_justified_checkpoint(&mut self, checkpoint: Checkpoint) { + self.unrealized_justified_checkpoint = checkpoint; + } + + fn set_unrealized_finalized_checkpoint(&mut self, checkpoint: Checkpoint) { + self.unrealized_finalized_checkpoint = checkpoint; + } + fn set_proposer_boost_root(&mut self, proposer_boost_root: Hash256) { self.proposer_boost_root = proposer_boost_root; } @@ -330,22 +354,26 @@ where /// A container which allows persisting the `BeaconForkChoiceStore` to the on-disk database. #[superstruct( - variants(V1, V7, V8), + variants(V1, V7, V8, V10), variant_attributes(derive(Encode, Decode)), no_enum )] pub struct PersistedForkChoiceStore { #[superstruct(only(V1, V7))] pub balances_cache: BalancesCacheV1, - #[superstruct(only(V8))] + #[superstruct(only(V8, V10))] pub balances_cache: BalancesCacheV8, pub time: Slot, pub finalized_checkpoint: Checkpoint, pub justified_checkpoint: Checkpoint, pub justified_balances: Vec, pub best_justified_checkpoint: Checkpoint, - #[superstruct(only(V7, V8))] + #[superstruct(only(V10))] + pub unrealized_justified_checkpoint: Checkpoint, + #[superstruct(only(V10))] + pub unrealized_finalized_checkpoint: Checkpoint, + #[superstruct(only(V7, V8, V10))] pub proposer_boost_root: Hash256, } -pub type PersistedForkChoiceStore = PersistedForkChoiceStoreV8; +pub type PersistedForkChoiceStore = PersistedForkChoiceStoreV10; diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index c8341cd60..0031bd2c6 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1416,6 +1416,10 @@ fn check_block_against_finalized_slot( block_root: Hash256, chain: &BeaconChain, ) -> Result<(), BlockError> { + // The finalized checkpoint is being read from fork choice, rather than the cached head. + // + // Fork choice has the most up-to-date view of finalization and there's no point importing a + // block which conflicts with the fork-choice view of finalization. let finalized_slot = chain .canonical_head .cached_head() diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index cef33ee4f..252b7cef5 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -647,6 +647,7 @@ where store.clone(), Some(current_slot), &self.spec, + self.chain_config.count_unrealized.into(), )?; } diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 36c2f41d9..d5e3d1981 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -24,6 +24,7 @@ pub struct ChainConfig { /// /// If set to 0 then block proposal will not wait for fork choice at all. pub fork_choice_before_proposal_timeout_ms: u64, + pub count_unrealized: bool, } impl Default for ChainConfig { @@ -35,6 +36,7 @@ impl Default for ChainConfig { enable_lock_timeouts: true, max_network_size: 10 * 1_048_576, // 10M fork_choice_before_proposal_timeout_ms: DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT, + count_unrealized: false, } } } diff --git a/beacon_node/beacon_chain/src/fork_revert.rs b/beacon_node/beacon_chain/src/fork_revert.rs index fc89429d3..1d2787d98 100644 --- a/beacon_node/beacon_chain/src/fork_revert.rs +++ b/beacon_node/beacon_chain/src/fork_revert.rs @@ -1,5 +1,5 @@ use crate::{BeaconForkChoiceStore, BeaconSnapshot}; -use fork_choice::{ForkChoice, PayloadVerificationStatus}; +use fork_choice::{CountUnrealized, ForkChoice, PayloadVerificationStatus}; use itertools::process_results; use slog::{info, warn, Logger}; use state_processing::state_advance::complete_state_advance; @@ -99,6 +99,7 @@ pub fn reset_fork_choice_to_finalization, Cold: It store: Arc>, current_slot: Option, spec: &ChainSpec, + count_unrealized_config: CountUnrealized, ) -> Result, E>, String> { // Fetch finalized block. let finalized_checkpoint = head_state.finalized_checkpoint(); @@ -163,7 +164,8 @@ pub fn reset_fork_choice_to_finalization, Cold: It .map_err(|e| format!("Error loading blocks to replay for fork choice: {:?}", e))?; let mut state = finalized_snapshot.beacon_state; - for block in blocks { + let blocks_len = blocks.len(); + for (i, block) in blocks.into_iter().enumerate() { complete_state_advance(&mut state, None, block.slot(), spec) .map_err(|e| format!("State advance failed: {:?}", e))?; @@ -183,6 +185,15 @@ pub fn reset_fork_choice_to_finalization, Cold: It // This scenario is so rare that it seems OK to double-verify some blocks. let payload_verification_status = PayloadVerificationStatus::Optimistic; + // Because we are replaying a single chain of blocks, we only need to calculate unrealized + // justification for the last block in the chain. + let is_last_block = i + 1 == blocks_len; + let count_unrealized = if is_last_block { + count_unrealized_config + } else { + CountUnrealized::False + }; + fork_choice .on_block( block.slot(), @@ -193,6 +204,7 @@ pub fn reset_fork_choice_to_finalization, Cold: It &state, payload_verification_status, spec, + count_unrealized, ) .map_err(|e| format!("Error applying replayed block to fork choice: {:?}", e))?; } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 728057c90..9cb734f2a 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -44,7 +44,7 @@ mod validator_pubkey_cache; pub use self::beacon_chain::{ AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult, - ForkChoiceError, ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped, + CountUnrealized, ForkChoiceError, ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped, INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, MAXIMUM_GOSSIP_CLOCK_DISPARITY, }; pub use self::beacon_snapshot::BeaconSnapshot; diff --git a/beacon_node/beacon_chain/src/persisted_fork_choice.rs b/beacon_node/beacon_chain/src/persisted_fork_choice.rs index eb4c76191..eb5078df2 100644 --- a/beacon_node/beacon_chain/src/persisted_fork_choice.rs +++ b/beacon_node/beacon_chain/src/persisted_fork_choice.rs @@ -1,5 +1,6 @@ use crate::beacon_fork_choice_store::{ - PersistedForkChoiceStoreV1, PersistedForkChoiceStoreV7, PersistedForkChoiceStoreV8, + PersistedForkChoiceStoreV1, PersistedForkChoiceStoreV10, PersistedForkChoiceStoreV7, + PersistedForkChoiceStoreV8, }; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; @@ -7,10 +8,10 @@ use store::{DBColumn, Error, StoreItem}; use superstruct::superstruct; // If adding a new version you should update this type alias and fix the breakages. -pub type PersistedForkChoice = PersistedForkChoiceV8; +pub type PersistedForkChoice = PersistedForkChoiceV10; #[superstruct( - variants(V1, V7, V8), + variants(V1, V7, V8, V10), variant_attributes(derive(Encode, Decode)), no_enum )] @@ -22,6 +23,8 @@ pub struct PersistedForkChoice { pub fork_choice_store: PersistedForkChoiceStoreV7, #[superstruct(only(V8))] pub fork_choice_store: PersistedForkChoiceStoreV8, + #[superstruct(only(V10))] + pub fork_choice_store: PersistedForkChoiceStoreV10, } macro_rules! impl_store_item { @@ -45,3 +48,4 @@ macro_rules! impl_store_item { impl_store_item!(PersistedForkChoiceV1); impl_store_item!(PersistedForkChoiceV7); impl_store_item!(PersistedForkChoiceV8); +impl_store_item!(PersistedForkChoiceV10); diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs index a48f1d375..411ef947d 100644 --- a/beacon_node/beacon_chain/src/schema_change.rs +++ b/beacon_node/beacon_chain/src/schema_change.rs @@ -1,4 +1,5 @@ //! Utilities for managing database schema changes. +mod migration_schema_v10; mod migration_schema_v6; mod migration_schema_v7; mod migration_schema_v8; @@ -6,7 +7,9 @@ mod migration_schema_v9; mod types; use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY}; -use crate::persisted_fork_choice::{PersistedForkChoiceV1, PersistedForkChoiceV7}; +use crate::persisted_fork_choice::{ + PersistedForkChoiceV1, PersistedForkChoiceV10, PersistedForkChoiceV7, PersistedForkChoiceV8, +}; use crate::types::ChainSpec; use slog::{warn, Logger}; use std::path::Path; @@ -130,6 +133,32 @@ pub fn migrate_schema( migration_schema_v9::downgrade_from_v9::(db.clone(), log)?; db.store_schema_version(to) } + (SchemaVersion(9), SchemaVersion(10)) => { + let mut ops = vec![]; + let fork_choice_opt = db.get_item::(&FORK_CHOICE_DB_KEY)?; + if let Some(fork_choice) = fork_choice_opt { + let updated_fork_choice = migration_schema_v10::update_fork_choice(fork_choice)?; + + ops.push(updated_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY)); + } + + db.store_schema_version_atomically(to, ops)?; + + Ok(()) + } + (SchemaVersion(10), SchemaVersion(9)) => { + let mut ops = vec![]; + let fork_choice_opt = db.get_item::(&FORK_CHOICE_DB_KEY)?; + if let Some(fork_choice) = fork_choice_opt { + let updated_fork_choice = migration_schema_v10::downgrade_fork_choice(fork_choice)?; + + ops.push(updated_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY)); + } + + db.store_schema_version_atomically(to, ops)?; + + Ok(()) + } // Anything else is an error. (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { target_version: to, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v10.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v10.rs new file mode 100644 index 000000000..70e000785 --- /dev/null +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v10.rs @@ -0,0 +1,97 @@ +use crate::beacon_fork_choice_store::{PersistedForkChoiceStoreV10, PersistedForkChoiceStoreV8}; +use crate::persisted_fork_choice::{PersistedForkChoiceV10, PersistedForkChoiceV8}; +use crate::schema_change::{ + types::{SszContainerV10, SszContainerV7}, + StoreError, +}; +use proto_array::core::SszContainer; +use ssz::{Decode, Encode}; + +pub fn update_fork_choice( + mut fork_choice: PersistedForkChoiceV8, +) -> Result { + let ssz_container_v7 = SszContainerV7::from_ssz_bytes( + &fork_choice.fork_choice.proto_array_bytes, + ) + .map_err(|e| { + StoreError::SchemaMigrationError(format!( + "Failed to decode ProtoArrayForkChoice during schema migration: {:?}", + e + )) + })?; + + // These transformations instantiate `node.unrealized_justified_checkpoint` and + // `node.unrealized_finalized_checkpoint` to `None`. + let ssz_container_v10: SszContainerV10 = ssz_container_v7.into(); + let ssz_container: SszContainer = ssz_container_v10.into(); + fork_choice.fork_choice.proto_array_bytes = ssz_container.as_ssz_bytes(); + + Ok(fork_choice.into()) +} + +pub fn downgrade_fork_choice( + mut fork_choice: PersistedForkChoiceV10, +) -> Result { + let ssz_container_v10 = SszContainerV10::from_ssz_bytes( + &fork_choice.fork_choice.proto_array_bytes, + ) + .map_err(|e| { + StoreError::SchemaMigrationError(format!( + "Failed to decode ProtoArrayForkChoice during schema migration: {:?}", + e + )) + })?; + + let ssz_container_v7: SszContainerV7 = ssz_container_v10.into(); + fork_choice.fork_choice.proto_array_bytes = ssz_container_v7.as_ssz_bytes(); + + Ok(fork_choice.into()) +} + +impl From for PersistedForkChoiceStoreV10 { + fn from(other: PersistedForkChoiceStoreV8) -> Self { + Self { + balances_cache: other.balances_cache, + time: other.time, + finalized_checkpoint: other.finalized_checkpoint, + justified_checkpoint: other.justified_checkpoint, + justified_balances: other.justified_balances, + best_justified_checkpoint: other.best_justified_checkpoint, + unrealized_justified_checkpoint: other.best_justified_checkpoint, + unrealized_finalized_checkpoint: other.finalized_checkpoint, + proposer_boost_root: other.proposer_boost_root, + } + } +} + +impl From for PersistedForkChoiceV10 { + fn from(other: PersistedForkChoiceV8) -> Self { + Self { + fork_choice: other.fork_choice, + fork_choice_store: other.fork_choice_store.into(), + } + } +} + +impl From for PersistedForkChoiceStoreV8 { + fn from(other: PersistedForkChoiceStoreV10) -> Self { + Self { + balances_cache: other.balances_cache, + time: other.time, + finalized_checkpoint: other.finalized_checkpoint, + justified_checkpoint: other.justified_checkpoint, + justified_balances: other.justified_balances, + best_justified_checkpoint: other.best_justified_checkpoint, + proposer_boost_root: other.proposer_boost_root, + } + } +} + +impl From for PersistedForkChoiceV8 { + fn from(other: PersistedForkChoiceV10) -> Self { + Self { + fork_choice: other.fork_choice, + fork_choice_store: other.fork_choice_store.into(), + } + } +} diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v7.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v7.rs index 9222266ba..81147b8af 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v7.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v7.rs @@ -2,7 +2,7 @@ use crate::beacon_chain::BeaconChainTypes; use crate::beacon_fork_choice_store::{PersistedForkChoiceStoreV1, PersistedForkChoiceStoreV7}; use crate::persisted_fork_choice::{PersistedForkChoiceV1, PersistedForkChoiceV7}; -use crate::schema_change::types::{ProtoNodeV6, SszContainerV6, SszContainerV7}; +use crate::schema_change::types::{ProtoNodeV6, SszContainerV10, SszContainerV6, SszContainerV7}; use crate::types::{ChainSpec, Checkpoint, Epoch, EthSpec, Hash256, Slot}; use crate::{BeaconForkChoiceStore, BeaconSnapshot}; use fork_choice::ForkChoice; @@ -86,7 +86,8 @@ pub(crate) fn update_fork_choice( // to `None`. let ssz_container_v7: SszContainerV7 = ssz_container_v6.into_ssz_container_v7(justified_checkpoint, finalized_checkpoint); - let ssz_container: SszContainer = ssz_container_v7.into(); + let ssz_container_v10: SszContainerV10 = ssz_container_v7.into(); + let ssz_container: SszContainer = ssz_container_v10.into(); let mut fork_choice: ProtoArrayForkChoice = ssz_container.into(); update_checkpoints::(finalized_checkpoint.root, &nodes_v6, &mut fork_choice, db) @@ -97,6 +98,13 @@ pub(crate) fn update_fork_choice( update_store_justified_checkpoint(persisted_fork_choice, &mut fork_choice) .map_err(StoreError::SchemaMigrationError)?; + // Need to downgrade the SSZ container to V7 so that all migrations can be applied in sequence. + let ssz_container = SszContainer::from(&fork_choice); + let ssz_container_v7 = SszContainerV7::from(ssz_container); + + persisted_fork_choice.fork_choice.proto_array_bytes = ssz_container_v7.as_ssz_bytes(); + persisted_fork_choice.fork_choice_store.justified_checkpoint = justified_checkpoint; + Ok(()) } @@ -301,8 +309,6 @@ fn update_store_justified_checkpoint( .ok_or("Proto node with current finalized checkpoint not found")?; fork_choice.core_proto_array_mut().justified_checkpoint = justified_checkpoint; - persisted_fork_choice.fork_choice.proto_array_bytes = fork_choice.as_bytes(); - persisted_fork_choice.fork_choice_store.justified_checkpoint = justified_checkpoint; Ok(()) } diff --git a/beacon_node/beacon_chain/src/schema_change/types.rs b/beacon_node/beacon_chain/src/schema_change/types.rs index 8d41a384f..02a54c1a3 100644 --- a/beacon_node/beacon_chain/src/schema_change/types.rs +++ b/beacon_node/beacon_chain/src/schema_change/types.rs @@ -12,7 +12,7 @@ four_byte_option_impl!(four_byte_option_usize, usize); four_byte_option_impl!(four_byte_option_checkpoint, Checkpoint); #[superstruct( - variants(V1, V6, V7), + variants(V1, V6, V7, V10), variant_attributes(derive(Clone, PartialEq, Debug, Encode, Decode)), no_enum )] @@ -30,18 +30,24 @@ pub struct ProtoNode { #[superstruct(only(V1, V6))] pub finalized_epoch: Epoch, #[ssz(with = "four_byte_option_checkpoint")] - #[superstruct(only(V7))] + #[superstruct(only(V7, V10))] pub justified_checkpoint: Option, #[ssz(with = "four_byte_option_checkpoint")] - #[superstruct(only(V7))] + #[superstruct(only(V7, V10))] pub finalized_checkpoint: Option, pub weight: u64, #[ssz(with = "four_byte_option_usize")] pub best_child: Option, #[ssz(with = "four_byte_option_usize")] pub best_descendant: Option, - #[superstruct(only(V6, V7))] + #[superstruct(only(V6, V7, V10))] pub execution_status: ExecutionStatus, + #[ssz(with = "four_byte_option_checkpoint")] + #[superstruct(only(V10))] + pub unrealized_justified_checkpoint: Option, + #[ssz(with = "four_byte_option_checkpoint")] + #[superstruct(only(V10))] + pub unrealized_finalized_checkpoint: Option, } impl Into for ProtoNodeV1 { @@ -88,9 +94,31 @@ impl Into for ProtoNodeV6 { } } -impl Into for ProtoNodeV7 { - fn into(self) -> ProtoNode { - ProtoNode { +impl Into for ProtoNodeV7 { + fn into(self) -> ProtoNodeV10 { + ProtoNodeV10 { + slot: self.slot, + state_root: self.state_root, + target_root: self.target_root, + current_epoch_shuffling_id: self.current_epoch_shuffling_id, + next_epoch_shuffling_id: self.next_epoch_shuffling_id, + root: self.root, + parent: self.parent, + justified_checkpoint: self.justified_checkpoint, + finalized_checkpoint: self.finalized_checkpoint, + weight: self.weight, + best_child: self.best_child, + best_descendant: self.best_descendant, + execution_status: self.execution_status, + unrealized_justified_checkpoint: None, + unrealized_finalized_checkpoint: None, + } + } +} + +impl Into for ProtoNodeV10 { + fn into(self) -> ProtoNodeV7 { + ProtoNodeV7 { slot: self.slot, state_root: self.state_root, target_root: self.target_root, @@ -108,8 +136,50 @@ impl Into for ProtoNodeV7 { } } +impl Into for ProtoNodeV10 { + fn into(self) -> ProtoNode { + ProtoNode { + slot: self.slot, + state_root: self.state_root, + target_root: self.target_root, + current_epoch_shuffling_id: self.current_epoch_shuffling_id, + next_epoch_shuffling_id: self.next_epoch_shuffling_id, + root: self.root, + parent: self.parent, + justified_checkpoint: self.justified_checkpoint, + finalized_checkpoint: self.finalized_checkpoint, + weight: self.weight, + best_child: self.best_child, + best_descendant: self.best_descendant, + execution_status: self.execution_status, + unrealized_justified_checkpoint: self.unrealized_justified_checkpoint, + unrealized_finalized_checkpoint: self.unrealized_finalized_checkpoint, + } + } +} + +impl From for ProtoNodeV7 { + fn from(container: ProtoNode) -> Self { + Self { + slot: container.slot, + state_root: container.state_root, + target_root: container.target_root, + current_epoch_shuffling_id: container.current_epoch_shuffling_id, + next_epoch_shuffling_id: container.next_epoch_shuffling_id, + root: container.root, + parent: container.parent, + justified_checkpoint: container.justified_checkpoint, + finalized_checkpoint: container.finalized_checkpoint, + weight: container.weight, + best_child: container.best_child, + best_descendant: container.best_descendant, + execution_status: container.execution_status, + } + } +} + #[superstruct( - variants(V1, V6, V7), + variants(V1, V6, V7, V10), variant_attributes(derive(Encode, Decode)), no_enum )] @@ -122,9 +192,9 @@ pub struct SszContainer { pub justified_epoch: Epoch, #[superstruct(only(V1, V6))] pub finalized_epoch: Epoch, - #[superstruct(only(V7))] + #[superstruct(only(V7, V10))] pub justified_checkpoint: Checkpoint, - #[superstruct(only(V7))] + #[superstruct(only(V7, V10))] pub finalized_checkpoint: Checkpoint, #[superstruct(only(V1))] pub nodes: Vec, @@ -132,8 +202,10 @@ pub struct SszContainer { pub nodes: Vec, #[superstruct(only(V7))] pub nodes: Vec, + #[superstruct(only(V10))] + pub nodes: Vec, pub indices: Vec<(Hash256, usize)>, - #[superstruct(only(V7))] + #[superstruct(only(V7, V10))] pub previous_proposer_boost: ProposerBoost, } @@ -174,7 +246,41 @@ impl SszContainerV6 { } } -impl Into for SszContainerV7 { +impl Into for SszContainerV7 { + fn into(self) -> SszContainerV10 { + let nodes = self.nodes.into_iter().map(Into::into).collect(); + + SszContainerV10 { + votes: self.votes, + balances: self.balances, + prune_threshold: self.prune_threshold, + justified_checkpoint: self.justified_checkpoint, + finalized_checkpoint: self.finalized_checkpoint, + nodes, + indices: self.indices, + previous_proposer_boost: self.previous_proposer_boost, + } + } +} + +impl Into for SszContainerV10 { + fn into(self) -> SszContainerV7 { + let nodes = self.nodes.into_iter().map(Into::into).collect(); + + SszContainerV7 { + votes: self.votes, + balances: self.balances, + prune_threshold: self.prune_threshold, + justified_checkpoint: self.justified_checkpoint, + finalized_checkpoint: self.finalized_checkpoint, + nodes, + indices: self.indices, + previous_proposer_boost: self.previous_proposer_boost, + } + } +} + +impl Into for SszContainerV10 { fn into(self) -> SszContainer { let nodes = self.nodes.into_iter().map(Into::into).collect(); @@ -190,3 +296,20 @@ impl Into for SszContainerV7 { } } } + +impl From for SszContainerV7 { + fn from(container: SszContainer) -> Self { + let nodes = container.nodes.into_iter().map(Into::into).collect(); + + Self { + votes: container.votes, + balances: container.balances, + prune_threshold: container.prune_threshold, + justified_checkpoint: container.justified_checkpoint, + finalized_checkpoint: container.finalized_checkpoint, + nodes, + indices: container.indices, + previous_proposer_boost: container.previous_proposer_boost, + } + } +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index e9dc8619a..1297e7d78 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -17,6 +17,7 @@ use execution_layer::{ test_utils::{ExecutionBlockGenerator, MockExecutionLayer, DEFAULT_TERMINAL_BLOCK}, ExecutionLayer, }; +use fork_choice::CountUnrealized; use futures::channel::mpsc::Receiver; pub use genesis::{interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH}; use int_to_bytes::int_to_bytes32; @@ -1370,8 +1371,11 @@ where block: SignedBeaconBlock, ) -> Result> { self.set_current_slot(slot); - let block_hash: SignedBeaconBlockHash = - self.chain.process_block(Arc::new(block)).await?.into(); + let block_hash: SignedBeaconBlockHash = self + .chain + .process_block(Arc::new(block), CountUnrealized::True) + .await? + .into(); self.chain.recompute_head_at_current_slot().await?; Ok(block_hash) } @@ -1380,8 +1384,11 @@ where &self, block: SignedBeaconBlock, ) -> Result> { - let block_hash: SignedBeaconBlockHash = - self.chain.process_block(Arc::new(block)).await?.into(); + let block_hash: SignedBeaconBlockHash = self + .chain + .process_block(Arc::new(block), CountUnrealized::True) + .await? + .into(); self.chain.recompute_head_at_current_slot().await?; Ok(block_hash) } diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 4b3e1e72f..43dda7ab0 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -4,6 +4,7 @@ use beacon_chain::test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, }; use beacon_chain::{BeaconSnapshot, BlockError, ChainSegmentResult}; +use fork_choice::CountUnrealized; use lazy_static::lazy_static; use logging::test_logger; use slasher::{Config as SlasherConfig, Slasher}; @@ -147,14 +148,14 @@ async fn chain_segment_full_segment() { // Sneak in a little check to ensure we can process empty chain segments. harness .chain - .process_chain_segment(vec![]) + .process_chain_segment(vec![], CountUnrealized::True) .await .into_block_error() .expect("should import empty chain segment"); harness .chain - .process_chain_segment(blocks.clone()) + .process_chain_segment(blocks.clone(), CountUnrealized::True) .await .into_block_error() .expect("should import chain segment"); @@ -187,7 +188,7 @@ async fn chain_segment_varying_chunk_size() { for chunk in blocks.chunks(*chunk_size) { harness .chain - .process_chain_segment(chunk.to_vec()) + .process_chain_segment(chunk.to_vec(), CountUnrealized::True) .await .into_block_error() .unwrap_or_else(|_| panic!("should import chain segment of len {}", chunk_size)); @@ -227,7 +228,7 @@ async fn chain_segment_non_linear_parent_roots() { matches!( harness .chain - .process_chain_segment(blocks) + .process_chain_segment(blocks, CountUnrealized::True) .await .into_block_error(), Err(BlockError::NonLinearParentRoots) @@ -247,7 +248,7 @@ async fn chain_segment_non_linear_parent_roots() { matches!( harness .chain - .process_chain_segment(blocks) + .process_chain_segment(blocks, CountUnrealized::True) .await .into_block_error(), Err(BlockError::NonLinearParentRoots) @@ -278,7 +279,7 @@ async fn chain_segment_non_linear_slots() { matches!( harness .chain - .process_chain_segment(blocks) + .process_chain_segment(blocks, CountUnrealized::True) .await .into_block_error(), Err(BlockError::NonLinearSlots) @@ -299,7 +300,7 @@ async fn chain_segment_non_linear_slots() { matches!( harness .chain - .process_chain_segment(blocks) + .process_chain_segment(blocks, CountUnrealized::True) .await .into_block_error(), Err(BlockError::NonLinearSlots) @@ -325,7 +326,7 @@ async fn assert_invalid_signature( matches!( harness .chain - .process_chain_segment(blocks) + .process_chain_segment(blocks, CountUnrealized::True) .await .into_block_error(), Err(BlockError::InvalidSignature) @@ -342,12 +343,18 @@ async fn assert_invalid_signature( .collect(); // We don't care if this fails, we just call this to ensure that all prior blocks have been // imported prior to this test. - let _ = harness.chain.process_chain_segment(ancestor_blocks).await; + let _ = harness + .chain + .process_chain_segment(ancestor_blocks, CountUnrealized::True) + .await; assert!( matches!( harness .chain - .process_block(snapshots[block_index].beacon_block.clone()) + .process_block( + snapshots[block_index].beacon_block.clone(), + CountUnrealized::True + ) .await, Err(BlockError::InvalidSignature) ), @@ -397,7 +404,7 @@ async fn invalid_signature_gossip_block() { .collect(); harness .chain - .process_chain_segment(ancestor_blocks) + .process_chain_segment(ancestor_blocks, CountUnrealized::True) .await .into_block_error() .expect("should import all blocks prior to the one being tested"); @@ -405,10 +412,10 @@ async fn invalid_signature_gossip_block() { matches!( harness .chain - .process_block(Arc::new(SignedBeaconBlock::from_block( - block, - junk_signature() - ))) + .process_block( + Arc::new(SignedBeaconBlock::from_block(block, junk_signature())), + CountUnrealized::True + ) .await, Err(BlockError::InvalidSignature) ), @@ -441,7 +448,7 @@ async fn invalid_signature_block_proposal() { matches!( harness .chain - .process_chain_segment(blocks) + .process_chain_segment(blocks, CountUnrealized::True) .await .into_block_error(), Err(BlockError::InvalidSignature) @@ -639,7 +646,7 @@ async fn invalid_signature_deposit() { !matches!( harness .chain - .process_chain_segment(blocks) + .process_chain_segment(blocks, CountUnrealized::True) .await .into_block_error(), Err(BlockError::InvalidSignature) @@ -716,11 +723,18 @@ async fn block_gossip_verification() { harness .chain - .process_block(gossip_verified) + .process_block(gossip_verified, CountUnrealized::True) .await .expect("should import valid gossip verified block"); } + // Recompute the head to ensure we cache the latest view of fork choice. + harness + .chain + .recompute_head_at_current_slot() + .await + .unwrap(); + /* * This test ensures that: * @@ -978,7 +992,11 @@ async fn verify_block_for_gossip_slashing_detection() { .verify_block_for_gossip(Arc::new(block1)) .await .unwrap(); - harness.chain.process_block(verified_block).await.unwrap(); + harness + .chain + .process_block(verified_block, CountUnrealized::True) + .await + .unwrap(); unwrap_err( harness .chain @@ -1009,7 +1027,11 @@ async fn verify_block_for_gossip_doppelganger_detection() { .await .unwrap(); let attestations = verified_block.block.message().body().attestations().clone(); - harness.chain.process_block(verified_block).await.unwrap(); + harness + .chain + .process_block(verified_block, CountUnrealized::True) + .await + .unwrap(); for att in attestations.iter() { let epoch = att.data.target.epoch; @@ -1148,7 +1170,7 @@ async fn add_base_block_to_altair_chain() { assert!(matches!( harness .chain - .process_block(Arc::new(base_block.clone())) + .process_block(Arc::new(base_block.clone()), CountUnrealized::True) .await .err() .expect("should error when processing base block"), @@ -1162,7 +1184,7 @@ async fn add_base_block_to_altair_chain() { assert!(matches!( harness .chain - .process_chain_segment(vec![Arc::new(base_block)]) + .process_chain_segment(vec![Arc::new(base_block)], CountUnrealized::True) .await, ChainSegmentResult::Failed { imported_blocks: 0, @@ -1276,7 +1298,7 @@ async fn add_altair_block_to_base_chain() { assert!(matches!( harness .chain - .process_block(Arc::new(altair_block.clone())) + .process_block(Arc::new(altair_block.clone()), CountUnrealized::True) .await .err() .expect("should error when processing altair block"), @@ -1290,7 +1312,7 @@ async fn add_altair_block_to_base_chain() { assert!(matches!( harness .chain - .process_chain_segment(vec![Arc::new(altair_block)]) + .process_chain_segment(vec![Arc::new(altair_block)], CountUnrealized::True) .await, ChainSegmentResult::Failed { imported_blocks: 0, diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index a4e62cf96..f2ebb430d 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -9,7 +9,9 @@ use execution_layer::{ json_structures::{JsonForkChoiceStateV1, JsonPayloadAttributesV1}, ExecutionLayer, ForkChoiceState, PayloadAttributes, }; -use fork_choice::{Error as ForkChoiceError, InvalidationOperation, PayloadVerificationStatus}; +use fork_choice::{ + CountUnrealized, Error as ForkChoiceError, InvalidationOperation, PayloadVerificationStatus, +}; use proto_array::{Error as ProtoArrayError, ExecutionStatus}; use slot_clock::SlotClock; use std::sync::Arc; @@ -648,7 +650,7 @@ async fn invalidates_all_descendants() { let fork_block_root = rig .harness .chain - .process_block(Arc::new(fork_block)) + .process_block(Arc::new(fork_block), CountUnrealized::True) .await .unwrap(); rig.recompute_head().await; @@ -740,7 +742,7 @@ async fn switches_heads() { let fork_block_root = rig .harness .chain - .process_block(Arc::new(fork_block)) + .process_block(Arc::new(fork_block), CountUnrealized::True) .await .unwrap(); rig.recompute_head().await; @@ -984,7 +986,7 @@ async fn invalid_parent() { // Ensure the block built atop an invalid payload is invalid for import. assert!(matches!( - rig.harness.chain.process_block(block.clone()).await, + rig.harness.chain.process_block(block.clone(), CountUnrealized::True).await, Err(BlockError::ParentExecutionPayloadInvalid { parent_root: invalid_root }) if invalid_root == parent_root )); @@ -998,7 +1000,8 @@ async fn invalid_parent() { Duration::from_secs(0), &state, PayloadVerificationStatus::Optimistic, - &rig.harness.chain.spec + &rig.harness.chain.spec, + CountUnrealized::True, ), Err(ForkChoiceError::ProtoArrayError(message)) if message.contains(&format!( diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 560e865a8..b5b8152e8 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -10,6 +10,7 @@ use beacon_chain::{ BeaconChainError, BeaconChainTypes, BeaconSnapshot, ChainConfig, ServerSentEventHandler, WhenSlotSkipped, }; +use fork_choice::CountUnrealized; use lazy_static::lazy_static; use logging::test_logger; use maplit::hashset; @@ -2124,7 +2125,7 @@ async fn weak_subjectivity_sync() { beacon_chain.slot_clock.set_slot(block.slot().as_u64()); beacon_chain - .process_block(Arc::new(full_block)) + .process_block(Arc::new(full_block), CountUnrealized::True) .await .unwrap(); beacon_chain.recompute_head_at_current_slot().await.unwrap(); diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index f98580db3..80a122976 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -8,6 +8,7 @@ use beacon_chain::{ }, BeaconChain, StateSkipConfig, WhenSlotSkipped, }; +use fork_choice::CountUnrealized; use lazy_static::lazy_static; use operation_pool::PersistedOperationPool; use state_processing::{ @@ -499,7 +500,7 @@ async fn unaggregated_attestations_added_to_fork_choice_some_none() { // Move forward a slot so all queued attestations can be processed. harness.advance_slot(); fork_choice - .update_time(harness.chain.slot().unwrap()) + .update_time(harness.chain.slot().unwrap(), &harness.chain.spec) .unwrap(); let validator_slots: Vec<(usize, Slot)> = (0..VALIDATOR_COUNT) @@ -613,7 +614,7 @@ async fn unaggregated_attestations_added_to_fork_choice_all_updated() { // Move forward a slot so all queued attestations can be processed. harness.advance_slot(); fork_choice - .update_time(harness.chain.slot().unwrap()) + .update_time(harness.chain.slot().unwrap(), &harness.chain.spec) .unwrap(); let validators: Vec = (0..VALIDATOR_COUNT).collect(); @@ -683,7 +684,10 @@ async fn run_skip_slot_test(skip_slots: u64) { assert_eq!( harness_b .chain - .process_block(harness_a.chain.head_snapshot().beacon_block.clone()) + .process_block( + harness_a.chain.head_snapshot().beacon_block.clone(), + CountUnrealized::True + ) .await .unwrap(), harness_a.chain.head_snapshot().beacon_block_root diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index a27e5015c..31ae7486e 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -23,7 +23,7 @@ use beacon_chain::{ observed_operations::ObservationOutcome, validator_monitor::{get_block_delay_ms, timestamp_now}, AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, - ProduceBlockVerification, WhenSlotSkipped, + CountUnrealized, ProduceBlockVerification, WhenSlotSkipped, }; pub use block_id::BlockId; use eth2::types::{self as api_types, EndpointVersion, ValidatorId}; @@ -1035,7 +1035,10 @@ pub fn serve( let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock); metrics::observe_duration(&metrics::HTTP_API_BLOCK_BROADCAST_DELAY_TIMES, delay); - match chain.process_block(block.clone()).await { + match chain + .process_block(block.clone(), CountUnrealized::True) + .await + { Ok(root) => { info!( log, @@ -1179,7 +1182,7 @@ pub fn serve( PubsubMessage::BeaconBlock(new_block.clone()), )?; - match chain.process_block(new_block).await { + match chain.process_block(new_block, CountUnrealized::True).await { Ok(_) => { // Update the head since it's likely this block will become the new // head. diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index b88b58b8b..1b1dc12d8 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -6,7 +6,8 @@ use beacon_chain::{ observed_operations::ObservationOutcome, sync_committee_verification::{self, Error as SyncCommitteeError}, validator_monitor::get_block_delay_ms, - BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, GossipVerifiedBlock, + BeaconChainError, BeaconChainTypes, BlockError, CountUnrealized, ForkChoiceError, + GossipVerifiedBlock, }; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use slog::{crit, debug, error, info, trace, warn}; @@ -899,7 +900,11 @@ impl Worker { ) { let block: Arc<_> = verified_block.block.clone(); - match self.chain.process_block(verified_block).await { + match self + .chain + .process_block(verified_block, CountUnrealized::True) + .await + { Ok(block_root) => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 84e3c95c6..ffcadb868 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -7,10 +7,10 @@ use crate::beacon_processor::DuplicateCache; use crate::metrics; use crate::sync::manager::{BlockProcessType, SyncMessage}; use crate::sync::{BatchProcessResult, ChainId}; -use beacon_chain::ExecutionPayloadError; use beacon_chain::{ BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, }; +use beacon_chain::{CountUnrealized, ExecutionPayloadError}; use lighthouse_network::PeerAction; use slog::{debug, error, info, warn}; use std::sync::Arc; @@ -21,7 +21,7 @@ use types::{Epoch, Hash256, SignedBeaconBlock}; #[derive(Clone, Debug, PartialEq)] pub enum ChainSegmentProcessId { /// Processing Id of a range syncing batch. - RangeBatchId(ChainId, Epoch), + RangeBatchId(ChainId, Epoch, CountUnrealized), /// Processing ID for a backfill syncing batch. BackSyncBatchId(Epoch), /// Processing Id of the parent lookup of a block. @@ -89,7 +89,7 @@ impl Worker { } }; let slot = block.slot(); - let result = self.chain.process_block(block).await; + let result = self.chain.process_block(block, CountUnrealized::True).await; metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL); @@ -133,12 +133,15 @@ impl Worker { ) { let result = match sync_type { // this a request from the range sync - ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => { + ChainSegmentProcessId::RangeBatchId(chain_id, epoch, count_unrealized) => { let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64()); let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64()); let sent_blocks = downloaded_blocks.len(); - match self.process_blocks(downloaded_blocks.iter()).await { + match self + .process_blocks(downloaded_blocks.iter(), count_unrealized) + .await + { (_, Ok(_)) => { debug!(self.log, "Batch processed"; "batch_epoch" => epoch, @@ -207,7 +210,10 @@ impl Worker { ); // parent blocks are ordered from highest slot to lowest, so we need to process in // reverse - match self.process_blocks(downloaded_blocks.iter().rev()).await { + match self + .process_blocks(downloaded_blocks.iter().rev(), CountUnrealized::True) + .await + { (imported_blocks, Err(e)) => { debug!(self.log, "Parent lookup failed"; "error" => %e.message); BatchProcessResult::Failed { @@ -231,9 +237,14 @@ impl Worker { async fn process_blocks<'a>( &self, downloaded_blocks: impl Iterator>>, + count_unrealized: CountUnrealized, ) -> (usize, Result<(), ChainSegmentFailed>) { let blocks: Vec> = downloaded_blocks.cloned().collect(); - match self.chain.process_chain_segment(blocks).await { + match self + .chain + .process_chain_segment(blocks, count_unrealized) + .await + { ChainSegmentResult::Successful { imported_blocks } => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL); if imported_blocks > 0 { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index d0919406b..fe27a33c5 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -532,7 +532,7 @@ impl SyncManager { .parent_block_processed(chain_hash, result, &mut self.network), }, SyncMessage::BatchProcessed { sync_type, result } => match sync_type { - ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => { + ChainSegmentProcessId::RangeBatchId(chain_id, epoch, _) => { self.range_sync.handle_block_process_result( &mut self.network, chain_id, diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index ef5ba23e6..caa08165a 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -2,7 +2,7 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::beacon_processor::{ChainSegmentProcessId, FailureMode}; use crate::sync::{manager::Id, network_context::SyncNetworkContext, BatchProcessResult}; -use beacon_chain::BeaconChainTypes; +use beacon_chain::{BeaconChainTypes, CountUnrealized}; use fnv::FnvHashMap; use lighthouse_network::{PeerAction, PeerId}; use rand::seq::SliceRandom; @@ -100,6 +100,8 @@ pub struct SyncingChain { /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. beacon_processor_send: Sender>, + is_finalized_segment: bool, + /// The chain's log. log: slog::Logger, } @@ -126,6 +128,7 @@ impl SyncingChain { target_head_root: Hash256, peer_id: PeerId, beacon_processor_send: Sender>, + is_finalized_segment: bool, log: &slog::Logger, ) -> Self { let mut peers = FnvHashMap::default(); @@ -148,6 +151,7 @@ impl SyncingChain { current_processing_batch: None, validated_batches: 0, beacon_processor_send, + is_finalized_segment, log: log.new(o!("chain" => id)), } } @@ -302,7 +306,12 @@ impl SyncingChain { // for removing chains and checking completion is in the callback. let blocks = batch.start_processing()?; - let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id); + let count_unrealized = if self.is_finalized_segment { + CountUnrealized::False + } else { + CountUnrealized::True + }; + let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id, count_unrealized); self.current_processing_batch = Some(batch_id); if let Err(e) = self diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 7ddfc3f70..e76adff3a 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -472,10 +472,10 @@ impl ChainCollection { network: &mut SyncNetworkContext, ) { let id = SyncingChain::::id(&target_head_root, &target_head_slot); - let collection = if let RangeSyncType::Finalized = sync_type { - &mut self.finalized_chains + let (collection, is_finalized) = if let RangeSyncType::Finalized = sync_type { + (&mut self.finalized_chains, true) } else { - &mut self.head_chains + (&mut self.head_chains, false) }; match collection.entry(id) { Entry::Occupied(mut entry) => { @@ -501,6 +501,7 @@ impl ChainCollection { target_head_root, peer, beacon_processor_send.clone(), + is_finalized, &self.log, ); debug_assert_eq!(new_chain.get_id(), id); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 964873a94..b36f154ae 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -708,4 +708,12 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .default_value("250") .takes_value(true) ) + .arg( + Arg::with_name("count-unrealized") + .long("count-unrealized") + .hidden(true) + .help("**EXPERIMENTAL** Enables an alternative, potentially more performant FFG \ + vote tracking method.") + .takes_value(false) + ) } diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index c91bd711e..fb0cbe0c9 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -630,6 +630,10 @@ pub fn get_config( client_config.chain.fork_choice_before_proposal_timeout_ms = timeout; } + if cli_args.is_present("count-unrealized") { + client_config.chain.count_unrealized = true; + } + Ok(client_config) } diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 5551f1f44..235550ddd 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::{Checkpoint, Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(9); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(10); // All the keys that get stored under the `BeaconMeta` column. // diff --git a/consensus/fork_choice/Cargo.toml b/consensus/fork_choice/Cargo.toml index 429ab1b8c..b2570092e 100644 --- a/consensus/fork_choice/Cargo.toml +++ b/consensus/fork_choice/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] types = { path = "../types" } +state_processing = { path = "../state_processing" } proto_array = { path = "../proto_array" } eth2_ssz = "0.4.1" eth2_ssz_derive = "0.3.0" diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 984eeaada..c3a88433f 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -1,6 +1,7 @@ use crate::{ForkChoiceStore, InvalidationOperation}; use proto_array::{Block as ProtoBlock, ExecutionStatus, ProtoArrayForkChoice}; use ssz_derive::{Decode, Encode}; +use state_processing::per_epoch_processing; use std::cmp::Ordering; use std::marker::PhantomData; use std::time::Duration; @@ -51,6 +52,9 @@ pub enum Error { MissingFinalizedBlock { finalized_checkpoint: Checkpoint, }, + UnrealizedVoteProcessing(state_processing::EpochProcessingError), + ParticipationCacheBuild(BeaconStateError), + ValidatorStatuses(BeaconStateError), } impl From for Error { @@ -59,6 +63,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: state_processing::EpochProcessingError) -> Self { + Error::UnrealizedVoteProcessing(e) + } +} + #[derive(Debug)] pub enum InvalidBlock { UnknownParent(Hash256), @@ -114,6 +124,66 @@ impl From for Error { } } +/// Indicates whether the unrealized justification of a block should be calculated and tracked. +/// If a block has been finalized, this can be set to false. This is useful when syncing finalized +/// portions of the chain. Otherwise this should always be set to true. +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum CountUnrealized { + True, + False, +} + +impl CountUnrealized { + pub fn is_true(&self) -> bool { + matches!(self, CountUnrealized::True) + } + + pub fn and(&self, other: CountUnrealized) -> CountUnrealized { + if self.is_true() && other.is_true() { + CountUnrealized::True + } else { + CountUnrealized::False + } + } +} + +impl From for CountUnrealized { + fn from(count_unrealized: bool) -> Self { + if count_unrealized { + CountUnrealized::True + } else { + CountUnrealized::False + } + } +} + +#[derive(Copy, Clone)] +enum UpdateJustifiedCheckpointSlots { + OnTick { + current_slot: Slot, + }, + OnBlock { + state_slot: Slot, + current_slot: Slot, + }, +} + +impl UpdateJustifiedCheckpointSlots { + fn current_slot(&self) -> Slot { + match self { + UpdateJustifiedCheckpointSlots::OnTick { current_slot } => *current_slot, + UpdateJustifiedCheckpointSlots::OnBlock { current_slot, .. } => *current_slot, + } + } + + fn state_slot(&self) -> Option { + match self { + UpdateJustifiedCheckpointSlots::OnTick { .. } => None, + UpdateJustifiedCheckpointSlots::OnBlock { state_slot, .. } => Some(*state_slot), + } + } +} + /// Indicates if a block has been verified by an execution payload. /// /// There is no variant for "invalid", since such a block should never be added to fork choice. @@ -162,51 +232,6 @@ fn compute_start_slot_at_epoch(epoch: Epoch) -> Slot { epoch.start_slot(E::slots_per_epoch()) } -/// Called whenever the current time increases. -/// -/// ## Specification -/// -/// Equivalent to: -/// -/// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/fork-choice.md#on_tick -fn on_tick(store: &mut T, time: Slot) -> Result<(), Error> -where - T: ForkChoiceStore, - E: EthSpec, -{ - let previous_slot = store.get_current_slot(); - - if time > previous_slot + 1 { - return Err(Error::InconsistentOnTick { - previous_slot, - time, - }); - } - - // Update store time. - store.set_current_slot(time); - - let current_slot = store.get_current_slot(); - - // Reset proposer boost if this is a new slot. - if current_slot > previous_slot { - store.set_proposer_boost_root(Hash256::zero()); - } - - // Not a new epoch, return. - if !(current_slot > previous_slot && compute_slots_since_epoch_start::(current_slot) == 0) { - return Ok(()); - } - - if store.best_justified_checkpoint().epoch > store.justified_checkpoint().epoch { - store - .set_justified_checkpoint(*store.best_justified_checkpoint()) - .map_err(Error::ForkChoiceStoreError)?; - } - - Ok(()) -} - /// Used for queuing attestations from the current slot. Only contains the minimum necessary /// information about the attestation. #[derive(Clone, PartialEq, Encode, Decode)] @@ -356,7 +381,7 @@ where // If the current slot is not provided, use the value that was last provided to the store. let current_slot = current_slot.unwrap_or_else(|| fc_store.get_current_slot()); - let proto_array = ProtoArrayForkChoice::new( + let proto_array = ProtoArrayForkChoice::new::( finalized_block_slot, finalized_block_state_root, *fc_store.justified_checkpoint(), @@ -473,7 +498,7 @@ where current_slot: Slot, spec: &ChainSpec, ) -> Result> { - self.update_time(current_slot)?; + self.update_time(current_slot, spec)?; let store = &mut self.fc_store; @@ -482,6 +507,7 @@ where *store.finalized_checkpoint(), store.justified_balances(), store.proposer_boost_root(), + current_slot, spec, )?; @@ -539,13 +565,11 @@ where /// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/fork-choice.md#should_update_justified_checkpoint fn should_update_justified_checkpoint( &mut self, - current_slot: Slot, - state: &BeaconState, + new_justified_checkpoint: Checkpoint, + slots: UpdateJustifiedCheckpointSlots, spec: &ChainSpec, ) -> Result> { - self.update_time(current_slot)?; - - let new_justified_checkpoint = &state.current_justified_checkpoint(); + self.update_time(slots.current_slot(), spec)?; if compute_slots_since_epoch_start::(self.fc_store.get_current_slot()) < spec.safe_slots_to_update_justified @@ -557,11 +581,13 @@ where compute_start_slot_at_epoch::(self.fc_store.justified_checkpoint().epoch); // This sanity check is not in the spec, but the invariant is implied. - if justified_slot >= state.slot() { - return Err(Error::AttemptToRevertJustification { - store: justified_slot, - state: state.slot(), - }); + if let Some(state_slot) = slots.state_slot() { + if justified_slot >= state_slot { + return Err(Error::AttemptToRevertJustification { + store: justified_slot, + state: state_slot, + }); + } } // We know that the slot for `new_justified_checkpoint.root` is not greater than @@ -629,15 +655,15 @@ where state: &BeaconState, payload_verification_status: PayloadVerificationStatus, spec: &ChainSpec, + count_unrealized: CountUnrealized, ) -> Result<(), Error> { - let current_slot = self.update_time(current_slot)?; + let current_slot = self.update_time(current_slot, spec)?; // Parent block must be known. - if !self.proto_array.contains_block(&block.parent_root()) { - return Err(Error::InvalidBlock(InvalidBlock::UnknownParent( - block.parent_root(), - ))); - } + let parent_block = self + .proto_array + .get_block(&block.parent_root()) + .ok_or_else(|| Error::InvalidBlock(InvalidBlock::UnknownParent(block.parent_root())))?; // Blocks cannot be in the future. If they are, their consideration must be delayed until // the are in the past. @@ -686,29 +712,110 @@ where self.fc_store.set_proposer_boost_root(block_root); } - // Update justified checkpoint. - if state.current_justified_checkpoint().epoch > self.fc_store.justified_checkpoint().epoch { - if state.current_justified_checkpoint().epoch - > self.fc_store.best_justified_checkpoint().epoch + let update_justified_checkpoint_slots = UpdateJustifiedCheckpointSlots::OnBlock { + state_slot: state.slot(), + current_slot, + }; + + // Update store with checkpoints if necessary + self.update_checkpoints( + state.current_justified_checkpoint(), + state.finalized_checkpoint(), + update_justified_checkpoint_slots, + spec, + )?; + + // Update unrealized justified/finalized checkpoints. + let (unrealized_justified_checkpoint, unrealized_finalized_checkpoint) = if count_unrealized + .is_true() + { + let block_epoch = block.slot().epoch(E::slots_per_epoch()); + + // If the parent checkpoints are already at the same epoch as the block being imported, + // it's impossible for the unrealized checkpoints to differ from the parent's. This + // holds true because: + // + // 1. A child block cannot have lower FFG checkpoints than its parent. + // 2. A block in epoch `N` cannot contain attestations which would justify an epoch higher than `N`. + // 3. A block in epoch `N` cannot contain attestations which would finalize an epoch higher than `N - 1`. + // + // This is an optimization. It should reduce the amount of times we run + // `process_justification_and_finalization` by approximately 1/3rd when the chain is + // performing optimally. + let parent_checkpoints = parent_block + .unrealized_justified_checkpoint + .zip(parent_block.unrealized_finalized_checkpoint) + .filter(|(parent_justified, parent_finalized)| { + parent_justified.epoch == block_epoch + && parent_finalized.epoch + 1 >= block_epoch + }); + + let (unrealized_justified_checkpoint, unrealized_finalized_checkpoint) = + if let Some((parent_justified, parent_finalized)) = parent_checkpoints { + (parent_justified, parent_finalized) + } else { + let justification_and_finalization_state = match block { + BeaconBlockRef::Merge(_) | BeaconBlockRef::Altair(_) => { + let participation_cache = + per_epoch_processing::altair::ParticipationCache::new(state, spec) + .map_err(Error::ParticipationCacheBuild)?; + per_epoch_processing::altair::process_justification_and_finalization( + state, + &participation_cache, + )? + } + BeaconBlockRef::Base(_) => { + let mut validator_statuses = + per_epoch_processing::base::ValidatorStatuses::new(state, spec) + .map_err(Error::ValidatorStatuses)?; + validator_statuses + .process_attestations(state) + .map_err(Error::ValidatorStatuses)?; + per_epoch_processing::base::process_justification_and_finalization( + state, + &validator_statuses.total_balances, + spec, + )? + } + }; + + ( + justification_and_finalization_state.current_justified_checkpoint(), + justification_and_finalization_state.finalized_checkpoint(), + ) + }; + + // Update best known unrealized justified & finalized checkpoints + if unrealized_justified_checkpoint.epoch + > self.fc_store.unrealized_justified_checkpoint().epoch { self.fc_store - .set_best_justified_checkpoint(state.current_justified_checkpoint()); + .set_unrealized_justified_checkpoint(unrealized_justified_checkpoint); } - if self.should_update_justified_checkpoint(current_slot, state, spec)? { + if unrealized_finalized_checkpoint.epoch + > self.fc_store.unrealized_finalized_checkpoint().epoch + { self.fc_store - .set_justified_checkpoint(state.current_justified_checkpoint()) - .map_err(Error::UnableToSetJustifiedCheckpoint)?; + .set_unrealized_finalized_checkpoint(unrealized_finalized_checkpoint); } - } - // Update finalized checkpoint. - if state.finalized_checkpoint().epoch > self.fc_store.finalized_checkpoint().epoch { - self.fc_store - .set_finalized_checkpoint(state.finalized_checkpoint()); - self.fc_store - .set_justified_checkpoint(state.current_justified_checkpoint()) - .map_err(Error::UnableToSetJustifiedCheckpoint)?; - } + // If block is from past epochs, try to update store's justified & finalized checkpoints right away + if block.slot().epoch(E::slots_per_epoch()) < current_slot.epoch(E::slots_per_epoch()) { + self.update_checkpoints( + unrealized_justified_checkpoint, + unrealized_finalized_checkpoint, + update_justified_checkpoint_slots, + spec, + )?; + } + + ( + Some(unrealized_justified_checkpoint), + Some(unrealized_finalized_checkpoint), + ) + } else { + (None, None) + }; let target_slot = block .slot() @@ -757,32 +864,68 @@ where // This does not apply a vote to the block, it just makes fork choice aware of the block so // it can still be identified as the head even if it doesn't have any votes. - self.proto_array.process_block(ProtoBlock { - slot: block.slot(), - root: block_root, - parent_root: Some(block.parent_root()), - target_root, - current_epoch_shuffling_id: AttestationShufflingId::new( - block_root, - state, - RelativeEpoch::Current, - ) - .map_err(Error::BeaconStateError)?, - next_epoch_shuffling_id: AttestationShufflingId::new( - block_root, - state, - RelativeEpoch::Next, - ) - .map_err(Error::BeaconStateError)?, - state_root: block.state_root(), - justified_checkpoint: state.current_justified_checkpoint(), - finalized_checkpoint: state.finalized_checkpoint(), - execution_status, - })?; + self.proto_array.process_block::( + ProtoBlock { + slot: block.slot(), + root: block_root, + parent_root: Some(block.parent_root()), + target_root, + current_epoch_shuffling_id: AttestationShufflingId::new( + block_root, + state, + RelativeEpoch::Current, + ) + .map_err(Error::BeaconStateError)?, + next_epoch_shuffling_id: AttestationShufflingId::new( + block_root, + state, + RelativeEpoch::Next, + ) + .map_err(Error::BeaconStateError)?, + state_root: block.state_root(), + justified_checkpoint: state.current_justified_checkpoint(), + finalized_checkpoint: state.finalized_checkpoint(), + execution_status, + unrealized_justified_checkpoint, + unrealized_finalized_checkpoint, + }, + current_slot, + )?; Ok(()) } + /// Update checkpoints in store if necessary + fn update_checkpoints( + &mut self, + justified_checkpoint: Checkpoint, + finalized_checkpoint: Checkpoint, + slots: UpdateJustifiedCheckpointSlots, + spec: &ChainSpec, + ) -> Result<(), Error> { + // Update justified checkpoint. + if justified_checkpoint.epoch > self.fc_store.justified_checkpoint().epoch { + if justified_checkpoint.epoch > self.fc_store.best_justified_checkpoint().epoch { + self.fc_store + .set_best_justified_checkpoint(justified_checkpoint); + } + if self.should_update_justified_checkpoint(justified_checkpoint, slots, spec)? { + self.fc_store + .set_justified_checkpoint(justified_checkpoint) + .map_err(Error::UnableToSetJustifiedCheckpoint)?; + } + } + + // Update finalized checkpoint. + if finalized_checkpoint.epoch > self.fc_store.finalized_checkpoint().epoch { + self.fc_store.set_finalized_checkpoint(finalized_checkpoint); + self.fc_store + .set_justified_checkpoint(justified_checkpoint) + .map_err(Error::UnableToSetJustifiedCheckpoint)?; + } + Ok(()) + } + /// Validates the `epoch` against the current time according to the fork choice store. /// /// ## Specification @@ -920,9 +1063,10 @@ where current_slot: Slot, attestation: &IndexedAttestation, is_from_block: AttestationFromBlock, + spec: &ChainSpec, ) -> Result<(), Error> { // Ensure the store is up-to-date. - self.update_time(current_slot)?; + self.update_time(current_slot, spec)?; // Ignore any attestations to the zero hash. // @@ -967,12 +1111,16 @@ where /// Call `on_tick` for all slots between `fc_store.get_current_slot()` and the provided /// `current_slot`. Returns the value of `self.fc_store.get_current_slot`. - pub fn update_time(&mut self, current_slot: Slot) -> Result> { + pub fn update_time( + &mut self, + current_slot: Slot, + spec: &ChainSpec, + ) -> Result> { while self.fc_store.get_current_slot() < current_slot { let previous_slot = self.fc_store.get_current_slot(); // Note: we are relying upon `on_tick` to update `fc_store.time` to ensure we don't // get stuck in a loop. - on_tick(&mut self.fc_store, previous_slot + 1)? + self.on_tick(previous_slot + 1, spec)? } // Process any attestations that might now be eligible. @@ -981,6 +1129,63 @@ where Ok(self.fc_store.get_current_slot()) } + /// Called whenever the current time increases. + /// + /// ## Specification + /// + /// Equivalent to: + /// + /// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/fork-choice.md#on_tick + fn on_tick(&mut self, time: Slot, spec: &ChainSpec) -> Result<(), Error> { + let store = &mut self.fc_store; + let previous_slot = store.get_current_slot(); + + if time > previous_slot + 1 { + return Err(Error::InconsistentOnTick { + previous_slot, + time, + }); + } + + // Update store time. + store.set_current_slot(time); + + let current_slot = store.get_current_slot(); + + // Reset proposer boost if this is a new slot. + if current_slot > previous_slot { + store.set_proposer_boost_root(Hash256::zero()); + } + + // Not a new epoch, return. + if !(current_slot > previous_slot + && compute_slots_since_epoch_start::(current_slot) == 0) + { + return Ok(()); + } + + if store.best_justified_checkpoint().epoch > store.justified_checkpoint().epoch { + let store = &self.fc_store; + if self.is_descendant_of_finalized(store.best_justified_checkpoint().root) { + let store = &mut self.fc_store; + store + .set_justified_checkpoint(*store.best_justified_checkpoint()) + .map_err(Error::ForkChoiceStoreError)?; + } + } + + // Update store.justified_checkpoint if a better unrealized justified checkpoint is known + let unrealized_justified_checkpoint = *self.fc_store.unrealized_justified_checkpoint(); + let unrealized_finalized_checkpoint = *self.fc_store.unrealized_finalized_checkpoint(); + self.update_checkpoints( + unrealized_justified_checkpoint, + unrealized_finalized_checkpoint, + UpdateJustifiedCheckpointSlots::OnTick { current_slot }, + spec, + )?; + Ok(()) + } + /// Processes and removes from the queue any queued attestations which may now be eligible for /// processing due to the slot clock incrementing. fn process_attestation_queue(&mut self) -> Result<(), Error> { @@ -1158,6 +1363,14 @@ where *self.fc_store.best_justified_checkpoint() } + pub fn unrealized_justified_checkpoint(&self) -> Checkpoint { + *self.fc_store.unrealized_justified_checkpoint() + } + + pub fn unrealized_finalized_checkpoint(&self) -> Checkpoint { + *self.fc_store.unrealized_finalized_checkpoint() + } + /// Returns the latest message for a given validator, if any. /// /// Returns `(block_root, block_slot)`. diff --git a/consensus/fork_choice/src/fork_choice_store.rs b/consensus/fork_choice/src/fork_choice_store.rs index 6df0cbc2c..a7085b024 100644 --- a/consensus/fork_choice/src/fork_choice_store.rs +++ b/consensus/fork_choice/src/fork_choice_store.rs @@ -50,6 +50,12 @@ pub trait ForkChoiceStore: Sized { /// Returns the `finalized_checkpoint`. fn finalized_checkpoint(&self) -> &Checkpoint; + /// Returns the `unrealized_justified_checkpoint`. + fn unrealized_justified_checkpoint(&self) -> &Checkpoint; + + /// Returns the `unrealized_finalized_checkpoint`. + fn unrealized_finalized_checkpoint(&self) -> &Checkpoint; + /// Returns the `proposer_boost_root`. fn proposer_boost_root(&self) -> Hash256; @@ -62,6 +68,12 @@ pub trait ForkChoiceStore: Sized { /// Sets the `best_justified_checkpoint`. fn set_best_justified_checkpoint(&mut self, checkpoint: Checkpoint); + /// Sets the `unrealized_justified_checkpoint`. + fn set_unrealized_justified_checkpoint(&mut self, checkpoint: Checkpoint); + + /// Sets the `unrealized_finalized_checkpoint`. + fn set_unrealized_finalized_checkpoint(&mut self, checkpoint: Checkpoint); + /// Sets the proposer boost root. fn set_proposer_boost_root(&mut self, proposer_boost_root: Hash256); } diff --git a/consensus/fork_choice/src/lib.rs b/consensus/fork_choice/src/lib.rs index 6f79b488d..6cb2010f1 100644 --- a/consensus/fork_choice/src/lib.rs +++ b/consensus/fork_choice/src/lib.rs @@ -2,9 +2,9 @@ mod fork_choice; mod fork_choice_store; pub use crate::fork_choice::{ - AttestationFromBlock, Error, ForkChoice, ForkChoiceView, ForkchoiceUpdateParameters, - InvalidAttestation, InvalidBlock, PayloadVerificationStatus, PersistedForkChoice, - QueuedAttestation, + AttestationFromBlock, CountUnrealized, Error, ForkChoice, ForkChoiceView, + ForkchoiceUpdateParameters, InvalidAttestation, InvalidBlock, PayloadVerificationStatus, + PersistedForkChoice, QueuedAttestation, }; pub use fork_choice_store::ForkChoiceStore; pub use proto_array::{Block as ProtoBlock, ExecutionStatus, InvalidationOperation}; diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 2d10319cf..850f7c4a1 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -12,7 +12,8 @@ use beacon_chain::{ StateSkipConfig, WhenSlotSkipped, }; use fork_choice::{ - ForkChoiceStore, InvalidAttestation, InvalidBlock, PayloadVerificationStatus, QueuedAttestation, + CountUnrealized, ForkChoiceStore, InvalidAttestation, InvalidBlock, PayloadVerificationStatus, + QueuedAttestation, }; use store::MemoryStore; use types::{ @@ -150,7 +151,7 @@ impl ForkChoiceTest { .chain .canonical_head .fork_choice_write_lock() - .update_time(self.harness.chain.slot().unwrap()) + .update_time(self.harness.chain.slot().unwrap(), &self.harness.spec) .unwrap(); func( self.harness @@ -292,6 +293,7 @@ impl ForkChoiceTest { &state, PayloadVerificationStatus::Verified, &self.harness.chain.spec, + CountUnrealized::True, ) .unwrap(); self @@ -334,6 +336,7 @@ impl ForkChoiceTest { &state, PayloadVerificationStatus::Verified, &self.harness.chain.spec, + CountUnrealized::True, ) .err() .expect("on_block did not return an error"); diff --git a/consensus/proto_array/src/fork_choice_test_definition.rs b/consensus/proto_array/src/fork_choice_test_definition.rs index 2be46cc59..0cfa3a194 100644 --- a/consensus/proto_array/src/fork_choice_test_definition.rs +++ b/consensus/proto_array/src/fork_choice_test_definition.rs @@ -78,7 +78,7 @@ impl ForkChoiceTestDefinition { let junk_shuffling_id = AttestationShufflingId::from_components(Epoch::new(0), Hash256::zero()); - let mut fork_choice = ProtoArrayForkChoice::new( + let mut fork_choice = ProtoArrayForkChoice::new::( self.finalized_block_slot, Hash256::zero(), self.justified_checkpoint, @@ -103,6 +103,7 @@ impl ForkChoiceTestDefinition { finalized_checkpoint, &justified_state_balances, Hash256::zero(), + Slot::new(0), &spec, ) .unwrap_or_else(|e| { @@ -129,6 +130,7 @@ impl ForkChoiceTestDefinition { finalized_checkpoint, &justified_state_balances, proposer_boost_root, + Slot::new(0), &spec, ) .unwrap_or_else(|e| { @@ -152,6 +154,7 @@ impl ForkChoiceTestDefinition { finalized_checkpoint, &justified_state_balances, Hash256::zero(), + Slot::new(0), &spec, ); @@ -190,13 +193,17 @@ impl ForkChoiceTestDefinition { execution_status: ExecutionStatus::Optimistic( ExecutionBlockHash::from_root(root), ), + unrealized_justified_checkpoint: None, + unrealized_finalized_checkpoint: None, }; - fork_choice.process_block(block).unwrap_or_else(|e| { - panic!( - "process_block op at index {} returned error: {:?}", - op_index, e - ) - }); + fork_choice + .process_block::(block, slot) + .unwrap_or_else(|e| { + panic!( + "process_block op at index {} returned error: {:?}", + op_index, e + ) + }); check_bytes_round_trip(&fork_choice); } Operation::ProcessAttestation { diff --git a/consensus/proto_array/src/proto_array.rs b/consensus/proto_array/src/proto_array.rs index f3ee4ca48..85a15fb60 100644 --- a/consensus/proto_array/src/proto_array.rs +++ b/consensus/proto_array/src/proto_array.rs @@ -97,6 +97,10 @@ pub struct ProtoNode { /// Indicates if an execution node has marked this block as valid. Also contains the execution /// block hash. pub execution_status: ExecutionStatus, + #[ssz(with = "four_byte_option_checkpoint")] + pub unrealized_justified_checkpoint: Option, + #[ssz(with = "four_byte_option_checkpoint")] + pub unrealized_finalized_checkpoint: Option, } #[derive(PartialEq, Debug, Encode, Decode, Serialize, Deserialize, Copy, Clone)] @@ -140,6 +144,7 @@ impl ProtoArray { /// - Compare the current node with the parents best-child, updating it if the current node /// should become the best child. /// - If required, update the parents best-descendant with the current node or its best-descendant. + #[allow(clippy::too_many_arguments)] pub fn apply_score_changes( &mut self, mut deltas: Vec, @@ -147,6 +152,7 @@ impl ProtoArray { finalized_checkpoint: Checkpoint, new_balances: &[u64], proposer_boost_root: Hash256, + current_slot: Slot, spec: &ChainSpec, ) -> Result<(), Error> { if deltas.len() != self.indices.len() { @@ -280,7 +286,11 @@ impl ProtoArray { // If the node has a parent, try to update its best-child and best-descendant. if let Some(parent_index) = node.parent { - self.maybe_update_best_child_and_descendant(parent_index, node_index)?; + self.maybe_update_best_child_and_descendant::( + parent_index, + node_index, + current_slot, + )?; } } @@ -290,7 +300,7 @@ impl ProtoArray { /// Register a block with the fork choice. /// /// It is only sane to supply a `None` parent for the genesis block. - pub fn on_block(&mut self, block: Block) -> Result<(), Error> { + pub fn on_block(&mut self, block: Block, current_slot: Slot) -> Result<(), Error> { // If the block is already known, simply ignore it. if self.indices.contains_key(&block.root) { return Ok(()); @@ -314,6 +324,8 @@ impl ProtoArray { best_child: None, best_descendant: None, execution_status: block.execution_status, + unrealized_justified_checkpoint: block.unrealized_justified_checkpoint, + unrealized_finalized_checkpoint: block.unrealized_finalized_checkpoint, }; // If the parent has an invalid execution status, return an error before adding the block to @@ -335,7 +347,11 @@ impl ProtoArray { self.nodes.push(node.clone()); if let Some(parent_index) = node.parent { - self.maybe_update_best_child_and_descendant(parent_index, node_index)?; + self.maybe_update_best_child_and_descendant::( + parent_index, + node_index, + current_slot, + )?; if matches!(block.execution_status, ExecutionStatus::Valid(_)) { self.propagate_execution_payload_validation_by_index(parent_index)?; @@ -604,7 +620,11 @@ impl ProtoArray { /// been called without a subsequent `Self::apply_score_changes` call. This is because /// `on_new_block` does not attempt to walk backwards through the tree and update the /// best-child/best-descendant links. - pub fn find_head(&self, justified_root: &Hash256) -> Result { + pub fn find_head( + &self, + justified_root: &Hash256, + current_slot: Slot, + ) -> Result { let justified_index = self .indices .get(justified_root) @@ -637,7 +657,7 @@ impl ProtoArray { .ok_or(Error::InvalidBestDescendant(best_descendant_index))?; // Perform a sanity check that the node is indeed valid to be the head. - if !self.node_is_viable_for_head(best_node) { + if !self.node_is_viable_for_head::(best_node, current_slot) { return Err(Error::InvalidBestNode(Box::new(InvalidBestNodeInfo { start_root: *justified_root, justified_checkpoint: self.justified_checkpoint, @@ -733,10 +753,11 @@ impl ProtoArray { /// best-descendant. /// - The child is not the best child but becomes the best child. /// - The child is not the best child and does not become the best child. - fn maybe_update_best_child_and_descendant( + fn maybe_update_best_child_and_descendant( &mut self, parent_index: usize, child_index: usize, + current_slot: Slot, ) -> Result<(), Error> { let child = self .nodes @@ -748,7 +769,8 @@ impl ProtoArray { .get(parent_index) .ok_or(Error::InvalidNodeIndex(parent_index))?; - let child_leads_to_viable_head = self.node_leads_to_viable_head(child)?; + let child_leads_to_viable_head = + self.node_leads_to_viable_head::(child, current_slot)?; // These three variables are aliases to the three options that we may set the // `parent.best_child` and `parent.best_descendant` to. @@ -761,54 +783,54 @@ impl ProtoArray { ); let no_change = (parent.best_child, parent.best_descendant); - let (new_best_child, new_best_descendant) = if let Some(best_child_index) = - parent.best_child - { - if best_child_index == child_index && !child_leads_to_viable_head { - // If the child is already the best-child of the parent but it's not viable for - // the head, remove it. - change_to_none - } else if best_child_index == child_index { - // If the child is the best-child already, set it again to ensure that the - // best-descendant of the parent is updated. - change_to_child - } else { - let best_child = self - .nodes - .get(best_child_index) - .ok_or(Error::InvalidBestDescendant(best_child_index))?; - - let best_child_leads_to_viable_head = self.node_leads_to_viable_head(best_child)?; - - if child_leads_to_viable_head && !best_child_leads_to_viable_head { - // The child leads to a viable head, but the current best-child doesn't. + let (new_best_child, new_best_descendant) = + if let Some(best_child_index) = parent.best_child { + if best_child_index == child_index && !child_leads_to_viable_head { + // If the child is already the best-child of the parent but it's not viable for + // the head, remove it. + change_to_none + } else if best_child_index == child_index { + // If the child is the best-child already, set it again to ensure that the + // best-descendant of the parent is updated. change_to_child - } else if !child_leads_to_viable_head && best_child_leads_to_viable_head { - // The best child leads to a viable head, but the child doesn't. - no_change - } else if child.weight == best_child.weight { - // Tie-breaker of equal weights by root. - if child.root >= best_child.root { - change_to_child - } else { - no_change - } } else { - // Choose the winner by weight. - if child.weight >= best_child.weight { + let best_child = self + .nodes + .get(best_child_index) + .ok_or(Error::InvalidBestDescendant(best_child_index))?; + + let best_child_leads_to_viable_head = + self.node_leads_to_viable_head::(best_child, current_slot)?; + + if child_leads_to_viable_head && !best_child_leads_to_viable_head { + // The child leads to a viable head, but the current best-child doesn't. change_to_child - } else { + } else if !child_leads_to_viable_head && best_child_leads_to_viable_head { + // The best child leads to a viable head, but the child doesn't. no_change + } else if child.weight == best_child.weight { + // Tie-breaker of equal weights by root. + if child.root >= best_child.root { + change_to_child + } else { + no_change + } + } else { + // Choose the winner by weight. + if child.weight >= best_child.weight { + change_to_child + } else { + no_change + } } } - } - } else if child_leads_to_viable_head { - // There is no current best-child and the child is viable. - change_to_child - } else { - // There is no current best-child but the child is not viable. - no_change - }; + } else if child_leads_to_viable_head { + // There is no current best-child and the child is viable. + change_to_child + } else { + // There is no current best-child but the child is not viable. + no_change + }; let parent = self .nodes @@ -823,7 +845,11 @@ impl ProtoArray { /// Indicates if the node itself is viable for the head, or if it's best descendant is viable /// for the head. - fn node_leads_to_viable_head(&self, node: &ProtoNode) -> Result { + fn node_leads_to_viable_head( + &self, + node: &ProtoNode, + current_slot: Slot, + ) -> Result { let best_descendant_is_viable_for_head = if let Some(best_descendant_index) = node.best_descendant { let best_descendant = self @@ -831,12 +857,13 @@ impl ProtoArray { .get(best_descendant_index) .ok_or(Error::InvalidBestDescendant(best_descendant_index))?; - self.node_is_viable_for_head(best_descendant) + self.node_is_viable_for_head::(best_descendant, current_slot) } else { false }; - Ok(best_descendant_is_viable_for_head || self.node_is_viable_for_head(node)) + Ok(best_descendant_is_viable_for_head + || self.node_is_viable_for_head::(node, current_slot)) } /// This is the equivalent to the `filter_block_tree` function in the eth2 spec: @@ -845,18 +872,43 @@ impl ProtoArray { /// /// Any node that has a different finalized or justified epoch should not be viable for the /// head. - fn node_is_viable_for_head(&self, node: &ProtoNode) -> bool { + fn node_is_viable_for_head(&self, node: &ProtoNode, current_slot: Slot) -> bool { if node.execution_status.is_invalid() { return false; } - if let (Some(node_justified_checkpoint), Some(node_finalized_checkpoint)) = + let checkpoint_match_predicate = + |node_justified_checkpoint: Checkpoint, node_finalized_checkpoint: Checkpoint| { + let correct_justified = node_justified_checkpoint == self.justified_checkpoint + || self.justified_checkpoint.epoch == Epoch::new(0); + let correct_finalized = node_finalized_checkpoint == self.finalized_checkpoint + || self.finalized_checkpoint.epoch == Epoch::new(0); + correct_justified && correct_finalized + }; + + if let ( + Some(unrealized_justified_checkpoint), + Some(unrealized_finalized_checkpoint), + Some(justified_checkpoint), + Some(finalized_checkpoint), + ) = ( + node.unrealized_justified_checkpoint, + node.unrealized_finalized_checkpoint, + node.justified_checkpoint, + node.finalized_checkpoint, + ) { + if node.slot.epoch(E::slots_per_epoch()) < current_slot.epoch(E::slots_per_epoch()) { + checkpoint_match_predicate( + unrealized_justified_checkpoint, + unrealized_finalized_checkpoint, + ) + } else { + checkpoint_match_predicate(justified_checkpoint, finalized_checkpoint) + } + } else if let (Some(justified_checkpoint), Some(finalized_checkpoint)) = (node.justified_checkpoint, node.finalized_checkpoint) { - (node_justified_checkpoint == self.justified_checkpoint - || self.justified_checkpoint.epoch == Epoch::new(0)) - && (node_finalized_checkpoint == self.finalized_checkpoint - || self.finalized_checkpoint.epoch == Epoch::new(0)) + checkpoint_match_predicate(justified_checkpoint, finalized_checkpoint) } else { false } diff --git a/consensus/proto_array/src/proto_array_fork_choice.rs b/consensus/proto_array/src/proto_array_fork_choice.rs index 88bf7840c..568cfa964 100644 --- a/consensus/proto_array/src/proto_array_fork_choice.rs +++ b/consensus/proto_array/src/proto_array_fork_choice.rs @@ -124,6 +124,8 @@ pub struct Block { /// Indicates if an execution node has marked this block as valid. Also contains the execution /// block hash. pub execution_status: ExecutionStatus, + pub unrealized_justified_checkpoint: Option, + pub unrealized_finalized_checkpoint: Option, } /// A Vec-wrapper which will grow to match any request. @@ -162,7 +164,7 @@ pub struct ProtoArrayForkChoice { impl ProtoArrayForkChoice { #[allow(clippy::too_many_arguments)] - pub fn new( + pub fn new( finalized_block_slot: Slot, finalized_block_state_root: Hash256, justified_checkpoint: Checkpoint, @@ -193,10 +195,12 @@ impl ProtoArrayForkChoice { justified_checkpoint, finalized_checkpoint, execution_status, + unrealized_justified_checkpoint: Some(justified_checkpoint), + unrealized_finalized_checkpoint: Some(finalized_checkpoint), }; proto_array - .on_block(block) + .on_block::(block, finalized_block_slot) .map_err(|e| format!("Failed to add finalized block to proto_array: {:?}", e))?; Ok(Self { @@ -242,13 +246,17 @@ impl ProtoArrayForkChoice { Ok(()) } - pub fn process_block(&mut self, block: Block) -> Result<(), String> { + pub fn process_block( + &mut self, + block: Block, + current_slot: Slot, + ) -> Result<(), String> { if block.parent_root.is_none() { return Err("Missing parent root".to_string()); } self.proto_array - .on_block(block) + .on_block::(block, current_slot) .map_err(|e| format!("process_block_error: {:?}", e)) } @@ -258,6 +266,7 @@ impl ProtoArrayForkChoice { finalized_checkpoint: Checkpoint, justified_state_balances: &[u64], proposer_boost_root: Hash256, + current_slot: Slot, spec: &ChainSpec, ) -> Result { let old_balances = &mut self.balances; @@ -279,6 +288,7 @@ impl ProtoArrayForkChoice { finalized_checkpoint, new_balances, proposer_boost_root, + current_slot, spec, ) .map_err(|e| format!("find_head apply_score_changes failed: {:?}", e))?; @@ -286,7 +296,7 @@ impl ProtoArrayForkChoice { *old_balances = new_balances.to_vec(); self.proto_array - .find_head(&justified_checkpoint.root) + .find_head::(&justified_checkpoint.root, current_slot) .map_err(|e| format!("find_head failed: {:?}", e)) } @@ -341,6 +351,8 @@ impl ProtoArrayForkChoice { justified_checkpoint, finalized_checkpoint, execution_status: block.execution_status, + unrealized_justified_checkpoint: block.unrealized_justified_checkpoint, + unrealized_finalized_checkpoint: block.unrealized_finalized_checkpoint, }) } else { None @@ -485,6 +497,7 @@ fn compute_deltas( #[cfg(test)] mod test_compute_deltas { use super::*; + use types::MainnetEthSpec; /// Gives a hash that is not the zero hash (unless i is `usize::max_value)`. fn hash_from_index(i: usize) -> Hash256 { @@ -510,7 +523,7 @@ mod test_compute_deltas { root: finalized_root, }; - let mut fc = ProtoArrayForkChoice::new( + let mut fc = ProtoArrayForkChoice::new::( genesis_slot, state_root, genesis_checkpoint, @@ -523,34 +536,44 @@ mod test_compute_deltas { // Add block that is a finalized descendant. fc.proto_array - .on_block(Block { - slot: genesis_slot + 1, - root: finalized_desc, - parent_root: Some(finalized_root), - state_root, - target_root: finalized_root, - current_epoch_shuffling_id: junk_shuffling_id.clone(), - next_epoch_shuffling_id: junk_shuffling_id.clone(), - justified_checkpoint: genesis_checkpoint, - finalized_checkpoint: genesis_checkpoint, - execution_status, - }) + .on_block::( + Block { + slot: genesis_slot + 1, + root: finalized_desc, + parent_root: Some(finalized_root), + state_root, + target_root: finalized_root, + current_epoch_shuffling_id: junk_shuffling_id.clone(), + next_epoch_shuffling_id: junk_shuffling_id.clone(), + justified_checkpoint: genesis_checkpoint, + finalized_checkpoint: genesis_checkpoint, + execution_status, + unrealized_justified_checkpoint: Some(genesis_checkpoint), + unrealized_finalized_checkpoint: Some(genesis_checkpoint), + }, + genesis_slot + 1, + ) .unwrap(); // Add block that is *not* a finalized descendant. fc.proto_array - .on_block(Block { - slot: genesis_slot + 1, - root: not_finalized_desc, - parent_root: None, - state_root, - target_root: finalized_root, - current_epoch_shuffling_id: junk_shuffling_id.clone(), - next_epoch_shuffling_id: junk_shuffling_id, - justified_checkpoint: genesis_checkpoint, - finalized_checkpoint: genesis_checkpoint, - execution_status, - }) + .on_block::( + Block { + slot: genesis_slot + 1, + root: not_finalized_desc, + parent_root: None, + state_root, + target_root: finalized_root, + current_epoch_shuffling_id: junk_shuffling_id.clone(), + next_epoch_shuffling_id: junk_shuffling_id, + justified_checkpoint: genesis_checkpoint, + finalized_checkpoint: genesis_checkpoint, + execution_status, + unrealized_justified_checkpoint: None, + unrealized_finalized_checkpoint: None, + }, + genesis_slot + 1, + ) .unwrap(); assert!(!fc.is_descendant(unknown, unknown)); diff --git a/consensus/state_processing/src/per_epoch_processing.rs b/consensus/state_processing/src/per_epoch_processing.rs index d813dc42f..cb90c67b5 100644 --- a/consensus/state_processing/src/per_epoch_processing.rs +++ b/consensus/state_processing/src/per_epoch_processing.rs @@ -2,6 +2,7 @@ pub use epoch_processing_summary::EpochProcessingSummary; use errors::EpochProcessingError as Error; +pub use justification_and_finalization_state::JustificationAndFinalizationState; pub use registry_updates::process_registry_updates; use safe_arith::SafeArith; pub use slashings::process_slashings; @@ -14,6 +15,7 @@ pub mod effective_balance_updates; pub mod epoch_processing_summary; pub mod errors; pub mod historical_roots_update; +pub mod justification_and_finalization_state; pub mod registry_updates; pub mod resets; pub mod slashings; diff --git a/consensus/state_processing/src/per_epoch_processing/altair.rs b/consensus/state_processing/src/per_epoch_processing/altair.rs index 1011abe28..d5df2fc97 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair.rs @@ -33,7 +33,9 @@ pub fn process_epoch( let sync_committee = state.current_sync_committee()?.clone(); // Justification and finalization. - process_justification_and_finalization(state, &participation_cache)?; + let justification_and_finalization_state = + process_justification_and_finalization(state, &participation_cache)?; + justification_and_finalization_state.apply_changes_to_state(state); process_inactivity_updates(state, &participation_cache, spec)?; diff --git a/consensus/state_processing/src/per_epoch_processing/altair/justification_and_finalization.rs b/consensus/state_processing/src/per_epoch_processing/altair/justification_and_finalization.rs index f47d9c0e6..1f17cf56e 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair/justification_and_finalization.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair/justification_and_finalization.rs @@ -1,17 +1,21 @@ use super::ParticipationCache; -use crate::per_epoch_processing::weigh_justification_and_finalization; use crate::per_epoch_processing::Error; +use crate::per_epoch_processing::{ + weigh_justification_and_finalization, JustificationAndFinalizationState, +}; use safe_arith::SafeArith; use types::consts::altair::TIMELY_TARGET_FLAG_INDEX; use types::{BeaconState, EthSpec}; /// Update the justified and finalized checkpoints for matching target attestations. pub fn process_justification_and_finalization( - state: &mut BeaconState, + state: &BeaconState, participation_cache: &ParticipationCache, -) -> Result<(), Error> { +) -> Result, Error> { + let justification_and_finalization_state = JustificationAndFinalizationState::new(state); + if state.current_epoch() <= T::genesis_epoch().safe_add(1)? { - return Ok(()); + return Ok(justification_and_finalization_state); } let previous_epoch = state.previous_epoch(); @@ -24,7 +28,7 @@ pub fn process_justification_and_finalization( let previous_target_balance = previous_indices.total_balance()?; let current_target_balance = current_indices.total_balance()?; weigh_justification_and_finalization( - state, + justification_and_finalization_state, total_active_balance, previous_target_balance, current_target_balance, diff --git a/consensus/state_processing/src/per_epoch_processing/base.rs b/consensus/state_processing/src/per_epoch_processing/base.rs index 4ae2207ff..cb7e7d4b3 100644 --- a/consensus/state_processing/src/per_epoch_processing/base.rs +++ b/consensus/state_processing/src/per_epoch_processing/base.rs @@ -31,7 +31,9 @@ pub fn process_epoch( validator_statuses.process_attestations(state)?; // Justification and finalization. - process_justification_and_finalization(state, &validator_statuses.total_balances, spec)?; + let justification_and_finalization_state = + process_justification_and_finalization(state, &validator_statuses.total_balances, spec)?; + justification_and_finalization_state.apply_changes_to_state(state); // Rewards and Penalties. process_rewards_and_penalties(state, &mut validator_statuses, spec)?; diff --git a/consensus/state_processing/src/per_epoch_processing/base/justification_and_finalization.rs b/consensus/state_processing/src/per_epoch_processing/base/justification_and_finalization.rs index 89fb506ee..9792b5450 100644 --- a/consensus/state_processing/src/per_epoch_processing/base/justification_and_finalization.rs +++ b/consensus/state_processing/src/per_epoch_processing/base/justification_and_finalization.rs @@ -1,21 +1,25 @@ use crate::per_epoch_processing::base::TotalBalances; -use crate::per_epoch_processing::weigh_justification_and_finalization; use crate::per_epoch_processing::Error; +use crate::per_epoch_processing::{ + weigh_justification_and_finalization, JustificationAndFinalizationState, +}; use safe_arith::SafeArith; use types::{BeaconState, ChainSpec, EthSpec}; /// Update the justified and finalized checkpoints for matching target attestations. pub fn process_justification_and_finalization( - state: &mut BeaconState, + state: &BeaconState, total_balances: &TotalBalances, _spec: &ChainSpec, -) -> Result<(), Error> { +) -> Result, Error> { + let justification_and_finalization_state = JustificationAndFinalizationState::new(state); + if state.current_epoch() <= T::genesis_epoch().safe_add(1)? { - return Ok(()); + return Ok(justification_and_finalization_state); } weigh_justification_and_finalization( - state, + justification_and_finalization_state, total_balances.current_epoch(), total_balances.previous_epoch_target_attesters(), total_balances.current_epoch_target_attesters(), diff --git a/consensus/state_processing/src/per_epoch_processing/justification_and_finalization_state.rs b/consensus/state_processing/src/per_epoch_processing/justification_and_finalization_state.rs new file mode 100644 index 000000000..d8a641f46 --- /dev/null +++ b/consensus/state_processing/src/per_epoch_processing/justification_and_finalization_state.rs @@ -0,0 +1,115 @@ +use types::{BeaconState, BeaconStateError, BitVector, Checkpoint, Epoch, EthSpec, Hash256}; + +/// This is a subset of the `BeaconState` which is used to compute justification and finality +/// without modifying the `BeaconState`. +/// +/// A `JustificationAndFinalizationState` can be created from a `BeaconState` to compute +/// justification/finality changes and then applied to a `BeaconState` to enshrine those changes. +#[must_use = "this value must be applied to a state or explicitly dropped"] +pub struct JustificationAndFinalizationState { + /* + * Immutable fields. + */ + previous_epoch: Epoch, + previous_epoch_target_root: Result, + current_epoch: Epoch, + current_epoch_target_root: Result, + /* + * Mutable fields. + */ + previous_justified_checkpoint: Checkpoint, + current_justified_checkpoint: Checkpoint, + finalized_checkpoint: Checkpoint, + justification_bits: BitVector, +} + +impl JustificationAndFinalizationState { + pub fn new(state: &BeaconState) -> Self { + let previous_epoch = state.previous_epoch(); + let current_epoch = state.current_epoch(); + Self { + previous_epoch, + previous_epoch_target_root: state.get_block_root_at_epoch(previous_epoch).copied(), + current_epoch, + current_epoch_target_root: state.get_block_root_at_epoch(current_epoch).copied(), + previous_justified_checkpoint: state.previous_justified_checkpoint(), + current_justified_checkpoint: state.current_justified_checkpoint(), + finalized_checkpoint: state.finalized_checkpoint(), + justification_bits: state.justification_bits().clone(), + } + } + + pub fn apply_changes_to_state(self, state: &mut BeaconState) { + let Self { + /* + * Immutable fields do not need to be used. + */ + previous_epoch: _, + previous_epoch_target_root: _, + current_epoch: _, + current_epoch_target_root: _, + /* + * Mutable fields *must* be used. + */ + previous_justified_checkpoint, + current_justified_checkpoint, + finalized_checkpoint, + justification_bits, + } = self; + + *state.previous_justified_checkpoint_mut() = previous_justified_checkpoint; + *state.current_justified_checkpoint_mut() = current_justified_checkpoint; + *state.finalized_checkpoint_mut() = finalized_checkpoint; + *state.justification_bits_mut() = justification_bits; + } + + pub fn previous_epoch(&self) -> Epoch { + self.previous_epoch + } + + pub fn current_epoch(&self) -> Epoch { + self.current_epoch + } + + pub fn get_block_root_at_epoch(&self, epoch: Epoch) -> Result { + if epoch == self.previous_epoch { + self.previous_epoch_target_root.clone() + } else if epoch == self.current_epoch { + self.current_epoch_target_root.clone() + } else { + Err(BeaconStateError::SlotOutOfBounds) + } + } + + pub fn previous_justified_checkpoint(&self) -> Checkpoint { + self.previous_justified_checkpoint + } + + pub fn previous_justified_checkpoint_mut(&mut self) -> &mut Checkpoint { + &mut self.previous_justified_checkpoint + } + + pub fn current_justified_checkpoint_mut(&mut self) -> &mut Checkpoint { + &mut self.current_justified_checkpoint + } + + pub fn current_justified_checkpoint(&self) -> Checkpoint { + self.current_justified_checkpoint + } + + pub fn finalized_checkpoint(&self) -> Checkpoint { + self.finalized_checkpoint + } + + pub fn finalized_checkpoint_mut(&mut self) -> &mut Checkpoint { + &mut self.finalized_checkpoint + } + + pub fn justification_bits(&self) -> &BitVector { + &self.justification_bits + } + + pub fn justification_bits_mut(&mut self) -> &mut BitVector { + &mut self.justification_bits + } +} diff --git a/consensus/state_processing/src/per_epoch_processing/weigh_justification_and_finalization.rs b/consensus/state_processing/src/per_epoch_processing/weigh_justification_and_finalization.rs index 6e90ee8f3..96f6a8ef1 100644 --- a/consensus/state_processing/src/per_epoch_processing/weigh_justification_and_finalization.rs +++ b/consensus/state_processing/src/per_epoch_processing/weigh_justification_and_finalization.rs @@ -1,16 +1,16 @@ -use crate::per_epoch_processing::Error; +use crate::per_epoch_processing::{Error, JustificationAndFinalizationState}; use safe_arith::SafeArith; use std::ops::Range; -use types::{BeaconState, Checkpoint, EthSpec}; +use types::{Checkpoint, EthSpec}; /// Update the justified and finalized checkpoints for matching target attestations. #[allow(clippy::if_same_then_else)] // For readability and consistency with spec. pub fn weigh_justification_and_finalization( - state: &mut BeaconState, + mut state: JustificationAndFinalizationState, total_active_balance: u64, previous_target_balance: u64, current_target_balance: u64, -) -> Result<(), Error> { +) -> Result, Error> { let previous_epoch = state.previous_epoch(); let current_epoch = state.current_epoch(); @@ -24,7 +24,7 @@ pub fn weigh_justification_and_finalization( if previous_target_balance.safe_mul(3)? >= total_active_balance.safe_mul(2)? { *state.current_justified_checkpoint_mut() = Checkpoint { epoch: previous_epoch, - root: *state.get_block_root_at_epoch(previous_epoch)?, + root: state.get_block_root_at_epoch(previous_epoch)?, }; state.justification_bits_mut().set(1, true)?; } @@ -32,7 +32,7 @@ pub fn weigh_justification_and_finalization( if current_target_balance.safe_mul(3)? >= total_active_balance.safe_mul(2)? { *state.current_justified_checkpoint_mut() = Checkpoint { epoch: current_epoch, - root: *state.get_block_root_at_epoch(current_epoch)?, + root: state.get_block_root_at_epoch(current_epoch)?, }; state.justification_bits_mut().set(0, true)?; } @@ -66,5 +66,5 @@ pub fn weigh_justification_and_finalization( *state.finalized_checkpoint_mut() = old_current_justified_checkpoint; } - Ok(()) + Ok(state) } diff --git a/consensus/types/src/test_utils/test_random.rs b/consensus/types/src/test_utils/test_random.rs index 55135a8a2..43396dedc 100644 --- a/consensus/types/src/test_utils/test_random.rs +++ b/consensus/types/src/test_utils/test_random.rs @@ -129,6 +129,7 @@ macro_rules! impl_test_random_for_u8_array { }; } +impl_test_random_for_u8_array!(3); impl_test_random_for_u8_array!(4); impl_test_random_for_u8_array!(32); impl_test_random_for_u8_array!(48); diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml index 6cc0e5959..ddf0cdc8c 100644 --- a/lcli/Cargo.toml +++ b/lcli/Cargo.toml @@ -37,3 +37,4 @@ web3 = { version = "0.18.0", default-features = false, features = ["http-tls", " eth1_test_rig = { path = "../testing/eth1_test_rig" } sensitive_url = { path = "../common/sensitive_url" } eth2 = { path = "../common/eth2" } +snap = "1.0.1" diff --git a/lcli/src/parse_ssz.rs b/lcli/src/parse_ssz.rs index 3f272780d..5d988ee18 100644 --- a/lcli/src/parse_ssz.rs +++ b/lcli/src/parse_ssz.rs @@ -1,7 +1,9 @@ use clap::ArgMatches; use clap_utils::parse_required; use serde::Serialize; +use snap::raw::Decoder; use ssz::Decode; +use std::fs; use std::fs::File; use std::io::Read; use std::str::FromStr; @@ -29,11 +31,18 @@ pub fn run_parse_ssz(matches: &ArgMatches) -> Result<(), String> { let filename = matches.value_of("ssz-file").ok_or("No file supplied")?; let format = parse_required(matches, "format")?; - let mut bytes = vec![]; - let mut file = - File::open(filename).map_err(|e| format!("Unable to open {}: {}", filename, e))?; - file.read_to_end(&mut bytes) - .map_err(|e| format!("Unable to read {}: {}", filename, e))?; + let bytes = if filename.ends_with("ssz_snappy") { + let bytes = fs::read(filename).unwrap(); + let mut decoder = Decoder::new(); + decoder.decompress_vec(&bytes).unwrap() + } else { + let mut bytes = vec![]; + let mut file = + File::open(filename).map_err(|e| format!("Unable to open {}: {}", filename, e))?; + file.read_to_end(&mut bytes) + .map_err(|e| format!("Unable to read {}: {}", filename, e))?; + bytes + }; info!("Using {} spec", T::spec_name()); info!("Type: {:?}", type_str); diff --git a/testing/ef_tests/src/cases/epoch_processing.rs b/testing/ef_tests/src/cases/epoch_processing.rs index 08722c8e4..7546c96a7 100644 --- a/testing/ef_tests/src/cases/epoch_processing.rs +++ b/testing/ef_tests/src/cases/epoch_processing.rs @@ -88,17 +88,23 @@ impl EpochTransition for JustificationAndFinalization { BeaconState::Base(_) => { let mut validator_statuses = base::ValidatorStatuses::new(state, spec)?; validator_statuses.process_attestations(state)?; - base::process_justification_and_finalization( - state, - &validator_statuses.total_balances, - spec, - ) + let justification_and_finalization_state = + base::process_justification_and_finalization( + state, + &validator_statuses.total_balances, + spec, + )?; + justification_and_finalization_state.apply_changes_to_state(state); + Ok(()) } BeaconState::Altair(_) | BeaconState::Merge(_) => { - altair::process_justification_and_finalization( - state, - &altair::ParticipationCache::new(state, spec).unwrap(), - ) + let justification_and_finalization_state = + altair::process_justification_and_finalization( + state, + &altair::ParticipationCache::new(state, spec).unwrap(), + )?; + justification_and_finalization_state.apply_changes_to_state(state); + Ok(()) } } } diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs index 4f9f4daca..4d90bb161 100644 --- a/testing/ef_tests/src/cases/fork_choice.rs +++ b/testing/ef_tests/src/cases/fork_choice.rs @@ -7,7 +7,7 @@ use beacon_chain::{ obtain_indexed_attestation_and_committees_per_slot, VerifiedAttestation, }, test_utils::{BeaconChainHarness, EphemeralHarnessType}, - BeaconChainTypes, CachedHead, + BeaconChainTypes, CachedHead, CountUnrealized, }; use serde_derive::Deserialize; use ssz_derive::Decode; @@ -16,8 +16,8 @@ use std::future::Future; use std::sync::Arc; use std::time::Duration; use types::{ - Attestation, BeaconBlock, BeaconState, Checkpoint, EthSpec, ExecutionBlockHash, ForkName, - Hash256, IndexedAttestation, SignedBeaconBlock, Slot, Uint256, + Attestation, AttesterSlashing, BeaconBlock, BeaconState, Checkpoint, EthSpec, + ExecutionBlockHash, ForkName, Hash256, IndexedAttestation, SignedBeaconBlock, Slot, Uint256, }; #[derive(Default, Debug, PartialEq, Clone, Deserialize, Decode)] @@ -45,17 +45,20 @@ pub struct Checks { justified_checkpoint_root: Option, finalized_checkpoint: Option, best_justified_checkpoint: Option, + u_justified_checkpoint: Option, + u_finalized_checkpoint: Option, proposer_boost_root: Option, } #[derive(Debug, Clone, Deserialize)] #[serde(untagged, deny_unknown_fields)] -pub enum Step { +pub enum Step { Tick { tick: u64 }, ValidBlock { block: B }, MaybeValidBlock { block: B, valid: bool }, Attestation { attestation: A }, PowBlock { pow_block: P }, + AttesterSlashing { attester_slashing: S }, Checks { checks: Box }, } @@ -71,16 +74,13 @@ pub struct ForkChoiceTest { pub description: String, pub anchor_state: BeaconState, pub anchor_block: BeaconBlock, - pub steps: Vec, Attestation, PowBlock>>, + #[allow(clippy::type_complexity)] + pub steps: Vec, Attestation, PowBlock, AttesterSlashing>>, } -/// Spec for fork choice tests, with proposer boosting enabled. -/// -/// This function can be deleted once `ChainSpec::mainnet` enables proposer boosting by default. +/// Spec to be used for fork choice tests. pub fn fork_choice_spec(fork_name: ForkName) -> ChainSpec { - let mut spec = testing_spec::(fork_name); - spec.proposer_score_boost = Some(70); - spec + testing_spec::(fork_name) } impl LoadCase for ForkChoiceTest { @@ -93,7 +93,8 @@ impl LoadCase for ForkChoiceTest { .expect("path must be valid OsStr") .to_string(); let spec = &fork_choice_spec::(fork_name); - let steps: Vec> = yaml_decode_file(&path.join("steps.yaml"))?; + let steps: Vec> = + yaml_decode_file(&path.join("steps.yaml"))?; // Resolve the object names in `steps.yaml` into actual decoded block/attestation objects. let steps = steps .into_iter() @@ -119,6 +120,10 @@ impl LoadCase for ForkChoiceTest { ssz_decode_file(&path.join(format!("{}.ssz_snappy", pow_block))) .map(|pow_block| Step::PowBlock { pow_block }) } + Step::AttesterSlashing { attester_slashing } => { + ssz_decode_file(&path.join(format!("{}.ssz_snappy", attester_slashing))) + .map(|attester_slashing| Step::AttesterSlashing { attester_slashing }) + } Step::Checks { checks } => Ok(Step::Checks { checks }), }) .collect::>()?; @@ -159,7 +164,10 @@ impl Case for ForkChoiceTest { // TODO(merge): re-enable this test before production. // This test is skipped until we can do retrospective confirmations of the terminal // block after an optimistic sync. - if self.description == "block_lookup_failed" { + if self.description == "block_lookup_failed" + //TODO(sean): enable once we implement equivocation logic (https://github.com/sigp/lighthouse/issues/3241) + || self.description == "discard_equivocations" + { return Err(Error::SkippedKnownFailure); }; @@ -172,6 +180,10 @@ impl Case for ForkChoiceTest { } Step::Attestation { attestation } => tester.process_attestation(attestation)?, Step::PowBlock { pow_block } => tester.process_pow_block(pow_block), + //TODO(sean): enable once we implement equivocation logic (https://github.com/sigp/lighthouse/issues/3241) + Step::AttesterSlashing { + attester_slashing: _, + } => (), Step::Checks { checks } => { let Checks { head, @@ -181,6 +193,8 @@ impl Case for ForkChoiceTest { justified_checkpoint_root, finalized_checkpoint, best_justified_checkpoint, + u_justified_checkpoint, + u_finalized_checkpoint, proposer_boost_root, } = checks.as_ref(); @@ -214,6 +228,14 @@ impl Case for ForkChoiceTest { .check_best_justified_checkpoint(*expected_best_justified_checkpoint)?; } + if let Some(expected_u_justified_checkpoint) = u_justified_checkpoint { + tester.check_u_justified_checkpoint(*expected_u_justified_checkpoint)?; + } + + if let Some(expected_u_finalized_checkpoint) = u_finalized_checkpoint { + tester.check_u_finalized_checkpoint(*expected_u_finalized_checkpoint)?; + } + if let Some(expected_proposer_boost_root) = proposer_boost_root { tester.check_expected_proposer_boost_root(*expected_proposer_boost_root)?; } @@ -319,14 +341,18 @@ impl Tester { .chain .canonical_head .fork_choice_write_lock() - .update_time(slot) + .update_time(slot, &self.spec) .unwrap(); } pub fn process_block(&self, block: SignedBeaconBlock, valid: bool) -> Result<(), Error> { let block_root = block.canonical_root(); let block = Arc::new(block); - let result = self.block_on_dangerous(self.harness.chain.process_block(block.clone()))?; + let result = self.block_on_dangerous( + self.harness + .chain + .process_block(block.clone(), CountUnrealized::True), + )?; if result.is_ok() != valid { return Err(Error::DidntFail(format!( "block with root {} was valid={} whilst test expects valid={}. result: {:?}", @@ -384,6 +410,7 @@ impl Tester { &state, PayloadVerificationStatus::Irrelevant, &self.harness.chain.spec, + self.harness.chain.config.count_unrealized.into(), ); if result.is_ok() { @@ -520,6 +547,40 @@ impl Tester { ) } + pub fn check_u_justified_checkpoint( + &self, + expected_checkpoint: Checkpoint, + ) -> Result<(), Error> { + let u_justified_checkpoint = self + .harness + .chain + .canonical_head + .fork_choice_read_lock() + .unrealized_justified_checkpoint(); + check_equal( + "u_justified_checkpoint", + u_justified_checkpoint, + expected_checkpoint, + ) + } + + pub fn check_u_finalized_checkpoint( + &self, + expected_checkpoint: Checkpoint, + ) -> Result<(), Error> { + let u_finalized_checkpoint = self + .harness + .chain + .canonical_head + .fork_choice_read_lock() + .unrealized_finalized_checkpoint(); + check_equal( + "u_finalized_checkpoint", + u_finalized_checkpoint, + expected_checkpoint, + ) + } + pub fn check_expected_proposer_boost_root( &self, expected_proposer_boost_root: Hash256,