diff --git a/Cargo.lock b/Cargo.lock index 8b8a0b9db..64c0e6836 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2971,7 +2971,7 @@ dependencies = [ [[package]] name = "operation_pool" -version = "0.2.0" +version = "0.1.0" dependencies = [ "eth2_ssz", "eth2_ssz_derive", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 06000b0b4..1065862f3 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -8,6 +8,7 @@ use crate::events::{EventHandler, EventKind}; use crate::fork_choice::{Error as ForkChoiceError, ForkChoice}; use crate::head_tracker::HeadTracker; use crate::metrics; +use crate::naive_aggregation_pool::{Error as NaiveAggregationError, NaiveAggregationPool}; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::shuffling_cache::ShufflingCache; use crate::snapshot_cache::SnapshotCache; @@ -62,6 +63,23 @@ pub const OP_POOL_DB_KEY: [u8; 32] = [0; 32]; pub const ETH1_CACHE_DB_KEY: [u8; 32] = [0; 32]; pub const FORK_CHOICE_DB_KEY: [u8; 32] = [0; 32]; +#[derive(Debug, PartialEq)] +pub enum AttestationType { + /// An attestation with a single-signature that has been published in accordance with the naive + /// aggregation strategy. + /// + /// These attestations may have come from a `committee_index{subnet_id}_beacon_attestation` + /// gossip subnet or they have have come directly from a validator attached to our API. + /// + /// If `should_store == true`, the attestation will be added to the `NaiveAggregationPool`. + Unaggregated { should_store: bool }, + /// An attestation with one more more signatures that has passed through the aggregation phase + /// of the naive aggregation scheme. + /// + /// These attestations must have come from the `beacon_aggregate_and_proof` gossip subnet. + Aggregated, +} + #[derive(Debug, PartialEq)] pub enum AttestationProcessingOutcome { Processed, @@ -142,6 +160,12 @@ pub struct BeaconChain { /// Stores all operations (e.g., `Attestation`, `Deposit`, etc) that are candidates for /// inclusion in a block. pub op_pool: OperationPool, + /// A pool of attestations dedicated to the "naive aggregation strategy" defined in the eth2 + /// specs. + /// + /// This pool accepts `Attestation` objects that only have one aggregation bit set and provides + /// a method to get an aggregated `Attestation` for some `AttestationData`. + pub naive_aggregation_pool: NaiveAggregationPool, /// Provides information from the Ethereum 1 (PoW) chain. pub eth1_chain: Option>, /// Stores a "snapshot" of the chain at the time the head-of-the-chain block was received. @@ -676,27 +700,14 @@ impl BeaconChain { } } - /// Produce an aggregate attestation that has been collected for this slot and committee. - // TODO: Check and optimize - pub fn return_aggregate_attestation( + /// Returns an aggregated `Attestation`, if any, that has a matching `attestation.data`. + /// + /// The attestation will be obtained from `self.naive_aggregation_pool`. + pub fn get_aggregated_attestation( &self, - slot: Slot, - index: CommitteeIndex, - ) -> Result, Error> { - let epoch = |slot: Slot| slot.epoch(T::EthSpec::slots_per_epoch()); - let head_state = &self.head()?.beacon_state; - - let state = if epoch(slot) == epoch(head_state.slot) { - self.head()?.beacon_state - } else { - // The block proposer shuffling is not affected by the state roots, so we don't need to - // calculate them. - self.state_at_slot(slot, StateSkipConfig::WithoutStateRoots)? - }; - - self.op_pool - .get_raw_aggregated_attestations(&slot, &index, &state, &self.spec) - .map_err(Error::from) + data: &AttestationData, + ) -> Result>, Error> { + self.naive_aggregation_pool.get(data).map_err(Into::into) } /// Produce a raw unsigned `Attestation` that is valid for the given `slot` and `index`. @@ -824,12 +835,12 @@ impl BeaconChain { pub fn process_attestation( &self, attestation: Attestation, - store_raw: Option, + attestation_type: AttestationType, ) -> Result { metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_REQUESTS); let timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_TIMES); - let outcome = self.process_attestation_internal(attestation.clone(), store_raw); + let outcome = self.process_attestation_internal(attestation.clone(), attestation_type); match &outcome { Ok(outcome) => match outcome { @@ -883,7 +894,7 @@ impl BeaconChain { pub fn process_attestation_internal( &self, attestation: Attestation, - store_raw: Option, + attestation_type: AttestationType, ) -> Result { let initial_validation_timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_INITIAL_VALIDATION_TIMES); @@ -1120,20 +1131,61 @@ impl BeaconChain { // subnet without a validator responsible for aggregating it, we don't store it in the // op pool. if self.eth1_chain.is_some() { - if let Some(is_raw) = store_raw { - if is_raw { - // This is a raw un-aggregated attestation received from a subnet with a - // connected validator required to aggregate and publish these attestations - self.op_pool - .insert_raw_attestation(attestation, &fork, &self.spec)?; - } else { - // This an aggregate attestation received from the aggregate attestation - // channel - self.op_pool.insert_aggregate_attestation( - attestation, - &fork, - &self.spec, - )?; + match attestation_type { + AttestationType::Unaggregated { should_store } if should_store => { + match self.naive_aggregation_pool.insert(&attestation) { + Ok(outcome) => trace!( + self.log, + "Stored unaggregated attestation"; + "outcome" => format!("{:?}", outcome), + "index" => attestation.data.index, + "slot" => attestation.data.slot.as_u64(), + ), + Err(NaiveAggregationError::SlotTooLow { + slot, + lowest_permissible_slot, + }) => { + trace!( + self.log, + "Refused to store unaggregated attestation"; + "lowest_permissible_slot" => lowest_permissible_slot.as_u64(), + "slot" => slot.as_u64(), + ); + } + Err(e) => error!( + self.log, + "Failed to store unaggregated attestation"; + "error" => format!("{:?}", e), + "index" => attestation.data.index, + "slot" => attestation.data.slot.as_u64(), + ), + } + } + AttestationType::Unaggregated { .. } => trace!( + self.log, + "Did not store unaggregated attestation"; + "index" => attestation.data.index, + "slot" => attestation.data.slot.as_u64(), + ), + AttestationType::Aggregated => { + let index = attestation.data.index; + let slot = attestation.data.slot; + + match self + .op_pool + .insert_attestation(attestation, &fork, &self.spec) + { + Ok(_) => {} + Err(e) => { + error!( + self.log, + "Failed to add attestation to op pool"; + "error" => format!("{:?}", e), + "index" => index, + "slot" => slot.as_u64(), + ); + } + } } } } @@ -1181,7 +1233,7 @@ impl BeaconChain { if let Ok(Some(pubkey)) = self.validator_pubkey(aggregate_and_proof.aggregator_index as usize) { - if !signed_aggregate_and_proof.is_valid(&pubkey, &state.fork) { + if !signed_aggregate_and_proof.is_valid(&pubkey, &state.fork, &self.spec) { Err(AttestationDropReason::AggregatorSignatureInvalid) } else { Ok(()) @@ -1915,18 +1967,7 @@ impl BeaconChain { pub fn per_slot_task(&self) { trace!(self.log, "Running beacon chain per slot tasks"); if let Some(slot) = self.slot_clock.now() { - self.op_pool.prune_committee_attestations(&slot) - } - } - - /// Called by the timer on every epoch. - /// - /// Performs epoch-based pruning. - pub fn per_epoch_task(&self) { - trace!(self.log, "Running beacon chain per epoch tasks"); - if let Some(slot) = self.slot_clock.now() { - let current_epoch = slot.epoch(T::EthSpec::slots_per_epoch()); - self.op_pool.prune_attestations(¤t_epoch); + self.naive_aggregation_pool.prune(slot); } } diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index f99fcfdbc..45231af5e 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -415,6 +415,8 @@ where op_pool: self .op_pool .ok_or_else(|| "Cannot build without op pool".to_string())?, + // TODO: allow for persisting and loading the pool from disk. + naive_aggregation_pool: <_>::default(), eth1_chain: self.eth1_chain, canonical_head: TimeoutRwLock::new(canonical_head.clone()), genesis_block_root: self diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 2a3ca5f32..016aa33cc 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -1,5 +1,6 @@ use crate::eth1_chain::Error as Eth1ChainError; use crate::fork_choice::Error as ForkChoiceError; +use crate::naive_aggregation_pool::Error as NaiveAggregationError; use operation_pool::OpPoolError; use ssz::DecodeError; use ssz_types::Error as SszTypesError; @@ -64,6 +65,7 @@ pub enum BeaconChainError { DuplicateValidatorPublicKey, ValidatorPubkeyCacheFileError(String), OpPoolError(OpPoolError), + NaiveAggregationError(NaiveAggregationError), } easy_from_to!(SlotProcessingError, BeaconChainError); @@ -71,6 +73,7 @@ easy_from_to!(AttestationValidationError, BeaconChainError); easy_from_to!(SszTypesError, BeaconChainError); easy_from_to!(OpPoolError, BeaconChainError); easy_from_to!(BlockSignatureVerifierError, BeaconChainError); +easy_from_to!(NaiveAggregationError, BeaconChainError); #[derive(Debug, PartialEq)] pub enum BlockProductionError { diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 4ba36418a..4c42a5dff 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -12,6 +12,7 @@ pub mod events; mod fork_choice; mod head_tracker; mod metrics; +mod naive_aggregation_pool; mod persisted_beacon_chain; mod shuffling_cache; mod snapshot_cache; @@ -20,7 +21,7 @@ mod timeout_rw_lock; mod validator_pubkey_cache; pub use self::beacon_chain::{ - AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, StateSkipConfig, + AttestationProcessingOutcome, AttestationType, BeaconChain, BeaconChainTypes, StateSkipConfig, }; pub use self::beacon_snapshot::BeaconSnapshot; pub use self::errors::{BeaconChainError, BlockProductionError}; diff --git a/beacon_node/beacon_chain/src/naive_aggregation_pool.rs b/beacon_node/beacon_chain/src/naive_aggregation_pool.rs new file mode 100644 index 000000000..6f2031dab --- /dev/null +++ b/beacon_node/beacon_chain/src/naive_aggregation_pool.rs @@ -0,0 +1,478 @@ +use parking_lot::RwLock; +use std::collections::HashMap; +use types::{Attestation, AttestationData, EthSpec, Slot}; + +/// The number of slots that will be stored in the pool. +/// +/// For example, if `SLOTS_RETAINED == 3` and the pool is pruned at slot `6`, then all attestations +/// at slots less than `4` will be dropped and any future attestation with a slot less than `4` +/// will be refused. +const SLOTS_RETAINED: usize = 3; + +/// The maximum number of distinct `AttestationData` that will be stored in each slot. +/// +/// This is a DoS protection measure. +const MAX_ATTESTATIONS_PER_SLOT: usize = 16_384; + +/// Returned upon successfully inserting an attestation into the pool. +#[derive(Debug, PartialEq)] +pub enum InsertOutcome { + /// The `attestation.data` had not been seen before and was added to the pool. + NewAttestationData { committee_index: usize }, + /// A validator signature for the given `attestation.data` was already known. No changes were + /// made. + SignatureAlreadyKnown { committee_index: usize }, + /// The `attestation.data` was known, but a signature for the given validator was not yet + /// known. The signature was aggregated into the pool. + SignatureAggregated { committee_index: usize }, +} + +#[derive(Debug, PartialEq)] +pub enum Error { + /// The given `attestation.data.slot` was too low to be stored. No changes were made. + SlotTooLow { + slot: Slot, + lowest_permissible_slot: Slot, + }, + /// The given `attestation.aggregation_bits` field was empty. + NoAggregationBitsSet, + /// The given `attestation.aggregation_bits` field had more than one signature. The number of + /// signatures found is included. + MoreThanOneAggregationBitSet(usize), + /// We have reached the maximum number of unique `AttestationData` that can be stored in a + /// slot. This is a DoS protection function. + ReachedMaxAttestationsPerSlot(usize), + /// The given `attestation.aggregation_bits` field had a different length to the one currently + /// stored. This indicates a fairly serious error somewhere in the code that called this + /// function. + InconsistentBitfieldLengths, + /// The function to obtain a map index failed, this is an internal error. + InvalidMapIndex(usize), + /// The given `attestation` was for the incorrect slot. This is an internal error. + IncorrectSlot { expected: Slot, attestation: Slot }, +} + +/// A collection of `Attestation` objects, keyed by their `attestation.data`. Enforces that all +/// `attestation` are from the same slot. +struct AggregatedAttestationMap { + map: HashMap>, + slot: Slot, +} + +impl AggregatedAttestationMap { + /// Create an empty collection that will only contain attestation for the given `slot`. + pub fn new(slot: Slot) -> Self { + Self { + slot, + map: <_>::default(), + } + } + + /// Insert an attestation into `self`, aggregating it into the pool. + /// + /// The given attestation (`a`) must only have one signature and be from the slot that `self` + /// was initialized with. + pub fn insert(&mut self, a: &Attestation) -> Result { + if a.data.slot != self.slot { + return Err(Error::IncorrectSlot { + expected: self.slot, + attestation: a.data.slot, + }); + } + + let set_bits = a + .aggregation_bits + .iter() + .enumerate() + .filter(|(_i, bit)| *bit) + .map(|(i, _bit)| i) + .collect::>(); + + let committee_index = set_bits + .first() + .copied() + .ok_or_else(|| Error::NoAggregationBitsSet)?; + + if set_bits.len() > 1 { + return Err(Error::MoreThanOneAggregationBitSet(set_bits.len())); + } + + if let Some(existing_attestation) = self.map.get_mut(&a.data) { + if existing_attestation + .aggregation_bits + .get(committee_index) + .map_err(|_| Error::InconsistentBitfieldLengths)? + { + Ok(InsertOutcome::SignatureAlreadyKnown { committee_index }) + } else { + existing_attestation.aggregate(a); + Ok(InsertOutcome::SignatureAggregated { committee_index }) + } + } else { + if self.map.len() >= MAX_ATTESTATIONS_PER_SLOT { + return Err(Error::ReachedMaxAttestationsPerSlot( + MAX_ATTESTATIONS_PER_SLOT, + )); + } + + self.map.insert(a.data.clone(), a.clone()); + Ok(InsertOutcome::NewAttestationData { committee_index }) + } + } + + /// Returns an aggregated `Attestation` with the given `data`, if any. + /// + /// The given `a.data.slot` must match the slot that `self` was initialized with. + pub fn get(&self, data: &AttestationData) -> Result>, Error> { + if data.slot != self.slot { + return Err(Error::IncorrectSlot { + expected: self.slot, + attestation: data.slot, + }); + } + + Ok(self.map.get(data).cloned()) + } +} + +/// A pool of `Attestation` that is specially designed to store "unaggregated" attestations from +/// the native aggregation scheme. +/// +/// **The `NaiveAggregationPool` does not do any signature or attestation verification. It assumes +/// that all `Attestation` objects provided are valid.** +/// +/// ## Details +/// +/// The pool sorts the `Attestation` by `attestation.data.slot`, then by `attestation.data`. +/// +/// As each unaggregated attestation is added it is aggregated with any existing `attestation` with +/// the same `AttestationData`. Considering that the pool only accepts attestations with a single +/// signature, there should only ever be a single aggregated `Attestation` for any given +/// `AttestationData`. +/// +/// The pool has a capacity for `SLOTS_RETAINED` slots, when a new `attestation.data.slot` is +/// provided, the oldest slot is dropped and replaced with the new slot. The pool can also be +/// pruned by supplying a `current_slot`; all existing attestations with a slot lower than +/// `current_slot - SLOTS_RETAINED` will be removed and any future attestation with a slot lower +/// than that will also be refused. Pruning is done automatically based upon the attestations it +/// receives and it can be triggered manually. +pub struct NaiveAggregationPool { + lowest_permissible_slot: RwLock, + maps: RwLock>>, +} + +impl Default for NaiveAggregationPool { + fn default() -> Self { + Self { + lowest_permissible_slot: RwLock::new(Slot::new(0)), + maps: RwLock::new(vec![]), + } + } +} + +impl NaiveAggregationPool { + /// Insert an attestation into `self`, aggregating it into the pool. + /// + /// The given attestation (`a`) must only have one signature and have an + /// `attestation.data.slot` that is not lower than `self.lowest_permissible_slot`. + /// + /// The pool may be pruned if the given `attestation.data` has a slot higher than any + /// previously seen. + pub fn insert(&self, attestation: &Attestation) -> Result { + let lowest_permissible_slot = *self.lowest_permissible_slot.read(); + + // Reject any attestations that are too old. + if attestation.data.slot < lowest_permissible_slot { + return Err(Error::SlotTooLow { + slot: attestation.data.slot, + lowest_permissible_slot, + }); + } + + // Prune the pool if this attestation indicates that the current slot has advanced. + if (lowest_permissible_slot + SLOTS_RETAINED as u64) < attestation.data.slot + 1 { + self.prune(attestation.data.slot) + } + + let index = self.get_map_index(attestation.data.slot); + + self.maps + .write() + .get_mut(index) + .ok_or_else(|| Error::InvalidMapIndex(index))? + .insert(attestation) + } + + /// Returns an aggregated `Attestation` with the given `data`, if any. + pub fn get(&self, data: &AttestationData) -> Result>, Error> { + self.maps + .read() + .iter() + .find(|map| map.slot == data.slot) + .map(|map| map.get(data)) + .unwrap_or_else(|| Ok(None)) + } + + /// Removes any attestations with a slot lower than `current_slot` and bars any future + /// attestations with a slot lower than `current_slot - SLOTS_RETAINED`. + pub fn prune(&self, current_slot: Slot) { + // Taking advantage of saturating subtraction on `Slot`. + let lowest_permissible_slot = current_slot - Slot::from(SLOTS_RETAINED); + + self.maps + .write() + .retain(|map| map.slot >= lowest_permissible_slot); + + *self.lowest_permissible_slot.write() = lowest_permissible_slot; + } + + /// Returns the index of `self.maps` that matches `slot`. + /// + /// If there is no existing map for this slot one will be created. If `self.maps.len() >= + /// SLOTS_RETAINED`, the map with the lowest slot will be replaced. + fn get_map_index(&self, slot: Slot) -> usize { + let mut maps = self.maps.write(); + + if let Some(index) = maps.iter().position(|map| map.slot == slot) { + return index; + } + + if maps.len() < SLOTS_RETAINED || maps.is_empty() { + let index = maps.len(); + maps.push(AggregatedAttestationMap::new(slot)); + return index; + } + + let index = maps + .iter() + .enumerate() + .min_by_key(|(_i, map)| map.slot) + .map(|(i, _map)| i) + .expect("maps cannot be empty due to previous .is_empty() check"); + + maps[index] = AggregatedAttestationMap::new(slot); + + index + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ssz_types::BitList; + use types::{ + test_utils::{generate_deterministic_keypair, test_random_instance}, + Fork, Hash256, + }; + + type E = types::MainnetEthSpec; + + fn get_attestation(slot: Slot) -> Attestation { + let mut a: Attestation = test_random_instance(); + a.data.slot = slot; + a.aggregation_bits = BitList::with_capacity(4).expect("should create bitlist"); + a + } + + fn sign(a: &mut Attestation, i: usize) { + a.sign( + &generate_deterministic_keypair(i).sk, + i, + &Fork::default(), + &E::default_spec(), + ) + .expect("should sign attestation"); + } + + fn unset_bit(a: &mut Attestation, i: usize) { + a.aggregation_bits + .set(i, false) + .expect("should unset aggregation bit") + } + + #[test] + fn single_attestation() { + let mut a = get_attestation(Slot::new(0)); + + let pool = NaiveAggregationPool::default(); + + assert_eq!( + pool.insert(&a), + Err(Error::NoAggregationBitsSet), + "should not accept attestation without any signatures" + ); + + sign(&mut a, 0); + + assert_eq!( + pool.insert(&a), + Ok(InsertOutcome::NewAttestationData { committee_index: 0 }), + "should accept new attestation" + ); + assert_eq!( + pool.insert(&a), + Ok(InsertOutcome::SignatureAlreadyKnown { committee_index: 0 }), + "should acknowledge duplicate signature" + ); + + let retrieved = pool + .get(&a.data) + .expect("should not error while getting attestation") + .expect("should get an attestation"); + assert_eq!( + retrieved, a, + "retrieved attestation should equal the one inserted" + ); + + sign(&mut a, 1); + + assert_eq!( + pool.insert(&a), + Err(Error::MoreThanOneAggregationBitSet(2)), + "should not accept attestation with multiple signatures" + ); + } + + #[test] + fn multiple_attestations() { + let mut a_0 = get_attestation(Slot::new(0)); + let mut a_1 = a_0.clone(); + + sign(&mut a_0, 0); + sign(&mut a_1, 1); + + let pool = NaiveAggregationPool::default(); + + assert_eq!( + pool.insert(&a_0), + Ok(InsertOutcome::NewAttestationData { committee_index: 0 }), + "should accept a_0" + ); + assert_eq!( + pool.insert(&a_1), + Ok(InsertOutcome::SignatureAggregated { committee_index: 1 }), + "should accept a_1" + ); + + let retrieved = pool + .get(&a_0.data) + .expect("should not error while getting attestation") + .expect("should get an attestation"); + + let mut a_01 = a_0.clone(); + a_01.aggregate(&a_1); + + assert_eq!( + retrieved, a_01, + "retrieved attestation should be aggregated" + ); + + /* + * Throw a different attestation data in there and ensure it isn't aggregated + */ + + let mut a_different = a_0.clone(); + let different_root = Hash256::from_low_u64_be(1337); + unset_bit(&mut a_different, 0); + sign(&mut a_different, 2); + assert!(a_different.data.beacon_block_root != different_root); + a_different.data.beacon_block_root = different_root; + + assert_eq!( + pool.insert(&a_different), + Ok(InsertOutcome::NewAttestationData { committee_index: 2 }), + "should accept a_different" + ); + + assert_eq!( + pool.get(&a_0.data) + .expect("should not error while getting attestation") + .expect("should get an attestation"), + retrieved, + "should not have aggregated different attestation data" + ); + } + + #[test] + fn auto_pruning() { + let mut base = get_attestation(Slot::new(0)); + sign(&mut base, 0); + + let pool = NaiveAggregationPool::default(); + + for i in 0..SLOTS_RETAINED * 2 { + let slot = Slot::from(i); + let mut a = base.clone(); + a.data.slot = slot; + + assert_eq!( + pool.insert(&a), + Ok(InsertOutcome::NewAttestationData { committee_index: 0 }), + "should accept new attestation" + ); + + if i < SLOTS_RETAINED { + let len = i + 1; + assert_eq!( + pool.maps.read().len(), + len, + "the pool should have length {}", + len + ); + } else { + assert_eq!( + pool.maps.read().len(), + SLOTS_RETAINED, + "the pool should have length SLOTS_RETAINED" + ); + + let mut pool_slots = pool + .maps + .read() + .iter() + .map(|map| map.slot) + .collect::>(); + + pool_slots.sort_unstable(); + + for (j, pool_slot) in pool_slots.iter().enumerate() { + let expected_slot = slot - (SLOTS_RETAINED - 1 - j) as u64; + assert_eq!( + *pool_slot, expected_slot, + "the slot of the map should be {}", + expected_slot + ) + } + } + } + } + + #[test] + fn max_attestations() { + let mut base = get_attestation(Slot::new(0)); + sign(&mut base, 0); + + let pool = NaiveAggregationPool::default(); + + for i in 0..=MAX_ATTESTATIONS_PER_SLOT { + let mut a = base.clone(); + a.data.beacon_block_root = Hash256::from_low_u64_be(i as u64); + + if i < MAX_ATTESTATIONS_PER_SLOT { + assert_eq!( + pool.insert(&a), + Ok(InsertOutcome::NewAttestationData { committee_index: 0 }), + "should accept attestation below limit" + ); + } else { + assert_eq!( + pool.insert(&a), + Err(Error::ReachedMaxAttestationsPerSlot( + MAX_ATTESTATIONS_PER_SLOT + )), + "should not accept attestation above limit" + ); + } + } + } +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 837cd0f0b..5d393d511 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -6,7 +6,7 @@ use crate::{ builder::{BeaconChainBuilder, Witness}, eth1_chain::CachingEth1Backend, events::NullEventHandler, - AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, StateSkipConfig, + AttestationProcessingOutcome, AttestationType, BeaconChain, BeaconChainTypes, StateSkipConfig, }; use genesis::interop_genesis_state; use rayon::prelude::*; @@ -342,7 +342,7 @@ where .for_each(|attestation| { match self .chain - .process_attestation(attestation, Some(false)) + .process_attestation(attestation, AttestationType::Aggregated) .expect("should not error during attestation processing") { AttestationProcessingOutcome::Processed => (), diff --git a/beacon_node/beacon_chain/tests/attestation_tests.rs b/beacon_node/beacon_chain/tests/attestation_tests.rs index 0420f2b6c..137746c7f 100644 --- a/beacon_node/beacon_chain/tests/attestation_tests.rs +++ b/beacon_node/beacon_chain/tests/attestation_tests.rs @@ -6,7 +6,7 @@ extern crate lazy_static; use beacon_chain::test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, HarnessType, }; -use beacon_chain::AttestationProcessingOutcome; +use beacon_chain::{AttestationProcessingOutcome, AttestationType}; use state_processing::per_slot_processing; use types::{ test_utils::generate_deterministic_keypair, AggregateSignature, BitList, EthSpec, Hash256, @@ -56,7 +56,7 @@ fn attestation_validity() { .expect("should get at least one attestation"); assert_eq!( - chain.process_attestation(valid_attestation.clone(), Some(false)), + chain.process_attestation(valid_attestation.clone(), AttestationType::Aggregated), Ok(AttestationProcessingOutcome::Processed), "should accept valid attestation" ); @@ -71,7 +71,7 @@ fn attestation_validity() { assert_eq!( harness .chain - .process_attestation(epoch_mismatch_attestation, Some(false)), + .process_attestation(epoch_mismatch_attestation, AttestationType::Aggregated), Ok(AttestationProcessingOutcome::BadTargetEpoch), "should not accept attestation where the slot is not in the same epoch as the target" ); @@ -87,7 +87,7 @@ fn attestation_validity() { assert_eq!( harness .chain - .process_attestation(early_attestation, Some(false)), + .process_attestation(early_attestation, AttestationType::Aggregated), Ok(AttestationProcessingOutcome::FutureEpoch { attestation_epoch: current_epoch + 1, current_epoch @@ -122,7 +122,7 @@ fn attestation_validity() { assert_eq!( harness .chain - .process_attestation(late_attestation, Some(false)), + .process_attestation(late_attestation, AttestationType::Aggregated), Ok(AttestationProcessingOutcome::PastEpoch { attestation_epoch: current_epoch - 2, current_epoch @@ -140,7 +140,7 @@ fn attestation_validity() { assert_eq!( harness .chain - .process_attestation(bad_target_attestation, Some(false)), + .process_attestation(bad_target_attestation, AttestationType::Aggregated), Ok(AttestationProcessingOutcome::UnknownTargetRoot( Hash256::from_low_u64_be(42) )), @@ -157,7 +157,7 @@ fn attestation_validity() { assert_eq!( harness .chain - .process_attestation(future_block_attestation, Some(false)), + .process_attestation(future_block_attestation, AttestationType::Aggregated), Ok(AttestationProcessingOutcome::AttestsToFutureBlock { block: current_slot, attestation: current_slot - 1 @@ -175,7 +175,7 @@ fn attestation_validity() { assert_eq!( harness .chain - .process_attestation(bad_head_attestation, Some(false)), + .process_attestation(bad_head_attestation, AttestationType::Aggregated), Ok(AttestationProcessingOutcome::UnknownHeadBlock { beacon_block_root: Hash256::from_low_u64_be(42) }), @@ -195,7 +195,7 @@ fn attestation_validity() { assert_eq!( harness .chain - .process_attestation(bad_signature_attestation, Some(false)), + .process_attestation(bad_signature_attestation, AttestationType::Aggregated), Ok(AttestationProcessingOutcome::InvalidSignature), "should not accept bad_signature attestation" ); @@ -211,7 +211,7 @@ fn attestation_validity() { assert_eq!( harness .chain - .process_attestation(empty_bitfield_attestation, Some(false)), + .process_attestation(empty_bitfield_attestation, AttestationType::Aggregated), Ok(AttestationProcessingOutcome::EmptyAggregationBitfield), "should not accept empty_bitfield attestation" ); @@ -259,7 +259,9 @@ fn attestation_that_skips_epochs() { .expect("should get at least one attestation"); assert_eq!( - harness.chain.process_attestation(attestation, Some(false)), + harness + .chain + .process_attestation(attestation, AttestationType::Aggregated), Ok(AttestationProcessingOutcome::Processed), "should process attestation that skips slots" ); diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 6af117a92..d9cfdbc20 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -6,7 +6,7 @@ extern crate lazy_static; use beacon_chain::test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, DiskHarnessType, }; -use beacon_chain::AttestationProcessingOutcome; +use beacon_chain::{AttestationProcessingOutcome, AttestationType}; use rand::Rng; use sloggers::{null::NullLoggerBuilder, Build}; use std::sync::Arc; @@ -306,7 +306,7 @@ fn epoch_boundary_state_attestation_processing() { .epoch; let res = harness .chain - .process_attestation_internal(attestation.clone(), Some(true)); + .process_attestation_internal(attestation.clone(), AttestationType::Aggregated); let current_epoch = harness.chain.epoch().expect("should get epoch"); let attestation_epoch = attestation.data.target.epoch; diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index a238a222d..aecddd2dc 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -6,7 +6,7 @@ extern crate lazy_static; use beacon_chain::test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, HarnessType, OP_POOL_DB_KEY, }; -use beacon_chain::AttestationProcessingOutcome; +use beacon_chain::{AttestationProcessingOutcome, AttestationType}; use operation_pool::PersistedOperationPool; use state_processing::{ per_slot_processing, per_slot_processing::Error as SlotProcessingError, EpochProcessingError, @@ -449,7 +449,9 @@ fn attestations_with_increasing_slots() { for attestation in attestations { let attestation_epoch = attestation.data.target.epoch; - let res = harness.chain.process_attestation(attestation, Some(false)); + let res = harness + .chain + .process_attestation(attestation, AttestationType::Aggregated); if attestation_epoch + 1 < current_epoch { assert_eq!( diff --git a/beacon_node/rest_api/src/error.rs b/beacon_node/rest_api/src/error.rs index dc8f4c91e..913fa8bd6 100644 --- a/beacon_node/rest_api/src/error.rs +++ b/beacon_node/rest_api/src/error.rs @@ -33,11 +33,11 @@ impl ApiError { impl Into> for ApiError { fn into(self) -> Response { - let status_code = self.status_code(); + let (status_code, desc) = self.status_code(); Response::builder() - .status(status_code.0) + .status(status_code) .header("content-type", "text/plain; charset=utf-8") - .body(Body::from(status_code.1)) + .body(Body::from(desc)) .expect("Response should always be created.") } } diff --git a/beacon_node/rest_api/src/helpers.rs b/beacon_node/rest_api/src/helpers.rs index 708b05221..573b47117 100644 --- a/beacon_node/rest_api/src/helpers.rs +++ b/beacon_node/rest_api/src/helpers.rs @@ -10,7 +10,7 @@ use network::NetworkMessage; use ssz::Decode; use store::{iter::AncestorIter, Store}; use types::{ - Attestation, BeaconState, CommitteeIndex, Epoch, EthSpec, Hash256, RelativeEpoch, Signature, + Attestation, BeaconState, CommitteeIndex, Epoch, EthSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, SignedBeaconBlock, Slot, }; @@ -58,19 +58,21 @@ pub fn check_content_type_for_json(req: &Request) -> Result<(), ApiError> } } -/// Parse a signature from a `0x` prefixed string. -pub fn parse_signature(string: &str) -> Result { +/// Parse an SSZ object from some hex-encoded bytes. +/// +/// E.g., A signature is `"0x0000000000000000000000000000000000000000000000000000000000000000"` +pub fn parse_hex_ssz_bytes(string: &str) -> Result { const PREFIX: &str = "0x"; if string.starts_with(PREFIX) { let trimmed = string.trim_start_matches(PREFIX); let bytes = hex::decode(trimmed) - .map_err(|e| ApiError::BadRequest(format!("Unable to parse signature hex: {:?}", e)))?; - Signature::from_ssz_bytes(&bytes) - .map_err(|e| ApiError::BadRequest(format!("Unable to parse signature bytes: {:?}", e))) + .map_err(|e| ApiError::BadRequest(format!("Unable to parse SSZ hex: {:?}", e)))?; + T::from_ssz_bytes(&bytes) + .map_err(|e| ApiError::BadRequest(format!("Unable to parse SSZ bytes: {:?}", e))) } else { Err(ApiError::BadRequest( - "Signature must have a 0x prefix".to_string(), + "Hex bytes must have a 0x prefix".to_string(), )) } } diff --git a/beacon_node/rest_api/src/url_query.rs b/beacon_node/rest_api/src/url_query.rs index 10e9878d8..fee0cf437 100644 --- a/beacon_node/rest_api/src/url_query.rs +++ b/beacon_node/rest_api/src/url_query.rs @@ -1,7 +1,7 @@ -use crate::helpers::{parse_committee_index, parse_epoch, parse_signature, parse_slot}; +use crate::helpers::{parse_committee_index, parse_epoch, parse_hex_ssz_bytes, parse_slot}; use crate::ApiError; use hyper::Request; -use types::{CommitteeIndex, Epoch, Signature, Slot}; +use types::{AttestationData, CommitteeIndex, Epoch, Signature, Slot}; /// Provides handy functions for parsing the query parameters of a URL. @@ -106,7 +106,13 @@ impl<'a> UrlQuery<'a> { /// Returns the value of the first occurrence of the `randao_reveal` key. pub fn randao_reveal(self) -> Result { self.first_of(&["randao_reveal"]) - .and_then(|(_key, value)| parse_signature(&value)) + .and_then(|(_key, value)| parse_hex_ssz_bytes(&value)) + } + + /// Returns the value of the first occurrence of the `attestation_data` key. + pub fn attestation_data(self) -> Result { + self.first_of(&["attestation_data"]) + .and_then(|(_key, value)| parse_hex_ssz_bytes(&value)) } } diff --git a/beacon_node/rest_api/src/validator.rs b/beacon_node/rest_api/src/validator.rs index caa4b9120..d8308f944 100644 --- a/beacon_node/rest_api/src/validator.rs +++ b/beacon_node/rest_api/src/validator.rs @@ -5,7 +5,8 @@ use crate::helpers::{ use crate::response_builder::ResponseBuilder; use crate::{ApiError, ApiResult, BoxFut, NetworkChannel, UrlQuery}; use beacon_chain::{ - AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BlockError, StateSkipConfig, + AttestationProcessingOutcome, AttestationType, BeaconChain, BeaconChainTypes, BlockError, + StateSkipConfig, }; use bls::PublicKeyBytes; use futures::{Future, Stream}; @@ -456,14 +457,18 @@ pub fn get_aggregate_attestation( ) -> ApiResult { let query = UrlQuery::from_request(&req)?; - let slot = query.slot()?; - let index = query.committee_index()?; + let attestation_data = query.attestation_data()?; - let aggregate_attestation = beacon_chain - .return_aggregate_attestation(slot, index) - .map_err(|e| ApiError::BadRequest(format!("Unable to produce attestation: {:?}", e)))?; - - ResponseBuilder::new(&req)?.body(&aggregate_attestation) + match beacon_chain.get_aggregated_attestation(&attestation_data) { + Ok(Some(attestation)) => ResponseBuilder::new(&req)?.body(&attestation), + Ok(None) => Err(ApiError::NotFound( + "No matching aggregate attestation is known".into(), + )), + Err(e) => Err(ApiError::ServerError(format!( + "Unable to obtain attestation: {:?}", + e + ))), + } } /// HTTP Handler to publish a list of Attestations, which have been signed by a number of validators. @@ -497,7 +502,17 @@ pub fn publish_attestations( // to be stored in the op-pool. This is minimal however as the op_pool gets pruned // every slot attestations.par_iter().try_for_each(|attestation| { - match beacon_chain.process_attestation(attestation.clone(), Some(true)) { + // In accordance with the naive aggregation strategy, the validator client should + // only publish attestations to this endpoint with a single signature. + if attestation.aggregation_bits.num_set_bits() != 1 { + return Err(ApiError::BadRequest(format!("Attestation should have exactly one aggregation bit set"))) + } + + // TODO: we only need to store these attestations if we're aggregating for the + // given subnet. + let attestation_type = AttestationType::Unaggregated { should_store: true }; + + match beacon_chain.process_attestation(attestation.clone(), attestation_type) { Ok(AttestationProcessingOutcome::Processed) => { // Block was processed, publish via gossipsub info!( @@ -569,7 +584,6 @@ pub fn publish_aggregate_and_proofs( }) }) .and_then(move |signed_proofs: Vec>| { - // Verify the signatures for the aggregate and proof and if valid process the // aggregate // TODO: Double check speed and logic consistency of handling current fork vs @@ -587,48 +601,57 @@ pub fn publish_aggregate_and_proofs( ApiError::ProcessingError(format!("The validator is known")) })?; - if signed_proof.is_valid(validator_pubkey, fork) { - let attestation = &agg_proof.aggregate; - match beacon_chain.process_attestation(attestation.clone(), Some(false)) { - Ok(AttestationProcessingOutcome::Processed) => { - // Block was processed, publish via gossipsub - info!( - log, - "Attestation from local validator"; - "target" => attestation.data.source.epoch, - "source" => attestation.data.source.epoch, - "index" => attestation.data.index, - "slot" => attestation.data.slot, - ); - Ok(()) - } - Ok(outcome) => { - warn!( - log, - "Invalid attestation from local validator"; - "outcome" => format!("{:?}", outcome) - ); - Err(ApiError::ProcessingError(format!( - "The Attestation could not be processed and has not been published: {:?}", - outcome - ))) - } - Err(e) => { - error!( - log, - "Error whilst processing attestation"; - "error" => format!("{:?}", e) - ); + /* + * TODO: checking that `signed_proof.is_valid()` is not sufficient. It + * is also necessary to check that the validator is actually designated as an + * aggregator for this attestation. + * + * I (Paul H) will pick this up in a future PR. + */ - Err(ApiError::ServerError(format!( - "Error while processing attestation: {:?}", - e - ))) - } - } + if signed_proof.is_valid(validator_pubkey, fork, &beacon_chain.spec) { + let attestation = &agg_proof.aggregate; - } else { + match beacon_chain.process_attestation(attestation.clone(), AttestationType::Aggregated) { + Ok(AttestationProcessingOutcome::Processed) => { + // Block was processed, publish via gossipsub + info!( + log, + "Attestation from local validator"; + "target" => attestation.data.source.epoch, + "source" => attestation.data.source.epoch, + "index" => attestation.data.index, + "slot" => attestation.data.slot, + ); + Ok(()) + } + Ok(outcome) => { + warn!( + log, + "Invalid attestation from local validator"; + "outcome" => format!("{:?}", outcome) + ); + + Err(ApiError::ProcessingError(format!( + "The Attestation could not be processed and has not been published: {:?}", + outcome + ))) + } + Err(e) => { + error!( + log, + "Error whilst processing attestation"; + "error" => format!("{:?}", e) + ); + + Err(ApiError::ServerError(format!( + "Error while processing attestation: {:?}", + e + ))) + } + } + } else { error!( log, "Invalid AggregateAndProof Signature" @@ -636,12 +659,12 @@ pub fn publish_aggregate_and_proofs( Err(ApiError::ServerError(format!( "Invalid AggregateAndProof Signature" ))) - } - })?; + } + })?; Ok(signed_proofs) }) .and_then(move |signed_proofs| { - publish_aggregate_attestations_to_network::(network_chan, signed_proofs) + publish_aggregate_attestations_to_network::(network_chan, signed_proofs) }) .and_then(|_| response_builder?.body_no_ssz(&())), ) diff --git a/beacon_node/rest_api/tests/test.rs b/beacon_node/rest_api/tests/test.rs index 599b2dd7b..165381d31 100644 --- a/beacon_node/rest_api/tests/test.rs +++ b/beacon_node/rest_api/tests/test.rs @@ -17,7 +17,8 @@ use types::{ generate_deterministic_keypair, AttesterSlashingTestTask, ProposerSlashingTestTask, }, BeaconBlock, BeaconState, ChainSpec, Domain, Epoch, EthSpec, MinimalEthSpec, PublicKey, - RelativeEpoch, Signature, SignedBeaconBlock, SignedRoot, Slot, Validator, + RelativeEpoch, Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedRoot, Slot, + Validator, }; use version; @@ -134,7 +135,31 @@ fn validator_produce_attestation() { .expect("should fetch duties from http api"); let duties = &duties[0]; - // Try publishing the attestation without a signature, ensure it is flagged as invalid. + // Try publishing the attestation without a signature or a committee bit set, ensure it is + // raises an error. + let publish_result = env.runtime().block_on( + remote_node + .http + .validator() + .publish_attestations(vec![attestation.clone()]), + ); + assert!( + publish_result.is_err(), + "the unsigned published attestation should return error" + ); + + // Set the aggregation bit. + attestation + .aggregation_bits + .set( + duties + .attestation_committee_position + .expect("should have committee position"), + true, + ) + .expect("should set attestation bit"); + + // Try publishing with an aggreagation bit set, but an invalid signature. let publish_status = env .runtime() .block_on( @@ -143,12 +168,23 @@ fn validator_produce_attestation() { .validator() .publish_attestations(vec![attestation.clone()]), ) - .expect("should publish attestation"); + .expect("should publish attestation with invalid signature"); assert!( !publish_status.is_valid(), "the unsigned published attestation should not be valid" ); + // Un-set the aggregation bit, so signing doesn't error. + attestation + .aggregation_bits + .set( + duties + .attestation_committee_position + .expect("should have committee position"), + false, + ) + .expect("should un-set attestation bit"); + attestation .sign( &keypair.sk, @@ -167,13 +203,48 @@ fn validator_produce_attestation() { remote_node .http .validator() - .publish_attestations(vec![attestation]), + .publish_attestations(vec![attestation.clone()]), ) .expect("should publish attestation"); assert!( publish_status.is_valid(), "the signed published attestation should be valid" ); + + // Try obtaining an aggregated attestation with a matching attestation data to the previous + // one. + let aggregated_attestation = env + .runtime() + .block_on( + remote_node + .http + .validator() + .produce_aggregate_attestation(&attestation.data), + ) + .expect("should fetch aggregated attestation from http api"); + + let signed_aggregate_and_proof = SignedAggregateAndProof::from_aggregate( + validator_index as u64, + aggregated_attestation, + &keypair.sk, + &state.fork, + spec, + ); + + // Publish the signed aggregate. + let publish_status = env + .runtime() + .block_on( + remote_node + .http + .validator() + .publish_aggregate_and_proof(vec![signed_aggregate_and_proof]), + ) + .expect("should publish aggregate and proof"); + assert!( + publish_status.is_valid(), + "the signed aggregate and proof should be valid" + ); } #[test] diff --git a/beacon_node/timer/src/lib.rs b/beacon_node/timer/src/lib.rs index 27e7bc642..20054d854 100644 --- a/beacon_node/timer/src/lib.rs +++ b/beacon_node/timer/src/lib.rs @@ -3,59 +3,13 @@ //! This service allows task execution on the beacon node for various functionality. use beacon_chain::{BeaconChain, BeaconChainTypes}; -use futures::prelude::*; -use slog::warn; +use futures::{future, prelude::*}; +use slog::error; use slot_clock::SlotClock; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::runtime::TaskExecutor; use tokio::timer::Interval; -use types::EthSpec; - -/// A collection of timers that can execute actions on the beacon node. -/// -/// This currently only has a per-slot timer, although others may be added in the future -struct Timer { - /// Beacon chain associated. - beacon_chain: Arc>, - /// A timer that fires every slot. - per_slot_timer: Interval, - /// The logger for the timer. - log: slog::Logger, -} - -impl Timer { - pub fn new( - beacon_chain: Arc>, - milliseconds_per_slot: u64, - log: slog::Logger, - ) -> Result { - let duration_to_next_slot = beacon_chain - .slot_clock - .duration_to_next_slot() - .ok_or_else(|| "slot_notifier unable to determine time to next slot")?; - - let slot_duration = Duration::from_millis(milliseconds_per_slot); - // A per-slot timer - let start_instant = Instant::now() + duration_to_next_slot; - let per_slot_timer = Interval::new(start_instant, slot_duration); - - Ok(Timer { - beacon_chain, - per_slot_timer, - log, - }) - } - - /// Tasks that occur on a per-slot basis. - pub fn per_slot_task(&self) { - self.beacon_chain.per_slot_task(); - } - - pub fn per_epoch_task(&self) { - self.beacon_chain.per_epoch_task(); - } -} /// Spawns a timer service which periodically executes tasks for the beacon chain pub fn spawn( @@ -64,34 +18,33 @@ pub fn spawn( milliseconds_per_slot: u64, log: slog::Logger, ) -> Result, &'static str> { - //let thread_log = log.clone(); - let mut timer = Timer::new(beacon_chain, milliseconds_per_slot, log)?; - let (exit_signal, mut exit) = tokio::sync::oneshot::channel(); + let (exit_signal, exit) = tokio::sync::oneshot::channel(); - executor.spawn(futures::future::poll_fn(move || -> Result<_, ()> { - if let Ok(Async::Ready(_)) | Err(_) = exit.poll() { - // notifier is terminating, end the process - return Ok(Async::Ready(())); - } + let start_instant = Instant::now() + + beacon_chain + .slot_clock + .duration_to_next_slot() + .ok_or_else(|| "slot_notifier unable to determine time to next slot")?; - while let Async::Ready(_) = timer - .per_slot_timer - .poll() - .map_err(|e| warn!(timer.log, "Per slot timer error"; "error" => format!("{:?}", e)))? - { - timer.per_slot_task(); - match timer - .beacon_chain - .slot_clock - .now() - .map(|slot| (slot % T::EthSpec::slots_per_epoch()).as_u64()) - { - Some(0) => timer.per_epoch_task(), - _ => {} - } - } - Ok(Async::NotReady) - })); + let timer_future = Interval::new(start_instant, Duration::from_millis(milliseconds_per_slot)) + .map_err(move |e| { + error!( + log, + "Beacon chain timer failed"; + "error" => format!("{:?}", e) + ) + }) + .for_each(move |_| { + beacon_chain.per_slot_task(); + future::ok(()) + }); + + executor.spawn( + exit.map_err(|_| ()) + .select(timer_future) + .map(|_| ()) + .map_err(|_| ()), + ); Ok(exit_signal) } diff --git a/book/src/http_validator.md b/book/src/http_validator.md index 27902e741..b33d6b337 100644 --- a/book/src/http_validator.md +++ b/book/src/http_validator.md @@ -367,7 +367,9 @@ Typical Responses | 200/202 ### Request Body -Expects a JSON encoded signed`Attestation` object in the POST request body: +Expects a JSON encoded signed `Attestation` object in the POST request body. In +accordance with the naive aggregation scheme, the attestation _must_ have +exactly one of the `attestation.aggregation_bits` fields set. ### Returns diff --git a/eth2/operation_pool/Cargo.toml b/eth2/operation_pool/Cargo.toml index 0f6b2e0b9..7bfc86063 100644 --- a/eth2/operation_pool/Cargo.toml +++ b/eth2/operation_pool/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "operation_pool" -version = "0.2.0" +version = "0.1.0" authors = ["Michael Sproul "] edition = "2018" diff --git a/eth2/operation_pool/src/lib.rs b/eth2/operation_pool/src/lib.rs index 4c809efe6..be2307708 100644 --- a/eth2/operation_pool/src/lib.rs +++ b/eth2/operation_pool/src/lib.rs @@ -10,8 +10,8 @@ use attestation_id::AttestationId; use max_cover::maximum_cover; use parking_lot::RwLock; use state_processing::per_block_processing::errors::{ - AttestationInvalid, AttestationValidationError, AttesterSlashingValidationError, - ExitValidationError, ProposerSlashingValidationError, + AttestationValidationError, AttesterSlashingValidationError, ExitValidationError, + ProposerSlashingValidationError, }; use state_processing::per_block_processing::{ get_slashable_indices_modular, verify_attestation_for_block_inclusion, @@ -22,43 +22,25 @@ use std::collections::{hash_map, HashMap, HashSet}; use std::marker::PhantomData; use types::{ typenum::Unsigned, Attestation, AttesterSlashing, BeaconState, BeaconStateError, ChainSpec, - CommitteeIndex, Epoch, EthSpec, Fork, ProposerSlashing, RelativeEpoch, SignedVoluntaryExit, - Slot, Validator, + EthSpec, Fork, ProposerSlashing, RelativeEpoch, SignedVoluntaryExit, Validator, }; -/// The number of slots we keep shard subnet attestations in the operation pool for. A value of 0 -/// means we remove the attestation pool as soon as the slot ends. -const ATTESTATION_SUBNET_SLOT_DURATION: u64 = 1; - #[derive(Default, Debug)] pub struct OperationPool { - /// Map from attestation ID (see `attestation_id`) to vectors of attestations. - /// - /// These are collected from the aggregate channel. They should already be aggregated but we - /// check for disjoint attestations in the unlikely event we receive disjoint attestations. - aggregate_attestations: RwLock>>>, - /// A collection of aggregated attestations for a particular slot and committee index. - /// - /// Un-aggregated attestations are collected on a shard subnet and if a connected validator is - /// required to aggregate these attestations they are aggregated and stored here until the - /// validator is required to publish the aggregate attestation. - /// This segregates attestations into (slot,committee_index) then by `AttestationId`. - committee_attestations: - RwLock>>>, + /// Map from attestation ID (see below) to vectors of attestations. + attestations: RwLock>>>, /// Map from two attestation IDs to a slashing for those IDs. attester_slashings: RwLock>>, /// Map from proposer index to slashing. proposer_slashings: RwLock>, /// Map from exiting validator to their exit data. voluntary_exits: RwLock>, - /// Marker to pin the generics. _phantom: PhantomData, } #[derive(Debug, PartialEq)] pub enum OpPoolError { GetAttestationsTotalBalanceError(BeaconStateError), - NoAttestationsForSlotCommittee, } impl OperationPool { @@ -67,13 +49,12 @@ impl OperationPool { Self::default() } - /// Insert an attestation from the aggregate channel into the pool, checking if the - /// aggregate can be further aggregated + /// Insert an attestation into the pool, aggregating it with existing attestations if possible. /// /// ## Note /// /// This function assumes the given `attestation` is valid. - pub fn insert_aggregate_attestation( + pub fn insert_attestation( &self, attestation: Attestation, fork: &Fork, @@ -82,7 +63,7 @@ impl OperationPool { let id = AttestationId::from_data(&attestation.data, fork, spec); // Take a write lock on the attestations map. - let mut attestations = self.aggregate_attestations.write(); + let mut attestations = self.attestations.write(); let existing_attestations = match attestations.entry(id) { hash_map::Entry::Vacant(entry) => { @@ -109,90 +90,9 @@ impl OperationPool { Ok(()) } - /// Insert a raw un-aggregated attestation into the pool, for a given (slot, committee_index). - /// - /// ## Note - /// - /// It would be a fair assumption that all attestations here are unaggregated and we - /// therefore do not need to check if `signers_disjoint_form`. However the cost of doing - /// so is low, so we perform this check for added safety. - pub fn insert_raw_attestation( - &self, - attestation: Attestation, - fork: &Fork, - spec: &ChainSpec, - ) -> Result<(), AttestationValidationError> { - let id = AttestationId::from_data(&attestation.data, fork, spec); - - let slot = attestation.data.slot.clone(); - let committee_index = attestation.data.index.clone(); - - // Take a write lock on the attestations map. - let mut attestations = self.committee_attestations.write(); - - let slot_index_map = attestations - .entry((slot, committee_index)) - .or_insert_with(|| HashMap::new()); - - let existing_attestation = match slot_index_map.entry(id) { - hash_map::Entry::Vacant(entry) => { - entry.insert(attestation); - return Ok(()); - } - hash_map::Entry::Occupied(entry) => entry.into_mut(), - }; - - if existing_attestation.signers_disjoint_from(&attestation) { - existing_attestation.aggregate(&attestation); - } else if *existing_attestation != attestation { - return Err(AttestationValidationError::Invalid( - AttestationInvalid::NotDisjoint, - )); - } - - Ok(()) - } - - /// Total number of aggregate attestations in the pool from the aggregate channel, including attestations for the same data. - pub fn num_attestations(&self) -> usize { - self.aggregate_attestations - .read() - .values() - .map(Vec::len) - .sum() - } - /// Total number of attestations in the pool, including attestations for the same data. - pub fn total_num_attestations(&self) -> usize { - self.num_attestations().saturating_add( - self.committee_attestations - .read() - .values() - .map(|map| map.values().len()) - .sum(), - ) - } - - /// Get the aggregated raw attestations for a (slot, committee) - //TODO: Check this logic and optimize - pub fn get_raw_aggregated_attestations( - &self, - slot: &Slot, - index: &CommitteeIndex, - state: &BeaconState, - spec: &ChainSpec, - ) -> Result, OpPoolError> { - let curr_domain_bytes = - AttestationId::compute_domain_bytes(state.current_epoch(), &state.fork, spec); - self.committee_attestations - .read() - .get(&(*slot, *index)) - .ok_or_else(|| OpPoolError::NoAttestationsForSlotCommittee)? - .iter() - .filter(|(key, _)| key.domain_bytes_match(&curr_domain_bytes)) - .next() - .map(|(_key, attestation)| attestation.clone()) - .ok_or_else(|| OpPoolError::NoAttestationsForSlotCommittee) + pub fn num_attestations(&self) -> usize { + self.attestations.read().values().map(Vec::len).sum() } /// Get a list of attestations for inclusion in a block. @@ -209,7 +109,7 @@ impl OperationPool { let prev_domain_bytes = AttestationId::compute_domain_bytes(prev_epoch, &state.fork, spec); let curr_domain_bytes = AttestationId::compute_domain_bytes(current_epoch, &state.fork, spec); - let reader = self.aggregate_attestations.read(); + let reader = self.attestations.read(); let active_indices = state .get_cached_active_validator_indices(RelativeEpoch::Current) .map_err(OpPoolError::GetAttestationsTotalBalanceError)?; @@ -241,38 +141,19 @@ impl OperationPool { )) } - /// Removes aggregate attestations which are too old to be included in a block. - /// - /// This leaves the committee_attestations intact. The committee attestations have their own - /// prune function as these are not for block inclusion and can be pruned more frequently. - /// See `prune_committee_attestations`. - //TODO: Michael to check this before merge - pub fn prune_attestations(&self, current_epoch: &Epoch) { + /// Remove attestations which are too old to be included in a block. + pub fn prune_attestations(&self, finalized_state: &BeaconState) { // We know we can include an attestation if: // state.slot <= attestation_slot + SLOTS_PER_EPOCH // We approximate this check using the attestation's epoch, to avoid computing // the slot or relying on the committee cache of the finalized state. - self.aggregate_attestations - .write() - .retain(|_, attestations| { - // All the attestations in this bucket have the same data, so we only need to - // check the first one. - attestations - .first() - .map_or(false, |att| *current_epoch <= att.data.target.epoch + 1) - }); - } - - /// Removes old committee attestations. These should be used in the slot that they are - /// collected. We keep these around for one extra slot (i.e current_slot + 1) to account for - /// potential delays. - /// - /// The beacon chain should call this function every slot with the current slot as the - /// parameter. - pub fn prune_committee_attestations(&self, current_slot: &Slot) { - self.committee_attestations - .write() - .retain(|(slot, _), _| *slot + ATTESTATION_SUBNET_SLOT_DURATION >= *current_slot) + self.attestations.write().retain(|_, attestations| { + // All the attestations in this bucket have the same data, so we only need to + // check the first one. + attestations.first().map_or(false, |att| { + finalized_state.current_epoch() <= att.data.target.epoch + 1 + }) + }); } /// Insert a proposer slashing into the pool. @@ -451,8 +332,8 @@ impl OperationPool { } /// Prune all types of transactions given the latest finalized state. - // TODO: Michael - Can we shift these to per-epoch? pub fn prune_all(&self, finalized_state: &BeaconState, spec: &ChainSpec) { + self.prune_attestations(finalized_state); self.prune_proposer_slashings(finalized_state); self.prune_attester_slashings(finalized_state, spec); self.prune_voluntary_exits(finalized_state); @@ -502,8 +383,7 @@ fn prune_validator_hash_map( /// Compare two operation pools. impl PartialEq for OperationPool { fn eq(&self, other: &Self) -> bool { - *self.aggregate_attestations.read() == *other.aggregate_attestations.read() - && *self.committee_attestations.read() == *other.committee_attestations.read() + *self.attestations.read() == *other.attestations.read() && *self.attester_slashings.read() == *other.attester_slashings.read() && *self.proposer_slashings.read() == *other.proposer_slashings.read() && *self.voluntary_exits.read() == *other.voluntary_exits.read() @@ -669,16 +549,11 @@ mod release_tests { spec, None, ); - op_pool - .insert_aggregate_attestation(att, &state.fork, spec) - .unwrap(); + op_pool.insert_attestation(att, &state.fork, spec).unwrap(); } } - assert_eq!( - op_pool.aggregate_attestations.read().len(), - committees.len() - ); + assert_eq!(op_pool.attestations.read().len(), committees.len()); assert_eq!(op_pool.num_attestations(), committees.len()); // Before the min attestation inclusion delay, get_attestations shouldn't return anything. @@ -706,15 +581,13 @@ mod release_tests { ); // Prune attestations shouldn't do anything at this point. - let epoch = state.slot.epoch(MainnetEthSpec::slots_per_epoch()); - op_pool.prune_attestations(&epoch); + op_pool.prune_attestations(state); assert_eq!(op_pool.num_attestations(), committees.len()); // But once we advance to more than an epoch after the attestation, it should prune it // out of existence. state.slot += 2 * MainnetEthSpec::slots_per_epoch(); - let epoch = state.slot.epoch(MainnetEthSpec::slots_per_epoch()); - op_pool.prune_attestations(&epoch); + op_pool.prune_attestations(state); assert_eq!(op_pool.num_attestations(), 0); } @@ -745,11 +618,9 @@ mod release_tests { None, ); op_pool - .insert_aggregate_attestation(att.clone(), &state.fork, spec) - .unwrap(); - op_pool - .insert_aggregate_attestation(att, &state.fork, spec) + .insert_attestation(att.clone(), &state.fork, spec) .unwrap(); + op_pool.insert_attestation(att, &state.fork, spec).unwrap(); } assert_eq!(op_pool.num_attestations(), committees.len()); @@ -786,18 +657,13 @@ mod release_tests { spec, None, ); - op_pool - .insert_aggregate_attestation(att, &state.fork, spec) - .unwrap(); + op_pool.insert_attestation(att, &state.fork, spec).unwrap(); } } // The attestations should get aggregated into two attestations that comprise all // validators. - assert_eq!( - op_pool.aggregate_attestations.read().len(), - committees.len() - ); + assert_eq!(op_pool.attestations.read().len(), committees.len()); assert_eq!(op_pool.num_attestations(), 2 * committees.len()); } @@ -839,9 +705,7 @@ mod release_tests { spec, if i == 0 { None } else { Some(0) }, ); - op_pool - .insert_aggregate_attestation(att, &state.fork, spec) - .unwrap(); + op_pool.insert_attestation(att, &state.fork, spec).unwrap(); } }; @@ -856,10 +720,7 @@ mod release_tests { let num_small = target_committee_size / small_step_size; let num_big = target_committee_size / big_step_size; - assert_eq!( - op_pool.aggregate_attestations.read().len(), - committees.len() - ); + assert_eq!(op_pool.attestations.read().len(), committees.len()); assert_eq!( op_pool.num_attestations(), (num_small + num_big) * committees.len() @@ -917,9 +778,7 @@ mod release_tests { spec, if i == 0 { None } else { Some(0) }, ); - op_pool - .insert_aggregate_attestation(att, &state.fork, spec) - .unwrap(); + op_pool.insert_attestation(att, &state.fork, spec).unwrap(); } }; @@ -934,10 +793,7 @@ mod release_tests { let num_small = target_committee_size / small_step_size; let num_big = target_committee_size / big_step_size; - assert_eq!( - op_pool.aggregate_attestations.read().len(), - committees.len() - ); + assert_eq!(op_pool.attestations.read().len(), committees.len()); assert_eq!( op_pool.num_attestations(), (num_small + num_big) * committees.len() diff --git a/eth2/operation_pool/src/persistence.rs b/eth2/operation_pool/src/persistence.rs index ebb2fb7ba..592d4d18b 100644 --- a/eth2/operation_pool/src/persistence.rs +++ b/eth2/operation_pool/src/persistence.rs @@ -17,9 +17,7 @@ pub struct PersistedOperationPool { /// Mapping from attestation ID to attestation mappings. // We could save space by not storing the attestation ID, but it might // be difficult to make that roundtrip due to eager aggregation. - // Note: That we don't store the committee attestations as these are short lived and not worth - // persisting - aggregate_attestations: Vec<(AttestationId, Vec>)>, + attestations: Vec<(AttestationId, Vec>)>, /// Attester slashings. attester_slashings: Vec>, /// Proposer slashings. @@ -31,8 +29,8 @@ pub struct PersistedOperationPool { impl PersistedOperationPool { /// Convert an `OperationPool` into serializable form. pub fn from_operation_pool(operation_pool: &OperationPool) -> Self { - let aggregate_attestations = operation_pool - .aggregate_attestations + let attestations = operation_pool + .attestations .read() .iter() .map(|(att_id, att)| (att_id.clone(), att.clone())) @@ -60,7 +58,7 @@ impl PersistedOperationPool { .collect(); Self { - aggregate_attestations, + attestations, attester_slashings, proposer_slashings, voluntary_exits, @@ -69,7 +67,7 @@ impl PersistedOperationPool { /// Reconstruct an `OperationPool`. pub fn into_operation_pool(self, state: &BeaconState, spec: &ChainSpec) -> OperationPool { - let aggregate_attestations = RwLock::new(self.aggregate_attestations.into_iter().collect()); + let attestations = RwLock::new(self.attestations.into_iter().collect()); let attester_slashings = RwLock::new( self.attester_slashings .into_iter() @@ -95,8 +93,7 @@ impl PersistedOperationPool { ); OperationPool { - aggregate_attestations, - committee_attestations: Default::default(), + attestations, attester_slashings, proposer_slashings, voluntary_exits, diff --git a/eth2/types/src/aggregate_and_proof.rs b/eth2/types/src/aggregate_and_proof.rs index d814349e8..e67bc5ffc 100644 --- a/eth2/types/src/aggregate_and_proof.rs +++ b/eth2/types/src/aggregate_and_proof.rs @@ -1,4 +1,6 @@ -use super::{Attestation, Domain, EthSpec, Fork, PublicKey, SecretKey, Signature, SignedRoot}; +use super::{ + Attestation, ChainSpec, Domain, EthSpec, Fork, PublicKey, SecretKey, Signature, SignedRoot, +}; use crate::test_utils::TestRandom; use serde_derive::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; @@ -21,60 +23,45 @@ pub struct AggregateAndProof { } impl AggregateAndProof { - pub fn is_valid_selection_proof(&self, validator_pubkey: &PublicKey, fork: &Fork) -> bool { + /// Produces a new `AggregateAndProof` with a `selection_proof` generated by signing + /// `aggregate.data.slot` with `secret_key`. + pub fn from_aggregate( + aggregator_index: u64, + aggregate: Attestation, + secret_key: &SecretKey, + fork: &Fork, + spec: &ChainSpec, + ) -> Self { + let slot = aggregate.data.slot; + + let domain = spec.get_domain( + slot.epoch(T::slots_per_epoch()), + Domain::SelectionProof, + fork, + ); + + let message = slot.signing_root(domain); + + Self { + aggregator_index, + aggregate, + selection_proof: Signature::new(message.as_bytes(), secret_key), + } + } + + /// Returns `true` if `validator_pubkey` signed over `self.aggregate.data.slot`. + pub fn is_valid_selection_proof( + &self, + validator_pubkey: &PublicKey, + fork: &Fork, + spec: &ChainSpec, + ) -> bool { let target_epoch = self.aggregate.data.slot.epoch(T::slots_per_epoch()); - let domain = T::default_spec().get_domain(target_epoch, Domain::SelectionProof, fork); + let domain = spec.get_domain(target_epoch, Domain::SelectionProof, fork); let message = self.aggregate.data.slot.signing_root(domain); self.selection_proof .verify(message.as_bytes(), validator_pubkey) } - - /// Converts Self into a SignedAggregateAndProof. - pub fn into_signed(self, secret_key: &SecretKey, fork: &Fork) -> SignedAggregateAndProof { - let target_epoch = self.aggregate.data.slot.epoch(T::slots_per_epoch()); - let domain = T::default_spec().get_domain(target_epoch, Domain::AggregateAndProof, fork); - let sign_message = self.signing_root(domain); - let signature = Signature::new(sign_message.as_bytes(), &secret_key); - - SignedAggregateAndProof { - message: self, - signature, - } - } } impl SignedRoot for AggregateAndProof {} - -/// A Validators signed aggregate proof to publish on the `beacon_aggregate_and_proof` -/// gossipsub topic. -/// -/// Spec v0.10.1 -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Encode, Decode, TestRandom, TreeHash)] -#[serde(bound = "T: EthSpec")] -pub struct SignedAggregateAndProof { - /// The `AggregateAndProof` that was signed. - pub message: AggregateAndProof, - /// The aggregate attestation. - pub signature: Signature, -} - -impl SignedRoot for SignedAggregateAndProof {} - -impl SignedAggregateAndProof { - /// Verifies the signature of the `AggregateAndProof` - pub fn is_valid_signature(&self, validator_pubkey: &PublicKey, fork: &Fork) -> bool { - let target_epoch = self.message.aggregate.data.slot.epoch(T::slots_per_epoch()); - let domain = T::default_spec().get_domain(target_epoch, Domain::AggregateAndProof, fork); - let message = self.signing_root(domain); - self.signature.verify(message.as_bytes(), validator_pubkey) - } - - /// Verifies the signature of the `AggregateAndProof` as well the underlying selection_proof in - /// the contained `AggregateAndProof`. - pub fn is_valid(&self, validator_pubkey: &PublicKey, fork: &Fork) -> bool { - self.is_valid_signature(validator_pubkey, fork) - && self - .message - .is_valid_selection_proof(validator_pubkey, fork) - } -} diff --git a/eth2/types/src/lib.rs b/eth2/types/src/lib.rs index f77aac0d5..dbd71fefd 100644 --- a/eth2/types/src/lib.rs +++ b/eth2/types/src/lib.rs @@ -31,6 +31,7 @@ pub mod indexed_attestation; pub mod pending_attestation; pub mod proposer_slashing; pub mod relative_epoch; +pub mod signed_aggregate_and_proof; pub mod signed_beacon_block; pub mod signed_beacon_block_header; pub mod signed_voluntary_exit; @@ -46,7 +47,7 @@ mod tree_hash_impls; use ethereum_types::{H160, H256}; -pub use crate::aggregate_and_proof::{AggregateAndProof, SignedAggregateAndProof}; +pub use crate::aggregate_and_proof::AggregateAndProof; pub use crate::attestation::{Attestation, Error as AttestationError}; pub use crate::attestation_data::AttestationData; pub use crate::attestation_duty::AttestationDuty; @@ -70,6 +71,7 @@ pub use crate::indexed_attestation::IndexedAttestation; pub use crate::pending_attestation::PendingAttestation; pub use crate::proposer_slashing::ProposerSlashing; pub use crate::relative_epoch::{Error as RelativeEpochError, RelativeEpoch}; +pub use crate::signed_aggregate_and_proof::SignedAggregateAndProof; pub use crate::signed_beacon_block::SignedBeaconBlock; pub use crate::signed_beacon_block_header::SignedBeaconBlockHeader; pub use crate::signed_voluntary_exit::SignedVoluntaryExit; diff --git a/eth2/types/src/signed_aggregate_and_proof.rs b/eth2/types/src/signed_aggregate_and_proof.rs new file mode 100644 index 000000000..5cde11992 --- /dev/null +++ b/eth2/types/src/signed_aggregate_and_proof.rs @@ -0,0 +1,68 @@ +use super::{ + AggregateAndProof, Attestation, ChainSpec, Domain, EthSpec, Fork, PublicKey, SecretKey, + Signature, SignedRoot, +}; +use crate::test_utils::TestRandom; +use serde_derive::{Deserialize, Serialize}; +use ssz_derive::{Decode, Encode}; +use test_random_derive::TestRandom; +use tree_hash_derive::TreeHash; + +/// A Validators signed aggregate proof to publish on the `beacon_aggregate_and_proof` +/// gossipsub topic. +/// +/// Spec v0.10.1 +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Encode, Decode, TestRandom, TreeHash)] +#[serde(bound = "T: EthSpec")] +pub struct SignedAggregateAndProof { + /// The `AggregateAndProof` that was signed. + pub message: AggregateAndProof, + /// The aggregate attestation. + pub signature: Signature, +} + +impl SignedAggregateAndProof { + /// Produces a new `SignedAggregateAndProof` with a `selection_proof` generated by signing + /// `aggregate.data.slot` with `secret_key`. + pub fn from_aggregate( + aggregator_index: u64, + aggregate: Attestation, + secret_key: &SecretKey, + fork: &Fork, + spec: &ChainSpec, + ) -> Self { + let message = + AggregateAndProof::from_aggregate(aggregator_index, aggregate, secret_key, fork, spec); + + let target_epoch = message.aggregate.data.slot.epoch(T::slots_per_epoch()); + let domain = spec.get_domain(target_epoch, Domain::AggregateAndProof, fork); + let signing_message = message.signing_root(domain); + + SignedAggregateAndProof { + message, + signature: Signature::new(signing_message.as_bytes(), &secret_key), + } + } + + /// Verifies the signature of the `AggregateAndProof` + pub fn is_valid_signature( + &self, + validator_pubkey: &PublicKey, + fork: &Fork, + spec: &ChainSpec, + ) -> bool { + let target_epoch = self.message.aggregate.data.slot.epoch(T::slots_per_epoch()); + let domain = spec.get_domain(target_epoch, Domain::AggregateAndProof, fork); + let message = self.message.signing_root(domain); + self.signature.verify(message.as_bytes(), validator_pubkey) + } + + /// Verifies the signature of the `AggregateAndProof` as well the underlying selection_proof in + /// the contained `AggregateAndProof`. + pub fn is_valid(&self, validator_pubkey: &PublicKey, fork: &Fork, spec: &ChainSpec) -> bool { + self.is_valid_signature(validator_pubkey, fork, spec) + && self + .message + .is_valid_selection_proof(validator_pubkey, fork, spec) + } +} diff --git a/eth2/types/src/test_utils/mod.rs b/eth2/types/src/test_utils/mod.rs index 0e5a6d41d..593be11ab 100644 --- a/eth2/types/src/test_utils/mod.rs +++ b/eth2/types/src/test_utils/mod.rs @@ -12,4 +12,4 @@ pub use generate_deterministic_keypairs::load_keypairs_from_yaml; pub use keypairs_file::KeypairsFile; pub use rand::{RngCore, SeedableRng}; pub use rand_xorshift::XorShiftRng; -pub use test_random::TestRandom; +pub use test_random::{test_random_instance, TestRandom}; diff --git a/eth2/types/src/test_utils/test_random.rs b/eth2/types/src/test_utils/test_random.rs index fa1a41815..f92d26717 100644 --- a/eth2/types/src/test_utils/test_random.rs +++ b/eth2/types/src/test_utils/test_random.rs @@ -1,5 +1,7 @@ use crate::*; use rand::RngCore; +use rand::SeedableRng; +use rand_xorshift::XorShiftRng; use ssz_types::typenum::Unsigned; mod address; @@ -12,6 +14,11 @@ mod secret_key; mod signature; mod signature_bytes; +pub fn test_random_instance() -> T { + let mut rng = XorShiftRng::from_seed([0x42; 16]); + T::random_for_test(&mut rng) +} + pub trait TestRandom { fn random_for_test(rng: &mut impl RngCore) -> Self; } diff --git a/eth2/utils/remote_beacon_node/src/lib.rs b/eth2/utils/remote_beacon_node/src/lib.rs index 25baf9906..8de9d0976 100644 --- a/eth2/utils/remote_beacon_node/src/lib.rs +++ b/eth2/utils/remote_beacon_node/src/lib.rs @@ -14,9 +14,9 @@ use ssz::Encode; use std::marker::PhantomData; use std::time::Duration; use types::{ - Attestation, AttesterSlashing, BeaconBlock, BeaconState, CommitteeIndex, Epoch, EthSpec, Fork, - Hash256, ProposerSlashing, PublicKey, Signature, SignedAggregateAndProof, SignedBeaconBlock, - Slot, + Attestation, AttestationData, AttesterSlashing, BeaconBlock, BeaconState, CommitteeIndex, + Epoch, EthSpec, Fork, Hash256, ProposerSlashing, PublicKey, Signature, SignedAggregateAndProof, + SignedBeaconBlock, Slot, }; use url::Url; @@ -213,13 +213,12 @@ impl Validator { /// Produces an aggregate attestation. pub fn produce_aggregate_attestation( &self, - slot: Slot, - committee_index: CommitteeIndex, + attestation_data: &AttestationData, ) -> impl Future, Error = Error> { - let query_params = vec![ - ("slot".into(), format!("{}", slot)), - ("committee_index".into(), format!("{}", committee_index)), - ]; + let query_params = vec![( + "attestation_data".into(), + as_ssz_hex_string(attestation_data), + )]; let client = self.0.clone(); self.url("aggregate_attestation") @@ -337,7 +336,7 @@ impl Validator { url, vec![ ("slot".into(), format!("{}", slot.as_u64())), - ("randao_reveal".into(), signature_as_string(&randao_reveal)), + ("randao_reveal".into(), as_ssz_hex_string(&randao_reveal)), ], ) }) @@ -693,8 +692,8 @@ fn root_as_string(root: Hash256) -> String { format!("0x{:?}", root) } -fn signature_as_string(signature: &Signature) -> String { - format!("0x{}", hex::encode(signature.as_ssz_bytes())) +fn as_ssz_hex_string(item: &T) -> String { + format!("0x{}", hex::encode(item.as_ssz_bytes())) } impl From for Error { diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index 0f63a2a03..c41fa6e1e 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -4,17 +4,17 @@ use crate::{ }; use environment::RuntimeContext; use exit_future::Signal; -use futures::{Future, Stream}; +use futures::{future, Future, Stream}; use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; -use rest_types::{ValidatorDuty, ValidatorSubscription}; -use slog::{crit, info, trace}; +use rest_types::ValidatorSubscription; +use slog::{crit, debug, info, trace}; use slot_clock::SlotClock; use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::timer::{Delay, Interval}; -use types::{AggregateAndProof, ChainSpec, CommitteeIndex, EthSpec, Slot}; +use types::{Attestation, ChainSpec, CommitteeIndex, EthSpec, Slot}; /// Builds an `AttestationService`. pub struct AttestationServiceBuilder { @@ -193,13 +193,10 @@ impl AttestationService { // If a validator needs to publish an aggregate attestation, they must do so at 2/3 // through the slot. This delay triggers at this time - let aggregator_delay_instant = { - if duration_to_next_slot <= slot_duration / 3 { - Instant::now() - } else { - Instant::now() + duration_to_next_slot - (slot_duration / 3) - } - }; + let aggregate_production_instant = Instant::now() + + duration_to_next_slot + .checked_sub(slot_duration / 3) + .unwrap_or_else(|| Duration::from_secs(0)); let epoch = slot.epoch(E::slots_per_epoch()); // Check if any attestation subscriptions are required. If there a new attestation duties for @@ -217,61 +214,39 @@ impl AttestationService { .executor .spawn(self.clone().send_subscriptions(duties_to_subscribe)); - // Builds a map of committee index and spawn individual tasks to process raw attestations - // and aggregated attestations - let mut committee_indices: HashMap> = HashMap::new(); - let mut aggregator_committee_indices: HashMap> = - HashMap::new(); - - service + let duties_by_committee_index: HashMap> = service .duties_service .attesters(slot) .into_iter() - .for_each(|duty_and_state| { + .fold(HashMap::new(), |mut map, duty_and_state| { if let Some(committee_index) = duty_and_state.duty.attestation_committee_index { - let validator_duties = committee_indices - .entry(committee_index) - .or_insert_with(|| vec![]); - validator_duties.push(duty_and_state.duty.clone()); + let validator_duties = map.entry(committee_index).or_insert_with(|| vec![]); - // If this duty entails the validator aggregating attestations, perform - // aggregation tasks - if duty_and_state.is_aggregator() { - let validator_duties = aggregator_committee_indices - .entry(committee_index) - .or_insert_with(|| vec![]); - validator_duties.push(duty_and_state); - } + validator_duties.push(duty_and_state); } + + map }); - // spawns tasks for all required raw attestations production - committee_indices + // For each committee index for this slot: + // + // - Create and publish an `Attestation` for all required validators. + // - Create and publish `SignedAggregateAndProof` for all aggregating validators. + duties_by_committee_index .into_iter() .for_each(|(committee_index, validator_duties)| { // Spawn a separate task for each attestation. - service.context.executor.spawn(self.clone().do_attestation( - slot, - committee_index, - validator_duties, - )); - }); - - // spawns tasks for all aggregate attestation production - aggregator_committee_indices - .into_iter() - .for_each(|(committee_index, validator_duties)| { - // Spawn a separate task for each aggregate attestation. service .context .executor - .spawn(self.clone().do_aggregate_attestation( + .spawn(self.clone().publish_attestations_and_aggregates( slot, committee_index, validator_duties, - Delay::new(aggregator_delay_instant.clone()), + aggregate_production_instant, )); }); + Ok(()) } @@ -280,7 +255,7 @@ impl AttestationService { /// This informs the beacon node that the validator has a duty on a particular /// slot allowing the beacon node to connect to the required subnet and determine /// if attestations need to be aggregated. - fn send_subscriptions(&self, duties: Vec) -> impl Future { + fn send_subscriptions(&self, duties: Vec) -> impl Future { let mut validator_subscriptions = Vec::new(); let mut successful_duties = Vec::new(); @@ -293,12 +268,13 @@ impl AttestationService { // builds a list of subscriptions for duty in duties { if let Some((slot, attestation_committee_index, _, validator_index)) = - attestation_duties(&duty) + duty.attestation_duties() { - if let Some(slot_signature) = - self.validator_store.sign_slot(&duty.validator_pubkey, slot) + if let Some(slot_signature) = self + .validator_store + .sign_slot(duty.validator_pubkey(), slot) { - let is_aggregator_proof = if duty.is_aggregator(&slot_signature) { + let is_aggregator_proof = if duty.is_aggregator() { Some(slot_signature.clone()) } else { None @@ -348,7 +324,7 @@ impl AttestationService { for (duty, is_aggregator_proof) in successful_duties { service_1 .duties_service - .subscribe_duty(&duty, is_aggregator_proof); + .subscribe_duty(&duty.duty, is_aggregator_proof); } Ok(()) }) @@ -361,218 +337,377 @@ impl AttestationService { }) } - /// For a given `committee_index`, download the attestation, have each validator in - /// `validator_duties` sign it and send the collection back to the beacon node. - fn do_attestation( - &self, - slot: Slot, - committee_index: CommitteeIndex, - validator_duties: Vec, - ) -> impl Future { - let service_1 = self.clone(); - let service_2 = self.clone(); - let log_1 = self.context.log.clone(); - let log_2 = self.context.log.clone(); - - self.beacon_node - .http - .validator() - .produce_attestation(slot, committee_index) - .map_err(|e| format!("Failed to produce attestation: {:?}", e)) - .map(move |attestation| { - validator_duties.iter().fold( - (Vec::new(), attestation), - |(mut attestation_list, attestation), duty| { - let log = service_1.context.log.clone(); - - if let Some(( - duty_slot, - duty_committee_index, - validator_committee_position, - _, - )) = attestation_duties(duty) - { - let mut raw_attestation = attestation.clone(); - if duty_slot == slot && duty_committee_index == committee_index { - if service_1 - .validator_store - .sign_attestation( - &duty.validator_pubkey, - validator_committee_position, - &mut raw_attestation, - ) - .is_none() - { - crit!(log, "Failed to sign attestation"); - } else { - attestation_list.push(raw_attestation); - } - } else { - crit!(log, "Inconsistent validator duties during signing"); - } - } else { - crit!(log, "Missing validator duties when signing"); - } - - (attestation_list, attestation) - }, - ) - }) - .and_then(move |(attestation_list, attestation)| { - service_2 - .beacon_node - .http - .validator() - .publish_attestations(attestation_list.clone()) - .map(|publish_status| (attestation_list, attestation, publish_status)) - .map_err(|e| format!("Failed to publish attestations: {:?}", e)) - }) - .map( - move |(attestation_list, attestation, publish_status)| match publish_status { - PublishStatus::Valid => info!( - log_1, - "Successfully published attestation"; - "signatures" => attestation_list.len(), - "head_block" => format!("{}", attestation.data.beacon_block_root), - "committee_index" => attestation.data.index, - "slot" => attestation.data.slot.as_u64(), - ), - PublishStatus::Invalid(msg) => crit!( - log_1, - "Published attestation was invalid"; - "message" => msg, - "committee_index" => attestation.data.index, - "slot" => attestation.data.slot.as_u64(), - ), - PublishStatus::Unknown => { - crit!(log_1, "Unknown condition when publishing attestation") - } - }, - ) - .map_err(move |e| { - crit!( - log_2, - "Error during attestation production"; - "error" => e - ) - }) - } - - /// For a given `committee_index`, download the aggregate attestation, have it signed by all validators - /// in `validator_duties` then upload it. - fn do_aggregate_attestation( + /// Performs the first step of the attesting process: downloading `Attestation` objects, + /// signing them and returning them to the validator. + /// + /// https://github.com/ethereum/eth2.0-specs/blob/v0.11.0/specs/phase0/validator.md#attesting + /// + /// ## Detail + /// + /// The given `validator_duties` should already be filtered to only contain those that match + /// `slot` and `committee_index`. Critical errors will be logged if this is not the case. + fn publish_attestations_and_aggregates( &self, slot: Slot, committee_index: CommitteeIndex, validator_duties: Vec, - aggregator_delay: Delay, - ) -> impl Future { + aggregate_production_instant: Instant, + ) -> Box + Send> { + // There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have + // any validators for the given `slot` and `committee_index`. + if validator_duties.is_empty() { + return Box::new(future::ok(())); + } + + let service_1 = self.clone(); + let log_1 = self.context.log.clone(); + let validator_duties_1 = Arc::new(validator_duties); + let validator_duties_2 = validator_duties_1.clone(); + + Box::new( + // Step 1. + // + // Download, sign and publish an `Attestation` for each validator. + self.produce_and_publish_attestations(slot, committee_index, validator_duties_1) + .and_then::<_, Box + Send>>( + move |attestation_opt| { + if let Some(attestation) = attestation_opt { + Box::new( + // Step 2. (Only if step 1 produced an attestation) + // + // First, wait until the `aggregation_production_instant` (2/3rds + // of the way though the slot). As verified in the + // `delay_triggers_when_in_the_past` test, this code will still run + // even if the instant has already elapsed. + // + // Then download, sign and publish a `SignedAggregateAndProof` for each + // validator that is elected to aggregate for this `slot` and + // `committee_index`. + Delay::new(aggregate_production_instant) + .map_err(|e| { + format!( + "Unable to create aggregate production delay: {:?}", + e + ) + }) + .and_then(move |()| { + service_1.produce_and_publish_aggregates( + attestation, + validator_duties_2, + ) + }), + ) + } else { + // If `produce_and_publish_attestations` did not download any + // attestations then there is no need to produce any + // `SignedAggregateAndProof`. + Box::new(future::ok(())) + } + }, + ) + .map_err(move |e| { + crit!( + log_1, + "Error during attestation routine"; + "error" => format!("{:?}", e), + "committee_index" => committee_index, + "slot" => slot.as_u64(), + ) + }), + ) + } + + /// Performs the first step of the attesting process: downloading `Attestation` objects, + /// signing them and returning them to the validator. + /// + /// https://github.com/ethereum/eth2.0-specs/blob/v0.11.0/specs/phase0/validator.md#attesting + /// + /// ## Detail + /// + /// The given `validator_duties` should already be filtered to only contain those that match + /// `slot` and `committee_index`. Critical errors will be logged if this is not the case. + /// + /// Only one `Attestation` is downloaded from the BN. It is then cloned and signed by each + /// validator and the list of individually-signed `Attestation` objects is returned to the BN. + fn produce_and_publish_attestations( + &self, + slot: Slot, + committee_index: CommitteeIndex, + validator_duties: Arc>, + ) -> Box>, Error = String> + Send> { + if validator_duties.is_empty() { + return Box::new(future::ok(None)); + } + + let service = self.clone(); + + Box::new( + self.beacon_node + .http + .validator() + .produce_attestation(slot, committee_index) + .map_err(|e| format!("Failed to produce attestation: {:?}", e)) + .and_then::<_, Box + Send>>(move |attestation| { + let log = service.context.log.clone(); + + // For each validator in `validator_duties`, clone the `attestation` and add + // their signature. + // + // If any validator is unable to sign, they are simply skipped. + let signed_attestations = validator_duties + .iter() + .filter_map(|duty| { + let log = service.context.log.clone(); + + // Ensure that all required fields are present in the validator duty. + let (duty_slot, duty_committee_index, validator_committee_position, _) = + if let Some(tuple) = duty.attestation_duties() { + tuple + } else { + crit!( + log, + "Missing validator duties when signing"; + "duties" => format!("{:?}", duty) + ); + return None; + }; + + // Ensure that the attestation matches the duties. + if duty_slot != attestation.data.slot + || duty_committee_index != attestation.data.index + { + crit!( + log, + "Inconsistent validator duties during signing"; + "validator" => format!("{:?}", duty.validator_pubkey()), + "duty_slot" => duty_slot, + "attestation_slot" => attestation.data.slot, + "duty_index" => duty_committee_index, + "attestation_index" => attestation.data.index, + ); + return None; + } + + let mut attestation = attestation.clone(); + + if service + .validator_store + .sign_attestation( + duty.validator_pubkey(), + validator_committee_position, + &mut attestation, + ) + .is_none() + { + crit!( + log, + "Attestation signing refused"; + "validator" => format!("{:?}", duty.validator_pubkey()), + "slot" => attestation.data.slot, + "index" => attestation.data.index, + ); + None + } else { + Some(attestation) + } + }) + .collect::>(); + + // If there are any signed attestations, publish them to the BN. Otherwise, + // just return early. + if let Some(attestation) = signed_attestations.first().cloned() { + let num_attestations = signed_attestations.len(); + let beacon_block_root = attestation.data.beacon_block_root; + + Box::new( + service + .beacon_node + .http + .validator() + .publish_attestations(signed_attestations) + .map_err(|e| format!("Failed to publish attestation: {:?}", e)) + .map(move |publish_status| match publish_status { + PublishStatus::Valid => info!( + log, + "Successfully published attestations"; + "count" => num_attestations, + "head_block" => format!("{:?}", beacon_block_root), + "committee_index" => committee_index, + "slot" => slot.as_u64(), + ), + PublishStatus::Invalid(msg) => crit!( + log, + "Published attestation was invalid"; + "message" => msg, + "committee_index" => committee_index, + "slot" => slot.as_u64(), + ), + PublishStatus::Unknown => { + crit!(log, "Unknown condition when publishing attestation") + } + }) + .map(|()| Some(attestation)), + ) + } else { + debug!( + log, + "No attestations to publish"; + "committee_index" => committee_index, + "slot" => slot.as_u64(), + ); + Box::new(future::ok(None)) + } + }), + ) + } + + /// Performs the second step of the attesting process: downloading an aggregated `Attestation`, + /// converting it into a `SignedAggregateAndProof` and returning it to the BN. + /// + /// https://github.com/ethereum/eth2.0-specs/blob/v0.11.0/specs/phase0/validator.md#broadcast-aggregate + /// + /// ## Detail + /// + /// The given `validator_duties` should already be filtered to only contain those that match + /// `slot` and `committee_index`. Critical errors will be logged if this is not the case. + /// + /// Only one aggregated `Attestation` is downloaded from the BN. It is then cloned and signed + /// by each validator and the list of individually-signed `SignedAggregateAndProof` objects is + /// returned to the BN. + fn produce_and_publish_aggregates( + &self, + attestation: Attestation, + validator_duties: Arc>, + ) -> impl Future { let service_1 = self.clone(); - let service_2 = self.clone(); let log_1 = self.context.log.clone(); - let log_2 = self.context.log.clone(); self.beacon_node .http .validator() - .produce_aggregate_attestation(slot, committee_index) + .produce_aggregate_attestation(&attestation.data) .map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e)) - .map(move |attestation| { - validator_duties.iter().fold( - (Vec::new(), attestation), - |(mut aggregate_and_proof_list, attestation), duty_and_state| { - let log = service_1.context.log.clone(); - - match ( - duty_and_state.selection_proof(), - attestation_duties(&duty_and_state.duty), - ) { - ( - Some(selection_proof), - Some((duty_slot, duty_committee_index, _, aggregator_index)), - ) => { - let raw_attestation = attestation.clone(); - if duty_slot == slot && duty_committee_index == committee_index { - // build the `AggregateAndProof` struct for each validator - let aggregate_and_proof = AggregateAndProof { - aggregator_index, - aggregate: raw_attestation, - selection_proof, - }; - - if let Some(signed_aggregate_and_proof) = - service_1.validator_store.sign_aggregate_and_proof( - &duty_and_state.duty.validator_pubkey, - aggregate_and_proof, - ) - { - aggregate_and_proof_list.push(signed_aggregate_and_proof); - } else { - crit!(log, "Failed to sign attestation"); - } - } else { - crit!(log, "Inconsistent validator duties during signing"); - } + .and_then::<_, Box + Send>>( + move |aggregated_attestation| { + // For each validator, clone the `aggregated_attestation` and convert it into + // a `SignedAggregateAndProof` + let signed_aggregate_and_proofs = validator_duties + .iter() + .filter_map(|duty_and_state| { + // Do not produce a signed aggregator for validators that are not + // subscribed aggregators. + // + // Note: this function returns `false` if the validator is required to + // be an aggregator but has not yet subscribed. + if !duty_and_state.is_aggregator() { + return None; } - _ => crit!( - log, - "Missing validator duties or not aggregate duty when signing" - ), - } - (aggregate_and_proof_list, attestation) - }, - ) - }) - .and_then(move |(aggregate_and_proof_list, attestation)| { - aggregator_delay - .map(move |_| (aggregate_and_proof_list, attestation)) - .map_err(move |e| format!("Error during aggregator delay: {:?}", e)) - }) - .and_then(move |(aggregate_and_proof_list, attestation)| { - service_2 - .beacon_node - .http - .validator() - .publish_aggregate_and_proof(aggregate_and_proof_list) - .map(|publish_status| (attestation, publish_status)) - .map_err(|e| format!("Failed to publish aggregate and proofs: {:?}", e)) - }) - .map(move |(attestation, publish_status)| match publish_status { - PublishStatus::Valid => info!( - log_1, - "Successfully published aggregate attestations"; - "signatures" => attestation.aggregation_bits.num_set_bits(), - "head_block" => format!("{}", attestation.data.beacon_block_root), - "committee_index" => attestation.data.index, - "slot" => attestation.data.slot.as_u64(), - ), - PublishStatus::Invalid(msg) => crit!( - log_1, - "Published attestation was invalid"; - "message" => msg, - "committee_index" => attestation.data.index, - "slot" => attestation.data.slot.as_u64(), - ), - PublishStatus::Unknown => { - crit!(log_1, "Unknown condition when publishing attestation") - } - }) - .map_err(move |e| { - crit!( - log_2, - "Error during attestation production"; - "error" => e - ) - }) + let (duty_slot, duty_committee_index, _, validator_index) = + duty_and_state.attestation_duties().or_else(|| { + crit!(log_1, "Missing duties when signing aggregate"); + None + })?; + + let pubkey = &duty_and_state.duty.validator_pubkey; + let slot = attestation.data.slot; + let committee_index = attestation.data.index; + + if duty_slot != slot || duty_committee_index != committee_index { + crit!(log_1, "Inconsistent validator duties during signing"); + return None; + } + + if let Some(signed_aggregate_and_proof) = service_1 + .validator_store + .produce_signed_aggregate_and_proof( + pubkey, + validator_index, + aggregated_attestation.clone(), + ) + { + Some(signed_aggregate_and_proof) + } else { + crit!(log_1, "Failed to sign attestation"); + None + } + }) + .collect::>(); + + // If there any signed aggregates and proofs were produced, publish them to the + // BN. + if let Some(first) = signed_aggregate_and_proofs.first().cloned() { + let attestation = first.message.aggregate; + + Box::new(service_1 + .beacon_node + .http + .validator() + .publish_aggregate_and_proof(signed_aggregate_and_proofs) + .map(|publish_status| (attestation, publish_status)) + .map_err(|e| format!("Failed to publish aggregate and proofs: {:?}", e)) + .map(move |(attestation, publish_status)| match publish_status { + PublishStatus::Valid => info!( + log_1, + "Successfully published aggregate attestations"; + "signatures" => attestation.aggregation_bits.num_set_bits(), + "head_block" => format!("{}", attestation.data.beacon_block_root), + "committee_index" => attestation.data.index, + "slot" => attestation.data.slot.as_u64(), + ), + PublishStatus::Invalid(msg) => crit!( + log_1, + "Published attestation was invalid"; + "message" => msg, + "committee_index" => attestation.data.index, + "slot" => attestation.data.slot.as_u64(), + ), + PublishStatus::Unknown => { + crit!(log_1, "Unknown condition when publishing attestation") + } + })) + } else { + debug!( + log_1, + "No signed aggregates to publish"; + "committee_index" => attestation.data.index, + "slot" => attestation.data.slot.as_u64(), + ); + Box::new(future::ok(())) + } + }, + ) } } -fn attestation_duties(duty: &ValidatorDuty) -> Option<(Slot, CommitteeIndex, usize, u64)> { - Some(( - duty.attestation_slot?, - duty.attestation_committee_index?, - duty.attestation_committee_position?, - duty.validator_index?, - )) +#[cfg(test)] +mod tests { + use super::*; + use parking_lot::RwLock; + use tokio::runtime::Builder as RuntimeBuilder; + + /// This test is to ensure that a `tokio_timer::Delay` with an instant in the past will still + /// trigger. + #[test] + fn delay_triggers_when_in_the_past() { + let in_the_past = Instant::now() - Duration::from_secs(2); + let state_1 = Arc::new(RwLock::new(in_the_past)); + let state_2 = state_1.clone(); + + let future = Delay::new(in_the_past) + .map_err(|_| panic!("Failed to create duration")) + .map(move |()| *state_1.write() = Instant::now()); + + let mut runtime = RuntimeBuilder::new() + .core_threads(1) + .build() + .expect("failed to start runtime"); + + runtime.block_on(future).expect("failed to complete future"); + + assert!( + *state_2.read() > in_the_past, + "state should have been updated" + ); + } } diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 7360c173d..7dbbdebd5 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -14,7 +14,7 @@ use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::timer::Interval; -use types::{ChainSpec, Epoch, EthSpec, PublicKey, Slot}; +use types::{ChainSpec, CommitteeIndex, Epoch, EthSpec, PublicKey, Slot}; /// Delay this period of time after the slot starts. This allows the node to process the new slot. const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100); @@ -24,14 +24,6 @@ const PRUNE_DEPTH: u64 = 4; type BaseHashMap = HashMap>; -#[derive(Debug, Clone)] -pub struct DutyAndState { - /// The validator duty. - pub duty: ValidatorDuty, - /// The current state of the validator duty. - state: DutyState, -} - #[derive(Debug, Clone)] pub enum DutyState { /// This duty has not been subscribed to the beacon node. @@ -43,6 +35,14 @@ pub enum DutyState { SubscribedAggregator(Signature), } +#[derive(Debug, Clone)] +pub struct DutyAndState { + /// The validator duty. + pub duty: ValidatorDuty, + /// The current state of the validator duty. + state: DutyState, +} + impl DutyAndState { /// Returns true if the duty is an aggregation duty (the validator must aggregate all /// attestations. @@ -70,6 +70,21 @@ impl DutyAndState { DutyState::SubscribedAggregator(_) => true, } } + + /// Returns the information required for an attesting validator, if they are scheduled to + /// attest. + pub fn attestation_duties(&self) -> Option<(Slot, CommitteeIndex, usize, u64)> { + Some(( + self.duty.attestation_slot?, + self.duty.attestation_committee_index?, + self.duty.attestation_committee_position?, + self.duty.validator_index?, + )) + } + + pub fn validator_pubkey(&self) -> &PublicKey { + &self.duty.validator_pubkey + } } impl TryInto for ValidatorDutyBytes { @@ -166,7 +181,7 @@ impl DutiesStore { /// Gets a list of validator duties for an epoch that have not yet been subscribed /// to the beacon node. // Note: Potentially we should modify the data structure to store the unsubscribed epoch duties for validator clients with a large number of validators. This currently adds an O(N) search each slot. - fn unsubscribed_epoch_duties(&self, epoch: &Epoch) -> Vec { + fn unsubscribed_epoch_duties(&self, epoch: &Epoch) -> Vec { self.store .read() .iter() @@ -179,7 +194,7 @@ impl DutiesStore { } }) }) - .map(|duties| duties.duty.clone()) + .cloned() .collect() } @@ -403,7 +418,7 @@ impl DutiesService { } /// Returns all `ValidatorDuty` that have not been registered with the beacon node. - pub fn unsubscribed_epoch_duties(&self, epoch: &Epoch) -> Vec { + pub fn unsubscribed_epoch_duties(&self, epoch: &Epoch) -> Vec { self.store.unsubscribed_epoch_duties(epoch) } diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 9d529ab41..903698741 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -105,7 +105,7 @@ impl ProductionValidatorClient { .duration_since(UNIX_EPOCH) .into_future() .map_err(|e| format!("Unable to read system time: {:?}", e)) - .and_then(move |now| { + .and_then::<_, Box + Send>>(move |now| { let log = log_3.clone(); let genesis = Duration::from_secs(genesis_time); @@ -114,9 +114,7 @@ impl ProductionValidatorClient { // // If the validator client starts before genesis, it will get errors from // the slot clock. - let box_future: Box + Send> = if now - < genesis - { + if now < genesis { info!( log, "Starting node prior to genesis"; @@ -138,9 +136,7 @@ impl ProductionValidatorClient { ); Box::new(future::ok((beacon_node, remote_eth2_config, genesis_time))) - }; - - box_future + } }) }) .and_then(move |(beacon_node, remote_eth2_config, genesis_time)| { diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index ce9c44f8c..d41208ff1 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -12,8 +12,8 @@ use std::path::PathBuf; use std::sync::Arc; use tempdir::TempDir; use types::{ - AggregateAndProof, Attestation, BeaconBlock, ChainSpec, Domain, Epoch, EthSpec, Fork, - PublicKey, Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedRoot, Slot, + Attestation, BeaconBlock, ChainSpec, Domain, Epoch, EthSpec, Fork, PublicKey, Signature, + SignedAggregateAndProof, SignedBeaconBlock, SignedRoot, Slot, }; #[derive(Clone)] @@ -199,6 +199,28 @@ impl ValidatorStore { }) } + /// Signs an `AggregateAndProof` for a given validator. + /// + /// The resulting `SignedAggregateAndProof` is sent on the aggregation channel and cannot be + /// modified by actors other than the signing validator. + pub fn produce_signed_aggregate_and_proof( + &self, + validator_pubkey: &PublicKey, + validator_index: u64, + aggregate: Attestation, + ) -> Option> { + let validators = self.validators.read(); + let voting_keypair = validators.get(validator_pubkey)?.voting_keypair.as_ref()?; + + Some(SignedAggregateAndProof::from_aggregate( + validator_index, + aggregate, + &voting_keypair.sk, + &self.fork()?, + &self.spec, + )) + } + /// Signs a slot for a given validator. /// /// This is used to subscribe a validator to a beacon node and is used to determine if the @@ -217,19 +239,4 @@ impl ValidatorStore { Some(Signature::new(message.as_bytes(), &voting_keypair.sk)) } - - /// Signs an `AggregateAndProof` for a given validator. - /// - /// The resulting `SignedAggregateAndProof` is sent on the aggregation channel and cannot be - /// modified by actors other than the signing validator. - pub fn sign_aggregate_and_proof( - &self, - validator_pubkey: &PublicKey, - aggregate_and_proof: AggregateAndProof, - ) -> Option> { - let validators = self.validators.read(); - let voting_keypair = validators.get(validator_pubkey)?.voting_keypair.as_ref()?; - - Some(aggregate_and_proof.into_signed(&voting_keypair.sk, &self.fork()?)) - } }