From be11437c27720b33652bf03cf4fc0d872ba5469b Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Wed, 22 Sep 2021 08:49:41 +0000 Subject: [PATCH] Batch BLS verification for attestations (#2399) ## Issue Addressed NA ## Proposed Changes Adds the ability to verify batches of aggregated/unaggregated attestations from the network. When the `BeaconProcessor` finds there are messages in the aggregated or unaggregated attestation queues, it will first check the length of the queue: - `== 1` verify the attestation individually. - `>= 2` take up to 64 of those attestations and verify them in a batch. Notably, we only perform batch verification if the queue has a backlog. We don't apply any artificial delays to attestations to try and force them into batches. ### Batching Details To assist with implementing batches we modify `beacon_chain::attestation_verification` to have two distinct categories for attestations: - *Indexed* attestations: those which have passed initial validation and were valid enough for us to derive an `IndexedAttestation`. - *Verified* attestations: those attestations which were indexed *and also* passed signature verification. These are well-formed, interesting messages which were signed by validators. The batching functions accept `n` attestations and then return `n` attestation verification `Result`s, where those `Result`s can be any combination of `Ok` or `Err`. In other words, we attempt to verify as many attestations as possible and return specific per-attestation results so peer scores can be updated, if required. When we batch verify attestations, we first try to map all those attestations to *indexed* attestations. If any of those attestations were able to be indexed, we then perform batch BLS verification on those indexed attestations. If the batch verification succeeds, we convert them into *verified* attestations, disabling individual signature checking. If the batch fails, we convert to verified attestations with individual signature checking enabled. Ultimately, we optimistically try to do a batch verification of attestation signatures and fall-back to individual verification if it fails. This opens an attach vector for "poisoning" the attestations and causing us to waste a batch verification. I argue that peer scoring should do a good-enough job of defending against this and the typical-case gains massively outweigh the worst-case losses. ## Additional Info Before this PR, attestation verification took the attestations by value (instead of by reference). It turns out that this was unnecessary and, in my opinion, resulted in some undesirable ergonomics (e.g., we had to pass the attestation back in the `Err` variant to avoid clones). In this PR I've modified attestation verification so that it now takes a reference. I refactored the `beacon_chain/tests/attestation_verification.rs` tests so they use a builder-esque "tester" struct instead of a weird macro. It made it easier for me to test individual/batch with the same set of tests and I think it was a nice tidy-up. Notably, I did this last to try and make sure my new refactors to *actual* production code would pass under the existing test suite. --- .../src/attestation_verification.rs | 451 ++++-- .../src/attestation_verification/batch.rs | 222 +++ beacon_node/beacon_chain/src/beacon_chain.rs | 68 +- beacon_node/beacon_chain/src/metrics.rs | 20 + beacon_node/beacon_chain/src/test_utils.rs | 51 +- .../tests/attestation_verification.rs | 1428 +++++++++-------- beacon_node/beacon_chain/tests/store_tests.rs | 4 +- beacon_node/beacon_chain/tests/tests.rs | 4 +- beacon_node/http_api/src/lib.rs | 14 +- .../network/src/beacon_processor/mod.rs | 167 +- .../beacon_processor/worker/gossip_methods.rs | 567 +++++-- .../src/beacon_processor/worker/mod.rs | 1 + consensus/fork_choice/tests/tests.rs | 2 +- 13 files changed, 1962 insertions(+), 1037 deletions(-) create mode 100644 beacon_node/beacon_chain/src/attestation_verification/batch.rs diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index a6233cde4..eb29e8d2f 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -18,13 +18,16 @@ //! types::Attestation types::SignedAggregateAndProof //! | | //! ▼ ▼ -//! VerifiedUnaggregatedAttestation VerifiedAggregatedAttestation +//! IndexedUnaggregatedAttestation IndexedAggregatedAttestation +//! | | +//! VerifiedUnaggregatedAttestation VerifiedAggregatedAttestation //! | | //! ------------------------------------- //! | //! ▼ -//! impl SignatureVerifiedAttestation +//! impl VerifiedAttestation //! ``` +mod batch; use crate::{ beacon_chain::{MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT}, @@ -53,6 +56,8 @@ use types::{ SelectionProof, SignedAggregateAndProof, Slot, SubnetId, }; +pub use batch::{batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations}; + /// Returned when an attestation was not successfully verified. It might not have been verified for /// two reasons: /// @@ -254,53 +259,105 @@ impl From for Error { } } -/// Wraps a `SignedAggregateAndProof` that has been verified for propagation on the gossip network. -pub struct VerifiedAggregatedAttestation { - signed_aggregate: SignedAggregateAndProof, +/// Used to avoid double-checking signatures. +#[derive(Copy, Clone)] +enum CheckAttestationSignature { + Yes, + No, +} + +/// Wraps a `SignedAggregateAndProof` that has been verified up until the point that an +/// `IndexedAttestation` can be derived. +/// +/// These attestations have *not* undergone signature verification. +struct IndexedAggregatedAttestation<'a, T: BeaconChainTypes> { + signed_aggregate: &'a SignedAggregateAndProof, + indexed_attestation: IndexedAttestation, + attestation_root: Hash256, +} + +/// Wraps a `Attestation` that has been verified up until the point that an `IndexedAttestation` can +/// be derived. +/// +/// These attestations have *not* undergone signature verification. +struct IndexedUnaggregatedAttestation<'a, T: BeaconChainTypes> { + attestation: &'a Attestation, + indexed_attestation: IndexedAttestation, + subnet_id: SubnetId, + validator_index: u64, +} + +/// Wraps a `SignedAggregateAndProof` that has been fully verified for propagation on the gossip +/// network. +pub struct VerifiedAggregatedAttestation<'a, T: BeaconChainTypes> { + signed_aggregate: &'a SignedAggregateAndProof, indexed_attestation: IndexedAttestation, } -/// Wraps an `Attestation` that has been verified for propagation on the gossip network. -pub struct VerifiedUnaggregatedAttestation { - attestation: Attestation, +impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> { + pub fn into_indexed_attestation(self) -> IndexedAttestation { + self.indexed_attestation + } +} + +/// Wraps an `Attestation` that has been fully verified for propagation on the gossip network. +pub struct VerifiedUnaggregatedAttestation<'a, T: BeaconChainTypes> { + attestation: &'a Attestation, indexed_attestation: IndexedAttestation, subnet_id: SubnetId, } +impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { + pub fn into_indexed_attestation(self) -> IndexedAttestation { + self.indexed_attestation + } +} + /// Custom `Clone` implementation is to avoid the restrictive trait bounds applied by the usual derive /// macro. -impl Clone for VerifiedUnaggregatedAttestation { +impl<'a, T: BeaconChainTypes> Clone for IndexedUnaggregatedAttestation<'a, T> { fn clone(&self) -> Self { Self { - attestation: self.attestation.clone(), + attestation: self.attestation, indexed_attestation: self.indexed_attestation.clone(), subnet_id: self.subnet_id, + validator_index: self.validator_index, } } } /// A helper trait implemented on wrapper types that can be progressed to a state where they can be /// verified for application to fork choice. -pub trait SignatureVerifiedAttestation { +pub trait VerifiedAttestation { + fn attestation(&self) -> &Attestation; + fn indexed_attestation(&self) -> &IndexedAttestation; } -impl<'a, T: BeaconChainTypes> SignatureVerifiedAttestation for VerifiedAggregatedAttestation { +impl<'a, T: BeaconChainTypes> VerifiedAttestation for VerifiedAggregatedAttestation<'a, T> { + fn attestation(&self) -> &Attestation { + self.attestation() + } + fn indexed_attestation(&self) -> &IndexedAttestation { &self.indexed_attestation } } -impl SignatureVerifiedAttestation for VerifiedUnaggregatedAttestation { +impl<'a, T: BeaconChainTypes> VerifiedAttestation for VerifiedUnaggregatedAttestation<'a, T> { + fn attestation(&self) -> &Attestation { + self.attestation + } + fn indexed_attestation(&self) -> &IndexedAttestation { &self.indexed_attestation } } /// Information about invalid attestations which might still be slashable despite being invalid. -pub enum AttestationSlashInfo { +pub enum AttestationSlashInfo<'a, T: BeaconChainTypes, TErr> { /// The attestation is invalid, but its signature wasn't checked. - SignatureNotChecked(Attestation, TErr), + SignatureNotChecked(&'a Attestation, TErr), /// As for `SignatureNotChecked`, but we know the `IndexedAttestation`. SignatureNotCheckedIndexed(IndexedAttestation, TErr), /// The attestation's signature is invalid, so it will never be slashable. @@ -324,7 +381,7 @@ fn process_slash_info( if let Some(slasher) = chain.slasher.as_ref() { let (indexed_attestation, check_signature, err) = match slash_info { SignatureNotChecked(attestation, err) => { - match obtain_indexed_attestation_and_committees_per_slot(chain, &attestation) { + match obtain_indexed_attestation_and_committees_per_slot(chain, attestation) { Ok((indexed, _)) => (indexed, true, err), Err(e) => { debug!( @@ -367,13 +424,13 @@ fn process_slash_info( } } -impl VerifiedAggregatedAttestation { +impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> { /// Returns `Ok(Self)` if the `signed_aggregate` is valid to be (re)published on the gossip /// network. pub fn verify( - signed_aggregate: SignedAggregateAndProof, + signed_aggregate: &'a SignedAggregateAndProof, chain: &BeaconChain, - ) -> Result)> { + ) -> Result { Self::verify_slashable(signed_aggregate, chain) .map(|verified_aggregate| { if let Some(slasher) = chain.slasher.as_ref() { @@ -381,9 +438,7 @@ impl VerifiedAggregatedAttestation { } verified_aggregate }) - .map_err(|(slash_info, original_aggregate)| { - (process_slash_info(slash_info, chain), original_aggregate) - }) + .map_err(|slash_info| process_slash_info(slash_info, chain)) } /// Run the checks that happen before an indexed attestation is constructed. @@ -467,6 +522,56 @@ impl VerifiedAggregatedAttestation { } } + /// Verify the attestation, producing extra information about whether it might be slashable. + pub fn verify_slashable( + signed_aggregate: &'a SignedAggregateAndProof, + chain: &BeaconChain, + ) -> Result> { + use AttestationSlashInfo::*; + + let attestation = &signed_aggregate.message.aggregate; + let aggregator_index = signed_aggregate.message.aggregator_index; + let attestation_root = match Self::verify_early_checks(signed_aggregate, chain) { + Ok(root) => root, + Err(e) => return Err(SignatureNotChecked(&signed_aggregate.message.aggregate, e)), + }; + + let indexed_attestation = + match map_attestation_committee(chain, attestation, |(committee, _)| { + // Note: this clones the signature which is known to be a relatively slow operation. + // + // Future optimizations should remove this clone. + let selection_proof = + SelectionProof::from(signed_aggregate.message.selection_proof.clone()); + + if !selection_proof + .is_aggregator(committee.committee.len(), &chain.spec) + .map_err(|e| Error::BeaconChainError(e.into()))? + { + return Err(Error::InvalidSelectionProof { aggregator_index }); + } + + // Ensure the aggregator is a member of the committee for which it is aggregating. + if !committee.committee.contains(&(aggregator_index as usize)) { + return Err(Error::AggregatorNotInCommittee { aggregator_index }); + } + + get_indexed_attestation(committee.committee, attestation) + .map_err(|e| BeaconChainError::from(e).into()) + }) { + Ok(indexed_attestation) => indexed_attestation, + Err(e) => return Err(SignatureNotChecked(&signed_aggregate.message.aggregate, e)), + }; + + Ok(IndexedAggregatedAttestation { + signed_aggregate, + indexed_attestation, + attestation_root, + }) + } +} + +impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> { /// Run the checks that happen after the indexed attestation and signature have been checked. fn verify_late_checks( signed_aggregate: &SignedAggregateAndProof, @@ -508,82 +613,70 @@ impl VerifiedAggregatedAttestation { Ok(()) } - /// Verify the attestation, producing extra information about whether it might be slashable. - // NOTE: Clippy considers the return too complex. This tuple is not used elsewhere so it is not - // worth creating an alias. - #[allow(clippy::type_complexity)] - pub fn verify_slashable( - signed_aggregate: SignedAggregateAndProof, + /// Verify the `signed_aggregate`. + pub fn verify( + signed_aggregate: &'a SignedAggregateAndProof, chain: &BeaconChain, - ) -> Result< - Self, - ( - AttestationSlashInfo, - SignedAggregateAndProof, - ), - > { + ) -> Result { + let indexed = IndexedAggregatedAttestation::verify(signed_aggregate, chain)?; + Self::from_indexed(indexed, chain, CheckAttestationSignature::Yes) + } + + /// Complete the verification of an indexed attestation. + fn from_indexed( + signed_aggregate: IndexedAggregatedAttestation<'a, T>, + chain: &BeaconChain, + check_signature: CheckAttestationSignature, + ) -> Result { + Self::verify_slashable(signed_aggregate, chain, check_signature) + .map(|verified_aggregate| verified_aggregate.apply_to_slasher(chain)) + .map_err(|slash_info| process_slash_info(slash_info, chain)) + } + + fn apply_to_slasher(self, chain: &BeaconChain) -> Self { + if let Some(slasher) = chain.slasher.as_ref() { + slasher.accept_attestation(self.indexed_attestation.clone()); + } + self + } + + /// Verify the attestation, producing extra information about whether it might be slashable. + fn verify_slashable( + signed_aggregate: IndexedAggregatedAttestation<'a, T>, + chain: &BeaconChain, + check_signature: CheckAttestationSignature, + ) -> Result> { use AttestationSlashInfo::*; - let attestation = &signed_aggregate.message.aggregate; - let aggregator_index = signed_aggregate.message.aggregator_index; - let attestation_root = match Self::verify_early_checks(&signed_aggregate, chain) { - Ok(root) => root, - Err(e) => { - return Err(( - SignatureNotChecked(signed_aggregate.message.aggregate.clone(), e), + let IndexedAggregatedAttestation { + signed_aggregate, + indexed_attestation, + attestation_root, + } = signed_aggregate; + + match check_signature { + CheckAttestationSignature::Yes => { + // Ensure that all signatures are valid. + if let Err(e) = verify_signed_aggregate_signatures( + chain, signed_aggregate, - )) - } - }; - - let indexed_attestation = - match map_attestation_committee(chain, attestation, |(committee, _)| { - // Note: this clones the signature which is known to be a relatively slow operation. - // - // Future optimizations should remove this clone. - let selection_proof = - SelectionProof::from(signed_aggregate.message.selection_proof.clone()); - - if !selection_proof - .is_aggregator(committee.committee.len(), &chain.spec) - .map_err(|e| Error::BeaconChainError(e.into()))? - { - return Err(Error::InvalidSelectionProof { aggregator_index }); - } - - // Ensure the aggregator is a member of the committee for which it is aggregating. - if !committee.committee.contains(&(aggregator_index as usize)) { - return Err(Error::AggregatorNotInCommittee { aggregator_index }); - } - - get_indexed_attestation(committee.committee, attestation) - .map_err(|e| BeaconChainError::from(e).into()) - }) { - Ok(indexed_attestation) => indexed_attestation, - Err(e) => { - return Err(( - SignatureNotChecked(signed_aggregate.message.aggregate.clone(), e), - signed_aggregate, - )) - } - }; - - // Ensure that all signatures are valid. - if let Err(e) = - verify_signed_aggregate_signatures(chain, &signed_aggregate, &indexed_attestation) + &indexed_attestation, + ) .and_then(|is_valid| { if !is_valid { Err(Error::InvalidSignature) } else { Ok(()) } - }) - { - return Err((SignatureInvalid(e), signed_aggregate)); - } + }) { + return Err(SignatureInvalid(e)); + } + } + CheckAttestationSignature::No => (), + }; - if let Err(e) = Self::verify_late_checks(&signed_aggregate, attestation_root, chain) { - return Err((SignatureValid(indexed_attestation, e), signed_aggregate)); + if let Err(e) = Self::verify_late_checks(signed_aggregate, attestation_root, chain) { + return Err(SignatureValid(indexed_attestation, e)); } Ok(VerifiedAggregatedAttestation { @@ -592,11 +685,6 @@ impl VerifiedAggregatedAttestation { }) } - /// A helper function to add this aggregate to `beacon_chain.op_pool`. - pub fn add_to_pool(self, chain: &BeaconChain) -> Result { - chain.add_to_block_inclusion_pool(self) - } - /// Returns the underlying `attestation` for the `signed_aggregate`. pub fn attestation(&self) -> &Attestation { &self.signed_aggregate.message.aggregate @@ -604,11 +692,11 @@ impl VerifiedAggregatedAttestation { /// Returns the underlying `signed_aggregate`. pub fn aggregate(&self) -> &SignedAggregateAndProof { - &self.signed_aggregate + self.signed_aggregate } } -impl VerifiedUnaggregatedAttestation { +impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { /// Run the checks that happen before an indexed attestation is constructed. pub fn verify_early_checks( attestation: &Attestation, @@ -699,6 +787,75 @@ impl VerifiedUnaggregatedAttestation { Ok((validator_index, expected_subnet_id)) } + /// Returns `Ok(Self)` if the `attestation` is valid to be (re)published on the gossip + /// network. + /// + /// `subnet_id` is the subnet from which we received this attestation. This function will + /// verify that it was received on the correct subnet. + pub fn verify( + attestation: &'a Attestation, + subnet_id: Option, + chain: &BeaconChain, + ) -> Result { + Self::verify_slashable(attestation, subnet_id, chain) + .map(|verified_unaggregated| { + if let Some(slasher) = chain.slasher.as_ref() { + slasher.accept_attestation(verified_unaggregated.indexed_attestation.clone()); + } + verified_unaggregated + }) + .map_err(|slash_info| process_slash_info(slash_info, chain)) + } + + /// Verify the attestation, producing extra information about whether it might be slashable. + pub fn verify_slashable( + attestation: &'a Attestation, + subnet_id: Option, + chain: &BeaconChain, + ) -> Result> { + use AttestationSlashInfo::*; + + if let Err(e) = Self::verify_early_checks(attestation, chain) { + return Err(SignatureNotChecked(attestation, e)); + } + + let (indexed_attestation, committees_per_slot) = + match obtain_indexed_attestation_and_committees_per_slot(chain, attestation) { + Ok(x) => x, + Err(e) => { + return Err(SignatureNotChecked(attestation, e)); + } + }; + + let (validator_index, expected_subnet_id) = match Self::verify_middle_checks( + attestation, + &indexed_attestation, + committees_per_slot, + subnet_id, + chain, + ) { + Ok(t) => t, + Err(e) => return Err(SignatureNotCheckedIndexed(indexed_attestation, e)), + }; + + Ok(Self { + attestation, + indexed_attestation, + subnet_id: expected_subnet_id, + validator_index, + }) + } + + /// Returns a mutable reference to the underlying attestation. + /// + /// Only use during testing since modifying the `IndexedAttestation` can cause the attestation + /// to no-longer be valid. + pub fn __indexed_attestation_mut(&mut self) -> &mut IndexedAttestation { + &mut self.indexed_attestation + } +} + +impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { /// Run the checks that apply after the signature has been checked. fn verify_late_checks( attestation: &Attestation, @@ -725,88 +882,70 @@ impl VerifiedUnaggregatedAttestation { Ok(()) } - /// Returns `Ok(Self)` if the `attestation` is valid to be (re)published on the gossip - /// network. - /// - /// `subnet_id` is the subnet from which we received this attestation. This function will - /// verify that it was received on the correct subnet. + /// Verify the `unaggregated_attestation`. pub fn verify( - attestation: Attestation, + unaggregated_attestation: &'a Attestation, subnet_id: Option, chain: &BeaconChain, - ) -> Result)> { - Self::verify_slashable(attestation, subnet_id, chain) - .map(|verified_unaggregated| { - if let Some(slasher) = chain.slasher.as_ref() { - slasher.accept_attestation(verified_unaggregated.indexed_attestation.clone()); - } - verified_unaggregated - }) - .map_err(|(slash_info, original_attestation)| { - (process_slash_info(slash_info, chain), original_attestation) - }) + ) -> Result { + let indexed = + IndexedUnaggregatedAttestation::verify(unaggregated_attestation, subnet_id, chain)?; + Self::from_indexed(indexed, chain, CheckAttestationSignature::Yes) + } + + /// Complete the verification of an indexed attestation. + fn from_indexed( + attestation: IndexedUnaggregatedAttestation<'a, T>, + chain: &BeaconChain, + check_signature: CheckAttestationSignature, + ) -> Result { + Self::verify_slashable(attestation, chain, check_signature) + .map(|verified_unaggregated| verified_unaggregated.apply_to_slasher(chain)) + .map_err(|slash_info| process_slash_info(slash_info, chain)) + } + + fn apply_to_slasher(self, chain: &BeaconChain) -> Self { + if let Some(slasher) = chain.slasher.as_ref() { + slasher.accept_attestation(self.indexed_attestation.clone()); + } + self } /// Verify the attestation, producing extra information about whether it might be slashable. - // NOTE: Clippy considers the return too complex. This tuple is not used elsewhere so it is not - // worth creating an alias. - #[allow(clippy::type_complexity)] - pub fn verify_slashable( - attestation: Attestation, - subnet_id: Option, + fn verify_slashable( + attestation: IndexedUnaggregatedAttestation<'a, T>, chain: &BeaconChain, - ) -> Result, Attestation)> { + check_signature: CheckAttestationSignature, + ) -> Result> { use AttestationSlashInfo::*; - if let Err(e) = Self::verify_early_checks(&attestation, chain) { - return Err((SignatureNotChecked(attestation.clone(), e), attestation)); - } - - let (indexed_attestation, committees_per_slot) = - match obtain_indexed_attestation_and_committees_per_slot(chain, &attestation) { - Ok(x) => x, - Err(e) => { - return Err((SignatureNotChecked(attestation.clone(), e), attestation)); - } - }; - - let (validator_index, expected_subnet_id) = match Self::verify_middle_checks( - &attestation, - &indexed_attestation, - committees_per_slot, + let IndexedUnaggregatedAttestation { + attestation, + indexed_attestation, subnet_id, - chain, - ) { - Ok(t) => t, - Err(e) => { - return Err(( - SignatureNotCheckedIndexed(indexed_attestation, e), - attestation, - )) + validator_index, + } = attestation; + + match check_signature { + CheckAttestationSignature::Yes => { + if let Err(e) = verify_attestation_signature(chain, &indexed_attestation) { + return Err(SignatureInvalid(e)); + } } + CheckAttestationSignature::No => (), }; - // The aggregate signature of the attestation is valid. - if let Err(e) = verify_attestation_signature(chain, &indexed_attestation) { - return Err((SignatureInvalid(e), attestation)); - } - - if let Err(e) = Self::verify_late_checks(&attestation, validator_index, chain) { - return Err((SignatureValid(indexed_attestation, e), attestation)); + if let Err(e) = Self::verify_late_checks(attestation, validator_index, chain) { + return Err(SignatureValid(indexed_attestation, e)); } Ok(Self { attestation, indexed_attestation, - subnet_id: expected_subnet_id, + subnet_id, }) } - /// A helper function to add this attestation to `beacon_chain.naive_aggregation_pool`. - pub fn add_to_pool(self, chain: &BeaconChain) -> Result { - chain.add_to_naive_aggregation_pool(self) - } - /// Returns the correct subnet for the attestation. pub fn subnet_id(&self) -> SubnetId { self.subnet_id @@ -814,7 +953,7 @@ impl VerifiedUnaggregatedAttestation { /// Returns the wrapped `attestation`. pub fn attestation(&self) -> &Attestation { - &self.attestation + self.attestation } /// Returns the wrapped `indexed_attestation`. diff --git a/beacon_node/beacon_chain/src/attestation_verification/batch.rs b/beacon_node/beacon_chain/src/attestation_verification/batch.rs new file mode 100644 index 000000000..30f1ae7e5 --- /dev/null +++ b/beacon_node/beacon_chain/src/attestation_verification/batch.rs @@ -0,0 +1,222 @@ +//! These two `batch_...` functions provide verification of batches of attestations. They provide +//! significant CPU-time savings by performing batch verification of BLS signatures. +//! +//! In each function, attestations are "indexed" (i.e., the `IndexedAttestation` is computed), to +//! determine if they should progress to signature verification. Then, all attestations which were +//! successfully indexed have their signatures verified in a batch. If that signature batch fails +//! then all attestation signatures are verified independently. +//! +//! The outcome of each function is a `Vec` with a one-to-one mapping to the attestations +//! supplied as input. Each result provides the exact success or failure result of the corresponding +//! attestation, with no loss of fidelity when compared to individual verification. +use super::{ + CheckAttestationSignature, Error, IndexedAggregatedAttestation, IndexedUnaggregatedAttestation, + VerifiedAggregatedAttestation, VerifiedUnaggregatedAttestation, +}; +use crate::{ + beacon_chain::VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, metrics, BeaconChain, BeaconChainError, + BeaconChainTypes, +}; +use bls::verify_signature_sets; +use state_processing::signature_sets::{ + indexed_attestation_signature_set_from_pubkeys, signed_aggregate_selection_proof_signature_set, + signed_aggregate_signature_set, +}; +use std::borrow::Cow; +use types::*; + +/// Verify aggregated attestations using batch BLS signature verification. +/// +/// See module-level docs for more info. +pub fn batch_verify_aggregated_attestations<'a, T, I>( + aggregates: I, + chain: &BeaconChain, +) -> Result, Error>>, Error> +where + T: BeaconChainTypes, + I: Iterator> + ExactSizeIterator, +{ + let mut num_indexed = 0; + let mut num_failed = 0; + + // Perform indexing of all attestations, collecting the results. + let indexing_results = aggregates + .map(|aggregate| { + let result = IndexedAggregatedAttestation::verify(aggregate, chain); + if result.is_ok() { + num_indexed += 1; + } else { + num_failed += 1; + } + result + }) + .collect::>(); + + // May be set to `No` if batch verification succeeds. + let mut check_signatures = CheckAttestationSignature::Yes; + + // Perform batch BLS verification, if any attestation signatures are worth checking. + if num_indexed > 0 { + let signature_setup_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_BATCH_AGG_SIGNATURE_SETUP_TIMES); + + let pubkey_cache = chain + .validator_pubkey_cache + .try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) + .ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?; + + let fork = chain.with_head(|head| Ok::<_, BeaconChainError>(head.beacon_state.fork()))?; + + let mut signature_sets = Vec::with_capacity(num_indexed * 3); + + // Iterate, flattening to get only the `Ok` values. + for indexed in indexing_results.iter().flatten() { + let signed_aggregate = &indexed.signed_aggregate; + let indexed_attestation = &indexed.indexed_attestation; + + signature_sets.push( + signed_aggregate_selection_proof_signature_set( + |validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed), + signed_aggregate, + &fork, + chain.genesis_validators_root, + &chain.spec, + ) + .map_err(BeaconChainError::SignatureSetError)?, + ); + signature_sets.push( + signed_aggregate_signature_set( + |validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed), + signed_aggregate, + &fork, + chain.genesis_validators_root, + &chain.spec, + ) + .map_err(BeaconChainError::SignatureSetError)?, + ); + signature_sets.push( + indexed_attestation_signature_set_from_pubkeys( + |validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed), + &indexed_attestation.signature, + indexed_attestation, + &fork, + chain.genesis_validators_root, + &chain.spec, + ) + .map_err(BeaconChainError::SignatureSetError)?, + ); + } + + metrics::stop_timer(signature_setup_timer); + + let _signature_verification_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_BATCH_AGG_SIGNATURE_TIMES); + + if verify_signature_sets(signature_sets.iter()) { + // Since all the signatures verified in a batch, there's no reason for them to be + // checked again later. + check_signatures = CheckAttestationSignature::No + } + } + + // Complete the attestation verification, potentially verifying all signatures independently. + let final_results = indexing_results + .into_iter() + .map(|result| match result { + Ok(indexed) => { + VerifiedAggregatedAttestation::from_indexed(indexed, chain, check_signatures) + } + Err(e) => Err(e), + }) + .collect(); + + Ok(final_results) +} + +/// Verify unaggregated attestations using batch BLS signature verification. +/// +/// See module-level docs for more info. +pub fn batch_verify_unaggregated_attestations<'a, T, I>( + attestations: I, + chain: &BeaconChain, +) -> Result, Error>>, Error> +where + T: BeaconChainTypes, + I: Iterator, Option)> + ExactSizeIterator, +{ + let mut num_partially_verified = 0; + let mut num_failed = 0; + + // Perform partial verification of all attestations, collecting the results. + let partial_results = attestations + .map(|(attn, subnet_opt)| { + let result = IndexedUnaggregatedAttestation::verify(attn, subnet_opt, chain); + if result.is_ok() { + num_partially_verified += 1; + } else { + num_failed += 1; + } + result + }) + .collect::>(); + + // May be set to `No` if batch verification succeeds. + let mut check_signatures = CheckAttestationSignature::Yes; + + // Perform batch BLS verification, if any attestation signatures are worth checking. + if num_partially_verified > 0 { + let signature_setup_timer = metrics::start_timer( + &metrics::ATTESTATION_PROCESSING_BATCH_UNAGG_SIGNATURE_SETUP_TIMES, + ); + + let pubkey_cache = chain + .validator_pubkey_cache + .try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) + .ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?; + + let fork = chain.with_head(|head| Ok::<_, BeaconChainError>(head.beacon_state.fork()))?; + + let mut signature_sets = Vec::with_capacity(num_partially_verified); + + // Iterate, flattening to get only the `Ok` values. + for partially_verified in partial_results.iter().flatten() { + let indexed_attestation = &partially_verified.indexed_attestation; + + let signature_set = indexed_attestation_signature_set_from_pubkeys( + |validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed), + &indexed_attestation.signature, + indexed_attestation, + &fork, + chain.genesis_validators_root, + &chain.spec, + ) + .map_err(BeaconChainError::SignatureSetError)?; + + signature_sets.push(signature_set); + } + + metrics::stop_timer(signature_setup_timer); + + let _signature_verification_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_BATCH_UNAGG_SIGNATURE_TIMES); + + if verify_signature_sets(signature_sets.iter()) { + // Since all the signatures verified in a batch, there's no reason for them to be + // checked again later. + check_signatures = CheckAttestationSignature::No + } + } + + // Complete the attestation verification, potentially verifying all signatures independently. + let final_results = partial_results + .into_iter() + .map(|result| match result { + Ok(partial) => { + VerifiedUnaggregatedAttestation::from_indexed(partial, chain, check_signatures) + } + Err(e) => Err(e), + }) + .collect(); + + Ok(final_results) +} diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7236e6396..6ad0628d3 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1,5 +1,6 @@ use crate::attestation_verification::{ - Error as AttestationError, SignatureVerifiedAttestation, VerifiedAggregatedAttestation, + batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations, + Error as AttestationError, VerifiedAggregatedAttestation, VerifiedAttestation, VerifiedUnaggregatedAttestation, }; use crate::attester_cache::{AttesterCache, AttesterCacheKey}; @@ -1510,17 +1511,32 @@ impl BeaconChain { }) } + /// Performs the same validation as `Self::verify_unaggregated_attestation_for_gossip`, but for + /// multiple attestations using batch BLS verification. Batch verification can provide + /// significant CPU-time savings compared to individual verification. + pub fn batch_verify_unaggregated_attestations_for_gossip<'a, I>( + &self, + attestations: I, + ) -> Result< + Vec, AttestationError>>, + AttestationError, + > + where + I: Iterator, Option)> + ExactSizeIterator, + { + batch_verify_unaggregated_attestations(attestations, self) + } + /// Accepts some `Attestation` from the network and attempts to verify it, returning `Ok(_)` if /// it is valid to be (re)broadcast on the gossip network. /// /// The attestation must be "unaggregated", that is it must have exactly one /// aggregation bit set. - pub fn verify_unaggregated_attestation_for_gossip( + pub fn verify_unaggregated_attestation_for_gossip<'a>( &self, - unaggregated_attestation: Attestation, + unaggregated_attestation: &'a Attestation, subnet_id: Option, - ) -> Result, (AttestationError, Attestation)> - { + ) -> Result, AttestationError> { metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::UNAGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES); @@ -1539,15 +1555,25 @@ impl BeaconChain { ) } + /// Performs the same validation as `Self::verify_aggregated_attestation_for_gossip`, but for + /// multiple attestations using batch BLS verification. Batch verification can provide + /// significant CPU-time savings compared to individual verification. + pub fn batch_verify_aggregated_attestations_for_gossip<'a, I>( + &self, + aggregates: I, + ) -> Result, AttestationError>>, AttestationError> + where + I: Iterator> + ExactSizeIterator, + { + batch_verify_aggregated_attestations(aggregates, self) + } + /// Accepts some `SignedAggregateAndProof` from the network and attempts to verify it, /// returning `Ok(_)` if it is valid to be (re)broadcast on the gossip network. - pub fn verify_aggregated_attestation_for_gossip( + pub fn verify_aggregated_attestation_for_gossip<'a>( &self, - signed_aggregate: SignedAggregateAndProof, - ) -> Result< - VerifiedAggregatedAttestation, - (AttestationError, SignedAggregateAndProof), - > { + signed_aggregate: &'a SignedAggregateAndProof, + ) -> Result, AttestationError> { metrics::inc_counter(&metrics::AGGREGATED_ATTESTATION_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::AGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES); @@ -1597,13 +1623,13 @@ impl BeaconChain { /// Accepts some attestation-type object and attempts to verify it in the context of fork /// choice. If it is valid it is applied to `self.fork_choice`. /// - /// Common items that implement `SignatureVerifiedAttestation`: + /// Common items that implement `VerifiedAttestation`: /// /// - `VerifiedUnaggregatedAttestation` /// - `VerifiedAggregatedAttestation` pub fn apply_attestation_to_fork_choice( &self, - verified: &impl SignatureVerifiedAttestation, + verified: &impl VerifiedAttestation, ) -> Result<(), Error> { let _timer = metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES); @@ -1623,8 +1649,8 @@ impl BeaconChain { /// and no error is returned. pub fn add_to_naive_aggregation_pool( &self, - unaggregated_attestation: VerifiedUnaggregatedAttestation, - ) -> Result, AttestationError> { + unaggregated_attestation: &impl VerifiedAttestation, + ) -> Result<(), AttestationError> { let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_APPLY_TO_AGG_POOL); let attestation = unaggregated_attestation.attestation(); @@ -1660,7 +1686,7 @@ impl BeaconChain { } }; - Ok(unaggregated_attestation) + Ok(()) } /// Accepts a `VerifiedSyncCommitteeMessage` and attempts to apply it to the "naive @@ -1727,13 +1753,13 @@ impl BeaconChain { Ok(verified_sync_committee_message) } - /// Accepts a `VerifiedAggregatedAttestation` and attempts to apply it to `self.op_pool`. + /// Accepts a `VerifiedAttestation` and attempts to apply it to `self.op_pool`. /// /// The op pool is used by local block producers to pack blocks with operations. pub fn add_to_block_inclusion_pool( &self, - signed_aggregate: VerifiedAggregatedAttestation, - ) -> Result, AttestationError> { + verified_attestation: &impl VerifiedAttestation, + ) -> Result<(), AttestationError> { let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_APPLY_TO_OP_POOL); // If there's no eth1 chain then it's impossible to produce blocks and therefore @@ -1745,7 +1771,7 @@ impl BeaconChain { self.op_pool .insert_attestation( // TODO: address this clone. - signed_aggregate.attestation().clone(), + verified_attestation.attestation().clone(), &fork, self.genesis_validators_root, &self.spec, @@ -1753,7 +1779,7 @@ impl BeaconChain { .map_err(Error::from)?; } - Ok(signed_aggregate) + Ok(()) } /// Accepts a `VerifiedSyncContribution` and attempts to apply it to `self.op_pool`. diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 0486220b8..c8c43201c 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -199,6 +199,26 @@ lazy_static! { "Time spent on the signature verification of attestation processing" ); + /* + * Batch Attestation Processing + */ + pub static ref ATTESTATION_PROCESSING_BATCH_AGG_SIGNATURE_SETUP_TIMES: Result = try_create_histogram( + "beacon_attestation_processing_batch_agg_signature_setup_times", + "Time spent on setting up for the signature verification of batch aggregate processing" + ); + pub static ref ATTESTATION_PROCESSING_BATCH_AGG_SIGNATURE_TIMES: Result = try_create_histogram( + "beacon_attestation_processing_batch_agg_signature_times", + "Time spent on the signature verification of batch aggregate attestation processing" + ); + pub static ref ATTESTATION_PROCESSING_BATCH_UNAGG_SIGNATURE_SETUP_TIMES: Result = try_create_histogram( + "beacon_attestation_processing_batch_unagg_signature_setup_times", + "Time spent on setting up for the signature verification of batch unaggregate processing" + ); + pub static ref ATTESTATION_PROCESSING_BATCH_UNAGG_SIGNATURE_TIMES: Result = try_create_histogram( + "beacon_attestation_processing_batch_unagg_signature_times", + "Time spent on the signature verification of batch unaggregate attestation processing" + ); + /* * Shuffling cache */ diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 2b452f577..67adda5fa 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1159,29 +1159,42 @@ where } pub fn process_attestations(&self, attestations: HarnessAttestations) { - for (unaggregated_attestations, maybe_signed_aggregate) in attestations.into_iter() { - for (attestation, subnet_id) in unaggregated_attestations { - self.chain - .verify_unaggregated_attestation_for_gossip( - attestation.clone(), - Some(subnet_id), - ) - .unwrap() - .add_to_pool(&self.chain) - .unwrap(); + let num_validators = self.validator_keypairs.len(); + let mut unaggregated = Vec::with_capacity(num_validators); + // This is an over-allocation, but it should be fine. It won't be *that* memory hungry and + // it's nice to have fast tests. + let mut aggregated = Vec::with_capacity(num_validators); + + for (unaggregated_attestations, maybe_signed_aggregate) in attestations.iter() { + for (attn, subnet) in unaggregated_attestations { + unaggregated.push((attn, Some(*subnet))); } - if let Some(signed_aggregate) = maybe_signed_aggregate { - let attn = self - .chain - .verify_aggregated_attestation_for_gossip(signed_aggregate) - .unwrap(); - - self.chain.apply_attestation_to_fork_choice(&attn).unwrap(); - - self.chain.add_to_block_inclusion_pool(attn).unwrap(); + if let Some(a) = maybe_signed_aggregate { + aggregated.push(a) } } + + for result in self + .chain + .batch_verify_unaggregated_attestations_for_gossip(unaggregated.into_iter()) + .unwrap() + { + let verified = result.unwrap(); + self.chain.add_to_naive_aggregation_pool(&verified).unwrap(); + } + + for result in self + .chain + .batch_verify_aggregated_attestations_for_gossip(aggregated.into_iter()) + .unwrap() + { + let verified = result.unwrap(); + self.chain + .apply_attestation_to_fork_choice(&verified) + .unwrap(); + self.chain.add_to_block_inclusion_pool(&verified).unwrap(); + } } pub fn set_current_slot(&self, slot: Slot) { diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index dbba5e318..26aca88f8 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -18,8 +18,8 @@ use store::config::StoreConfig; use tree_hash::TreeHash; use types::{ test_utils::generate_deterministic_keypair, AggregateSignature, Attestation, BeaconStateError, - BitList, EthSpec, Hash256, Keypair, MainnetEthSpec, SecretKey, SelectionProof, - SignedAggregateAndProof, SubnetId, Unsigned, + BitList, Epoch, EthSpec, Hash256, Keypair, MainnetEthSpec, SecretKey, SelectionProof, + SignedAggregateAndProof, Slot, SubnetId, Unsigned, }; pub type E = MainnetEthSpec; @@ -189,713 +189,739 @@ fn get_non_aggregator( .expect("should find non-aggregator for committee") } +struct GossipTester { + harness: BeaconChainHarness>, + /* + * Valid unaggregated attestation + */ + valid_attestation: Attestation, + attester_validator_index: usize, + attester_committee_index: usize, + attester_sk: SecretKey, + attestation_subnet_id: SubnetId, + /* + * Valid unaggregated attestation for batch testing + */ + invalid_attestation: Attestation, + /* + * Valid aggregate + */ + valid_aggregate: SignedAggregateAndProof, + aggregator_validator_index: usize, + aggregator_sk: SecretKey, + /* + * Another valid aggregate for batch testing + */ + invalid_aggregate: SignedAggregateAndProof, +} + +impl GossipTester { + pub fn new() -> Self { + let harness = get_harness(VALIDATOR_COUNT); + + // Extend the chain out a few epochs so we have some chain depth to play with. + harness.extend_chain( + MainnetEthSpec::slots_per_epoch() as usize * 3 - 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + // Advance into a slot where there have not been blocks or attestations produced. + harness.advance_slot(); + + let ( + valid_attestation, + attester_validator_index, + attester_committee_index, + attester_sk, + attestation_subnet_id, + ) = get_valid_unaggregated_attestation(&harness.chain); + + let (valid_aggregate, aggregator_validator_index, aggregator_sk) = + get_valid_aggregated_attestation(&harness.chain, valid_attestation.clone()); + + let mut invalid_attestation = valid_attestation.clone(); + invalid_attestation.data.beacon_block_root = Hash256::repeat_byte(13); + + let (mut invalid_aggregate, _, _) = + get_valid_aggregated_attestation(&harness.chain, invalid_attestation.clone()); + invalid_aggregate.message.aggregator_index = invalid_aggregate + .message + .aggregator_index + .checked_sub(1) + .unwrap(); + + Self { + harness, + valid_attestation, + attester_validator_index, + attester_committee_index, + attester_sk, + attestation_subnet_id, + invalid_attestation, + valid_aggregate, + aggregator_validator_index, + aggregator_sk, + invalid_aggregate, + } + } + + pub fn slot(&self) -> Slot { + self.harness.chain.slot().unwrap() + } + + pub fn epoch(&self) -> Epoch { + self.harness.chain.epoch().unwrap() + } + + pub fn two_epochs_ago(&self) -> Slot { + self.slot() + .as_u64() + .checked_sub(E::slots_per_epoch() + 2) + .expect("chain is not sufficiently deep for test") + .into() + } + + pub fn non_aggregator(&self) -> (usize, SecretKey) { + get_non_aggregator(&self.harness.chain, &self.valid_aggregate.message.aggregate) + } + + pub fn import_valid_aggregate(self) -> Self { + assert!( + self.harness + .chain + .verify_aggregated_attestation_for_gossip(&self.valid_aggregate) + .is_ok(), + "valid aggregate should be verified" + ); + self + } + + pub fn import_valid_unaggregate(self) -> Self { + self.harness + .chain + .verify_unaggregated_attestation_for_gossip( + &self.valid_attestation, + Some(self.attestation_subnet_id), + ) + .expect("valid attestation should be verified"); + self + } + + pub fn inspect_aggregate_err(self, desc: &str, get_attn: G, inspect_err: I) -> Self + where + G: Fn(&Self, &mut SignedAggregateAndProof), + I: Fn(&Self, AttnError), + { + let mut aggregate = self.valid_aggregate.clone(); + get_attn(&self, &mut aggregate); + + /* + * Individual verification + */ + let err = self + .harness + .chain + .verify_aggregated_attestation_for_gossip(&aggregate) + .err() + .expect(&format!( + "{} should error during verify_aggregated_attestation_for_gossip", + desc + )); + inspect_err(&self, err); + + /* + * Batch verification + */ + let mut results = self + .harness + .chain + .batch_verify_aggregated_attestations_for_gossip( + vec![&self.invalid_aggregate, &aggregate].into_iter(), + ) + .unwrap(); + assert_eq!(results.len(), 2); + let batch_err = results.pop().unwrap().err().expect(&format!( + "{} should error during batch_verify_aggregated_attestations_for_gossip", + desc + )); + inspect_err(&self, batch_err); + + self + } + + pub fn inspect_unaggregate_err(self, desc: &str, get_attn: G, inspect_err: I) -> Self + where + G: Fn(&Self, &mut Attestation, &mut SubnetId), + I: Fn(&Self, AttnError), + { + let mut attn = self.valid_attestation.clone(); + let mut subnet_id = self.attestation_subnet_id; + get_attn(&self, &mut attn, &mut subnet_id); + + /* + * Individual verification + */ + let err = self + .harness + .chain + .verify_unaggregated_attestation_for_gossip(&attn, Some(subnet_id)) + .err() + .expect(&format!( + "{} should error during verify_unaggregated_attestation_for_gossip", + desc + )); + inspect_err(&self, err); + + /* + * Batch verification + */ + let mut results = self + .harness + .chain + .batch_verify_unaggregated_attestations_for_gossip( + vec![ + (&self.invalid_attestation, Some(subnet_id)), + (&attn, Some(subnet_id)), + ] + .into_iter(), + ) + .unwrap(); + assert_eq!(results.len(), 2); + let batch_err = results.pop().unwrap().err().expect(&format!( + "{} should error during batch_verify_unaggregated_attestations_for_gossip", + desc + )); + inspect_err(&self, batch_err); + + self + } +} /// Tests verification of `SignedAggregateAndProof` from the gossip network. #[test] fn aggregated_gossip_verification() { - let harness = get_harness(VALIDATOR_COUNT); - - // Extend the chain out a few epochs so we have some chain depth to play with. - harness.extend_chain( - MainnetEthSpec::slots_per_epoch() as usize * 3 - 1, - BlockStrategy::OnCanonicalHead, - AttestationStrategy::AllValidators, - ); - - // Advance into a slot where there have not been blocks or attestations produced. - harness.advance_slot(); - - let current_slot = harness.chain.slot().expect("should get slot"); - - assert_eq!( - current_slot % E::slots_per_epoch(), - 0, - "the test requires a new epoch to avoid already-seen errors" - ); - - let (valid_attestation, _attester_index, _attester_committee_index, validator_sk, _subnet_id) = - get_valid_unaggregated_attestation(&harness.chain); - let (valid_aggregate, aggregator_index, aggregator_sk) = - get_valid_aggregated_attestation(&harness.chain, valid_attestation); - - macro_rules! assert_invalid { - ($desc: tt, $attn_getter: expr, $($error: pat) |+ $( if $guard: expr )?) => { - assert!( - matches!( - harness - .chain - .verify_aggregated_attestation_for_gossip($attn_getter) - .err() - .expect(&format!( - "{} should error during verify_aggregated_attestation_for_gossip", - $desc - )).0, - $( $error ) |+ $( if $guard )? - ), - "case: {}", - $desc, - ); - }; - } - - /* - * The following two tests ensure: - * - * Spec v0.12.1 - * - * aggregate.data.slot is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (with a - * MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) -- i.e. aggregate.data.slot + - * ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= aggregate.data.slot (a client MAY - * queue future aggregates for processing at the appropriate slot). - */ - - let future_slot = current_slot + 1; - assert_invalid!( - "aggregate from future slot", - { - let mut a = valid_aggregate.clone(); - a.message.aggregate.data.slot = future_slot; - a - }, - AttnError::FutureSlot { attestation_slot, latest_permissible_slot } - if attestation_slot == future_slot && latest_permissible_slot == current_slot - ); - - let early_slot = current_slot - .as_u64() - .checked_sub(E::slots_per_epoch() + 2) - .expect("chain is not sufficiently deep for test") - .into(); - assert_invalid!( - "aggregate from past slot", - { - let mut a = valid_aggregate.clone(); - a.message.aggregate.data.slot = early_slot; - a - }, - AttnError::PastSlot { - attestation_slot, - // Subtract an additional slot since the harness will be exactly on the start of the - // slot and the propagation tolerance will allow an extra slot. - earliest_permissible_slot - } - if attestation_slot == early_slot - && earliest_permissible_slot == current_slot - E::slots_per_epoch() - 1 - ); - - /* - * The following test ensures: - * - * The aggregate attestation's epoch matches its target -- i.e. `aggregate.data.target.epoch == - * compute_epoch_at_slot(attestation.data.slot)` - * - */ - - assert_invalid!( - "attestation with invalid target epoch", - { - let mut a = valid_aggregate.clone(); - a.message.aggregate.data.target.epoch += 1; - a - }, - AttnError::InvalidTargetEpoch { .. } - ); - /* - * This is not in the specification for aggregate attestations (only unaggregates), but we - * check it anyway to avoid weird edge cases. - */ - let unknown_root = Hash256::from_low_u64_le(424242); - assert_invalid!( - "attestation with invalid target root", - { - let mut a = valid_aggregate.clone(); - a.message.aggregate.data.target.root = unknown_root; - a - }, - AttnError::InvalidTargetRoot { .. } - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * The block being voted for (aggregate.data.beacon_block_root) passes validation. - */ - - let unknown_root = Hash256::from_low_u64_le(424242); - assert_invalid!( - "aggregate with unknown head block", - { - let mut a = valid_aggregate.clone(); - a.message.aggregate.data.beacon_block_root = unknown_root; - a - }, - AttnError::UnknownHeadBlock { - beacon_block_root - } - if beacon_block_root == unknown_root - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * The attestation has participants. - */ - - assert_invalid!( - "aggregate with no participants", - { - let mut a = valid_aggregate.clone(); - let aggregation_bits = &mut a.message.aggregate.aggregation_bits; - aggregation_bits.difference_inplace(&aggregation_bits.clone()); - assert!(aggregation_bits.is_zero()); - a.message.aggregate.signature = AggregateSignature::infinity(); - a - }, - AttnError::EmptyAggregationBitfield - ); - - /* - * This test ensures: - * - * Spec v0.12.1 - * - * The aggregator signature, signed_aggregate_and_proof.signature, is valid. - */ - - assert_invalid!( - "aggregate with bad signature", - { - let mut a = valid_aggregate.clone(); - - a.signature = validator_sk.sign(Hash256::from_low_u64_be(42)); - - a - }, - AttnError::InvalidSignature - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * The aggregate_and_proof.selection_proof is a valid signature of the aggregate.data.slot by - * the validator with index aggregate_and_proof.aggregator_index. - */ - - let committee_len = harness - .chain - .head() - .unwrap() - .beacon_state - .get_beacon_committee( - harness.chain.slot().unwrap(), - valid_aggregate.message.aggregate.data.index, + GossipTester::new() + /* + * The following two tests ensure: + * + * aggregate.data.slot is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (with a + * MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) -- i.e. aggregate.data.slot + + * ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= aggregate.data.slot (a client MAY + * queue future aggregates for processing at the appropriate slot). + */ + .inspect_aggregate_err( + "aggregate from future slot", + |tester, a| a.message.aggregate.data.slot = tester.slot() + 1, + |tester, err| { + assert!(matches!( + err, + AttnError::FutureSlot { attestation_slot, latest_permissible_slot } + if attestation_slot == tester.slot() + 1 + && latest_permissible_slot == tester.slot() + )) + }, ) - .expect("should get committees") - .committee - .len(); - assert_invalid!( - "aggregate with bad selection proof signature", - { - let mut a = valid_aggregate.clone(); - - // Generate some random signature until happens to be a valid selection proof. We need - // this in order to reach the signature verification code. - // - // Could run for ever, but that seems _really_ improbable. - let mut i: u64 = 0; - a.message.selection_proof = loop { - i += 1; - let proof: SelectionProof = validator_sk - .sign(Hash256::from_slice(&int_to_bytes32(i))) - .into(); - if proof - .is_aggregator(committee_len, &harness.chain.spec) + .inspect_aggregate_err( + "aggregate from past slot", + |tester, a| a.message.aggregate.data.slot = tester.two_epochs_ago(), + |tester, err| { + assert!(matches!( + err, + AttnError::PastSlot { + attestation_slot, + // Subtract an additional slot since the harness will be exactly on the start of the + // slot and the propagation tolerance will allow an extra slot. + earliest_permissible_slot + } + if attestation_slot == tester.two_epochs_ago() + && earliest_permissible_slot == tester.slot() - E::slots_per_epoch() - 1 + )) + }, + ) + /* + * The following test ensures: + * + * The aggregate attestation's epoch matches its target -- i.e. `aggregate.data.target.epoch == + * compute_epoch_at_slot(attestation.data.slot)` + * + */ + .inspect_aggregate_err( + "attestation with invalid target epoch", + |_, a| a.message.aggregate.data.target.epoch += 1, + |_, err| assert!(matches!(err, AttnError::InvalidTargetEpoch { .. })), + ) + /* + * This is not in the specification for aggregate attestations (only unaggregates), but we + * check it anyway to avoid weird edge cases. + */ + .inspect_aggregate_err( + "attestation with invalid target root", + |_, a| a.message.aggregate.data.target.root = Hash256::repeat_byte(42), + |_, err| assert!(matches!(err, AttnError::InvalidTargetRoot { .. })), + ) + /* + * The following test ensures: + * + * The block being voted for (aggregate.data.beacon_block_root) passes validation. + */ + .inspect_aggregate_err( + "aggregate with unknown head block", + |_, a| a.message.aggregate.data.beacon_block_root = Hash256::repeat_byte(42), + |_, err| { + assert!(matches!( + err, + AttnError::UnknownHeadBlock { + beacon_block_root + } + if beacon_block_root == Hash256::repeat_byte(42) + )) + }, + ) + /* + * The following test ensures: + * + * The attestation has participants. + */ + .inspect_aggregate_err( + "aggregate with no participants", + |_, a| { + let aggregation_bits = &mut a.message.aggregate.aggregation_bits; + aggregation_bits.difference_inplace(&aggregation_bits.clone()); + assert!(aggregation_bits.is_zero()); + a.message.aggregate.signature = AggregateSignature::infinity(); + }, + |_, err| assert!(matches!(err, AttnError::EmptyAggregationBitfield)), + ) + /* + * This test ensures: + * + * The aggregator signature, signed_aggregate_and_proof.signature, is valid. + */ + .inspect_aggregate_err( + "aggregate with bad signature", + |tester, a| a.signature = tester.aggregator_sk.sign(Hash256::repeat_byte(42)), + |_, err| assert!(matches!(err, AttnError::InvalidSignature)), + ) + /* + * The following test ensures: + * + * The aggregate_and_proof.selection_proof is a valid signature of the aggregate.data.slot by + * the validator with index aggregate_and_proof.aggregator_index. + */ + .inspect_aggregate_err( + "aggregate with bad signature", + |tester, a| { + let committee_len = tester + .harness + .chain + .head() .unwrap() - { - break proof.into(); - } - }; + .beacon_state + .get_beacon_committee(tester.slot(), a.message.aggregate.data.index) + .expect("should get committees") + .committee + .len(); - a - }, - AttnError::InvalidSignature - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * The signature of aggregate is valid. - */ - - assert_invalid!( - "aggregate with bad aggregate signature", - { - let mut a = valid_aggregate.clone(); - - let mut agg_sig = AggregateSignature::infinity(); - agg_sig.add_assign(&aggregator_sk.sign(Hash256::from_low_u64_be(42))); - a.message.aggregate.signature = agg_sig; - - a - }, - AttnError::InvalidSignature - ); - - let too_high_index = ::ValidatorRegistryLimit::to_u64() + 1; - assert_invalid!( - "aggregate with too-high aggregator index", - { - let mut a = valid_aggregate.clone(); - a.message.aggregator_index = too_high_index; - a - }, - AttnError::ValidatorIndexTooHigh(index) - if index == too_high_index as usize - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * The aggregator's validator index is within the committee -- i.e. - * aggregate_and_proof.aggregator_index in get_beacon_committee(state, aggregate.data.slot, - * aggregate.data.index). - */ - - let unknown_validator = VALIDATOR_COUNT as u64; - assert_invalid!( - "aggregate with unknown aggregator index", - { - let mut a = valid_aggregate.clone(); - a.message.aggregator_index = unknown_validator; - a - }, - // Naively we should think this condition would trigger this error: - // - // AttnError::AggregatorPubkeyUnknown(unknown_validator) - // - // However the following error is triggered first: - AttnError::AggregatorNotInCommittee { - aggregator_index - } - if aggregator_index == unknown_validator - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * aggregate_and_proof.selection_proof selects the validator as an aggregator for the slot -- - * i.e. is_aggregator(state, aggregate.data.slot, aggregate.data.index, - * aggregate_and_proof.selection_proof) returns True. - */ - - let (non_aggregator_index, non_aggregator_sk) = - get_non_aggregator(&harness.chain, &valid_aggregate.message.aggregate); - assert_invalid!( - "aggregate from non-aggregator", - { - SignedAggregateAndProof::from_aggregate( - non_aggregator_index as u64, - valid_aggregate.message.aggregate.clone(), - None, - &non_aggregator_sk, - &harness.chain.head_info().unwrap().fork, - harness.chain.genesis_validators_root, - &harness.chain.spec, - ) - }, - AttnError::InvalidSelectionProof { - aggregator_index: index - } - if index == non_aggregator_index as u64 - ); - - // NOTE: from here on, the tests are stateful, and rely on the valid attestation having been - // seen. A refactor to give each test case its own state might be nice at some point - assert!( - harness - .chain - .verify_aggregated_attestation_for_gossip(valid_aggregate.clone()) - .is_ok(), - "valid aggregate should be verified" - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * The valid aggregate attestation defined by hash_tree_root(aggregate) has not already been - * seen (via aggregate gossip, within a block, or through the creation of an equivalent - * aggregate locally). - */ - - assert_invalid!( - "aggregate that has already been seen", - valid_aggregate.clone(), - AttnError::AttestationAlreadyKnown(hash) - if hash == valid_aggregate.message.aggregate.tree_hash_root() - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * The aggregate is the first valid aggregate received for the aggregator with index - * aggregate_and_proof.aggregator_index for the epoch aggregate.data.target.epoch. - */ - - assert_invalid!( - "aggregate from aggregator that has already been seen", - { - let mut a = valid_aggregate.clone(); - a.message.aggregate.data.beacon_block_root = Hash256::from_low_u64_le(42); - a - }, - AttnError::AggregatorAlreadyKnown(index) - if index == aggregator_index as u64 - ); + // Generate some random signature until happens to be a valid selection proof. We need + // this in order to reach the signature verification code. + // + // Could run for ever, but that seems _really_ improbable. + let mut i: u64 = 0; + a.message.selection_proof = loop { + i += 1; + let proof: SelectionProof = tester + .aggregator_sk + .sign(Hash256::from_slice(&int_to_bytes32(i))) + .into(); + if proof + .is_aggregator(committee_len, &tester.harness.chain.spec) + .unwrap() + { + break proof.into(); + } + }; + }, + |_, err| assert!(matches!(err, AttnError::InvalidSignature)), + ) + /* + * The following test ensures: + * + * The signature of aggregate is valid. + */ + .inspect_aggregate_err( + "aggregate with bad aggregate signature", + |tester, a| { + let mut agg_sig = AggregateSignature::infinity(); + agg_sig.add_assign(&tester.aggregator_sk.sign(Hash256::repeat_byte(42))); + a.message.aggregate.signature = agg_sig; + }, + |_, err| assert!(matches!(err, AttnError::InvalidSignature)), + ) + /* + * Not directly in the specification, but a sanity check. + */ + .inspect_aggregate_err( + "aggregate with too-high aggregator index", + |_, a| { + a.message.aggregator_index = ::ValidatorRegistryLimit::to_u64() + 1 + }, + |_, err| { + assert!(matches!( + err, + AttnError::ValidatorIndexTooHigh(index) + if index == (::ValidatorRegistryLimit::to_u64() + 1) as usize + )) + }, + ) + /* + * The following test ensures: + * + * The aggregator's validator index is within the committee -- i.e. + * aggregate_and_proof.aggregator_index in get_beacon_committee(state, aggregate.data.slot, + * aggregate.data.index). + */ + .inspect_aggregate_err( + "aggregate with unknown aggregator index", + |_, a| a.message.aggregator_index = VALIDATOR_COUNT as u64, + |_, err| { + assert!(matches!( + err, + // Naively we should think this condition would trigger this error: + // + // AttnError::AggregatorPubkeyUnknown(unknown_validator) + // + // However the following error is triggered first: + AttnError::AggregatorNotInCommittee { + aggregator_index + } + if aggregator_index == VALIDATOR_COUNT as u64 + )) + }, + ) + /* + * The following test ensures: + * + * aggregate_and_proof.selection_proof selects the validator as an aggregator for the slot -- + * i.e. is_aggregator(state, aggregate.data.slot, aggregate.data.index, + * aggregate_and_proof.selection_proof) returns True. + */ + .inspect_aggregate_err( + "aggregate from non-aggregator", + |tester, a| { + let chain = &tester.harness.chain; + let (index, sk) = tester.non_aggregator(); + *a = SignedAggregateAndProof::from_aggregate( + index as u64, + tester.valid_aggregate.message.aggregate.clone(), + None, + &sk, + &chain.head_info().unwrap().fork, + chain.genesis_validators_root, + &chain.spec, + ) + }, + |tester, err| { + let (val_index, _) = tester.non_aggregator(); + assert!(matches!( + err, + AttnError::InvalidSelectionProof { + aggregator_index: index + } + if index == val_index as u64 + )) + }, + ) + // NOTE: from here on, the tests are stateful, and rely on the valid attestation having + // been seen. + .import_valid_aggregate() + /* + * The following test ensures: + * + * The valid aggregate attestation defined by hash_tree_root(aggregate) has not already been + * seen (via aggregate gossip, within a block, or through the creation of an equivalent + * aggregate locally). + */ + .inspect_aggregate_err( + "aggregate that has already been seen", + |_, _| {}, + |tester, err| { + assert!(matches!( + err, + AttnError::AttestationAlreadyKnown(hash) + if hash == tester.valid_aggregate.message.aggregate.tree_hash_root() + )) + }, + ) + /* + * The following test ensures: + * + * The aggregate is the first valid aggregate received for the aggregator with index + * aggregate_and_proof.aggregator_index for the epoch aggregate.data.target.epoch. + */ + .inspect_aggregate_err( + "aggregate from aggregator that has already been seen", + |_, a| a.message.aggregate.data.beacon_block_root = Hash256::repeat_byte(42), + |tester, err| { + assert!(matches!( + err, + AttnError::AggregatorAlreadyKnown(index) + if index == tester.aggregator_validator_index as u64 + )) + }, + ); } /// Tests the verification conditions for an unaggregated attestation on the gossip network. #[test] fn unaggregated_gossip_verification() { - let harness = get_harness(VALIDATOR_COUNT); - - // Extend the chain out a few epochs so we have some chain depth to play with. - harness.extend_chain( - MainnetEthSpec::slots_per_epoch() as usize * 3 - 1, - BlockStrategy::OnCanonicalHead, - AttestationStrategy::AllValidators, - ); - - // Advance into a slot where there have not been blocks or attestations produced. - harness.advance_slot(); - - let current_slot = harness.chain.slot().expect("should get slot"); - let current_epoch = harness.chain.epoch().expect("should get epoch"); - - assert_eq!( - current_slot % E::slots_per_epoch(), - 0, - "the test requires a new epoch to avoid already-seen errors" - ); - - let ( - valid_attestation, - expected_validator_index, - validator_committee_index, - validator_sk, - subnet_id, - ) = get_valid_unaggregated_attestation(&harness.chain); - - macro_rules! assert_invalid { - ($desc: tt, $attn_getter: expr, $subnet_getter: expr, $($error: pat) |+ $( if $guard: expr )?) => { - assert!( - matches!( - harness - .chain - .verify_unaggregated_attestation_for_gossip($attn_getter, Some($subnet_getter)) - .err() - .expect(&format!( - "{} should error during verify_unaggregated_attestation_for_gossip", - $desc - )).0, - $( $error ) |+ $( if $guard )? - ), - "case: {}", - $desc, - ); - }; - } - - /* - * The following test ensures: - * - * Spec v0.12.3 - * - * The committee index is within the expected range -- i.e. `data.index < - * get_committee_count_per_slot(state, data.target.epoch)`. - */ - assert_invalid!( - "attestation with invalid committee index", - { - let mut a = valid_attestation.clone(); - a.data.index = harness - .chain - .head() - .unwrap() - .beacon_state - .get_committee_count_at_slot(a.data.slot) - .unwrap(); - a - }, - subnet_id, - AttnError::NoCommitteeForSlotAndIndex { .. } - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * The attestation is for the correct subnet (i.e. compute_subnet_for_attestation(state, - * attestation.data.slot, attestation.data.index) == subnet_id). - */ - let id: u64 = subnet_id.into(); - let invalid_subnet_id = SubnetId::new(id + 1); - assert_invalid!( - "attestation from future slot", - { - valid_attestation.clone() - }, - invalid_subnet_id, - AttnError::InvalidSubnetId { - received, - expected, - } - if received == invalid_subnet_id && expected == subnet_id - ); - - /* - * The following two tests ensure: - * - * Spec v0.12.1 - * - * attestation.data.slot is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (within a - * MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) -- i.e. attestation.data.slot + - * ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= attestation.data.slot (a client MAY - * queue future attestations for processing at the appropriate slot). - */ - - let future_slot = current_slot + 1; - assert_invalid!( - "attestation from future slot", - { - let mut a = valid_attestation.clone(); - a.data.slot = future_slot; - a - }, - subnet_id, - AttnError::FutureSlot { - attestation_slot, - latest_permissible_slot, - } - if attestation_slot == future_slot && latest_permissible_slot == current_slot - ); - - let early_slot = current_slot - .as_u64() - .checked_sub(E::slots_per_epoch() + 2) - .expect("chain is not sufficiently deep for test") - .into(); - assert_invalid!( - "attestation from past slot", - { - let mut a = valid_attestation.clone(); - a.data.slot = early_slot; - a.data.target.epoch = early_slot.epoch(E::slots_per_epoch()); - a - }, - subnet_id, - AttnError::PastSlot { - attestation_slot, - // Subtract an additional slot since the harness will be exactly on the start of the - // slot and the propagation tolerance will allow an extra slot. - earliest_permissible_slot, - } - if attestation_slot == early_slot && earliest_permissible_slot == current_slot - E::slots_per_epoch() - 1 - ); - - /* - * The following test ensures: - * - * Spec v0.12.3 - * - * The attestation's epoch matches its target -- i.e. `attestation.data.target.epoch == - * compute_epoch_at_slot(attestation.data.slot)` - * - */ - - assert_invalid!( - "attestation with invalid target epoch", - { - let mut a = valid_attestation.clone(); - a.data.target.epoch += 1; - a - }, - subnet_id, - AttnError::InvalidTargetEpoch { .. } - ); - - /* - * The following two tests ensure: - * - * Spec v0.12.1 - * - * The attestation is unaggregated -- that is, it has exactly one participating validator - * (len([bit for bit in attestation.aggregation_bits if bit == 0b1]) == 1). - */ - - assert_invalid!( - "attestation without any aggregation bits set", - { - let mut a = valid_attestation.clone(); - a.aggregation_bits - .set(validator_committee_index, false) - .expect("should unset aggregation bit"); - assert_eq!( - a.aggregation_bits.num_set_bits(), - 0, - "test requires no set bits" - ); - a - }, - subnet_id, - AttnError::NotExactlyOneAggregationBitSet(0) - ); - - assert_invalid!( - "attestation with two aggregation bits set", - { - let mut a = valid_attestation.clone(); - a.aggregation_bits - .set(validator_committee_index + 1, true) - .expect("should set second aggregation bit"); - a - }, - subnet_id, - AttnError::NotExactlyOneAggregationBitSet(2) - ); - - /* - * The following test ensures: - * - * Spec v0.12.3 - * - * The number of aggregation bits matches the committee size -- i.e. - * `len(attestation.aggregation_bits) == len(get_beacon_committee(state, data.slot, - * data.index))`. - */ - assert_invalid!( - "attestation with invalid bitfield", - { - let mut a = valid_attestation.clone(); - let bits = a.aggregation_bits.iter().collect::>(); - a.aggregation_bits = BitList::with_capacity(bits.len() + 1).unwrap(); - for (i, bit) in bits.into_iter().enumerate() { - a.aggregation_bits.set(i, bit).unwrap(); - } - a - }, - subnet_id, - AttnError::Invalid(AttestationValidationError::BeaconStateError( - BeaconStateError::InvalidBitfield - )) - ); - - /* - * The following test ensures that: - * - * Spec v0.12.1 - * - * The block being voted for (attestation.data.beacon_block_root) passes validation. - */ - - let unknown_root = Hash256::from_low_u64_le(424242); // No one wants one of these - assert_invalid!( - "attestation with unknown head block", - { - let mut a = valid_attestation.clone(); - a.data.beacon_block_root = unknown_root; - a - }, - subnet_id, - AttnError::UnknownHeadBlock { - beacon_block_root, - } - if beacon_block_root == unknown_root - ); - - /* - * The following test ensures that: - * - * Spec v0.12.3 - * - * The attestation's target block is an ancestor of the block named in the LMD vote - */ - - let unknown_root = Hash256::from_low_u64_le(424242); - assert_invalid!( - "attestation with invalid target root", - { - let mut a = valid_attestation.clone(); - a.data.target.root = unknown_root; - a - }, - subnet_id, - AttnError::InvalidTargetRoot { .. } - ); - - /* - * The following test ensures that: - * - * Spec v0.12.1 - * - * The signature of attestation is valid. - */ - - assert_invalid!( - "attestation with bad signature", - { - let mut a = valid_attestation.clone(); - - let mut agg_sig = AggregateSignature::infinity(); - agg_sig.add_assign(&validator_sk.sign(Hash256::from_low_u64_be(42))); - a.signature = agg_sig; - - a - }, - subnet_id, - AttnError::InvalidSignature - ); - - harness - .chain - .verify_unaggregated_attestation_for_gossip(valid_attestation.clone(), Some(subnet_id)) - .expect("valid attestation should be verified"); - - /* - * The following test ensures that: - * - * Spec v0.12.1 - * - * - * There has been no other valid attestation seen on an attestation subnet that has an - * identical attestation.data.target.epoch and participating validator index. - */ - - assert_invalid!( - "attestation that has already been seen", - valid_attestation.clone(), - subnet_id, - AttnError::PriorAttestationKnown { - validator_index, - epoch, - } - if validator_index == expected_validator_index as u64 && epoch == current_epoch - ); + GossipTester::new() + /* + * The following test ensures: + * + * The committee index is within the expected range -- i.e. `data.index < + * get_committee_count_per_slot(state, data.target.epoch)`. + */ + .inspect_unaggregate_err( + "attestation with invalid committee index", + |tester, a, _| { + a.data.index = tester + .harness + .chain + .head() + .unwrap() + .beacon_state + .get_committee_count_at_slot(a.data.slot) + .unwrap() + }, + |_, err| assert!(matches!(err, AttnError::NoCommitteeForSlotAndIndex { .. })), + ) + /* + * The following test ensures: + * + * The attestation is for the correct subnet (i.e. compute_subnet_for_attestation(state, + * attestation.data.slot, attestation.data.index) == subnet_id). + */ + .inspect_unaggregate_err( + "attestation with invalid committee index", + |_, _, subnet_id| *subnet_id = SubnetId::new(42), + |tester, err| { + assert!(matches!( + err, + AttnError::InvalidSubnetId { + received, + expected, + } + if received == SubnetId::new(42) && expected == tester.attestation_subnet_id + )) + }, + ) + /* + * The following two tests ensure: + * + * attestation.data.slot is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (within a + * MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) -- i.e. attestation.data.slot + + * ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= attestation.data.slot (a client MAY + * queue future attestations for processing at the appropriate slot). + */ + .inspect_unaggregate_err( + "attestation from future slot", + |tester, a, _| a.data.slot = tester.slot() + 1, + |tester, err| { + assert!(matches!( + err, + AttnError::FutureSlot { + attestation_slot, + latest_permissible_slot, + } + if attestation_slot == tester.slot() + 1 && latest_permissible_slot == tester.slot() + )) + }, + ) + .inspect_unaggregate_err( + "attestation from past slot", + |tester, a, _| { + let early_slot = tester.two_epochs_ago(); + a.data.slot = early_slot; + a.data.target.epoch = early_slot.epoch(E::slots_per_epoch()); + }, + |tester, err| { + dbg!(&err); + assert!(matches!( + err, + AttnError::PastSlot { + attestation_slot, + // Subtract an additional slot since the harness will be exactly on the start of the + // slot and the propagation tolerance will allow an extra slot. + earliest_permissible_slot, + } + if attestation_slot == tester.two_epochs_ago() + && earliest_permissible_slot == tester.slot() - E::slots_per_epoch() - 1 + )) + }, + ) + /* + * The following test ensures: + * + * The attestation's epoch matches its target -- i.e. `attestation.data.target.epoch == + * compute_epoch_at_slot(attestation.data.slot)` + * + */ + .inspect_unaggregate_err( + "attestation with invalid target epoch", + |_, a, _| a.data.target.epoch += 1, + |_, err| { + assert!(matches!( + err, + AttnError::InvalidTargetEpoch { .. } + )) + }, + ) + /* + * The following two tests ensure: + * + * The attestation is unaggregated -- that is, it has exactly one participating validator + * (len([bit for bit in attestation.aggregation_bits if bit == 0b1]) == 1). + */ + .inspect_unaggregate_err( + "attestation without any aggregation bits set", + |tester, a, _| { + a.aggregation_bits + .set(tester.attester_committee_index, false) + .expect("should unset aggregation bit"); + assert_eq!( + a.aggregation_bits.num_set_bits(), + 0, + "test requires no set bits" + ); + }, + |_, err| { + assert!(matches!( + err, + AttnError::NotExactlyOneAggregationBitSet(0) + )) + }, + ) + .inspect_unaggregate_err( + "attestation with two aggregation bits set", + |tester, a, _| { + a.aggregation_bits + .set(tester.attester_committee_index + 1, true) + .expect("should set second aggregation bit"); + }, + |_, err| { + assert!(matches!( + err, + AttnError::NotExactlyOneAggregationBitSet(2) + )) + }, + ) + /* + * The following test ensures: + * + * The number of aggregation bits matches the committee size -- i.e. + * `len(attestation.aggregation_bits) == len(get_beacon_committee(state, data.slot, + * data.index))`. + */ + .inspect_unaggregate_err( + "attestation with invalid bitfield", + |_, a, _| { + let bits = a.aggregation_bits.iter().collect::>(); + a.aggregation_bits = BitList::with_capacity(bits.len() + 1).unwrap(); + for (i, bit) in bits.into_iter().enumerate() { + a.aggregation_bits.set(i, bit).unwrap(); + } + }, + |_, err| { + assert!(matches!( + err, + AttnError::Invalid(AttestationValidationError::BeaconStateError( + BeaconStateError::InvalidBitfield + )) + )) + }, + ) + /* + * The following test ensures that: + * + * The block being voted for (attestation.data.beacon_block_root) passes validation. + */ + .inspect_unaggregate_err( + "attestation with unknown head block", + |_, a, _| { + a.data.beacon_block_root = Hash256::repeat_byte(42); + }, + |_, err| { + assert!(matches!( + err, + AttnError::UnknownHeadBlock { + beacon_block_root, + } + if beacon_block_root == Hash256::repeat_byte(42) + )) + }, + ) + /* + * The following test ensures that: + * + * Spec v0.12.3 + * + * The attestation's target block is an ancestor of the block named in the LMD vote + */ + .inspect_unaggregate_err( + "attestation with invalid target root", + |_, a, _| { + a.data.target.root = Hash256::repeat_byte(42); + }, + |_, err| { + assert!(matches!( + err, + AttnError::InvalidTargetRoot { .. } + )) + }, + ) + /* + * The following test ensures that: + * + * The signature of attestation is valid. + */ + .inspect_unaggregate_err( + "attestation with bad signature", + |tester, a, _| { + let mut agg_sig = AggregateSignature::infinity(); + agg_sig.add_assign(&tester.attester_sk.sign(Hash256::repeat_byte(42))); + a.signature = agg_sig; + }, + |_, err| { + assert!(matches!( + err, + AttnError::InvalidSignature + )) + }, + ) + // NOTE: from here on, the tests are stateful, and rely on the valid attestation having + // been seen. + .import_valid_unaggregate() + /* + * The following test ensures that: + * + * + * There has been no other valid attestation seen on an attestation subnet that has an + * identical attestation.data.target.epoch and participating validator index. + */ + .inspect_unaggregate_err( + "attestation that has already been seen", + |_, _, _| {}, + |tester, err| { + assert!(matches!( + err, + AttnError::PriorAttestationKnown { + validator_index, + epoch, + } + if validator_index == tester.attester_validator_index as u64 && epoch == tester.epoch() + )) + }, + ); } /// Ensures that an attestation that skips epochs can still be processed. @@ -965,7 +991,7 @@ fn attestation_that_skips_epochs() { harness .chain - .verify_unaggregated_attestation_for_gossip(attestation, Some(subnet_id)) + .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)) .expect("should gossip verify attestation that skips slots"); } @@ -998,7 +1024,7 @@ fn verify_aggregate_for_gossip_doppelganger_detection() { harness .chain - .verify_aggregated_attestation_for_gossip(valid_aggregate.clone()) + .verify_aggregated_attestation_for_gossip(&valid_aggregate) .expect("should verify aggregate attestation"); let epoch = valid_aggregate.message.aggregate.data.target.epoch; @@ -1053,7 +1079,7 @@ fn verify_attestation_for_gossip_doppelganger_detection() { harness .chain - .verify_unaggregated_attestation_for_gossip(valid_attestation.clone(), Some(subnet_id)) + .verify_unaggregated_attestation_for_gossip(&valid_attestation, Some(subnet_id)) .expect("should verify attestation"); let epoch = valid_attestation.data.target.epoch; diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index a04d2b9a4..b73056aed 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -332,7 +332,7 @@ fn epoch_boundary_state_attestation_processing() { let res = harness .chain - .verify_unaggregated_attestation_for_gossip(attestation.clone(), Some(subnet_id)); + .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)); let current_slot = harness.chain.slot().expect("should get slot"); let expected_attestation_slot = attestation.data.slot; @@ -344,7 +344,7 @@ fn epoch_boundary_state_attestation_processing() { { checked_pre_fin = true; assert!(matches!( - res.err().unwrap().0, + res.err().unwrap(), AttnError::PastSlot { attestation_slot, earliest_permissible_slot, diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index cea746628..f4399ed37 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -529,7 +529,7 @@ fn attestations_with_increasing_slots() { for (attestation, subnet_id) in attestations.into_iter().flatten() { let res = harness .chain - .verify_unaggregated_attestation_for_gossip(attestation.clone(), Some(subnet_id)); + .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)); let current_slot = harness.chain.slot().expect("should get slot"); let expected_attestation_slot = attestation.data.slot; @@ -538,7 +538,7 @@ fn attestations_with_increasing_slots() { if expected_attestation_slot < expected_earliest_permissible_slot { assert!(matches!( - res.err().unwrap().0, + res.err().unwrap(), AttnError::PastSlot { attestation_slot, earliest_permissible_slot, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index bb9ee822f..be5521223 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -16,7 +16,7 @@ mod validator_inclusion; mod version; use beacon_chain::{ - attestation_verification::SignatureVerifiedAttestation, + attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome, validator_monitor::{get_block_delay_ms, timestamp_now}, AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, @@ -1066,7 +1066,7 @@ pub fn serve( for (index, attestation) in attestations.as_slice().iter().enumerate() { let attestation = match chain - .verify_unaggregated_attestation_for_gossip(attestation.clone(), None) + .verify_unaggregated_attestation_for_gossip(attestation, None) { Ok(attestation) => attestation, Err(e) => { @@ -1121,7 +1121,7 @@ pub fn serve( )); }; - if let Err(e) = chain.add_to_naive_aggregation_pool(attestation) { + if let Err(e) = chain.add_to_naive_aggregation_pool(&attestation) { error!(log, "Failure adding verified attestation to the naive aggregation pool"; "error" => ?e, @@ -1958,7 +1958,7 @@ pub fn serve( let mut failures = Vec::new(); // Verify that all messages in the post are valid before processing further - for (index, aggregate) in aggregates.into_iter().enumerate() { + for (index, aggregate) in aggregates.iter().enumerate() { match chain.verify_aggregated_attestation_for_gossip(aggregate) { Ok(verified_aggregate) => { messages.push(PubsubMessage::AggregateAndProofAttestation(Box::new( @@ -1984,8 +1984,8 @@ pub fn serve( // It's reasonably likely that two different validators produce // identical aggregates, especially if they're using the same beacon // node. - Err((AttnError::AttestationAlreadyKnown(_), _)) => continue, - Err((e, aggregate)) => { + Err(AttnError::AttestationAlreadyKnown(_)) => continue, + Err(e) => { error!(log, "Failure verifying aggregate and proofs"; "error" => format!("{:?}", e), @@ -2017,7 +2017,7 @@ pub fn serve( ); failures.push(api_types::Failure::new(index, format!("Fork choice: {:?}", e))); } - if let Err(e) = chain.add_to_block_inclusion_pool(verified_aggregate) { + if let Err(e) = chain.add_to_block_inclusion_pool(&verified_aggregate) { warn!(log, "Could not add verified aggregate attestation to the inclusion pool"; "error" => format!("{:?}", e), diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index d6c4bf77b..63868b2df 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -46,7 +46,8 @@ use eth2_libp2p::{ }; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; -use slog::{debug, error, trace, warn, Logger}; +use slog::{crit, debug, error, trace, warn, Logger}; +use std::cmp; use std::collections::VecDeque; use std::fmt; use std::pin::Pin; @@ -70,7 +71,7 @@ mod tests; mod work_reprocessing_queue; mod worker; -pub use worker::ProcessId; +pub use worker::{GossipAggregatePackage, GossipAttestationPackage, ProcessId}; /// The maximum size of the channel for work events to the `BeaconProcessor`. /// @@ -159,11 +160,27 @@ const WORKER_TASK_NAME: &str = "beacon_processor_worker"; /// The minimum interval between log messages indicating that a queue is full. const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30); +/// The `MAX_..._BATCH_SIZE` variables define how many attestations can be included in a single +/// batch. +/// +/// Choosing these values is difficult since there is a trade-off between: +/// +/// - It is faster to verify one large batch than multiple smaller batches. +/// - "Poisoning" attacks have a larger impact as the batch size increases. +/// +/// Poisoning occurs when an invalid signature is included in a batch of attestations. A single +/// invalid signature causes the entire batch to fail. When a batch fails, we fall-back to +/// individually verifying each attestation signature. +const MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64; +const MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64; + /// Unique IDs used for metrics and testing. pub const WORKER_FREED: &str = "worker_freed"; pub const NOTHING_TO_DO: &str = "nothing_to_do"; pub const GOSSIP_ATTESTATION: &str = "gossip_attestation"; +pub const GOSSIP_ATTESTATION_BATCH: &str = "gossip_attestation_batch"; pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate"; +pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch"; pub const GOSSIP_BLOCK: &str = "gossip_block"; pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block"; pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit"; @@ -564,6 +581,9 @@ pub enum Work { should_import: bool, seen_timestamp: Duration, }, + GossipAttestationBatch { + packages: Vec>, + }, GossipAggregate { message_id: MessageId, peer_id: PeerId, @@ -576,6 +596,9 @@ pub enum Work { aggregate: Box>, seen_timestamp: Duration, }, + GossipAggregateBatch { + packages: Vec>, + }, GossipBlock { message_id: MessageId, peer_id: PeerId, @@ -644,7 +667,9 @@ impl Work { fn str_id(&self) -> &'static str { match self { Work::GossipAttestation { .. } => GOSSIP_ATTESTATION, + Work::GossipAttestationBatch { .. } => GOSSIP_ATTESTATION_BATCH, Work::GossipAggregate { .. } => GOSSIP_AGGREGATE, + Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH, Work::GossipBlock { .. } => GOSSIP_BLOCK, Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK, Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT, @@ -922,10 +947,103 @@ impl BeaconProcessor { // Check the aggregates, *then* the unaggregates since we assume that // aggregates are more valuable to local validators and effectively give us // more information with less signature verification time. - } else if let Some(item) = aggregate_queue.pop() { - self.spawn_worker(item, toolbox); - } else if let Some(item) = attestation_queue.pop() { - self.spawn_worker(item, toolbox); + } else if aggregate_queue.len() > 0 { + let batch_size = + cmp::min(aggregate_queue.len(), MAX_GOSSIP_AGGREGATE_BATCH_SIZE); + + if batch_size < 2 { + // One single aggregate is in the queue, process it individually. + if let Some(item) = aggregate_queue.pop() { + self.spawn_worker(item, toolbox); + } + } else { + // Collect two or more aggregates into a batch, so they can take + // advantage of batch signature verification. + // + // Note: this will convert the `Work::GossipAggregate` item into a + // `Work::GossipAggregateBatch` item. + let mut packages = Vec::with_capacity(batch_size); + for _ in 0..batch_size { + if let Some(item) = aggregate_queue.pop() { + match item { + Work::GossipAggregate { + message_id, + peer_id, + aggregate, + seen_timestamp, + } => { + packages.push(GossipAggregatePackage::new( + message_id, + peer_id, + aggregate, + seen_timestamp, + )); + } + _ => { + error!(self.log, "Invalid item in aggregate queue") + } + } + } + } + + // Process all aggregates with a single worker. + self.spawn_worker(Work::GossipAggregateBatch { packages }, toolbox) + } + // Check the unaggregated attestation queue. + // + // Potentially use batching. + } else if attestation_queue.len() > 0 { + let batch_size = cmp::min( + attestation_queue.len(), + MAX_GOSSIP_ATTESTATION_BATCH_SIZE, + ); + + if batch_size < 2 { + // One single attestation is in the queue, process it individually. + if let Some(item) = attestation_queue.pop() { + self.spawn_worker(item, toolbox); + } + } else { + // Collect two or more attestations into a batch, so they can take + // advantage of batch signature verification. + // + // Note: this will convert the `Work::GossipAttestation` item into a + // `Work::GossipAttestationBatch` item. + let mut packages = Vec::with_capacity(batch_size); + for _ in 0..batch_size { + if let Some(item) = attestation_queue.pop() { + match item { + Work::GossipAttestation { + message_id, + peer_id, + attestation, + subnet_id, + should_import, + seen_timestamp, + } => { + packages.push(GossipAttestationPackage::new( + message_id, + peer_id, + attestation, + subnet_id, + should_import, + seen_timestamp, + )); + } + _ => error!( + self.log, + "Invalid item in attestation queue" + ), + } + } + } + + // Process all attestations with a single worker. + self.spawn_worker( + Work::GossipAttestationBatch { packages }, + toolbox, + ) + } // Check sync committee messages after attestations as their rewards are lesser // and they don't influence fork choice. } else if let Some(item) = sync_contribution_queue.pop() { @@ -1009,7 +1127,21 @@ impl BeaconProcessor { match work { _ if can_spawn => self.spawn_worker(work, toolbox), Work::GossipAttestation { .. } => attestation_queue.push(work), + // Attestation batches are formed internally within the + // `BeaconProcessor`, they are not sent from external services. + Work::GossipAttestationBatch { .. } => crit!( + self.log, + "Unsupported inbound event"; + "type" => "GossipAttestationBatch" + ), Work::GossipAggregate { .. } => aggregate_queue.push(work), + // Aggregate batches are formed internally within the `BeaconProcessor`, + // they are not sent from external services. + Work::GossipAggregateBatch { .. } => crit!( + self.log, + "Unsupported inbound event"; + "type" => "GossipAggregateBatch" + ), Work::GossipBlock { .. } => { gossip_block_queue.push(work, work_id, &self.log) } @@ -1180,7 +1312,7 @@ impl BeaconProcessor { match work { /* - * Unaggregated attestation verification. + * Individual unaggregated attestation verification. */ Work::GossipAttestation { message_id, @@ -1192,14 +1324,19 @@ impl BeaconProcessor { } => worker.process_gossip_attestation( message_id, peer_id, - *attestation, + attestation, subnet_id, should_import, Some(work_reprocessing_tx), seen_timestamp, ), /* - * Aggregated attestation verification. + * Batched unaggregated attestation verification. + */ + Work::GossipAttestationBatch { packages } => worker + .process_gossip_attestation_batch(packages, Some(work_reprocessing_tx)), + /* + * Individual aggregated attestation verification. */ Work::GossipAggregate { message_id, @@ -1209,10 +1346,16 @@ impl BeaconProcessor { } => worker.process_gossip_aggregate( message_id, peer_id, - *aggregate, + aggregate, Some(work_reprocessing_tx), seen_timestamp, ), + /* + * Batched aggregated attestation verification. + */ + Work::GossipAggregateBatch { packages } => { + worker.process_gossip_aggregate_batch(packages, Some(work_reprocessing_tx)) + } /* * Verification for beacon blocks received on gossip. */ @@ -1345,7 +1488,7 @@ impl BeaconProcessor { } => worker.process_gossip_attestation( message_id, peer_id, - *attestation, + attestation, subnet_id, should_import, None, // Do not allow this attestation to be re-processed beyond this point. @@ -1359,7 +1502,7 @@ impl BeaconProcessor { } => worker.process_gossip_aggregate( message_id, peer_id, - *aggregate, + aggregate, None, seen_timestamp, ), diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index e581b681e..81028d476 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -1,22 +1,22 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::{ - attestation_verification::{Error as AttnError, SignatureVerifiedAttestation}, + attestation_verification::{Error as AttnError, VerifiedAttestation}, observed_operations::ObservationOutcome, sync_committee_verification::Error as SyncCommitteeError, validator_monitor::get_block_delay_ms, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, GossipVerifiedBlock, }; use eth2_libp2p::{MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; -use slog::{debug, error, info, trace, warn}; +use slog::{crit, debug, error, info, trace, warn}; use slot_clock::SlotClock; use ssz::Encode; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use types::{ - Attestation, AttesterSlashing, EthSpec, Hash256, ProposerSlashing, SignedAggregateAndProof, - SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, - SyncCommitteeMessage, SyncSubnetId, + Attestation, AttesterSlashing, EthSpec, Hash256, IndexedAttestation, ProposerSlashing, + SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, + SubnetId, SyncCommitteeMessage, SyncSubnetId, }; use super::{ @@ -26,6 +26,60 @@ use super::{ Worker, }; +/// An attestation that has been validated by the `BeaconChain`. +/// +/// Since this struct implements `beacon_chain::VerifiedAttestation`, it would be a logic error to +/// construct this from components which have not passed `BeaconChain` validation. +struct VerifiedUnaggregate { + attestation: Box>, + indexed_attestation: IndexedAttestation, +} + +/// This implementation allows `Self` to be imported to fork choice and other functions on the +/// `BeaconChain`. +impl<'a, T: BeaconChainTypes> VerifiedAttestation for VerifiedUnaggregate { + fn attestation(&self) -> &Attestation { + &self.attestation + } + + fn indexed_attestation(&self) -> &IndexedAttestation { + &self.indexed_attestation + } +} + +/// An attestation that failed validation by the `BeaconChain`. +struct RejectedUnaggregate { + attestation: Box>, + error: AttnError, +} + +/// An aggregate that has been validated by the `BeaconChain`. +/// +/// Since this struct implements `beacon_chain::VerifiedAttestation`, it would be a logic error to +/// construct this from components which have not passed `BeaconChain` validation. +struct VerifiedAggregate { + signed_aggregate: Box>, + indexed_attestation: IndexedAttestation, +} + +/// This implementation allows `Self` to be imported to fork choice and other functions on the +/// `BeaconChain`. +impl<'a, T: BeaconChainTypes> VerifiedAttestation for VerifiedAggregate { + fn attestation(&self) -> &Attestation { + &self.signed_aggregate.message.aggregate + } + + fn indexed_attestation(&self) -> &IndexedAttestation { + &self.indexed_attestation + } +} + +/// An attestation that failed validation by the `BeaconChain`. +struct RejectedAggregate { + signed_aggregate: Box>, + error: AttnError, +} + /// Data for an aggregated or unaggregated attestation that failed verification. enum FailedAtt { Unaggregate { @@ -41,7 +95,7 @@ enum FailedAtt { } impl FailedAtt { - pub fn root(&self) -> &Hash256 { + pub fn beacon_block_root(&self) -> &Hash256 { match self { FailedAtt::Unaggregate { attestation, .. } => &attestation.data.beacon_block_root, FailedAtt::Aggregate { attestation, .. } => { @@ -58,6 +112,66 @@ impl FailedAtt { } } +/// Items required to verify a batch of unaggregated gossip attestations. +#[derive(Debug)] +pub struct GossipAttestationPackage { + message_id: MessageId, + peer_id: PeerId, + attestation: Box>, + subnet_id: SubnetId, + beacon_block_root: Hash256, + should_import: bool, + seen_timestamp: Duration, +} + +impl GossipAttestationPackage { + pub fn new( + message_id: MessageId, + peer_id: PeerId, + attestation: Box>, + subnet_id: SubnetId, + should_import: bool, + seen_timestamp: Duration, + ) -> Self { + Self { + message_id, + peer_id, + beacon_block_root: attestation.data.beacon_block_root, + attestation, + subnet_id, + should_import, + seen_timestamp, + } + } +} + +/// Items required to verify a batch of aggregated gossip attestations. +#[derive(Debug)] +pub struct GossipAggregatePackage { + message_id: MessageId, + peer_id: PeerId, + aggregate: Box>, + beacon_block_root: Hash256, + seen_timestamp: Duration, +} + +impl GossipAggregatePackage { + pub fn new( + message_id: MessageId, + peer_id: PeerId, + aggregate: Box>, + seen_timestamp: Duration, + ) -> Self { + Self { + message_id, + peer_id, + beacon_block_root: aggregate.message.aggregate.data.beacon_block_root, + aggregate, + seen_timestamp, + } + } +} + impl Worker { /* Auxiliary functions */ @@ -103,88 +217,200 @@ impl Worker { self, message_id: MessageId, peer_id: PeerId, - attestation: Attestation, + attestation: Box>, subnet_id: SubnetId, should_import: bool, reprocess_tx: Option>>, seen_timestamp: Duration, ) { - let beacon_block_root = attestation.data.beacon_block_root; - - let attestation = match self + let result = match self .chain - .verify_unaggregated_attestation_for_gossip(attestation, Some(subnet_id)) + .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)) { - Ok(attestation) => attestation, - Err((e, attestation)) => { - self.handle_attestation_verification_failure( - peer_id, - message_id, - FailedAtt::Unaggregate { - attestation: Box::new(attestation), - subnet_id, - should_import, - seen_timestamp, - }, - reprocess_tx, - e, + Ok(verified_attestation) => Ok(VerifiedUnaggregate { + indexed_attestation: verified_attestation.into_indexed_attestation(), + attestation, + }), + Err(error) => Err(RejectedUnaggregate { attestation, error }), + }; + + self.process_gossip_attestation_result( + result, + message_id, + peer_id, + subnet_id, + reprocess_tx, + should_import, + seen_timestamp, + ); + } + + pub fn process_gossip_attestation_batch( + self, + packages: Vec>, + reprocess_tx: Option>>, + ) { + let attestations_and_subnets = packages + .iter() + .map(|package| (package.attestation.as_ref(), Some(package.subnet_id))); + + let results = match self + .chain + .batch_verify_unaggregated_attestations_for_gossip(attestations_and_subnets) + { + Ok(results) => results, + Err(e) => { + error!( + self.log, + "Batch unagg. attn verification failed"; + "error" => ?e ); return; } }; - // Register the attestation with any monitored validators. - self.chain - .validator_monitor - .read() - .register_gossip_unaggregated_attestation( - seen_timestamp, - attestation.indexed_attestation(), - &self.chain.slot_clock, - ); - - // Indicate to the `Network` service that this message is valid and can be - // propagated on the gossip network. - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); - - if !should_import { - return; + // Sanity check. + if results.len() != packages.len() { + // The log is `crit` since in this scenario we might be penalizing/rewarding the wrong + // peer. + crit!( + self.log, + "Batch attestation result mismatch"; + "results" => results.len(), + "packages" => packages.len(), + ) } - metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL); + // Map the results into a new `Vec` so that `results` no longer holds a reference to + // `packages`. + #[allow(clippy::needless_collect)] // The clippy suggestion fails the borrow checker. + let results = results + .into_iter() + .map(|result| result.map(|verified| verified.into_indexed_attestation())) + .collect::>(); - if let Err(e) = self.chain.apply_attestation_to_fork_choice(&attestation) { - match e { - BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => { + for (result, package) in results.into_iter().zip(packages.into_iter()) { + let result = match result { + Ok(indexed_attestation) => Ok(VerifiedUnaggregate { + indexed_attestation, + attestation: package.attestation, + }), + Err(error) => Err(RejectedUnaggregate { + attestation: package.attestation, + error, + }), + }; + + self.process_gossip_attestation_result( + result, + package.message_id, + package.peer_id, + package.subnet_id, + reprocess_tx.clone(), + package.should_import, + package.seen_timestamp, + ); + } + } + + // Clippy warning is is ignored since the arguments are all of a different type (i.e., they + // cant' be mixed-up) and creating a struct would result in more complexity. + #[allow(clippy::too_many_arguments)] + fn process_gossip_attestation_result( + &self, + result: Result, RejectedUnaggregate>, + message_id: MessageId, + peer_id: PeerId, + subnet_id: SubnetId, + reprocess_tx: Option>>, + should_import: bool, + seen_timestamp: Duration, + ) { + match result { + Ok(verified_attestation) => { + let indexed_attestation = &verified_attestation.indexed_attestation; + let beacon_block_root = indexed_attestation.data.beacon_block_root; + + // Register the attestation with any monitored validators. + self.chain + .validator_monitor + .read() + .register_gossip_unaggregated_attestation( + seen_timestamp, + indexed_attestation, + &self.chain.slot_clock, + ); + + // Indicate to the `Network` service that this message is valid and can be + // propagated on the gossip network. + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + + if !should_import { + return; + } + + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL, + ); + + if let Err(e) = self + .chain + .apply_attestation_to_fork_choice(&verified_attestation) + { + match e { + BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation( + e, + )) => { + debug!( + self.log, + "Attestation invalid for fork choice"; + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root + ) + } + e => error!( + self.log, + "Error applying attestation to fork choice"; + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root + ), + } + } + + if let Err(e) = self + .chain + .add_to_naive_aggregation_pool(&verified_attestation) + { debug!( self.log, - "Attestation invalid for fork choice"; + "Attestation invalid for agg pool"; "reason" => ?e, "peer" => %peer_id, "beacon_block_root" => ?beacon_block_root ) } - e => error!( - self.log, - "Error applying attestation to fork choice"; - "reason" => ?e, - "peer" => %peer_id, - "beacon_block_root" => ?beacon_block_root - ), + + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL, + ); + } + Err(RejectedUnaggregate { attestation, error }) => { + self.handle_attestation_verification_failure( + peer_id, + message_id, + FailedAtt::Unaggregate { + attestation, + subnet_id, + should_import, + seen_timestamp, + }, + reprocess_tx, + error, + ); } } - - if let Err(e) = self.chain.add_to_naive_aggregation_pool(attestation) { - debug!( - self.log, - "Attestation invalid for agg pool"; - "reason" => ?e, - "peer" => %peer_id, - "beacon_block_root" => ?beacon_block_root - ) - } - - metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL); } /// Process the aggregated attestation received from the gossip network and: @@ -198,82 +424,191 @@ impl Worker { self, message_id: MessageId, peer_id: PeerId, - aggregate: SignedAggregateAndProof, + aggregate: Box>, reprocess_tx: Option>>, seen_timestamp: Duration, ) { let beacon_block_root = aggregate.message.aggregate.data.beacon_block_root; - let aggregate = match self + let result = match self .chain - .verify_aggregated_attestation_for_gossip(aggregate) + .verify_aggregated_attestation_for_gossip(&aggregate) { - Ok(aggregate) => aggregate, - Err((e, attestation)) => { - // Report the failure to gossipsub - self.handle_attestation_verification_failure( - peer_id, - message_id, - FailedAtt::Aggregate { - attestation: Box::new(attestation), - seen_timestamp, - }, - reprocess_tx, - e, + Ok(verified_aggregate) => Ok(VerifiedAggregate { + indexed_attestation: verified_aggregate.into_indexed_attestation(), + signed_aggregate: aggregate, + }), + Err(error) => Err(RejectedAggregate { + signed_aggregate: aggregate, + error, + }), + }; + + self.process_gossip_aggregate_result( + result, + beacon_block_root, + message_id, + peer_id, + reprocess_tx, + seen_timestamp, + ); + } + + pub fn process_gossip_aggregate_batch( + self, + packages: Vec>, + reprocess_tx: Option>>, + ) { + let aggregates = packages.iter().map(|package| package.aggregate.as_ref()); + + let results = match self + .chain + .batch_verify_aggregated_attestations_for_gossip(aggregates) + { + Ok(results) => results, + Err(e) => { + error!( + self.log, + "Batch agg. attn verification failed"; + "error" => ?e ); return; } }; - // Indicate to the `Network` service that this message is valid and can be - // propagated on the gossip network. - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + // Sanity check. + if results.len() != packages.len() { + // The log is `crit` since in this scenario we might be penalizing/rewarding the wrong + // peer. + crit!( + self.log, + "Batch agg. attestation result mismatch"; + "results" => results.len(), + "packages" => packages.len(), + ) + } - // Register the attestation with any monitored validators. - self.chain - .validator_monitor - .read() - .register_gossip_aggregated_attestation( - seen_timestamp, - aggregate.aggregate(), - aggregate.indexed_attestation(), - &self.chain.slot_clock, + // Map the results into a new `Vec` so that `results` no longer holds a reference to + // `packages`. + #[allow(clippy::needless_collect)] // The clippy suggestion fails the borrow checker. + let results = results + .into_iter() + .map(|result| result.map(|verified| verified.into_indexed_attestation())) + .collect::>(); + + for (result, package) in results.into_iter().zip(packages.into_iter()) { + let result = match result { + Ok(indexed_attestation) => Ok(VerifiedAggregate { + indexed_attestation, + signed_aggregate: package.aggregate, + }), + Err(error) => Err(RejectedAggregate { + signed_aggregate: package.aggregate, + error, + }), + }; + + self.process_gossip_aggregate_result( + result, + package.beacon_block_root, + package.message_id, + package.peer_id, + reprocess_tx.clone(), + package.seen_timestamp, ); + } + } - metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL); + fn process_gossip_aggregate_result( + &self, + result: Result, RejectedAggregate>, + beacon_block_root: Hash256, + message_id: MessageId, + peer_id: PeerId, + reprocess_tx: Option>>, + seen_timestamp: Duration, + ) { + match result { + Ok(verified_aggregate) => { + let aggregate = &verified_aggregate.signed_aggregate; + let indexed_attestation = &verified_aggregate.indexed_attestation; - if let Err(e) = self.chain.apply_attestation_to_fork_choice(&aggregate) { - match e { - BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => { + // Indicate to the `Network` service that this message is valid and can be + // propagated on the gossip network. + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + + // Register the attestation with any monitored validators. + self.chain + .validator_monitor + .read() + .register_gossip_aggregated_attestation( + seen_timestamp, + aggregate, + indexed_attestation, + &self.chain.slot_clock, + ); + + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL, + ); + + if let Err(e) = self + .chain + .apply_attestation_to_fork_choice(&verified_aggregate) + { + match e { + BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation( + e, + )) => { + debug!( + self.log, + "Aggregate invalid for fork choice"; + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root + ) + } + e => error!( + self.log, + "Error applying aggregate to fork choice"; + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root + ), + } + } + + if let Err(e) = self.chain.add_to_block_inclusion_pool(&verified_aggregate) { debug!( self.log, - "Aggregate invalid for fork choice"; + "Attestation invalid for op pool"; "reason" => ?e, "peer" => %peer_id, "beacon_block_root" => ?beacon_block_root ) } - e => error!( - self.log, - "Error applying aggregate to fork choice"; - "reason" => ?e, - "peer" => %peer_id, - "beacon_block_root" => ?beacon_block_root - ), + + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL, + ); + } + Err(RejectedAggregate { + signed_aggregate, + error, + }) => { + // Report the failure to gossipsub + self.handle_attestation_verification_failure( + peer_id, + message_id, + FailedAtt::Aggregate { + attestation: signed_aggregate, + seen_timestamp, + }, + reprocess_tx, + error, + ); } } - - if let Err(e) = self.chain.add_to_block_inclusion_pool(aggregate) { - debug!( - self.log, - "Attestation invalid for op pool"; - "reason" => ?e, - "peer" => %peer_id, - "beacon_block_root" => ?beacon_block_root - ) - } - - metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL); } /// Process the beacon block received from the gossip network and: @@ -834,7 +1169,7 @@ impl Worker { reprocess_tx: Option>>, error: AttnError, ) { - let beacon_block_root = failed_att.root(); + let beacon_block_root = failed_att.beacon_block_root(); let attestation_type = failed_att.kind(); metrics::register_attestation_error(&error); match &error { diff --git a/beacon_node/network/src/beacon_processor/worker/mod.rs b/beacon_node/network/src/beacon_processor/worker/mod.rs index 58fec22d5..b9d78900b 100644 --- a/beacon_node/network/src/beacon_processor/worker/mod.rs +++ b/beacon_node/network/src/beacon_processor/worker/mod.rs @@ -9,6 +9,7 @@ mod gossip_methods; mod rpc_methods; mod sync_methods; +pub use gossip_methods::{GossipAggregatePackage, GossipAttestationPackage}; pub use sync_methods::ProcessId; pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 61e6d56ea..f299de1b6 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -413,7 +413,7 @@ impl ForkChoiceTest { let mut verified_attestation = self .harness .chain - .verify_unaggregated_attestation_for_gossip(attestation, Some(subnet_id)) + .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)) .expect("precondition: should gossip verify attestation"); if let MutationDelay::Blocks(slots) = delay {