Add early attester cache (#2872)
## Issue Addressed NA ## Proposed Changes Introduces a cache to attestation to produce atop blocks which will become the head, but are not fully imported (e.g., not inserted into the database). Whilst attesting to a block before it's imported is rather easy, if we're going to produce that attestation then we also need to be able to: 1. Verify that attestation. 1. Respond to RPC requests for the `beacon_block_root`. Attestation verification (1) is *partially* covered. Since we prime the shuffling cache before we insert the block into the early attester cache, we should be fine for all typical use-cases. However, it is possible that the cache is washed out before we've managed to insert the state into the database and then attestation verification will fail with a "missing beacon state"-type error. Providing the block via RPC (2) is also partially covered, since we'll check the database *and* the early attester cache when responding a blocks-by-root request. However, we'll still omit the block from blocks-by-range requests (until the block lands in the DB). I *think* this is fine, since there's no guarantee that we return all blocks for those responses. Another important consideration is whether or not the *parent* of the early attester block is available in the databse. If it were not, we might fail to respond to blocks-by-root request that are iterating backwards to collect a chain of blocks. I argue that *we will always have the parent of the early attester block in the database.* This is because we are holding the fork-choice write-lock when inserting the block into the early attester cache and we do not drop that until the block is in the database.
This commit is contained in:
		
							parent
							
								
									65b1374b58
								
							
						
					
					
						commit
						02e2fd2fb8
					
				| @ -986,11 +986,17 @@ fn verify_head_block_is_known<T: BeaconChainTypes>( | |||||||
|     attestation: &Attestation<T::EthSpec>, |     attestation: &Attestation<T::EthSpec>, | ||||||
|     max_skip_slots: Option<u64>, |     max_skip_slots: Option<u64>, | ||||||
| ) -> Result<ProtoBlock, Error> { | ) -> Result<ProtoBlock, Error> { | ||||||
|     if let Some(block) = chain |     let block_opt = chain | ||||||
|         .fork_choice |         .fork_choice | ||||||
|         .read() |         .read() | ||||||
|         .get_block(&attestation.data.beacon_block_root) |         .get_block(&attestation.data.beacon_block_root) | ||||||
|     { |         .or_else(|| { | ||||||
|  |             chain | ||||||
|  |                 .early_attester_cache | ||||||
|  |                 .get_proto_block(attestation.data.beacon_block_root) | ||||||
|  |         }); | ||||||
|  | 
 | ||||||
|  |     if let Some(block) = block_opt { | ||||||
|         // Reject any block that exceeds our limit on skipped slots.
 |         // Reject any block that exceeds our limit on skipped slots.
 | ||||||
|         if let Some(max_skip_slots) = max_skip_slots { |         if let Some(max_skip_slots) = max_skip_slots { | ||||||
|             if attestation.data.slot > block.slot + max_skip_slots { |             if attestation.data.slot > block.slot + max_skip_slots { | ||||||
| @ -1242,7 +1248,9 @@ where | |||||||
|     // processing an attestation that does not include our latest finalized block in its chain.
 |     // processing an attestation that does not include our latest finalized block in its chain.
 | ||||||
|     //
 |     //
 | ||||||
|     // We do not delay consideration for later, we simply drop the attestation.
 |     // We do not delay consideration for later, we simply drop the attestation.
 | ||||||
|     if !chain.fork_choice.read().contains_block(&target.root) { |     if !chain.fork_choice.read().contains_block(&target.root) | ||||||
|  |         && !chain.early_attester_cache.contains_block(target.root) | ||||||
|  |     { | ||||||
|         return Err(Error::UnknownTargetRoot(target.root)); |         return Err(Error::UnknownTargetRoot(target.root)); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -75,7 +75,7 @@ impl From<BeaconChainError> for Error { | |||||||
| 
 | 
 | ||||||
| /// Stores the minimal amount of data required to compute the committee length for any committee at any
 | /// Stores the minimal amount of data required to compute the committee length for any committee at any
 | ||||||
| /// slot in a given `epoch`.
 | /// slot in a given `epoch`.
 | ||||||
| struct CommitteeLengths { | pub struct CommitteeLengths { | ||||||
|     /// The `epoch` to which the lengths pertain.
 |     /// The `epoch` to which the lengths pertain.
 | ||||||
|     epoch: Epoch, |     epoch: Epoch, | ||||||
|     /// The length of the shuffling in `self.epoch`.
 |     /// The length of the shuffling in `self.epoch`.
 | ||||||
| @ -84,7 +84,7 @@ struct CommitteeLengths { | |||||||
| 
 | 
 | ||||||
| impl CommitteeLengths { | impl CommitteeLengths { | ||||||
|     /// Instantiate `Self` using `state.current_epoch()`.
 |     /// Instantiate `Self` using `state.current_epoch()`.
 | ||||||
|     fn new<T: EthSpec>(state: &BeaconState<T>, spec: &ChainSpec) -> Result<Self, Error> { |     pub fn new<T: EthSpec>(state: &BeaconState<T>, spec: &ChainSpec) -> Result<Self, Error> { | ||||||
|         let active_validator_indices_len = if let Ok(committee_cache) = |         let active_validator_indices_len = if let Ok(committee_cache) = | ||||||
|             state.committee_cache(RelativeEpoch::Current) |             state.committee_cache(RelativeEpoch::Current) | ||||||
|         { |         { | ||||||
| @ -101,8 +101,16 @@ impl CommitteeLengths { | |||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     /// Get the count of committees per each slot of `self.epoch`.
 | ||||||
|  |     pub fn get_committee_count_per_slot<T: EthSpec>( | ||||||
|  |         &self, | ||||||
|  |         spec: &ChainSpec, | ||||||
|  |     ) -> Result<usize, Error> { | ||||||
|  |         T::get_committee_count_per_slot(self.active_validator_indices_len, spec).map_err(Into::into) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     /// Get the length of the committee at the given `slot` and `committee_index`.
 |     /// Get the length of the committee at the given `slot` and `committee_index`.
 | ||||||
|     fn get<T: EthSpec>( |     pub fn get_committee_length<T: EthSpec>( | ||||||
|         &self, |         &self, | ||||||
|         slot: Slot, |         slot: Slot, | ||||||
|         committee_index: CommitteeIndex, |         committee_index: CommitteeIndex, | ||||||
| @ -120,8 +128,7 @@ impl CommitteeLengths { | |||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         let slots_per_epoch = slots_per_epoch as usize; |         let slots_per_epoch = slots_per_epoch as usize; | ||||||
|         let committees_per_slot = |         let committees_per_slot = self.get_committee_count_per_slot::<T>(spec)?; | ||||||
|             T::get_committee_count_per_slot(self.active_validator_indices_len, spec)?; |  | ||||||
|         let index_in_epoch = compute_committee_index_in_epoch( |         let index_in_epoch = compute_committee_index_in_epoch( | ||||||
|             slot, |             slot, | ||||||
|             slots_per_epoch, |             slots_per_epoch, | ||||||
| @ -172,7 +179,7 @@ impl AttesterCacheValue { | |||||||
|         spec: &ChainSpec, |         spec: &ChainSpec, | ||||||
|     ) -> Result<(JustifiedCheckpoint, CommitteeLength), Error> { |     ) -> Result<(JustifiedCheckpoint, CommitteeLength), Error> { | ||||||
|         self.committee_lengths |         self.committee_lengths | ||||||
|             .get::<T>(slot, committee_index, spec) |             .get_committee_length::<T>(slot, committee_index, spec) | ||||||
|             .map(|committee_length| (self.current_justified_checkpoint, committee_length)) |             .map(|committee_length| (self.current_justified_checkpoint, committee_length)) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  | |||||||
| @ -12,6 +12,7 @@ use crate::block_verification::{ | |||||||
|     IntoFullyVerifiedBlock, |     IntoFullyVerifiedBlock, | ||||||
| }; | }; | ||||||
| use crate::chain_config::ChainConfig; | use crate::chain_config::ChainConfig; | ||||||
|  | use crate::early_attester_cache::EarlyAttesterCache; | ||||||
| use crate::errors::{BeaconChainError as Error, BlockProductionError}; | use crate::errors::{BeaconChainError as Error, BlockProductionError}; | ||||||
| use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; | use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; | ||||||
| use crate::events::ServerSentEventHandler; | use crate::events::ServerSentEventHandler; | ||||||
| @ -107,6 +108,9 @@ pub const OP_POOL_DB_KEY: Hash256 = Hash256::zero(); | |||||||
| pub const ETH1_CACHE_DB_KEY: Hash256 = Hash256::zero(); | pub const ETH1_CACHE_DB_KEY: Hash256 = Hash256::zero(); | ||||||
| pub const FORK_CHOICE_DB_KEY: Hash256 = Hash256::zero(); | pub const FORK_CHOICE_DB_KEY: Hash256 = Hash256::zero(); | ||||||
| 
 | 
 | ||||||
|  | /// Defines how old a block can be before it's no longer a candidate for the early attester cache.
 | ||||||
|  | const EARLY_ATTESTER_CACHE_HISTORIC_SLOTS: u64 = 4; | ||||||
|  | 
 | ||||||
| /// Defines the behaviour when a block/block-root for a skipped slot is requested.
 | /// Defines the behaviour when a block/block-root for a skipped slot is requested.
 | ||||||
| pub enum WhenSlotSkipped { | pub enum WhenSlotSkipped { | ||||||
|     /// If the slot is a skip slot, return `None`.
 |     /// If the slot is a skip slot, return `None`.
 | ||||||
| @ -328,6 +332,8 @@ pub struct BeaconChain<T: BeaconChainTypes> { | |||||||
|     pub(crate) validator_pubkey_cache: TimeoutRwLock<ValidatorPubkeyCache<T>>, |     pub(crate) validator_pubkey_cache: TimeoutRwLock<ValidatorPubkeyCache<T>>, | ||||||
|     /// A cache used when producing attestations.
 |     /// A cache used when producing attestations.
 | ||||||
|     pub(crate) attester_cache: Arc<AttesterCache>, |     pub(crate) attester_cache: Arc<AttesterCache>, | ||||||
|  |     /// A cache used when producing attestations whilst the head block is still being imported.
 | ||||||
|  |     pub early_attester_cache: EarlyAttesterCache<T::EthSpec>, | ||||||
|     /// A cache used to keep track of various block timings.
 |     /// A cache used to keep track of various block timings.
 | ||||||
|     pub block_times_cache: Arc<RwLock<BlockTimesCache>>, |     pub block_times_cache: Arc<RwLock<BlockTimesCache>>, | ||||||
|     /// A list of any hard-coded forks that have been disabled.
 |     /// A list of any hard-coded forks that have been disabled.
 | ||||||
| @ -926,6 +932,28 @@ impl<T: BeaconChainTypes> BeaconChain<T> { | |||||||
|         )? |         )? | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     /// Returns the block at the given root, if any.
 | ||||||
|  |     ///
 | ||||||
|  |     /// Will also check the early attester cache for the block. Because of this, there's no
 | ||||||
|  |     /// guarantee that a block returned from this function has a `BeaconState` available in
 | ||||||
|  |     /// `self.store`. The expected use for this function is *only* for returning blocks requested
 | ||||||
|  |     /// from P2P peers.
 | ||||||
|  |     ///
 | ||||||
|  |     /// ## Errors
 | ||||||
|  |     ///
 | ||||||
|  |     /// May return a database error.
 | ||||||
|  |     pub fn get_block_checking_early_attester_cache( | ||||||
|  |         &self, | ||||||
|  |         block_root: &Hash256, | ||||||
|  |     ) -> Result<Option<SignedBeaconBlock<T::EthSpec>>, Error> { | ||||||
|  |         let block_opt = self | ||||||
|  |             .store | ||||||
|  |             .get_block(block_root)? | ||||||
|  |             .or_else(|| self.early_attester_cache.get_block(*block_root)); | ||||||
|  | 
 | ||||||
|  |         Ok(block_opt) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     /// Returns the block at the given root, if any.
 |     /// Returns the block at the given root, if any.
 | ||||||
|     ///
 |     ///
 | ||||||
|     /// ## Errors
 |     /// ## Errors
 | ||||||
| @ -1422,6 +1450,29 @@ impl<T: BeaconChainTypes> BeaconChain<T> { | |||||||
|     ) -> Result<Attestation<T::EthSpec>, Error> { |     ) -> Result<Attestation<T::EthSpec>, Error> { | ||||||
|         let _total_timer = metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_SECONDS); |         let _total_timer = metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_SECONDS); | ||||||
| 
 | 
 | ||||||
|  |         // The early attester cache will return `Some(attestation)` in the scenario where there is a
 | ||||||
|  |         // block being imported that will become the head block, but that block has not yet been
 | ||||||
|  |         // inserted into the database and set as `self.canonical_head`.
 | ||||||
|  |         //
 | ||||||
|  |         // In effect, the early attester cache prevents slow database IO from causing missed
 | ||||||
|  |         // head/target votes.
 | ||||||
|  |         match self | ||||||
|  |             .early_attester_cache | ||||||
|  |             .try_attest(request_slot, request_index, &self.spec) | ||||||
|  |         { | ||||||
|  |             // The cache matched this request, return the value.
 | ||||||
|  |             Ok(Some(attestation)) => return Ok(attestation), | ||||||
|  |             // The cache did not match this request, proceed with the rest of this function.
 | ||||||
|  |             Ok(None) => (), | ||||||
|  |             // The cache returned an error. Log the error and proceed with the rest of this
 | ||||||
|  |             // function.
 | ||||||
|  |             Err(e) => warn!( | ||||||
|  |                 self.log, | ||||||
|  |                 "Early attester cache failed"; | ||||||
|  |                 "error" => ?e | ||||||
|  |             ), | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|         let slots_per_epoch = T::EthSpec::slots_per_epoch(); |         let slots_per_epoch = T::EthSpec::slots_per_epoch(); | ||||||
|         let request_epoch = request_slot.epoch(slots_per_epoch); |         let request_epoch = request_slot.epoch(slots_per_epoch); | ||||||
| 
 | 
 | ||||||
| @ -2602,6 +2653,42 @@ impl<T: BeaconChainTypes> BeaconChain<T> { | |||||||
|             } |             } | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|  |         // If the block is recent enough, check to see if it becomes the head block. If so, apply it
 | ||||||
|  |         // to the early attester cache. This will allow attestations to the block without waiting
 | ||||||
|  |         // for the block and state to be inserted to the database.
 | ||||||
|  |         //
 | ||||||
|  |         // Only performing this check on recent blocks avoids slowing down sync with lots of calls
 | ||||||
|  |         // to fork choice `get_head`.
 | ||||||
|  |         if block.slot() + EARLY_ATTESTER_CACHE_HISTORIC_SLOTS >= current_slot { | ||||||
|  |             let new_head_root = fork_choice | ||||||
|  |                 .get_head(current_slot, &self.spec) | ||||||
|  |                 .map_err(BeaconChainError::from)?; | ||||||
|  | 
 | ||||||
|  |             if new_head_root == block_root { | ||||||
|  |                 if let Some(proto_block) = fork_choice.get_block(&block_root) { | ||||||
|  |                     if let Err(e) = self.early_attester_cache.add_head_block( | ||||||
|  |                         block_root, | ||||||
|  |                         signed_block.clone(), | ||||||
|  |                         proto_block, | ||||||
|  |                         &state, | ||||||
|  |                         &self.spec, | ||||||
|  |                     ) { | ||||||
|  |                         warn!( | ||||||
|  |                             self.log, | ||||||
|  |                             "Early attester cache insert failed"; | ||||||
|  |                             "error" => ?e | ||||||
|  |                         ); | ||||||
|  |                     } | ||||||
|  |                 } else { | ||||||
|  |                     warn!( | ||||||
|  |                         self.log, | ||||||
|  |                         "Early attester block missing"; | ||||||
|  |                         "block_root" => ?block_root | ||||||
|  |                     ); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|         // Register sync aggregate with validator monitor
 |         // Register sync aggregate with validator monitor
 | ||||||
|         if let Ok(sync_aggregate) = block.body().sync_aggregate() { |         if let Ok(sync_aggregate) = block.body().sync_aggregate() { | ||||||
|             // `SyncCommittee` for the sync_aggregate should correspond to the duty slot
 |             // `SyncCommittee` for the sync_aggregate should correspond to the duty slot
 | ||||||
| @ -3248,6 +3335,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> { | |||||||
| 
 | 
 | ||||||
|         drop(lag_timer); |         drop(lag_timer); | ||||||
| 
 | 
 | ||||||
|  |         // Clear the early attester cache in case it conflicts with `self.canonical_head`.
 | ||||||
|  |         self.early_attester_cache.clear(); | ||||||
|  | 
 | ||||||
|         // Update the snapshot that stores the head of the chain at the time it received the
 |         // Update the snapshot that stores the head of the chain at the time it received the
 | ||||||
|         // block.
 |         // block.
 | ||||||
|         *self |         *self | ||||||
|  | |||||||
| @ -763,6 +763,7 @@ where | |||||||
|             block_times_cache: <_>::default(), |             block_times_cache: <_>::default(), | ||||||
|             validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache), |             validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache), | ||||||
|             attester_cache: <_>::default(), |             attester_cache: <_>::default(), | ||||||
|  |             early_attester_cache: <_>::default(), | ||||||
|             disabled_forks: self.disabled_forks, |             disabled_forks: self.disabled_forks, | ||||||
|             shutdown_sender: self |             shutdown_sender: self | ||||||
|                 .shutdown_sender |                 .shutdown_sender | ||||||
|  | |||||||
							
								
								
									
										161
									
								
								beacon_node/beacon_chain/src/early_attester_cache.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										161
									
								
								beacon_node/beacon_chain/src/early_attester_cache.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,161 @@ | |||||||
|  | use crate::{ | ||||||
|  |     attester_cache::{CommitteeLengths, Error}, | ||||||
|  |     metrics, | ||||||
|  | }; | ||||||
|  | use parking_lot::RwLock; | ||||||
|  | use proto_array::Block as ProtoBlock; | ||||||
|  | use types::*; | ||||||
|  | 
 | ||||||
|  | pub struct CacheItem<E: EthSpec> { | ||||||
|  |     /* | ||||||
|  |      * Values used to create attestations. | ||||||
|  |      */ | ||||||
|  |     epoch: Epoch, | ||||||
|  |     committee_lengths: CommitteeLengths, | ||||||
|  |     beacon_block_root: Hash256, | ||||||
|  |     source: Checkpoint, | ||||||
|  |     target: Checkpoint, | ||||||
|  |     /* | ||||||
|  |      * Values used to make the block available. | ||||||
|  |      */ | ||||||
|  |     block: SignedBeaconBlock<E>, | ||||||
|  |     proto_block: ProtoBlock, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | /// Provides a single-item cache which allows for attesting to blocks before those blocks have
 | ||||||
|  | /// reached the database.
 | ||||||
|  | ///
 | ||||||
|  | /// This cache stores enough information to allow Lighthouse to:
 | ||||||
|  | ///
 | ||||||
|  | /// - Produce an attestation without using `chain.canonical_head`.
 | ||||||
|  | /// - Verify that a block root exists (i.e., will be imported in the future) during attestation
 | ||||||
|  | ///     verification.
 | ||||||
|  | /// - Provide a block which can be sent to peers via RPC.
 | ||||||
|  | #[derive(Default)] | ||||||
|  | pub struct EarlyAttesterCache<E: EthSpec> { | ||||||
|  |     item: RwLock<Option<CacheItem<E>>>, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl<E: EthSpec> EarlyAttesterCache<E> { | ||||||
|  |     /// Removes the cached item, meaning that all future calls to `Self::try_attest` will return
 | ||||||
|  |     /// `None` until a new cache item is added.
 | ||||||
|  |     pub fn clear(&self) { | ||||||
|  |         *self.item.write() = None | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Updates the cache item, so that `Self::try_attest` with return `Some` when given suitable
 | ||||||
|  |     /// parameters.
 | ||||||
|  |     pub fn add_head_block( | ||||||
|  |         &self, | ||||||
|  |         beacon_block_root: Hash256, | ||||||
|  |         block: SignedBeaconBlock<E>, | ||||||
|  |         proto_block: ProtoBlock, | ||||||
|  |         state: &BeaconState<E>, | ||||||
|  |         spec: &ChainSpec, | ||||||
|  |     ) -> Result<(), Error> { | ||||||
|  |         let epoch = state.current_epoch(); | ||||||
|  |         let committee_lengths = CommitteeLengths::new(state, spec)?; | ||||||
|  |         let source = state.current_justified_checkpoint(); | ||||||
|  |         let target_slot = epoch.start_slot(E::slots_per_epoch()); | ||||||
|  |         let target = Checkpoint { | ||||||
|  |             epoch, | ||||||
|  |             root: if state.slot() <= target_slot { | ||||||
|  |                 beacon_block_root | ||||||
|  |             } else { | ||||||
|  |                 *state.get_block_root(target_slot)? | ||||||
|  |             }, | ||||||
|  |         }; | ||||||
|  | 
 | ||||||
|  |         let item = CacheItem { | ||||||
|  |             epoch, | ||||||
|  |             committee_lengths, | ||||||
|  |             beacon_block_root, | ||||||
|  |             source, | ||||||
|  |             target, | ||||||
|  |             block, | ||||||
|  |             proto_block, | ||||||
|  |         }; | ||||||
|  | 
 | ||||||
|  |         *self.item.write() = Some(item); | ||||||
|  | 
 | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Will return `Some(attestation)` if all the following conditions are met:
 | ||||||
|  |     ///
 | ||||||
|  |     /// - There is a cache `item` present.
 | ||||||
|  |     /// - If `request_slot` is in the same epoch as `item.epoch`.
 | ||||||
|  |     /// - If `request_index` does not exceed `item.comittee_count`.
 | ||||||
|  |     pub fn try_attest( | ||||||
|  |         &self, | ||||||
|  |         request_slot: Slot, | ||||||
|  |         request_index: CommitteeIndex, | ||||||
|  |         spec: &ChainSpec, | ||||||
|  |     ) -> Result<Option<Attestation<E>>, Error> { | ||||||
|  |         let lock = self.item.read(); | ||||||
|  |         let item = if let Some(item) = lock.as_ref() { | ||||||
|  |             item | ||||||
|  |         } else { | ||||||
|  |             return Ok(None); | ||||||
|  |         }; | ||||||
|  | 
 | ||||||
|  |         let request_epoch = request_slot.epoch(E::slots_per_epoch()); | ||||||
|  |         if request_epoch != item.epoch { | ||||||
|  |             return Ok(None); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         let committee_count = item | ||||||
|  |             .committee_lengths | ||||||
|  |             .get_committee_count_per_slot::<E>(spec)?; | ||||||
|  |         if request_index >= committee_count as u64 { | ||||||
|  |             return Ok(None); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         let committee_len = | ||||||
|  |             item.committee_lengths | ||||||
|  |                 .get_committee_length::<E>(request_slot, request_index, spec)?; | ||||||
|  | 
 | ||||||
|  |         let attestation = Attestation { | ||||||
|  |             aggregation_bits: BitList::with_capacity(committee_len) | ||||||
|  |                 .map_err(BeaconStateError::from)?, | ||||||
|  |             data: AttestationData { | ||||||
|  |                 slot: request_slot, | ||||||
|  |                 index: request_index, | ||||||
|  |                 beacon_block_root: item.beacon_block_root, | ||||||
|  |                 source: item.source, | ||||||
|  |                 target: item.target, | ||||||
|  |             }, | ||||||
|  |             signature: AggregateSignature::empty(), | ||||||
|  |         }; | ||||||
|  | 
 | ||||||
|  |         metrics::inc_counter(&metrics::BEACON_EARLY_ATTESTER_CACHE_HITS); | ||||||
|  | 
 | ||||||
|  |         Ok(Some(attestation)) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns `true` if `block_root` matches the cached item.
 | ||||||
|  |     pub fn contains_block(&self, block_root: Hash256) -> bool { | ||||||
|  |         self.item | ||||||
|  |             .read() | ||||||
|  |             .as_ref() | ||||||
|  |             .map_or(false, |item| item.beacon_block_root == block_root) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns the block, if `block_root` matches the cached item.
 | ||||||
|  |     pub fn get_block(&self, block_root: Hash256) -> Option<SignedBeaconBlock<E>> { | ||||||
|  |         self.item | ||||||
|  |             .read() | ||||||
|  |             .as_ref() | ||||||
|  |             .filter(|item| item.beacon_block_root == block_root) | ||||||
|  |             .map(|item| item.block.clone()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns the proto-array block, if `block_root` matches the cached item.
 | ||||||
|  |     pub fn get_proto_block(&self, block_root: Hash256) -> Option<ProtoBlock> { | ||||||
|  |         self.item | ||||||
|  |             .read() | ||||||
|  |             .as_ref() | ||||||
|  |             .filter(|item| item.beacon_block_root == block_root) | ||||||
|  |             .map(|item| item.proto_block.clone()) | ||||||
|  |     } | ||||||
|  | } | ||||||
| @ -9,6 +9,7 @@ mod block_times_cache; | |||||||
| mod block_verification; | mod block_verification; | ||||||
| pub mod builder; | pub mod builder; | ||||||
| pub mod chain_config; | pub mod chain_config; | ||||||
|  | mod early_attester_cache; | ||||||
| mod errors; | mod errors; | ||||||
| pub mod eth1_chain; | pub mod eth1_chain; | ||||||
| pub mod events; | pub mod events; | ||||||
|  | |||||||
| @ -240,6 +240,14 @@ lazy_static! { | |||||||
|     pub static ref SHUFFLING_CACHE_MISSES: Result<IntCounter> = |     pub static ref SHUFFLING_CACHE_MISSES: Result<IntCounter> = | ||||||
|         try_create_int_counter("beacon_shuffling_cache_misses_total", "Count of times shuffling cache fulfils request"); |         try_create_int_counter("beacon_shuffling_cache_misses_total", "Count of times shuffling cache fulfils request"); | ||||||
| 
 | 
 | ||||||
|  |     /* | ||||||
|  |      * Early attester cache | ||||||
|  |      */ | ||||||
|  |     pub static ref BEACON_EARLY_ATTESTER_CACHE_HITS: Result<IntCounter> = try_create_int_counter( | ||||||
|  |         "beacon_early_attester_cache_hits", | ||||||
|  |         "Count of times the early attester cache returns an attestation" | ||||||
|  |     ); | ||||||
|  | 
 | ||||||
|     /* |     /* | ||||||
|      * Attestation Production |      * Attestation Production | ||||||
|      */ |      */ | ||||||
|  | |||||||
| @ -122,6 +122,24 @@ fn produces_attestations() { | |||||||
|             ); |             ); | ||||||
|             assert_eq!(data.target.epoch, state.current_epoch(), "bad target epoch"); |             assert_eq!(data.target.epoch, state.current_epoch(), "bad target epoch"); | ||||||
|             assert_eq!(data.target.root, target_root, "bad target root"); |             assert_eq!(data.target.root, target_root, "bad target root"); | ||||||
|  | 
 | ||||||
|  |             let early_attestation = { | ||||||
|  |                 let proto_block = chain.fork_choice.read().get_block(&block_root).unwrap(); | ||||||
|  |                 chain | ||||||
|  |                     .early_attester_cache | ||||||
|  |                     .add_head_block(block_root, block.clone(), proto_block, &state, &chain.spec) | ||||||
|  |                     .unwrap(); | ||||||
|  |                 chain | ||||||
|  |                     .early_attester_cache | ||||||
|  |                     .try_attest(slot, index, &chain.spec) | ||||||
|  |                     .unwrap() | ||||||
|  |                     .unwrap() | ||||||
|  |             }; | ||||||
|  | 
 | ||||||
|  |             assert_eq!( | ||||||
|  |                 attestation, early_attestation, | ||||||
|  |                 "early attester cache inconsistent" | ||||||
|  |             ); | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  | |||||||
| @ -129,7 +129,7 @@ impl<T: BeaconChainTypes> Worker<T> { | |||||||
|     ) { |     ) { | ||||||
|         let mut send_block_count = 0; |         let mut send_block_count = 0; | ||||||
|         for root in request.block_roots.iter() { |         for root in request.block_roots.iter() { | ||||||
|             if let Ok(Some(block)) = self.chain.store.get_block(root) { |             if let Ok(Some(block)) = self.chain.get_block_checking_early_attester_cache(root) { | ||||||
|                 self.send_response( |                 self.send_response( | ||||||
|                     peer_id, |                     peer_id, | ||||||
|                     Response::BlocksByRoot(Some(Box::new(block))), |                     Response::BlocksByRoot(Some(Box::new(block))), | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user