diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 83b593f9b..fe0740fa2 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -406,7 +406,7 @@ pub struct BeaconChain { /// Maintains a record of which validators have proposed blocks for each slot. pub observed_block_producers: RwLock>, /// Maintains a record of blob sidecars seen over the gossip network. - pub(crate) observed_blob_sidecars: RwLock>, + pub observed_blob_sidecars: RwLock>, /// Maintains a record of which validators have submitted voluntary exits. pub(crate) observed_voluntary_exits: Mutex>, /// Maintains a record of which validators we've seen proposer slashings for. diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index cc087e74a..a4750fd6e 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -420,7 +420,7 @@ pub fn validate_blob_sidecar_for_gossip( if chain .observed_blob_sidecars .read() - .is_known(&blob_sidecar) + .proposer_is_known(&blob_sidecar) .map_err(|e| GossipBlobError::BeaconChainError(e.into()))? { return Err(GossipBlobError::RepeatBlob { diff --git a/beacon_node/beacon_chain/src/observed_blob_sidecars.rs b/beacon_node/beacon_chain/src/observed_blob_sidecars.rs index 4f8496144..488faf12a 100644 --- a/beacon_node/beacon_chain/src/observed_blob_sidecars.rs +++ b/beacon_node/beacon_chain/src/observed_blob_sidecars.rs @@ -3,9 +3,10 @@ //! Only `BlobSidecar`s that have completed proposer signature verification can be added //! to this cache to reduce DoS risks. +use crate::observed_block_producers::{ProposalKey, SeenBlock}; use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; -use types::{BlobSidecar, EthSpec, Slot}; +use types::{BlobSidecar, EthSpec, Hash256, Slot}; #[derive(Debug, PartialEq)] pub enum Error { @@ -29,7 +30,7 @@ pub enum Error { pub struct ObservedBlobSidecars { finalized_slot: Slot, /// Stores all received blob indices for a given `(ValidatorIndex, Slot)` tuple. - items: HashMap<(u64, Slot), HashSet>, + items: HashMap, HashSet)>, _phantom: PhantomData, } @@ -50,27 +51,74 @@ impl ObservedBlobSidecars { /// /// The supplied `blob_sidecar` **MUST** have completed proposer signature verification. pub fn observe_sidecar(&mut self, blob_sidecar: &BlobSidecar) -> Result { + let block_root = blob_sidecar.block_root(); self.sanitize_blob_sidecar(blob_sidecar)?; - let did_not_exist = self + let (blob_indices, block_roots) = self .items - .entry((blob_sidecar.block_proposer_index(), blob_sidecar.slot())) - .or_insert_with(|| HashSet::with_capacity(T::max_blobs_per_block())) - .insert(blob_sidecar.index); + .entry(ProposalKey { + slot: blob_sidecar.slot(), + proposer: blob_sidecar.block_proposer_index(), + }) + .or_insert_with(|| { + ( + HashSet::with_capacity(T::max_blobs_per_block()), + HashSet::new(), + ) + }); + let did_not_exist = blob_indices.insert(blob_sidecar.index); + block_roots.insert(block_root); Ok(!did_not_exist) } /// Returns `true` if the `blob_sidecar` has already been observed in the cache within the prune window. - pub fn is_known(&self, blob_sidecar: &BlobSidecar) -> Result { + pub fn proposer_is_known(&self, blob_sidecar: &BlobSidecar) -> Result { self.sanitize_blob_sidecar(blob_sidecar)?; let is_known = self .items - .get(&(blob_sidecar.block_proposer_index(), blob_sidecar.slot())) - .map_or(false, |set| set.contains(&blob_sidecar.index)); + .get(&ProposalKey { + slot: blob_sidecar.slot(), + proposer: blob_sidecar.block_proposer_index(), + }) + .map_or(false, |(blob_indices, _block_roots)| { + blob_indices.contains(&blob_sidecar.index) + }); Ok(is_known) } + /// Returns `Ok(true)` if the `block_root` has been observed in a blob sidecar message before, `Ok(false)` if not. + /// Does not update the cache, so calling this function multiple times will continue to return + /// `Ok(false)`, until `Self::observe_proposer` is called. + /// + /// ## Errors + /// + /// - `key.proposer_index` is greater than `VALIDATOR_REGISTRY_LIMIT`. + /// - `key.slot` is equal to or less than the latest pruned `finalized_slot`. + pub fn proposer_has_been_observed( + &self, + slot: Slot, + proposer: u64, + block_root: Hash256, + ) -> Result { + let key = ProposalKey { slot, proposer }; + if let Some((_, block_roots)) = self.items.get(&key) { + let block_already_known = block_roots.contains(&block_root); + let no_prev_known_blocks = + block_roots.difference(&HashSet::from([block_root])).count() == 0; + + if !no_prev_known_blocks { + Ok(SeenBlock::Slashable) + } else if block_already_known { + Ok(SeenBlock::Duplicate) + } else { + Ok(SeenBlock::UniqueNonSlashable) + } + } else { + Ok(SeenBlock::UniqueNonSlashable) + } + } + fn sanitize_blob_sidecar(&self, blob_sidecar: &BlobSidecar) -> Result<(), Error> { if blob_sidecar.index >= T::max_blobs_per_block() as u64 { return Err(Error::InvalidBlobIndex(blob_sidecar.index)); @@ -93,7 +141,7 @@ impl ObservedBlobSidecars { } self.finalized_slot = finalized_slot; - self.items.retain(|k, _| k.1 > finalized_slot); + self.items.retain(|k, _| k.slot > finalized_slot); } } @@ -140,14 +188,20 @@ mod tests { 1, "only one (validator_index, slot) tuple should be present" ); + + let (cached_blob_indices, cached_block_roots) = cache + .items + .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) + .expect("slot zero should be present"); assert_eq!( - cache - .items - .get(&(proposer_index_a, Slot::new(0))) - .expect("slot zero should be present") - .len(), + cached_blob_indices.len(), 1, - "only one item should be present" + "only one proposer should be present" + ); + assert_eq!( + cached_block_roots.len(), + 1, + "only one block root should be present" ); /* @@ -158,14 +212,19 @@ mod tests { assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); assert_eq!(cache.items.len(), 1, "only one slot should be present"); + let (cached_blob_indices, cached_block_roots) = cache + .items + .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) + .expect("slot zero should be present"); assert_eq!( - cache - .items - .get(&(proposer_index_a, Slot::new(0))) - .expect("slot zero should be present") - .len(), + cached_blob_indices.len(), 1, - "only one item should be present" + "only one proposer should be present" + ); + assert_eq!( + cached_block_roots.len(), + 1, + "only one block root should be present" ); /* @@ -215,15 +274,20 @@ mod tests { ); assert_eq!(cache.items.len(), 1, "only one slot should be present"); + let (cached_blob_indices, cached_block_roots) = cache + .items + .get(&ProposalKey::new(proposer_index_b, Slot::new(three_epochs))) + .expect("the three epochs slot should be present"); assert_eq!( - cache - .items - .get(&(proposer_index_b, Slot::new(three_epochs))) - .expect("the three epochs slot should be present") - .len(), + cached_blob_indices.len(), 1, "only one proposer should be present" ); + assert_eq!( + cached_block_roots.len(), + 1, + "only one block root should be present" + ); /* * Check that a prune doesnt wipe later blocks @@ -239,15 +303,20 @@ mod tests { ); assert_eq!(cache.items.len(), 1, "only one slot should be present"); + let (cached_blob_indices, cached_block_roots) = cache + .items + .get(&ProposalKey::new(proposer_index_b, Slot::new(three_epochs))) + .expect("the three epochs slot should be present"); assert_eq!( - cache - .items - .get(&(proposer_index_b, Slot::new(three_epochs))) - .expect("the three epochs slot should be present") - .len(), + cached_blob_indices.len(), 1, "only one proposer should be present" ); + assert_eq!( + cached_block_roots.len(), + 1, + "only one block root should be present" + ); } #[test] @@ -259,7 +328,7 @@ mod tests { let sidecar_a = get_blob_sidecar(0, proposer_index_a, 0); assert_eq!( - cache.is_known(&sidecar_a), + cache.proposer_is_known(&sidecar_a), Ok(false), "no observation in empty cache" ); @@ -271,7 +340,7 @@ mod tests { ); assert_eq!( - cache.is_known(&sidecar_a), + cache.proposer_is_known(&sidecar_a), Ok(true), "observed block is indicated as true" ); @@ -284,15 +353,20 @@ mod tests { assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); assert_eq!(cache.items.len(), 1, "only one slot should be present"); + let (cached_blob_indices, cached_block_roots) = cache + .items + .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) + .expect("slot zero should be present"); assert_eq!( - cache - .items - .get(&(proposer_index_a, Slot::new(0))) - .expect("slot zero should be present") - .len(), + cached_blob_indices.len(), 1, "only one proposer should be present" ); + assert_eq!( + cached_block_roots.len(), + 1, + "only one block root should be present" + ); // Slot 1, proposer 0 @@ -300,7 +374,7 @@ mod tests { let sidecar_b = get_blob_sidecar(1, proposer_index_b, 0); assert_eq!( - cache.is_known(&sidecar_b), + cache.proposer_is_known(&sidecar_b), Ok(false), "no observation for new slot" ); @@ -310,7 +384,7 @@ mod tests { "can observe proposer for new slot, indicates proposer unobserved" ); assert_eq!( - cache.is_known(&sidecar_b), + cache.proposer_is_known(&sidecar_b), Ok(true), "observed block in slot 1 is indicated as true" ); @@ -322,30 +396,40 @@ mod tests { assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); assert_eq!(cache.items.len(), 2, "two slots should be present"); + let (cached_blob_indices, cached_block_roots) = cache + .items + .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) + .expect("slot zero should be present"); assert_eq!( - cache - .items - .get(&(proposer_index_a, Slot::new(0))) - .expect("slot zero should be present") - .len(), + cached_blob_indices.len(), 1, "only one proposer should be present in slot 0" ); assert_eq!( - cache - .items - .get(&(proposer_index_b, Slot::new(1))) - .expect("slot zero should be present") - .len(), + cached_block_roots.len(), + 1, + "only one block root should be present in slot 0" + ); + let (cached_blob_indices, cached_block_roots) = cache + .items + .get(&ProposalKey::new(proposer_index_b, Slot::new(1))) + .expect("slot zero should be present"); + assert_eq!( + cached_blob_indices.len(), 1, "only one proposer should be present in slot 1" ); + assert_eq!( + cached_block_roots.len(), + 1, + "only one block root should be present in slot 1" + ); // Slot 0, index 1 let sidecar_c = get_blob_sidecar(0, proposer_index_a, 1); assert_eq!( - cache.is_known(&sidecar_c), + cache.proposer_is_known(&sidecar_c), Ok(false), "no observation for new index" ); @@ -355,7 +439,7 @@ mod tests { "can observe new index, indicates sidecar unobserved for new index" ); assert_eq!( - cache.is_known(&sidecar_c), + cache.proposer_is_known(&sidecar_c), Ok(true), "observed new sidecar is indicated as true" ); @@ -367,15 +451,57 @@ mod tests { assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); assert_eq!(cache.items.len(), 2, "two slots should be present"); + let (cached_blob_indices, cached_block_roots) = cache + .items + .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) + .expect("slot zero should be present"); assert_eq!( - cache - .items - .get(&(proposer_index_a, Slot::new(0))) - .expect("slot zero should be present") - .len(), + cached_blob_indices.len(), 2, "two blob indices should be present in slot 0" ); + // Changing the blob index doesn't change the block root, so only one unique signed + // header should be in the cache. + assert_eq!( + cached_block_roots.len(), + 1, + "one block root should be present in slot 0" + ); + + // Create a sidecar sharing slot and proposer but with a different block root. + let mut sidecar_d: BlobSidecar = BlobSidecar { + index: sidecar_c.index, + blob: sidecar_c.blob.clone(), + kzg_commitment: sidecar_c.kzg_commitment, + kzg_proof: sidecar_c.kzg_proof, + signed_block_header: sidecar_c.signed_block_header.clone(), + kzg_commitment_inclusion_proof: sidecar_c.kzg_commitment_inclusion_proof.clone(), + }; + sidecar_d.signed_block_header.message.body_root = Hash256::repeat_byte(7); + assert_eq!( + cache.proposer_is_known(&sidecar_d), + Ok(true), + "there has been an observation for this proposer index" + ); + assert_eq!( + cache.observe_sidecar(&sidecar_d), + Ok(true), + "indicates sidecar proposer was observed" + ); + let (cached_blob_indices, cached_block_roots) = cache + .items + .get(&ProposalKey::new(proposer_index_a, Slot::new(0))) + .expect("slot zero should be present"); + assert_eq!( + cached_blob_indices.len(), + 2, + "two blob indices should be present in slot 0" + ); + assert_eq!( + cached_block_roots.len(), + 2, + "two block root should be present in slot 0" + ); // Try adding an out of bounds index let invalid_index = E::max_blobs_per_block() as u64; diff --git a/beacon_node/beacon_chain/src/observed_block_producers.rs b/beacon_node/beacon_chain/src/observed_block_producers.rs index f76fc5379..096c8bff7 100644 --- a/beacon_node/beacon_chain/src/observed_block_producers.rs +++ b/beacon_node/beacon_chain/src/observed_block_producers.rs @@ -16,9 +16,15 @@ pub enum Error { } #[derive(Eq, Hash, PartialEq, Debug, Default)] -struct ProposalKey { - slot: Slot, - proposer: u64, +pub struct ProposalKey { + pub slot: Slot, + pub proposer: u64, +} + +impl ProposalKey { + pub fn new(proposer: u64, slot: Slot) -> Self { + Self { slot, proposer } + } } /// Maintains a cache of observed `(block.slot, block.proposer)`. diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 432d91b72..92d302f56 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -113,7 +113,10 @@ pub async fn publish_block b, - Err(BlockContentsError::BlockError(BlockError::BlockIsAlreadyKnown)) => { + Err(BlockContentsError::BlockError(BlockError::BlockIsAlreadyKnown)) + | Err(BlockContentsError::BlobError( + beacon_chain::blob_verification::GossipBlobError::RepeatBlob { .. }, + )) => { // Allow the status code for duplicate blocks to be overridden based on config. return Ok(warp::reply::with_status( warp::reply::json(&ErrorMessage { @@ -185,6 +188,23 @@ pub async fn publish_block block_clone.slot() ); Err(BlockError::Slashable) + } else if chain_clone + .observed_blob_sidecars + .read() + .proposer_has_been_observed( + block_clone.slot(), + block_clone.message().proposer_index(), + block_root, + ) + .map_err(|e| BlockError::BeaconChainError(e.into()))? + .is_slashable() + { + warn!( + log_clone, + "Not publishing equivocating blob"; + "slot" => block_clone.slot() + ); + Err(BlockError::Slashable) } else { publish_block( block_clone,