From 84e6d71950529a3e2522043f61333ad525012af2 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 23 Jul 2021 00:23:53 +0000 Subject: [PATCH] Tree hash caching and optimisations for Altair (#2459) ## Proposed Changes Remove the remaining Altair `FIXME`s from consensus land. 1. Implement tree hash caching for the participation lists. This required some light type manipulation, including removing the `TreeHash` bound from `CachedTreeHash` which was purely descriptive. 2. Plumb the proposer index through Altair attestation processing, to avoid calculating it for _every_ attestation (potentially 128ms on large networks). This duplicates some work from #2431, but with the aim of getting it in sooner, particularly for the Altair devnets. 3. Removes two FIXMEs related to `superstruct` and cloning, which are unlikely to be particularly detrimental and will be tracked here instead: https://github.com/sigp/superstruct/issues/5 --- .../src/beacon_fork_choice_store.rs | 1 - beacon_node/http_api/src/lib.rs | 1 - consensus/cached_tree_hash/src/lib.rs | 3 +- .../src/per_block_processing.rs | 2 +- .../process_operations.rs | 20 ++- .../src/per_block_processing/tests.rs | 7 + .../types/src/beacon_state/tree_hash_cache.rs | 138 ++++++++++++++---- consensus/types/src/lib.rs | 2 + consensus/types/src/participation_flags.rs | 4 + consensus/types/src/participation_list.rs | 55 +++++++ testing/ef_tests/src/cases/operations.rs | 12 +- 11 files changed, 206 insertions(+), 39 deletions(-) create mode 100644 consensus/types/src/participation_list.rs diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index 6345aac27..34903aed5 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -321,7 +321,6 @@ where .deconstruct() .0; - // FIXME(altair): could remove clone with by-value `balances` accessor self.justified_balances = self .store .get_state(&justified_block.state_root(), Some(justified_block.slot())) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 10c5668f0..057c8693d 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -939,7 +939,6 @@ pub fn serve( blocking_json_task(move || { block_id .block(&chain) - // FIXME(altair): could avoid clone with by-value accessor .map(|block| block.message().body().attestations().clone()) .map(api_types::GenericResponse::from) }) diff --git a/consensus/cached_tree_hash/src/lib.rs b/consensus/cached_tree_hash/src/lib.rs index d60c920c3..af333f267 100644 --- a/consensus/cached_tree_hash/src/lib.rs +++ b/consensus/cached_tree_hash/src/lib.rs @@ -11,7 +11,6 @@ pub type CacheArena = cache_arena::CacheArena; pub use crate::cache::TreeHashCache; pub use crate::impls::int_log; use ethereum_types::H256 as Hash256; -use tree_hash::TreeHash; #[derive(Debug, PartialEq, Clone)] pub enum Error { @@ -34,7 +33,7 @@ impl From for Error { } /// Trait for types which can make use of a cache to accelerate calculation of their tree hash root. -pub trait CachedTreeHash: TreeHash { +pub trait CachedTreeHash { /// Create a new cache appropriate for use with values of this type. fn new_tree_hash_cache(&self, arena: &mut CacheArena) -> Cache; diff --git a/consensus/state_processing/src/per_block_processing.rs b/consensus/state_processing/src/per_block_processing.rs index 41f85a889..57714e68d 100644 --- a/consensus/state_processing/src/per_block_processing.rs +++ b/consensus/state_processing/src/per_block_processing.rs @@ -127,7 +127,7 @@ pub fn per_block_processing( process_randao(state, block, verify_signatures, spec)?; process_eth1_data(state, block.body().eth1_data())?; - process_operations(state, block.body(), verify_signatures, spec)?; + process_operations(state, block.body(), proposer_index, verify_signatures, spec)?; if let BeaconBlockRef::Altair(inner) = block { process_sync_aggregate(state, &inner.body.sync_aggregate, proposer_index, spec)?; diff --git a/consensus/state_processing/src/per_block_processing/process_operations.rs b/consensus/state_processing/src/per_block_processing/process_operations.rs index d576396fb..6a28f5561 100644 --- a/consensus/state_processing/src/per_block_processing/process_operations.rs +++ b/consensus/state_processing/src/per_block_processing/process_operations.rs @@ -11,6 +11,7 @@ use types::consts::altair::{PARTICIPATION_FLAG_WEIGHTS, PROPOSER_WEIGHT, WEIGHT_ pub fn process_operations<'a, T: EthSpec>( state: &mut BeaconState, block_body: BeaconBlockBodyRef<'a, T>, + proposer_index: u64, verify_signatures: VerifySignatures, spec: &ChainSpec, ) -> Result<(), BlockProcessingError> { @@ -26,7 +27,7 @@ pub fn process_operations<'a, T: EthSpec>( verify_signatures, spec, )?; - process_attestations(state, block_body, verify_signatures, spec)?; + process_attestations(state, block_body, proposer_index, verify_signatures, spec)?; process_deposits(state, block_body.deposits(), spec)?; process_exits(state, block_body.voluntary_exits(), verify_signatures, spec)?; Ok(()) @@ -85,6 +86,7 @@ pub mod altair { pub fn process_attestations( state: &mut BeaconState, attestations: &[Attestation], + proposer_index: u64, verify_signatures: VerifySignatures, spec: &ChainSpec, ) -> Result<(), BlockProcessingError> { @@ -92,7 +94,14 @@ pub mod altair { .iter() .enumerate() .try_for_each(|(i, attestation)| { - process_attestation(state, attestation, i, verify_signatures, spec) + process_attestation( + state, + attestation, + i, + proposer_index, + verify_signatures, + spec, + ) }) } @@ -100,6 +109,7 @@ pub mod altair { state: &mut BeaconState, attestation: &Attestation, att_index: usize, + proposer_index: u64, verify_signatures: VerifySignatures, spec: &ChainSpec, ) -> Result<(), BlockProcessingError> { @@ -145,9 +155,7 @@ pub mod altair { .safe_mul(WEIGHT_DENOMINATOR)? .safe_div(PROPOSER_WEIGHT)?; let proposer_reward = proposer_reward_numerator.safe_div(proposer_reward_denominator)?; - // FIXME(altair): optimise by passing in proposer_index - let proposer_index = state.get_beacon_proposer_index(state.slot(), spec)?; - increase_balance(state, proposer_index, proposer_reward)?; + increase_balance(state, proposer_index as usize, proposer_reward)?; Ok(()) } } @@ -212,6 +220,7 @@ pub fn process_attester_slashings( pub fn process_attestations<'a, T: EthSpec>( state: &mut BeaconState, block_body: BeaconBlockBodyRef<'a, T>, + proposer_index: u64, verify_signatures: VerifySignatures, spec: &ChainSpec, ) -> Result<(), BlockProcessingError> { @@ -223,6 +232,7 @@ pub fn process_attestations<'a, T: EthSpec>( altair::process_attestations( state, block_body.attestations(), + proposer_index, verify_signatures, spec, )?; diff --git a/consensus/state_processing/src/per_block_processing/tests.rs b/consensus/state_processing/src/per_block_processing/tests.rs index 63e57bddc..fe1537a50 100644 --- a/consensus/state_processing/src/per_block_processing/tests.rs +++ b/consensus/state_processing/src/per_block_processing/tests.rs @@ -337,6 +337,7 @@ fn invalid_attestation_no_committee_for_index() { let result = process_operations::process_attestations( &mut state, head_block.body(), + head_block.proposer_index(), VerifySignatures::True, &spec, ); @@ -368,6 +369,7 @@ fn invalid_attestation_wrong_justified_checkpoint() { let result = process_operations::process_attestations( &mut state, head_block.body(), + head_block.proposer_index(), VerifySignatures::True, &spec, ); @@ -400,6 +402,7 @@ fn invalid_attestation_bad_aggregation_bitfield_len() { let result = process_operations::process_attestations( &mut state, head_block.body(), + head_block.proposer_index(), VerifySignatures::True, &spec, ); @@ -425,6 +428,7 @@ fn invalid_attestation_bad_signature() { let result = process_operations::process_attestations( &mut state, head_block.body(), + head_block.proposer_index(), VerifySignatures::True, &spec, ); @@ -456,6 +460,7 @@ fn invalid_attestation_included_too_early() { let result = process_operations::process_attestations( &mut state, head_block.body(), + head_block.proposer_index(), VerifySignatures::True, &spec, ); @@ -491,6 +496,7 @@ fn invalid_attestation_included_too_late() { let result = process_operations::process_attestations( &mut state, head_block.body(), + head_block.proposer_index(), VerifySignatures::True, &spec, ); @@ -522,6 +528,7 @@ fn invalid_attestation_target_epoch_slot_mismatch() { let result = process_operations::process_attestations( &mut state, head_block.body(), + head_block.proposer_index(), VerifySignatures::True, &spec, ); diff --git a/consensus/types/src/beacon_state/tree_hash_cache.rs b/consensus/types/src/beacon_state/tree_hash_cache.rs index 22b6ace21..3fc81ab2d 100644 --- a/consensus/types/src/beacon_state/tree_hash_cache.rs +++ b/consensus/types/src/beacon_state/tree_hash_cache.rs @@ -3,7 +3,9 @@ #![allow(clippy::indexing_slicing)] use super::Error; -use crate::{BeaconState, EthSpec, Hash256, Slot, Unsigned, Validator}; +use crate::{ + BeaconState, EthSpec, Hash256, ParticipationFlags, ParticipationList, Slot, Unsigned, Validator, +}; use cached_tree_hash::{int_log, CacheArena, CachedTreeHash, TreeHashCache}; use rayon::prelude::*; use ssz_derive::{Decode, Encode}; @@ -139,6 +141,9 @@ pub struct BeaconTreeHashCacheInner { randao_mixes: TreeHashCache, slashings: TreeHashCache, eth1_data_votes: Eth1DataVotesTreeHashCache, + // Participation caches + previous_epoch_participation: ParticipationTreeHashCache, + current_epoch_participation: ParticipationTreeHashCache, } impl BeaconTreeHashCacheInner { @@ -163,6 +168,11 @@ impl BeaconTreeHashCacheInner { let mut slashings_arena = CacheArena::default(); let slashings = state.slashings().new_tree_hash_cache(&mut slashings_arena); + let previous_epoch_participation = + ParticipationTreeHashCache::new(state, BeaconState::previous_epoch_participation); + let current_epoch_participation = + ParticipationTreeHashCache::new(state, BeaconState::current_epoch_participation); + Self { previous_state: None, validators, @@ -176,6 +186,8 @@ impl BeaconTreeHashCacheInner { randao_mixes, slashings, eth1_data_votes: Eth1DataVotesTreeHashCache::new(state), + previous_epoch_participation, + current_epoch_participation, } } @@ -264,31 +276,25 @@ impl BeaconTreeHashCacheInner { )?; // Participation - match state { - BeaconState::Base(state) => { - hasher.write( - state - .previous_epoch_attestations - .tree_hash_root() - .as_bytes(), - )?; - hasher.write(state.current_epoch_attestations.tree_hash_root().as_bytes())?; - } - // FIXME(altair): add a cache to accelerate hashing of these fields - BeaconState::Altair(state) => { - hasher.write( - state - .previous_epoch_participation - .tree_hash_root() - .as_bytes(), - )?; - hasher.write( - state - .current_epoch_participation - .tree_hash_root() - .as_bytes(), - )?; - } + if let BeaconState::Base(state) = state { + hasher.write( + state + .previous_epoch_attestations + .tree_hash_root() + .as_bytes(), + )?; + hasher.write(state.current_epoch_attestations.tree_hash_root().as_bytes())?; + } else { + hasher.write( + self.previous_epoch_participation + .recalculate_tree_hash_root(state.previous_epoch_participation()?)? + .as_bytes(), + )?; + hasher.write( + self.current_epoch_participation + .recalculate_tree_hash_root(state.current_epoch_participation()?)? + .as_bytes(), + )?; } hasher.write(state.justification_bits().tree_hash_root().as_bytes())?; @@ -506,6 +512,60 @@ impl ParallelValidatorTreeHash { } } +#[derive(Debug, PartialEq, Clone)] +pub struct ParticipationTreeHashCache { + inner: Option, +} + +#[derive(Debug, PartialEq, Clone)] +pub struct ParticipationTreeHashCacheInner { + arena: CacheArena, + tree_hash_cache: TreeHashCache, +} + +impl ParticipationTreeHashCache { + /// Initialize a new cache for the participation list returned by `field` (if any). + fn new( + state: &BeaconState, + field: impl FnOnce( + &BeaconState, + ) -> Result< + &VariableList, + Error, + >, + ) -> Self { + let inner = field(state).map(ParticipationTreeHashCacheInner::new).ok(); + Self { inner } + } + + /// Compute the tree hash root for the given `epoch_participation`. + /// + /// This function will initialize the inner cache if necessary (e.g. when crossing the fork). + fn recalculate_tree_hash_root( + &mut self, + epoch_participation: &VariableList, + ) -> Result { + let cache = self + .inner + .get_or_insert_with(|| ParticipationTreeHashCacheInner::new(epoch_participation)); + ParticipationList::new(epoch_participation) + .recalculate_tree_hash_root(&mut cache.arena, &mut cache.tree_hash_cache) + .map_err(Into::into) + } +} + +impl ParticipationTreeHashCacheInner { + fn new(epoch_participation: &VariableList) -> Self { + let mut arena = CacheArena::default(); + let tree_hash_cache = + ParticipationList::new(epoch_participation).new_tree_hash_cache(&mut arena); + ParticipationTreeHashCacheInner { + arena, + tree_hash_cache, + } + } +} + #[cfg(feature = "arbitrary-fuzz")] impl arbitrary::Arbitrary for BeaconTreeHashCache { fn arbitrary(_u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result { @@ -516,6 +576,7 @@ impl arbitrary::Arbitrary for BeaconTreeHashCache { #[cfg(test)] mod test { use super::*; + use crate::MainnetEthSpec; #[test] fn validator_node_count() { @@ -524,4 +585,29 @@ mod test { let _cache = v.new_tree_hash_cache(&mut arena); assert_eq!(arena.backing_len(), NODES_PER_VALIDATOR); } + + #[test] + fn participation_flags() { + type N = ::ValidatorRegistryLimit; + let len = 65; + let mut test_flag = ParticipationFlags::default(); + test_flag.add_flag(0).unwrap(); + let epoch_participation = VariableList::<_, N>::new(vec![test_flag; len]).unwrap(); + + let mut cache = ParticipationTreeHashCache { inner: None }; + + let cache_root = cache + .recalculate_tree_hash_root(&epoch_participation) + .unwrap(); + let recalc_root = cache + .recalculate_tree_hash_root(&epoch_participation) + .unwrap(); + + assert_eq!(cache_root, recalc_root, "recalculated root should match"); + assert_eq!( + cache_root, + epoch_participation.tree_hash_root(), + "cached root should match uncached" + ); + } } diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 3e0aa5367..7df65cb26 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -62,6 +62,7 @@ pub mod voluntary_exit; pub mod slot_epoch_macros; pub mod config_and_preset; pub mod participation_flags; +pub mod participation_list; pub mod preset; pub mod slot_epoch; pub mod subnet_id; @@ -113,6 +114,7 @@ pub use crate::graffiti::{Graffiti, GRAFFITI_BYTES_LEN}; pub use crate::historical_batch::HistoricalBatch; pub use crate::indexed_attestation::IndexedAttestation; pub use crate::participation_flags::ParticipationFlags; +pub use crate::participation_list::ParticipationList; pub use crate::pending_attestation::PendingAttestation; pub use crate::preset::{AltairPreset, BasePreset}; pub use crate::proposer_slashing::ProposerSlashing; diff --git a/consensus/types/src/participation_flags.rs b/consensus/types/src/participation_flags.rs index c0ccb6db2..476e7757b 100644 --- a/consensus/types/src/participation_flags.rs +++ b/consensus/types/src/participation_flags.rs @@ -28,6 +28,10 @@ impl ParticipationFlags { let mask = 1u8.safe_shl(flag_index as u32)?; Ok(self.bits & mask == mask) } + + pub fn into_u8(self) -> u8 { + self.bits + } } /// Decode implementation that transparently behaves like the inner `u8`. diff --git a/consensus/types/src/participation_list.rs b/consensus/types/src/participation_list.rs new file mode 100644 index 000000000..b81dd7965 --- /dev/null +++ b/consensus/types/src/participation_list.rs @@ -0,0 +1,55 @@ +#![allow(clippy::integer_arithmetic)] + +use crate::{Hash256, ParticipationFlags, Unsigned, VariableList}; +use cached_tree_hash::{int_log, CacheArena, CachedTreeHash, Error, TreeHashCache}; +use tree_hash::{mix_in_length, BYTES_PER_CHUNK}; + +/// Wrapper type allowing the implementation of `CachedTreeHash`. +#[derive(Debug)] +pub struct ParticipationList<'a, N: Unsigned> { + pub inner: &'a VariableList, +} + +impl<'a, N: Unsigned> ParticipationList<'a, N> { + pub fn new(inner: &'a VariableList) -> Self { + Self { inner } + } +} + +impl<'a, N: Unsigned> CachedTreeHash for ParticipationList<'a, N> { + fn new_tree_hash_cache(&self, arena: &mut CacheArena) -> TreeHashCache { + TreeHashCache::new( + arena, + int_log(N::to_usize() / BYTES_PER_CHUNK), + leaf_count(self.inner.len()), + ) + } + + fn recalculate_tree_hash_root( + &self, + arena: &mut CacheArena, + cache: &mut TreeHashCache, + ) -> Result { + Ok(mix_in_length( + &cache.recalculate_merkle_root(arena, leaf_iter(&self.inner))?, + self.inner.len(), + )) + } +} + +pub fn leaf_count(len: usize) -> usize { + (len + BYTES_PER_CHUNK - 1) / BYTES_PER_CHUNK +} + +pub fn leaf_iter( + values: &[ParticipationFlags], +) -> impl Iterator + ExactSizeIterator + '_ { + values.chunks(BYTES_PER_CHUNK).map(|xs| { + // Zero-pad chunks on the right. + let mut chunk = [0u8; BYTES_PER_CHUNK]; + for (byte, x) in chunk.iter_mut().zip(xs) { + *byte = x.into_u8(); + } + chunk + }) +} diff --git a/testing/ef_tests/src/cases/operations.rs b/testing/ef_tests/src/cases/operations.rs index 0f63d4eb0..c9fbec0cb 100644 --- a/testing/ef_tests/src/cases/operations.rs +++ b/testing/ef_tests/src/cases/operations.rs @@ -67,13 +67,19 @@ impl Operation for Attestation { state: &mut BeaconState, spec: &ChainSpec, ) -> Result<(), BlockProcessingError> { + let proposer_index = state.get_beacon_proposer_index(state.slot(), spec)? as u64; match state { BeaconState::Base(_) => { base::process_attestations(state, &[self.clone()], VerifySignatures::True, spec) } - BeaconState::Altair(_) => { - altair::process_attestation(state, self, 0, VerifySignatures::True, spec) - } + BeaconState::Altair(_) => altair::process_attestation( + state, + self, + 0, + proposer_index, + VerifySignatures::True, + spec, + ), } } }