Ensure doppelganger detects attestations in blocks (#2495)

## Issue Addressed

NA

## Proposed Changes

When testing our (not-yet-released) Doppelganger implementation, I noticed that we aren't detecting attestations included in blocks (only those on the gossip network).

This is because during [block processing](e8c0d1f19b/beacon_node/beacon_chain/src/beacon_chain.rs (L2168)) we only update the `observed_attestations` cache with each attestation, but not the `observed_attesters` cache. This is the correct behaviour when we consider the [p2p spec](https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md):

> [IGNORE] There has been no other valid attestation seen on an attestation subnet that has an identical attestation.data.target.epoch and participating validator index.

We're doing the right thing here and still allowing attestations on gossip that we've seen in a block. However, this doesn't work so nicely for Doppelganger.

To resolve this, I've taken the following steps:

- Add a `observed_block_attesters` cache.
- Rename `observed_attesters` to `observed_gossip_attesters`.

## TODO

- [x] Add a test to ensure a validator that's been seen in a block attestation (but not a gossip attestation) returns `true` for `BeaconChain::validator_seen_at_epoch`.
- [x] Add a test to ensure `observed_block_attesters` isn't polluted via gossip attestations and vice versa. 


Co-authored-by: realbigsean <seananderson33@gmail.com>
This commit is contained in:
Paul Hauner 2021-08-09 02:43:03 +00:00
parent ff85b05249
commit ceda27371d
7 changed files with 214 additions and 22 deletions

View File

@ -685,7 +685,7 @@ impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<T> {
* for the slot, attestation.data.slot.
*/
if chain
.observed_attesters
.observed_gossip_attesters
.read()
.validator_has_been_observed(attestation.data.target.epoch, validator_index as usize)
.map_err(BeaconChainError::from)?
@ -712,7 +712,7 @@ impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<T> {
// there can be a race-condition if we receive two attestations at the same time and
// process them in different threads.
if chain
.observed_attesters
.observed_gossip_attesters
.write()
.observe_validator(attestation.data.target.epoch, validator_index as usize)
.map_err(BeaconChainError::from)?

View File

@ -245,13 +245,17 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub(crate) observed_attestations: RwLock<ObservedAggregateAttestations<T::EthSpec>>,
/// Contains a store of sync contributions which have been observed by the beacon chain.
pub(crate) observed_sync_contributions: RwLock<ObservedSyncContributions<T::EthSpec>>,
/// Maintains a record of which validators have been seen to attest in recent epochs.
pub(crate) observed_attesters: RwLock<ObservedAttesters<T::EthSpec>>,
/// Maintains a record of which validators have been seen to publish gossip attestations in
/// recent epochs.
pub observed_gossip_attesters: RwLock<ObservedAttesters<T::EthSpec>>,
/// Maintains a record of which validators have been seen to have attestations included in
/// blocks in recent epochs.
pub observed_block_attesters: RwLock<ObservedAttesters<T::EthSpec>>,
/// Maintains a record of which validators have been seen sending sync messages in recent epochs.
pub(crate) observed_sync_contributors: RwLock<ObservedSyncContributors<T::EthSpec>>,
/// Maintains a record of which validators have been seen to create `SignedAggregateAndProofs`
/// in recent epochs.
pub(crate) observed_aggregators: RwLock<ObservedAggregators<T::EthSpec>>,
pub observed_aggregators: RwLock<ObservedAggregators<T::EthSpec>>,
/// Maintains a record of which validators have been seen to create `SignedContributionAndProofs`
/// in recent epochs.
pub(crate) observed_sync_aggregators: RwLock<ObservedSyncAggregators<T::EthSpec>>,
@ -2337,6 +2341,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
for attestation in block.body().attestations() {
let _fork_choice_attestation_timer =
metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES);
let attestation_target_epoch = attestation.data.target.epoch;
let committee =
state.get_beacon_committee(attestation.data.slot, attestation.data.index)?;
@ -2351,6 +2356,25 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Err(e) => Err(BlockError::BeaconChainError(e.into())),
}?;
// To avoid slowing down sync, only register attestations for the
// `observed_block_attesters` if they are from the previous epoch or later.
if attestation_target_epoch + 1 >= current_epoch {
let mut observed_block_attesters = self.observed_block_attesters.write();
for &validator_index in &indexed_attestation.attesting_indices {
if let Err(e) = observed_block_attesters
.observe_validator(attestation_target_epoch, validator_index as usize)
{
debug!(
self.log,
"Failed to register observed block attester";
"error" => ?e,
"epoch" => attestation_target_epoch,
"validator_index" => validator_index,
)
}
}
}
// Only register this with the validator monitor when the block is sufficiently close to
// the current slot.
if VALIDATOR_MONITOR_HISTORIC_EPOCHS as u64 * T::EthSpec::slots_per_epoch()
@ -3521,8 +3545,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// It's necessary to assign these checks to intermediate variables to avoid a deadlock.
//
// See: https://github.com/sigp/lighthouse/pull/2230#discussion_r620013993
let attested = self
.observed_attesters
let gossip_attested = self
.observed_gossip_attesters
.read()
.index_seen_at_epoch(validator_index, epoch);
let block_attested = self
.observed_block_attesters
.read()
.index_seen_at_epoch(validator_index, epoch);
let aggregated = self
@ -3534,7 +3562,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.read()
.index_seen_at_epoch(validator_index as u64, epoch);
attested || aggregated || produced_block
gossip_attested || block_attested || aggregated || produced_block
}
}

View File

@ -519,7 +519,9 @@ where
// TODO: allow for persisting and loading the pool from disk.
observed_sync_contributions: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_attesters: <_>::default(),
observed_gossip_attesters: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_block_attesters: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_sync_contributors: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.

View File

@ -848,7 +848,7 @@ fn scrape_attestation_observation<T: BeaconChainTypes>(slot_now: Slot, chain: &B
let prev_epoch = slot_now.epoch(T::EthSpec::slots_per_epoch()) - 1;
if let Some(count) = chain
.observed_attesters
.observed_gossip_attesters
.read()
.observed_validator_count(prev_epoch)
{

View File

@ -22,6 +22,21 @@ use std::marker::PhantomData;
use types::slot_data::SlotData;
use types::{Epoch, EthSpec, Slot, Unsigned};
/// The maximum capacity of the `AutoPruningEpochContainer`.
///
/// Fits the next, current and previous epochs. We require the next epoch due to the
/// `MAXIMUM_GOSSIP_CLOCK_DISPARITY`. We require the previous epoch since the specification
/// declares:
///
/// ```ignore
/// aggregate.data.slot + ATTESTATION_PROPAGATION_SLOT_RANGE
/// >= current_slot >= aggregate.data.slot
/// ```
///
/// This means that during the current epoch we will always accept an attestation
/// from at least one slot in the previous epoch.
pub const MAX_CACHED_EPOCHS: u64 = 3;
pub type ObservedAttesters<E> = AutoPruningEpochContainer<EpochBitfield, E>;
pub type ObservedSyncContributors<E> =
AutoPruningSlotContainer<SlotSubcommitteeIndex, SyncContributorSlotHashSet<E>, E>;
@ -347,18 +362,7 @@ impl<T: Item, E: EthSpec> AutoPruningEpochContainer<T, E> {
/// The maximum number of epochs stored in `self`.
fn max_capacity(&self) -> u64 {
// The next, current and previous epochs. We require the next epoch due to the
// `MAXIMUM_GOSSIP_CLOCK_DISPARITY`. We require the previous epoch since the
// specification delcares:
//
// ```
// aggregate.data.slot + ATTESTATION_PROPAGATION_SLOT_RANGE
// >= current_slot >= aggregate.data.slot
// ```
//
// This means that during the current epoch we will always accept an attestation
// from at least one slot in the previous epoch.
3
MAX_CACHED_EPOCHS
}
/// Updates `self` with the current epoch, removing all attestations that become expired

View File

@ -963,3 +963,114 @@ fn attestation_that_skips_epochs() {
.verify_unaggregated_attestation_for_gossip(attestation, Some(subnet_id))
.expect("should gossip verify attestation that skips slots");
}
#[test]
fn verify_aggregate_for_gossip_doppelganger_detection() {
let harness = get_harness(VALIDATOR_COUNT);
// Extend the chain out a few epochs so we have some chain depth to play with.
harness.extend_chain(
MainnetEthSpec::slots_per_epoch() as usize * 3 - 1,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
);
// Advance into a slot where there have not been blocks or attestations produced.
harness.advance_slot();
let current_slot = harness.chain.slot().expect("should get slot");
assert_eq!(
current_slot % E::slots_per_epoch(),
0,
"the test requires a new epoch to avoid already-seen errors"
);
let (valid_attestation, _attester_index, _attester_committee_index, _, _) =
get_valid_unaggregated_attestation(&harness.chain);
let (valid_aggregate, _, _) =
get_valid_aggregated_attestation(&harness.chain, valid_attestation);
harness
.chain
.verify_aggregated_attestation_for_gossip(valid_aggregate.clone())
.expect("should verify aggregate attestation");
let epoch = valid_aggregate.message.aggregate.data.target.epoch;
let index = valid_aggregate.message.aggregator_index as usize;
assert!(harness.chain.validator_seen_at_epoch(index, epoch));
// Check the correct beacon cache is populated
assert!(!harness
.chain
.observed_block_attesters
.read()
.validator_has_been_observed(epoch, index)
.expect("should check if block attester was observed"));
assert!(!harness
.chain
.observed_gossip_attesters
.read()
.validator_has_been_observed(epoch, index)
.expect("should check if gossip attester was observed"));
assert!(harness
.chain
.observed_aggregators
.read()
.validator_has_been_observed(epoch, index)
.expect("should check if gossip aggregator was observed"));
}
#[test]
fn verify_attestation_for_gossip_doppelganger_detection() {
let harness = get_harness(VALIDATOR_COUNT);
// Extend the chain out a few epochs so we have some chain depth to play with.
harness.extend_chain(
MainnetEthSpec::slots_per_epoch() as usize * 3 - 1,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
);
// Advance into a slot where there have not been blocks or attestations produced.
harness.advance_slot();
let current_slot = harness.chain.slot().expect("should get slot");
assert_eq!(
current_slot % E::slots_per_epoch(),
0,
"the test requires a new epoch to avoid already-seen errors"
);
let (valid_attestation, index, _attester_committee_index, _, subnet_id) =
get_valid_unaggregated_attestation(&harness.chain);
harness
.chain
.verify_unaggregated_attestation_for_gossip(valid_attestation.clone(), Some(subnet_id))
.expect("should verify attestation");
let epoch = valid_attestation.data.target.epoch;
assert!(harness.chain.validator_seen_at_epoch(index, epoch));
// Check the correct beacon cache is populated
assert!(!harness
.chain
.observed_block_attesters
.read()
.validator_has_been_observed(epoch, index)
.expect("should check if block attester was observed"));
assert!(harness
.chain
.observed_gossip_attesters
.read()
.validator_has_been_observed(epoch, index)
.expect("should check if gossip attester was observed"));
assert!(!harness
.chain
.observed_aggregators
.read()
.validator_has_been_observed(epoch, index)
.expect("should check if gossip aggregator was observed"));
}

View File

@ -9,6 +9,7 @@ use beacon_chain::test_utils::{
use beacon_chain::{BeaconSnapshot, BlockError, ChainConfig, ChainSegmentResult};
use slasher::{Config as SlasherConfig, Slasher};
use state_processing::{
common::get_indexed_attestation,
per_block_processing::{per_block_processing, BlockSignatureStrategy},
per_slot_processing, BlockProcessingError,
};
@ -868,6 +869,52 @@ fn verify_block_for_gossip_slashing_detection() {
slasher_dir.close().unwrap();
}
#[test]
fn verify_block_for_gossip_doppelganger_detection() {
let harness = get_harness(VALIDATOR_COUNT);
let state = harness.get_current_state();
let (block, _) = harness.make_block(state.clone(), Slot::new(1));
let verified_block = harness.chain.verify_block_for_gossip(block).unwrap();
let attestations = verified_block.block.message().body().attestations().clone();
harness.chain.process_block(verified_block).unwrap();
for att in attestations.iter() {
let epoch = att.data.target.epoch;
let committee = state
.get_beacon_committee(att.data.slot, att.data.index)
.unwrap();
let indexed_attestation = get_indexed_attestation(committee.committee, att).unwrap();
for &index in &indexed_attestation.attesting_indices {
let index = index as usize;
assert!(harness.chain.validator_seen_at_epoch(index, epoch));
// Check the correct beacon cache is populated
assert!(harness
.chain
.observed_block_attesters
.read()
.validator_has_been_observed(epoch, index)
.expect("should check if block attester was observed"));
assert!(!harness
.chain
.observed_gossip_attesters
.read()
.validator_has_been_observed(epoch, index)
.expect("should check if gossip attester was observed"));
assert!(!harness
.chain
.observed_aggregators
.read()
.validator_has_been_observed(epoch, index)
.expect("should check if gossip aggregator was observed"));
}
}
}
#[test]
fn add_base_block_to_altair_chain() {
let mut spec = MainnetEthSpec::default_spec();