create unified slashing cache (#5033)

* create unified slashing cache

* add observed slashable file

* fix broadcast validation tests

* revert block seen cache changes

* clean up slashable cache test

* check header signatures for RPC blobs

* don't throw an error on an invalid RPC signature
realbigsean 2024-01-08 10:30:57 -05:00 committed by GitHub
parent f62cfc6475
commit f70c32ec70
12 changed files with 655 additions and 156 deletions
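
The change replaces the two ad-hoc equivocation checks (one in `observed_block_producers`, one in `observed_blob_sidecars`) with a single `ObservedSlashable` cache keyed by `(slot, proposer_index)` and holding every block root seen for that key. As a rough illustration of the semantics the diff below implements, here is a self-contained toy model using only `std` types — the plain `u64`/`[u8; 32]` keys stand in for Lighthouse's `Slot` and `Hash256`, and the real cache additionally enforces the validator-registry bound and rejects pre-finalization slots:

```rust
use std::collections::{HashMap, HashSet};

/// Toy stand-in for the `ObservedSlashable` cache: proposals keyed by
/// (slot, proposer_index), each holding every block root seen so far.
#[derive(Default)]
struct ToySlashableCache {
    items: HashMap<(u64, u64), HashSet<[u8; 32]>>,
}

impl ToySlashableCache {
    /// Record a signature-verified proposal.
    fn observe(&mut self, slot: u64, proposer: u64, root: [u8; 32]) {
        self.items.entry((slot, proposer)).or_default().insert(root);
    }

    /// A root is "slashable" if the same (slot, proposer) was already seen
    /// with a *different* root, i.e. signing it would be an equivocation.
    fn is_slashable(&self, slot: u64, proposer: u64, root: [u8; 32]) -> bool {
        self.items
            .get(&(slot, proposer))
            .map_or(false, |roots| roots.iter().any(|r| *r != root))
    }
}

fn main() {
    let mut cache = ToySlashableCache::default();
    cache.observe(42, 7, [0xaa; 32]);
    assert!(!cache.is_slashable(42, 7, [0xaa; 32])); // same root: duplicate, not slashable
    assert!(cache.is_slashable(42, 7, [0xbb; 32])); // second root for the same key: equivocation
    assert!(!cache.is_slashable(43, 7, [0xbb; 32])); // different slot: fine
    println!("toy slashable-cache semantics hold");
}
```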

View File

@@ -12,8 +12,8 @@ use crate::block_times_cache::BlockTimesCache;
 use crate::block_verification::POS_PANDA_BANNER;
 use crate::block_verification::{
     check_block_is_finalized_checkpoint_or_descendant, check_block_relevancy,
-    signature_verify_chain_segment, BlockError, ExecutionPendingBlock, GossipVerifiedBlock,
-    IntoExecutionPendingBlock,
+    signature_verify_chain_segment, verify_header_signature, BlockError, ExecutionPendingBlock,
+    GossipVerifiedBlock, IntoExecutionPendingBlock,
 };
 use crate::block_verification_types::{
     AsBlock, AvailableExecutedBlock, BlockImportData, ExecutedBlock, RpcBlock,
@@ -52,6 +52,7 @@ use crate::observed_attesters::{
 use crate::observed_blob_sidecars::ObservedBlobSidecars;
 use crate::observed_block_producers::ObservedBlockProducers;
 use crate::observed_operations::{ObservationOutcome, ObservedOperations};
+use crate::observed_slashable::ObservedSlashable;
 use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
 use crate::persisted_fork_choice::PersistedForkChoice;
 use crate::pre_finalization_cache::PreFinalizationBlockCache;
@@ -407,6 +408,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
     pub observed_block_producers: RwLock<ObservedBlockProducers<T::EthSpec>>,
     /// Maintains a record of blob sidecars seen over the gossip network.
     pub observed_blob_sidecars: RwLock<ObservedBlobSidecars<T::EthSpec>>,
+    /// Maintains a record of slashable messages seen over the gossip network or RPC.
+    pub observed_slashable: RwLock<ObservedSlashable<T::EthSpec>>,
     /// Maintains a record of which validators have submitted voluntary exits.
     pub(crate) observed_voluntary_exits: Mutex<ObservedOperations<SignedVoluntaryExit, T::EthSpec>>,
     /// Maintains a record of which validators we've seen proposer slashings for.
@@ -3157,9 +3160,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         block_root: Hash256,
         blobs: FixedBlobSidecarList<T::EthSpec>,
     ) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
-        if let Some(slasher) = self.slasher.as_ref() {
-            for blob_sidecar in blobs.iter().filter_map(|blob| blob.clone()) {
-                slasher.accept_block_header(blob_sidecar.signed_block_header.clone());
-            }
-        }
+        // Need to scope this to ensure the lock is dropped before calling `process_availability`
+        // Even an explicit drop is not enough to convince the borrow checker.
+        {
+            let mut slashable_cache = self.observed_slashable.write();
+            for header in blobs
+                .into_iter()
+                .filter_map(|b| b.as_ref().map(|b| b.signed_block_header.clone()))
+                .unique()
+            {
+                if verify_header_signature::<T, BlockError<T::EthSpec>>(self, &header).is_ok() {
+                    slashable_cache
+                        .observe_slashable(
+                            header.message.slot,
+                            header.message.proposer_index,
+                            block_root,
+                        )
+                        .map_err(|e| BlockError::BeaconChainError(e.into()))?;
+                    if let Some(slasher) = self.slasher.as_ref() {
+                        slasher.accept_block_header(header);
+                    }
+                }
+            }
+        }
         let availability = self
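
The scoping comment above reflects a common Rust pattern: the `observed_slashable` write guard must go out of scope before the subsequent `process_availability` call, and (as the comment notes) an explicit `drop` is not always enough to convince the borrow checker. A minimal, self-contained sketch of that pattern using `std::sync::RwLock` rather than Lighthouse's types:

```rust
use std::collections::HashSet;
use std::sync::RwLock;

fn main() {
    let observed: RwLock<HashSet<u64>> = RwLock::new(HashSet::new());

    // Scope the write guard so it is dropped before the follow-up work;
    // relying on `drop(guard)` alone does not always end a borrow that the
    // compiler still considers live across a later call.
    {
        let mut guard = observed.write().unwrap();
        for proposer in [1u64, 2, 3] {
            guard.insert(proposer);
        }
    } // <- guard dropped here, lock released

    // The lock is free again for subsequent (possibly long-running) work.
    let count = observed.read().unwrap().len();
    println!("observed {count} proposers");
}
```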

View File

@@ -599,6 +599,16 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
         });
     }
 
+    chain
+        .observed_slashable
+        .write()
+        .observe_slashable(
+            blob_sidecar.slot(),
+            blob_sidecar.block_proposer_index(),
+            block_root,
+        )
+        .map_err(|e| GossipBlobError::BeaconChainError(e.into()))?;
+
     // Now the signature is valid, store the proposal so we don't accept another blob sidecar
     // with the same `BlobIdentifier`.
     // It's important to double-check that the proposer still hasn't been observed so we don't

View File

@@ -946,6 +946,11 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
             return Err(BlockError::ProposalSignatureInvalid);
         }
 
+        chain
+            .observed_slashable
+            .write()
+            .observe_slashable(block.slot(), block.message().proposer_index(), block_root)
+            .map_err(|e| BlockError::BeaconChainError(e.into()))?;
         // Now the signature is valid, store the proposal so we don't accept another from this
         // validator and slot.
         //
@@ -1241,6 +1246,12 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
         chain: &Arc<BeaconChain<T>>,
         notify_execution_layer: NotifyExecutionLayer,
     ) -> Result<Self, BlockError<T::EthSpec>> {
+        chain
+            .observed_slashable
+            .write()
+            .observe_slashable(block.slot(), block.message().proposer_index(), block_root)
+            .map_err(|e| BlockError::BeaconChainError(e.into()))?;
+
         chain
             .observed_block_producers
             .write()
@@ -2066,7 +2077,7 @@ fn get_signature_verifier<'a, T: BeaconChainTypes>(
 /// Verify that `header` was signed with a valid signature from its proposer.
 ///
 /// Return `Ok(())` if the signature is valid, and an `Err` otherwise.
-fn verify_header_signature<T: BeaconChainTypes, Err: BlockBlobError>(
+pub fn verify_header_signature<T: BeaconChainTypes, Err: BlockBlobError>(
     chain: &BeaconChain<T>,
     header: &SignedBeaconBlockHeader,
 ) -> Result<(), Err> {

View File

@@ -880,6 +880,7 @@ where
             // TODO: allow for persisting and loading the pool from disk.
             observed_block_producers: <_>::default(),
             observed_blob_sidecars: <_>::default(),
+            observed_slashable: <_>::default(),
             observed_voluntary_exits: <_>::default(),
             observed_proposer_slashings: <_>::default(),
             observed_attester_slashings: <_>::default(),

View File

@@ -991,6 +991,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
                 .start_slot(T::EthSpec::slots_per_epoch()),
         );
 
+        self.observed_slashable.write().prune(
+            new_view
+                .finalized_checkpoint
+                .epoch
+                .start_slot(T::EthSpec::slots_per_epoch()),
+        );
+
         self.snapshot_cache
             .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
             .map(|mut snapshot_cache| {

View File

@@ -40,6 +40,7 @@ mod observed_attesters;
 mod observed_blob_sidecars;
 pub mod observed_block_producers;
 pub mod observed_operations;
+mod observed_slashable;
 pub mod otb_verification_service;
 mod persisted_beacon_chain;
 mod persisted_fork_choice;

View File

@@ -3,10 +3,10 @@
 //! Only `BlobSidecar`s that have completed proposer signature verification can be added
 //! to this cache to reduce DoS risks.
 
-use crate::observed_block_producers::{ProposalKey, SeenBlock};
+use crate::observed_block_producers::ProposalKey;
 use std::collections::{HashMap, HashSet};
 use std::marker::PhantomData;
-use types::{BlobSidecar, EthSpec, Hash256, Slot};
+use types::{BlobSidecar, EthSpec, Slot};
 
 #[derive(Debug, PartialEq)]
 pub enum Error {
@@ -30,7 +30,7 @@ pub enum Error {
 pub struct ObservedBlobSidecars<T: EthSpec> {
     finalized_slot: Slot,
     /// Stores all received blob indices for a given `(ValidatorIndex, Slot)` tuple.
-    items: HashMap<ProposalKey, (HashSet<u64>, HashSet<Hash256>)>,
+    items: HashMap<ProposalKey, HashSet<u64>>,
     _phantom: PhantomData<T>,
 }
@@ -51,23 +51,16 @@ impl<T: EthSpec> ObservedBlobSidecars<T> {
     ///
     /// The supplied `blob_sidecar` **MUST** have completed proposer signature verification.
     pub fn observe_sidecar(&mut self, blob_sidecar: &BlobSidecar<T>) -> Result<bool, Error> {
-        let block_root = blob_sidecar.block_root();
         self.sanitize_blob_sidecar(blob_sidecar)?;
 
-        let (blob_indices, block_roots) = self
+        let blob_indices = self
             .items
             .entry(ProposalKey {
                 slot: blob_sidecar.slot(),
                 proposer: blob_sidecar.block_proposer_index(),
             })
-            .or_insert_with(|| {
-                (
-                    HashSet::with_capacity(T::max_blobs_per_block()),
-                    HashSet::new(),
-                )
-            });
+            .or_insert_with(|| HashSet::with_capacity(T::max_blobs_per_block()));
         let did_not_exist = blob_indices.insert(blob_sidecar.index);
-        block_roots.insert(block_root);
 
         Ok(!did_not_exist)
     }
@@ -81,44 +74,12 @@ impl<T: EthSpec> ObservedBlobSidecars<T> {
                 slot: blob_sidecar.slot(),
                 proposer: blob_sidecar.block_proposer_index(),
             })
-            .map_or(false, |(blob_indices, _block_roots)| {
+            .map_or(false, |blob_indices| {
                 blob_indices.contains(&blob_sidecar.index)
             });
         Ok(is_known)
     }
 
-    /// Returns `Ok(true)` if the `block_root` has been observed in a blob sidecar message before, `Ok(false)` if not.
-    /// Does not update the cache, so calling this function multiple times will continue to return
-    /// `Ok(false)`, until `Self::observe_proposer` is called.
-    ///
-    /// ## Errors
-    ///
-    /// - `key.proposer_index` is greater than `VALIDATOR_REGISTRY_LIMIT`.
-    /// - `key.slot` is equal to or less than the latest pruned `finalized_slot`.
-    pub fn proposer_has_been_observed(
-        &self,
-        slot: Slot,
-        proposer: u64,
-        block_root: Hash256,
-    ) -> Result<SeenBlock, Error> {
-        let key = ProposalKey { slot, proposer };
-        if let Some((_, block_roots)) = self.items.get(&key) {
-            let block_already_known = block_roots.contains(&block_root);
-            let no_prev_known_blocks =
-                block_roots.difference(&HashSet::from([block_root])).count() == 0;
-            if !no_prev_known_blocks {
-                Ok(SeenBlock::Slashable)
-            } else if block_already_known {
-                Ok(SeenBlock::Duplicate)
-            } else {
-                Ok(SeenBlock::UniqueNonSlashable)
-            }
-        } else {
-            Ok(SeenBlock::UniqueNonSlashable)
-        }
-    }
-
     fn sanitize_blob_sidecar(&self, blob_sidecar: &BlobSidecar<T>) -> Result<(), Error> {
         if blob_sidecar.index >= T::max_blobs_per_block() as u64 {
             return Err(Error::InvalidBlobIndex(blob_sidecar.index));
@@ -148,6 +109,7 @@ impl<T: EthSpec> ObservedBlobSidecars<T> {
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use bls::Hash256;
     use std::sync::Arc;
     use types::{BlobSidecar, MainnetEthSpec};
@@ -189,7 +151,7 @@ mod tests {
             "only one (validator_index, slot) tuple should be present"
         );
 
-        let (cached_blob_indices, cached_block_roots) = cache
+        let cached_blob_indices = cache
             .items
             .get(&ProposalKey::new(proposer_index_a, Slot::new(0)))
             .expect("slot zero should be present");
@@ -198,11 +160,6 @@
             1,
             "only one proposer should be present"
         );
-        assert_eq!(
-            cached_block_roots.len(),
-            1,
-            "only one block root should be present"
-        );
 
         /*
          * Check that a prune at the genesis slot does nothing.
@@ -212,7 +169,7 @@
         assert_eq!(cache.finalized_slot, 0, "finalized slot is zero");
         assert_eq!(cache.items.len(), 1, "only one slot should be present");
 
-        let (cached_blob_indices, cached_block_roots) = cache
+        let cached_blob_indices = cache
             .items
             .get(&ProposalKey::new(proposer_index_a, Slot::new(0)))
             .expect("slot zero should be present");
@@ -221,11 +178,6 @@
             1,
             "only one proposer should be present"
        );
-        assert_eq!(
-            cached_block_roots.len(),
-            1,
-            "only one block root should be present"
-        );
 
         /*
          * Check that a prune empties the cache
@@ -274,7 +226,7 @@
         );
         assert_eq!(cache.items.len(), 1, "only one slot should be present");
 
-        let (cached_blob_indices, cached_block_roots) = cache
+        let cached_blob_indices = cache
             .items
             .get(&ProposalKey::new(proposer_index_b, Slot::new(three_epochs)))
             .expect("the three epochs slot should be present");
@@ -283,11 +235,6 @@
             1,
             "only one proposer should be present"
         );
-        assert_eq!(
-            cached_block_roots.len(),
-            1,
-            "only one block root should be present"
-        );
 
         /*
          * Check that a prune doesnt wipe later blocks
@@ -303,7 +250,7 @@
         );
         assert_eq!(cache.items.len(), 1, "only one slot should be present");
 
-        let (cached_blob_indices, cached_block_roots) = cache
+        let cached_blob_indices = cache
             .items
             .get(&ProposalKey::new(proposer_index_b, Slot::new(three_epochs)))
             .expect("the three epochs slot should be present");
@@ -312,11 +259,6 @@
             1,
             "only one proposer should be present"
         );
-        assert_eq!(
-            cached_block_roots.len(),
-            1,
-            "only one block root should be present"
-        );
     }
 
     #[test]
@@ -353,7 +295,7 @@
         assert_eq!(cache.finalized_slot, 0, "finalized slot is zero");
         assert_eq!(cache.items.len(), 1, "only one slot should be present");
 
-        let (cached_blob_indices, cached_block_roots) = cache
+        let cached_blob_indices = cache
             .items
             .get(&ProposalKey::new(proposer_index_a, Slot::new(0)))
             .expect("slot zero should be present");
@@ -362,11 +304,6 @@
             1,
             "only one proposer should be present"
        );
-        assert_eq!(
-            cached_block_roots.len(),
-            1,
-            "only one block root should be present"
-        );
 
         // Slot 1, proposer 0
@@ -396,7 +333,7 @@
         assert_eq!(cache.finalized_slot, 0, "finalized slot is zero");
         assert_eq!(cache.items.len(), 2, "two slots should be present");
 
-        let (cached_blob_indices, cached_block_roots) = cache
+        let cached_blob_indices = cache
             .items
             .get(&ProposalKey::new(proposer_index_a, Slot::new(0)))
             .expect("slot zero should be present");
@@ -405,12 +342,7 @@
             1,
             "only one proposer should be present in slot 0"
         );
-        assert_eq!(
-            cached_block_roots.len(),
-            1,
-            "only one block root should be present in slot 0"
-        );
-        let (cached_blob_indices, cached_block_roots) = cache
+        let cached_blob_indices = cache
             .items
             .get(&ProposalKey::new(proposer_index_b, Slot::new(1)))
             .expect("slot zero should be present");
@@ -419,11 +351,6 @@
             1,
             "only one proposer should be present in slot 1"
         );
-        assert_eq!(
-            cached_block_roots.len(),
-            1,
-            "only one block root should be present in slot 1"
-        );
 
         // Slot 0, index 1
         let sidecar_c = get_blob_sidecar(0, proposer_index_a, 1);
@@ -451,7 +378,7 @@
         assert_eq!(cache.finalized_slot, 0, "finalized slot is zero");
         assert_eq!(cache.items.len(), 2, "two slots should be present");
 
-        let (cached_blob_indices, cached_block_roots) = cache
+        let cached_blob_indices = cache
             .items
             .get(&ProposalKey::new(proposer_index_a, Slot::new(0)))
             .expect("slot zero should be present");
@@ -460,13 +387,6 @@
             2,
             "two blob indices should be present in slot 0"
         );
-        // Changing the blob index doesn't change the block root, so only one unique signed
-        // header should be in the cache.
-        assert_eq!(
-            cached_block_roots.len(),
-            1,
-            "one block root should be present in slot 0"
-        );
 
         // Create a sidecar sharing slot and proposer but with a different block root.
         let mut sidecar_d: BlobSidecar<E> = BlobSidecar {
@@ -488,7 +408,7 @@
             Ok(true),
             "indicates sidecar proposer was observed"
         );
-        let (cached_blob_indices, cached_block_roots) = cache
+        let cached_blob_indices = cache
             .items
             .get(&ProposalKey::new(proposer_index_a, Slot::new(0)))
             .expect("slot zero should be present");
@@ -497,11 +417,6 @@
             2,
             "two blob indices should be present in slot 0"
         );
-        assert_eq!(
-            cached_block_roots.len(),
-            2,
-            "two block root should be present in slot 0"
-        );
 
         // Try adding an out of bounds index
         let invalid_index = E::max_blobs_per_block() as u64;

View File

@@ -0,0 +1,486 @@
//! Provides the `ObservedSlashable` struct which tracks slashable messages seen in
//! gossip or via RPC. Useful in supporting `broadcast_validation` in the Beacon API.
use crate::observed_block_producers::Error;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use types::{EthSpec, Hash256, Slot, Unsigned};
#[derive(Eq, Hash, PartialEq, Debug, Default)]
pub struct ProposalKey {
pub slot: Slot,
pub proposer: u64,
}
/// Maintains a cache of observed `(block.slot, block.proposer)`.
///
/// The cache supports pruning based upon the finalized epoch. It does not automatically prune, you
/// must call `Self::prune` manually.
///
/// The maximum size of the cache is determined by `slots_since_finality *
/// VALIDATOR_REGISTRY_LIMIT`. This is quite a large size, so it's important that upstream
/// functions only use this cache for blocks with a valid signature. Only allowing valid signed
/// blocks reduces the theoretical maximum size of this cache to `slots_since_finality *
/// active_validator_count`, however in reality that is more like `slots_since_finality *
/// known_distinct_shufflings` which is much smaller.
pub struct ObservedSlashable<E: EthSpec> {
finalized_slot: Slot,
items: HashMap<ProposalKey, HashSet<Hash256>>,
_phantom: PhantomData<E>,
}
impl<E: EthSpec> Default for ObservedSlashable<E> {
/// Instantiates `Self` with `finalized_slot == 0`.
fn default() -> Self {
Self {
finalized_slot: Slot::new(0),
items: HashMap::new(),
_phantom: PhantomData,
}
}
}
impl<E: EthSpec> ObservedSlashable<E> {
/// Observe that a block with the given `block_root` was produced by `proposer_index` at `slot`.
/// This will update `self` so future calls to it indicate that this block is known.
///
/// The supplied block **MUST** be signature verified (see struct-level documentation).
///
/// ## Errors
///
/// - `header.proposer_index` is greater than `VALIDATOR_REGISTRY_LIMIT`.
/// - `header.slot` is equal to or less than the latest pruned `finalized_slot`.
pub fn observe_slashable(
&mut self,
slot: Slot,
proposer_index: u64,
block_root: Hash256,
) -> Result<(), Error> {
self.sanitize_header(slot, proposer_index)?;
let key = ProposalKey {
slot,
proposer: proposer_index,
};
let entry = self.items.entry(key);
match entry {
Entry::Occupied(mut occupied_entry) => {
let block_roots = occupied_entry.get_mut();
block_roots.insert(block_root);
}
Entry::Vacant(vacant_entry) => {
let block_roots = HashSet::from([block_root]);
vacant_entry.insert(block_roots);
}
}
Ok(())
}
/// Returns `Ok(true)` if proposing `block_root` would be slashable, i.e. a *different* block root
/// has already been observed for the same `(slot, proposer_index)`; `Ok(false)` if not. Does not
/// update the cache, so it keeps returning `Ok(false)` until a conflicting root is recorded via
/// `Self::observe_slashable`.
///
/// ## Errors
///
/// - `proposer_index` is greater than `VALIDATOR_REGISTRY_LIMIT`.
/// - `slot` is equal to or less than the latest pruned `finalized_slot`.
pub fn is_slashable(
&self,
slot: Slot,
proposer_index: u64,
block_root: Hash256,
) -> Result<bool, Error> {
self.sanitize_header(slot, proposer_index)?;
let key = ProposalKey {
slot,
proposer: proposer_index,
};
if let Some(block_roots) = self.items.get(&key) {
let no_prev_known_blocks =
block_roots.difference(&HashSet::from([block_root])).count() == 0;
Ok(!no_prev_known_blocks)
} else {
Ok(false)
}
}
/// Returns `Ok(())` if the given `header` is sane.
fn sanitize_header(&self, slot: Slot, proposer_index: u64) -> Result<(), Error> {
if proposer_index >= E::ValidatorRegistryLimit::to_u64() {
return Err(Error::ValidatorIndexTooHigh(proposer_index));
}
let finalized_slot = self.finalized_slot;
if finalized_slot > 0 && slot <= finalized_slot {
return Err(Error::FinalizedBlock {
slot,
finalized_slot,
});
}
Ok(())
}
/// Removes all observations of blocks equal to or earlier than `finalized_slot`.
///
/// Stores `finalized_slot` in `self`, so that `self` will reject any block that has a slot
/// equal to or less than `finalized_slot`.
///
/// No-op if `finalized_slot == 0`.
pub fn prune(&mut self, finalized_slot: Slot) {
if finalized_slot == 0 {
return;
}
self.finalized_slot = finalized_slot;
self.items.retain(|key, _| key.slot > finalized_slot);
}
}
#[cfg(test)]
mod tests {
use super::*;
use types::{BeaconBlock, Graffiti, MainnetEthSpec};
type E = MainnetEthSpec;
fn get_block(slot: u64, proposer: u64) -> BeaconBlock<E> {
let mut block = BeaconBlock::empty(&E::default_spec());
*block.slot_mut() = slot.into();
*block.proposer_index_mut() = proposer;
block
}
#[test]
fn pruning() {
let mut cache = ObservedSlashable::<E>::default();
assert_eq!(cache.finalized_slot, 0, "finalized slot is zero");
assert_eq!(cache.items.len(), 0, "no slots should be present");
// Slot 0, proposer 0
let block_a = get_block(0, 0);
let block_root = block_a.canonical_root();
assert_eq!(
cache.observe_slashable(block_a.slot(), block_a.proposer_index(), block_root),
Ok(()),
"can observe proposer"
);
/*
* Preconditions.
*/
assert_eq!(cache.finalized_slot, 0, "finalized slot is zero");
assert_eq!(cache.items.len(), 1, "only one slot should be present");
assert_eq!(
cache
.items
.get(&ProposalKey {
slot: Slot::new(0),
proposer: 0
})
.expect("slot zero should be present")
.len(),
1,
"only one proposer should be present"
);
/*
* Check that a prune at the genesis slot does nothing.
*/
cache.prune(Slot::new(0));
assert_eq!(cache.finalized_slot, 0, "finalized slot is zero");
assert_eq!(cache.items.len(), 1, "only one slot should be present");
assert_eq!(
cache
.items
.get(&ProposalKey {
slot: Slot::new(0),
proposer: 0
})
.expect("slot zero should be present")
.len(),
1,
"only one block root should be present"
);
/*
* Check that a prune empties the cache
*/
cache.prune(E::slots_per_epoch().into());
assert_eq!(
cache.finalized_slot,
Slot::from(E::slots_per_epoch()),
"finalized slot is updated"
);
assert_eq!(cache.items.len(), 0, "no items left");
/*
* Check that we can't insert a finalized block
*/
// First slot of finalized epoch, proposer 0
let block_b = get_block(E::slots_per_epoch(), 0);
let block_root_b = block_b.canonical_root();
assert_eq!(
cache.observe_slashable(block_b.slot(), block_b.proposer_index(), block_root_b),
Err(Error::FinalizedBlock {
slot: E::slots_per_epoch().into(),
finalized_slot: E::slots_per_epoch().into(),
}),
"cant insert finalized block"
);
assert_eq!(cache.items.len(), 0, "block was not added");
/*
* Check that we _can_ insert a non-finalized block
*/
let three_epochs = E::slots_per_epoch() * 3;
// First slot of finalized epoch, proposer 0
let block_b = get_block(three_epochs, 0);
assert_eq!(
cache.observe_slashable(block_b.slot(), block_b.proposer_index(), block_root_b),
Ok(()),
"can insert non-finalized block"
);
assert_eq!(cache.items.len(), 1, "only one slot should be present");
assert_eq!(
cache
.items
.get(&ProposalKey {
slot: Slot::new(three_epochs),
proposer: 0
})
.expect("the three epochs slot should be present")
.len(),
1,
"only one proposer should be present"
);
/*
* Check that a prune doesn't wipe later blocks
*/
let two_epochs = E::slots_per_epoch() * 2;
cache.prune(two_epochs.into());
assert_eq!(
cache.finalized_slot,
Slot::from(two_epochs),
"finalized slot is updated"
);
assert_eq!(cache.items.len(), 1, "only one slot should be present");
assert_eq!(
cache
.items
.get(&ProposalKey {
slot: Slot::new(three_epochs),
proposer: 0
})
.expect("the three epochs slot should be present")
.len(),
1,
"only one block root should be present"
);
}
#[test]
fn simple_observations() {
let mut cache = ObservedSlashable::<E>::default();
// Slot 0, proposer 0
let block_a = get_block(0, 0);
let block_root_a = block_a.canonical_root();
assert_eq!(
cache.is_slashable(
block_a.slot(),
block_a.proposer_index(),
block_a.canonical_root()
),
Ok(false),
"no observation in empty cache"
);
assert_eq!(
cache.observe_slashable(block_a.slot(), block_a.proposer_index(), block_root_a),
Ok(()),
"can observe proposer"
);
assert_eq!(
cache.is_slashable(
block_a.slot(),
block_a.proposer_index(),
block_a.canonical_root()
),
Ok(false),
"observed but unslashed block"
);
assert_eq!(
cache.observe_slashable(block_a.slot(), block_a.proposer_index(), block_root_a),
Ok(()),
"observing again"
);
assert_eq!(cache.finalized_slot, 0, "finalized slot is zero");
assert_eq!(cache.items.len(), 1, "only one slot should be present");
assert_eq!(
cache
.items
.get(&ProposalKey {
slot: Slot::new(0),
proposer: 0
})
.expect("slot zero should be present")
.len(),
1,
"only one block root should be present"
);
// Slot 1, proposer 0
let block_b = get_block(1, 0);
let block_root_b = block_b.canonical_root();
assert_eq!(
cache.is_slashable(
block_b.slot(),
block_b.proposer_index(),
block_b.canonical_root()
),
Ok(false),
"not slashable for new slot"
);
assert_eq!(
cache.observe_slashable(block_b.slot(), block_b.proposer_index(), block_root_b),
Ok(()),
"can observe proposer for new slot"
);
assert_eq!(
cache.is_slashable(
block_b.slot(),
block_b.proposer_index(),
block_b.canonical_root()
),
Ok(false),
"observed but not slashable block in slot 1"
);
assert_eq!(
cache.observe_slashable(block_b.slot(), block_b.proposer_index(), block_root_b),
Ok(()),
"observing slot 1 again"
);
assert_eq!(cache.finalized_slot, 0, "finalized slot is zero");
assert_eq!(cache.items.len(), 2, "two slots should be present");
assert_eq!(
cache
.items
.get(&ProposalKey {
slot: Slot::new(0),
proposer: 0
})
.expect("slot zero should be present")
.len(),
1,
"only one block root should be present in slot 0"
);
assert_eq!(
cache
.items
.get(&ProposalKey {
slot: Slot::new(1),
proposer: 0
})
.expect("slot zero should be present")
.len(),
1,
"only one block root should be present in slot 1"
);
// Slot 0, proposer 1
let block_c = get_block(0, 1);
let block_root_c = block_c.canonical_root();
assert_eq!(
cache.is_slashable(
block_c.slot(),
block_c.proposer_index(),
block_c.canonical_root()
),
Ok(false),
"not slashable due to new proposer"
);
assert_eq!(
cache.observe_slashable(block_c.slot(), block_c.proposer_index(), block_root_c),
Ok(()),
"can observe new proposer, indicates proposer unobserved"
);
assert_eq!(
cache.is_slashable(
block_c.slot(),
block_c.proposer_index(),
block_c.canonical_root()
),
Ok(false),
"not slashable due to new proposer"
);
assert_eq!(
cache.observe_slashable(block_c.slot(), block_c.proposer_index(), block_root_c),
Ok(()),
"observing new proposer again"
);
assert_eq!(cache.finalized_slot, 0, "finalized slot is zero");
assert_eq!(cache.items.len(), 3, "three slots should be present");
assert_eq!(
cache
.items
.iter()
.filter(|(k, _)| k.slot == cache.finalized_slot)
.count(),
2,
"two proposers should be present in slot 0"
);
assert_eq!(
cache
.items
.iter()
.filter(|(k, _)| k.slot == Slot::new(1))
.count(),
1,
"only one proposer should be present in slot 1"
);
// Slot 0, proposer 1 (again)
let mut block_d = get_block(0, 1);
*block_d.body_mut().graffiti_mut() = Graffiti::from(*b"this is slashable ");
let block_root_d = block_d.canonical_root();
assert_eq!(
cache.is_slashable(
block_d.slot(),
block_d.proposer_index(),
block_d.canonical_root()
),
Ok(true),
"slashable due to new proposer"
);
assert_eq!(
cache.observe_slashable(block_d.slot(), block_d.proposer_index(), block_root_d),
Ok(()),
"can observe new proposer, indicates proposer unobserved"
);
}
}
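
The struct-level comment in this file bounds the cache at `slots_since_finality * VALIDATOR_REGISTRY_LIMIT` in theory, and at roughly `slots_since_finality * active_validator_count` once only signature-verified proposals are admitted. A back-of-the-envelope check of those bounds — the registry limit of 2^40 is the spec constant, while the 64-slot finality distance and 1,000,000 active validators are purely illustrative assumptions:

```rust
fn main() {
    // Spec constant: VALIDATOR_REGISTRY_LIMIT = 2^40.
    let validator_registry_limit: u64 = 1 << 40;

    // Illustrative assumptions, not spec values.
    let slots_since_finality: u64 = 64; // ~2 epochs of non-finality
    let active_validators: u64 = 1_000_000;

    // Theoretical worst case if unverified proposer indices were admitted.
    let theoretical = slots_since_finality * validator_registry_limit;
    // Practical bound once only signature-verified proposals are stored.
    let practical = slots_since_finality * active_validators;

    println!("theoretical entries: {theoretical}"); // 70,368,744,177,664
    println!("practical entries:   {practical}"); // 64,000,000
}
```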

View File

@@ -1757,6 +1757,32 @@ where
         ((signed_block, blobs), state)
     }
 
+    pub async fn make_blob_with_modifier(
+        &self,
+        state: BeaconState<E>,
+        slot: Slot,
+        blob_modifier: impl FnOnce(&mut BlobsList<E>),
+    ) -> (SignedBlockContentsTuple<E>, BeaconState<E>) {
+        assert_ne!(slot, 0, "can't produce a block at slot 0");
+        assert!(slot >= state.slot());
+
+        let ((block, mut blobs), state) = self.make_block_return_pre_state(state, slot).await;
+        let (block, _) = block.deconstruct();
+
+        blob_modifier(&mut blobs.as_mut().unwrap().1);
+
+        let proposer_index = state.get_beacon_proposer_index(slot, &self.spec).unwrap();
+
+        let signed_block = block.sign(
+            &self.validator_keypairs[proposer_index].sk,
+            &state.fork(),
+            state.genesis_validators_root(),
+            &self.spec,
+        );
+        ((signed_block, blobs), state)
+    }
+
     pub fn make_deposits<'a>(
         &self,
         state: &'a mut BeaconState<E>,

View File

@@ -60,7 +60,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
         ProvenancedBlock::Local(block_contents, _) => (block_contents, true),
         ProvenancedBlock::Builder(block_contents, _) => (block_contents, false),
     };
-    let block = block_contents.inner_block();
+    let block = block_contents.inner_block().clone();
    let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock);
    debug!(log, "Signed block received in HTTP API"; "slot" => block.slot());
@@ -175,45 +175,20 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
             seen_timestamp,
         ),
         BroadcastValidation::ConsensusAndEquivocation => {
-            if chain_clone
-                .observed_block_producers
-                .read()
-                .proposer_has_been_observed(block_clone.message(), block_root)
-                .map_err(|e| BlockError::BeaconChainError(e.into()))?
-                .is_slashable()
-            {
-                warn!(
-                    log_clone,
-                    "Not publishing equivocating block";
-                    "slot" => block_clone.slot()
-                );
-                Err(BlockError::Slashable)
-            } else if chain_clone
-                .observed_blob_sidecars
-                .read()
-                .proposer_has_been_observed(
-                    block_clone.slot(),
-                    block_clone.message().proposer_index(),
-                    block_root,
-                )
-                .map_err(|e| BlockError::BeaconChainError(e.into()))?
-                .is_slashable()
-            {
-                warn!(
-                    log_clone,
-                    "Not publishing equivocating blob";
-                    "slot" => block_clone.slot()
-                );
-                Err(BlockError::Slashable)
-            } else {
-                publish_block(
-                    block_clone,
-                    blobs_opt,
-                    sender_clone,
-                    log_clone,
-                    seen_timestamp,
-                )
-            }
+            check_slashable(
+                &chain_clone,
+                &blobs_opt,
+                block_root,
+                &block_clone,
+                &log_clone,
+            )?;
+            publish_block(
+                block_clone,
+                blobs_opt,
+                sender_clone,
+                log_clone,
+                seen_timestamp,
+            )
         }
     };
@@ -470,3 +445,46 @@ fn late_block_logging<T: BeaconChainTypes, P: AbstractExecPayload<T::EthSpec>>(
         )
     }
 }
+
+/// Check if any of the blobs or the block are slashable. Returns `BlockError::Slashable` if so.
+fn check_slashable<T: BeaconChainTypes>(
+    chain_clone: &BeaconChain<T>,
+    blobs_opt: &Option<BlobSidecarList<T::EthSpec>>,
+    block_root: Hash256,
+    block_clone: &SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>,
+    log_clone: &Logger,
+) -> Result<(), BlockError<T::EthSpec>> {
+    let slashable_cache = chain_clone.observed_slashable.read();
+    if let Some(blobs) = blobs_opt.as_ref() {
+        blobs.iter().try_for_each(|blob| {
+            if slashable_cache
+                .is_slashable(blob.slot(), blob.block_proposer_index(), blob.block_root())
+                .map_err(|e| BlockError::BeaconChainError(e.into()))?
+            {
+                warn!(
+                    log_clone,
+                    "Not publishing equivocating blob";
+                    "slot" => block_clone.slot()
+                );
+                return Err(BlockError::Slashable);
+            }
+            Ok(())
+        })?;
+    };
+    if slashable_cache
+        .is_slashable(
+            block_clone.slot(),
+            block_clone.message().proposer_index(),
+            block_root,
+        )
+        .map_err(|e| BlockError::BeaconChainError(e.into()))?
+    {
+        warn!(
+            log_clone,
+            "Not publishing equivocating block";
+            "slot" => block_clone.slot()
+        );
+        return Err(BlockError::Slashable);
+    }
+    Ok(())
+}
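
`check_slashable` above walks the optional blob list first and then the block itself, bailing out with `BlockError::Slashable` on the first hit. A self-contained sketch of the same early-exit shape over an `Option<Vec<_>>`, using `try_for_each` and `?` with stand-in types rather than the Lighthouse ones:

```rust
#[derive(Debug, PartialEq)]
enum PublishError {
    Slashable,
}

// Returns Err(PublishError::Slashable) as soon as any item is flagged,
// mirroring the blobs-then-block ordering used above.
fn check_all(
    blobs: &Option<Vec<bool>>, // `true` stands in for "this sidecar equivocates"
    block_is_slashable: bool,
) -> Result<(), PublishError> {
    if let Some(blobs) = blobs.as_ref() {
        blobs.iter().try_for_each(|&slashable| {
            if slashable {
                return Err(PublishError::Slashable);
            }
            Ok(())
        })?;
    }
    if block_is_slashable {
        return Err(PublishError::Slashable);
    }
    Ok(())
}

fn main() {
    assert_eq!(check_all(&None, false), Ok(()));
    assert_eq!(check_all(&Some(vec![false, true]), false), Err(PublishError::Slashable));
    assert_eq!(check_all(&Some(vec![false]), true), Err(PublishError::Slashable));
    println!("early-exit slashability checks behave as expected");
}
```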

View File

@@ -1226,9 +1226,13 @@ pub async fn blinded_equivocation_gossip() {
     );
 }
 
-/// This test checks that a block that is valid from both a gossip and consensus perspective but that equivocates **late** is rejected when using `broadcast_validation=consensus_and_equivocation`.
+/// This test checks that a block that is valid from both a gossip and
+/// consensus perspective but that equivocates **late** is rejected when using
+/// `broadcast_validation=consensus_and_equivocation`.
 ///
-/// This test is unique in that we can't actually test the HTTP API directly, but instead have to hook into the `publish_blocks` code manually. This is in order to handle the late equivocation case.
+/// This test is unique in that we can't actually test the HTTP API directly,
+/// but instead have to hook into the `publish_blocks` code manually. This is
+/// in order to handle the late equivocation case.
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 pub async fn blinded_equivocation_consensus_late_equivocation() {
     /* this test targets gossip-level validation */

View File

@@ -155,13 +155,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         // Checks if a block from this proposer is already known.
         let block_equivocates = || {
-            match self
-                .chain
-                .observed_block_producers
-                .read()
-                .proposer_has_been_observed(block.message(), block.canonical_root())
-            {
-                Ok(seen_status) => seen_status.is_slashable(),
+            match self.chain.observed_slashable.read().is_slashable(
+                block.slot(),
+                block.message().proposer_index(),
+                block.canonical_root(),
+            ) {
+                Ok(is_slashable) => is_slashable,
                 //Both of these blocks will be rejected, so reject them now rather
                 // than re-queuing them.
                 Err(ObserveError::FinalizedBlock { .. })