diff --git a/Cargo.lock b/Cargo.lock index fc7b49de0..a52bd3790 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1107,6 +1107,7 @@ dependencies = [ "eth2", "eth2_config", "execution_layer", + "futures", "genesis", "http_api", "http_metrics", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 79fbe577d..b05ba01ee 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -38,6 +38,7 @@ use crate::light_client_finality_update_verification::{ use crate::light_client_optimistic_update_verification::{ Error as LightClientOptimisticUpdateError, VerifiedLightClientOptimisticUpdate, }; +use crate::light_client_server_cache::LightClientServerCache; use crate::migrate::BackgroundMigrator; use crate::naive_aggregation_pool::{ AggregatedAttestationMap, Error as NaiveAggregationError, NaiveAggregationPool, @@ -339,6 +340,8 @@ struct PartialBeaconBlock { bls_to_execution_changes: Vec, } +pub type LightClientProducerEvent = (Hash256, Slot, SyncAggregate); + pub type BeaconForkChoice = ForkChoice< BeaconForkChoiceStore< ::EthSpec, @@ -420,10 +423,6 @@ pub struct BeaconChain { /// Maintains a record of which validators we've seen BLS to execution changes for. pub(crate) observed_bls_to_execution_changes: Mutex>, - /// The most recently validated light client finality update received on gossip. - pub latest_seen_finality_update: Mutex>>, - /// The most recently validated light client optimistic update received on gossip. - pub latest_seen_optimistic_update: Mutex>>, /// Provides information from the Ethereum 1 (PoW) chain. pub eth1_chain: Option>, /// Interfaces with the execution client. @@ -466,6 +465,10 @@ pub struct BeaconChain { pub block_times_cache: Arc>, /// A cache used to track pre-finalization block roots for quick rejection. pub pre_finalization_block_cache: PreFinalizationBlockCache, + /// A cache used to produce light_client server messages + pub light_client_server_cache: LightClientServerCache, + /// Sender to signal the light_client server to produce new updates + pub light_client_server_tx: Option>>, /// Sender given to tasks, so that if they encounter a state in which execution cannot /// continue they can request that everything shuts down. pub shutdown_sender: Sender, @@ -1344,6 +1347,19 @@ impl BeaconChain { self.state_at_slot(load_slot, StateSkipConfig::WithoutStateRoots) } + pub fn recompute_and_cache_light_client_updates( + &self, + (parent_root, slot, sync_aggregate): LightClientProducerEvent, + ) -> Result<(), Error> { + self.light_client_server_cache.recompute_and_cache_updates( + &self.log, + self.store.clone(), + &parent_root, + slot, + &sync_aggregate, + ) + } + /// Returns the current heads of the `BeaconChain`. For the canonical head, see `Self::head`. /// /// Returns `(block_root, block_slot)`. @@ -3521,6 +3537,20 @@ impl BeaconChain { }; let current_finalized_checkpoint = state.finalized_checkpoint(); + // compute state proofs for light client updates before inserting the state into the + // snapshot cache. + if self.config.enable_light_client_server { + self.light_client_server_cache + .cache_state_data( + &self.spec, block, block_root, + // mutable reference on the state is needed to compute merkle proofs + &mut state, + ) + .unwrap_or_else(|e| { + error!(self.log, "error caching light_client data {:?}", e); + }); + } + self.snapshot_cache .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) .ok_or(Error::SnapshotCacheLockTimeout) @@ -3893,6 +3923,28 @@ impl BeaconChain { })); } } + + // Do not trigger light_client server update producer for old blocks, to extra work + // during sync. + if self.config.enable_light_client_server + && block_delay_total < self.slot_clock.slot_duration() * 32 + { + if let Some(mut light_client_server_tx) = self.light_client_server_tx.clone() { + if let Ok(sync_aggregate) = block.body().sync_aggregate() { + if let Err(e) = light_client_server_tx.try_send(( + block.parent_root(), + block.slot(), + sync_aggregate.clone(), + )) { + warn!( + self.log, + "Failed to send light_client server event"; + "error" => ?e + ); + } + } + } + } } // For the current and next epoch of this state, ensure we have the shuffling from this diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 330036d43..5e06692b8 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1,4 +1,6 @@ -use crate::beacon_chain::{CanonicalHead, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY}; +use crate::beacon_chain::{ + CanonicalHead, LightClientProducerEvent, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY, +}; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::data_availability_checker::DataAvailabilityChecker; use crate::eth1_chain::{CachingEth1Backend, SszEth1}; @@ -6,6 +8,7 @@ use crate::eth1_finalization_cache::Eth1FinalizationCache; use crate::fork_choice_signal::ForkChoiceSignalTx; use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary}; use crate::head_tracker::HeadTracker; +use crate::light_client_server_cache::LightClientServerCache; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; @@ -87,6 +90,7 @@ pub struct BeaconChainBuilder { event_handler: Option>, slot_clock: Option, shutdown_sender: Option>, + light_client_server_tx: Option>>, head_tracker: Option, validator_pubkey_cache: Option>, spec: ChainSpec, @@ -129,6 +133,7 @@ where event_handler: None, slot_clock: None, shutdown_sender: None, + light_client_server_tx: None, head_tracker: None, validator_pubkey_cache: None, spec: TEthSpec::default_spec(), @@ -603,6 +608,15 @@ where self } + /// Sets a `Sender` to allow the beacon chain to trigger light_client update production. + pub fn light_client_server_tx( + mut self, + sender: Sender>, + ) -> Self { + self.light_client_server_tx = Some(sender); + self + } + /// Creates a new, empty operation pool. fn empty_op_pool(mut self) -> Self { self.op_pool = Some(OperationPool::new()); @@ -887,8 +901,6 @@ where observed_proposer_slashings: <_>::default(), observed_attester_slashings: <_>::default(), observed_bls_to_execution_changes: <_>::default(), - latest_seen_finality_update: <_>::default(), - latest_seen_optimistic_update: <_>::default(), eth1_chain: self.eth1_chain, execution_layer: self.execution_layer, genesis_validators_root, @@ -916,6 +928,8 @@ where validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache), attester_cache: <_>::default(), early_attester_cache: <_>::default(), + light_client_server_cache: LightClientServerCache::new(), + light_client_server_tx: self.light_client_server_tx, shutdown_sender: self .shutdown_sender .ok_or("Cannot build without a shutdown sender.")?, diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 7bcb764ab..23e17a6ef 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -83,6 +83,8 @@ pub struct ChainConfig { pub progressive_balances_mode: ProgressiveBalancesMode, /// Number of epochs between each migration of data from the hot database to the freezer. pub epochs_per_migration: u64, + /// When set to true Light client server computes and caches state proofs for serving updates + pub enable_light_client_server: bool, } impl Default for ChainConfig { @@ -114,6 +116,7 @@ impl Default for ChainConfig { always_prepare_payload: false, progressive_balances_mode: ProgressiveBalancesMode::Fast, epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION, + enable_light_client_server: false, } } } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index ce841b106..522009b1b 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -32,6 +32,7 @@ pub mod historical_blocks; pub mod kzg_utils; pub mod light_client_finality_update_verification; pub mod light_client_optimistic_update_verification; +mod light_client_server_cache; pub mod merge_readiness; pub mod metrics; pub mod migrate; @@ -61,8 +62,8 @@ pub mod validator_pubkey_cache; pub use self::beacon_chain::{ AttestationProcessingOutcome, AvailabilityProcessingStatus, BeaconBlockResponse, BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult, - ForkChoiceError, OverrideForkchoiceUpdate, ProduceBlockVerification, StateSkipConfig, - WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, + ForkChoiceError, LightClientProducerEvent, OverrideForkchoiceUpdate, ProduceBlockVerification, + StateSkipConfig, WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, }; pub use self::beacon_snapshot::BeaconSnapshot; diff --git a/beacon_node/beacon_chain/src/light_client_finality_update_verification.rs b/beacon_node/beacon_chain/src/light_client_finality_update_verification.rs index 791d63ccf..35863aa05 100644 --- a/beacon_node/beacon_chain/src/light_client_finality_update_verification.rs +++ b/beacon_node/beacon_chain/src/light_client_finality_update_verification.rs @@ -1,11 +1,9 @@ -use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use crate::{BeaconChain, BeaconChainTypes}; use derivative::Derivative; use slot_clock::SlotClock; use std::time::Duration; use strum::AsRefStr; -use types::{ - light_client_update::Error as LightClientUpdateError, LightClientFinalityUpdate, Slot, -}; +use types::LightClientFinalityUpdate; /// Returned when a light client finality update was not successfully verified. It might not have been verified for /// two reasons: @@ -16,8 +14,6 @@ use types::{ /// (the `BeaconChainError` variant) #[derive(Debug, AsRefStr)] pub enum Error { - /// Light client finality update message with a lower or equal finalized_header slot already forwarded. - FinalityUpdateAlreadySeen, /// The light client finality message was received is prior to one-third of slot duration passage. (with /// respect to the gossip clock disparity and slot clock duration). /// @@ -26,29 +22,11 @@ pub enum Error { /// Assuming the local clock is correct, the peer has sent an invalid message. TooEarly, /// Light client finality update message does not match the locally constructed one. - /// - /// ## Peer Scoring - /// InvalidLightClientFinalityUpdate, /// Signature slot start time is none. SigSlotStartIsNone, /// Failed to construct a LightClientFinalityUpdate from state. FailedConstructingUpdate, - /// Beacon chain error occurred. - BeaconChainError(BeaconChainError), - LightClientUpdateError(LightClientUpdateError), -} - -impl From for Error { - fn from(e: BeaconChainError) -> Self { - Error::BeaconChainError(e) - } -} - -impl From for Error { - fn from(e: LightClientUpdateError) -> Self { - Error::LightClientUpdateError(e) - } } /// Wraps a `LightClientFinalityUpdate` that has been verified for propagation on the gossip network. @@ -63,71 +41,34 @@ impl VerifiedLightClientFinalityUpdate { /// Returns `Ok(Self)` if the `light_client_finality_update` is valid to be (re)published on the gossip /// network. pub fn verify( - light_client_finality_update: LightClientFinalityUpdate, + rcv_finality_update: LightClientFinalityUpdate, chain: &BeaconChain, seen_timestamp: Duration, ) -> Result { - let gossiped_finality_slot = light_client_finality_update.finalized_header.beacon.slot; - let one_third_slot_duration = Duration::new(chain.spec.seconds_per_slot / 3, 0); - let signature_slot = light_client_finality_update.signature_slot; - let start_time = chain.slot_clock.start_of(signature_slot); - let mut latest_seen_finality_update = chain.latest_seen_finality_update.lock(); - - let head = chain.canonical_head.cached_head(); - let head_block = &head.snapshot.beacon_block; - let attested_block_root = head_block.message().parent_root(); - let attested_block = chain - .get_blinded_block(&attested_block_root)? - .ok_or(Error::FailedConstructingUpdate)?; - let mut attested_state = chain - .get_state(&attested_block.state_root(), Some(attested_block.slot()))? - .ok_or(Error::FailedConstructingUpdate)?; - - let finalized_block_root = attested_state.finalized_checkpoint().root; - let finalized_block = chain - .get_blinded_block(&finalized_block_root)? - .ok_or(Error::FailedConstructingUpdate)?; - let latest_seen_finality_update_slot = match latest_seen_finality_update.as_ref() { - Some(update) => update.finalized_header.beacon.slot, - None => Slot::new(0), - }; - - // verify that no other finality_update with a lower or equal - // finalized_header.slot was already forwarded on the network - if gossiped_finality_slot <= latest_seen_finality_update_slot { - return Err(Error::FinalityUpdateAlreadySeen); - } - // verify that enough time has passed for the block to have been propagated - match start_time { - Some(time) => { - if seen_timestamp + chain.spec.maximum_gossip_clock_disparity() - < time + one_third_slot_duration - { - return Err(Error::TooEarly); - } - } - None => return Err(Error::SigSlotStartIsNone), + let start_time = chain + .slot_clock + .start_of(rcv_finality_update.signature_slot) + .ok_or(Error::SigSlotStartIsNone)?; + let one_third_slot_duration = Duration::new(chain.spec.seconds_per_slot / 3, 0); + if seen_timestamp + chain.spec.maximum_gossip_clock_disparity() + < start_time + one_third_slot_duration + { + return Err(Error::TooEarly); } - let head_state = &head.snapshot.beacon_state; - let finality_update = LightClientFinalityUpdate::new( - &chain.spec, - head_state, - head_block, - &mut attested_state, - &finalized_block, - )?; + let latest_finality_update = chain + .light_client_server_cache + .get_latest_finality_update() + .ok_or(Error::FailedConstructingUpdate)?; // verify that the gossiped finality update is the same as the locally constructed one. - if finality_update != light_client_finality_update { + if latest_finality_update != rcv_finality_update { return Err(Error::InvalidLightClientFinalityUpdate); } - *latest_seen_finality_update = Some(light_client_finality_update.clone()); - Ok(Self { - light_client_finality_update, + light_client_finality_update: rcv_finality_update, seen_timestamp, }) } diff --git a/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs b/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs index 374cc9a77..813b112db 100644 --- a/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs +++ b/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs @@ -1,12 +1,10 @@ -use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use crate::{BeaconChain, BeaconChainTypes}; use derivative::Derivative; use eth2::types::Hash256; use slot_clock::SlotClock; use std::time::Duration; use strum::AsRefStr; -use types::{ - light_client_update::Error as LightClientUpdateError, LightClientOptimisticUpdate, Slot, -}; +use types::LightClientOptimisticUpdate; /// Returned when a light client optimistic update was not successfully verified. It might not have been verified for /// two reasons: @@ -17,8 +15,6 @@ use types::{ /// (the `BeaconChainError` variant) #[derive(Debug, AsRefStr)] pub enum Error { - /// Light client optimistic update message with a lower or equal optimistic_header slot already forwarded. - OptimisticUpdateAlreadySeen, /// The light client optimistic message was received is prior to one-third of slot duration passage. (with /// respect to the gossip clock disparity and slot clock duration). /// @@ -27,9 +23,6 @@ pub enum Error { /// Assuming the local clock is correct, the peer has sent an invalid message. TooEarly, /// Light client optimistic update message does not match the locally constructed one. - /// - /// ## Peer Scoring - /// InvalidLightClientOptimisticUpdate, /// Signature slot start time is none. SigSlotStartIsNone, @@ -37,21 +30,6 @@ pub enum Error { FailedConstructingUpdate, /// Unknown block with parent root. UnknownBlockParentRoot(Hash256), - /// Beacon chain error occurred. - BeaconChainError(BeaconChainError), - LightClientUpdateError(LightClientUpdateError), -} - -impl From for Error { - fn from(e: BeaconChainError) -> Self { - Error::BeaconChainError(e) - } -} - -impl From for Error { - fn from(e: LightClientUpdateError) -> Self { - Error::LightClientUpdateError(e) - } } /// Wraps a `LightClientOptimisticUpdate` that has been verified for propagation on the gossip network. @@ -67,52 +45,27 @@ impl VerifiedLightClientOptimisticUpdate { /// Returns `Ok(Self)` if the `light_client_optimistic_update` is valid to be (re)published on the gossip /// network. pub fn verify( - light_client_optimistic_update: LightClientOptimisticUpdate, + rcv_optimistic_update: LightClientOptimisticUpdate, chain: &BeaconChain, seen_timestamp: Duration, ) -> Result { - let gossiped_optimistic_slot = light_client_optimistic_update.attested_header.beacon.slot; + // verify that enough time has passed for the block to have been propagated + let start_time = chain + .slot_clock + .start_of(rcv_optimistic_update.signature_slot) + .ok_or(Error::SigSlotStartIsNone)?; let one_third_slot_duration = Duration::new(chain.spec.seconds_per_slot / 3, 0); - let signature_slot = light_client_optimistic_update.signature_slot; - let start_time = chain.slot_clock.start_of(signature_slot); - let mut latest_seen_optimistic_update = chain.latest_seen_optimistic_update.lock(); + if seen_timestamp + chain.spec.maximum_gossip_clock_disparity() + < start_time + one_third_slot_duration + { + return Err(Error::TooEarly); + } let head = chain.canonical_head.cached_head(); let head_block = &head.snapshot.beacon_block; - let attested_block_root = head_block.message().parent_root(); - let attested_block = chain - .get_blinded_block(&attested_block_root)? - .ok_or(Error::FailedConstructingUpdate)?; - - let attested_state = chain - .get_state(&attested_block.state_root(), Some(attested_block.slot()))? - .ok_or(Error::FailedConstructingUpdate)?; - let latest_seen_optimistic_update_slot = match latest_seen_optimistic_update.as_ref() { - Some(update) => update.attested_header.beacon.slot, - None => Slot::new(0), - }; - - // verify that no other optimistic_update with a lower or equal - // optimistic_header.slot was already forwarded on the network - if gossiped_optimistic_slot <= latest_seen_optimistic_update_slot { - return Err(Error::OptimisticUpdateAlreadySeen); - } - - // verify that enough time has passed for the block to have been propagated - match start_time { - Some(time) => { - if seen_timestamp + chain.spec.maximum_gossip_clock_disparity() - < time + one_third_slot_duration - { - return Err(Error::TooEarly); - } - } - None => return Err(Error::SigSlotStartIsNone), - } - // check if we can process the optimistic update immediately // otherwise queue - let canonical_root = light_client_optimistic_update + let canonical_root = rcv_optimistic_update .attested_header .beacon .canonical_root(); @@ -121,19 +74,20 @@ impl VerifiedLightClientOptimisticUpdate { return Err(Error::UnknownBlockParentRoot(canonical_root)); } - let optimistic_update = - LightClientOptimisticUpdate::new(&chain.spec, head_block, &attested_state)?; + let latest_optimistic_update = chain + .light_client_server_cache + .get_latest_optimistic_update() + .ok_or(Error::FailedConstructingUpdate)?; // verify that the gossiped optimistic update is the same as the locally constructed one. - if optimistic_update != light_client_optimistic_update { + if latest_optimistic_update != rcv_optimistic_update { return Err(Error::InvalidLightClientOptimisticUpdate); } - *latest_seen_optimistic_update = Some(light_client_optimistic_update.clone()); - + let parent_root = rcv_optimistic_update.attested_header.beacon.parent_root; Ok(Self { - light_client_optimistic_update, - parent_root: canonical_root, + light_client_optimistic_update: rcv_optimistic_update, + parent_root, seen_timestamp, }) } diff --git a/beacon_node/beacon_chain/src/light_client_server_cache.rs b/beacon_node/beacon_chain/src/light_client_server_cache.rs new file mode 100644 index 000000000..1397e3fc9 --- /dev/null +++ b/beacon_node/beacon_chain/src/light_client_server_cache.rs @@ -0,0 +1,256 @@ +use crate::errors::BeaconChainError; +use crate::{metrics, BeaconChainTypes, BeaconStore}; +use parking_lot::{Mutex, RwLock}; +use slog::{debug, Logger}; +use ssz_types::FixedVector; +use std::num::NonZeroUsize; +use types::light_client_update::{FinalizedRootProofLen, FINALIZED_ROOT_INDEX}; +use types::non_zero_usize::new_non_zero_usize; +use types::{ + BeaconBlockRef, BeaconState, ChainSpec, EthSpec, ForkName, Hash256, LightClientFinalityUpdate, + LightClientHeader, LightClientOptimisticUpdate, Slot, SyncAggregate, +}; + +/// A prev block cache miss requires to re-generate the state of the post-parent block. Items in the +/// prev block cache are very small 32 * (6 + 1) = 224 bytes. 32 is an arbitrary number that +/// represents unlikely re-orgs, while keeping the cache very small. +const PREV_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(32); + +/// This cache computes light client messages ahead of time, required to satisfy p2p and API +/// requests. These messages include proofs on historical states, so on-demand computation is +/// expensive. +/// +pub struct LightClientServerCache { + /// Tracks a single global latest finality update out of all imported blocks. + /// + /// TODO: Active discussion with @etan-status if this cache should be fork aware to return + /// latest canonical (update with highest signature slot, where its attested header is part of + /// the head chain) instead of global latest (update with highest signature slot, out of all + /// branches). + latest_finality_update: RwLock>>, + /// Tracks a single global latest optimistic update out of all imported blocks. + latest_optimistic_update: RwLock>>, + /// Caches state proofs by block root + prev_block_cache: Mutex>, +} + +impl LightClientServerCache { + pub fn new() -> Self { + Self { + latest_finality_update: None.into(), + latest_optimistic_update: None.into(), + prev_block_cache: lru::LruCache::new(PREV_BLOCK_CACHE_SIZE).into(), + } + } + + /// Compute and cache state proofs for latter production of light-client messages. Does not + /// trigger block replay. + pub fn cache_state_data( + &self, + spec: &ChainSpec, + block: BeaconBlockRef, + block_root: Hash256, + block_post_state: &mut BeaconState, + ) -> Result<(), BeaconChainError> { + let _timer = metrics::start_timer(&metrics::LIGHT_CLIENT_SERVER_CACHE_STATE_DATA_TIMES); + + // Only post-altair + if spec.fork_name_at_slot::(block.slot()) == ForkName::Base { + return Ok(()); + } + + // Persist in memory cache for a descendent block + + let cached_data = LightClientCachedData::from_state(block_post_state)?; + self.prev_block_cache.lock().put(block_root, cached_data); + + Ok(()) + } + + /// Given a block with a SyncAggregte computes better or more recent light client updates. The + /// results are cached either on disk or memory to be served via p2p and rest API + pub fn recompute_and_cache_updates( + &self, + log: &Logger, + store: BeaconStore, + block_parent_root: &Hash256, + block_slot: Slot, + sync_aggregate: &SyncAggregate, + ) -> Result<(), BeaconChainError> { + let _timer = + metrics::start_timer(&metrics::LIGHT_CLIENT_SERVER_CACHE_RECOMPUTE_UPDATES_TIMES); + + let signature_slot = block_slot; + let attested_block_root = block_parent_root; + + let attested_block = store.get_blinded_block(attested_block_root)?.ok_or( + BeaconChainError::DBInconsistent(format!( + "Block not available {:?}", + attested_block_root + )), + )?; + + let cached_parts = self.get_or_compute_prev_block_cache( + store.clone(), + attested_block_root, + &attested_block.state_root(), + attested_block.slot(), + )?; + + let attested_slot = attested_block.slot(); + + // Spec: Full nodes SHOULD provide the LightClientOptimisticUpdate with the highest + // attested_header.beacon.slot (if multiple, highest signature_slot) as selected by fork choice + let is_latest_optimistic = match &self.latest_optimistic_update.read().clone() { + Some(latest_optimistic_update) => { + is_latest_optimistic_update(latest_optimistic_update, attested_slot, signature_slot) + } + None => true, + }; + if is_latest_optimistic { + // can create an optimistic update, that is more recent + *self.latest_optimistic_update.write() = Some(LightClientOptimisticUpdate { + attested_header: block_to_light_client_header(attested_block.message()), + sync_aggregate: sync_aggregate.clone(), + signature_slot, + }); + }; + + // Spec: Full nodes SHOULD provide the LightClientFinalityUpdate with the highest + // attested_header.beacon.slot (if multiple, highest signature_slot) as selected by fork choice + let is_latest_finality = match &self.latest_finality_update.read().clone() { + Some(latest_finality_update) => { + is_latest_finality_update(latest_finality_update, attested_slot, signature_slot) + } + None => true, + }; + if is_latest_finality & !cached_parts.finalized_block_root.is_zero() { + // Immediately after checkpoint sync the finalized block may not be available yet. + if let Some(finalized_block) = + store.get_blinded_block(&cached_parts.finalized_block_root)? + { + *self.latest_finality_update.write() = Some(LightClientFinalityUpdate { + // TODO: may want to cache this result from latest_optimistic_update if producing a + // light_client header becomes expensive + attested_header: block_to_light_client_header(attested_block.message()), + finalized_header: block_to_light_client_header(finalized_block.message()), + finality_branch: cached_parts.finality_branch.clone(), + sync_aggregate: sync_aggregate.clone(), + signature_slot, + }); + } else { + debug!( + log, + "Finalized block not available in store for light_client server"; + "finalized_block_root" => format!("{}", cached_parts.finalized_block_root), + ); + } + } + + Ok(()) + } + + /// Retrieves prev block cached data from cache. If not present re-computes by retrieving the + /// parent state, and inserts an entry to the cache. + /// + /// In separate function since FnOnce of get_or_insert can not be fallible. + fn get_or_compute_prev_block_cache( + &self, + store: BeaconStore, + block_root: &Hash256, + block_state_root: &Hash256, + block_slot: Slot, + ) -> Result { + // Attempt to get the value from the cache first. + if let Some(cached_parts) = self.prev_block_cache.lock().get(block_root) { + return Ok(cached_parts.clone()); + } + metrics::inc_counter(&metrics::LIGHT_CLIENT_SERVER_CACHE_PREV_BLOCK_CACHE_MISS); + + // Compute the value, handling potential errors. + let mut state = store + .get_state(block_state_root, Some(block_slot))? + .ok_or_else(|| { + BeaconChainError::DBInconsistent(format!("Missing state {:?}", block_state_root)) + })?; + let new_value = LightClientCachedData::from_state(&mut state)?; + + // Insert value and return owned + self.prev_block_cache + .lock() + .put(*block_root, new_value.clone()); + Ok(new_value) + } + + pub fn get_latest_finality_update(&self) -> Option> { + self.latest_finality_update.read().clone() + } + + pub fn get_latest_optimistic_update(&self) -> Option> { + self.latest_optimistic_update.read().clone() + } +} + +impl Default for LightClientServerCache { + fn default() -> Self { + Self::new() + } +} + +type FinalityBranch = FixedVector; + +#[derive(Clone)] +struct LightClientCachedData { + finality_branch: FinalityBranch, + finalized_block_root: Hash256, +} + +impl LightClientCachedData { + fn from_state(state: &mut BeaconState) -> Result { + Ok(Self { + finality_branch: state.compute_merkle_proof(FINALIZED_ROOT_INDEX)?.into(), + finalized_block_root: state.finalized_checkpoint().root, + }) + } +} + +// Implements spec priorization rules: +// > Full nodes SHOULD provide the LightClientFinalityUpdate with the highest attested_header.beacon.slot (if multiple, highest signature_slot) +// +// ref: https://github.com/ethereum/consensus-specs/blob/113c58f9bf9c08867f6f5f633c4d98e0364d612a/specs/altair/light-client/full-node.md#create_light_client_finality_update +fn is_latest_finality_update( + prev: &LightClientFinalityUpdate, + attested_slot: Slot, + signature_slot: Slot, +) -> bool { + if attested_slot > prev.attested_header.beacon.slot { + true + } else { + attested_slot == prev.attested_header.beacon.slot && signature_slot > prev.signature_slot + } +} + +// Implements spec priorization rules: +// > Full nodes SHOULD provide the LightClientOptimisticUpdate with the highest attested_header.beacon.slot (if multiple, highest signature_slot) +// +// ref: https://github.com/ethereum/consensus-specs/blob/113c58f9bf9c08867f6f5f633c4d98e0364d612a/specs/altair/light-client/full-node.md#create_light_client_optimistic_update +fn is_latest_optimistic_update( + prev: &LightClientOptimisticUpdate, + attested_slot: Slot, + signature_slot: Slot, +) -> bool { + if attested_slot > prev.attested_header.beacon.slot { + true + } else { + attested_slot == prev.attested_header.beacon.slot && signature_slot > prev.signature_slot + } +} + +fn block_to_light_client_header( + block: BeaconBlockRef>, +) -> LightClientHeader { + // TODO: make fork aware + LightClientHeader { + beacon: block.block_header(), + } +} diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index ad095b37b..24c05e01f 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1128,6 +1128,22 @@ lazy_static! { // Create a custom bucket list for greater granularity in block delay Ok(vec![0.1, 0.2, 0.3,0.4,0.5,0.75,1.0,1.25,1.5,1.75,2.0,2.5,3.0,3.5,4.0,5.0,6.0,7.0,8.0,9.0,10.0,15.0,20.0]) ); + + /* + * light_client server metrics + */ + pub static ref LIGHT_CLIENT_SERVER_CACHE_STATE_DATA_TIMES: Result = try_create_histogram( + "beacon_light_client_server_cache_state_data_seconds", + "Time taken to produce and cache state data", + ); + pub static ref LIGHT_CLIENT_SERVER_CACHE_RECOMPUTE_UPDATES_TIMES: Result = try_create_histogram( + "beacon_light_client_server_cache_recompute_updates_seconds", + "Time taken to recompute and cache updates", + ); + pub static ref LIGHT_CLIENT_SERVER_CACHE_PREV_BLOCK_CACHE_MISS: Result = try_create_int_counter( + "beacon_light_client_server_cache_prev_block_cache_miss", + "Count of prev block cache misses", + ); } /// Scrape the `beacon_chain` for metrics that are not constantly updated (e.g., the present slot, diff --git a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs index 9191509d3..20f3e21d0 100644 --- a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs @@ -82,12 +82,15 @@ pub enum ReprocessQueueMessage { /// A gossip block for hash `X` is being imported, we should queue the rpc block for the same /// hash until the gossip block is imported. RpcBlock(QueuedRpcBlock), - /// A block that was successfully processed. We use this to handle attestations and light client updates + /// A block that was successfully processed. We use this to handle attestations updates /// for unknown blocks. BlockImported { block_root: Hash256, parent_root: Hash256, }, + /// A new `LightClientOptimisticUpdate` has been produced. We use this to handle light client + /// updates for unknown parent blocks. + NewLightClientOptimisticUpdate { parent_root: Hash256 }, /// An unaggregated attestation that references an unknown block. UnknownBlockUnaggregate(QueuedUnaggregate), /// An aggregated attestation that references an unknown block. @@ -688,6 +691,8 @@ impl ReprocessQueue { ); } } + } + InboundEvent::Msg(NewLightClientOptimisticUpdate { parent_root }) => { // Unqueue the light client optimistic updates we have for this root, if any. if let Some(queued_lc_id) = self .awaiting_lc_updates_per_parent_root diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 26c53154e..0160cad4b 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -25,6 +25,7 @@ serde = { workspace = true } error-chain = { workspace = true } slog = { workspace = true } tokio = { workspace = true } +futures = { workspace = true } dirs = { workspace = true } eth1 = { workspace = true } eth2 = { workspace = true } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 9c88eccc7..444a27750 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -1,4 +1,7 @@ use crate::address_change_broadcast::broadcast_address_changes_at_capella; +use crate::compute_light_client_updates::{ + compute_light_client_updates, LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY, +}; use crate::config::{ClientGenesis, Config as ClientConfig}; use crate::notifier::spawn_notifier; use crate::Client; @@ -7,6 +10,7 @@ use beacon_chain::data_availability_checker::start_availability_cache_maintenanc use beacon_chain::otb_verification_service::start_otb_verification_service; use beacon_chain::proposer_prep_service::start_proposer_prep_service; use beacon_chain::schema_change::migrate_schema; +use beacon_chain::LightClientProducerEvent; use beacon_chain::{ builder::{BeaconChainBuilder, Witness}, eth1_chain::{CachingEth1Backend, Eth1Chain}, @@ -24,6 +28,7 @@ use eth2::{ BeaconNodeHttpClient, Error as ApiError, Timeouts, }; use execution_layer::ExecutionLayer; +use futures::channel::mpsc::Receiver; use genesis::{interop_genesis_state, Eth1GenesisService, DEFAULT_ETH1_BLOCK_HASH}; use lighthouse_network::{prometheus_client::registry::Registry, NetworkGlobals}; use monitoring_api::{MonitoringHttpClient, ProcessType}; @@ -83,6 +88,7 @@ pub struct ClientBuilder { slasher: Option>>, beacon_processor_config: Option, beacon_processor_channels: Option>, + light_client_server_rv: Option>>, eth_spec_instance: T::EthSpec, } @@ -118,6 +124,7 @@ where eth_spec_instance, beacon_processor_config: None, beacon_processor_channels: None, + light_client_server_rv: None, } } @@ -206,6 +213,16 @@ where builder }; + let builder = if config.network.enable_light_client_server { + let (tx, rv) = futures::channel::mpsc::channel::>( + LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY, + ); + self.light_client_server_rv = Some(rv); + builder.light_client_server_tx(tx) + } else { + builder + }; + let chain_exists = builder.store_contains_beacon_chain().unwrap_or(false); // If the client is expect to resume but there's no beacon chain in the database, @@ -797,7 +814,7 @@ where } .spawn_manager( beacon_processor_channels.beacon_processor_rx, - beacon_processor_channels.work_reprocessing_tx, + beacon_processor_channels.work_reprocessing_tx.clone(), beacon_processor_channels.work_reprocessing_rx, None, beacon_chain.slot_clock.clone(), @@ -860,7 +877,7 @@ where } // Spawn a service to publish BLS to execution changes at the Capella fork. - if let Some(network_senders) = self.network_senders { + if let Some(network_senders) = self.network_senders.clone() { let inner_chain = beacon_chain.clone(); let broadcast_context = runtime_context.service_context("addr_bcast".to_string()); @@ -879,6 +896,26 @@ where } } + // Spawn service to publish light_client updates at some interval into the slot. + if let Some(light_client_server_rv) = self.light_client_server_rv { + let inner_chain = beacon_chain.clone(); + let light_client_update_context = + runtime_context.service_context("lc_update".to_string()); + let log = light_client_update_context.log().clone(); + light_client_update_context.executor.spawn( + async move { + compute_light_client_updates( + &inner_chain, + light_client_server_rv, + beacon_processor_channels.work_reprocessing_tx, + &log, + ) + .await + }, + "lc_update", + ); + } + start_proposer_prep_service(runtime_context.executor.clone(), beacon_chain.clone()); start_otb_verification_service(runtime_context.executor.clone(), beacon_chain.clone()); start_availability_cache_maintenance_service( diff --git a/beacon_node/client/src/compute_light_client_updates.rs b/beacon_node/client/src/compute_light_client_updates.rs new file mode 100644 index 000000000..1eb977d42 --- /dev/null +++ b/beacon_node/client/src/compute_light_client_updates.rs @@ -0,0 +1,39 @@ +use beacon_chain::{BeaconChain, BeaconChainTypes, LightClientProducerEvent}; +use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage; +use futures::channel::mpsc::Receiver; +use futures::StreamExt; +use slog::{error, Logger}; +use tokio::sync::mpsc::Sender; + +// Each `LightClientProducerEvent` is ~200 bytes. With the light_client server producing only recent +// updates it is okay to drop some events in case of overloading. In normal network conditions +// there's one event emitted per block at most every 12 seconds, while consuming the event should +// take a few milliseconds. 32 is a small enough arbitrary number. +pub(crate) const LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY: usize = 32; + +pub async fn compute_light_client_updates( + chain: &BeaconChain, + mut light_client_server_rv: Receiver>, + reprocess_tx: Sender, + log: &Logger, +) { + // Should only receive events for recent blocks, import_block filters by blocks close to clock. + // + // Intents to process SyncAggregates of all recent blocks sequentially, without skipping. + // Uses a bounded receiver, so may drop some SyncAggregates if very overloaded. This is okay + // since only the most recent updates have value. + while let Some(event) = light_client_server_rv.next().await { + let parent_root = event.0; + + chain + .recompute_and_cache_light_client_updates(event) + .unwrap_or_else(|e| { + error!(log, "error computing light_client updates {:?}", e); + }); + + let msg = ReprocessQueueMessage::NewLightClientOptimisticUpdate { parent_root }; + if reprocess_tx.try_send(msg).is_err() { + error!(log, "Failed to inform light client update"; "parent_root" => %parent_root) + }; + } +} diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 399aa0651..2f14d87ef 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -1,6 +1,7 @@ extern crate slog; mod address_change_broadcast; +mod compute_light_client_updates; pub mod config; mod metrics; mod notifier; diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 1a8f15225..fe01f3c52 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -2434,9 +2434,8 @@ pub fn serve( accept_header: Option| { task_spawner.blocking_response_task(Priority::P1, move || { let update = chain - .latest_seen_optimistic_update - .lock() - .clone() + .light_client_server_cache + .get_latest_optimistic_update() .ok_or_else(|| { warp_utils::reject::custom_not_found( "No LightClientOptimisticUpdate is available".to_string(), @@ -2482,9 +2481,8 @@ pub fn serve( accept_header: Option| { task_spawner.blocking_response_task(Priority::P1, move || { let update = chain - .latest_seen_finality_update - .lock() - .clone() + .light_client_server_cache + .get_latest_finality_update() .ok_or_else(|| { warp_utils::reject::custom_not_found( "No LightClientFinalityUpdate is available".to_string(), diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 308ecd679..2d946f309 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1731,7 +1731,10 @@ impl ApiTester { Err(e) => panic!("query failed incorrectly: {e:?}"), }; - let expected = self.chain.latest_seen_optimistic_update.lock().clone(); + let expected = self + .chain + .light_client_server_cache + .get_latest_optimistic_update(); assert_eq!(result, expected); self @@ -1747,7 +1750,10 @@ impl ApiTester { Err(e) => panic!("query failed incorrectly: {e:?}"), }; - let expected = self.chain.latest_seen_finality_update.lock().clone(); + let expected = self + .chain + .light_client_server_cache + .get_latest_finality_update(); assert_eq!(result, expected); self diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 04ec6bac4..cd3579ad6 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -384,7 +384,7 @@ pub enum RPCResponse { /// A response to a get BLOBS_BY_RANGE request BlobsByRange(Arc>), - /// A response to a get LIGHTCLIENT_BOOTSTRAP request. + /// A response to a get LIGHT_CLIENT_BOOTSTRAP request. LightClientBootstrap(LightClientBootstrap), /// A response to a get BLOBS_BY_ROOT request. @@ -426,7 +426,7 @@ pub enum RPCCodedResponse { StreamTermination(ResponseTermination), } -/// Request a light_client_bootstrap for lightclients peers. +/// Request a light_client_bootstrap for light_clients peers. #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct LightClientBootstrapRequest { pub root: Hash256, diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 9d9b196e9..07fc06bc3 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1657,7 +1657,7 @@ impl NetworkBeaconProcessor { self.gossip_penalize_peer( peer_id, - PeerAction::LowToleranceError, + PeerAction::HighToleranceError, "light_client_gossip_error", ); } @@ -1675,15 +1675,7 @@ impl NetworkBeaconProcessor { "light_client_gossip_error", ); } - LightClientFinalityUpdateError::FinalityUpdateAlreadySeen => debug!( - self.log, - "Light client finality update already seen"; - "peer" => %peer_id, - "error" => ?e, - ), - LightClientFinalityUpdateError::BeaconChainError(_) - | LightClientFinalityUpdateError::LightClientUpdateError(_) - | LightClientFinalityUpdateError::SigSlotStartIsNone + LightClientFinalityUpdateError::SigSlotStartIsNone | LightClientFinalityUpdateError::FailedConstructingUpdate => debug!( self.log, "Light client error constructing finality update"; @@ -1801,19 +1793,7 @@ impl NetworkBeaconProcessor { "light_client_gossip_error", ); } - LightClientOptimisticUpdateError::OptimisticUpdateAlreadySeen => { - metrics::register_optimistic_update_error(&e); - - debug!( - self.log, - "Light client optimistic update already seen"; - "peer" => %peer_id, - "error" => ?e, - ) - } - LightClientOptimisticUpdateError::BeaconChainError(_) - | LightClientOptimisticUpdateError::LightClientUpdateError(_) - | LightClientOptimisticUpdateError::SigSlotStartIsNone + LightClientOptimisticUpdateError::SigSlotStartIsNone | LightClientOptimisticUpdateError::FailedConstructingUpdate => { metrics::register_optimistic_update_error(&e); diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 67fc2fabb..e7d3a7ce2 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -589,7 +589,7 @@ impl NetworkBeaconProcessor { } /// Create a new work event to process `LightClientBootstrap`s from the RPC network. - pub fn send_lightclient_bootstrap_request( + pub fn send_light_client_bootstrap_request( self: &Arc, peer_id: PeerId, request_id: PeerRequestId, diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index f56a3b744..a774c0e16 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -218,7 +218,7 @@ impl Router { ), Request::LightClientBootstrap(request) => self.handle_beacon_processor_send_result( self.network_beacon_processor - .send_lightclient_bootstrap_request(peer_id, request_id, request), + .send_light_client_bootstrap_request(peer_id, request_id, request), ), } } diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index c940049c5..fff384e19 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -163,6 +163,10 @@ pub fn get_config( cli_args.is_present("light-client-server"); } + if cli_args.is_present("light-client-server") { + client_config.chain.enable_light_client_server = true; + } + if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? { client_config.chain.shuffling_cache_size = cache_size; } diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index e2e25f24b..bdc3be9ec 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -1866,6 +1866,7 @@ impl BeaconState { }; // 2. Get all `BeaconState` leaves. + self.initialize_tree_hash_cache(); let mut cache = self .tree_hash_cache_mut() .take() diff --git a/consensus/types/src/light_client_bootstrap.rs b/consensus/types/src/light_client_bootstrap.rs index 616aced48..6660783ab 100644 --- a/consensus/types/src/light_client_bootstrap.rs +++ b/consensus/types/src/light_client_bootstrap.rs @@ -9,7 +9,7 @@ use ssz_derive::{Decode, Encode}; use std::sync::Arc; use test_random_derive::TestRandom; -/// A LightClientBootstrap is the initializer we send over to lightclient nodes +/// A LightClientBootstrap is the initializer we send over to light_client nodes /// that are trying to generate their basic storage when booting up. #[derive( Debug, diff --git a/consensus/types/src/light_client_finality_update.rs b/consensus/types/src/light_client_finality_update.rs index 87601b815..494e68b63 100644 --- a/consensus/types/src/light_client_finality_update.rs +++ b/consensus/types/src/light_client_finality_update.rs @@ -11,7 +11,7 @@ use ssz_derive::{Decode, Encode}; use test_random_derive::TestRandom; use tree_hash::TreeHash; -/// A LightClientFinalityUpdate is the update lightclient request or received by a gossip that +/// A LightClientFinalityUpdate is the update light_client request or received by a gossip that /// signal a new finalized beacon block header for the light client sync protocol. #[derive( Debug, diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 2a88770cd..3efcb2d0e 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -2372,6 +2372,7 @@ fn light_client_server_default() { .run_with_zero_port() .with_config(|config| { assert_eq!(config.network.enable_light_client_server, false); + assert_eq!(config.chain.enable_light_client_server, false); assert_eq!(config.http_api.enable_light_client_server, false); }); } @@ -2383,6 +2384,7 @@ fn light_client_server_enabled() { .run_with_zero_port() .with_config(|config| { assert_eq!(config.network.enable_light_client_server, true); + assert_eq!(config.chain.enable_light_client_server, true); }); } diff --git a/testing/simulator/src/checks.rs b/testing/simulator/src/checks.rs index d34cdbc9f..f38eacc39 100644 --- a/testing/simulator/src/checks.rs +++ b/testing/simulator/src/checks.rs @@ -1,5 +1,5 @@ use crate::local_network::LocalNetwork; -use node_test_rig::eth2::types::{BlockId, StateId}; +use node_test_rig::eth2::types::{BlockId, FinalityCheckpointsData, StateId}; use std::time::Duration; use types::{Epoch, EthSpec, ExecPayload, ExecutionBlockHash, Hash256, Slot, Unsigned}; @@ -243,3 +243,93 @@ pub async fn verify_transition_block_finalized( )) } } + +pub(crate) async fn verify_light_client_updates( + network: LocalNetwork, + start_slot: Slot, + end_slot: Slot, + slot_duration: Duration, +) -> Result<(), String> { + slot_delay(start_slot, slot_duration).await; + + // Tolerance of 2 slot allows for 1 single missed slot. + let light_client_update_slot_tolerance = Slot::new(2); + let remote_nodes = network.remote_nodes()?; + let client = remote_nodes.first().unwrap(); + let mut have_seen_block = false; + let mut have_achieved_finality = false; + + for slot in start_slot.as_u64()..=end_slot.as_u64() { + slot_delay(Slot::new(1), slot_duration).await; + let slot = Slot::new(slot); + let previous_slot = slot - 1; + + let previous_slot_block = client + .get_beacon_blocks::(BlockId::Slot(previous_slot)) + .await + .map_err(|e| { + format!("Unable to get beacon block for previous slot {previous_slot:?}: {e:?}") + })?; + let previous_slot_has_block = previous_slot_block.is_some(); + + if !have_seen_block { + // Make sure we have seen the first block in Altair, to make sure we have sync aggregates available. + if previous_slot_has_block { + have_seen_block = true; + } + // Wait for another slot before we check the first update to avoid race condition. + continue; + } + + // Make sure previous slot has a block, otherwise skip checking for the signature slot distance + if !previous_slot_has_block { + continue; + } + + // Verify light client optimistic update. `signature_slot_distance` should be 1 in the ideal scenario. + let signature_slot = client + .get_beacon_light_client_optimistic_update::() + .await + .map_err(|e| format!("Error while getting light client updates: {:?}", e))? + .ok_or(format!("Light client optimistic update not found {slot:?}"))? + .data + .signature_slot; + let signature_slot_distance = slot - signature_slot; + if signature_slot_distance > light_client_update_slot_tolerance { + return Err(format!("Existing optimistic update too old: signature slot {signature_slot}, current slot {slot:?}")); + } + + // Verify light client finality update. `signature_slot_distance` should be 1 in the ideal scenario. + // NOTE: Currently finality updates are produced as long as the finalized block is known, even if the finalized header + // sync committee period does not match the signature slot committee period. + // TODO: This complies with the current spec, but we should check if this is a bug. + if !have_achieved_finality { + let FinalityCheckpointsData { finalized, .. } = client + .get_beacon_states_finality_checkpoints(StateId::Head) + .await + .map_err(|e| format!("Unable to get beacon state finality checkpoint: {e:?}"))? + .ok_or("Unable to get head state".to_string())? + .data; + if !finalized.root.is_zero() { + // Wait for another slot before we check the first finality update to avoid race condition. + have_achieved_finality = true; + } + continue; + } + let signature_slot = client + .get_beacon_light_client_finality_update::() + .await + .map_err(|e| format!("Error while getting light client updates: {:?}", e))? + .ok_or(format!("Light client finality update not found {slot:?}"))? + .data + .signature_slot; + let signature_slot_distance = slot - signature_slot; + if signature_slot_distance > light_client_update_slot_tolerance { + return Err(format!( + "Existing finality update too old: signature slot {signature_slot}, current slot {slot:?}" + )); + } + } + + Ok(()) +} diff --git a/testing/simulator/src/eth1_sim.rs b/testing/simulator/src/eth1_sim.rs index 953dcf582..8d6ffc42f 100644 --- a/testing/simulator/src/eth1_sim.rs +++ b/testing/simulator/src/eth1_sim.rs @@ -220,6 +220,7 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { fork, sync_aggregate, transition, + light_client_update, ) = futures::join!( // Check that the chain finalizes at the first given opportunity. checks::verify_first_finalization(network.clone(), slot_duration), @@ -272,6 +273,13 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { Epoch::new(TERMINAL_BLOCK / MinimalEthSpec::slots_per_epoch()), slot_duration, post_merge_sim + ), + checks::verify_light_client_updates( + network.clone(), + // Sync aggregate available from slot 1 after Altair fork transition. + Epoch::new(ALTAIR_FORK_EPOCH).start_slot(MinimalEthSpec::slots_per_epoch()) + 1, + Epoch::new(END_EPOCH).start_slot(MinimalEthSpec::slots_per_epoch()), + slot_duration ) ); @@ -282,6 +290,7 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { fork?; sync_aggregate?; transition?; + light_client_update?; // The `final_future` either completes immediately or never completes, depending on the value // of `continue_after_checks`. @@ -380,6 +389,9 @@ async fn create_local_network( beacon_config.network.target_peers = node_count + proposer_nodes - 1; beacon_config.network.enr_address = (Some(Ipv4Addr::LOCALHOST), None); + beacon_config.network.enable_light_client_server = true; + beacon_config.chain.enable_light_client_server = true; + beacon_config.http_api.enable_light_client_server = true; if post_merge_sim { let el_config = execution_layer::Config {