Batch BLS verification for attestations (#2399)

## Issue Addressed

NA

## Proposed Changes

Adds the ability to verify batches of aggregated/unaggregated attestations from the network.

When the `BeaconProcessor` finds there are messages in the aggregated or unaggregated attestation queues, it will first check the length of the queue:

- `== 1` verify the attestation individually.
- `>= 2` take up to 64 of those attestations and verify them in a batch.

Notably, we only perform batch verification if the queue has a backlog. We don't apply any artificial delays to attestations to try and force them into batches. 

### Batching Details

To assist with implementing batches we modify `beacon_chain::attestation_verification` to have two distinct categories for attestations:

- *Indexed* attestations: those which have passed initial validation and were valid enough for us to derive an `IndexedAttestation`.
- *Verified* attestations: those attestations which were indexed *and also* passed signature verification. These are well-formed, interesting messages which were signed by validators.

The batching functions accept `n` attestations and then return `n` attestation verification `Result`s, where those `Result`s can be any combination of `Ok` or `Err`. In other words, we attempt to verify as many attestations as possible and return specific per-attestation results so peer scores can be updated, if required.

When we batch verify attestations, we first try to map all those attestations to *indexed* attestations. If any of those attestations were able to be indexed, we then perform batch BLS verification on those indexed attestations. If the batch verification succeeds, we convert them into *verified* attestations, disabling individual signature checking. If the batch fails, we convert to verified attestations with individual signature checking enabled.

Ultimately, we optimistically try to do a batch verification of attestation signatures and fall-back to individual verification if it fails. This opens an attach vector for "poisoning" the attestations and causing us to waste a batch verification. I argue that peer scoring should do a good-enough job of defending against this and the typical-case gains massively outweigh the worst-case losses.

## Additional Info

Before this PR, attestation verification took the attestations by value (instead of by reference). It turns out that this was unnecessary and, in my opinion, resulted in some undesirable ergonomics (e.g., we had to pass the attestation back in the `Err` variant to avoid clones). In this PR I've modified attestation verification so that it now takes a reference.

I refactored the `beacon_chain/tests/attestation_verification.rs` tests so they use a builder-esque "tester" struct instead of a weird macro. It made it easier for me to test individual/batch with the same set of tests and I think it was a nice tidy-up. Notably, I did this last to try and make sure my new refactors to *actual* production code would pass under the existing test suite.
This commit is contained in:
Paul Hauner 2021-09-22 08:49:41 +00:00
parent 9667dc2f03
commit be11437c27
13 changed files with 1962 additions and 1037 deletions

View File

@ -18,13 +18,16 @@
//! types::Attestation types::SignedAggregateAndProof //! types::Attestation types::SignedAggregateAndProof
//! | | //! | |
//! ▼ ▼ //! ▼ ▼
//! VerifiedUnaggregatedAttestation VerifiedAggregatedAttestation //! IndexedUnaggregatedAttestation IndexedAggregatedAttestation
//! | |
//! VerifiedUnaggregatedAttestation VerifiedAggregatedAttestation
//! | | //! | |
//! ------------------------------------- //! -------------------------------------
//! | //! |
//! ▼ //! ▼
//! impl SignatureVerifiedAttestation //! impl VerifiedAttestation
//! ``` //! ```
mod batch;
use crate::{ use crate::{
beacon_chain::{MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT}, beacon_chain::{MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT},
@ -53,6 +56,8 @@ use types::{
SelectionProof, SignedAggregateAndProof, Slot, SubnetId, SelectionProof, SignedAggregateAndProof, Slot, SubnetId,
}; };
pub use batch::{batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations};
/// Returned when an attestation was not successfully verified. It might not have been verified for /// Returned when an attestation was not successfully verified. It might not have been verified for
/// two reasons: /// two reasons:
/// ///
@ -254,53 +259,105 @@ impl From<BeaconChainError> for Error {
} }
} }
/// Wraps a `SignedAggregateAndProof` that has been verified for propagation on the gossip network. /// Used to avoid double-checking signatures.
pub struct VerifiedAggregatedAttestation<T: BeaconChainTypes> { #[derive(Copy, Clone)]
signed_aggregate: SignedAggregateAndProof<T::EthSpec>, enum CheckAttestationSignature {
Yes,
No,
}
/// Wraps a `SignedAggregateAndProof` that has been verified up until the point that an
/// `IndexedAttestation` can be derived.
///
/// These attestations have *not* undergone signature verification.
struct IndexedAggregatedAttestation<'a, T: BeaconChainTypes> {
signed_aggregate: &'a SignedAggregateAndProof<T::EthSpec>,
indexed_attestation: IndexedAttestation<T::EthSpec>,
attestation_root: Hash256,
}
/// Wraps a `Attestation` that has been verified up until the point that an `IndexedAttestation` can
/// be derived.
///
/// These attestations have *not* undergone signature verification.
struct IndexedUnaggregatedAttestation<'a, T: BeaconChainTypes> {
attestation: &'a Attestation<T::EthSpec>,
indexed_attestation: IndexedAttestation<T::EthSpec>,
subnet_id: SubnetId,
validator_index: u64,
}
/// Wraps a `SignedAggregateAndProof` that has been fully verified for propagation on the gossip
/// network.
pub struct VerifiedAggregatedAttestation<'a, T: BeaconChainTypes> {
signed_aggregate: &'a SignedAggregateAndProof<T::EthSpec>,
indexed_attestation: IndexedAttestation<T::EthSpec>, indexed_attestation: IndexedAttestation<T::EthSpec>,
} }
/// Wraps an `Attestation` that has been verified for propagation on the gossip network. impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> {
pub struct VerifiedUnaggregatedAttestation<T: BeaconChainTypes> { pub fn into_indexed_attestation(self) -> IndexedAttestation<T::EthSpec> {
attestation: Attestation<T::EthSpec>, self.indexed_attestation
}
}
/// Wraps an `Attestation` that has been fully verified for propagation on the gossip network.
pub struct VerifiedUnaggregatedAttestation<'a, T: BeaconChainTypes> {
attestation: &'a Attestation<T::EthSpec>,
indexed_attestation: IndexedAttestation<T::EthSpec>, indexed_attestation: IndexedAttestation<T::EthSpec>,
subnet_id: SubnetId, subnet_id: SubnetId,
} }
impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> {
pub fn into_indexed_attestation(self) -> IndexedAttestation<T::EthSpec> {
self.indexed_attestation
}
}
/// Custom `Clone` implementation is to avoid the restrictive trait bounds applied by the usual derive /// Custom `Clone` implementation is to avoid the restrictive trait bounds applied by the usual derive
/// macro. /// macro.
impl<T: BeaconChainTypes> Clone for VerifiedUnaggregatedAttestation<T> { impl<'a, T: BeaconChainTypes> Clone for IndexedUnaggregatedAttestation<'a, T> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
attestation: self.attestation.clone(), attestation: self.attestation,
indexed_attestation: self.indexed_attestation.clone(), indexed_attestation: self.indexed_attestation.clone(),
subnet_id: self.subnet_id, subnet_id: self.subnet_id,
validator_index: self.validator_index,
} }
} }
} }
/// A helper trait implemented on wrapper types that can be progressed to a state where they can be /// A helper trait implemented on wrapper types that can be progressed to a state where they can be
/// verified for application to fork choice. /// verified for application to fork choice.
pub trait SignatureVerifiedAttestation<T: BeaconChainTypes> { pub trait VerifiedAttestation<T: BeaconChainTypes> {
fn attestation(&self) -> &Attestation<T::EthSpec>;
fn indexed_attestation(&self) -> &IndexedAttestation<T::EthSpec>; fn indexed_attestation(&self) -> &IndexedAttestation<T::EthSpec>;
} }
impl<'a, T: BeaconChainTypes> SignatureVerifiedAttestation<T> for VerifiedAggregatedAttestation<T> { impl<'a, T: BeaconChainTypes> VerifiedAttestation<T> for VerifiedAggregatedAttestation<'a, T> {
fn attestation(&self) -> &Attestation<T::EthSpec> {
self.attestation()
}
fn indexed_attestation(&self) -> &IndexedAttestation<T::EthSpec> { fn indexed_attestation(&self) -> &IndexedAttestation<T::EthSpec> {
&self.indexed_attestation &self.indexed_attestation
} }
} }
impl<T: BeaconChainTypes> SignatureVerifiedAttestation<T> for VerifiedUnaggregatedAttestation<T> { impl<'a, T: BeaconChainTypes> VerifiedAttestation<T> for VerifiedUnaggregatedAttestation<'a, T> {
fn attestation(&self) -> &Attestation<T::EthSpec> {
self.attestation
}
fn indexed_attestation(&self) -> &IndexedAttestation<T::EthSpec> { fn indexed_attestation(&self) -> &IndexedAttestation<T::EthSpec> {
&self.indexed_attestation &self.indexed_attestation
} }
} }
/// Information about invalid attestations which might still be slashable despite being invalid. /// Information about invalid attestations which might still be slashable despite being invalid.
pub enum AttestationSlashInfo<T: BeaconChainTypes, TErr> { pub enum AttestationSlashInfo<'a, T: BeaconChainTypes, TErr> {
/// The attestation is invalid, but its signature wasn't checked. /// The attestation is invalid, but its signature wasn't checked.
SignatureNotChecked(Attestation<T::EthSpec>, TErr), SignatureNotChecked(&'a Attestation<T::EthSpec>, TErr),
/// As for `SignatureNotChecked`, but we know the `IndexedAttestation`. /// As for `SignatureNotChecked`, but we know the `IndexedAttestation`.
SignatureNotCheckedIndexed(IndexedAttestation<T::EthSpec>, TErr), SignatureNotCheckedIndexed(IndexedAttestation<T::EthSpec>, TErr),
/// The attestation's signature is invalid, so it will never be slashable. /// The attestation's signature is invalid, so it will never be slashable.
@ -324,7 +381,7 @@ fn process_slash_info<T: BeaconChainTypes>(
if let Some(slasher) = chain.slasher.as_ref() { if let Some(slasher) = chain.slasher.as_ref() {
let (indexed_attestation, check_signature, err) = match slash_info { let (indexed_attestation, check_signature, err) = match slash_info {
SignatureNotChecked(attestation, err) => { SignatureNotChecked(attestation, err) => {
match obtain_indexed_attestation_and_committees_per_slot(chain, &attestation) { match obtain_indexed_attestation_and_committees_per_slot(chain, attestation) {
Ok((indexed, _)) => (indexed, true, err), Ok((indexed, _)) => (indexed, true, err),
Err(e) => { Err(e) => {
debug!( debug!(
@ -367,13 +424,13 @@ fn process_slash_info<T: BeaconChainTypes>(
} }
} }
impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> { impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
/// Returns `Ok(Self)` if the `signed_aggregate` is valid to be (re)published on the gossip /// Returns `Ok(Self)` if the `signed_aggregate` is valid to be (re)published on the gossip
/// network. /// network.
pub fn verify( pub fn verify(
signed_aggregate: SignedAggregateAndProof<T::EthSpec>, signed_aggregate: &'a SignedAggregateAndProof<T::EthSpec>,
chain: &BeaconChain<T>, chain: &BeaconChain<T>,
) -> Result<Self, (Error, SignedAggregateAndProof<T::EthSpec>)> { ) -> Result<Self, Error> {
Self::verify_slashable(signed_aggregate, chain) Self::verify_slashable(signed_aggregate, chain)
.map(|verified_aggregate| { .map(|verified_aggregate| {
if let Some(slasher) = chain.slasher.as_ref() { if let Some(slasher) = chain.slasher.as_ref() {
@ -381,9 +438,7 @@ impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
} }
verified_aggregate verified_aggregate
}) })
.map_err(|(slash_info, original_aggregate)| { .map_err(|slash_info| process_slash_info(slash_info, chain))
(process_slash_info(slash_info, chain), original_aggregate)
})
} }
/// Run the checks that happen before an indexed attestation is constructed. /// Run the checks that happen before an indexed attestation is constructed.
@ -467,6 +522,56 @@ impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
} }
} }
/// Verify the attestation, producing extra information about whether it might be slashable.
pub fn verify_slashable(
signed_aggregate: &'a SignedAggregateAndProof<T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<Self, AttestationSlashInfo<'a, T, Error>> {
use AttestationSlashInfo::*;
let attestation = &signed_aggregate.message.aggregate;
let aggregator_index = signed_aggregate.message.aggregator_index;
let attestation_root = match Self::verify_early_checks(signed_aggregate, chain) {
Ok(root) => root,
Err(e) => return Err(SignatureNotChecked(&signed_aggregate.message.aggregate, e)),
};
let indexed_attestation =
match map_attestation_committee(chain, attestation, |(committee, _)| {
// Note: this clones the signature which is known to be a relatively slow operation.
//
// Future optimizations should remove this clone.
let selection_proof =
SelectionProof::from(signed_aggregate.message.selection_proof.clone());
if !selection_proof
.is_aggregator(committee.committee.len(), &chain.spec)
.map_err(|e| Error::BeaconChainError(e.into()))?
{
return Err(Error::InvalidSelectionProof { aggregator_index });
}
// Ensure the aggregator is a member of the committee for which it is aggregating.
if !committee.committee.contains(&(aggregator_index as usize)) {
return Err(Error::AggregatorNotInCommittee { aggregator_index });
}
get_indexed_attestation(committee.committee, attestation)
.map_err(|e| BeaconChainError::from(e).into())
}) {
Ok(indexed_attestation) => indexed_attestation,
Err(e) => return Err(SignatureNotChecked(&signed_aggregate.message.aggregate, e)),
};
Ok(IndexedAggregatedAttestation {
signed_aggregate,
indexed_attestation,
attestation_root,
})
}
}
impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> {
/// Run the checks that happen after the indexed attestation and signature have been checked. /// Run the checks that happen after the indexed attestation and signature have been checked.
fn verify_late_checks( fn verify_late_checks(
signed_aggregate: &SignedAggregateAndProof<T::EthSpec>, signed_aggregate: &SignedAggregateAndProof<T::EthSpec>,
@ -508,82 +613,70 @@ impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
Ok(()) Ok(())
} }
/// Verify the attestation, producing extra information about whether it might be slashable. /// Verify the `signed_aggregate`.
// NOTE: Clippy considers the return too complex. This tuple is not used elsewhere so it is not pub fn verify(
// worth creating an alias. signed_aggregate: &'a SignedAggregateAndProof<T::EthSpec>,
#[allow(clippy::type_complexity)]
pub fn verify_slashable(
signed_aggregate: SignedAggregateAndProof<T::EthSpec>,
chain: &BeaconChain<T>, chain: &BeaconChain<T>,
) -> Result< ) -> Result<Self, Error> {
Self, let indexed = IndexedAggregatedAttestation::verify(signed_aggregate, chain)?;
( Self::from_indexed(indexed, chain, CheckAttestationSignature::Yes)
AttestationSlashInfo<T, Error>, }
SignedAggregateAndProof<T::EthSpec>,
), /// Complete the verification of an indexed attestation.
> { fn from_indexed(
signed_aggregate: IndexedAggregatedAttestation<'a, T>,
chain: &BeaconChain<T>,
check_signature: CheckAttestationSignature,
) -> Result<Self, Error> {
Self::verify_slashable(signed_aggregate, chain, check_signature)
.map(|verified_aggregate| verified_aggregate.apply_to_slasher(chain))
.map_err(|slash_info| process_slash_info(slash_info, chain))
}
fn apply_to_slasher(self, chain: &BeaconChain<T>) -> Self {
if let Some(slasher) = chain.slasher.as_ref() {
slasher.accept_attestation(self.indexed_attestation.clone());
}
self
}
/// Verify the attestation, producing extra information about whether it might be slashable.
fn verify_slashable(
signed_aggregate: IndexedAggregatedAttestation<'a, T>,
chain: &BeaconChain<T>,
check_signature: CheckAttestationSignature,
) -> Result<Self, AttestationSlashInfo<'a, T, Error>> {
use AttestationSlashInfo::*; use AttestationSlashInfo::*;
let attestation = &signed_aggregate.message.aggregate; let IndexedAggregatedAttestation {
let aggregator_index = signed_aggregate.message.aggregator_index; signed_aggregate,
let attestation_root = match Self::verify_early_checks(&signed_aggregate, chain) { indexed_attestation,
Ok(root) => root, attestation_root,
Err(e) => { } = signed_aggregate;
return Err((
SignatureNotChecked(signed_aggregate.message.aggregate.clone(), e), match check_signature {
CheckAttestationSignature::Yes => {
// Ensure that all signatures are valid.
if let Err(e) = verify_signed_aggregate_signatures(
chain,
signed_aggregate, signed_aggregate,
)) &indexed_attestation,
} )
};
let indexed_attestation =
match map_attestation_committee(chain, attestation, |(committee, _)| {
// Note: this clones the signature which is known to be a relatively slow operation.
//
// Future optimizations should remove this clone.
let selection_proof =
SelectionProof::from(signed_aggregate.message.selection_proof.clone());
if !selection_proof
.is_aggregator(committee.committee.len(), &chain.spec)
.map_err(|e| Error::BeaconChainError(e.into()))?
{
return Err(Error::InvalidSelectionProof { aggregator_index });
}
// Ensure the aggregator is a member of the committee for which it is aggregating.
if !committee.committee.contains(&(aggregator_index as usize)) {
return Err(Error::AggregatorNotInCommittee { aggregator_index });
}
get_indexed_attestation(committee.committee, attestation)
.map_err(|e| BeaconChainError::from(e).into())
}) {
Ok(indexed_attestation) => indexed_attestation,
Err(e) => {
return Err((
SignatureNotChecked(signed_aggregate.message.aggregate.clone(), e),
signed_aggregate,
))
}
};
// Ensure that all signatures are valid.
if let Err(e) =
verify_signed_aggregate_signatures(chain, &signed_aggregate, &indexed_attestation)
.and_then(|is_valid| { .and_then(|is_valid| {
if !is_valid { if !is_valid {
Err(Error::InvalidSignature) Err(Error::InvalidSignature)
} else { } else {
Ok(()) Ok(())
} }
}) }) {
{ return Err(SignatureInvalid(e));
return Err((SignatureInvalid(e), signed_aggregate)); }
} }
CheckAttestationSignature::No => (),
};
if let Err(e) = Self::verify_late_checks(&signed_aggregate, attestation_root, chain) { if let Err(e) = Self::verify_late_checks(signed_aggregate, attestation_root, chain) {
return Err((SignatureValid(indexed_attestation, e), signed_aggregate)); return Err(SignatureValid(indexed_attestation, e));
} }
Ok(VerifiedAggregatedAttestation { Ok(VerifiedAggregatedAttestation {
@ -592,11 +685,6 @@ impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
}) })
} }
/// A helper function to add this aggregate to `beacon_chain.op_pool`.
pub fn add_to_pool(self, chain: &BeaconChain<T>) -> Result<Self, Error> {
chain.add_to_block_inclusion_pool(self)
}
/// Returns the underlying `attestation` for the `signed_aggregate`. /// Returns the underlying `attestation` for the `signed_aggregate`.
pub fn attestation(&self) -> &Attestation<T::EthSpec> { pub fn attestation(&self) -> &Attestation<T::EthSpec> {
&self.signed_aggregate.message.aggregate &self.signed_aggregate.message.aggregate
@ -604,11 +692,11 @@ impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
/// Returns the underlying `signed_aggregate`. /// Returns the underlying `signed_aggregate`.
pub fn aggregate(&self) -> &SignedAggregateAndProof<T::EthSpec> { pub fn aggregate(&self) -> &SignedAggregateAndProof<T::EthSpec> {
&self.signed_aggregate self.signed_aggregate
} }
} }
impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<T> { impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> {
/// Run the checks that happen before an indexed attestation is constructed. /// Run the checks that happen before an indexed attestation is constructed.
pub fn verify_early_checks( pub fn verify_early_checks(
attestation: &Attestation<T::EthSpec>, attestation: &Attestation<T::EthSpec>,
@ -699,6 +787,75 @@ impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<T> {
Ok((validator_index, expected_subnet_id)) Ok((validator_index, expected_subnet_id))
} }
/// Returns `Ok(Self)` if the `attestation` is valid to be (re)published on the gossip
/// network.
///
/// `subnet_id` is the subnet from which we received this attestation. This function will
/// verify that it was received on the correct subnet.
pub fn verify(
attestation: &'a Attestation<T::EthSpec>,
subnet_id: Option<SubnetId>,
chain: &BeaconChain<T>,
) -> Result<Self, Error> {
Self::verify_slashable(attestation, subnet_id, chain)
.map(|verified_unaggregated| {
if let Some(slasher) = chain.slasher.as_ref() {
slasher.accept_attestation(verified_unaggregated.indexed_attestation.clone());
}
verified_unaggregated
})
.map_err(|slash_info| process_slash_info(slash_info, chain))
}
/// Verify the attestation, producing extra information about whether it might be slashable.
pub fn verify_slashable(
attestation: &'a Attestation<T::EthSpec>,
subnet_id: Option<SubnetId>,
chain: &BeaconChain<T>,
) -> Result<Self, AttestationSlashInfo<'a, T, Error>> {
use AttestationSlashInfo::*;
if let Err(e) = Self::verify_early_checks(attestation, chain) {
return Err(SignatureNotChecked(attestation, e));
}
let (indexed_attestation, committees_per_slot) =
match obtain_indexed_attestation_and_committees_per_slot(chain, attestation) {
Ok(x) => x,
Err(e) => {
return Err(SignatureNotChecked(attestation, e));
}
};
let (validator_index, expected_subnet_id) = match Self::verify_middle_checks(
attestation,
&indexed_attestation,
committees_per_slot,
subnet_id,
chain,
) {
Ok(t) => t,
Err(e) => return Err(SignatureNotCheckedIndexed(indexed_attestation, e)),
};
Ok(Self {
attestation,
indexed_attestation,
subnet_id: expected_subnet_id,
validator_index,
})
}
/// Returns a mutable reference to the underlying attestation.
///
/// Only use during testing since modifying the `IndexedAttestation` can cause the attestation
/// to no-longer be valid.
pub fn __indexed_attestation_mut(&mut self) -> &mut IndexedAttestation<T::EthSpec> {
&mut self.indexed_attestation
}
}
impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> {
/// Run the checks that apply after the signature has been checked. /// Run the checks that apply after the signature has been checked.
fn verify_late_checks( fn verify_late_checks(
attestation: &Attestation<T::EthSpec>, attestation: &Attestation<T::EthSpec>,
@ -725,88 +882,70 @@ impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<T> {
Ok(()) Ok(())
} }
/// Returns `Ok(Self)` if the `attestation` is valid to be (re)published on the gossip /// Verify the `unaggregated_attestation`.
/// network.
///
/// `subnet_id` is the subnet from which we received this attestation. This function will
/// verify that it was received on the correct subnet.
pub fn verify( pub fn verify(
attestation: Attestation<T::EthSpec>, unaggregated_attestation: &'a Attestation<T::EthSpec>,
subnet_id: Option<SubnetId>, subnet_id: Option<SubnetId>,
chain: &BeaconChain<T>, chain: &BeaconChain<T>,
) -> Result<Self, (Error, Attestation<T::EthSpec>)> { ) -> Result<Self, Error> {
Self::verify_slashable(attestation, subnet_id, chain) let indexed =
.map(|verified_unaggregated| { IndexedUnaggregatedAttestation::verify(unaggregated_attestation, subnet_id, chain)?;
if let Some(slasher) = chain.slasher.as_ref() { Self::from_indexed(indexed, chain, CheckAttestationSignature::Yes)
slasher.accept_attestation(verified_unaggregated.indexed_attestation.clone()); }
}
verified_unaggregated /// Complete the verification of an indexed attestation.
}) fn from_indexed(
.map_err(|(slash_info, original_attestation)| { attestation: IndexedUnaggregatedAttestation<'a, T>,
(process_slash_info(slash_info, chain), original_attestation) chain: &BeaconChain<T>,
}) check_signature: CheckAttestationSignature,
) -> Result<Self, Error> {
Self::verify_slashable(attestation, chain, check_signature)
.map(|verified_unaggregated| verified_unaggregated.apply_to_slasher(chain))
.map_err(|slash_info| process_slash_info(slash_info, chain))
}
fn apply_to_slasher(self, chain: &BeaconChain<T>) -> Self {
if let Some(slasher) = chain.slasher.as_ref() {
slasher.accept_attestation(self.indexed_attestation.clone());
}
self
} }
/// Verify the attestation, producing extra information about whether it might be slashable. /// Verify the attestation, producing extra information about whether it might be slashable.
// NOTE: Clippy considers the return too complex. This tuple is not used elsewhere so it is not fn verify_slashable(
// worth creating an alias. attestation: IndexedUnaggregatedAttestation<'a, T>,
#[allow(clippy::type_complexity)]
pub fn verify_slashable(
attestation: Attestation<T::EthSpec>,
subnet_id: Option<SubnetId>,
chain: &BeaconChain<T>, chain: &BeaconChain<T>,
) -> Result<Self, (AttestationSlashInfo<T, Error>, Attestation<T::EthSpec>)> { check_signature: CheckAttestationSignature,
) -> Result<Self, AttestationSlashInfo<'a, T, Error>> {
use AttestationSlashInfo::*; use AttestationSlashInfo::*;
if let Err(e) = Self::verify_early_checks(&attestation, chain) { let IndexedUnaggregatedAttestation {
return Err((SignatureNotChecked(attestation.clone(), e), attestation)); attestation,
} indexed_attestation,
let (indexed_attestation, committees_per_slot) =
match obtain_indexed_attestation_and_committees_per_slot(chain, &attestation) {
Ok(x) => x,
Err(e) => {
return Err((SignatureNotChecked(attestation.clone(), e), attestation));
}
};
let (validator_index, expected_subnet_id) = match Self::verify_middle_checks(
&attestation,
&indexed_attestation,
committees_per_slot,
subnet_id, subnet_id,
chain, validator_index,
) { } = attestation;
Ok(t) => t,
Err(e) => { match check_signature {
return Err(( CheckAttestationSignature::Yes => {
SignatureNotCheckedIndexed(indexed_attestation, e), if let Err(e) = verify_attestation_signature(chain, &indexed_attestation) {
attestation, return Err(SignatureInvalid(e));
)) }
} }
CheckAttestationSignature::No => (),
}; };
// The aggregate signature of the attestation is valid. if let Err(e) = Self::verify_late_checks(attestation, validator_index, chain) {
if let Err(e) = verify_attestation_signature(chain, &indexed_attestation) { return Err(SignatureValid(indexed_attestation, e));
return Err((SignatureInvalid(e), attestation));
}
if let Err(e) = Self::verify_late_checks(&attestation, validator_index, chain) {
return Err((SignatureValid(indexed_attestation, e), attestation));
} }
Ok(Self { Ok(Self {
attestation, attestation,
indexed_attestation, indexed_attestation,
subnet_id: expected_subnet_id, subnet_id,
}) })
} }
/// A helper function to add this attestation to `beacon_chain.naive_aggregation_pool`.
pub fn add_to_pool(self, chain: &BeaconChain<T>) -> Result<Self, Error> {
chain.add_to_naive_aggregation_pool(self)
}
/// Returns the correct subnet for the attestation. /// Returns the correct subnet for the attestation.
pub fn subnet_id(&self) -> SubnetId { pub fn subnet_id(&self) -> SubnetId {
self.subnet_id self.subnet_id
@ -814,7 +953,7 @@ impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<T> {
/// Returns the wrapped `attestation`. /// Returns the wrapped `attestation`.
pub fn attestation(&self) -> &Attestation<T::EthSpec> { pub fn attestation(&self) -> &Attestation<T::EthSpec> {
&self.attestation self.attestation
} }
/// Returns the wrapped `indexed_attestation`. /// Returns the wrapped `indexed_attestation`.

View File

@ -0,0 +1,222 @@
//! These two `batch_...` functions provide verification of batches of attestations. They provide
//! significant CPU-time savings by performing batch verification of BLS signatures.
//!
//! In each function, attestations are "indexed" (i.e., the `IndexedAttestation` is computed), to
//! determine if they should progress to signature verification. Then, all attestations which were
//! successfully indexed have their signatures verified in a batch. If that signature batch fails
//! then all attestation signatures are verified independently.
//!
//! The outcome of each function is a `Vec<Result>` with a one-to-one mapping to the attestations
//! supplied as input. Each result provides the exact success or failure result of the corresponding
//! attestation, with no loss of fidelity when compared to individual verification.
use super::{
CheckAttestationSignature, Error, IndexedAggregatedAttestation, IndexedUnaggregatedAttestation,
VerifiedAggregatedAttestation, VerifiedUnaggregatedAttestation,
};
use crate::{
beacon_chain::VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, metrics, BeaconChain, BeaconChainError,
BeaconChainTypes,
};
use bls::verify_signature_sets;
use state_processing::signature_sets::{
indexed_attestation_signature_set_from_pubkeys, signed_aggregate_selection_proof_signature_set,
signed_aggregate_signature_set,
};
use std::borrow::Cow;
use types::*;
/// Verify aggregated attestations using batch BLS signature verification.
///
/// See module-level docs for more info.
pub fn batch_verify_aggregated_attestations<'a, T, I>(
aggregates: I,
chain: &BeaconChain<T>,
) -> Result<Vec<Result<VerifiedAggregatedAttestation<'a, T>, Error>>, Error>
where
T: BeaconChainTypes,
I: Iterator<Item = &'a SignedAggregateAndProof<T::EthSpec>> + ExactSizeIterator,
{
let mut num_indexed = 0;
let mut num_failed = 0;
// Perform indexing of all attestations, collecting the results.
let indexing_results = aggregates
.map(|aggregate| {
let result = IndexedAggregatedAttestation::verify(aggregate, chain);
if result.is_ok() {
num_indexed += 1;
} else {
num_failed += 1;
}
result
})
.collect::<Vec<_>>();
// May be set to `No` if batch verification succeeds.
let mut check_signatures = CheckAttestationSignature::Yes;
// Perform batch BLS verification, if any attestation signatures are worth checking.
if num_indexed > 0 {
let signature_setup_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_BATCH_AGG_SIGNATURE_SETUP_TIMES);
let pubkey_cache = chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;
let fork = chain.with_head(|head| Ok::<_, BeaconChainError>(head.beacon_state.fork()))?;
let mut signature_sets = Vec::with_capacity(num_indexed * 3);
// Iterate, flattening to get only the `Ok` values.
for indexed in indexing_results.iter().flatten() {
let signed_aggregate = &indexed.signed_aggregate;
let indexed_attestation = &indexed.indexed_attestation;
signature_sets.push(
signed_aggregate_selection_proof_signature_set(
|validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed),
signed_aggregate,
&fork,
chain.genesis_validators_root,
&chain.spec,
)
.map_err(BeaconChainError::SignatureSetError)?,
);
signature_sets.push(
signed_aggregate_signature_set(
|validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed),
signed_aggregate,
&fork,
chain.genesis_validators_root,
&chain.spec,
)
.map_err(BeaconChainError::SignatureSetError)?,
);
signature_sets.push(
indexed_attestation_signature_set_from_pubkeys(
|validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed),
&indexed_attestation.signature,
indexed_attestation,
&fork,
chain.genesis_validators_root,
&chain.spec,
)
.map_err(BeaconChainError::SignatureSetError)?,
);
}
metrics::stop_timer(signature_setup_timer);
let _signature_verification_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_BATCH_AGG_SIGNATURE_TIMES);
if verify_signature_sets(signature_sets.iter()) {
// Since all the signatures verified in a batch, there's no reason for them to be
// checked again later.
check_signatures = CheckAttestationSignature::No
}
}
// Complete the attestation verification, potentially verifying all signatures independently.
let final_results = indexing_results
.into_iter()
.map(|result| match result {
Ok(indexed) => {
VerifiedAggregatedAttestation::from_indexed(indexed, chain, check_signatures)
}
Err(e) => Err(e),
})
.collect();
Ok(final_results)
}
/// Verify unaggregated attestations using batch BLS signature verification.
///
/// See module-level docs for more info.
pub fn batch_verify_unaggregated_attestations<'a, T, I>(
attestations: I,
chain: &BeaconChain<T>,
) -> Result<Vec<Result<VerifiedUnaggregatedAttestation<'a, T>, Error>>, Error>
where
T: BeaconChainTypes,
I: Iterator<Item = (&'a Attestation<T::EthSpec>, Option<SubnetId>)> + ExactSizeIterator,
{
let mut num_partially_verified = 0;
let mut num_failed = 0;
// Perform partial verification of all attestations, collecting the results.
let partial_results = attestations
.map(|(attn, subnet_opt)| {
let result = IndexedUnaggregatedAttestation::verify(attn, subnet_opt, chain);
if result.is_ok() {
num_partially_verified += 1;
} else {
num_failed += 1;
}
result
})
.collect::<Vec<_>>();
// May be set to `No` if batch verification succeeds.
let mut check_signatures = CheckAttestationSignature::Yes;
// Perform batch BLS verification, if any attestation signatures are worth checking.
if num_partially_verified > 0 {
let signature_setup_timer = metrics::start_timer(
&metrics::ATTESTATION_PROCESSING_BATCH_UNAGG_SIGNATURE_SETUP_TIMES,
);
let pubkey_cache = chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;
let fork = chain.with_head(|head| Ok::<_, BeaconChainError>(head.beacon_state.fork()))?;
let mut signature_sets = Vec::with_capacity(num_partially_verified);
// Iterate, flattening to get only the `Ok` values.
for partially_verified in partial_results.iter().flatten() {
let indexed_attestation = &partially_verified.indexed_attestation;
let signature_set = indexed_attestation_signature_set_from_pubkeys(
|validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed),
&indexed_attestation.signature,
indexed_attestation,
&fork,
chain.genesis_validators_root,
&chain.spec,
)
.map_err(BeaconChainError::SignatureSetError)?;
signature_sets.push(signature_set);
}
metrics::stop_timer(signature_setup_timer);
let _signature_verification_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_BATCH_UNAGG_SIGNATURE_TIMES);
if verify_signature_sets(signature_sets.iter()) {
// Since all the signatures verified in a batch, there's no reason for them to be
// checked again later.
check_signatures = CheckAttestationSignature::No
}
}
// Complete the attestation verification, potentially verifying all signatures independently.
let final_results = partial_results
.into_iter()
.map(|result| match result {
Ok(partial) => {
VerifiedUnaggregatedAttestation::from_indexed(partial, chain, check_signatures)
}
Err(e) => Err(e),
})
.collect();
Ok(final_results)
}

View File

@ -1,5 +1,6 @@
use crate::attestation_verification::{ use crate::attestation_verification::{
Error as AttestationError, SignatureVerifiedAttestation, VerifiedAggregatedAttestation, batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations,
Error as AttestationError, VerifiedAggregatedAttestation, VerifiedAttestation,
VerifiedUnaggregatedAttestation, VerifiedUnaggregatedAttestation,
}; };
use crate::attester_cache::{AttesterCache, AttesterCacheKey}; use crate::attester_cache::{AttesterCache, AttesterCacheKey};
@ -1510,17 +1511,32 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}) })
} }
/// Performs the same validation as `Self::verify_unaggregated_attestation_for_gossip`, but for
/// multiple attestations using batch BLS verification. Batch verification can provide
/// significant CPU-time savings compared to individual verification.
pub fn batch_verify_unaggregated_attestations_for_gossip<'a, I>(
&self,
attestations: I,
) -> Result<
Vec<Result<VerifiedUnaggregatedAttestation<'a, T>, AttestationError>>,
AttestationError,
>
where
I: Iterator<Item = (&'a Attestation<T::EthSpec>, Option<SubnetId>)> + ExactSizeIterator,
{
batch_verify_unaggregated_attestations(attestations, self)
}
/// Accepts some `Attestation` from the network and attempts to verify it, returning `Ok(_)` if /// Accepts some `Attestation` from the network and attempts to verify it, returning `Ok(_)` if
/// it is valid to be (re)broadcast on the gossip network. /// it is valid to be (re)broadcast on the gossip network.
/// ///
/// The attestation must be "unaggregated", that is it must have exactly one /// The attestation must be "unaggregated", that is it must have exactly one
/// aggregation bit set. /// aggregation bit set.
pub fn verify_unaggregated_attestation_for_gossip( pub fn verify_unaggregated_attestation_for_gossip<'a>(
&self, &self,
unaggregated_attestation: Attestation<T::EthSpec>, unaggregated_attestation: &'a Attestation<T::EthSpec>,
subnet_id: Option<SubnetId>, subnet_id: Option<SubnetId>,
) -> Result<VerifiedUnaggregatedAttestation<T>, (AttestationError, Attestation<T::EthSpec>)> ) -> Result<VerifiedUnaggregatedAttestation<'a, T>, AttestationError> {
{
metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_REQUESTS); metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_REQUESTS);
let _timer = let _timer =
metrics::start_timer(&metrics::UNAGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES); metrics::start_timer(&metrics::UNAGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES);
@ -1539,15 +1555,25 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) )
} }
/// Performs the same validation as `Self::verify_aggregated_attestation_for_gossip`, but for
/// multiple attestations using batch BLS verification. Batch verification can provide
/// significant CPU-time savings compared to individual verification.
pub fn batch_verify_aggregated_attestations_for_gossip<'a, I>(
&self,
aggregates: I,
) -> Result<Vec<Result<VerifiedAggregatedAttestation<'a, T>, AttestationError>>, AttestationError>
where
I: Iterator<Item = &'a SignedAggregateAndProof<T::EthSpec>> + ExactSizeIterator,
{
batch_verify_aggregated_attestations(aggregates, self)
}
/// Accepts some `SignedAggregateAndProof` from the network and attempts to verify it, /// Accepts some `SignedAggregateAndProof` from the network and attempts to verify it,
/// returning `Ok(_)` if it is valid to be (re)broadcast on the gossip network. /// returning `Ok(_)` if it is valid to be (re)broadcast on the gossip network.
pub fn verify_aggregated_attestation_for_gossip( pub fn verify_aggregated_attestation_for_gossip<'a>(
&self, &self,
signed_aggregate: SignedAggregateAndProof<T::EthSpec>, signed_aggregate: &'a SignedAggregateAndProof<T::EthSpec>,
) -> Result< ) -> Result<VerifiedAggregatedAttestation<'a, T>, AttestationError> {
VerifiedAggregatedAttestation<T>,
(AttestationError, SignedAggregateAndProof<T::EthSpec>),
> {
metrics::inc_counter(&metrics::AGGREGATED_ATTESTATION_PROCESSING_REQUESTS); metrics::inc_counter(&metrics::AGGREGATED_ATTESTATION_PROCESSING_REQUESTS);
let _timer = let _timer =
metrics::start_timer(&metrics::AGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES); metrics::start_timer(&metrics::AGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES);
@ -1597,13 +1623,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Accepts some attestation-type object and attempts to verify it in the context of fork /// Accepts some attestation-type object and attempts to verify it in the context of fork
/// choice. If it is valid it is applied to `self.fork_choice`. /// choice. If it is valid it is applied to `self.fork_choice`.
/// ///
/// Common items that implement `SignatureVerifiedAttestation`: /// Common items that implement `VerifiedAttestation`:
/// ///
/// - `VerifiedUnaggregatedAttestation` /// - `VerifiedUnaggregatedAttestation`
/// - `VerifiedAggregatedAttestation` /// - `VerifiedAggregatedAttestation`
pub fn apply_attestation_to_fork_choice( pub fn apply_attestation_to_fork_choice(
&self, &self,
verified: &impl SignatureVerifiedAttestation<T>, verified: &impl VerifiedAttestation<T>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let _timer = metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES); let _timer = metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES);
@ -1623,8 +1649,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// and no error is returned. /// and no error is returned.
pub fn add_to_naive_aggregation_pool( pub fn add_to_naive_aggregation_pool(
&self, &self,
unaggregated_attestation: VerifiedUnaggregatedAttestation<T>, unaggregated_attestation: &impl VerifiedAttestation<T>,
) -> Result<VerifiedUnaggregatedAttestation<T>, AttestationError> { ) -> Result<(), AttestationError> {
let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_APPLY_TO_AGG_POOL); let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_APPLY_TO_AGG_POOL);
let attestation = unaggregated_attestation.attestation(); let attestation = unaggregated_attestation.attestation();
@ -1660,7 +1686,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
} }
}; };
Ok(unaggregated_attestation) Ok(())
} }
/// Accepts a `VerifiedSyncCommitteeMessage` and attempts to apply it to the "naive /// Accepts a `VerifiedSyncCommitteeMessage` and attempts to apply it to the "naive
@ -1727,13 +1753,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(verified_sync_committee_message) Ok(verified_sync_committee_message)
} }
/// Accepts a `VerifiedAggregatedAttestation` and attempts to apply it to `self.op_pool`. /// Accepts a `VerifiedAttestation` and attempts to apply it to `self.op_pool`.
/// ///
/// The op pool is used by local block producers to pack blocks with operations. /// The op pool is used by local block producers to pack blocks with operations.
pub fn add_to_block_inclusion_pool( pub fn add_to_block_inclusion_pool(
&self, &self,
signed_aggregate: VerifiedAggregatedAttestation<T>, verified_attestation: &impl VerifiedAttestation<T>,
) -> Result<VerifiedAggregatedAttestation<T>, AttestationError> { ) -> Result<(), AttestationError> {
let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_APPLY_TO_OP_POOL); let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_APPLY_TO_OP_POOL);
// If there's no eth1 chain then it's impossible to produce blocks and therefore // If there's no eth1 chain then it's impossible to produce blocks and therefore
@ -1745,7 +1771,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.op_pool self.op_pool
.insert_attestation( .insert_attestation(
// TODO: address this clone. // TODO: address this clone.
signed_aggregate.attestation().clone(), verified_attestation.attestation().clone(),
&fork, &fork,
self.genesis_validators_root, self.genesis_validators_root,
&self.spec, &self.spec,
@ -1753,7 +1779,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_err(Error::from)?; .map_err(Error::from)?;
} }
Ok(signed_aggregate) Ok(())
} }
/// Accepts a `VerifiedSyncContribution` and attempts to apply it to `self.op_pool`. /// Accepts a `VerifiedSyncContribution` and attempts to apply it to `self.op_pool`.

View File

@ -199,6 +199,26 @@ lazy_static! {
"Time spent on the signature verification of attestation processing" "Time spent on the signature verification of attestation processing"
); );
/*
* Batch Attestation Processing
*/
pub static ref ATTESTATION_PROCESSING_BATCH_AGG_SIGNATURE_SETUP_TIMES: Result<Histogram> = try_create_histogram(
"beacon_attestation_processing_batch_agg_signature_setup_times",
"Time spent on setting up for the signature verification of batch aggregate processing"
);
pub static ref ATTESTATION_PROCESSING_BATCH_AGG_SIGNATURE_TIMES: Result<Histogram> = try_create_histogram(
"beacon_attestation_processing_batch_agg_signature_times",
"Time spent on the signature verification of batch aggregate attestation processing"
);
pub static ref ATTESTATION_PROCESSING_BATCH_UNAGG_SIGNATURE_SETUP_TIMES: Result<Histogram> = try_create_histogram(
"beacon_attestation_processing_batch_unagg_signature_setup_times",
"Time spent on setting up for the signature verification of batch unaggregate processing"
);
pub static ref ATTESTATION_PROCESSING_BATCH_UNAGG_SIGNATURE_TIMES: Result<Histogram> = try_create_histogram(
"beacon_attestation_processing_batch_unagg_signature_times",
"Time spent on the signature verification of batch unaggregate attestation processing"
);
/* /*
* Shuffling cache * Shuffling cache
*/ */

View File

@ -1159,29 +1159,42 @@ where
} }
pub fn process_attestations(&self, attestations: HarnessAttestations<E>) { pub fn process_attestations(&self, attestations: HarnessAttestations<E>) {
for (unaggregated_attestations, maybe_signed_aggregate) in attestations.into_iter() { let num_validators = self.validator_keypairs.len();
for (attestation, subnet_id) in unaggregated_attestations { let mut unaggregated = Vec::with_capacity(num_validators);
self.chain // This is an over-allocation, but it should be fine. It won't be *that* memory hungry and
.verify_unaggregated_attestation_for_gossip( // it's nice to have fast tests.
attestation.clone(), let mut aggregated = Vec::with_capacity(num_validators);
Some(subnet_id),
) for (unaggregated_attestations, maybe_signed_aggregate) in attestations.iter() {
.unwrap() for (attn, subnet) in unaggregated_attestations {
.add_to_pool(&self.chain) unaggregated.push((attn, Some(*subnet)));
.unwrap();
} }
if let Some(signed_aggregate) = maybe_signed_aggregate { if let Some(a) = maybe_signed_aggregate {
let attn = self aggregated.push(a)
.chain
.verify_aggregated_attestation_for_gossip(signed_aggregate)
.unwrap();
self.chain.apply_attestation_to_fork_choice(&attn).unwrap();
self.chain.add_to_block_inclusion_pool(attn).unwrap();
} }
} }
for result in self
.chain
.batch_verify_unaggregated_attestations_for_gossip(unaggregated.into_iter())
.unwrap()
{
let verified = result.unwrap();
self.chain.add_to_naive_aggregation_pool(&verified).unwrap();
}
for result in self
.chain
.batch_verify_aggregated_attestations_for_gossip(aggregated.into_iter())
.unwrap()
{
let verified = result.unwrap();
self.chain
.apply_attestation_to_fork_choice(&verified)
.unwrap();
self.chain.add_to_block_inclusion_pool(&verified).unwrap();
}
} }
pub fn set_current_slot(&self, slot: Slot) { pub fn set_current_slot(&self, slot: Slot) {

File diff suppressed because it is too large Load Diff

View File

@ -332,7 +332,7 @@ fn epoch_boundary_state_attestation_processing() {
let res = harness let res = harness
.chain .chain
.verify_unaggregated_attestation_for_gossip(attestation.clone(), Some(subnet_id)); .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id));
let current_slot = harness.chain.slot().expect("should get slot"); let current_slot = harness.chain.slot().expect("should get slot");
let expected_attestation_slot = attestation.data.slot; let expected_attestation_slot = attestation.data.slot;
@ -344,7 +344,7 @@ fn epoch_boundary_state_attestation_processing() {
{ {
checked_pre_fin = true; checked_pre_fin = true;
assert!(matches!( assert!(matches!(
res.err().unwrap().0, res.err().unwrap(),
AttnError::PastSlot { AttnError::PastSlot {
attestation_slot, attestation_slot,
earliest_permissible_slot, earliest_permissible_slot,

View File

@ -529,7 +529,7 @@ fn attestations_with_increasing_slots() {
for (attestation, subnet_id) in attestations.into_iter().flatten() { for (attestation, subnet_id) in attestations.into_iter().flatten() {
let res = harness let res = harness
.chain .chain
.verify_unaggregated_attestation_for_gossip(attestation.clone(), Some(subnet_id)); .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id));
let current_slot = harness.chain.slot().expect("should get slot"); let current_slot = harness.chain.slot().expect("should get slot");
let expected_attestation_slot = attestation.data.slot; let expected_attestation_slot = attestation.data.slot;
@ -538,7 +538,7 @@ fn attestations_with_increasing_slots() {
if expected_attestation_slot < expected_earliest_permissible_slot { if expected_attestation_slot < expected_earliest_permissible_slot {
assert!(matches!( assert!(matches!(
res.err().unwrap().0, res.err().unwrap(),
AttnError::PastSlot { AttnError::PastSlot {
attestation_slot, attestation_slot,
earliest_permissible_slot, earliest_permissible_slot,

View File

@ -16,7 +16,7 @@ mod validator_inclusion;
mod version; mod version;
use beacon_chain::{ use beacon_chain::{
attestation_verification::SignatureVerifiedAttestation, attestation_verification::VerifiedAttestation,
observed_operations::ObservationOutcome, observed_operations::ObservationOutcome,
validator_monitor::{get_block_delay_ms, timestamp_now}, validator_monitor::{get_block_delay_ms, timestamp_now},
AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
@ -1066,7 +1066,7 @@ pub fn serve<T: BeaconChainTypes>(
for (index, attestation) in attestations.as_slice().iter().enumerate() { for (index, attestation) in attestations.as_slice().iter().enumerate() {
let attestation = match chain let attestation = match chain
.verify_unaggregated_attestation_for_gossip(attestation.clone(), None) .verify_unaggregated_attestation_for_gossip(attestation, None)
{ {
Ok(attestation) => attestation, Ok(attestation) => attestation,
Err(e) => { Err(e) => {
@ -1121,7 +1121,7 @@ pub fn serve<T: BeaconChainTypes>(
)); ));
}; };
if let Err(e) = chain.add_to_naive_aggregation_pool(attestation) { if let Err(e) = chain.add_to_naive_aggregation_pool(&attestation) {
error!(log, error!(log,
"Failure adding verified attestation to the naive aggregation pool"; "Failure adding verified attestation to the naive aggregation pool";
"error" => ?e, "error" => ?e,
@ -1958,7 +1958,7 @@ pub fn serve<T: BeaconChainTypes>(
let mut failures = Vec::new(); let mut failures = Vec::new();
// Verify that all messages in the post are valid before processing further // Verify that all messages in the post are valid before processing further
for (index, aggregate) in aggregates.into_iter().enumerate() { for (index, aggregate) in aggregates.iter().enumerate() {
match chain.verify_aggregated_attestation_for_gossip(aggregate) { match chain.verify_aggregated_attestation_for_gossip(aggregate) {
Ok(verified_aggregate) => { Ok(verified_aggregate) => {
messages.push(PubsubMessage::AggregateAndProofAttestation(Box::new( messages.push(PubsubMessage::AggregateAndProofAttestation(Box::new(
@ -1984,8 +1984,8 @@ pub fn serve<T: BeaconChainTypes>(
// It's reasonably likely that two different validators produce // It's reasonably likely that two different validators produce
// identical aggregates, especially if they're using the same beacon // identical aggregates, especially if they're using the same beacon
// node. // node.
Err((AttnError::AttestationAlreadyKnown(_), _)) => continue, Err(AttnError::AttestationAlreadyKnown(_)) => continue,
Err((e, aggregate)) => { Err(e) => {
error!(log, error!(log,
"Failure verifying aggregate and proofs"; "Failure verifying aggregate and proofs";
"error" => format!("{:?}", e), "error" => format!("{:?}", e),
@ -2017,7 +2017,7 @@ pub fn serve<T: BeaconChainTypes>(
); );
failures.push(api_types::Failure::new(index, format!("Fork choice: {:?}", e))); failures.push(api_types::Failure::new(index, format!("Fork choice: {:?}", e)));
} }
if let Err(e) = chain.add_to_block_inclusion_pool(verified_aggregate) { if let Err(e) = chain.add_to_block_inclusion_pool(&verified_aggregate) {
warn!(log, warn!(log,
"Could not add verified aggregate attestation to the inclusion pool"; "Could not add verified aggregate attestation to the inclusion pool";
"error" => format!("{:?}", e), "error" => format!("{:?}", e),

View File

@ -46,7 +46,8 @@ use eth2_libp2p::{
}; };
use futures::stream::{Stream, StreamExt}; use futures::stream::{Stream, StreamExt};
use futures::task::Poll; use futures::task::Poll;
use slog::{debug, error, trace, warn, Logger}; use slog::{crit, debug, error, trace, warn, Logger};
use std::cmp;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fmt; use std::fmt;
use std::pin::Pin; use std::pin::Pin;
@ -70,7 +71,7 @@ mod tests;
mod work_reprocessing_queue; mod work_reprocessing_queue;
mod worker; mod worker;
pub use worker::ProcessId; pub use worker::{GossipAggregatePackage, GossipAttestationPackage, ProcessId};
/// The maximum size of the channel for work events to the `BeaconProcessor`. /// The maximum size of the channel for work events to the `BeaconProcessor`.
/// ///
@ -159,11 +160,27 @@ const WORKER_TASK_NAME: &str = "beacon_processor_worker";
/// The minimum interval between log messages indicating that a queue is full. /// The minimum interval between log messages indicating that a queue is full.
const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30); const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
/// The `MAX_..._BATCH_SIZE` variables define how many attestations can be included in a single
/// batch.
///
/// Choosing these values is difficult since there is a trade-off between:
///
/// - It is faster to verify one large batch than multiple smaller batches.
/// - "Poisoning" attacks have a larger impact as the batch size increases.
///
/// Poisoning occurs when an invalid signature is included in a batch of attestations. A single
/// invalid signature causes the entire batch to fail. When a batch fails, we fall-back to
/// individually verifying each attestation signature.
const MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64;
const MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64;
/// Unique IDs used for metrics and testing. /// Unique IDs used for metrics and testing.
pub const WORKER_FREED: &str = "worker_freed"; pub const WORKER_FREED: &str = "worker_freed";
pub const NOTHING_TO_DO: &str = "nothing_to_do"; pub const NOTHING_TO_DO: &str = "nothing_to_do";
pub const GOSSIP_ATTESTATION: &str = "gossip_attestation"; pub const GOSSIP_ATTESTATION: &str = "gossip_attestation";
pub const GOSSIP_ATTESTATION_BATCH: &str = "gossip_attestation_batch";
pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate"; pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate";
pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch";
pub const GOSSIP_BLOCK: &str = "gossip_block"; pub const GOSSIP_BLOCK: &str = "gossip_block";
pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block"; pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block";
pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit"; pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit";
@ -564,6 +581,9 @@ pub enum Work<T: BeaconChainTypes> {
should_import: bool, should_import: bool,
seen_timestamp: Duration, seen_timestamp: Duration,
}, },
GossipAttestationBatch {
packages: Vec<GossipAttestationPackage<T::EthSpec>>,
},
GossipAggregate { GossipAggregate {
message_id: MessageId, message_id: MessageId,
peer_id: PeerId, peer_id: PeerId,
@ -576,6 +596,9 @@ pub enum Work<T: BeaconChainTypes> {
aggregate: Box<SignedAggregateAndProof<T::EthSpec>>, aggregate: Box<SignedAggregateAndProof<T::EthSpec>>,
seen_timestamp: Duration, seen_timestamp: Duration,
}, },
GossipAggregateBatch {
packages: Vec<GossipAggregatePackage<T::EthSpec>>,
},
GossipBlock { GossipBlock {
message_id: MessageId, message_id: MessageId,
peer_id: PeerId, peer_id: PeerId,
@ -644,7 +667,9 @@ impl<T: BeaconChainTypes> Work<T> {
fn str_id(&self) -> &'static str { fn str_id(&self) -> &'static str {
match self { match self {
Work::GossipAttestation { .. } => GOSSIP_ATTESTATION, Work::GossipAttestation { .. } => GOSSIP_ATTESTATION,
Work::GossipAttestationBatch { .. } => GOSSIP_ATTESTATION_BATCH,
Work::GossipAggregate { .. } => GOSSIP_AGGREGATE, Work::GossipAggregate { .. } => GOSSIP_AGGREGATE,
Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH,
Work::GossipBlock { .. } => GOSSIP_BLOCK, Work::GossipBlock { .. } => GOSSIP_BLOCK,
Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK, Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK,
Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT, Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT,
@ -922,10 +947,103 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
// Check the aggregates, *then* the unaggregates since we assume that // Check the aggregates, *then* the unaggregates since we assume that
// aggregates are more valuable to local validators and effectively give us // aggregates are more valuable to local validators and effectively give us
// more information with less signature verification time. // more information with less signature verification time.
} else if let Some(item) = aggregate_queue.pop() { } else if aggregate_queue.len() > 0 {
self.spawn_worker(item, toolbox); let batch_size =
} else if let Some(item) = attestation_queue.pop() { cmp::min(aggregate_queue.len(), MAX_GOSSIP_AGGREGATE_BATCH_SIZE);
self.spawn_worker(item, toolbox);
if batch_size < 2 {
// One single aggregate is in the queue, process it individually.
if let Some(item) = aggregate_queue.pop() {
self.spawn_worker(item, toolbox);
}
} else {
// Collect two or more aggregates into a batch, so they can take
// advantage of batch signature verification.
//
// Note: this will convert the `Work::GossipAggregate` item into a
// `Work::GossipAggregateBatch` item.
let mut packages = Vec::with_capacity(batch_size);
for _ in 0..batch_size {
if let Some(item) = aggregate_queue.pop() {
match item {
Work::GossipAggregate {
message_id,
peer_id,
aggregate,
seen_timestamp,
} => {
packages.push(GossipAggregatePackage::new(
message_id,
peer_id,
aggregate,
seen_timestamp,
));
}
_ => {
error!(self.log, "Invalid item in aggregate queue")
}
}
}
}
// Process all aggregates with a single worker.
self.spawn_worker(Work::GossipAggregateBatch { packages }, toolbox)
}
// Check the unaggregated attestation queue.
//
// Potentially use batching.
} else if attestation_queue.len() > 0 {
let batch_size = cmp::min(
attestation_queue.len(),
MAX_GOSSIP_ATTESTATION_BATCH_SIZE,
);
if batch_size < 2 {
// One single attestation is in the queue, process it individually.
if let Some(item) = attestation_queue.pop() {
self.spawn_worker(item, toolbox);
}
} else {
// Collect two or more attestations into a batch, so they can take
// advantage of batch signature verification.
//
// Note: this will convert the `Work::GossipAttestation` item into a
// `Work::GossipAttestationBatch` item.
let mut packages = Vec::with_capacity(batch_size);
for _ in 0..batch_size {
if let Some(item) = attestation_queue.pop() {
match item {
Work::GossipAttestation {
message_id,
peer_id,
attestation,
subnet_id,
should_import,
seen_timestamp,
} => {
packages.push(GossipAttestationPackage::new(
message_id,
peer_id,
attestation,
subnet_id,
should_import,
seen_timestamp,
));
}
_ => error!(
self.log,
"Invalid item in attestation queue"
),
}
}
}
// Process all attestations with a single worker.
self.spawn_worker(
Work::GossipAttestationBatch { packages },
toolbox,
)
}
// Check sync committee messages after attestations as their rewards are lesser // Check sync committee messages after attestations as their rewards are lesser
// and they don't influence fork choice. // and they don't influence fork choice.
} else if let Some(item) = sync_contribution_queue.pop() { } else if let Some(item) = sync_contribution_queue.pop() {
@ -1009,7 +1127,21 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
match work { match work {
_ if can_spawn => self.spawn_worker(work, toolbox), _ if can_spawn => self.spawn_worker(work, toolbox),
Work::GossipAttestation { .. } => attestation_queue.push(work), Work::GossipAttestation { .. } => attestation_queue.push(work),
// Attestation batches are formed internally within the
// `BeaconProcessor`, they are not sent from external services.
Work::GossipAttestationBatch { .. } => crit!(
self.log,
"Unsupported inbound event";
"type" => "GossipAttestationBatch"
),
Work::GossipAggregate { .. } => aggregate_queue.push(work), Work::GossipAggregate { .. } => aggregate_queue.push(work),
// Aggregate batches are formed internally within the `BeaconProcessor`,
// they are not sent from external services.
Work::GossipAggregateBatch { .. } => crit!(
self.log,
"Unsupported inbound event";
"type" => "GossipAggregateBatch"
),
Work::GossipBlock { .. } => { Work::GossipBlock { .. } => {
gossip_block_queue.push(work, work_id, &self.log) gossip_block_queue.push(work, work_id, &self.log)
} }
@ -1180,7 +1312,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
match work { match work {
/* /*
* Unaggregated attestation verification. * Individual unaggregated attestation verification.
*/ */
Work::GossipAttestation { Work::GossipAttestation {
message_id, message_id,
@ -1192,14 +1324,19 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
} => worker.process_gossip_attestation( } => worker.process_gossip_attestation(
message_id, message_id,
peer_id, peer_id,
*attestation, attestation,
subnet_id, subnet_id,
should_import, should_import,
Some(work_reprocessing_tx), Some(work_reprocessing_tx),
seen_timestamp, seen_timestamp,
), ),
/* /*
* Aggregated attestation verification. * Batched unaggregated attestation verification.
*/
Work::GossipAttestationBatch { packages } => worker
.process_gossip_attestation_batch(packages, Some(work_reprocessing_tx)),
/*
* Individual aggregated attestation verification.
*/ */
Work::GossipAggregate { Work::GossipAggregate {
message_id, message_id,
@ -1209,10 +1346,16 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
} => worker.process_gossip_aggregate( } => worker.process_gossip_aggregate(
message_id, message_id,
peer_id, peer_id,
*aggregate, aggregate,
Some(work_reprocessing_tx), Some(work_reprocessing_tx),
seen_timestamp, seen_timestamp,
), ),
/*
* Batched aggregated attestation verification.
*/
Work::GossipAggregateBatch { packages } => {
worker.process_gossip_aggregate_batch(packages, Some(work_reprocessing_tx))
}
/* /*
* Verification for beacon blocks received on gossip. * Verification for beacon blocks received on gossip.
*/ */
@ -1345,7 +1488,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
} => worker.process_gossip_attestation( } => worker.process_gossip_attestation(
message_id, message_id,
peer_id, peer_id,
*attestation, attestation,
subnet_id, subnet_id,
should_import, should_import,
None, // Do not allow this attestation to be re-processed beyond this point. None, // Do not allow this attestation to be re-processed beyond this point.
@ -1359,7 +1502,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
} => worker.process_gossip_aggregate( } => worker.process_gossip_aggregate(
message_id, message_id,
peer_id, peer_id,
*aggregate, aggregate,
None, None,
seen_timestamp, seen_timestamp,
), ),

View File

@ -1,22 +1,22 @@
use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::{ use beacon_chain::{
attestation_verification::{Error as AttnError, SignatureVerifiedAttestation}, attestation_verification::{Error as AttnError, VerifiedAttestation},
observed_operations::ObservationOutcome, observed_operations::ObservationOutcome,
sync_committee_verification::Error as SyncCommitteeError, sync_committee_verification::Error as SyncCommitteeError,
validator_monitor::get_block_delay_ms, validator_monitor::get_block_delay_ms,
BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, GossipVerifiedBlock, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, GossipVerifiedBlock,
}; };
use eth2_libp2p::{MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use eth2_libp2p::{MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
use slog::{debug, error, info, trace, warn}; use slog::{crit, debug, error, info, trace, warn};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use ssz::Encode; use ssz::Encode;
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use types::{ use types::{
Attestation, AttesterSlashing, EthSpec, Hash256, ProposerSlashing, SignedAggregateAndProof, Attestation, AttesterSlashing, EthSpec, Hash256, IndexedAttestation, ProposerSlashing,
SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit,
SyncCommitteeMessage, SyncSubnetId, SubnetId, SyncCommitteeMessage, SyncSubnetId,
}; };
use super::{ use super::{
@ -26,6 +26,60 @@ use super::{
Worker, Worker,
}; };
/// An attestation that has been validated by the `BeaconChain`.
///
/// Since this struct implements `beacon_chain::VerifiedAttestation`, it would be a logic error to
/// construct this from components which have not passed `BeaconChain` validation.
struct VerifiedUnaggregate<T: BeaconChainTypes> {
attestation: Box<Attestation<T::EthSpec>>,
indexed_attestation: IndexedAttestation<T::EthSpec>,
}
/// This implementation allows `Self` to be imported to fork choice and other functions on the
/// `BeaconChain`.
impl<'a, T: BeaconChainTypes> VerifiedAttestation<T> for VerifiedUnaggregate<T> {
fn attestation(&self) -> &Attestation<T::EthSpec> {
&self.attestation
}
fn indexed_attestation(&self) -> &IndexedAttestation<T::EthSpec> {
&self.indexed_attestation
}
}
/// An attestation that failed validation by the `BeaconChain`.
struct RejectedUnaggregate<T: EthSpec> {
attestation: Box<Attestation<T>>,
error: AttnError,
}
/// An aggregate that has been validated by the `BeaconChain`.
///
/// Since this struct implements `beacon_chain::VerifiedAttestation`, it would be a logic error to
/// construct this from components which have not passed `BeaconChain` validation.
struct VerifiedAggregate<T: BeaconChainTypes> {
signed_aggregate: Box<SignedAggregateAndProof<T::EthSpec>>,
indexed_attestation: IndexedAttestation<T::EthSpec>,
}
/// This implementation allows `Self` to be imported to fork choice and other functions on the
/// `BeaconChain`.
impl<'a, T: BeaconChainTypes> VerifiedAttestation<T> for VerifiedAggregate<T> {
fn attestation(&self) -> &Attestation<T::EthSpec> {
&self.signed_aggregate.message.aggregate
}
fn indexed_attestation(&self) -> &IndexedAttestation<T::EthSpec> {
&self.indexed_attestation
}
}
/// An attestation that failed validation by the `BeaconChain`.
struct RejectedAggregate<T: EthSpec> {
signed_aggregate: Box<SignedAggregateAndProof<T>>,
error: AttnError,
}
/// Data for an aggregated or unaggregated attestation that failed verification. /// Data for an aggregated or unaggregated attestation that failed verification.
enum FailedAtt<T: EthSpec> { enum FailedAtt<T: EthSpec> {
Unaggregate { Unaggregate {
@ -41,7 +95,7 @@ enum FailedAtt<T: EthSpec> {
} }
impl<T: EthSpec> FailedAtt<T> { impl<T: EthSpec> FailedAtt<T> {
pub fn root(&self) -> &Hash256 { pub fn beacon_block_root(&self) -> &Hash256 {
match self { match self {
FailedAtt::Unaggregate { attestation, .. } => &attestation.data.beacon_block_root, FailedAtt::Unaggregate { attestation, .. } => &attestation.data.beacon_block_root,
FailedAtt::Aggregate { attestation, .. } => { FailedAtt::Aggregate { attestation, .. } => {
@ -58,6 +112,66 @@ impl<T: EthSpec> FailedAtt<T> {
} }
} }
/// Items required to verify a batch of unaggregated gossip attestations.
#[derive(Debug)]
pub struct GossipAttestationPackage<E: EthSpec> {
message_id: MessageId,
peer_id: PeerId,
attestation: Box<Attestation<E>>,
subnet_id: SubnetId,
beacon_block_root: Hash256,
should_import: bool,
seen_timestamp: Duration,
}
impl<E: EthSpec> GossipAttestationPackage<E> {
pub fn new(
message_id: MessageId,
peer_id: PeerId,
attestation: Box<Attestation<E>>,
subnet_id: SubnetId,
should_import: bool,
seen_timestamp: Duration,
) -> Self {
Self {
message_id,
peer_id,
beacon_block_root: attestation.data.beacon_block_root,
attestation,
subnet_id,
should_import,
seen_timestamp,
}
}
}
/// Items required to verify a batch of aggregated gossip attestations.
#[derive(Debug)]
pub struct GossipAggregatePackage<E: EthSpec> {
message_id: MessageId,
peer_id: PeerId,
aggregate: Box<SignedAggregateAndProof<E>>,
beacon_block_root: Hash256,
seen_timestamp: Duration,
}
impl<E: EthSpec> GossipAggregatePackage<E> {
pub fn new(
message_id: MessageId,
peer_id: PeerId,
aggregate: Box<SignedAggregateAndProof<E>>,
seen_timestamp: Duration,
) -> Self {
Self {
message_id,
peer_id,
beacon_block_root: aggregate.message.aggregate.data.beacon_block_root,
aggregate,
seen_timestamp,
}
}
}
impl<T: BeaconChainTypes> Worker<T> { impl<T: BeaconChainTypes> Worker<T> {
/* Auxiliary functions */ /* Auxiliary functions */
@ -103,88 +217,200 @@ impl<T: BeaconChainTypes> Worker<T> {
self, self,
message_id: MessageId, message_id: MessageId,
peer_id: PeerId, peer_id: PeerId,
attestation: Attestation<T::EthSpec>, attestation: Box<Attestation<T::EthSpec>>,
subnet_id: SubnetId, subnet_id: SubnetId,
should_import: bool, should_import: bool,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>, reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
seen_timestamp: Duration, seen_timestamp: Duration,
) { ) {
let beacon_block_root = attestation.data.beacon_block_root; let result = match self
let attestation = match self
.chain .chain
.verify_unaggregated_attestation_for_gossip(attestation, Some(subnet_id)) .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id))
{ {
Ok(attestation) => attestation, Ok(verified_attestation) => Ok(VerifiedUnaggregate {
Err((e, attestation)) => { indexed_attestation: verified_attestation.into_indexed_attestation(),
self.handle_attestation_verification_failure( attestation,
peer_id, }),
message_id, Err(error) => Err(RejectedUnaggregate { attestation, error }),
FailedAtt::Unaggregate { };
attestation: Box::new(attestation),
subnet_id, self.process_gossip_attestation_result(
should_import, result,
seen_timestamp, message_id,
}, peer_id,
reprocess_tx, subnet_id,
e, reprocess_tx,
should_import,
seen_timestamp,
);
}
pub fn process_gossip_attestation_batch(
self,
packages: Vec<GossipAttestationPackage<T::EthSpec>>,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
) {
let attestations_and_subnets = packages
.iter()
.map(|package| (package.attestation.as_ref(), Some(package.subnet_id)));
let results = match self
.chain
.batch_verify_unaggregated_attestations_for_gossip(attestations_and_subnets)
{
Ok(results) => results,
Err(e) => {
error!(
self.log,
"Batch unagg. attn verification failed";
"error" => ?e
); );
return; return;
} }
}; };
// Register the attestation with any monitored validators. // Sanity check.
self.chain if results.len() != packages.len() {
.validator_monitor // The log is `crit` since in this scenario we might be penalizing/rewarding the wrong
.read() // peer.
.register_gossip_unaggregated_attestation( crit!(
seen_timestamp, self.log,
attestation.indexed_attestation(), "Batch attestation result mismatch";
&self.chain.slot_clock, "results" => results.len(),
); "packages" => packages.len(),
)
// Indicate to the `Network` service that this message is valid and can be
// propagated on the gossip network.
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
if !should_import {
return;
} }
metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL); // Map the results into a new `Vec` so that `results` no longer holds a reference to
// `packages`.
#[allow(clippy::needless_collect)] // The clippy suggestion fails the borrow checker.
let results = results
.into_iter()
.map(|result| result.map(|verified| verified.into_indexed_attestation()))
.collect::<Vec<_>>();
if let Err(e) = self.chain.apply_attestation_to_fork_choice(&attestation) { for (result, package) in results.into_iter().zip(packages.into_iter()) {
match e { let result = match result {
BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => { Ok(indexed_attestation) => Ok(VerifiedUnaggregate {
indexed_attestation,
attestation: package.attestation,
}),
Err(error) => Err(RejectedUnaggregate {
attestation: package.attestation,
error,
}),
};
self.process_gossip_attestation_result(
result,
package.message_id,
package.peer_id,
package.subnet_id,
reprocess_tx.clone(),
package.should_import,
package.seen_timestamp,
);
}
}
// Clippy warning is is ignored since the arguments are all of a different type (i.e., they
// cant' be mixed-up) and creating a struct would result in more complexity.
#[allow(clippy::too_many_arguments)]
fn process_gossip_attestation_result(
&self,
result: Result<VerifiedUnaggregate<T>, RejectedUnaggregate<T::EthSpec>>,
message_id: MessageId,
peer_id: PeerId,
subnet_id: SubnetId,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
should_import: bool,
seen_timestamp: Duration,
) {
match result {
Ok(verified_attestation) => {
let indexed_attestation = &verified_attestation.indexed_attestation;
let beacon_block_root = indexed_attestation.data.beacon_block_root;
// Register the attestation with any monitored validators.
self.chain
.validator_monitor
.read()
.register_gossip_unaggregated_attestation(
seen_timestamp,
indexed_attestation,
&self.chain.slot_clock,
);
// Indicate to the `Network` service that this message is valid and can be
// propagated on the gossip network.
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
if !should_import {
return;
}
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL,
);
if let Err(e) = self
.chain
.apply_attestation_to_fork_choice(&verified_attestation)
{
match e {
BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(
e,
)) => {
debug!(
self.log,
"Attestation invalid for fork choice";
"reason" => ?e,
"peer" => %peer_id,
"beacon_block_root" => ?beacon_block_root
)
}
e => error!(
self.log,
"Error applying attestation to fork choice";
"reason" => ?e,
"peer" => %peer_id,
"beacon_block_root" => ?beacon_block_root
),
}
}
if let Err(e) = self
.chain
.add_to_naive_aggregation_pool(&verified_attestation)
{
debug!( debug!(
self.log, self.log,
"Attestation invalid for fork choice"; "Attestation invalid for agg pool";
"reason" => ?e, "reason" => ?e,
"peer" => %peer_id, "peer" => %peer_id,
"beacon_block_root" => ?beacon_block_root "beacon_block_root" => ?beacon_block_root
) )
} }
e => error!(
self.log, metrics::inc_counter(
"Error applying attestation to fork choice"; &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL,
"reason" => ?e, );
"peer" => %peer_id, }
"beacon_block_root" => ?beacon_block_root Err(RejectedUnaggregate { attestation, error }) => {
), self.handle_attestation_verification_failure(
peer_id,
message_id,
FailedAtt::Unaggregate {
attestation,
subnet_id,
should_import,
seen_timestamp,
},
reprocess_tx,
error,
);
} }
} }
if let Err(e) = self.chain.add_to_naive_aggregation_pool(attestation) {
debug!(
self.log,
"Attestation invalid for agg pool";
"reason" => ?e,
"peer" => %peer_id,
"beacon_block_root" => ?beacon_block_root
)
}
metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL);
} }
/// Process the aggregated attestation received from the gossip network and: /// Process the aggregated attestation received from the gossip network and:
@ -198,82 +424,191 @@ impl<T: BeaconChainTypes> Worker<T> {
self, self,
message_id: MessageId, message_id: MessageId,
peer_id: PeerId, peer_id: PeerId,
aggregate: SignedAggregateAndProof<T::EthSpec>, aggregate: Box<SignedAggregateAndProof<T::EthSpec>>,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>, reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
seen_timestamp: Duration, seen_timestamp: Duration,
) { ) {
let beacon_block_root = aggregate.message.aggregate.data.beacon_block_root; let beacon_block_root = aggregate.message.aggregate.data.beacon_block_root;
let aggregate = match self let result = match self
.chain .chain
.verify_aggregated_attestation_for_gossip(aggregate) .verify_aggregated_attestation_for_gossip(&aggregate)
{ {
Ok(aggregate) => aggregate, Ok(verified_aggregate) => Ok(VerifiedAggregate {
Err((e, attestation)) => { indexed_attestation: verified_aggregate.into_indexed_attestation(),
// Report the failure to gossipsub signed_aggregate: aggregate,
self.handle_attestation_verification_failure( }),
peer_id, Err(error) => Err(RejectedAggregate {
message_id, signed_aggregate: aggregate,
FailedAtt::Aggregate { error,
attestation: Box::new(attestation), }),
seen_timestamp, };
},
reprocess_tx, self.process_gossip_aggregate_result(
e, result,
beacon_block_root,
message_id,
peer_id,
reprocess_tx,
seen_timestamp,
);
}
pub fn process_gossip_aggregate_batch(
self,
packages: Vec<GossipAggregatePackage<T::EthSpec>>,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
) {
let aggregates = packages.iter().map(|package| package.aggregate.as_ref());
let results = match self
.chain
.batch_verify_aggregated_attestations_for_gossip(aggregates)
{
Ok(results) => results,
Err(e) => {
error!(
self.log,
"Batch agg. attn verification failed";
"error" => ?e
); );
return; return;
} }
}; };
// Indicate to the `Network` service that this message is valid and can be // Sanity check.
// propagated on the gossip network. if results.len() != packages.len() {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); // The log is `crit` since in this scenario we might be penalizing/rewarding the wrong
// peer.
crit!(
self.log,
"Batch agg. attestation result mismatch";
"results" => results.len(),
"packages" => packages.len(),
)
}
// Register the attestation with any monitored validators. // Map the results into a new `Vec` so that `results` no longer holds a reference to
self.chain // `packages`.
.validator_monitor #[allow(clippy::needless_collect)] // The clippy suggestion fails the borrow checker.
.read() let results = results
.register_gossip_aggregated_attestation( .into_iter()
seen_timestamp, .map(|result| result.map(|verified| verified.into_indexed_attestation()))
aggregate.aggregate(), .collect::<Vec<_>>();
aggregate.indexed_attestation(),
&self.chain.slot_clock, for (result, package) in results.into_iter().zip(packages.into_iter()) {
let result = match result {
Ok(indexed_attestation) => Ok(VerifiedAggregate {
indexed_attestation,
signed_aggregate: package.aggregate,
}),
Err(error) => Err(RejectedAggregate {
signed_aggregate: package.aggregate,
error,
}),
};
self.process_gossip_aggregate_result(
result,
package.beacon_block_root,
package.message_id,
package.peer_id,
reprocess_tx.clone(),
package.seen_timestamp,
); );
}
}
metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL); fn process_gossip_aggregate_result(
&self,
result: Result<VerifiedAggregate<T>, RejectedAggregate<T::EthSpec>>,
beacon_block_root: Hash256,
message_id: MessageId,
peer_id: PeerId,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
seen_timestamp: Duration,
) {
match result {
Ok(verified_aggregate) => {
let aggregate = &verified_aggregate.signed_aggregate;
let indexed_attestation = &verified_aggregate.indexed_attestation;
if let Err(e) = self.chain.apply_attestation_to_fork_choice(&aggregate) { // Indicate to the `Network` service that this message is valid and can be
match e { // propagated on the gossip network.
BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => { self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
// Register the attestation with any monitored validators.
self.chain
.validator_monitor
.read()
.register_gossip_aggregated_attestation(
seen_timestamp,
aggregate,
indexed_attestation,
&self.chain.slot_clock,
);
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL,
);
if let Err(e) = self
.chain
.apply_attestation_to_fork_choice(&verified_aggregate)
{
match e {
BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(
e,
)) => {
debug!(
self.log,
"Aggregate invalid for fork choice";
"reason" => ?e,
"peer" => %peer_id,
"beacon_block_root" => ?beacon_block_root
)
}
e => error!(
self.log,
"Error applying aggregate to fork choice";
"reason" => ?e,
"peer" => %peer_id,
"beacon_block_root" => ?beacon_block_root
),
}
}
if let Err(e) = self.chain.add_to_block_inclusion_pool(&verified_aggregate) {
debug!( debug!(
self.log, self.log,
"Aggregate invalid for fork choice"; "Attestation invalid for op pool";
"reason" => ?e, "reason" => ?e,
"peer" => %peer_id, "peer" => %peer_id,
"beacon_block_root" => ?beacon_block_root "beacon_block_root" => ?beacon_block_root
) )
} }
e => error!(
self.log, metrics::inc_counter(
"Error applying aggregate to fork choice"; &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL,
"reason" => ?e, );
"peer" => %peer_id, }
"beacon_block_root" => ?beacon_block_root Err(RejectedAggregate {
), signed_aggregate,
error,
}) => {
// Report the failure to gossipsub
self.handle_attestation_verification_failure(
peer_id,
message_id,
FailedAtt::Aggregate {
attestation: signed_aggregate,
seen_timestamp,
},
reprocess_tx,
error,
);
} }
} }
if let Err(e) = self.chain.add_to_block_inclusion_pool(aggregate) {
debug!(
self.log,
"Attestation invalid for op pool";
"reason" => ?e,
"peer" => %peer_id,
"beacon_block_root" => ?beacon_block_root
)
}
metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL);
} }
/// Process the beacon block received from the gossip network and: /// Process the beacon block received from the gossip network and:
@ -834,7 +1169,7 @@ impl<T: BeaconChainTypes> Worker<T> {
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>, reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
error: AttnError, error: AttnError,
) { ) {
let beacon_block_root = failed_att.root(); let beacon_block_root = failed_att.beacon_block_root();
let attestation_type = failed_att.kind(); let attestation_type = failed_att.kind();
metrics::register_attestation_error(&error); metrics::register_attestation_error(&error);
match &error { match &error {

View File

@ -9,6 +9,7 @@ mod gossip_methods;
mod rpc_methods; mod rpc_methods;
mod sync_methods; mod sync_methods;
pub use gossip_methods::{GossipAggregatePackage, GossipAttestationPackage};
pub use sync_methods::ProcessId; pub use sync_methods::ProcessId;
pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1;

View File

@ -413,7 +413,7 @@ impl ForkChoiceTest {
let mut verified_attestation = self let mut verified_attestation = self
.harness .harness
.chain .chain
.verify_unaggregated_attestation_for_gossip(attestation, Some(subnet_id)) .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id))
.expect("precondition: should gossip verify attestation"); .expect("precondition: should gossip verify attestation");
if let MutationDelay::Blocks(slots) = delay { if let MutationDelay::Blocks(slots) = delay {