Avoid looking up pre-finalization blocks (#2909)

## Issue Addressed

This PR fixes the unnecessary `WARN Single block lookup failed` messages described here:

https://github.com/sigp/lighthouse/pull/2866#issuecomment-1008442640

## Proposed Changes

Add a new cache to the `BeaconChain` that tracks the block roots of blocks from before finalization. These could be blocks from the canonical chain (which might need to be read from disk), or old pre-finalization blocks that have been forked out.

The cache also stores a set of block roots for in-progress single block lookups, which duplicates some of the information from sync's `single_block_lookups` hashmap:

a836e180f9/beacon_node/network/src/sync/manager.rs (L192-L196)

On a live node you can confirm that the cache is working by grepping logs for the message: `Rejected attestation to finalized block`.
This commit is contained in:
Michael Sproul 2022-01-27 22:58:32 +00:00
parent e05142b798
commit 99d2c33387
9 changed files with 267 additions and 5 deletions

View File

@ -141,6 +141,14 @@ pub enum Error {
/// The attestation points to a block we have not yet imported. It's unclear if the attestation /// The attestation points to a block we have not yet imported. It's unclear if the attestation
/// is valid or not. /// is valid or not.
UnknownHeadBlock { beacon_block_root: Hash256 }, UnknownHeadBlock { beacon_block_root: Hash256 },
/// The `attestation.data.beacon_block_root` block is from before the finalized checkpoint.
///
/// ## Peer scoring
///
/// The attestation is not descended from the finalized checkpoint, which is a REJECT according
/// to the spec. We downscore lightly because this could also happen if we are processing
/// attestations extremely slowly.
HeadBlockFinalized { beacon_block_root: Hash256 },
/// The `attestation.data.slot` is not from the same epoch as `data.target.epoch`. /// The `attestation.data.slot` is not from the same epoch as `data.target.epoch`.
/// ///
/// ## Peer scoring /// ## Peer scoring
@ -990,7 +998,17 @@ fn verify_head_block_is_known<T: BeaconChainTypes>(
} }
Ok(block) Ok(block)
} else if chain.is_pre_finalization_block(attestation.data.beacon_block_root)? {
Err(Error::HeadBlockFinalized {
beacon_block_root: attestation.data.beacon_block_root,
})
} else { } else {
// The block is either:
//
// 1) A pre-finalization block that has been pruned. We'll do one network lookup
// for it and when it fails we will penalise all involved peers.
// 2) A post-finalization block that we don't know about yet. We'll queue
// the attestation until the block becomes available (or we time out).
Err(Error::UnknownHeadBlock { Err(Error::UnknownHeadBlock {
beacon_block_root: attestation.data.beacon_block_root, beacon_block_root: attestation.data.beacon_block_root,
}) })

View File

@ -34,6 +34,7 @@ use crate::observed_block_producers::ObservedBlockProducers;
use crate::observed_operations::{ObservationOutcome, ObservedOperations}; use crate::observed_operations::{ObservationOutcome, ObservedOperations};
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
use crate::persisted_fork_choice::PersistedForkChoice; use crate::persisted_fork_choice::PersistedForkChoice;
use crate::pre_finalization_cache::PreFinalizationBlockCache;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::snapshot_cache::SnapshotCache; use crate::snapshot_cache::SnapshotCache;
use crate::sync_committee_verification::{ use crate::sync_committee_verification::{
@ -336,6 +337,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub early_attester_cache: EarlyAttesterCache<T::EthSpec>, pub early_attester_cache: EarlyAttesterCache<T::EthSpec>,
/// A cache used to keep track of various block timings. /// A cache used to keep track of various block timings.
pub block_times_cache: Arc<RwLock<BlockTimesCache>>, pub block_times_cache: Arc<RwLock<BlockTimesCache>>,
/// A cache used to track pre-finalization block roots for quick rejection.
pub pre_finalization_block_cache: PreFinalizationBlockCache,
/// Sender given to tasks, so that if they encounter a state in which execution cannot /// Sender given to tasks, so that if they encounter a state in which execution cannot
/// continue they can request that everything shuts down. /// continue they can request that everything shuts down.
pub shutdown_sender: Sender<ShutdownReason>, pub shutdown_sender: Sender<ShutdownReason>,
@ -2855,6 +2858,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
); );
} }
// Inform the unknown block cache, in case it was waiting on this block.
self.pre_finalization_block_cache
.block_processed(block_root);
Ok(block_root) Ok(block_root)
} }

View File

@ -618,7 +618,7 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
check_block_against_anchor_slot(block.message(), chain)?; check_block_against_anchor_slot(block.message(), chain)?;
// Do not gossip a block from a finalized slot. // Do not gossip a block from a finalized slot.
check_block_against_finalized_slot(block.message(), chain)?; check_block_against_finalized_slot(block.message(), block_root, chain)?;
// Check if the block is already known. We know it is post-finalization, so it is // Check if the block is already known. We know it is post-finalization, so it is
// sufficient to check the fork choice. // sufficient to check the fork choice.
@ -1292,6 +1292,7 @@ fn check_block_against_anchor_slot<T: BeaconChainTypes>(
/// verifying that condition. /// verifying that condition.
fn check_block_against_finalized_slot<T: BeaconChainTypes>( fn check_block_against_finalized_slot<T: BeaconChainTypes>(
block: BeaconBlockRef<'_, T::EthSpec>, block: BeaconBlockRef<'_, T::EthSpec>,
block_root: Hash256,
chain: &BeaconChain<T>, chain: &BeaconChain<T>,
) -> Result<(), BlockError<T::EthSpec>> { ) -> Result<(), BlockError<T::EthSpec>> {
let finalized_slot = chain let finalized_slot = chain
@ -1301,6 +1302,7 @@ fn check_block_against_finalized_slot<T: BeaconChainTypes>(
.start_slot(T::EthSpec::slots_per_epoch()); .start_slot(T::EthSpec::slots_per_epoch());
if block.slot() <= finalized_slot { if block.slot() <= finalized_slot {
chain.pre_finalization_block_rejected(block_root);
Err(BlockError::WouldRevertFinalizedSlot { Err(BlockError::WouldRevertFinalizedSlot {
block_slot: block.slot(), block_slot: block.slot(),
finalized_slot, finalized_slot,
@ -1373,11 +1375,11 @@ pub fn check_block_relevancy<T: BeaconChainTypes>(
return Err(BlockError::BlockSlotLimitReached); return Err(BlockError::BlockSlotLimitReached);
} }
// Do not process a block from a finalized slot.
check_block_against_finalized_slot(block, chain)?;
let block_root = block_root.unwrap_or_else(|| get_block_root(signed_block)); let block_root = block_root.unwrap_or_else(|| get_block_root(signed_block));
// Do not process a block from a finalized slot.
check_block_against_finalized_slot(block, block_root, chain)?;
// Check if the block is already known. We know it is post-finalization, so it is // Check if the block is already known. We know it is post-finalization, so it is
// sufficient to check the fork choice. // sufficient to check the fork choice.
if chain.fork_choice.read().contains_block(&block_root) { if chain.fork_choice.read().contains_block(&block_root) {

View File

@ -752,6 +752,7 @@ where
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()), shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()),
beacon_proposer_cache: <_>::default(), beacon_proposer_cache: <_>::default(),
block_times_cache: <_>::default(), block_times_cache: <_>::default(),
pre_finalization_block_cache: <_>::default(),
validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache), validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache),
attester_cache: <_>::default(), attester_cache: <_>::default(),
early_attester_cache: <_>::default(), early_attester_cache: <_>::default(),

View File

@ -27,6 +27,7 @@ mod observed_block_producers;
pub mod observed_operations; pub mod observed_operations;
mod persisted_beacon_chain; mod persisted_beacon_chain;
mod persisted_fork_choice; mod persisted_fork_choice;
mod pre_finalization_cache;
pub mod schema_change; pub mod schema_change;
mod shuffling_cache; mod shuffling_cache;
mod snapshot_cache; mod snapshot_cache;

View File

@ -904,6 +904,20 @@ lazy_static! {
"beacon_backfill_signature_total_seconds", "beacon_backfill_signature_total_seconds",
"Time spent verifying the signature set during backfill sync, including setup" "Time spent verifying the signature set during backfill sync, including setup"
); );
/*
* Pre-finalization block cache.
*/
pub static ref PRE_FINALIZATION_BLOCK_CACHE_SIZE: Result<IntGauge> =
try_create_int_gauge(
"beacon_pre_finalization_block_cache_size",
"Number of pre-finalization block roots cached for quick rejection"
);
pub static ref PRE_FINALIZATION_BLOCK_LOOKUP_COUNT: Result<IntGauge> =
try_create_int_gauge(
"beacon_pre_finalization_block_lookup_count",
"Number of block roots subject to single block lookups"
);
} }
/// Scrape the `beacon_chain` for metrics that are not constantly updated (e.g., the present slot, /// Scrape the `beacon_chain` for metrics that are not constantly updated (e.g., the present slot,
@ -931,6 +945,11 @@ pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {
) )
} }
if let Some((size, num_lookups)) = beacon_chain.pre_finalization_block_cache.metrics() {
set_gauge_by_usize(&PRE_FINALIZATION_BLOCK_CACHE_SIZE, size);
set_gauge_by_usize(&PRE_FINALIZATION_BLOCK_LOOKUP_COUNT, num_lookups);
}
set_gauge_by_usize( set_gauge_by_usize(
&OP_POOL_NUM_ATTESTATIONS, &OP_POOL_NUM_ATTESTATIONS,
attestation_stats.num_attestations, attestation_stats.num_attestations,

View File

@ -0,0 +1,119 @@
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use itertools::process_results;
use lru::LruCache;
use parking_lot::Mutex;
use slog::debug;
use std::time::Duration;
use types::Hash256;
const BLOCK_ROOT_CACHE_LIMIT: usize = 512;
const LOOKUP_LIMIT: usize = 8;
const METRICS_TIMEOUT: Duration = Duration::from_millis(100);
/// Cache for rejecting attestations to blocks from before finalization.
///
/// It stores a collection of block roots that are pre-finalization and therefore not known to fork
/// choice in `verify_head_block_is_known` during attestation processing.
#[derive(Default)]
pub struct PreFinalizationBlockCache {
cache: Mutex<Cache>,
}
struct Cache {
/// Set of block roots that are known to be pre-finalization.
block_roots: LruCache<Hash256, ()>,
/// Set of block roots that are the subject of single block lookups.
in_progress_lookups: LruCache<Hash256, ()>,
}
impl Default for Cache {
fn default() -> Self {
Cache {
block_roots: LruCache::new(BLOCK_ROOT_CACHE_LIMIT),
in_progress_lookups: LruCache::new(LOOKUP_LIMIT),
}
}
}
impl<T: BeaconChainTypes> BeaconChain<T> {
/// Check whether the block with `block_root` is known to be pre-finalization.
///
/// The provided `block_root` is assumed to be unknown to fork choice. I.e., it
/// is not known to be a descendant of the finalized block.
///
/// Return `true` if the attestation to this block should be rejected outright,
/// return `false` if more information is needed from a single-block-lookup.
pub fn is_pre_finalization_block(&self, block_root: Hash256) -> Result<bool, BeaconChainError> {
let mut cache = self.pre_finalization_block_cache.cache.lock();
// Check the cache to see if we already know this pre-finalization block root.
if cache.block_roots.contains(&block_root) {
return Ok(true);
}
// Avoid repeating the disk lookup for blocks that are already subject to a network lookup.
// Sync will take care of de-duplicating the single block lookups.
if cache.in_progress_lookups.contains(&block_root) {
return Ok(false);
}
// 1. Check memory for a recent pre-finalization block.
let is_recent_finalized_block = self.with_head(|head| {
process_results(
head.beacon_state.rev_iter_block_roots(&self.spec),
|mut iter| iter.any(|(_, root)| root == block_root),
)
.map_err(BeaconChainError::BeaconStateError)
})?;
if is_recent_finalized_block {
cache.block_roots.put(block_root, ());
return Ok(true);
}
// 2. Check on disk.
if self.store.get_block(&block_root)?.is_some() {
cache.block_roots.put(block_root, ());
return Ok(true);
}
// 3. Check the network with a single block lookup.
cache.in_progress_lookups.put(block_root, ());
if cache.in_progress_lookups.len() == LOOKUP_LIMIT {
// NOTE: we expect this to occur sometimes if a lot of blocks that we look up fail to be
// imported for reasons other than being pre-finalization. The cache will eventually
// self-repair in this case by replacing old entries with new ones until all the failed
// blocks have been flushed out. Solving this issue isn't as simple as hooking the
// beacon processor's functions that handle failed blocks because we need the block root
// and it has been erased from the `BlockError` by that point.
debug!(
self.log,
"Pre-finalization lookup cache is full";
);
}
Ok(false)
}
pub fn pre_finalization_block_rejected(&self, block_root: Hash256) {
// Future requests can know that this block is invalid without having to look it up again.
let mut cache = self.pre_finalization_block_cache.cache.lock();
cache.in_progress_lookups.pop(&block_root);
cache.block_roots.put(block_root, ());
}
}
impl PreFinalizationBlockCache {
pub fn block_processed(&self, block_root: Hash256) {
// Future requests will find this block in fork choice, so no need to cache it in the
// ongoing lookup cache any longer.
self.cache.lock().in_progress_lookups.pop(&block_root);
}
pub fn contains(&self, block_root: Hash256) -> bool {
self.cache.lock().block_roots.contains(&block_root)
}
pub fn metrics(&self) -> Option<(usize, usize)> {
let cache = self.cache.try_lock_for(METRICS_TIMEOUT)?;
Some((cache.block_roots.len(), cache.in_progress_lookups.len()))
}
}

View File

@ -5,7 +5,7 @@ use beacon_chain::{
test_utils::{ test_utils::{
test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
}, },
BeaconChain, BeaconChainTypes, WhenSlotSkipped, BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped,
}; };
use int_to_bytes::int_to_bytes32; use int_to_bytes::int_to_bytes32;
use lazy_static::lazy_static; use lazy_static::lazy_static;
@ -991,6 +991,81 @@ fn attestation_that_skips_epochs() {
.expect("should gossip verify attestation that skips slots"); .expect("should gossip verify attestation that skips slots");
} }
#[test]
fn attestation_to_finalized_block() {
let harness = get_harness(VALIDATOR_COUNT);
// Extend the chain out a few epochs so we have some chain depth to play with.
harness.extend_chain(
MainnetEthSpec::slots_per_epoch() as usize * 4 + 1,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
);
let finalized_checkpoint = harness
.chain
.with_head(|head| Ok::<_, BeaconChainError>(head.beacon_state.finalized_checkpoint()))
.unwrap();
assert!(finalized_checkpoint.epoch > 0);
let current_slot = harness.get_current_slot();
let earlier_slot = finalized_checkpoint
.epoch
.start_slot(MainnetEthSpec::slots_per_epoch())
- 1;
let earlier_block = harness
.chain
.block_at_slot(earlier_slot, WhenSlotSkipped::Prev)
.expect("should not error getting block at slot")
.expect("should find block at slot");
let earlier_block_root = earlier_block.canonical_root();
assert_ne!(earlier_block_root, finalized_checkpoint.root);
let mut state = harness
.chain
.get_state(&earlier_block.state_root(), Some(earlier_slot))
.expect("should not error getting state")
.expect("should find state");
while state.slot() < current_slot {
per_slot_processing(&mut state, None, &harness.spec).expect("should process slot");
}
let state_root = state.update_tree_hash_cache().unwrap();
let (attestation, subnet_id) = harness
.get_unaggregated_attestations(
&AttestationStrategy::AllValidators,
&state,
state_root,
earlier_block_root,
current_slot,
)
.first()
.expect("should have at least one committee")
.first()
.cloned()
.expect("should have at least one attestation in committee");
assert_eq!(attestation.data.beacon_block_root, earlier_block_root);
// Attestation should be rejected for attesting to a pre-finalization block.
let res = harness
.chain
.verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id));
assert!(
matches!(res, Err(AttnError:: HeadBlockFinalized { beacon_block_root })
if beacon_block_root == earlier_block_root
)
);
// Pre-finalization block cache should contain the block root.
assert!(harness
.chain
.pre_finalization_block_cache
.contains(earlier_block_root));
}
#[test] #[test]
fn verify_aggregate_for_gossip_doppelganger_detection() { fn verify_aggregate_for_gossip_doppelganger_detection() {
let harness = get_harness(VALIDATOR_COUNT); let harness = get_harness(VALIDATOR_COUNT);

View File

@ -1701,6 +1701,26 @@ impl<T: BeaconChainTypes> Worker<T> {
"attn_too_many_skipped_slots", "attn_too_many_skipped_slots",
); );
} }
AttnError::HeadBlockFinalized { beacon_block_root } => {
debug!(
self.log,
"Rejected attestation to finalized block";
"block_root" => ?beacon_block_root,
"attestation_slot" => failed_att.attestation().data.slot,
);
// We have to reject the message as it isn't a descendant of the finalized
// checkpoint.
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
// The peer that sent us this could be a lagger, or a spammer, or this failure could
// be due to us processing attestations extremely slowly. Don't be too harsh.
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
"attn_to_finalized_block",
);
}
AttnError::BeaconChainError(BeaconChainError::DBError(Error::HotColdDBError( AttnError::BeaconChainError(BeaconChainError::DBError(Error::HotColdDBError(
HotColdDBError::AttestationStateIsFinalized { .. }, HotColdDBError::AttestationStateIsFinalized { .. },
))) => { ))) => {