diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index a6233cde4..eb29e8d2f 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -18,13 +18,16 @@ //! types::Attestation types::SignedAggregateAndProof //! | | //! ▼ ▼ -//! VerifiedUnaggregatedAttestation VerifiedAggregatedAttestation +//! IndexedUnaggregatedAttestation IndexedAggregatedAttestation +//! | | +//! VerifiedUnaggregatedAttestation VerifiedAggregatedAttestation //! | | //! ------------------------------------- //! | //! ▼ -//! impl SignatureVerifiedAttestation +//! impl VerifiedAttestation //! ``` +mod batch; use crate::{ beacon_chain::{MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT}, @@ -53,6 +56,8 @@ use types::{ SelectionProof, SignedAggregateAndProof, Slot, SubnetId, }; +pub use batch::{batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations}; + /// Returned when an attestation was not successfully verified. It might not have been verified for /// two reasons: /// @@ -254,53 +259,105 @@ impl From for Error { } } -/// Wraps a `SignedAggregateAndProof` that has been verified for propagation on the gossip network. -pub struct VerifiedAggregatedAttestation { - signed_aggregate: SignedAggregateAndProof, +/// Used to avoid double-checking signatures. +#[derive(Copy, Clone)] +enum CheckAttestationSignature { + Yes, + No, +} + +/// Wraps a `SignedAggregateAndProof` that has been verified up until the point that an +/// `IndexedAttestation` can be derived. +/// +/// These attestations have *not* undergone signature verification. +struct IndexedAggregatedAttestation<'a, T: BeaconChainTypes> { + signed_aggregate: &'a SignedAggregateAndProof, + indexed_attestation: IndexedAttestation, + attestation_root: Hash256, +} + +/// Wraps a `Attestation` that has been verified up until the point that an `IndexedAttestation` can +/// be derived. +/// +/// These attestations have *not* undergone signature verification. +struct IndexedUnaggregatedAttestation<'a, T: BeaconChainTypes> { + attestation: &'a Attestation, + indexed_attestation: IndexedAttestation, + subnet_id: SubnetId, + validator_index: u64, +} + +/// Wraps a `SignedAggregateAndProof` that has been fully verified for propagation on the gossip +/// network. +pub struct VerifiedAggregatedAttestation<'a, T: BeaconChainTypes> { + signed_aggregate: &'a SignedAggregateAndProof, indexed_attestation: IndexedAttestation, } -/// Wraps an `Attestation` that has been verified for propagation on the gossip network. -pub struct VerifiedUnaggregatedAttestation { - attestation: Attestation, +impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> { + pub fn into_indexed_attestation(self) -> IndexedAttestation { + self.indexed_attestation + } +} + +/// Wraps an `Attestation` that has been fully verified for propagation on the gossip network. +pub struct VerifiedUnaggregatedAttestation<'a, T: BeaconChainTypes> { + attestation: &'a Attestation, indexed_attestation: IndexedAttestation, subnet_id: SubnetId, } +impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { + pub fn into_indexed_attestation(self) -> IndexedAttestation { + self.indexed_attestation + } +} + /// Custom `Clone` implementation is to avoid the restrictive trait bounds applied by the usual derive /// macro. 
-impl Clone for VerifiedUnaggregatedAttestation { +impl<'a, T: BeaconChainTypes> Clone for IndexedUnaggregatedAttestation<'a, T> { fn clone(&self) -> Self { Self { - attestation: self.attestation.clone(), + attestation: self.attestation, indexed_attestation: self.indexed_attestation.clone(), subnet_id: self.subnet_id, + validator_index: self.validator_index, } } } /// A helper trait implemented on wrapper types that can be progressed to a state where they can be /// verified for application to fork choice. -pub trait SignatureVerifiedAttestation { +pub trait VerifiedAttestation { + fn attestation(&self) -> &Attestation; + fn indexed_attestation(&self) -> &IndexedAttestation; } -impl<'a, T: BeaconChainTypes> SignatureVerifiedAttestation for VerifiedAggregatedAttestation { +impl<'a, T: BeaconChainTypes> VerifiedAttestation for VerifiedAggregatedAttestation<'a, T> { + fn attestation(&self) -> &Attestation { + self.attestation() + } + fn indexed_attestation(&self) -> &IndexedAttestation { &self.indexed_attestation } } -impl SignatureVerifiedAttestation for VerifiedUnaggregatedAttestation { +impl<'a, T: BeaconChainTypes> VerifiedAttestation for VerifiedUnaggregatedAttestation<'a, T> { + fn attestation(&self) -> &Attestation { + self.attestation + } + fn indexed_attestation(&self) -> &IndexedAttestation { &self.indexed_attestation } } /// Information about invalid attestations which might still be slashable despite being invalid. -pub enum AttestationSlashInfo { +pub enum AttestationSlashInfo<'a, T: BeaconChainTypes, TErr> { /// The attestation is invalid, but its signature wasn't checked. - SignatureNotChecked(Attestation, TErr), + SignatureNotChecked(&'a Attestation, TErr), /// As for `SignatureNotChecked`, but we know the `IndexedAttestation`. SignatureNotCheckedIndexed(IndexedAttestation, TErr), /// The attestation's signature is invalid, so it will never be slashable. @@ -324,7 +381,7 @@ fn process_slash_info( if let Some(slasher) = chain.slasher.as_ref() { let (indexed_attestation, check_signature, err) = match slash_info { SignatureNotChecked(attestation, err) => { - match obtain_indexed_attestation_and_committees_per_slot(chain, &attestation) { + match obtain_indexed_attestation_and_committees_per_slot(chain, attestation) { Ok((indexed, _)) => (indexed, true, err), Err(e) => { debug!( @@ -367,13 +424,13 @@ fn process_slash_info( } } -impl VerifiedAggregatedAttestation { +impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> { /// Returns `Ok(Self)` if the `signed_aggregate` is valid to be (re)published on the gossip /// network. pub fn verify( - signed_aggregate: SignedAggregateAndProof, + signed_aggregate: &'a SignedAggregateAndProof, chain: &BeaconChain, - ) -> Result)> { + ) -> Result { Self::verify_slashable(signed_aggregate, chain) .map(|verified_aggregate| { if let Some(slasher) = chain.slasher.as_ref() { @@ -381,9 +438,7 @@ impl VerifiedAggregatedAttestation { } verified_aggregate }) - .map_err(|(slash_info, original_aggregate)| { - (process_slash_info(slash_info, chain), original_aggregate) - }) + .map_err(|slash_info| process_slash_info(slash_info, chain)) } /// Run the checks that happen before an indexed attestation is constructed. @@ -467,6 +522,56 @@ impl VerifiedAggregatedAttestation { } } + /// Verify the attestation, producing extra information about whether it might be slashable. 
+ pub fn verify_slashable( + signed_aggregate: &'a SignedAggregateAndProof, + chain: &BeaconChain, + ) -> Result> { + use AttestationSlashInfo::*; + + let attestation = &signed_aggregate.message.aggregate; + let aggregator_index = signed_aggregate.message.aggregator_index; + let attestation_root = match Self::verify_early_checks(signed_aggregate, chain) { + Ok(root) => root, + Err(e) => return Err(SignatureNotChecked(&signed_aggregate.message.aggregate, e)), + }; + + let indexed_attestation = + match map_attestation_committee(chain, attestation, |(committee, _)| { + // Note: this clones the signature which is known to be a relatively slow operation. + // + // Future optimizations should remove this clone. + let selection_proof = + SelectionProof::from(signed_aggregate.message.selection_proof.clone()); + + if !selection_proof + .is_aggregator(committee.committee.len(), &chain.spec) + .map_err(|e| Error::BeaconChainError(e.into()))? + { + return Err(Error::InvalidSelectionProof { aggregator_index }); + } + + // Ensure the aggregator is a member of the committee for which it is aggregating. + if !committee.committee.contains(&(aggregator_index as usize)) { + return Err(Error::AggregatorNotInCommittee { aggregator_index }); + } + + get_indexed_attestation(committee.committee, attestation) + .map_err(|e| BeaconChainError::from(e).into()) + }) { + Ok(indexed_attestation) => indexed_attestation, + Err(e) => return Err(SignatureNotChecked(&signed_aggregate.message.aggregate, e)), + }; + + Ok(IndexedAggregatedAttestation { + signed_aggregate, + indexed_attestation, + attestation_root, + }) + } +} + +impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> { /// Run the checks that happen after the indexed attestation and signature have been checked. fn verify_late_checks( signed_aggregate: &SignedAggregateAndProof, @@ -508,82 +613,70 @@ impl VerifiedAggregatedAttestation { Ok(()) } - /// Verify the attestation, producing extra information about whether it might be slashable. - // NOTE: Clippy considers the return too complex. This tuple is not used elsewhere so it is not - // worth creating an alias. - #[allow(clippy::type_complexity)] - pub fn verify_slashable( - signed_aggregate: SignedAggregateAndProof, + /// Verify the `signed_aggregate`. + pub fn verify( + signed_aggregate: &'a SignedAggregateAndProof, chain: &BeaconChain, - ) -> Result< - Self, - ( - AttestationSlashInfo, - SignedAggregateAndProof, - ), - > { + ) -> Result { + let indexed = IndexedAggregatedAttestation::verify(signed_aggregate, chain)?; + Self::from_indexed(indexed, chain, CheckAttestationSignature::Yes) + } + + /// Complete the verification of an indexed attestation. + fn from_indexed( + signed_aggregate: IndexedAggregatedAttestation<'a, T>, + chain: &BeaconChain, + check_signature: CheckAttestationSignature, + ) -> Result { + Self::verify_slashable(signed_aggregate, chain, check_signature) + .map(|verified_aggregate| verified_aggregate.apply_to_slasher(chain)) + .map_err(|slash_info| process_slash_info(slash_info, chain)) + } + + fn apply_to_slasher(self, chain: &BeaconChain) -> Self { + if let Some(slasher) = chain.slasher.as_ref() { + slasher.accept_attestation(self.indexed_attestation.clone()); + } + self + } + + /// Verify the attestation, producing extra information about whether it might be slashable. 
+ fn verify_slashable( + signed_aggregate: IndexedAggregatedAttestation<'a, T>, + chain: &BeaconChain, + check_signature: CheckAttestationSignature, + ) -> Result> { use AttestationSlashInfo::*; - let attestation = &signed_aggregate.message.aggregate; - let aggregator_index = signed_aggregate.message.aggregator_index; - let attestation_root = match Self::verify_early_checks(&signed_aggregate, chain) { - Ok(root) => root, - Err(e) => { - return Err(( - SignatureNotChecked(signed_aggregate.message.aggregate.clone(), e), + let IndexedAggregatedAttestation { + signed_aggregate, + indexed_attestation, + attestation_root, + } = signed_aggregate; + + match check_signature { + CheckAttestationSignature::Yes => { + // Ensure that all signatures are valid. + if let Err(e) = verify_signed_aggregate_signatures( + chain, signed_aggregate, - )) - } - }; - - let indexed_attestation = - match map_attestation_committee(chain, attestation, |(committee, _)| { - // Note: this clones the signature which is known to be a relatively slow operation. - // - // Future optimizations should remove this clone. - let selection_proof = - SelectionProof::from(signed_aggregate.message.selection_proof.clone()); - - if !selection_proof - .is_aggregator(committee.committee.len(), &chain.spec) - .map_err(|e| Error::BeaconChainError(e.into()))? - { - return Err(Error::InvalidSelectionProof { aggregator_index }); - } - - // Ensure the aggregator is a member of the committee for which it is aggregating. - if !committee.committee.contains(&(aggregator_index as usize)) { - return Err(Error::AggregatorNotInCommittee { aggregator_index }); - } - - get_indexed_attestation(committee.committee, attestation) - .map_err(|e| BeaconChainError::from(e).into()) - }) { - Ok(indexed_attestation) => indexed_attestation, - Err(e) => { - return Err(( - SignatureNotChecked(signed_aggregate.message.aggregate.clone(), e), - signed_aggregate, - )) - } - }; - - // Ensure that all signatures are valid. - if let Err(e) = - verify_signed_aggregate_signatures(chain, &signed_aggregate, &indexed_attestation) + &indexed_attestation, + ) .and_then(|is_valid| { if !is_valid { Err(Error::InvalidSignature) } else { Ok(()) } - }) - { - return Err((SignatureInvalid(e), signed_aggregate)); - } + }) { + return Err(SignatureInvalid(e)); + } + } + CheckAttestationSignature::No => (), + }; - if let Err(e) = Self::verify_late_checks(&signed_aggregate, attestation_root, chain) { - return Err((SignatureValid(indexed_attestation, e), signed_aggregate)); + if let Err(e) = Self::verify_late_checks(signed_aggregate, attestation_root, chain) { + return Err(SignatureValid(indexed_attestation, e)); } Ok(VerifiedAggregatedAttestation { @@ -592,11 +685,6 @@ impl VerifiedAggregatedAttestation { }) } - /// A helper function to add this aggregate to `beacon_chain.op_pool`. - pub fn add_to_pool(self, chain: &BeaconChain) -> Result { - chain.add_to_block_inclusion_pool(self) - } - /// Returns the underlying `attestation` for the `signed_aggregate`. pub fn attestation(&self) -> &Attestation { &self.signed_aggregate.message.aggregate @@ -604,11 +692,11 @@ impl VerifiedAggregatedAttestation { /// Returns the underlying `signed_aggregate`. pub fn aggregate(&self) -> &SignedAggregateAndProof { - &self.signed_aggregate + self.signed_aggregate } } -impl VerifiedUnaggregatedAttestation { +impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { /// Run the checks that happen before an indexed attestation is constructed. 
pub fn verify_early_checks( attestation: &Attestation, @@ -699,6 +787,75 @@ impl VerifiedUnaggregatedAttestation { Ok((validator_index, expected_subnet_id)) } + /// Returns `Ok(Self)` if the `attestation` is valid to be (re)published on the gossip + /// network. + /// + /// `subnet_id` is the subnet from which we received this attestation. This function will + /// verify that it was received on the correct subnet. + pub fn verify( + attestation: &'a Attestation, + subnet_id: Option, + chain: &BeaconChain, + ) -> Result { + Self::verify_slashable(attestation, subnet_id, chain) + .map(|verified_unaggregated| { + if let Some(slasher) = chain.slasher.as_ref() { + slasher.accept_attestation(verified_unaggregated.indexed_attestation.clone()); + } + verified_unaggregated + }) + .map_err(|slash_info| process_slash_info(slash_info, chain)) + } + + /// Verify the attestation, producing extra information about whether it might be slashable. + pub fn verify_slashable( + attestation: &'a Attestation, + subnet_id: Option, + chain: &BeaconChain, + ) -> Result> { + use AttestationSlashInfo::*; + + if let Err(e) = Self::verify_early_checks(attestation, chain) { + return Err(SignatureNotChecked(attestation, e)); + } + + let (indexed_attestation, committees_per_slot) = + match obtain_indexed_attestation_and_committees_per_slot(chain, attestation) { + Ok(x) => x, + Err(e) => { + return Err(SignatureNotChecked(attestation, e)); + } + }; + + let (validator_index, expected_subnet_id) = match Self::verify_middle_checks( + attestation, + &indexed_attestation, + committees_per_slot, + subnet_id, + chain, + ) { + Ok(t) => t, + Err(e) => return Err(SignatureNotCheckedIndexed(indexed_attestation, e)), + }; + + Ok(Self { + attestation, + indexed_attestation, + subnet_id: expected_subnet_id, + validator_index, + }) + } + + /// Returns a mutable reference to the underlying attestation. + /// + /// Only use during testing since modifying the `IndexedAttestation` can cause the attestation + /// to no-longer be valid. + pub fn __indexed_attestation_mut(&mut self) -> &mut IndexedAttestation { + &mut self.indexed_attestation + } +} + +impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { /// Run the checks that apply after the signature has been checked. fn verify_late_checks( attestation: &Attestation, @@ -725,88 +882,70 @@ impl VerifiedUnaggregatedAttestation { Ok(()) } - /// Returns `Ok(Self)` if the `attestation` is valid to be (re)published on the gossip - /// network. - /// - /// `subnet_id` is the subnet from which we received this attestation. This function will - /// verify that it was received on the correct subnet. + /// Verify the `unaggregated_attestation`. pub fn verify( - attestation: Attestation, + unaggregated_attestation: &'a Attestation, subnet_id: Option, chain: &BeaconChain, - ) -> Result)> { - Self::verify_slashable(attestation, subnet_id, chain) - .map(|verified_unaggregated| { - if let Some(slasher) = chain.slasher.as_ref() { - slasher.accept_attestation(verified_unaggregated.indexed_attestation.clone()); - } - verified_unaggregated - }) - .map_err(|(slash_info, original_attestation)| { - (process_slash_info(slash_info, chain), original_attestation) - }) + ) -> Result { + let indexed = + IndexedUnaggregatedAttestation::verify(unaggregated_attestation, subnet_id, chain)?; + Self::from_indexed(indexed, chain, CheckAttestationSignature::Yes) + } + + /// Complete the verification of an indexed attestation. 
+ fn from_indexed( + attestation: IndexedUnaggregatedAttestation<'a, T>, + chain: &BeaconChain, + check_signature: CheckAttestationSignature, + ) -> Result { + Self::verify_slashable(attestation, chain, check_signature) + .map(|verified_unaggregated| verified_unaggregated.apply_to_slasher(chain)) + .map_err(|slash_info| process_slash_info(slash_info, chain)) + } + + fn apply_to_slasher(self, chain: &BeaconChain) -> Self { + if let Some(slasher) = chain.slasher.as_ref() { + slasher.accept_attestation(self.indexed_attestation.clone()); + } + self } /// Verify the attestation, producing extra information about whether it might be slashable. - // NOTE: Clippy considers the return too complex. This tuple is not used elsewhere so it is not - // worth creating an alias. - #[allow(clippy::type_complexity)] - pub fn verify_slashable( - attestation: Attestation, - subnet_id: Option, + fn verify_slashable( + attestation: IndexedUnaggregatedAttestation<'a, T>, chain: &BeaconChain, - ) -> Result, Attestation)> { + check_signature: CheckAttestationSignature, + ) -> Result> { use AttestationSlashInfo::*; - if let Err(e) = Self::verify_early_checks(&attestation, chain) { - return Err((SignatureNotChecked(attestation.clone(), e), attestation)); - } - - let (indexed_attestation, committees_per_slot) = - match obtain_indexed_attestation_and_committees_per_slot(chain, &attestation) { - Ok(x) => x, - Err(e) => { - return Err((SignatureNotChecked(attestation.clone(), e), attestation)); - } - }; - - let (validator_index, expected_subnet_id) = match Self::verify_middle_checks( - &attestation, - &indexed_attestation, - committees_per_slot, + let IndexedUnaggregatedAttestation { + attestation, + indexed_attestation, subnet_id, - chain, - ) { - Ok(t) => t, - Err(e) => { - return Err(( - SignatureNotCheckedIndexed(indexed_attestation, e), - attestation, - )) + validator_index, + } = attestation; + + match check_signature { + CheckAttestationSignature::Yes => { + if let Err(e) = verify_attestation_signature(chain, &indexed_attestation) { + return Err(SignatureInvalid(e)); + } } + CheckAttestationSignature::No => (), }; - // The aggregate signature of the attestation is valid. - if let Err(e) = verify_attestation_signature(chain, &indexed_attestation) { - return Err((SignatureInvalid(e), attestation)); - } - - if let Err(e) = Self::verify_late_checks(&attestation, validator_index, chain) { - return Err((SignatureValid(indexed_attestation, e), attestation)); + if let Err(e) = Self::verify_late_checks(attestation, validator_index, chain) { + return Err(SignatureValid(indexed_attestation, e)); } Ok(Self { attestation, indexed_attestation, - subnet_id: expected_subnet_id, + subnet_id, }) } - /// A helper function to add this attestation to `beacon_chain.naive_aggregation_pool`. - pub fn add_to_pool(self, chain: &BeaconChain) -> Result { - chain.add_to_naive_aggregation_pool(self) - } - /// Returns the correct subnet for the attestation. pub fn subnet_id(&self) -> SubnetId { self.subnet_id @@ -814,7 +953,7 @@ impl VerifiedUnaggregatedAttestation { /// Returns the wrapped `attestation`. pub fn attestation(&self) -> &Attestation { - &self.attestation + self.attestation } /// Returns the wrapped `indexed_attestation`. diff --git a/beacon_node/beacon_chain/src/attestation_verification/batch.rs b/beacon_node/beacon_chain/src/attestation_verification/batch.rs new file mode 100644 index 000000000..30f1ae7e5 --- /dev/null +++ b/beacon_node/beacon_chain/src/attestation_verification/batch.rs @@ -0,0 +1,222 @@ +//! 
These two `batch_...` functions provide verification of batches of attestations. They provide
+//! significant CPU-time savings by performing batch verification of BLS signatures.
+//!
+//! In each function, attestations are "indexed" (i.e., the `IndexedAttestation` is computed) to
+//! determine if they should progress to signature verification. Then, all attestations which were
+//! successfully indexed have their signatures verified in a batch. If that signature batch fails,
+//! all attestation signatures are verified independently.
+//!
+//! The outcome of each function is a `Vec` of per-attestation `Result`s, in one-to-one mapping with
+//! the attestations supplied as input: each entry gives the exact success or failure outcome for the
+//! corresponding attestation, with no loss of fidelity when compared to individual verification.
+use super::{
+    CheckAttestationSignature, Error, IndexedAggregatedAttestation, IndexedUnaggregatedAttestation,
+    VerifiedAggregatedAttestation, VerifiedUnaggregatedAttestation,
+};
+use crate::{
+    beacon_chain::VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, metrics, BeaconChain, BeaconChainError,
+    BeaconChainTypes,
+};
+use bls::verify_signature_sets;
+use state_processing::signature_sets::{
+    indexed_attestation_signature_set_from_pubkeys, signed_aggregate_selection_proof_signature_set,
+    signed_aggregate_signature_set,
+};
+use std::borrow::Cow;
+use types::*;
+
+/// Verify aggregated attestations using batch BLS signature verification.
+///
+/// See module-level docs for more info.
+pub fn batch_verify_aggregated_attestations<'a, T, I>(
+    aggregates: I,
+    chain: &BeaconChain<T>,
+) -> Result<Vec<Result<VerifiedAggregatedAttestation<'a, T>, Error>>, Error>
+where
+    T: BeaconChainTypes,
+    I: Iterator<Item = &'a SignedAggregateAndProof<T::EthSpec>> + ExactSizeIterator,
+{
+    let mut num_indexed = 0;
+    let mut num_failed = 0;
+
+    // Perform indexing of all attestations, collecting the results.
+    let indexing_results = aggregates
+        .map(|aggregate| {
+            let result = IndexedAggregatedAttestation::verify(aggregate, chain);
+            if result.is_ok() {
+                num_indexed += 1;
+            } else {
+                num_failed += 1;
+            }
+            result
+        })
+        .collect::<Vec<_>>();
+
+    // May be set to `No` if batch verification succeeds.
+    let mut check_signatures = CheckAttestationSignature::Yes;
+
+    // Perform batch BLS verification, if any attestation signatures are worth checking.
+    if num_indexed > 0 {
+        let signature_setup_timer =
+            metrics::start_timer(&metrics::ATTESTATION_PROCESSING_BATCH_AGG_SIGNATURE_SETUP_TIMES);
+
+        let pubkey_cache = chain
+            .validator_pubkey_cache
+            .try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
+            .ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;
+
+        let fork = chain.with_head(|head| Ok::<_, BeaconChainError>(head.beacon_state.fork()))?;
+
+        let mut signature_sets = Vec::with_capacity(num_indexed * 3);
+
+        // Iterate, flattening to get only the `Ok` values.
+ for indexed in indexing_results.iter().flatten() { + let signed_aggregate = &indexed.signed_aggregate; + let indexed_attestation = &indexed.indexed_attestation; + + signature_sets.push( + signed_aggregate_selection_proof_signature_set( + |validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed), + signed_aggregate, + &fork, + chain.genesis_validators_root, + &chain.spec, + ) + .map_err(BeaconChainError::SignatureSetError)?, + ); + signature_sets.push( + signed_aggregate_signature_set( + |validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed), + signed_aggregate, + &fork, + chain.genesis_validators_root, + &chain.spec, + ) + .map_err(BeaconChainError::SignatureSetError)?, + ); + signature_sets.push( + indexed_attestation_signature_set_from_pubkeys( + |validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed), + &indexed_attestation.signature, + indexed_attestation, + &fork, + chain.genesis_validators_root, + &chain.spec, + ) + .map_err(BeaconChainError::SignatureSetError)?, + ); + } + + metrics::stop_timer(signature_setup_timer); + + let _signature_verification_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_BATCH_AGG_SIGNATURE_TIMES); + + if verify_signature_sets(signature_sets.iter()) { + // Since all the signatures verified in a batch, there's no reason for them to be + // checked again later. + check_signatures = CheckAttestationSignature::No + } + } + + // Complete the attestation verification, potentially verifying all signatures independently. + let final_results = indexing_results + .into_iter() + .map(|result| match result { + Ok(indexed) => { + VerifiedAggregatedAttestation::from_indexed(indexed, chain, check_signatures) + } + Err(e) => Err(e), + }) + .collect(); + + Ok(final_results) +} + +/// Verify unaggregated attestations using batch BLS signature verification. +/// +/// See module-level docs for more info. +pub fn batch_verify_unaggregated_attestations<'a, T, I>( + attestations: I, + chain: &BeaconChain, +) -> Result, Error>>, Error> +where + T: BeaconChainTypes, + I: Iterator, Option)> + ExactSizeIterator, +{ + let mut num_partially_verified = 0; + let mut num_failed = 0; + + // Perform partial verification of all attestations, collecting the results. + let partial_results = attestations + .map(|(attn, subnet_opt)| { + let result = IndexedUnaggregatedAttestation::verify(attn, subnet_opt, chain); + if result.is_ok() { + num_partially_verified += 1; + } else { + num_failed += 1; + } + result + }) + .collect::>(); + + // May be set to `No` if batch verification succeeds. + let mut check_signatures = CheckAttestationSignature::Yes; + + // Perform batch BLS verification, if any attestation signatures are worth checking. + if num_partially_verified > 0 { + let signature_setup_timer = metrics::start_timer( + &metrics::ATTESTATION_PROCESSING_BATCH_UNAGG_SIGNATURE_SETUP_TIMES, + ); + + let pubkey_cache = chain + .validator_pubkey_cache + .try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) + .ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?; + + let fork = chain.with_head(|head| Ok::<_, BeaconChainError>(head.beacon_state.fork()))?; + + let mut signature_sets = Vec::with_capacity(num_partially_verified); + + // Iterate, flattening to get only the `Ok` values. 
+ for partially_verified in partial_results.iter().flatten() { + let indexed_attestation = &partially_verified.indexed_attestation; + + let signature_set = indexed_attestation_signature_set_from_pubkeys( + |validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed), + &indexed_attestation.signature, + indexed_attestation, + &fork, + chain.genesis_validators_root, + &chain.spec, + ) + .map_err(BeaconChainError::SignatureSetError)?; + + signature_sets.push(signature_set); + } + + metrics::stop_timer(signature_setup_timer); + + let _signature_verification_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_BATCH_UNAGG_SIGNATURE_TIMES); + + if verify_signature_sets(signature_sets.iter()) { + // Since all the signatures verified in a batch, there's no reason for them to be + // checked again later. + check_signatures = CheckAttestationSignature::No + } + } + + // Complete the attestation verification, potentially verifying all signatures independently. + let final_results = partial_results + .into_iter() + .map(|result| match result { + Ok(partial) => { + VerifiedUnaggregatedAttestation::from_indexed(partial, chain, check_signatures) + } + Err(e) => Err(e), + }) + .collect(); + + Ok(final_results) +} diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7236e6396..6ad0628d3 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1,5 +1,6 @@ use crate::attestation_verification::{ - Error as AttestationError, SignatureVerifiedAttestation, VerifiedAggregatedAttestation, + batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations, + Error as AttestationError, VerifiedAggregatedAttestation, VerifiedAttestation, VerifiedUnaggregatedAttestation, }; use crate::attester_cache::{AttesterCache, AttesterCacheKey}; @@ -1510,17 +1511,32 @@ impl BeaconChain { }) } + /// Performs the same validation as `Self::verify_unaggregated_attestation_for_gossip`, but for + /// multiple attestations using batch BLS verification. Batch verification can provide + /// significant CPU-time savings compared to individual verification. + pub fn batch_verify_unaggregated_attestations_for_gossip<'a, I>( + &self, + attestations: I, + ) -> Result< + Vec, AttestationError>>, + AttestationError, + > + where + I: Iterator, Option)> + ExactSizeIterator, + { + batch_verify_unaggregated_attestations(attestations, self) + } + /// Accepts some `Attestation` from the network and attempts to verify it, returning `Ok(_)` if /// it is valid to be (re)broadcast on the gossip network. /// /// The attestation must be "unaggregated", that is it must have exactly one /// aggregation bit set. - pub fn verify_unaggregated_attestation_for_gossip( + pub fn verify_unaggregated_attestation_for_gossip<'a>( &self, - unaggregated_attestation: Attestation, + unaggregated_attestation: &'a Attestation, subnet_id: Option, - ) -> Result, (AttestationError, Attestation)> - { + ) -> Result, AttestationError> { metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::UNAGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES); @@ -1539,15 +1555,25 @@ impl BeaconChain { ) } + /// Performs the same validation as `Self::verify_aggregated_attestation_for_gossip`, but for + /// multiple attestations using batch BLS verification. Batch verification can provide + /// significant CPU-time savings compared to individual verification. 
+ pub fn batch_verify_aggregated_attestations_for_gossip<'a, I>( + &self, + aggregates: I, + ) -> Result, AttestationError>>, AttestationError> + where + I: Iterator> + ExactSizeIterator, + { + batch_verify_aggregated_attestations(aggregates, self) + } + /// Accepts some `SignedAggregateAndProof` from the network and attempts to verify it, /// returning `Ok(_)` if it is valid to be (re)broadcast on the gossip network. - pub fn verify_aggregated_attestation_for_gossip( + pub fn verify_aggregated_attestation_for_gossip<'a>( &self, - signed_aggregate: SignedAggregateAndProof, - ) -> Result< - VerifiedAggregatedAttestation, - (AttestationError, SignedAggregateAndProof), - > { + signed_aggregate: &'a SignedAggregateAndProof, + ) -> Result, AttestationError> { metrics::inc_counter(&metrics::AGGREGATED_ATTESTATION_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::AGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES); @@ -1597,13 +1623,13 @@ impl BeaconChain { /// Accepts some attestation-type object and attempts to verify it in the context of fork /// choice. If it is valid it is applied to `self.fork_choice`. /// - /// Common items that implement `SignatureVerifiedAttestation`: + /// Common items that implement `VerifiedAttestation`: /// /// - `VerifiedUnaggregatedAttestation` /// - `VerifiedAggregatedAttestation` pub fn apply_attestation_to_fork_choice( &self, - verified: &impl SignatureVerifiedAttestation, + verified: &impl VerifiedAttestation, ) -> Result<(), Error> { let _timer = metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES); @@ -1623,8 +1649,8 @@ impl BeaconChain { /// and no error is returned. pub fn add_to_naive_aggregation_pool( &self, - unaggregated_attestation: VerifiedUnaggregatedAttestation, - ) -> Result, AttestationError> { + unaggregated_attestation: &impl VerifiedAttestation, + ) -> Result<(), AttestationError> { let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_APPLY_TO_AGG_POOL); let attestation = unaggregated_attestation.attestation(); @@ -1660,7 +1686,7 @@ impl BeaconChain { } }; - Ok(unaggregated_attestation) + Ok(()) } /// Accepts a `VerifiedSyncCommitteeMessage` and attempts to apply it to the "naive @@ -1727,13 +1753,13 @@ impl BeaconChain { Ok(verified_sync_committee_message) } - /// Accepts a `VerifiedAggregatedAttestation` and attempts to apply it to `self.op_pool`. + /// Accepts a `VerifiedAttestation` and attempts to apply it to `self.op_pool`. /// /// The op pool is used by local block producers to pack blocks with operations. pub fn add_to_block_inclusion_pool( &self, - signed_aggregate: VerifiedAggregatedAttestation, - ) -> Result, AttestationError> { + verified_attestation: &impl VerifiedAttestation, + ) -> Result<(), AttestationError> { let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_APPLY_TO_OP_POOL); // If there's no eth1 chain then it's impossible to produce blocks and therefore @@ -1745,7 +1771,7 @@ impl BeaconChain { self.op_pool .insert_attestation( // TODO: address this clone. - signed_aggregate.attestation().clone(), + verified_attestation.attestation().clone(), &fork, self.genesis_validators_root, &self.spec, @@ -1753,7 +1779,7 @@ impl BeaconChain { .map_err(Error::from)?; } - Ok(signed_aggregate) + Ok(()) } /// Accepts a `VerifiedSyncContribution` and attempts to apply it to `self.op_pool`. 
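Taken together, the `BeaconChain` changes above also change the calling convention: gossip verification now borrows the attestation, a rejection comes back as a plain `AttestationError` inside the returned `Vec` rather than as an `(Error, original)` tuple, and the pool-insertion helpers take `&impl VerifiedAttestation` and return `Ok(())`. A caller-side sketch of the unaggregated path (the enclosing function returning `Result<(), AttestationError>`, the `received` batch, and the rejection handling are assumptions for illustration; the `chain` methods are the ones added in this diff):

```rust
// Hypothetical gossip-worker snippet. `chain: &BeaconChain<T>` and
// `received: Vec<(Attestation<T::EthSpec>, SubnetId)>` are assumed to already exist.
let batch: Vec<_> = received
    .iter()
    .map(|(attestation, subnet_id)| (attestation, Some(*subnet_id)))
    .collect();

for result in chain.batch_verify_unaggregated_attestations_for_gossip(batch.into_iter())? {
    match result {
        // Pool insertion now takes `&impl VerifiedAttestation` and returns `Ok(())`.
        Ok(verified) => chain.add_to_naive_aggregation_pool(&verified)?,
        // The attestation is no longer consumed on failure; handle each rejection
        // per item (e.g. drop it and down-score the sending peer).
        Err(_rejection) => {}
    }
}
```

Aggregates follow the same shape through `batch_verify_aggregated_attestations_for_gossip`, with each verified value then passed by reference to `apply_attestation_to_fork_choice` and `add_to_block_inclusion_pool`, as the updated `test_utils.rs` harness further down does.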
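The `CheckAttestationSignature` flag threaded through `from_indexed` is what makes the batch path safe: when the single batched `verify_signature_sets` call succeeds, the per-item BLS checks are skipped; when it fails, every attestation is re-verified individually so the per-item results keep full fidelity. A self-contained toy model of that control flow, using invented stand-in types (nothing below is Lighthouse or BLS code):

```rust
#[derive(Clone, Copy)]
enum CheckSignature {
    Yes,
    No,
}

struct Indexed {
    sig_ok: bool, // stands in for a real BLS signature
}

#[derive(Debug)]
enum Error {
    BadIndexing,
    BadSignature,
}

/// Stands in for `verify_signature_sets`: true only if *every* signature is valid.
fn batch_verify(items: &[&Indexed]) -> bool {
    items.iter().all(|i| i.sig_ok)
}

/// Stands in for `from_indexed`: re-checks the signature unless the batch already did.
fn finish(indexed: Indexed, check: CheckSignature) -> Result<Indexed, Error> {
    match check {
        CheckSignature::Yes if !indexed.sig_ok => Err(Error::BadSignature),
        _ => Ok(indexed),
    }
}

fn process(inputs: Vec<Result<Indexed, Error>>) -> Vec<Result<Indexed, Error>> {
    // Phase 1 (indexing) has already happened; collect the successes for the batch check.
    let ok_refs: Vec<&Indexed> = inputs.iter().filter_map(|r| r.as_ref().ok()).collect();

    // Phase 2: one batch verification. If it passes, per-item signature checks are skipped.
    let check = if !ok_refs.is_empty() && batch_verify(&ok_refs) {
        CheckSignature::No
    } else {
        CheckSignature::Yes
    };

    // Phase 3: finish every item, preserving a one-to-one mapping with the input.
    inputs
        .into_iter()
        .map(|r| r.and_then(|indexed| finish(indexed, check)))
        .collect()
}

fn main() {
    let results = process(vec![
        Ok(Indexed { sig_ok: true }),
        Err(Error::BadIndexing),
        Ok(Indexed { sig_ok: false }), // one bad signature forces the individual fallback
    ]);
    for (i, result) in results.iter().enumerate() {
        println!("attestation {}: {:?}", i, result.as_ref().map(|_| "accepted"));
    }
}
```

The property the real code preserves is that a single invalid signature in a batch only costs CPU time (everything is re-checked one by one); it never changes which of the other attestations are accepted.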
diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs
index 0486220b8..c8c43201c 100644
--- a/beacon_node/beacon_chain/src/metrics.rs
+++ b/beacon_node/beacon_chain/src/metrics.rs
@@ -199,6 +199,26 @@ lazy_static! {
         "Time spent on the signature verification of attestation processing"
     );
 
+    /*
+     * Batch Attestation Processing
+     */
+    pub static ref ATTESTATION_PROCESSING_BATCH_AGG_SIGNATURE_SETUP_TIMES: Result<Histogram> = try_create_histogram(
+        "beacon_attestation_processing_batch_agg_signature_setup_times",
+        "Time spent on setting up for the signature verification of batch aggregate processing"
+    );
+    pub static ref ATTESTATION_PROCESSING_BATCH_AGG_SIGNATURE_TIMES: Result<Histogram> = try_create_histogram(
+        "beacon_attestation_processing_batch_agg_signature_times",
+        "Time spent on the signature verification of batch aggregate attestation processing"
+    );
+    pub static ref ATTESTATION_PROCESSING_BATCH_UNAGG_SIGNATURE_SETUP_TIMES: Result<Histogram> = try_create_histogram(
+        "beacon_attestation_processing_batch_unagg_signature_setup_times",
+        "Time spent on setting up for the signature verification of batch unaggregate processing"
+    );
+    pub static ref ATTESTATION_PROCESSING_BATCH_UNAGG_SIGNATURE_TIMES: Result<Histogram> = try_create_histogram(
+        "beacon_attestation_processing_batch_unagg_signature_times",
+        "Time spent on the signature verification of batch unaggregate attestation processing"
+    );
+
     /*
      * Shuffling cache
      */
diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs
index 2b452f577..67adda5fa 100644
--- a/beacon_node/beacon_chain/src/test_utils.rs
+++ b/beacon_node/beacon_chain/src/test_utils.rs
@@ -1159,29 +1159,42 @@ where
     }
 
     pub fn process_attestations(&self, attestations: HarnessAttestations<E>) {
-        for (unaggregated_attestations, maybe_signed_aggregate) in attestations.into_iter() {
-            for (attestation, subnet_id) in unaggregated_attestations {
-                self.chain
-                    .verify_unaggregated_attestation_for_gossip(
-                        attestation.clone(),
-                        Some(subnet_id),
-                    )
-                    .unwrap()
-                    .add_to_pool(&self.chain)
-                    .unwrap();
+        let num_validators = self.validator_keypairs.len();
+        let mut unaggregated = Vec::with_capacity(num_validators);
+        // This is an over-allocation, but it should be fine. It won't be *that* memory hungry and
+        // it's nice to have fast tests.
+ let mut aggregated = Vec::with_capacity(num_validators); + + for (unaggregated_attestations, maybe_signed_aggregate) in attestations.iter() { + for (attn, subnet) in unaggregated_attestations { + unaggregated.push((attn, Some(*subnet))); } - if let Some(signed_aggregate) = maybe_signed_aggregate { - let attn = self - .chain - .verify_aggregated_attestation_for_gossip(signed_aggregate) - .unwrap(); - - self.chain.apply_attestation_to_fork_choice(&attn).unwrap(); - - self.chain.add_to_block_inclusion_pool(attn).unwrap(); + if let Some(a) = maybe_signed_aggregate { + aggregated.push(a) } } + + for result in self + .chain + .batch_verify_unaggregated_attestations_for_gossip(unaggregated.into_iter()) + .unwrap() + { + let verified = result.unwrap(); + self.chain.add_to_naive_aggregation_pool(&verified).unwrap(); + } + + for result in self + .chain + .batch_verify_aggregated_attestations_for_gossip(aggregated.into_iter()) + .unwrap() + { + let verified = result.unwrap(); + self.chain + .apply_attestation_to_fork_choice(&verified) + .unwrap(); + self.chain.add_to_block_inclusion_pool(&verified).unwrap(); + } } pub fn set_current_slot(&self, slot: Slot) { diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index dbba5e318..26aca88f8 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -18,8 +18,8 @@ use store::config::StoreConfig; use tree_hash::TreeHash; use types::{ test_utils::generate_deterministic_keypair, AggregateSignature, Attestation, BeaconStateError, - BitList, EthSpec, Hash256, Keypair, MainnetEthSpec, SecretKey, SelectionProof, - SignedAggregateAndProof, SubnetId, Unsigned, + BitList, Epoch, EthSpec, Hash256, Keypair, MainnetEthSpec, SecretKey, SelectionProof, + SignedAggregateAndProof, Slot, SubnetId, Unsigned, }; pub type E = MainnetEthSpec; @@ -189,713 +189,739 @@ fn get_non_aggregator( .expect("should find non-aggregator for committee") } +struct GossipTester { + harness: BeaconChainHarness>, + /* + * Valid unaggregated attestation + */ + valid_attestation: Attestation, + attester_validator_index: usize, + attester_committee_index: usize, + attester_sk: SecretKey, + attestation_subnet_id: SubnetId, + /* + * Valid unaggregated attestation for batch testing + */ + invalid_attestation: Attestation, + /* + * Valid aggregate + */ + valid_aggregate: SignedAggregateAndProof, + aggregator_validator_index: usize, + aggregator_sk: SecretKey, + /* + * Another valid aggregate for batch testing + */ + invalid_aggregate: SignedAggregateAndProof, +} + +impl GossipTester { + pub fn new() -> Self { + let harness = get_harness(VALIDATOR_COUNT); + + // Extend the chain out a few epochs so we have some chain depth to play with. + harness.extend_chain( + MainnetEthSpec::slots_per_epoch() as usize * 3 - 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + // Advance into a slot where there have not been blocks or attestations produced. 
+ harness.advance_slot(); + + let ( + valid_attestation, + attester_validator_index, + attester_committee_index, + attester_sk, + attestation_subnet_id, + ) = get_valid_unaggregated_attestation(&harness.chain); + + let (valid_aggregate, aggregator_validator_index, aggregator_sk) = + get_valid_aggregated_attestation(&harness.chain, valid_attestation.clone()); + + let mut invalid_attestation = valid_attestation.clone(); + invalid_attestation.data.beacon_block_root = Hash256::repeat_byte(13); + + let (mut invalid_aggregate, _, _) = + get_valid_aggregated_attestation(&harness.chain, invalid_attestation.clone()); + invalid_aggregate.message.aggregator_index = invalid_aggregate + .message + .aggregator_index + .checked_sub(1) + .unwrap(); + + Self { + harness, + valid_attestation, + attester_validator_index, + attester_committee_index, + attester_sk, + attestation_subnet_id, + invalid_attestation, + valid_aggregate, + aggregator_validator_index, + aggregator_sk, + invalid_aggregate, + } + } + + pub fn slot(&self) -> Slot { + self.harness.chain.slot().unwrap() + } + + pub fn epoch(&self) -> Epoch { + self.harness.chain.epoch().unwrap() + } + + pub fn two_epochs_ago(&self) -> Slot { + self.slot() + .as_u64() + .checked_sub(E::slots_per_epoch() + 2) + .expect("chain is not sufficiently deep for test") + .into() + } + + pub fn non_aggregator(&self) -> (usize, SecretKey) { + get_non_aggregator(&self.harness.chain, &self.valid_aggregate.message.aggregate) + } + + pub fn import_valid_aggregate(self) -> Self { + assert!( + self.harness + .chain + .verify_aggregated_attestation_for_gossip(&self.valid_aggregate) + .is_ok(), + "valid aggregate should be verified" + ); + self + } + + pub fn import_valid_unaggregate(self) -> Self { + self.harness + .chain + .verify_unaggregated_attestation_for_gossip( + &self.valid_attestation, + Some(self.attestation_subnet_id), + ) + .expect("valid attestation should be verified"); + self + } + + pub fn inspect_aggregate_err(self, desc: &str, get_attn: G, inspect_err: I) -> Self + where + G: Fn(&Self, &mut SignedAggregateAndProof), + I: Fn(&Self, AttnError), + { + let mut aggregate = self.valid_aggregate.clone(); + get_attn(&self, &mut aggregate); + + /* + * Individual verification + */ + let err = self + .harness + .chain + .verify_aggregated_attestation_for_gossip(&aggregate) + .err() + .expect(&format!( + "{} should error during verify_aggregated_attestation_for_gossip", + desc + )); + inspect_err(&self, err); + + /* + * Batch verification + */ + let mut results = self + .harness + .chain + .batch_verify_aggregated_attestations_for_gossip( + vec![&self.invalid_aggregate, &aggregate].into_iter(), + ) + .unwrap(); + assert_eq!(results.len(), 2); + let batch_err = results.pop().unwrap().err().expect(&format!( + "{} should error during batch_verify_aggregated_attestations_for_gossip", + desc + )); + inspect_err(&self, batch_err); + + self + } + + pub fn inspect_unaggregate_err(self, desc: &str, get_attn: G, inspect_err: I) -> Self + where + G: Fn(&Self, &mut Attestation, &mut SubnetId), + I: Fn(&Self, AttnError), + { + let mut attn = self.valid_attestation.clone(); + let mut subnet_id = self.attestation_subnet_id; + get_attn(&self, &mut attn, &mut subnet_id); + + /* + * Individual verification + */ + let err = self + .harness + .chain + .verify_unaggregated_attestation_for_gossip(&attn, Some(subnet_id)) + .err() + .expect(&format!( + "{} should error during verify_unaggregated_attestation_for_gossip", + desc + )); + inspect_err(&self, err); + + /* + * Batch verification + 
*/ + let mut results = self + .harness + .chain + .batch_verify_unaggregated_attestations_for_gossip( + vec![ + (&self.invalid_attestation, Some(subnet_id)), + (&attn, Some(subnet_id)), + ] + .into_iter(), + ) + .unwrap(); + assert_eq!(results.len(), 2); + let batch_err = results.pop().unwrap().err().expect(&format!( + "{} should error during batch_verify_unaggregated_attestations_for_gossip", + desc + )); + inspect_err(&self, batch_err); + + self + } +} /// Tests verification of `SignedAggregateAndProof` from the gossip network. #[test] fn aggregated_gossip_verification() { - let harness = get_harness(VALIDATOR_COUNT); - - // Extend the chain out a few epochs so we have some chain depth to play with. - harness.extend_chain( - MainnetEthSpec::slots_per_epoch() as usize * 3 - 1, - BlockStrategy::OnCanonicalHead, - AttestationStrategy::AllValidators, - ); - - // Advance into a slot where there have not been blocks or attestations produced. - harness.advance_slot(); - - let current_slot = harness.chain.slot().expect("should get slot"); - - assert_eq!( - current_slot % E::slots_per_epoch(), - 0, - "the test requires a new epoch to avoid already-seen errors" - ); - - let (valid_attestation, _attester_index, _attester_committee_index, validator_sk, _subnet_id) = - get_valid_unaggregated_attestation(&harness.chain); - let (valid_aggregate, aggregator_index, aggregator_sk) = - get_valid_aggregated_attestation(&harness.chain, valid_attestation); - - macro_rules! assert_invalid { - ($desc: tt, $attn_getter: expr, $($error: pat) |+ $( if $guard: expr )?) => { - assert!( - matches!( - harness - .chain - .verify_aggregated_attestation_for_gossip($attn_getter) - .err() - .expect(&format!( - "{} should error during verify_aggregated_attestation_for_gossip", - $desc - )).0, - $( $error ) |+ $( if $guard )? - ), - "case: {}", - $desc, - ); - }; - } - - /* - * The following two tests ensure: - * - * Spec v0.12.1 - * - * aggregate.data.slot is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (with a - * MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) -- i.e. aggregate.data.slot + - * ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= aggregate.data.slot (a client MAY - * queue future aggregates for processing at the appropriate slot). - */ - - let future_slot = current_slot + 1; - assert_invalid!( - "aggregate from future slot", - { - let mut a = valid_aggregate.clone(); - a.message.aggregate.data.slot = future_slot; - a - }, - AttnError::FutureSlot { attestation_slot, latest_permissible_slot } - if attestation_slot == future_slot && latest_permissible_slot == current_slot - ); - - let early_slot = current_slot - .as_u64() - .checked_sub(E::slots_per_epoch() + 2) - .expect("chain is not sufficiently deep for test") - .into(); - assert_invalid!( - "aggregate from past slot", - { - let mut a = valid_aggregate.clone(); - a.message.aggregate.data.slot = early_slot; - a - }, - AttnError::PastSlot { - attestation_slot, - // Subtract an additional slot since the harness will be exactly on the start of the - // slot and the propagation tolerance will allow an extra slot. - earliest_permissible_slot - } - if attestation_slot == early_slot - && earliest_permissible_slot == current_slot - E::slots_per_epoch() - 1 - ); - - /* - * The following test ensures: - * - * The aggregate attestation's epoch matches its target -- i.e. 
`aggregate.data.target.epoch == - * compute_epoch_at_slot(attestation.data.slot)` - * - */ - - assert_invalid!( - "attestation with invalid target epoch", - { - let mut a = valid_aggregate.clone(); - a.message.aggregate.data.target.epoch += 1; - a - }, - AttnError::InvalidTargetEpoch { .. } - ); - /* - * This is not in the specification for aggregate attestations (only unaggregates), but we - * check it anyway to avoid weird edge cases. - */ - let unknown_root = Hash256::from_low_u64_le(424242); - assert_invalid!( - "attestation with invalid target root", - { - let mut a = valid_aggregate.clone(); - a.message.aggregate.data.target.root = unknown_root; - a - }, - AttnError::InvalidTargetRoot { .. } - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * The block being voted for (aggregate.data.beacon_block_root) passes validation. - */ - - let unknown_root = Hash256::from_low_u64_le(424242); - assert_invalid!( - "aggregate with unknown head block", - { - let mut a = valid_aggregate.clone(); - a.message.aggregate.data.beacon_block_root = unknown_root; - a - }, - AttnError::UnknownHeadBlock { - beacon_block_root - } - if beacon_block_root == unknown_root - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * The attestation has participants. - */ - - assert_invalid!( - "aggregate with no participants", - { - let mut a = valid_aggregate.clone(); - let aggregation_bits = &mut a.message.aggregate.aggregation_bits; - aggregation_bits.difference_inplace(&aggregation_bits.clone()); - assert!(aggregation_bits.is_zero()); - a.message.aggregate.signature = AggregateSignature::infinity(); - a - }, - AttnError::EmptyAggregationBitfield - ); - - /* - * This test ensures: - * - * Spec v0.12.1 - * - * The aggregator signature, signed_aggregate_and_proof.signature, is valid. - */ - - assert_invalid!( - "aggregate with bad signature", - { - let mut a = valid_aggregate.clone(); - - a.signature = validator_sk.sign(Hash256::from_low_u64_be(42)); - - a - }, - AttnError::InvalidSignature - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * The aggregate_and_proof.selection_proof is a valid signature of the aggregate.data.slot by - * the validator with index aggregate_and_proof.aggregator_index. - */ - - let committee_len = harness - .chain - .head() - .unwrap() - .beacon_state - .get_beacon_committee( - harness.chain.slot().unwrap(), - valid_aggregate.message.aggregate.data.index, + GossipTester::new() + /* + * The following two tests ensure: + * + * aggregate.data.slot is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (with a + * MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) -- i.e. aggregate.data.slot + + * ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= aggregate.data.slot (a client MAY + * queue future aggregates for processing at the appropriate slot). + */ + .inspect_aggregate_err( + "aggregate from future slot", + |tester, a| a.message.aggregate.data.slot = tester.slot() + 1, + |tester, err| { + assert!(matches!( + err, + AttnError::FutureSlot { attestation_slot, latest_permissible_slot } + if attestation_slot == tester.slot() + 1 + && latest_permissible_slot == tester.slot() + )) + }, ) - .expect("should get committees") - .committee - .len(); - assert_invalid!( - "aggregate with bad selection proof signature", - { - let mut a = valid_aggregate.clone(); - - // Generate some random signature until happens to be a valid selection proof. We need - // this in order to reach the signature verification code. 
- // - // Could run for ever, but that seems _really_ improbable. - let mut i: u64 = 0; - a.message.selection_proof = loop { - i += 1; - let proof: SelectionProof = validator_sk - .sign(Hash256::from_slice(&int_to_bytes32(i))) - .into(); - if proof - .is_aggregator(committee_len, &harness.chain.spec) + .inspect_aggregate_err( + "aggregate from past slot", + |tester, a| a.message.aggregate.data.slot = tester.two_epochs_ago(), + |tester, err| { + assert!(matches!( + err, + AttnError::PastSlot { + attestation_slot, + // Subtract an additional slot since the harness will be exactly on the start of the + // slot and the propagation tolerance will allow an extra slot. + earliest_permissible_slot + } + if attestation_slot == tester.two_epochs_ago() + && earliest_permissible_slot == tester.slot() - E::slots_per_epoch() - 1 + )) + }, + ) + /* + * The following test ensures: + * + * The aggregate attestation's epoch matches its target -- i.e. `aggregate.data.target.epoch == + * compute_epoch_at_slot(attestation.data.slot)` + * + */ + .inspect_aggregate_err( + "attestation with invalid target epoch", + |_, a| a.message.aggregate.data.target.epoch += 1, + |_, err| assert!(matches!(err, AttnError::InvalidTargetEpoch { .. })), + ) + /* + * This is not in the specification for aggregate attestations (only unaggregates), but we + * check it anyway to avoid weird edge cases. + */ + .inspect_aggregate_err( + "attestation with invalid target root", + |_, a| a.message.aggregate.data.target.root = Hash256::repeat_byte(42), + |_, err| assert!(matches!(err, AttnError::InvalidTargetRoot { .. })), + ) + /* + * The following test ensures: + * + * The block being voted for (aggregate.data.beacon_block_root) passes validation. + */ + .inspect_aggregate_err( + "aggregate with unknown head block", + |_, a| a.message.aggregate.data.beacon_block_root = Hash256::repeat_byte(42), + |_, err| { + assert!(matches!( + err, + AttnError::UnknownHeadBlock { + beacon_block_root + } + if beacon_block_root == Hash256::repeat_byte(42) + )) + }, + ) + /* + * The following test ensures: + * + * The attestation has participants. + */ + .inspect_aggregate_err( + "aggregate with no participants", + |_, a| { + let aggregation_bits = &mut a.message.aggregate.aggregation_bits; + aggregation_bits.difference_inplace(&aggregation_bits.clone()); + assert!(aggregation_bits.is_zero()); + a.message.aggregate.signature = AggregateSignature::infinity(); + }, + |_, err| assert!(matches!(err, AttnError::EmptyAggregationBitfield)), + ) + /* + * This test ensures: + * + * The aggregator signature, signed_aggregate_and_proof.signature, is valid. + */ + .inspect_aggregate_err( + "aggregate with bad signature", + |tester, a| a.signature = tester.aggregator_sk.sign(Hash256::repeat_byte(42)), + |_, err| assert!(matches!(err, AttnError::InvalidSignature)), + ) + /* + * The following test ensures: + * + * The aggregate_and_proof.selection_proof is a valid signature of the aggregate.data.slot by + * the validator with index aggregate_and_proof.aggregator_index. + */ + .inspect_aggregate_err( + "aggregate with bad signature", + |tester, a| { + let committee_len = tester + .harness + .chain + .head() .unwrap() - { - break proof.into(); - } - }; + .beacon_state + .get_beacon_committee(tester.slot(), a.message.aggregate.data.index) + .expect("should get committees") + .committee + .len(); - a - }, - AttnError::InvalidSignature - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * The signature of aggregate is valid. 
- */ - - assert_invalid!( - "aggregate with bad aggregate signature", - { - let mut a = valid_aggregate.clone(); - - let mut agg_sig = AggregateSignature::infinity(); - agg_sig.add_assign(&aggregator_sk.sign(Hash256::from_low_u64_be(42))); - a.message.aggregate.signature = agg_sig; - - a - }, - AttnError::InvalidSignature - ); - - let too_high_index = ::ValidatorRegistryLimit::to_u64() + 1; - assert_invalid!( - "aggregate with too-high aggregator index", - { - let mut a = valid_aggregate.clone(); - a.message.aggregator_index = too_high_index; - a - }, - AttnError::ValidatorIndexTooHigh(index) - if index == too_high_index as usize - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * The aggregator's validator index is within the committee -- i.e. - * aggregate_and_proof.aggregator_index in get_beacon_committee(state, aggregate.data.slot, - * aggregate.data.index). - */ - - let unknown_validator = VALIDATOR_COUNT as u64; - assert_invalid!( - "aggregate with unknown aggregator index", - { - let mut a = valid_aggregate.clone(); - a.message.aggregator_index = unknown_validator; - a - }, - // Naively we should think this condition would trigger this error: - // - // AttnError::AggregatorPubkeyUnknown(unknown_validator) - // - // However the following error is triggered first: - AttnError::AggregatorNotInCommittee { - aggregator_index - } - if aggregator_index == unknown_validator - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * aggregate_and_proof.selection_proof selects the validator as an aggregator for the slot -- - * i.e. is_aggregator(state, aggregate.data.slot, aggregate.data.index, - * aggregate_and_proof.selection_proof) returns True. - */ - - let (non_aggregator_index, non_aggregator_sk) = - get_non_aggregator(&harness.chain, &valid_aggregate.message.aggregate); - assert_invalid!( - "aggregate from non-aggregator", - { - SignedAggregateAndProof::from_aggregate( - non_aggregator_index as u64, - valid_aggregate.message.aggregate.clone(), - None, - &non_aggregator_sk, - &harness.chain.head_info().unwrap().fork, - harness.chain.genesis_validators_root, - &harness.chain.spec, - ) - }, - AttnError::InvalidSelectionProof { - aggregator_index: index - } - if index == non_aggregator_index as u64 - ); - - // NOTE: from here on, the tests are stateful, and rely on the valid attestation having been - // seen. A refactor to give each test case its own state might be nice at some point - assert!( - harness - .chain - .verify_aggregated_attestation_for_gossip(valid_aggregate.clone()) - .is_ok(), - "valid aggregate should be verified" - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * The valid aggregate attestation defined by hash_tree_root(aggregate) has not already been - * seen (via aggregate gossip, within a block, or through the creation of an equivalent - * aggregate locally). - */ - - assert_invalid!( - "aggregate that has already been seen", - valid_aggregate.clone(), - AttnError::AttestationAlreadyKnown(hash) - if hash == valid_aggregate.message.aggregate.tree_hash_root() - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * The aggregate is the first valid aggregate received for the aggregator with index - * aggregate_and_proof.aggregator_index for the epoch aggregate.data.target.epoch. 
- */ - - assert_invalid!( - "aggregate from aggregator that has already been seen", - { - let mut a = valid_aggregate.clone(); - a.message.aggregate.data.beacon_block_root = Hash256::from_low_u64_le(42); - a - }, - AttnError::AggregatorAlreadyKnown(index) - if index == aggregator_index as u64 - ); + // Generate some random signature until happens to be a valid selection proof. We need + // this in order to reach the signature verification code. + // + // Could run for ever, but that seems _really_ improbable. + let mut i: u64 = 0; + a.message.selection_proof = loop { + i += 1; + let proof: SelectionProof = tester + .aggregator_sk + .sign(Hash256::from_slice(&int_to_bytes32(i))) + .into(); + if proof + .is_aggregator(committee_len, &tester.harness.chain.spec) + .unwrap() + { + break proof.into(); + } + }; + }, + |_, err| assert!(matches!(err, AttnError::InvalidSignature)), + ) + /* + * The following test ensures: + * + * The signature of aggregate is valid. + */ + .inspect_aggregate_err( + "aggregate with bad aggregate signature", + |tester, a| { + let mut agg_sig = AggregateSignature::infinity(); + agg_sig.add_assign(&tester.aggregator_sk.sign(Hash256::repeat_byte(42))); + a.message.aggregate.signature = agg_sig; + }, + |_, err| assert!(matches!(err, AttnError::InvalidSignature)), + ) + /* + * Not directly in the specification, but a sanity check. + */ + .inspect_aggregate_err( + "aggregate with too-high aggregator index", + |_, a| { + a.message.aggregator_index = ::ValidatorRegistryLimit::to_u64() + 1 + }, + |_, err| { + assert!(matches!( + err, + AttnError::ValidatorIndexTooHigh(index) + if index == (::ValidatorRegistryLimit::to_u64() + 1) as usize + )) + }, + ) + /* + * The following test ensures: + * + * The aggregator's validator index is within the committee -- i.e. + * aggregate_and_proof.aggregator_index in get_beacon_committee(state, aggregate.data.slot, + * aggregate.data.index). + */ + .inspect_aggregate_err( + "aggregate with unknown aggregator index", + |_, a| a.message.aggregator_index = VALIDATOR_COUNT as u64, + |_, err| { + assert!(matches!( + err, + // Naively we should think this condition would trigger this error: + // + // AttnError::AggregatorPubkeyUnknown(unknown_validator) + // + // However the following error is triggered first: + AttnError::AggregatorNotInCommittee { + aggregator_index + } + if aggregator_index == VALIDATOR_COUNT as u64 + )) + }, + ) + /* + * The following test ensures: + * + * aggregate_and_proof.selection_proof selects the validator as an aggregator for the slot -- + * i.e. is_aggregator(state, aggregate.data.slot, aggregate.data.index, + * aggregate_and_proof.selection_proof) returns True. + */ + .inspect_aggregate_err( + "aggregate from non-aggregator", + |tester, a| { + let chain = &tester.harness.chain; + let (index, sk) = tester.non_aggregator(); + *a = SignedAggregateAndProof::from_aggregate( + index as u64, + tester.valid_aggregate.message.aggregate.clone(), + None, + &sk, + &chain.head_info().unwrap().fork, + chain.genesis_validators_root, + &chain.spec, + ) + }, + |tester, err| { + let (val_index, _) = tester.non_aggregator(); + assert!(matches!( + err, + AttnError::InvalidSelectionProof { + aggregator_index: index + } + if index == val_index as u64 + )) + }, + ) + // NOTE: from here on, the tests are stateful, and rely on the valid attestation having + // been seen. 
+ .import_valid_aggregate() + /* + * The following test ensures: + * + * The valid aggregate attestation defined by hash_tree_root(aggregate) has not already been + * seen (via aggregate gossip, within a block, or through the creation of an equivalent + * aggregate locally). + */ + .inspect_aggregate_err( + "aggregate that has already been seen", + |_, _| {}, + |tester, err| { + assert!(matches!( + err, + AttnError::AttestationAlreadyKnown(hash) + if hash == tester.valid_aggregate.message.aggregate.tree_hash_root() + )) + }, + ) + /* + * The following test ensures: + * + * The aggregate is the first valid aggregate received for the aggregator with index + * aggregate_and_proof.aggregator_index for the epoch aggregate.data.target.epoch. + */ + .inspect_aggregate_err( + "aggregate from aggregator that has already been seen", + |_, a| a.message.aggregate.data.beacon_block_root = Hash256::repeat_byte(42), + |tester, err| { + assert!(matches!( + err, + AttnError::AggregatorAlreadyKnown(index) + if index == tester.aggregator_validator_index as u64 + )) + }, + ); } /// Tests the verification conditions for an unaggregated attestation on the gossip network. #[test] fn unaggregated_gossip_verification() { - let harness = get_harness(VALIDATOR_COUNT); - - // Extend the chain out a few epochs so we have some chain depth to play with. - harness.extend_chain( - MainnetEthSpec::slots_per_epoch() as usize * 3 - 1, - BlockStrategy::OnCanonicalHead, - AttestationStrategy::AllValidators, - ); - - // Advance into a slot where there have not been blocks or attestations produced. - harness.advance_slot(); - - let current_slot = harness.chain.slot().expect("should get slot"); - let current_epoch = harness.chain.epoch().expect("should get epoch"); - - assert_eq!( - current_slot % E::slots_per_epoch(), - 0, - "the test requires a new epoch to avoid already-seen errors" - ); - - let ( - valid_attestation, - expected_validator_index, - validator_committee_index, - validator_sk, - subnet_id, - ) = get_valid_unaggregated_attestation(&harness.chain); - - macro_rules! assert_invalid { - ($desc: tt, $attn_getter: expr, $subnet_getter: expr, $($error: pat) |+ $( if $guard: expr )?) => { - assert!( - matches!( - harness - .chain - .verify_unaggregated_attestation_for_gossip($attn_getter, Some($subnet_getter)) - .err() - .expect(&format!( - "{} should error during verify_unaggregated_attestation_for_gossip", - $desc - )).0, - $( $error ) |+ $( if $guard )? - ), - "case: {}", - $desc, - ); - }; - } - - /* - * The following test ensures: - * - * Spec v0.12.3 - * - * The committee index is within the expected range -- i.e. `data.index < - * get_committee_count_per_slot(state, data.target.epoch)`. - */ - assert_invalid!( - "attestation with invalid committee index", - { - let mut a = valid_attestation.clone(); - a.data.index = harness - .chain - .head() - .unwrap() - .beacon_state - .get_committee_count_at_slot(a.data.slot) - .unwrap(); - a - }, - subnet_id, - AttnError::NoCommitteeForSlotAndIndex { .. } - ); - - /* - * The following test ensures: - * - * Spec v0.12.1 - * - * The attestation is for the correct subnet (i.e. compute_subnet_for_attestation(state, - * attestation.data.slot, attestation.data.index) == subnet_id). 
- */ - let id: u64 = subnet_id.into(); - let invalid_subnet_id = SubnetId::new(id + 1); - assert_invalid!( - "attestation from future slot", - { - valid_attestation.clone() - }, - invalid_subnet_id, - AttnError::InvalidSubnetId { - received, - expected, - } - if received == invalid_subnet_id && expected == subnet_id - ); - - /* - * The following two tests ensure: - * - * Spec v0.12.1 - * - * attestation.data.slot is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (within a - * MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) -- i.e. attestation.data.slot + - * ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= attestation.data.slot (a client MAY - * queue future attestations for processing at the appropriate slot). - */ - - let future_slot = current_slot + 1; - assert_invalid!( - "attestation from future slot", - { - let mut a = valid_attestation.clone(); - a.data.slot = future_slot; - a - }, - subnet_id, - AttnError::FutureSlot { - attestation_slot, - latest_permissible_slot, - } - if attestation_slot == future_slot && latest_permissible_slot == current_slot - ); - - let early_slot = current_slot - .as_u64() - .checked_sub(E::slots_per_epoch() + 2) - .expect("chain is not sufficiently deep for test") - .into(); - assert_invalid!( - "attestation from past slot", - { - let mut a = valid_attestation.clone(); - a.data.slot = early_slot; - a.data.target.epoch = early_slot.epoch(E::slots_per_epoch()); - a - }, - subnet_id, - AttnError::PastSlot { - attestation_slot, - // Subtract an additional slot since the harness will be exactly on the start of the - // slot and the propagation tolerance will allow an extra slot. - earliest_permissible_slot, - } - if attestation_slot == early_slot && earliest_permissible_slot == current_slot - E::slots_per_epoch() - 1 - ); - - /* - * The following test ensures: - * - * Spec v0.12.3 - * - * The attestation's epoch matches its target -- i.e. `attestation.data.target.epoch == - * compute_epoch_at_slot(attestation.data.slot)` - * - */ - - assert_invalid!( - "attestation with invalid target epoch", - { - let mut a = valid_attestation.clone(); - a.data.target.epoch += 1; - a - }, - subnet_id, - AttnError::InvalidTargetEpoch { .. } - ); - - /* - * The following two tests ensure: - * - * Spec v0.12.1 - * - * The attestation is unaggregated -- that is, it has exactly one participating validator - * (len([bit for bit in attestation.aggregation_bits if bit == 0b1]) == 1). - */ - - assert_invalid!( - "attestation without any aggregation bits set", - { - let mut a = valid_attestation.clone(); - a.aggregation_bits - .set(validator_committee_index, false) - .expect("should unset aggregation bit"); - assert_eq!( - a.aggregation_bits.num_set_bits(), - 0, - "test requires no set bits" - ); - a - }, - subnet_id, - AttnError::NotExactlyOneAggregationBitSet(0) - ); - - assert_invalid!( - "attestation with two aggregation bits set", - { - let mut a = valid_attestation.clone(); - a.aggregation_bits - .set(validator_committee_index + 1, true) - .expect("should set second aggregation bit"); - a - }, - subnet_id, - AttnError::NotExactlyOneAggregationBitSet(2) - ); - - /* - * The following test ensures: - * - * Spec v0.12.3 - * - * The number of aggregation bits matches the committee size -- i.e. - * `len(attestation.aggregation_bits) == len(get_beacon_committee(state, data.slot, - * data.index))`. 
- */ - assert_invalid!( - "attestation with invalid bitfield", - { - let mut a = valid_attestation.clone(); - let bits = a.aggregation_bits.iter().collect::>(); - a.aggregation_bits = BitList::with_capacity(bits.len() + 1).unwrap(); - for (i, bit) in bits.into_iter().enumerate() { - a.aggregation_bits.set(i, bit).unwrap(); - } - a - }, - subnet_id, - AttnError::Invalid(AttestationValidationError::BeaconStateError( - BeaconStateError::InvalidBitfield - )) - ); - - /* - * The following test ensures that: - * - * Spec v0.12.1 - * - * The block being voted for (attestation.data.beacon_block_root) passes validation. - */ - - let unknown_root = Hash256::from_low_u64_le(424242); // No one wants one of these - assert_invalid!( - "attestation with unknown head block", - { - let mut a = valid_attestation.clone(); - a.data.beacon_block_root = unknown_root; - a - }, - subnet_id, - AttnError::UnknownHeadBlock { - beacon_block_root, - } - if beacon_block_root == unknown_root - ); - - /* - * The following test ensures that: - * - * Spec v0.12.3 - * - * The attestation's target block is an ancestor of the block named in the LMD vote - */ - - let unknown_root = Hash256::from_low_u64_le(424242); - assert_invalid!( - "attestation with invalid target root", - { - let mut a = valid_attestation.clone(); - a.data.target.root = unknown_root; - a - }, - subnet_id, - AttnError::InvalidTargetRoot { .. } - ); - - /* - * The following test ensures that: - * - * Spec v0.12.1 - * - * The signature of attestation is valid. - */ - - assert_invalid!( - "attestation with bad signature", - { - let mut a = valid_attestation.clone(); - - let mut agg_sig = AggregateSignature::infinity(); - agg_sig.add_assign(&validator_sk.sign(Hash256::from_low_u64_be(42))); - a.signature = agg_sig; - - a - }, - subnet_id, - AttnError::InvalidSignature - ); - - harness - .chain - .verify_unaggregated_attestation_for_gossip(valid_attestation.clone(), Some(subnet_id)) - .expect("valid attestation should be verified"); - - /* - * The following test ensures that: - * - * Spec v0.12.1 - * - * - * There has been no other valid attestation seen on an attestation subnet that has an - * identical attestation.data.target.epoch and participating validator index. - */ - - assert_invalid!( - "attestation that has already been seen", - valid_attestation.clone(), - subnet_id, - AttnError::PriorAttestationKnown { - validator_index, - epoch, - } - if validator_index == expected_validator_index as u64 && epoch == current_epoch - ); + GossipTester::new() + /* + * The following test ensures: + * + * The committee index is within the expected range -- i.e. `data.index < + * get_committee_count_per_slot(state, data.target.epoch)`. + */ + .inspect_unaggregate_err( + "attestation with invalid committee index", + |tester, a, _| { + a.data.index = tester + .harness + .chain + .head() + .unwrap() + .beacon_state + .get_committee_count_at_slot(a.data.slot) + .unwrap() + }, + |_, err| assert!(matches!(err, AttnError::NoCommitteeForSlotAndIndex { .. })), + ) + /* + * The following test ensures: + * + * The attestation is for the correct subnet (i.e. compute_subnet_for_attestation(state, + * attestation.data.slot, attestation.data.index) == subnet_id). 
+ */ + .inspect_unaggregate_err( + "attestation with incorrect subnet id", + |_, _, subnet_id| *subnet_id = SubnetId::new(42), + |tester, err| { + assert!(matches!( + err, + AttnError::InvalidSubnetId { + received, + expected, + } + if received == SubnetId::new(42) && expected == tester.attestation_subnet_id + )) + }, + ) + /* + * The following two tests ensure: + * + * attestation.data.slot is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (within a + * MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) -- i.e. attestation.data.slot + + * ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= attestation.data.slot (a client MAY + * queue future attestations for processing at the appropriate slot). + */ + .inspect_unaggregate_err( + "attestation from future slot", + |tester, a, _| a.data.slot = tester.slot() + 1, + |tester, err| { + assert!(matches!( + err, + AttnError::FutureSlot { + attestation_slot, + latest_permissible_slot, + } + if attestation_slot == tester.slot() + 1 && latest_permissible_slot == tester.slot() + )) + }, + ) + .inspect_unaggregate_err( + "attestation from past slot", + |tester, a, _| { + let early_slot = tester.two_epochs_ago(); + a.data.slot = early_slot; + a.data.target.epoch = early_slot.epoch(E::slots_per_epoch()); + }, + |tester, err| { + assert!(matches!( + err, + AttnError::PastSlot { + attestation_slot, + // Subtract an additional slot since the harness will be exactly on the start of the + // slot and the propagation tolerance will allow an extra slot. + earliest_permissible_slot, + } + if attestation_slot == tester.two_epochs_ago() + && earliest_permissible_slot == tester.slot() - E::slots_per_epoch() - 1 + )) + }, + ) + /* + * The following test ensures: + * + * The attestation's epoch matches its target -- i.e. `attestation.data.target.epoch == + * compute_epoch_at_slot(attestation.data.slot)` + * + */ + .inspect_unaggregate_err( + "attestation with invalid target epoch", + |_, a, _| a.data.target.epoch += 1, + |_, err| { + assert!(matches!( + err, + AttnError::InvalidTargetEpoch { .. } + )) + }, + ) + /* + * The following two tests ensure: + * + * The attestation is unaggregated -- that is, it has exactly one participating validator + * (len([bit for bit in attestation.aggregation_bits if bit == 0b1]) == 1). + */ + .inspect_unaggregate_err( + "attestation without any aggregation bits set", + |tester, a, _| { + a.aggregation_bits + .set(tester.attester_committee_index, false) + .expect("should unset aggregation bit"); + assert_eq!( + a.aggregation_bits.num_set_bits(), + 0, + "test requires no set bits" + ); + }, + |_, err| { + assert!(matches!( + err, + AttnError::NotExactlyOneAggregationBitSet(0) + )) + }, + ) + .inspect_unaggregate_err( + "attestation with two aggregation bits set", + |tester, a, _| { + a.aggregation_bits + .set(tester.attester_committee_index + 1, true) + .expect("should set second aggregation bit"); + }, + |_, err| { + assert!(matches!( + err, + AttnError::NotExactlyOneAggregationBitSet(2) + )) + }, + ) + /* + * The following test ensures: + * + * The number of aggregation bits matches the committee size -- i.e. + * `len(attestation.aggregation_bits) == len(get_beacon_committee(state, data.slot, + * data.index))`.
+ */ + .inspect_unaggregate_err( + "attestation with invalid bitfield", + |_, a, _| { + let bits = a.aggregation_bits.iter().collect::>(); + a.aggregation_bits = BitList::with_capacity(bits.len() + 1).unwrap(); + for (i, bit) in bits.into_iter().enumerate() { + a.aggregation_bits.set(i, bit).unwrap(); + } + }, + |_, err| { + assert!(matches!( + err, + AttnError::Invalid(AttestationValidationError::BeaconStateError( + BeaconStateError::InvalidBitfield + )) + )) + }, + ) + /* + * The following test ensures that: + * + * The block being voted for (attestation.data.beacon_block_root) passes validation. + */ + .inspect_unaggregate_err( + "attestation with unknown head block", + |_, a, _| { + a.data.beacon_block_root = Hash256::repeat_byte(42); + }, + |_, err| { + assert!(matches!( + err, + AttnError::UnknownHeadBlock { + beacon_block_root, + } + if beacon_block_root == Hash256::repeat_byte(42) + )) + }, + ) + /* + * The following test ensures that: + * + * Spec v0.12.3 + * + * The attestation's target block is an ancestor of the block named in the LMD vote + */ + .inspect_unaggregate_err( + "attestation with invalid target root", + |_, a, _| { + a.data.target.root = Hash256::repeat_byte(42); + }, + |_, err| { + assert!(matches!( + err, + AttnError::InvalidTargetRoot { .. } + )) + }, + ) + /* + * The following test ensures that: + * + * The signature of attestation is valid. + */ + .inspect_unaggregate_err( + "attestation with bad signature", + |tester, a, _| { + let mut agg_sig = AggregateSignature::infinity(); + agg_sig.add_assign(&tester.attester_sk.sign(Hash256::repeat_byte(42))); + a.signature = agg_sig; + }, + |_, err| { + assert!(matches!( + err, + AttnError::InvalidSignature + )) + }, + ) + // NOTE: from here on, the tests are stateful, and rely on the valid attestation having + // been seen. + .import_valid_unaggregate() + /* + * The following test ensures that: + * + * + * There has been no other valid attestation seen on an attestation subnet that has an + * identical attestation.data.target.epoch and participating validator index. + */ + .inspect_unaggregate_err( + "attestation that has already been seen", + |_, _, _| {}, + |tester, err| { + assert!(matches!( + err, + AttnError::PriorAttestationKnown { + validator_index, + epoch, + } + if validator_index == tester.attester_validator_index as u64 && epoch == tester.epoch() + )) + }, + ); } /// Ensures that an attestation that skips epochs can still be processed. 
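The `GossipTester` driving the rewritten tests above is defined elsewhere in this file and is not shown in this diff; each `inspect_aggregate_err` / `inspect_unaggregate_err` case clones a known-valid message, applies the mutation closure, runs gossip verification against the harness chain, hands the resulting `AttnError` to the assertion closure, and returns `self` so the cases chain. A minimal, self-contained sketch of that mutate-and-inspect pattern, using placeholder types (`Tester`, `VerifyError`) rather than the real `GossipTester`/`BeaconChain` machinery:

```rust
// Placeholder error type standing in for `AttnError`.
#[derive(Debug)]
enum VerifyError {
    TooBig { value: u64, max: u64 },
}

// Placeholder harness: holds one known-valid "attestation" and a toy verifier.
struct Tester {
    valid_attestation: u64,
    max: u64,
}

impl Tester {
    fn verify(&self, a: &u64) -> Result<(), VerifyError> {
        if *a > self.max {
            Err(VerifyError::TooBig { value: *a, max: self.max })
        } else {
            Ok(())
        }
    }

    /// Clone the known-valid item, apply `mutate`, run verification and hand the
    /// resulting error to `inspect`. Panics if verification unexpectedly succeeds,
    /// mirroring how each `inspect_*_err` case expects a specific error.
    fn inspect_err(
        self,
        desc: &str,
        mutate: impl FnOnce(&Self, &mut u64),
        inspect: impl FnOnce(&Self, &VerifyError),
    ) -> Self {
        let mut a = self.valid_attestation;
        mutate(&self, &mut a);
        let err = self
            .verify(&a)
            .expect_err(&format!("{} should fail verification", desc));
        inspect(&self, &err);
        self
    }
}

fn main() {
    Tester { valid_attestation: 7, max: 10 }
        .inspect_err(
            "attestation above the maximum",
            |tester, a| *a = tester.max + 1,
            |tester, err| {
                assert!(matches!(err, VerifyError::TooBig { max, .. } if *max == tester.max))
            },
        );
}
```

Because every case returns `self` and only mutates a clone, the stateful cases later in the chain (after `import_valid_aggregate` / `import_valid_unaggregate`) can keep reusing the same untouched valid message as their starting point.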
@@ -965,7 +991,7 @@ fn attestation_that_skips_epochs() { harness .chain - .verify_unaggregated_attestation_for_gossip(attestation, Some(subnet_id)) + .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)) .expect("should gossip verify attestation that skips slots"); } @@ -998,7 +1024,7 @@ fn verify_aggregate_for_gossip_doppelganger_detection() { harness .chain - .verify_aggregated_attestation_for_gossip(valid_aggregate.clone()) + .verify_aggregated_attestation_for_gossip(&valid_aggregate) .expect("should verify aggregate attestation"); let epoch = valid_aggregate.message.aggregate.data.target.epoch; @@ -1053,7 +1079,7 @@ fn verify_attestation_for_gossip_doppelganger_detection() { harness .chain - .verify_unaggregated_attestation_for_gossip(valid_attestation.clone(), Some(subnet_id)) + .verify_unaggregated_attestation_for_gossip(&valid_attestation, Some(subnet_id)) .expect("should verify attestation"); let epoch = valid_attestation.data.target.epoch; diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index a04d2b9a4..b73056aed 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -332,7 +332,7 @@ fn epoch_boundary_state_attestation_processing() { let res = harness .chain - .verify_unaggregated_attestation_for_gossip(attestation.clone(), Some(subnet_id)); + .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)); let current_slot = harness.chain.slot().expect("should get slot"); let expected_attestation_slot = attestation.data.slot; @@ -344,7 +344,7 @@ fn epoch_boundary_state_attestation_processing() { { checked_pre_fin = true; assert!(matches!( - res.err().unwrap().0, + res.err().unwrap(), AttnError::PastSlot { attestation_slot, earliest_permissible_slot, diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index cea746628..f4399ed37 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -529,7 +529,7 @@ fn attestations_with_increasing_slots() { for (attestation, subnet_id) in attestations.into_iter().flatten() { let res = harness .chain - .verify_unaggregated_attestation_for_gossip(attestation.clone(), Some(subnet_id)); + .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)); let current_slot = harness.chain.slot().expect("should get slot"); let expected_attestation_slot = attestation.data.slot; @@ -538,7 +538,7 @@ fn attestations_with_increasing_slots() { if expected_attestation_slot < expected_earliest_permissible_slot { assert!(matches!( - res.err().unwrap().0, + res.err().unwrap(), AttnError::PastSlot { attestation_slot, earliest_permissible_slot, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index bb9ee822f..be5521223 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -16,7 +16,7 @@ mod validator_inclusion; mod version; use beacon_chain::{ - attestation_verification::SignatureVerifiedAttestation, + attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome, validator_monitor::{get_block_delay_ms, timestamp_now}, AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, @@ -1066,7 +1066,7 @@ pub fn serve( for (index, attestation) in attestations.as_slice().iter().enumerate() { let attestation = match chain - .verify_unaggregated_attestation_for_gossip(attestation.clone(), None) + 
.verify_unaggregated_attestation_for_gossip(attestation, None) { Ok(attestation) => attestation, Err(e) => { @@ -1121,7 +1121,7 @@ pub fn serve( )); }; - if let Err(e) = chain.add_to_naive_aggregation_pool(attestation) { + if let Err(e) = chain.add_to_naive_aggregation_pool(&attestation) { error!(log, "Failure adding verified attestation to the naive aggregation pool"; "error" => ?e, @@ -1958,7 +1958,7 @@ pub fn serve( let mut failures = Vec::new(); // Verify that all messages in the post are valid before processing further - for (index, aggregate) in aggregates.into_iter().enumerate() { + for (index, aggregate) in aggregates.iter().enumerate() { match chain.verify_aggregated_attestation_for_gossip(aggregate) { Ok(verified_aggregate) => { messages.push(PubsubMessage::AggregateAndProofAttestation(Box::new( @@ -1984,8 +1984,8 @@ pub fn serve( // It's reasonably likely that two different validators produce // identical aggregates, especially if they're using the same beacon // node. - Err((AttnError::AttestationAlreadyKnown(_), _)) => continue, - Err((e, aggregate)) => { + Err(AttnError::AttestationAlreadyKnown(_)) => continue, + Err(e) => { error!(log, "Failure verifying aggregate and proofs"; "error" => format!("{:?}", e), @@ -2017,7 +2017,7 @@ pub fn serve( ); failures.push(api_types::Failure::new(index, format!("Fork choice: {:?}", e))); } - if let Err(e) = chain.add_to_block_inclusion_pool(verified_aggregate) { + if let Err(e) = chain.add_to_block_inclusion_pool(&verified_aggregate) { warn!(log, "Could not add verified aggregate attestation to the inclusion pool"; "error" => format!("{:?}", e), diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index d6c4bf77b..63868b2df 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -46,7 +46,8 @@ use eth2_libp2p::{ }; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; -use slog::{debug, error, trace, warn, Logger}; +use slog::{crit, debug, error, trace, warn, Logger}; +use std::cmp; use std::collections::VecDeque; use std::fmt; use std::pin::Pin; @@ -70,7 +71,7 @@ mod tests; mod work_reprocessing_queue; mod worker; -pub use worker::ProcessId; +pub use worker::{GossipAggregatePackage, GossipAttestationPackage, ProcessId}; /// The maximum size of the channel for work events to the `BeaconProcessor`. /// @@ -159,11 +160,27 @@ const WORKER_TASK_NAME: &str = "beacon_processor_worker"; /// The minimum interval between log messages indicating that a queue is full. const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30); +/// The `MAX_..._BATCH_SIZE` variables define how many attestations can be included in a single +/// batch. +/// +/// Choosing these values is difficult since there is a trade-off between: +/// +/// - It is faster to verify one large batch than multiple smaller batches. +/// - "Poisoning" attacks have a larger impact as the batch size increases. +/// +/// Poisoning occurs when an invalid signature is included in a batch of attestations. A single +/// invalid signature causes the entire batch to fail. When a batch fails, we fall-back to +/// individually verifying each attestation signature. +const MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64; +const MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64; + /// Unique IDs used for metrics and testing. 
pub const WORKER_FREED: &str = "worker_freed"; pub const NOTHING_TO_DO: &str = "nothing_to_do"; pub const GOSSIP_ATTESTATION: &str = "gossip_attestation"; +pub const GOSSIP_ATTESTATION_BATCH: &str = "gossip_attestation_batch"; pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate"; +pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch"; pub const GOSSIP_BLOCK: &str = "gossip_block"; pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block"; pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit"; @@ -564,6 +581,9 @@ pub enum Work { should_import: bool, seen_timestamp: Duration, }, + GossipAttestationBatch { + packages: Vec>, + }, GossipAggregate { message_id: MessageId, peer_id: PeerId, @@ -576,6 +596,9 @@ pub enum Work { aggregate: Box>, seen_timestamp: Duration, }, + GossipAggregateBatch { + packages: Vec>, + }, GossipBlock { message_id: MessageId, peer_id: PeerId, @@ -644,7 +667,9 @@ impl Work { fn str_id(&self) -> &'static str { match self { Work::GossipAttestation { .. } => GOSSIP_ATTESTATION, + Work::GossipAttestationBatch { .. } => GOSSIP_ATTESTATION_BATCH, Work::GossipAggregate { .. } => GOSSIP_AGGREGATE, + Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH, Work::GossipBlock { .. } => GOSSIP_BLOCK, Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK, Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT, @@ -922,10 +947,103 @@ impl BeaconProcessor { // Check the aggregates, *then* the unaggregates since we assume that // aggregates are more valuable to local validators and effectively give us // more information with less signature verification time. - } else if let Some(item) = aggregate_queue.pop() { - self.spawn_worker(item, toolbox); - } else if let Some(item) = attestation_queue.pop() { - self.spawn_worker(item, toolbox); + } else if aggregate_queue.len() > 0 { + let batch_size = + cmp::min(aggregate_queue.len(), MAX_GOSSIP_AGGREGATE_BATCH_SIZE); + + if batch_size < 2 { + // One single aggregate is in the queue, process it individually. + if let Some(item) = aggregate_queue.pop() { + self.spawn_worker(item, toolbox); + } + } else { + // Collect two or more aggregates into a batch, so they can take + // advantage of batch signature verification. + // + // Note: this will convert the `Work::GossipAggregate` item into a + // `Work::GossipAggregateBatch` item. + let mut packages = Vec::with_capacity(batch_size); + for _ in 0..batch_size { + if let Some(item) = aggregate_queue.pop() { + match item { + Work::GossipAggregate { + message_id, + peer_id, + aggregate, + seen_timestamp, + } => { + packages.push(GossipAggregatePackage::new( + message_id, + peer_id, + aggregate, + seen_timestamp, + )); + } + _ => { + error!(self.log, "Invalid item in aggregate queue") + } + } + } + } + + // Process all aggregates with a single worker. + self.spawn_worker(Work::GossipAggregateBatch { packages }, toolbox) + } + // Check the unaggregated attestation queue. + // + // Potentially use batching. + } else if attestation_queue.len() > 0 { + let batch_size = cmp::min( + attestation_queue.len(), + MAX_GOSSIP_ATTESTATION_BATCH_SIZE, + ); + + if batch_size < 2 { + // One single attestation is in the queue, process it individually. + if let Some(item) = attestation_queue.pop() { + self.spawn_worker(item, toolbox); + } + } else { + // Collect two or more attestations into a batch, so they can take + // advantage of batch signature verification. + // + // Note: this will convert the `Work::GossipAttestation` item into a + // `Work::GossipAttestationBatch` item. 
+ let mut packages = Vec::with_capacity(batch_size); + for _ in 0..batch_size { + if let Some(item) = attestation_queue.pop() { + match item { + Work::GossipAttestation { + message_id, + peer_id, + attestation, + subnet_id, + should_import, + seen_timestamp, + } => { + packages.push(GossipAttestationPackage::new( + message_id, + peer_id, + attestation, + subnet_id, + should_import, + seen_timestamp, + )); + } + _ => error!( + self.log, + "Invalid item in attestation queue" + ), + } + } + } + + // Process all attestations with a single worker. + self.spawn_worker( + Work::GossipAttestationBatch { packages }, + toolbox, + ) + } // Check sync committee messages after attestations as their rewards are lesser // and they don't influence fork choice. } else if let Some(item) = sync_contribution_queue.pop() { @@ -1009,7 +1127,21 @@ impl BeaconProcessor { match work { _ if can_spawn => self.spawn_worker(work, toolbox), Work::GossipAttestation { .. } => attestation_queue.push(work), + // Attestation batches are formed internally within the + // `BeaconProcessor`, they are not sent from external services. + Work::GossipAttestationBatch { .. } => crit!( + self.log, + "Unsupported inbound event"; + "type" => "GossipAttestationBatch" + ), Work::GossipAggregate { .. } => aggregate_queue.push(work), + // Aggregate batches are formed internally within the `BeaconProcessor`, + // they are not sent from external services. + Work::GossipAggregateBatch { .. } => crit!( + self.log, + "Unsupported inbound event"; + "type" => "GossipAggregateBatch" + ), Work::GossipBlock { .. } => { gossip_block_queue.push(work, work_id, &self.log) } @@ -1180,7 +1312,7 @@ impl BeaconProcessor { match work { /* - * Unaggregated attestation verification. + * Individual unaggregated attestation verification. */ Work::GossipAttestation { message_id, @@ -1192,14 +1324,19 @@ impl BeaconProcessor { } => worker.process_gossip_attestation( message_id, peer_id, - *attestation, + attestation, subnet_id, should_import, Some(work_reprocessing_tx), seen_timestamp, ), /* - * Aggregated attestation verification. + * Batched unaggregated attestation verification. + */ + Work::GossipAttestationBatch { packages } => worker + .process_gossip_attestation_batch(packages, Some(work_reprocessing_tx)), + /* + * Individual aggregated attestation verification. */ Work::GossipAggregate { message_id, @@ -1209,10 +1346,16 @@ impl BeaconProcessor { } => worker.process_gossip_aggregate( message_id, peer_id, - *aggregate, + aggregate, Some(work_reprocessing_tx), seen_timestamp, ), + /* + * Batched aggregated attestation verification. + */ + Work::GossipAggregateBatch { packages } => { + worker.process_gossip_aggregate_batch(packages, Some(work_reprocessing_tx)) + } /* * Verification for beacon blocks received on gossip. */ @@ -1345,7 +1488,7 @@ impl BeaconProcessor { } => worker.process_gossip_attestation( message_id, peer_id, - *attestation, + attestation, subnet_id, should_import, None, // Do not allow this attestation to be re-processed beyond this point. 
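The scheduling change above drains up to `MAX_GOSSIP_AGGREGATE_BATCH_SIZE` / `MAX_GOSSIP_ATTESTATION_BATCH_SIZE` items from the per-topic queue and repackages them as a single `Work::GossipAggregateBatch` / `Work::GossipAttestationBatch`, so one worker can verify all of their signatures together, while a lone queued item is still processed individually. A minimal sketch of that drain-and-batch dispatch, with a plain `VecDeque<u64>` standing in for the real work queues (the names here are illustrative, not the `BeaconProcessor` API):

```rust
use std::cmp;
use std::collections::VecDeque;

const MAX_BATCH_SIZE: usize = 64;

// Stand-ins for `Work::GossipAttestation` and `Work::GossipAttestationBatch`.
enum Work {
    Single(u64),
    Batch(Vec<u64>),
}

/// Pop up to `MAX_BATCH_SIZE` items off the queue: a lone item is dispatched
/// individually, two or more are repackaged into one batch so they can share a
/// single signature-verification pass.
fn next_work(queue: &mut VecDeque<u64>) -> Option<Work> {
    let batch_size = cmp::min(queue.len(), MAX_BATCH_SIZE);
    match batch_size {
        0 => None,
        1 => queue.pop_front().map(Work::Single),
        _ => Some(Work::Batch(queue.drain(..batch_size).collect())),
    }
}

fn main() {
    let mut queue: VecDeque<u64> = (0..3).collect();
    assert!(matches!(next_work(&mut queue), Some(Work::Batch(b)) if b.len() == 3));
    queue.push_back(42);
    assert!(matches!(next_work(&mut queue), Some(Work::Single(42))));
    assert!(next_work(&mut queue).is_none());
}
```

Per the comment introducing the `MAX_..._BATCH_SIZE` constants, the cap bounds the cost of a "poisoned" batch: one invalid signature fails the whole batch and forces a fall back to per-item verification, so larger batches amortize signature verification better but waste more work when poisoned.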
@@ -1359,7 +1502,7 @@ impl BeaconProcessor { } => worker.process_gossip_aggregate( message_id, peer_id, - *aggregate, + aggregate, None, seen_timestamp, ), diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index e581b681e..81028d476 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -1,22 +1,22 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::{ - attestation_verification::{Error as AttnError, SignatureVerifiedAttestation}, + attestation_verification::{Error as AttnError, VerifiedAttestation}, observed_operations::ObservationOutcome, sync_committee_verification::Error as SyncCommitteeError, validator_monitor::get_block_delay_ms, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, GossipVerifiedBlock, }; use eth2_libp2p::{MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; -use slog::{debug, error, info, trace, warn}; +use slog::{crit, debug, error, info, trace, warn}; use slot_clock::SlotClock; use ssz::Encode; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use types::{ - Attestation, AttesterSlashing, EthSpec, Hash256, ProposerSlashing, SignedAggregateAndProof, - SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, - SyncCommitteeMessage, SyncSubnetId, + Attestation, AttesterSlashing, EthSpec, Hash256, IndexedAttestation, ProposerSlashing, + SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, + SubnetId, SyncCommitteeMessage, SyncSubnetId, }; use super::{ @@ -26,6 +26,60 @@ use super::{ Worker, }; +/// An attestation that has been validated by the `BeaconChain`. +/// +/// Since this struct implements `beacon_chain::VerifiedAttestation`, it would be a logic error to +/// construct this from components which have not passed `BeaconChain` validation. +struct VerifiedUnaggregate { + attestation: Box>, + indexed_attestation: IndexedAttestation, +} + +/// This implementation allows `Self` to be imported to fork choice and other functions on the +/// `BeaconChain`. +impl<'a, T: BeaconChainTypes> VerifiedAttestation for VerifiedUnaggregate { + fn attestation(&self) -> &Attestation { + &self.attestation + } + + fn indexed_attestation(&self) -> &IndexedAttestation { + &self.indexed_attestation + } +} + +/// An attestation that failed validation by the `BeaconChain`. +struct RejectedUnaggregate { + attestation: Box>, + error: AttnError, +} + +/// An aggregate that has been validated by the `BeaconChain`. +/// +/// Since this struct implements `beacon_chain::VerifiedAttestation`, it would be a logic error to +/// construct this from components which have not passed `BeaconChain` validation. +struct VerifiedAggregate { + signed_aggregate: Box>, + indexed_attestation: IndexedAttestation, +} + +/// This implementation allows `Self` to be imported to fork choice and other functions on the +/// `BeaconChain`. +impl<'a, T: BeaconChainTypes> VerifiedAttestation for VerifiedAggregate { + fn attestation(&self) -> &Attestation { + &self.signed_aggregate.message.aggregate + } + + fn indexed_attestation(&self) -> &IndexedAttestation { + &self.indexed_attestation + } +} + +/// An attestation that failed validation by the `BeaconChain`. 
+struct RejectedAggregate { + signed_aggregate: Box>, + error: AttnError, +} + /// Data for an aggregated or unaggregated attestation that failed verification. enum FailedAtt { Unaggregate { @@ -41,7 +95,7 @@ enum FailedAtt { } impl FailedAtt { - pub fn root(&self) -> &Hash256 { + pub fn beacon_block_root(&self) -> &Hash256 { match self { FailedAtt::Unaggregate { attestation, .. } => &attestation.data.beacon_block_root, FailedAtt::Aggregate { attestation, .. } => { @@ -58,6 +112,66 @@ impl FailedAtt { } } +/// Items required to verify a batch of unaggregated gossip attestations. +#[derive(Debug)] +pub struct GossipAttestationPackage { + message_id: MessageId, + peer_id: PeerId, + attestation: Box>, + subnet_id: SubnetId, + beacon_block_root: Hash256, + should_import: bool, + seen_timestamp: Duration, +} + +impl GossipAttestationPackage { + pub fn new( + message_id: MessageId, + peer_id: PeerId, + attestation: Box>, + subnet_id: SubnetId, + should_import: bool, + seen_timestamp: Duration, + ) -> Self { + Self { + message_id, + peer_id, + beacon_block_root: attestation.data.beacon_block_root, + attestation, + subnet_id, + should_import, + seen_timestamp, + } + } +} + +/// Items required to verify a batch of aggregated gossip attestations. +#[derive(Debug)] +pub struct GossipAggregatePackage { + message_id: MessageId, + peer_id: PeerId, + aggregate: Box>, + beacon_block_root: Hash256, + seen_timestamp: Duration, +} + +impl GossipAggregatePackage { + pub fn new( + message_id: MessageId, + peer_id: PeerId, + aggregate: Box>, + seen_timestamp: Duration, + ) -> Self { + Self { + message_id, + peer_id, + beacon_block_root: aggregate.message.aggregate.data.beacon_block_root, + aggregate, + seen_timestamp, + } + } +} + impl Worker { /* Auxiliary functions */ @@ -103,88 +217,200 @@ impl Worker { self, message_id: MessageId, peer_id: PeerId, - attestation: Attestation, + attestation: Box>, subnet_id: SubnetId, should_import: bool, reprocess_tx: Option>>, seen_timestamp: Duration, ) { - let beacon_block_root = attestation.data.beacon_block_root; - - let attestation = match self + let result = match self .chain - .verify_unaggregated_attestation_for_gossip(attestation, Some(subnet_id)) + .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)) { - Ok(attestation) => attestation, - Err((e, attestation)) => { - self.handle_attestation_verification_failure( - peer_id, - message_id, - FailedAtt::Unaggregate { - attestation: Box::new(attestation), - subnet_id, - should_import, - seen_timestamp, - }, - reprocess_tx, - e, + Ok(verified_attestation) => Ok(VerifiedUnaggregate { + indexed_attestation: verified_attestation.into_indexed_attestation(), + attestation, + }), + Err(error) => Err(RejectedUnaggregate { attestation, error }), + }; + + self.process_gossip_attestation_result( + result, + message_id, + peer_id, + subnet_id, + reprocess_tx, + should_import, + seen_timestamp, + ); + } + + pub fn process_gossip_attestation_batch( + self, + packages: Vec>, + reprocess_tx: Option>>, + ) { + let attestations_and_subnets = packages + .iter() + .map(|package| (package.attestation.as_ref(), Some(package.subnet_id))); + + let results = match self + .chain + .batch_verify_unaggregated_attestations_for_gossip(attestations_and_subnets) + { + Ok(results) => results, + Err(e) => { + error!( + self.log, + "Batch unagg. attn verification failed"; + "error" => ?e ); return; } }; - // Register the attestation with any monitored validators. 
- self.chain - .validator_monitor - .read() - .register_gossip_unaggregated_attestation( - seen_timestamp, - attestation.indexed_attestation(), - &self.chain.slot_clock, - ); - - // Indicate to the `Network` service that this message is valid and can be - // propagated on the gossip network. - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); - - if !should_import { - return; + // Sanity check. + if results.len() != packages.len() { + // The log is `crit` since in this scenario we might be penalizing/rewarding the wrong + // peer. + crit!( + self.log, + "Batch attestation result mismatch"; + "results" => results.len(), + "packages" => packages.len(), + ) } - metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL); + // Map the results into a new `Vec` so that `results` no longer holds a reference to + // `packages`. + #[allow(clippy::needless_collect)] // The clippy suggestion fails the borrow checker. + let results = results + .into_iter() + .map(|result| result.map(|verified| verified.into_indexed_attestation())) + .collect::>(); - if let Err(e) = self.chain.apply_attestation_to_fork_choice(&attestation) { - match e { - BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => { + for (result, package) in results.into_iter().zip(packages.into_iter()) { + let result = match result { + Ok(indexed_attestation) => Ok(VerifiedUnaggregate { + indexed_attestation, + attestation: package.attestation, + }), + Err(error) => Err(RejectedUnaggregate { + attestation: package.attestation, + error, + }), + }; + + self.process_gossip_attestation_result( + result, + package.message_id, + package.peer_id, + package.subnet_id, + reprocess_tx.clone(), + package.should_import, + package.seen_timestamp, + ); + } + } + + // The Clippy warning is ignored since the arguments are all of a different type (i.e., they + // can't be mixed-up) and creating a struct would result in more complexity. + #[allow(clippy::too_many_arguments)] + fn process_gossip_attestation_result( + &self, + result: Result, RejectedUnaggregate>, + message_id: MessageId, + peer_id: PeerId, + subnet_id: SubnetId, + reprocess_tx: Option>>, + should_import: bool, + seen_timestamp: Duration, + ) { + match result { + Ok(verified_attestation) => { + let indexed_attestation = &verified_attestation.indexed_attestation; + let beacon_block_root = indexed_attestation.data.beacon_block_root; + + // Register the attestation with any monitored validators. + self.chain + .validator_monitor + .read() + .register_gossip_unaggregated_attestation( + seen_timestamp, + indexed_attestation, + &self.chain.slot_clock, + ); + + // Indicate to the `Network` service that this message is valid and can be + // propagated on the gossip network.
+ self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + + if !should_import { + return; + } + + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL, + ); + + if let Err(e) = self + .chain + .apply_attestation_to_fork_choice(&verified_attestation) + { + match e { + BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation( + e, + )) => { + debug!( + self.log, + "Attestation invalid for fork choice"; + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root + ) + } + e => error!( + self.log, + "Error applying attestation to fork choice"; + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root + ), + } + } + + if let Err(e) = self + .chain + .add_to_naive_aggregation_pool(&verified_attestation) + { debug!( self.log, - "Attestation invalid for fork choice"; + "Attestation invalid for agg pool"; "reason" => ?e, "peer" => %peer_id, "beacon_block_root" => ?beacon_block_root ) } - e => error!( - self.log, - "Error applying attestation to fork choice"; - "reason" => ?e, - "peer" => %peer_id, - "beacon_block_root" => ?beacon_block_root - ), + + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL, + ); + } + Err(RejectedUnaggregate { attestation, error }) => { + self.handle_attestation_verification_failure( + peer_id, + message_id, + FailedAtt::Unaggregate { + attestation, + subnet_id, + should_import, + seen_timestamp, + }, + reprocess_tx, + error, + ); } } - - if let Err(e) = self.chain.add_to_naive_aggregation_pool(attestation) { - debug!( - self.log, - "Attestation invalid for agg pool"; - "reason" => ?e, - "peer" => %peer_id, - "beacon_block_root" => ?beacon_block_root - ) - } - - metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL); } /// Process the aggregated attestation received from the gossip network and: @@ -198,82 +424,191 @@ impl Worker { self, message_id: MessageId, peer_id: PeerId, - aggregate: SignedAggregateAndProof, + aggregate: Box>, reprocess_tx: Option>>, seen_timestamp: Duration, ) { let beacon_block_root = aggregate.message.aggregate.data.beacon_block_root; - let aggregate = match self + let result = match self .chain - .verify_aggregated_attestation_for_gossip(aggregate) + .verify_aggregated_attestation_for_gossip(&aggregate) { - Ok(aggregate) => aggregate, - Err((e, attestation)) => { - // Report the failure to gossipsub - self.handle_attestation_verification_failure( - peer_id, - message_id, - FailedAtt::Aggregate { - attestation: Box::new(attestation), - seen_timestamp, - }, - reprocess_tx, - e, + Ok(verified_aggregate) => Ok(VerifiedAggregate { + indexed_attestation: verified_aggregate.into_indexed_attestation(), + signed_aggregate: aggregate, + }), + Err(error) => Err(RejectedAggregate { + signed_aggregate: aggregate, + error, + }), + }; + + self.process_gossip_aggregate_result( + result, + beacon_block_root, + message_id, + peer_id, + reprocess_tx, + seen_timestamp, + ); + } + + pub fn process_gossip_aggregate_batch( + self, + packages: Vec>, + reprocess_tx: Option>>, + ) { + let aggregates = packages.iter().map(|package| package.aggregate.as_ref()); + + let results = match self + .chain + .batch_verify_aggregated_attestations_for_gossip(aggregates) + { + Ok(results) => results, + Err(e) => { + error!( + self.log, + "Batch agg. 
attn verification failed"; + "error" => ?e ); return; } }; - // Indicate to the `Network` service that this message is valid and can be - // propagated on the gossip network. - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + // Sanity check. + if results.len() != packages.len() { + // The log is `crit` since in this scenario we might be penalizing/rewarding the wrong + // peer. + crit!( + self.log, + "Batch agg. attestation result mismatch"; + "results" => results.len(), + "packages" => packages.len(), + ) + } - // Register the attestation with any monitored validators. - self.chain - .validator_monitor - .read() - .register_gossip_aggregated_attestation( - seen_timestamp, - aggregate.aggregate(), - aggregate.indexed_attestation(), - &self.chain.slot_clock, + // Map the results into a new `Vec` so that `results` no longer holds a reference to + // `packages`. + #[allow(clippy::needless_collect)] // The clippy suggestion fails the borrow checker. + let results = results + .into_iter() + .map(|result| result.map(|verified| verified.into_indexed_attestation())) + .collect::>(); + + for (result, package) in results.into_iter().zip(packages.into_iter()) { + let result = match result { + Ok(indexed_attestation) => Ok(VerifiedAggregate { + indexed_attestation, + signed_aggregate: package.aggregate, + }), + Err(error) => Err(RejectedAggregate { + signed_aggregate: package.aggregate, + error, + }), + }; + + self.process_gossip_aggregate_result( + result, + package.beacon_block_root, + package.message_id, + package.peer_id, + reprocess_tx.clone(), + package.seen_timestamp, ); + } + } - metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL); + fn process_gossip_aggregate_result( + &self, + result: Result, RejectedAggregate>, + beacon_block_root: Hash256, + message_id: MessageId, + peer_id: PeerId, + reprocess_tx: Option>>, + seen_timestamp: Duration, + ) { + match result { + Ok(verified_aggregate) => { + let aggregate = &verified_aggregate.signed_aggregate; + let indexed_attestation = &verified_aggregate.indexed_attestation; - if let Err(e) = self.chain.apply_attestation_to_fork_choice(&aggregate) { - match e { - BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => { + // Indicate to the `Network` service that this message is valid and can be + // propagated on the gossip network. + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + + // Register the attestation with any monitored validators. 
+ self.chain + .validator_monitor + .read() + .register_gossip_aggregated_attestation( + seen_timestamp, + aggregate, + indexed_attestation, + &self.chain.slot_clock, + ); + + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL, + ); + + if let Err(e) = self + .chain + .apply_attestation_to_fork_choice(&verified_aggregate) + { + match e { + BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation( + e, + )) => { + debug!( + self.log, + "Aggregate invalid for fork choice"; + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root + ) + } + e => error!( + self.log, + "Error applying aggregate to fork choice"; + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root + ), + } + } + + if let Err(e) = self.chain.add_to_block_inclusion_pool(&verified_aggregate) { debug!( self.log, - "Aggregate invalid for fork choice"; + "Attestation invalid for op pool"; "reason" => ?e, "peer" => %peer_id, "beacon_block_root" => ?beacon_block_root ) } - e => error!( - self.log, - "Error applying aggregate to fork choice"; - "reason" => ?e, - "peer" => %peer_id, - "beacon_block_root" => ?beacon_block_root - ), + + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL, + ); + } + Err(RejectedAggregate { + signed_aggregate, + error, + }) => { + // Report the failure to gossipsub + self.handle_attestation_verification_failure( + peer_id, + message_id, + FailedAtt::Aggregate { + attestation: signed_aggregate, + seen_timestamp, + }, + reprocess_tx, + error, + ); } } - - if let Err(e) = self.chain.add_to_block_inclusion_pool(aggregate) { - debug!( - self.log, - "Attestation invalid for op pool"; - "reason" => ?e, - "peer" => %peer_id, - "beacon_block_root" => ?beacon_block_root - ) - } - - metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL); } /// Process the beacon block received from the gossip network and: @@ -834,7 +1169,7 @@ impl Worker { reprocess_tx: Option>>, error: AttnError, ) { - let beacon_block_root = failed_att.root(); + let beacon_block_root = failed_att.beacon_block_root(); let attestation_type = failed_att.kind(); metrics::register_attestation_error(&error); match &error { diff --git a/beacon_node/network/src/beacon_processor/worker/mod.rs b/beacon_node/network/src/beacon_processor/worker/mod.rs index 58fec22d5..b9d78900b 100644 --- a/beacon_node/network/src/beacon_processor/worker/mod.rs +++ b/beacon_node/network/src/beacon_processor/worker/mod.rs @@ -9,6 +9,7 @@ mod gossip_methods; mod rpc_methods; mod sync_methods; +pub use gossip_methods::{GossipAggregatePackage, GossipAttestationPackage}; pub use sync_methods::ProcessId; pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 61e6d56ea..f299de1b6 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -413,7 +413,7 @@ impl ForkChoiceTest { let mut verified_attestation = self .harness .chain - .verify_unaggregated_attestation_for_gossip(attestation, Some(subnet_id)) + .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)) .expect("precondition: should gossip verify attestation"); if let MutationDelay::Blocks(slots) = delay {