diff --git a/Cargo.lock b/Cargo.lock
index 2b68136b5..6efffd62e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -665,6 +665,7 @@ dependencies = [
  "tokio",
  "tokio-stream",
  "tree_hash",
+ "tree_hash_derive",
  "types",
  "unused_port",
 ]
@@ -2343,6 +2344,8 @@ dependencies = [
  "libsecp256k1",
  "lighthouse_network",
  "mediatype",
+ "mime",
+ "pretty_reqwest_error",
  "procinfo",
  "proto_array",
  "psutil",
@@ -2353,6 +2356,7 @@ dependencies = [
  "serde_json",
  "slashing_protection",
  "store",
+ "tokio",
  "types",
 ]
@@ -2795,6 +2799,7 @@ dependencies = [
  "lru 0.7.8",
  "mev-rs",
  "parking_lot 0.12.1",
+ "pretty_reqwest_error",
  "rand 0.8.5",
  "reqwest",
  "sensitive_url",
@@ -6275,6 +6280,14 @@ dependencies = [
  "vcpkg",
 ]
 
+[[package]]
+name = "pretty_reqwest_error"
+version = "0.1.0"
+dependencies = [
+ "reqwest",
+ "sensitive_url",
+]
+
 [[package]]
 name = "prettyplease"
 version = "0.1.25"
@@ -7925,7 +7938,8 @@ dependencies = [
 [[package]]
 name = "ssz_types"
 version = "0.5.3"
-source = "git+https://github.com/sigp/ssz_types?rev=63a80d04286c8561d5c211230a21bf1299d66059#63a80d04286c8561d5c211230a21bf1299d66059"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e43767964a80b2fdeda7a79a57a2b6cbca966688d5b81da8fe91140a94f552a1"
 dependencies = [
  "arbitrary",
  "derivative",
diff --git a/Cargo.toml b/Cargo.toml
index 092ccf32f..bd2561351 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -35,6 +35,7 @@ members = [
     "common/lru_cache",
     "common/malloc_utils",
     "common/oneshot_broadcast",
+    "common/pretty_reqwest_error",
     "common/sensitive_url",
     "common/slot_clock",
     "common/system_health",
diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml
index 504f4114a..e00b44301 100644
--- a/beacon_node/beacon_chain/Cargo.toml
+++ b/beacon_node/beacon_chain/Cargo.toml
@@ -35,9 +35,10 @@ sloggers = { version = "2.1.1", features = ["json"] }
 slot_clock = { path = "../../common/slot_clock" }
 ethereum_hashing = "1.0.0-beta.2"
 ethereum_ssz = "0.5.0"
-ssz_types = "0.5.0"
+ssz_types = "0.5.3"
 ethereum_ssz_derive = "0.5.0"
 state_processing = { path = "../../consensus/state_processing" }
+tree_hash_derive = "0.5.0"
 tree_hash = "0.5.0"
 types = { path = "../../consensus/types" }
 tokio = "1.14.0"
diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs
index 04f601fad..6df0758b2 100644
--- a/beacon_node/beacon_chain/src/attestation_verification.rs
+++ b/beacon_node/beacon_chain/src/attestation_verification.rs
@@ -117,14 +117,14 @@ pub enum Error {
     ///
     /// The peer has sent an invalid message.
     AggregatorPubkeyUnknown(u64),
-    /// The attestation has been seen before; either in a block, on the gossip network or from a
-    /// local validator.
+    /// The attestation or a superset of this attestation's aggregation bits for the same data
+    /// has been seen before; either in a block, on the gossip network or from a local validator.
     ///
     /// ## Peer scoring
     ///
     /// It's unclear if this attestation is valid, however we have already observed it and do not
     /// need to observe it again.
-    AttestationAlreadyKnown(Hash256),
+    AttestationSupersetKnown(Hash256),
     /// There has already been an aggregation observed for this validator, we refuse to process a
     /// second.
     ///
@@ -268,7 +268,7 @@ enum CheckAttestationSignature {
 struct IndexedAggregatedAttestation<'a, T: BeaconChainTypes> {
     signed_aggregate: &'a SignedAggregateAndProof<T::EthSpec>,
     indexed_attestation: IndexedAttestation<T::EthSpec>,
-    attestation_root: Hash256,
+    attestation_data_root: Hash256,
 }
 
 /// Wraps an `Attestation` that has been verified up until the point that an `IndexedAttestation` can
@@ -467,14 +467,17 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
         }
 
         // Ensure the valid aggregated attestation has not already been seen locally.
-        let attestation_root = attestation.tree_hash_root();
+        let attestation_data = &attestation.data;
+        let attestation_data_root = attestation_data.tree_hash_root();
+
         if chain
             .observed_attestations
             .write()
-            .is_known(attestation, attestation_root)
+            .is_known_subset(attestation, attestation_data_root)
             .map_err(|e| Error::BeaconChainError(e.into()))?
         {
-            return Err(Error::AttestationAlreadyKnown(attestation_root));
+            metrics::inc_counter(&metrics::AGGREGATED_ATTESTATION_SUBSETS);
+            return Err(Error::AttestationSupersetKnown(attestation_data_root));
         }
 
         let aggregator_index = signed_aggregate.message.aggregator_index;
@@ -520,7 +523,7 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
         if attestation.aggregation_bits.is_zero() {
             Err(Error::EmptyAggregationBitfield)
         } else {
-            Ok(attestation_root)
+            Ok(attestation_data_root)
         }
     }
 
@@ -533,7 +536,7 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
         let attestation = &signed_aggregate.message.aggregate;
         let aggregator_index = signed_aggregate.message.aggregator_index;
 
-        let attestation_root = match Self::verify_early_checks(signed_aggregate, chain) {
+        let attestation_data_root = match Self::verify_early_checks(signed_aggregate, chain) {
             Ok(root) => root,
             Err(e) => return Err(SignatureNotChecked(&signed_aggregate.message.aggregate, e)),
         };
@@ -568,7 +571,7 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
         Ok(IndexedAggregatedAttestation {
             signed_aggregate,
             indexed_attestation,
-            attestation_root,
+            attestation_data_root,
         })
     }
 }
@@ -577,7 +580,7 @@ impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> {
     /// Run the checks that happen after the indexed attestation and signature have been checked.
     fn verify_late_checks(
         signed_aggregate: &SignedAggregateAndProof<T::EthSpec>,
-        attestation_root: Hash256,
+        attestation_data_root: Hash256,
         chain: &BeaconChain<T>,
     ) -> Result<(), Error> {
         let attestation = &signed_aggregate.message.aggregate;
@@ -587,13 +590,14 @@ impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> {
         //
         // It's important to double check that the attestation is not already known, otherwise two
         // attestations processed at the same time could be published.
-        if let ObserveOutcome::AlreadyKnown = chain
+        if let ObserveOutcome::Subset = chain
             .observed_attestations
             .write()
-            .observe_item(attestation, Some(attestation_root))
+            .observe_item(attestation, Some(attestation_data_root))
             .map_err(|e| Error::BeaconChainError(e.into()))?
         {
-            return Err(Error::AttestationAlreadyKnown(attestation_root));
+            metrics::inc_counter(&metrics::AGGREGATED_ATTESTATION_SUBSETS);
+            return Err(Error::AttestationSupersetKnown(attestation_data_root));
         }
 
         // Observe the aggregator so we don't process another aggregate from them.
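// A minimal, self-contained sketch (not part of the patch) of the subset rule the
// checks above rely on: an incoming aggregate is redundant iff every bit it has set
// is already set in a previously observed aggregate for the same attestation data.
// `ssz_types::BitList::is_subset` is assumed to behave like this plain `&[bool]`
// version.
fn is_subset(incoming: &[bool], observed: &[bool]) -> bool {
    incoming.len() == observed.len()
        && incoming.iter().zip(observed).all(|(new, seen)| !*new || *seen)
}

fn main() {
    let observed = [true, true, false, true];
    // No new attester bits relative to `observed`: rejected as `Subset`.
    assert!(is_subset(&[true, false, false, true], &observed));
    // Carries a bit that `observed` lacks: accepted as `New`.
    assert!(!is_subset(&[false, false, true, false], &observed));
}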
@@ -653,7 +657,7 @@ impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> {
         let IndexedAggregatedAttestation {
             signed_aggregate,
             indexed_attestation,
-            attestation_root,
+            attestation_data_root,
         } = signed_aggregate;
 
         match check_signature {
@@ -677,7 +681,7 @@ impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> {
             CheckAttestationSignature::No => (),
         };
 
-        if let Err(e) = Self::verify_late_checks(signed_aggregate, attestation_root, chain) {
+        if let Err(e) = Self::verify_late_checks(signed_aggregate, attestation_data_root, chain) {
             return Err(SignatureValid(indexed_attestation, e));
         }
 
diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs
index 40f24af77..4dde5d36f 100644
--- a/beacon_node/beacon_chain/src/metrics.rs
+++ b/beacon_node/beacon_chain/src/metrics.rs
@@ -1016,6 +1016,17 @@ lazy_static! {
         "light_client_optimistic_update_verification_success_total",
         "Number of light client optimistic updates verified for gossip"
     );
+    /*
+     * Aggregate subset metrics
+     */
+    pub static ref SYNC_CONTRIBUTION_SUBSETS: Result<IntCounter> = try_create_int_counter(
+        "beacon_sync_contribution_subsets_total",
+        "Count of new sync contributions that are subsets of already known aggregates"
+    );
+    pub static ref AGGREGATED_ATTESTATION_SUBSETS: Result<IntCounter> = try_create_int_counter(
+        "beacon_aggregated_attestation_subsets_total",
+        "Count of new aggregated attestations that are subsets of already known aggregates"
+    );
 }
 
 /// Scrape the `beacon_chain` for metrics that are not constantly updated (e.g., the present slot,
diff --git a/beacon_node/beacon_chain/src/observed_aggregates.rs b/beacon_node/beacon_chain/src/observed_aggregates.rs
index bb0132f5f..18a761e29 100644
--- a/beacon_node/beacon_chain/src/observed_aggregates.rs
+++ b/beacon_node/beacon_chain/src/observed_aggregates.rs
@@ -1,7 +1,9 @@
 //! Provides an `ObservedAggregates` struct which allows us to reject aggregated attestations or
 //! sync committee contributions if we've already seen them.
 
-use std::collections::HashSet;
+use crate::sync_committee_verification::SyncCommitteeData;
+use ssz_types::{BitList, BitVector};
+use std::collections::HashMap;
 use std::marker::PhantomData;
 use tree_hash::TreeHash;
 use types::consts::altair::{
@@ -10,8 +12,16 @@ use types::consts::altair::{
 use types::slot_data::SlotData;
 use types::{Attestation, EthSpec, Hash256, Slot, SyncCommitteeContribution};
 
-pub type ObservedSyncContributions<E> = ObservedAggregates<SyncCommitteeContribution<E>, E>;
-pub type ObservedAggregateAttestations<E> = ObservedAggregates<Attestation<E>, E>;
+pub type ObservedSyncContributions<E> = ObservedAggregates<
+    SyncCommitteeContribution<E>,
+    E,
+    BitVector<<E as types::EthSpec>::SyncSubcommitteeSize>,
+>;
+pub type ObservedAggregateAttestations<E> = ObservedAggregates<
+    Attestation<E>,
+    E,
+    BitList<<E as types::EthSpec>::MaxValidatorsPerCommittee>,
+>;
 
 /// A trait used to associate capacity constants with the type being stored in `ObservedAggregates`.
 pub trait Consts {
@@ -69,10 +79,81 @@ impl<T: EthSpec> Consts for SyncCommitteeContribution<T> {
     }
 }
 
+/// A trait for types that implement a behaviour where one object of that type
+/// can be a subset/superset of another.
+/// This trait allows us to be generic over the aggregate item that we store in the cache that
+/// we want to prevent duplicates/subsets for.
+pub trait SubsetItem {
+    /// The item that is stored for later comparison with new incoming aggregate items.
+    type Item;
+
+    /// Returns `true` if `self` is a non-strict subset of `other` and `false` otherwise.
+    fn is_subset(&self, other: &Self::Item) -> bool;
+
+    /// Returns `true` if `self` is a non-strict superset of `other` and `false` otherwise.
+    fn is_superset(&self, other: &Self::Item) -> bool;
+
+    /// Returns the item that gets stored in `ObservedAggregates` for later subset
+    /// comparison with incoming aggregates.
+    fn get_item(&self) -> Self::Item;
+
+    /// Returns a unique value that keys the object to the item that is being stored
+    /// in `ObservedAggregates`.
+    fn root(&self) -> Hash256;
+}
+
+impl<T: EthSpec> SubsetItem for Attestation<T> {
+    type Item = BitList<T::MaxValidatorsPerCommittee>;
+    fn is_subset(&self, other: &Self::Item) -> bool {
+        self.aggregation_bits.is_subset(other)
+    }
+
+    fn is_superset(&self, other: &Self::Item) -> bool {
+        other.is_subset(&self.aggregation_bits)
+    }
+
+    /// Returns the attestation aggregation bits.
+    fn get_item(&self) -> Self::Item {
+        self.aggregation_bits.clone()
+    }
+
+    /// Returns the hash tree root of the attestation data.
+    fn root(&self) -> Hash256 {
+        self.data.tree_hash_root()
+    }
+}
+
+impl<T: EthSpec> SubsetItem for SyncCommitteeContribution<T> {
+    type Item = BitVector<T::SyncSubcommitteeSize>;
+    fn is_subset(&self, other: &Self::Item) -> bool {
+        self.aggregation_bits.is_subset(other)
+    }
+
+    fn is_superset(&self, other: &Self::Item) -> bool {
+        other.is_subset(&self.aggregation_bits)
+    }
+
+    /// Returns the sync contribution aggregation bits.
+    fn get_item(&self) -> Self::Item {
+        self.aggregation_bits.clone()
+    }
+
+    /// Returns the hash tree root of the root, slot and subcommittee index
+    /// of the sync contribution.
+    fn root(&self) -> Hash256 {
+        SyncCommitteeData {
+            root: self.beacon_block_root,
+            slot: self.slot,
+            subcommittee_index: self.subcommittee_index,
+        }
+        .tree_hash_root()
+    }
+}
+
 #[derive(Debug, PartialEq)]
 pub enum ObserveOutcome {
-    /// This item was already known.
-    AlreadyKnown,
+    /// This item is a non-strict subset of an already known item.
+    Subset,
     /// This was the first time this item was observed.
     New,
 }
@@ -94,26 +175,28 @@ pub enum Error {
     },
 }
 
-/// A `HashSet` that contains entries related to some `Slot`.
-struct SlotHashSet {
-    set: HashSet<Hash256>,
+/// A `HashMap` that contains entries related to some `Slot`.
+struct SlotHashSet<I> {
+    /// Contains a vector of maximally-sized aggregation bitfields/bitvectors
+    /// such that no bitfield/bitvector is a subset of any other in the list.
+    map: HashMap<Hash256, Vec<I>>,
     slot: Slot,
     max_capacity: usize,
 }
 
-impl SlotHashSet {
+impl<I> SlotHashSet<I> {
     pub fn new(slot: Slot, initial_capacity: usize, max_capacity: usize) -> Self {
         Self {
             slot,
-            set: HashSet::with_capacity(initial_capacity),
+            map: HashMap::with_capacity(initial_capacity),
             max_capacity,
         }
     }
 
     /// Store the items in self so future observations recognise its existence.
-    pub fn observe_item<T: TreeHash + SlotData>(
+    pub fn observe_item<S: SubsetItem<Item = I>>(
         &mut self,
-        item: &T,
+        item: &S,
         root: Hash256,
     ) -> Result<ObserveOutcome, Error> {
         if item.get_slot() != self.slot {
             return Err(Error::IncorrectSlot {
                 expected: self.slot,
                 attestation: item.get_slot(),
             });
         }
 
-        if self.set.contains(&root) {
-            Ok(ObserveOutcome::AlreadyKnown)
-        } else {
-            // Here we check to see if this slot has reached the maximum observation count.
-            //
-            // The resulting behaviour is that we are no longer able to successfully observe new
-            // items, however we will continue to return `is_known` values. We could also
-            // disable `is_known`, however then we would stop forwarding items across the
-            // gossip network and I think that this is a worse case than sending some invalid ones.
-            // The underlying libp2p network is responsible for removing duplicate messages, so
-            // this doesn't risk a broadcast loop.
-            if self.set.len() >= self.max_capacity {
-                return Err(Error::ReachedMaxObservationsPerSlot(self.max_capacity));
+        if let Some(aggregates) = self.map.get_mut(&root) {
+            for existing in aggregates {
+                // Check if `item` is a subset of any of the observed aggregates
+                if item.is_subset(existing) {
+                    return Ok(ObserveOutcome::Subset);
+                // Check if `item` is a superset of any of the observed aggregates.
+                // If so, we replace its existing subset with the new item. This allows us
+                // to hold fewer items in the list.
+                } else if item.is_superset(existing) {
+                    *existing = item.get_item();
+                    return Ok(ObserveOutcome::New);
+                }
             }
-
-            self.set.insert(root);
-
-            Ok(ObserveOutcome::New)
         }
+
+        // Here we check to see if this slot has reached the maximum observation count.
+        //
+        // The resulting behaviour is that we are no longer able to successfully observe new
+        // items, however we will continue to return `is_known_subset` values. We could also
+        // disable `is_known_subset`, however then we would stop forwarding items across the
+        // gossip network and I think that this is a worse case than sending some invalid ones.
+        // The underlying libp2p network is responsible for removing duplicate messages, so
+        // this doesn't risk a broadcast loop.
+        if self.map.len() >= self.max_capacity {
+            return Err(Error::ReachedMaxObservationsPerSlot(self.max_capacity));
+        }
+
+        let item = item.get_item();
+        self.map.entry(root).or_default().push(item);
+        Ok(ObserveOutcome::New)
     }
 
-    /// Indicates if `item` has been observed before.
-    pub fn is_known<T: TreeHash + SlotData>(&self, item: &T, root: Hash256) -> Result<bool, Error> {
+    /// Check if `item` is a non-strict subset of any of the already observed aggregates for
+    /// the given root and slot.
+    pub fn is_known_subset<S: SubsetItem<Item = I>>(
+        &self,
+        item: &S,
+        root: Hash256,
+    ) -> Result<bool, Error> {
         if item.get_slot() != self.slot {
             return Err(Error::IncorrectSlot {
                 expected: self.slot,
@@ -153,25 +252,28 @@ impl<I> SlotHashSet<I> {
                 attestation: item.get_slot(),
             });
         }
 
-        Ok(self.set.contains(&root))
+        Ok(self
+            .map
+            .get(&root)
+            .map_or(false, |agg| agg.iter().any(|val| item.is_subset(val))))
     }
 
     /// The number of observed items in `self`.
     pub fn len(&self) -> usize {
-        self.set.len()
+        self.map.len()
     }
 }
 
 /// Stores the roots of objects for some number of `Slots`, so we can determine if
 /// these have previously been seen on the network.
-pub struct ObservedAggregates<T: TreeHash + SlotData + Consts, E: EthSpec> {
+pub struct ObservedAggregates<T: SlotData + Consts, E: EthSpec, I> {
     lowest_permissible_slot: Slot,
-    sets: Vec<SlotHashSet>,
+    sets: Vec<SlotHashSet<I>>,
     _phantom_spec: PhantomData<E>,
     _phantom_tree_hash: PhantomData<T>,
 }
 
-impl<T: TreeHash + SlotData + Consts, E: EthSpec> Default for ObservedAggregates<T, E> {
+impl<T: SlotData + Consts, E: EthSpec, I> Default for ObservedAggregates<T, E, I> {
     fn default() -> Self {
         Self {
             lowest_permissible_slot: Slot::new(0),
@@ -182,17 +284,17 @@ impl<T: SlotData + Consts, E: EthSpec, I> Default for ObservedAggregates<T, E, I>
     }
 }
 
-impl<T: TreeHash + SlotData + Consts, E: EthSpec> ObservedAggregates<T, E> {
-    /// Store the root of `item` in `self`.
+impl<T: SlotData + Consts + SubsetItem<Item = I>, E: EthSpec, I> ObservedAggregates<T, E, I> {
+    /// Store `item` in `self` keyed at `root`.
     ///
-    /// `root` must equal `item.tree_hash_root()`.
+    /// `root` must equal `item.root()`.
     pub fn observe_item(
         &mut self,
         item: &T,
         root_opt: Option<Hash256>,
     ) -> Result<ObserveOutcome, Error> {
         let index = self.get_set_index(item.get_slot())?;
 
-        let root = root_opt.unwrap_or_else(|| item.tree_hash_root());
+        let root = root_opt.unwrap_or_else(|| item.root());
 
         self.sets
             .get_mut(index)
@@ -200,17 +302,18 @@ impl<T: SlotData + Consts + SubsetItem<Item = I>, E: EthSpec, I> ObservedAggregates<T, E, I>
             .and_then(|set| set.observe_item(item, root))
     }
 
-    /// Check to see if the `root` of `item` is in self.
+    /// Check if `item` is a non-strict subset of any of the already observed aggregates for
+    /// the given root and slot.
     ///
-    /// `root` must equal `a.tree_hash_root()`.
+    /// `root` must equal `item.root()`.
     #[allow(clippy::wrong_self_convention)]
-    pub fn is_known(&mut self, item: &T, root: Hash256) -> Result<bool, Error> {
+    pub fn is_known_subset(&mut self, item: &T, root: Hash256) -> Result<bool, Error> {
         let index = self.get_set_index(item.get_slot())?;
 
         self.sets
             .get(index)
             .ok_or(Error::InvalidSetIndex(index))
-            .and_then(|set| set.is_known(item, root))
+            .and_then(|set| set.is_known_subset(item, root))
     }
 
     /// The maximum number of slots that items are stored for.
@@ -296,7 +399,6 @@ impl<T: SlotData + Consts + SubsetItem<Item = I>, E: EthSpec, I> ObservedAggregates<T, E, I>
 #[cfg(not(debug_assertions))]
 mod tests {
     use super::*;
-    use tree_hash::TreeHash;
     use types::{test_utils::test_random_instance, Hash256};
 
     type E = types::MainnetEthSpec;
@@ -330,7 +432,7 @@ mod tests {
 
         for a in &items {
             assert_eq!(
-                store.is_known(a, a.tree_hash_root()),
+                store.is_known_subset(a, a.root()),
                 Ok(false),
                 "should indicate an unknown attestation is unknown"
             );
@@ -343,13 +445,13 @@ mod tests {
 
         for a in &items {
             assert_eq!(
-                store.is_known(a, a.tree_hash_root()),
+                store.is_known_subset(a, a.root()),
                 Ok(true),
                 "should indicate a known attestation is known"
             );
             assert_eq!(
-                store.observe_item(a, Some(a.tree_hash_root())),
-                Ok(ObserveOutcome::AlreadyKnown),
+                store.observe_item(a, Some(a.root())),
+                Ok(ObserveOutcome::Subset),
                 "should acknowledge an existing attestation"
             );
         }
diff --git a/beacon_node/beacon_chain/src/sync_committee_verification.rs b/beacon_node/beacon_chain/src/sync_committee_verification.rs
index 14cdc2400..246bb12cc 100644
--- a/beacon_node/beacon_chain/src/sync_committee_verification.rs
+++ b/beacon_node/beacon_chain/src/sync_committee_verification.rs
@@ -37,6 +37,7 @@ use bls::{verify_signature_sets, PublicKeyBytes};
 use derivative::Derivative;
 use safe_arith::ArithError;
 use slot_clock::SlotClock;
+use ssz_derive::{Decode, Encode};
 use state_processing::per_block_processing::errors::SyncCommitteeMessageValidationError;
 use state_processing::signature_sets::{
     signed_sync_aggregate_selection_proof_signature_set, signed_sync_aggregate_signature_set,
@@ -47,6 +48,7 @@ use std::borrow::Cow;
 use std::collections::HashMap;
 use strum::AsRefStr;
 use tree_hash::TreeHash;
+use tree_hash_derive::TreeHash;
 use types::consts::altair::SYNC_COMMITTEE_SUBNET_COUNT;
 use types::slot_data::SlotData;
 use types::sync_committee::Error as SyncCommitteeError;
@@ -110,14 +112,14 @@ pub enum Error {
     ///
     /// The peer has sent an invalid message.
     AggregatorPubkeyUnknown(u64),
-    /// The sync contribution has been seen before; either in a block, on the gossip network or from a
-    /// local validator.
+    /// The sync contribution or a superset of this sync contribution's aggregation bits for the same data
+    /// has been seen before; either in a block, on the gossip network or from a local validator.
     ///
     /// ## Peer scoring
     ///
     /// It's unclear if this sync contribution is valid, however we have already observed it and do not
     /// need to observe it again.
-    SyncContributionAlreadyKnown(Hash256),
+    SyncContributionSupersetKnown(Hash256),
     /// There has already been an aggregation observed for this validator, we refuse to process a
     /// second.
     ///
@@ -268,6 +270,14 @@ pub struct VerifiedSyncContribution<T: BeaconChainTypes> {
     participant_pubkeys: Vec<PublicKeyBytes>,
 }
 
+/// The sync contribution data.
+#[derive(Encode, Decode, TreeHash)]
+pub struct SyncCommitteeData {
+    pub slot: Slot,
+    pub root: Hash256,
+    pub subcommittee_index: u64,
+}
+
 /// Wraps a `SyncCommitteeMessage` that has been verified for propagation on the gossip network.
 #[derive(Clone)]
 pub struct VerifiedSyncCommitteeMessage {
@@ -314,15 +324,22 @@ impl<T: BeaconChainTypes> VerifiedSyncContribution<T> {
             return Err(Error::AggregatorNotInCommittee { aggregator_index });
         };
 
-        // Ensure the valid sync contribution has not already been seen locally.
-        let contribution_root = contribution.tree_hash_root();
+        // Ensure the valid sync contribution or its superset has not already been seen locally.
+        let contribution_data_root = SyncCommitteeData {
+            slot: contribution.slot,
+            root: contribution.beacon_block_root,
+            subcommittee_index: contribution.subcommittee_index,
+        }
+        .tree_hash_root();
+
         if chain
             .observed_sync_contributions
             .write()
-            .is_known(contribution, contribution_root)
+            .is_known_subset(contribution, contribution_data_root)
             .map_err(|e| Error::BeaconChainError(e.into()))?
         {
-            return Err(Error::SyncContributionAlreadyKnown(contribution_root));
+            metrics::inc_counter(&metrics::SYNC_CONTRIBUTION_SUBSETS);
+            return Err(Error::SyncContributionSupersetKnown(contribution_data_root));
         }
 
         // Ensure there has been no other observed aggregate for the given `aggregator_index`.
@@ -376,13 +393,14 @@ impl<T: BeaconChainTypes> VerifiedSyncContribution<T> {
         //
         // It's important to double check that the contribution is not already known, otherwise two
         // contributions processed at the same time could be published.
-        if let ObserveOutcome::AlreadyKnown = chain
+        if let ObserveOutcome::Subset = chain
             .observed_sync_contributions
             .write()
-            .observe_item(contribution, Some(contribution_root))
+            .observe_item(contribution, Some(contribution_data_root))
             .map_err(|e| Error::BeaconChainError(e.into()))?
         {
-            return Err(Error::SyncContributionAlreadyKnown(contribution_root));
+            metrics::inc_counter(&metrics::SYNC_CONTRIBUTION_SUBSETS);
+            return Err(Error::SyncContributionSupersetKnown(contribution_data_root));
         }
 
         // Observe the aggregator so we don't process another aggregate from them.
diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs
index 1040521e5..5cea51090 100644
--- a/beacon_node/beacon_chain/tests/attestation_verification.rs
+++ b/beacon_node/beacon_chain/tests/attestation_verification.rs
@@ -699,8 +699,8 @@ async fn aggregated_gossip_verification() {
         |tester, err| {
             assert!(matches!(
                 err,
-                AttnError::AttestationAlreadyKnown(hash)
-                if hash == tester.valid_aggregate.message.aggregate.tree_hash_root()
+                AttnError::AttestationSupersetKnown(hash)
+                if hash == tester.valid_aggregate.message.aggregate.data.tree_hash_root()
             ))
         },
     )
diff --git a/beacon_node/beacon_chain/tests/sync_committee_verification.rs b/beacon_node/beacon_chain/tests/sync_committee_verification.rs
index 4204a5121..0e4745ff6 100644
--- a/beacon_node/beacon_chain/tests/sync_committee_verification.rs
+++ b/beacon_node/beacon_chain/tests/sync_committee_verification.rs
@@ -1,6 +1,6 @@
 #![cfg(not(debug_assertions))]
 
-use beacon_chain::sync_committee_verification::Error as SyncCommitteeError;
+use beacon_chain::sync_committee_verification::{Error as SyncCommitteeError, SyncCommitteeData};
 use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType, RelativeSyncCommittee};
 use int_to_bytes::int_to_bytes32;
 use lazy_static::lazy_static;
@@ -444,11 +444,17 @@ async fn aggregated_gossip_verification() {
      * subcommittee index contribution.subcommittee_index.
      */
 
+    let contribution = &valid_aggregate.message.contribution;
+    let sync_committee_data = SyncCommitteeData {
+        slot: contribution.slot,
+        root: contribution.beacon_block_root,
+        subcommittee_index: contribution.subcommittee_index,
+    };
     assert_invalid!(
         "aggregate that has already been seen",
         valid_aggregate.clone(),
-        SyncCommitteeError::SyncContributionAlreadyKnown(hash)
-        if hash == valid_aggregate.message.contribution.tree_hash_root()
+        SyncCommitteeError::SyncContributionSupersetKnown(hash)
+        if hash == sync_committee_data.tree_hash_root()
     );
 
     /*
diff --git a/beacon_node/builder_client/src/lib.rs b/beacon_node/builder_client/src/lib.rs
index 255c2fdd1..c78f686d0 100644
--- a/beacon_node/builder_client/src/lib.rs
+++ b/beacon_node/builder_client/src/lib.rs
@@ -72,7 +72,7 @@ impl BuilderHttpClient {
             .await?
             .json()
             .await
-            .map_err(Error::Reqwest)
+            .map_err(Into::into)
     }
 
     /// Perform a HTTP GET request, returning the `Response` for further processing.
@@ -85,7 +85,7 @@ impl BuilderHttpClient {
         if let Some(timeout) = timeout {
             builder = builder.timeout(timeout);
         }
-        let response = builder.send().await.map_err(Error::Reqwest)?;
+        let response = builder.send().await.map_err(Error::from)?;
         ok_or_error(response).await
     }
 
@@ -114,7 +114,7 @@ impl BuilderHttpClient {
         if let Some(timeout) = timeout {
             builder = builder.timeout(timeout);
         }
-        let response = builder.json(body).send().await.map_err(Error::Reqwest)?;
+        let response = builder.json(body).send().await.map_err(Error::from)?;
         ok_or_error(response).await
     }
 
diff --git a/beacon_node/execution_layer/Cargo.toml b/beacon_node/execution_layer/Cargo.toml
index f561c972f..c27ed4c65 100644
--- a/beacon_node/execution_layer/Cargo.toml
+++ b/beacon_node/execution_layer/Cargo.toml
@@ -23,7 +23,7 @@ bytes = "1.1.0"
 task_executor = { path = "../../common/task_executor" }
 hex = "0.4.2"
 ethereum_ssz = "0.5.0"
-ssz_types = "0.5.0"
+ssz_types = "0.5.3"
 eth2 = { path = "../../common/eth2" }
 kzg = { path = "../../crypto/kzg" }
 state_processing = { path = "../../consensus/state_processing" }
@@ -51,3 +51,4 @@ keccak-hash = "0.10.0"
 hash256-std-hasher = "0.15.2"
 triehash = "0.8.4"
 hash-db = "0.15.2"
+pretty_reqwest_error = { path = "../../common/pretty_reqwest_error" }
\ No newline at end of file
diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs
index 0a5d155f5..7528d232f 100644
--- a/beacon_node/execution_layer/src/engine_api.rs
+++ b/beacon_node/execution_layer/src/engine_api.rs
@@ -11,6 +11,7 @@ pub use ethers_core::types::Transaction;
 use ethers_core::utils::rlp::{self, Decodable, Rlp};
 use http::deposit_methods::RpcError;
 pub use json_structures::{JsonWithdrawal, TransitionConfigurationV1};
+use pretty_reqwest_error::PrettyReqwestError;
 use reqwest::StatusCode;
 use serde::{Deserialize, Serialize};
 use std::convert::TryFrom;
@@ -35,7 +36,7 @@ pub type PayloadId = [u8; 8];
 
 #[derive(Debug)]
 pub enum Error {
-    Reqwest(reqwest::Error),
+    HttpClient(PrettyReqwestError),
     Auth(auth::Error),
     BadResponse(String),
     RequestFailed(String),
@@ -70,7 +71,7 @@ impl From<reqwest::Error> for Error {
         ) {
             Error::Auth(auth::Error::InvalidToken)
         } else {
-            Error::Reqwest(e)
+            Error::HttpClient(e.into())
         }
     }
 }
diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs
index 3edc6aa30..a4fea8f5f 100644
--- a/beacon_node/http_api/src/lib.rs
+++ b/beacon_node/http_api/src/lib.rs
@@ -2882,7 +2882,7 @@ pub fn serve(
                             // It's reasonably likely that two different validators produce
                             // identical aggregates, especially if they're using the same beacon
                             // node.
-                            Err(AttnError::AttestationAlreadyKnown(_)) => continue,
+                            Err(AttnError::AttestationSupersetKnown(_)) => continue,
                             // If we've already seen this aggregator produce an aggregate, just
                             // skip this one.
                             //
diff --git a/beacon_node/http_api/src/sync_committees.rs b/beacon_node/http_api/src/sync_committees.rs
index c728fbeb1..07dfb5c98 100644
--- a/beacon_node/http_api/src/sync_committees.rs
+++ b/beacon_node/http_api/src/sync_committees.rs
@@ -304,7 +304,7 @@ pub fn process_signed_contribution_and_proofs(
             }
             // If we already know the contribution, don't broadcast it or attempt to
             // further verify it. Return success.
-            Err(SyncVerificationError::SyncContributionAlreadyKnown(_)) => continue,
+            Err(SyncVerificationError::SyncContributionSupersetKnown(_)) => continue,
             // If we've already seen this aggregator produce an aggregate, just
             // skip this one.
             //
diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml
index ca15b5ef2..6d056d835 100644
--- a/beacon_node/lighthouse_network/Cargo.toml
+++ b/beacon_node/lighthouse_network/Cargo.toml
@@ -8,7 +8,7 @@ edition = "2021"
 discv5 = { version = "0.3.0", features = ["libp2p"]}
 unsigned-varint = { version = "0.6.0", features = ["codec"] }
 types = { path = "../../consensus/types" }
-ssz_types = "0.5.0"
+ssz_types = "0.5.3"
 serde = { version = "1.0.116", features = ["derive"] }
 serde_derive = "1.0.116"
 ethereum_ssz = "0.5.0"
diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml
index 3673a8a09..cb6d1116c 100644
--- a/beacon_node/network/Cargo.toml
+++ b/beacon_node/network/Cargo.toml
@@ -22,7 +22,7 @@ slot_clock = { path = "../../common/slot_clock" }
 slog = { version = "2.5.2", features = ["max_level_trace", "nested-values"] }
 hex = "0.4.2"
 ethereum_ssz = "0.5.0"
-ssz_types = "0.5.0"
+ssz_types = "0.5.3"
 futures = "0.3.7"
 error-chain = "0.12.4"
 tokio = { version = "1.14.0", features = ["full"] }
diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
index a42378c1d..582f403cc 100644
--- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
+++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
@@ -1893,7 +1893,7 @@ impl<T: BeaconChainTypes> Worker<T> {
                     "attn_agg_not_in_committee",
                 );
             }
-            AttnError::AttestationAlreadyKnown { .. } => {
+            AttnError::AttestationSupersetKnown { .. } => {
                 /*
                  * The aggregate attestation has already been observed on the network or in
                  * a block.
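// A sketch of the new cache key used by the handlers above, assuming the `types`,
// `tree_hash` and `tree_hash_derive` crates from this diff. Aggregates are now
// keyed by the hash tree root of their data (for sync contributions: the slot,
// block root and subcommittee index) rather than by the root of the whole
// aggregate, so two aggregates that differ only in their aggregation bits map to
// the same entry and can be compared bit-for-bit.
use tree_hash::TreeHash;
use tree_hash_derive::TreeHash;
use types::{Hash256, Slot};

#[derive(TreeHash)]
pub struct SyncCommitteeData {
    pub slot: Slot,
    pub root: Hash256,
    pub subcommittee_index: u64,
}

fn main() {
    let key = SyncCommitteeData {
        slot: Slot::new(42),
        root: Hash256::zero(),
        subcommittee_index: 3,
    }
    .tree_hash_root();
    println!("cache key: {key:?}");
}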
@@ -2405,7 +2405,7 @@ impl<T: BeaconChainTypes> Worker<T> {
                     "sync_bad_aggregator",
                 );
             }
-            SyncCommitteeError::SyncContributionAlreadyKnown(_)
+            SyncCommitteeError::SyncContributionSupersetKnown(_)
             | SyncCommitteeError::AggregatorAlreadyKnown(_) => {
                 /*
                  * The sync committee message has already been observed on the network or in
diff --git a/common/eth2/Cargo.toml b/common/eth2/Cargo.toml
index 4eabd3ff8..d8e1a375f 100644
--- a/common/eth2/Cargo.toml
+++ b/common/eth2/Cargo.toml
@@ -27,6 +27,11 @@ futures = "0.3.8"
 store = { path = "../../beacon_node/store", optional = true }
 slashing_protection = { path = "../../validator_client/slashing_protection", optional = true }
 mediatype = "0.19.13"
+mime = "0.3.16"
+pretty_reqwest_error = { path = "../../common/pretty_reqwest_error" }
+
+[dev-dependencies]
+tokio = { version = "1.14.0", features = ["full"] }
 
 [target.'cfg(target_os = "linux")'.dependencies]
 psutil = { version = "3.2.2", optional = true }
diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs
index 3b0c527f6..98eb1eba9 100644
--- a/common/eth2/src/lib.rs
+++ b/common/eth2/src/lib.rs
@@ -19,6 +19,7 @@ use self::types::{Error as ResponseError, *};
 use futures::Stream;
 use futures_util::StreamExt;
 use lighthouse_network::PeerId;
+use pretty_reqwest_error::PrettyReqwestError;
 pub use reqwest;
 use reqwest::{IntoUrl, RequestBuilder, Response};
 pub use reqwest::{StatusCode, Url};
@@ -39,7 +40,7 @@ pub const CONSENSUS_VERSION_HEADER: &str = "Eth-Consensus-Version";
 #[derive(Debug)]
 pub enum Error {
     /// The `reqwest` client raised an error.
-    Reqwest(reqwest::Error),
+    HttpClient(PrettyReqwestError),
     /// The server returned an error message where the body was able to be parsed.
     ServerMessage(ErrorMessage),
     /// The server returned an error message with an array of errors.
@@ -70,7 +71,7 @@ pub enum Error {
 
 impl From<reqwest::Error> for Error {
     fn from(error: reqwest::Error) -> Self {
-        Error::Reqwest(error)
+        Error::HttpClient(error.into())
     }
 }
 
@@ -78,7 +79,7 @@ impl Error {
     /// If the error has a HTTP status code, return it.
     pub fn status(&self) -> Option<StatusCode> {
         match self {
-            Error::Reqwest(error) => error.status(),
+            Error::HttpClient(error) => error.inner().status(),
             Error::ServerMessage(msg) => StatusCode::try_from(msg.code).ok(),
             Error::ServerIndexedMessage(msg) => StatusCode::try_from(msg.code).ok(),
             Error::StatusCode(status) => Some(*status),
@@ -278,7 +279,7 @@ impl BeaconNodeHttpClient {
             .await?
             .json()
             .await
-            .map_err(Error::Reqwest)
+            .map_err(Into::into)
     }
 
     /// Perform a HTTP POST request with a custom timeout.
@@ -303,7 +304,7 @@ impl BeaconNodeHttpClient {
             .await?
             .json()
             .await
-            .map_err(Error::Reqwest)
+            .map_err(Error::from)
     }
 
     /// Generic POST function supporting arbitrary responses and timeouts.
@@ -1673,7 +1674,7 @@ impl BeaconNodeHttpClient {
             .bytes_stream()
             .map(|next| match next {
                 Ok(bytes) => EventKind::from_sse_bytes(bytes.as_ref()),
-                Err(e) => Err(Error::Reqwest(e)),
+                Err(e) => Err(Error::HttpClient(e.into())),
             }))
     }
 
diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs
index bb933dbe1..1b4bcc0e3 100644
--- a/common/eth2/src/lighthouse.rs
+++ b/common/eth2/src/lighthouse.rs
@@ -364,12 +364,12 @@ pub struct DatabaseInfo {
 
 impl BeaconNodeHttpClient {
     /// Perform a HTTP GET request, returning `None` on a 404 error.
     async fn get_bytes_opt<U: IntoUrl>(&self, url: U) -> Result<Option<Vec<u8>>, Error> {
-        let response = self.client.get(url).send().await.map_err(Error::Reqwest)?;
+        let response = self.client.get(url).send().await.map_err(Error::from)?;
         match ok_or_error(response).await {
             Ok(resp) => Ok(Some(
                 resp.bytes()
                     .await
-                    .map_err(Error::Reqwest)?
+                    .map_err(Error::from)?
                     .into_iter()
                     .collect::<Vec<u8>>(),
             )),
diff --git a/common/eth2/src/lighthouse_vc/http_client.rs b/common/eth2/src/lighthouse_vc/http_client.rs
index 720d8c779..cd7873c9b 100644
--- a/common/eth2/src/lighthouse_vc/http_client.rs
+++ b/common/eth2/src/lighthouse_vc/http_client.rs
@@ -170,7 +170,7 @@ impl ValidatorClientHttpClient {
             .map_err(|_| Error::InvalidSignatureHeader)?
             .to_string();
 
-        let body = response.bytes().await.map_err(Error::Reqwest)?;
+        let body = response.bytes().await.map_err(Error::from)?;
 
         let message =
             Message::parse_slice(digest(&SHA256, &body).as_ref()).expect("sha256 is 32 bytes");
@@ -222,7 +222,7 @@ impl ValidatorClientHttpClient {
             .headers(self.headers()?)
             .send()
             .await
-            .map_err(Error::Reqwest)?;
+            .map_err(Error::from)?;
         ok_or_error(response).await
     }
 
@@ -236,7 +236,7 @@ impl ValidatorClientHttpClient {
             .await?
             .json()
             .await
-            .map_err(Error::Reqwest)
+            .map_err(Error::from)
     }
 
     /// Perform a HTTP GET request, returning `None` on a 404 error.
@@ -266,7 +266,7 @@ impl ValidatorClientHttpClient {
             .json(body)
             .send()
             .await
-            .map_err(Error::Reqwest)?;
+            .map_err(Error::from)?;
         ok_or_error(response).await
     }
 
@@ -297,7 +297,7 @@ impl ValidatorClientHttpClient {
             .json(body)
             .send()
             .await
-            .map_err(Error::Reqwest)?;
+            .map_err(Error::from)?;
         let response = ok_or_error(response).await?;
         self.signed_body(response).await?;
         Ok(())
@@ -316,7 +316,7 @@ impl ValidatorClientHttpClient {
             .json(body)
             .send()
             .await
-            .map_err(Error::Reqwest)?;
+            .map_err(Error::from)?;
         ok_or_error(response).await
     }
 
diff --git a/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml b/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml
index 07e615930..0ac70bcd6 100644
--- a/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml
+++ b/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml
@@ -38,7 +38,7 @@ BELLATRIX_FORK_VERSION: 0x02000064
 BELLATRIX_FORK_EPOCH: 385536
 # Capella
 CAPELLA_FORK_VERSION: 0x03000064
-CAPELLA_FORK_EPOCH: 18446744073709551615
+CAPELLA_FORK_EPOCH: 648704
 # Deneb
 DENEB_FORK_VERSION: 0x04000064
 DENEB_FORK_EPOCH: 18446744073709551615
diff --git a/common/pretty_reqwest_error/Cargo.toml b/common/pretty_reqwest_error/Cargo.toml
new file mode 100644
index 000000000..ca9f4812b
--- /dev/null
+++ b/common/pretty_reqwest_error/Cargo.toml
@@ -0,0 +1,10 @@
+[package]
+name = "pretty_reqwest_error"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+reqwest = { version = "0.11.0", features = ["json","stream"] }
+sensitive_url = { path = "../sensitive_url" }
diff --git a/common/pretty_reqwest_error/src/lib.rs b/common/pretty_reqwest_error/src/lib.rs
new file mode 100644
index 000000000..4c605f38a
--- /dev/null
+++ b/common/pretty_reqwest_error/src/lib.rs
@@ -0,0 +1,62 @@
+use sensitive_url::SensitiveUrl;
+use std::error::Error as StdError;
+use std::fmt;
+
+pub struct PrettyReqwestError(reqwest::Error);
+
+impl PrettyReqwestError {
+    pub fn inner(&self) -> &reqwest::Error {
+        &self.0
+    }
+}
+
+impl fmt::Debug for PrettyReqwestError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        if let Some(url) = self.0.url() {
+            if let Ok(url) = SensitiveUrl::new(url.clone()) {
+                write!(f, "url: {}", url)?;
+            } else {
+                write!(f, "url: unable_to_parse")?;
+            };
+        }
+
+        let kind = if self.0.is_builder() {
+            "builder"
+        } else if self.0.is_redirect() {
+            "redirect"
+        } else if self.0.is_status() {
+            "status"
+        } else if self.0.is_timeout() {
+            "timeout"
+        } else if self.0.is_request() {
+            "request"
+        } else if self.0.is_connect() {
+            "connect"
+        } else if self.0.is_body() {
+            "body"
+        } else if self.0.is_decode() {
+            "decode"
+        } else {
+            "unknown"
+        };
+        write!(f, ", kind: {}", kind)?;
+
+        if let Some(status) = self.0.status() {
+            write!(f, ", status_code: {}", status)?;
+        }
+
+        if let Some(ref source) = self.0.source() {
+            write!(f, ", detail: {}", source)?;
+        } else {
+            write!(f, ", detail: unknown")?;
+        }
+
+        Ok(())
+    }
+}
+
+impl From<reqwest::Error> for PrettyReqwestError {
+    fn from(inner: reqwest::Error) -> Self {
+        Self(inner)
+    }
+}
diff --git a/common/sensitive_url/src/lib.rs b/common/sensitive_url/src/lib.rs
index b6705eb60..b6068a2dc 100644
--- a/common/sensitive_url/src/lib.rs
+++ b/common/sensitive_url/src/lib.rs
@@ -75,7 +75,7 @@ impl SensitiveUrl {
         SensitiveUrl::new(surl)
     }
 
-    fn new(full: Url) -> Result<Self, SensitiveError> {
+    pub fn new(full: Url) -> Result<Self, SensitiveError> {
         let mut redacted = full.clone();
         redacted
             .path_segments_mut()
diff --git a/consensus/cached_tree_hash/Cargo.toml b/consensus/cached_tree_hash/Cargo.toml
index c2856003b..0f43c8890 100644
--- a/consensus/cached_tree_hash/Cargo.toml
+++ b/consensus/cached_tree_hash/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2021"
 
 [dependencies]
 ethereum-types = "0.14.1"
-ssz_types = "0.5.0"
+ssz_types = "0.5.3"
 ethereum_hashing = "1.0.0-beta.2"
 ethereum_ssz_derive = "0.5.0"
 ethereum_ssz = "0.5.0"
diff --git a/consensus/state_processing/Cargo.toml b/consensus/state_processing/Cargo.toml
index c16742782..f19cd1d29 100644
--- a/consensus/state_processing/Cargo.toml
+++ b/consensus/state_processing/Cargo.toml
@@ -15,7 +15,7 @@ integer-sqrt = "0.1.5"
 itertools = "0.10.0"
 ethereum_ssz = "0.5.0"
 ethereum_ssz_derive = "0.5.0"
-ssz_types = "0.5.0"
+ssz_types = "0.5.3"
 merkle_proof = { path = "../merkle_proof" }
 safe_arith = { path = "../safe_arith" }
 tree_hash = "0.5.0"
diff --git a/consensus/types/Cargo.toml b/consensus/types/Cargo.toml
index 65883bd27..8fdb21a06 100644
--- a/consensus/types/Cargo.toml
+++ b/consensus/types/Cargo.toml
@@ -28,7 +28,7 @@ serde_derive = "1.0.116"
 slog = "2.5.2"
 ethereum_ssz = { version = "0.5.0", features = ["arbitrary"] }
 ethereum_ssz_derive = "0.5.0"
-ssz_types = { version = "0.5.0", features = ["arbitrary"] }
+ssz_types = { version = "0.5.3", features = ["arbitrary"] }
 swap_or_not_shuffle = { path = "../swap_or_not_shuffle", features = ["arbitrary"] }
 test_random_derive = { path = "../../common/test_random_derive" }
 tree_hash = { version = "0.5.0", features = ["arbitrary"] }
diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs
index e510b0a27..2d5b51e5f 100644
--- a/consensus/types/src/chain_spec.rs
+++ b/consensus/types/src/chain_spec.rs
@@ -856,7 +856,7 @@ impl ChainSpec {
              * Capella hard fork params
              */
             capella_fork_version: [0x03, 0x00, 0x00, 0x64],
-            capella_fork_epoch: None,
+            capella_fork_epoch: Some(Epoch::new(648704)),
             max_validators_per_withdrawals_sweep: 8192,
 
             /*
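// A sketch of how the new wrapper is meant to be consumed, assuming the
// `pretty_reqwest_error` crate added above plus `reqwest` and `tokio` (the diff
// adds tokio with the "full" feature as an eth2 dev-dependency). Any
// `reqwest::Error` converts via `From`, and the `Debug` output shows a redacted
// URL, the error kind and the underlying detail instead of reqwest's default
// output, which can leak query strings and tokens.
use pretty_reqwest_error::PrettyReqwestError;

#[tokio::main]
async fn main() {
    // Nothing listens on this port, so the request fails to connect.
    let err = reqwest::get("http://localhost:1/eth/v1/node/health")
        .await
        .expect_err("request should fail");
    let pretty: PrettyReqwestError = err.into();
    // Prints something like: "url: http://localhost:1/..., kind: request, detail: ..."
    println!("{:?}", pretty);
}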