diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 726898e99..b0c56e9db 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2,7 +2,7 @@ use crate::block_verification::{ check_block_relevancy, get_block_root, signature_verify_chain_segment, BlockError, FullyVerifiedBlock, GossipVerifiedBlock, IntoFullyVerifiedBlock, }; -use crate::errors::{AttestationDropReason, BeaconChainError as Error, BlockProductionError}; +use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; use crate::events::{EventHandler, EventKind}; use crate::fork_choice::{Error as ForkChoiceError, ForkChoice}; @@ -1203,90 +1203,6 @@ impl BeaconChain { } } - /// Check that the `aggregator_index` in an aggregate attestation is as it should be. - // TODO: Check for optimisation/relevance - fn check_attestation_aggregator( - &self, - signed_aggregate_and_proof: &SignedAggregateAndProof, - indexed_attestation: &IndexedAttestation, - state: &BeaconState, - ) -> Result<(), AttestationDropReason> { - let aggregate_and_proof = &signed_aggregate_and_proof.message; - let attestation = &aggregate_and_proof.aggregate; - - // Check that aggregator index is part of the committee attesting (quick). - if !indexed_attestation - .attesting_indices - .contains(&aggregate_and_proof.aggregator_index) - { - return Err(AttestationDropReason::AggregatorNotInAttestingIndices); - } - // Check that the aggregator is allowed to be aggregating (medium, one hash). - else if !state - .is_aggregator( - attestation.data.slot, - attestation.data.index, - &aggregate_and_proof.selection_proof, - &self.spec, - ) - .unwrap_or(false) - { - return Err(AttestationDropReason::AggregatorNotSelected); - } - // Check that the signature is valid and the aggregator's selection proof is valid (slow-ish). Two sig verifications - if let Ok(Some(pubkey)) = - self.validator_pubkey(aggregate_and_proof.aggregator_index as usize) - { - if !signed_aggregate_and_proof.is_valid(&pubkey, &state.fork, &self.spec) { - Err(AttestationDropReason::AggregatorSignatureInvalid) - } else { - Ok(()) - } - } else { - Err(AttestationDropReason::AggregatorNotInAttestingIndices) - } - } - - /// Check that an attestation's slot doesn't make it ineligible for gossip. - fn check_attestation_slot_for_gossip( - &self, - attestation: &Attestation, - ) -> Result<(), AttestationDropReason> { - // `now_low_slot` is the slot of the current time minus MAXIMUM_GOSSIP_CLOCK_DISPARITY - // `now_high_slot` is the slot of the current time plus MAXIMUM_GOSSIP_CLOCK_DISPARITY - let (now_low_slot, now_high_slot) = self - .slot_clock - .now_duration() - .and_then(|now| { - let maximum_clock_disparity = - Duration::from_millis(self.spec.maximum_gossip_clock_disparity_millis); - let now_low_duration = now.checked_sub(maximum_clock_disparity)?; - let now_high_duration = now.checked_add(maximum_clock_disparity)?; - Some(( - self.slot_clock.slot_of(now_low_duration)?, - self.slot_clock.slot_of(now_high_duration)?, - )) - }) - .ok_or_else(|| AttestationDropReason::SlotClockError)?; - - let min_slot = attestation.data.slot; - let max_slot = min_slot + self.spec.attestation_propagation_slot_range; - - if now_high_slot < min_slot { - Err(AttestationDropReason::TooNew { - attestation_slot: min_slot, - now: now_high_slot, - }) - } else if now_low_slot > max_slot { - Err(AttestationDropReason::TooOld { - attestation_slot: min_slot, - now: now_low_slot, - }) - } else { - Ok(()) - } - } - /// Accept some exit and queue it for inclusion in an appropriate block. pub fn process_voluntary_exit( &self, diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 0ef3b0550..4b7138694 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -209,7 +209,7 @@ pub fn signature_verify_chain_segment( /// A wrapper around a `SignedBeaconBlock` that indicates it has been approved for re-gossiping on /// the p2p network. pub struct GossipVerifiedBlock { - block: SignedBeaconBlock, + pub block: SignedBeaconBlock, block_root: Hash256, parent: BeaconSnapshot, } diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 2f18d41f8..ffe32f340 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -94,27 +94,3 @@ easy_from_to!(BlockProcessingError, BlockProductionError); easy_from_to!(BeaconStateError, BlockProductionError); easy_from_to!(SlotProcessingError, BlockProductionError); easy_from_to!(Eth1ChainError, BlockProductionError); - -/// A reason for not propagating an attestation (single or aggregate). -#[derive(Debug, PartialEq)] -pub enum AttestationDropReason { - SlotClockError, - TooNew { attestation_slot: Slot, now: Slot }, - TooOld { attestation_slot: Slot, now: Slot }, - NoValidationState(BeaconChainError), - BlockUnknown(Hash256), - BadIndexedAttestation(AttestationValidationError), - AggregatorNotInAttestingIndices, - AggregatorNotSelected, - AggregatorSignatureInvalid, - SignatureInvalid, -} - -/// A reason for not propagating a block. -#[derive(Debug, PartialEq)] -pub enum BlockDropReason { - SlotClockError, - TooNew { block_slot: Slot, now: Slot }, - // FIXME(sproul): add detail here - ValidationFailure, -} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 4c42a5dff..c9b84e879 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -25,7 +25,7 @@ pub use self::beacon_chain::{ }; pub use self::beacon_snapshot::BeaconSnapshot; pub use self::errors::{BeaconChainError, BlockProductionError}; -pub use block_verification::{BlockError, BlockProcessingOutcome}; +pub use block_verification::{BlockError, BlockProcessingOutcome, GossipVerifiedBlock}; pub use eth1_chain::{Eth1Chain, Eth1ChainBackend}; pub use events::EventHandler; pub use fork_choice::ForkChoice; diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index 9316484b4..838600638 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -10,10 +10,10 @@ use rand::seq::SliceRandom; use rest_types::ValidatorSubscription; use slog::{crit, debug, error, o, warn}; use slot_clock::SlotClock; -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; -use types::{Attestation, EthSpec, SignedAggregateAndProof, Slot, SubnetId}; +use types::{Attestation, EthSpec, Slot, SubnetId}; /// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the /// slot is less than this number, skip the peer discovery process. @@ -44,8 +44,15 @@ pub enum AttServiceMessage { EnrRemove(SubnetId), /// Discover peers for a particular subnet. DiscoverPeers(SubnetId), - /// Propagate an attestation if it's deemed valid. - Propagate(PeerId, MessageId), +} + +/// A particular subnet at a given slot. +#[derive(PartialEq, Eq, Hash, Clone)] +struct ExactSubnet { + /// The `SubnetId` associated with this subnet. + pub subnet_id: SubnetId, + /// The `Slot` associated with this subnet. + pub slot: Slot, } pub struct AttestationService { @@ -62,13 +69,16 @@ pub struct AttestationService { random_subnets: HashSetDelay, /// A collection of timeouts for when to start searching for peers for a particular shard. - discover_peers: HashSetDelay<(SubnetId, Slot)>, + discover_peers: HashSetDelay, /// A collection of timeouts for when to subscribe to a shard subnet. - subscriptions: HashSetDelay<(SubnetId, Slot)>, + subscriptions: HashSetDelay, /// A collection of timeouts for when to unsubscribe from a shard subnet. - unsubscriptions: HashSetDelay<(SubnetId, Slot)>, + unsubscriptions: HashSetDelay, + + /// A mapping indicating the number of known aggregate validators for a given `ExactSubnet`. + _aggregate_validators_on_subnet: HashMap, /// A collection of seen validators. These dictate how many random subnets we should be /// subscribed to. As these time out, we unsubscribe for the required random subnets and update @@ -114,6 +124,7 @@ impl AttestationService { discover_peers: HashSetDelay::new(default_timeout), subscriptions: HashSetDelay::new(default_timeout), unsubscriptions: HashSetDelay::new(default_timeout), + _aggregate_validators_on_subnet: HashMap::new(), known_validators: HashSetDelay::new(last_seen_val_timeout), log, } @@ -146,42 +157,45 @@ impl AttestationService { subscription.attestation_committee_index % self.beacon_chain.spec.attestation_subnet_count, ); + + let exact_subnet = ExactSubnet { + subnet_id, + slot: subscription.slot, + }; // determine if we should run a discovery lookup request and request it if required - if let Err(e) = self.discover_peers_request(subnet_id, subscription.slot) { + if let Err(e) = self.discover_peers_request(exact_subnet.clone()) { warn!(self.log, "Discovery lookup request error"; "error" => e); } + // determine if the validator is an aggregator. If so, we subscribe to the subnet and + // if successful add the validator to a mapping of known aggregators for that exact + // subnet. + // NOTE: There is a chance that a fork occurs between now and when the validator needs + // to aggregate attestations. If this happens, the signature will no longer be valid + // and it could be likely the validator no longer needs to aggregate. More + // sophisticated logic should be added using known future forks. + // TODO: Implement + // set the subscription timer to subscribe to the next subnet if required - if let Err(e) = self.subscribe_to_subnet(subnet_id, subscription.slot) { + if let Err(e) = self.subscribe_to_subnet(exact_subnet) { warn!(self.log, "Subscription to subnet error"; "error" => e); + return Err(()); } } Ok(()) } - /// Handles un-aggregated attestations from the network. - pub fn handle_unaggregated_attestation( + /// Checks if we have subscribed aggregate validators for the subnet. If not, checks the gossip + /// verification, re-propagates and returns false. + pub fn should_process_attestation( &mut self, - message_id: MessageId, - peer_id: PeerId, - subnet: SubnetId, - attestation: Attestation, - ) { - // TODO: Handle attestation processing - self.events - .push_back(AttServiceMessage::Propagate(peer_id, message_id)); - } - - /// Handles aggregate attestations from the network. - pub fn handle_aggregate_attestation( - &mut self, - message_id: MessageId, - peer_id: PeerId, - attestation: SignedAggregateAndProof, - ) { - // TODO: Handle attestation processing - self.events - .push_back(AttServiceMessage::Propagate(peer_id, message_id)); + _message_id: &MessageId, + _peer_id: &PeerId, + _subnet: &SubnetId, + _attestation: &Attestation, + ) -> bool { + // TODO: Correctly handle validation aggregator checks + true } /* Internal private functions */ @@ -191,11 +205,7 @@ impl AttestationService { /// /// If there is sufficient time and no other request exists, queues a peer discovery request /// for the required subnet. - fn discover_peers_request( - &mut self, - subnet_id: SubnetId, - subscription_slot: Slot, - ) -> Result<(), &'static str> { + fn discover_peers_request(&mut self, exact_subnet: ExactSubnet) -> Result<(), &'static str> { let current_slot = self .beacon_chain .slot_clock @@ -204,13 +214,9 @@ impl AttestationService { let slot_duration = self.beacon_chain.slot_clock.slot_duration(); // if there is enough time to perform a discovery lookup - if subscription_slot >= current_slot.saturating_add(MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD) { + if exact_subnet.slot >= current_slot.saturating_add(MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD) { // check if a discovery request already exists - if self - .discover_peers - .get(&(subnet_id, subscription_slot)) - .is_some() - { + if self.discover_peers.get(&exact_subnet).is_some() { // already a request queued, end return Ok(()); } @@ -219,7 +225,7 @@ impl AttestationService { if self .events .iter() - .find(|event| event == &&AttServiceMessage::DiscoverPeers(subnet_id)) + .find(|event| event == &&AttServiceMessage::DiscoverPeers(exact_subnet.subnet_id)) .is_some() { // already queued a discovery event @@ -227,12 +233,12 @@ impl AttestationService { } // if the slot is more than epoch away, add an event to start looking for peers - if subscription_slot + if exact_subnet.slot < current_slot.saturating_add(TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD) { // then instantly add a discovery request self.events - .push_back(AttServiceMessage::DiscoverPeers(subnet_id)); + .push_back(AttServiceMessage::DiscoverPeers(exact_subnet.subnet_id)); } else { // Queue the discovery event to be executed for // TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD @@ -245,7 +251,8 @@ impl AttestationService { .ok_or_else(|| "Unable to determine duration to next slot")?; // The -1 is done here to exclude the current slot duration, as we will use // `duration_to_next_slot`. - let slots_until_discover = subscription_slot + let slots_until_discover = exact_subnet + .slot .saturating_sub(current_slot) .saturating_sub(1u64) .saturating_sub(TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD); @@ -254,7 +261,7 @@ impl AttestationService { }; self.discover_peers - .insert_at((subnet_id, subscription_slot), duration_to_discover); + .insert_at(exact_subnet, duration_to_discover); } } else { // TODO: Send the time frame needed to have a peer connected, so that we can @@ -270,11 +277,7 @@ impl AttestationService { /// subnet is required for the given slot. /// /// If required, adds a subscription event and an associated unsubscription event. - fn subscribe_to_subnet( - &mut self, - subnet_id: SubnetId, - subscription_slot: Slot, - ) -> Result<(), &'static str> { + fn subscribe_to_subnet(&mut self, exact_subnet: ExactSubnet) -> Result<(), &'static str> { // initialise timing variables let current_slot = self .beacon_chain @@ -282,42 +285,51 @@ impl AttestationService { .now() .ok_or_else(|| "Could not get the current slot")?; - // Ignore a subscription to the current slot. - if current_slot >= subscription_slot { - return Err("Could not subscribe to current slot, insufficient time"); - } + // Calculate the duration to the subscription event and the duration to the end event. + // There are two main cases. Attempting to subscribe to the current slot and all others. + let (duration_to_subscribe, expected_end_subscription_duration) = { + let duration_to_next_slot = self + .beacon_chain + .slot_clock + .duration_to_next_slot() + .ok_or_else(|| "Unable to determine duration to next slot")?; - let slot_duration = self.beacon_chain.slot_clock.slot_duration(); - let advance_subscription_duration = slot_duration - .checked_div(ADVANCE_SUBSCRIBE_TIME) - .expect("ADVANCE_SUBSCRIPTION_TIME cannot be too large"); - let duration_to_next_slot = self - .beacon_chain - .slot_clock - .duration_to_next_slot() - .ok_or_else(|| "Unable to determine duration to next slot")?; + if current_slot >= exact_subnet.slot { + (Duration::from_secs(0), duration_to_next_slot) + } else { + let slot_duration = self.beacon_chain.slot_clock.slot_duration(); + let advance_subscription_duration = slot_duration + .checked_div(ADVANCE_SUBSCRIBE_TIME) + .expect("ADVANCE_SUBSCRIPTION_TIME cannot be too large"); - // calculate the time to subscribe to the subnet - let duration_to_subscribe = { - // The -1 is done here to exclude the current slot duration, as we will use - // `duration_to_next_slot`. - let slots_until_subscribe = subscription_slot - .saturating_sub(current_slot) - .saturating_sub(1u64); + // calculate the time to subscribe to the subnet + let duration_to_subscribe = { + // The -1 is done here to exclude the current slot duration, as we will use + // `duration_to_next_slot`. + let slots_until_subscribe = exact_subnet + .slot + .saturating_sub(current_slot) + .saturating_sub(1u64); - duration_to_next_slot - .checked_add(slot_duration) - .ok_or_else(|| "Overflow in adding slot_duration attestation time")? - .checked_mul(slots_until_subscribe.as_u64() as u32) - .ok_or_else(|| "Overflow in multiplying number of slots in attestation time")? - .checked_sub(advance_subscription_duration) - .unwrap_or_else(|| Duration::from_secs(0)) + duration_to_next_slot + .checked_add(slot_duration) + .ok_or_else(|| "Overflow in adding slot_duration attestation time")? + .checked_mul(slots_until_subscribe.as_u64() as u32) + .ok_or_else(|| { + "Overflow in multiplying number of slots in attestation time" + })? + .checked_sub(advance_subscription_duration) + .unwrap_or_else(|| Duration::from_secs(0)) + }; + // the duration until we no longer need this subscription. We assume a single slot is + // sufficient. + let expected_end_subscription_duration = duration_to_subscribe + + slot_duration + + std::cmp::min(advance_subscription_duration, duration_to_next_slot); + + (duration_to_subscribe, expected_end_subscription_duration) + } }; - // the duration until we no longer need this subscription. We assume a single slot is - // sufficient. - let expected_end_subscription_duration = duration_to_subscribe - + slot_duration - + std::cmp::min(advance_subscription_duration, duration_to_next_slot); // Checks on current subscriptions // Note: We may be connected to a long-lived random subnet. In this case we still add the @@ -326,24 +338,25 @@ impl AttestationService { // in-active. This case is checked on the subscription event (see `handle_subscriptions`). // Return if we already have a subscription for this subnet_id and slot - if self.subscriptions.contains(&(subnet_id, subscription_slot)) { + if self.subscriptions.contains(&exact_subnet) { return Ok(()); } // We are not currently subscribed and have no waiting subscription, create one self.subscriptions - .insert_at((subnet_id, subscription_slot), duration_to_subscribe); + .insert_at(exact_subnet.clone(), duration_to_subscribe); // if there is an unsubscription event for the slot prior, we remove it to prevent // unsubscriptions immediately after the subscription. We also want to minimize // subscription churn and maintain a consecutive subnet subscriptions. - self.unsubscriptions - .remove(&(subnet_id, subscription_slot.saturating_sub(1u64))); + let to_remove_subnet = ExactSubnet { + subnet_id: exact_subnet.subnet_id, + slot: exact_subnet.slot.saturating_sub(1u64), + }; + self.unsubscriptions.remove(&to_remove_subnet); // add an unsubscription event to remove ourselves from the subnet once completed - self.unsubscriptions.insert_at( - (subnet_id, subscription_slot), - expected_end_subscription_duration, - ); + self.unsubscriptions + .insert_at(exact_subnet, expected_end_subscription_duration); Ok(()) } @@ -394,9 +407,9 @@ impl AttestationService { for subnet_id in to_subscribe_subnets { // remove this subnet from any immediate subscription/un-subscription events self.subscriptions - .retain(|(map_subnet_id, _)| map_subnet_id != &subnet_id); + .retain(|exact_subnet| exact_subnet.subnet_id != subnet_id); self.unsubscriptions - .retain(|(map_subnet_id, _)| map_subnet_id != &subnet_id); + .retain(|exact_subnet| exact_subnet.subnet_id != subnet_id); // insert a new random subnet self.random_subnets.insert(subnet_id); @@ -423,10 +436,10 @@ impl AttestationService { /* A collection of functions that handle the various timeouts */ /// Request a discovery query to find peers for a particular subnet. - fn handle_discover_peers(&mut self, subnet_id: SubnetId, target_slot: Slot) { - debug!(self.log, "Searching for peers for subnet"; "subnet" => *subnet_id, "target_slot" => target_slot); + fn handle_discover_peers(&mut self, exact_subnet: ExactSubnet) { + debug!(self.log, "Searching for peers for subnet"; "subnet" => *exact_subnet.subnet_id, "target_slot" => exact_subnet.slot); self.events - .push_back(AttServiceMessage::DiscoverPeers(subnet_id)); + .push_back(AttServiceMessage::DiscoverPeers(exact_subnet.subnet_id)); } /// A queued subscription is ready. @@ -434,9 +447,9 @@ impl AttestationService { /// We add subscriptions events even if we are already subscribed to a random subnet (as these /// can be unsubscribed at any time by inactive validators). If we are /// still subscribed at the time the event fires, we don't re-subscribe. - fn handle_subscriptions(&mut self, subnet_id: SubnetId, target_slot: Slot) { + fn handle_subscriptions(&mut self, exact_subnet: ExactSubnet) { // Check if the subnet currently exists as a long-lasting random subnet - if let Some(expiry) = self.random_subnets.get(&subnet_id) { + if let Some(expiry) = self.random_subnets.get(&exact_subnet.subnet_id) { // we are subscribed via a random subnet, if this is to expire during the time we need // to be subscribed, just extend the expiry let slot_duration = self.beacon_chain.slot_clock.slot_duration(); @@ -449,13 +462,13 @@ impl AttestationService { if expiry < &(Instant::now() + expected_end_subscription_duration) { self.random_subnets - .update_timeout(&subnet_id, expected_end_subscription_duration); + .update_timeout(&exact_subnet.subnet_id, expected_end_subscription_duration); } } else { // we are also not un-subscribing from a subnet if the next slot requires us to be // subscribed. Therefore there could be the case that we are already still subscribed // to the required subnet. In which case we do not issue another subscription request. - let topic_kind = &GossipKind::CommitteeIndex(subnet_id); + let topic_kind = &GossipKind::CommitteeIndex(exact_subnet.subnet_id); if self .network_globals .gossipsub_subscriptions @@ -465,9 +478,9 @@ impl AttestationService { .is_none() { // we are not already subscribed - debug!(self.log, "Subscribing to subnet"; "subnet" => *subnet_id, "target_slot" => target_slot.as_u64()); + debug!(self.log, "Subscribing to subnet"; "subnet" => *exact_subnet.subnet_id, "target_slot" => exact_subnet.slot.as_u64()); self.events - .push_back(AttServiceMessage::Subscribe(subnet_id)); + .push_back(AttServiceMessage::Subscribe(exact_subnet.subnet_id)); } } } @@ -476,20 +489,20 @@ impl AttestationService { /// /// Unsubscription events are added, even if we are subscribed to long-lived random subnets. If /// a random subnet is present, we do not unsubscribe from it. - fn handle_unsubscriptions(&mut self, subnet_id: SubnetId, target_slot: Slot) { + fn handle_unsubscriptions(&mut self, exact_subnet: ExactSubnet) { // Check if the subnet currently exists as a long-lasting random subnet - if self.random_subnets.contains(&subnet_id) { + if self.random_subnets.contains(&exact_subnet.subnet_id) { return; } - debug!(self.log, "Unsubscribing from subnet"; "subnet" => *subnet_id, "processed_slot" => target_slot.as_u64()); + debug!(self.log, "Unsubscribing from subnet"; "subnet" => *exact_subnet.subnet_id, "processed_slot" => exact_subnet.slot.as_u64()); // various logic checks - if self.subscriptions.contains(&(subnet_id, target_slot)) { + if self.subscriptions.contains(&exact_subnet) { crit!(self.log, "Unsubscribing from a subnet in subscriptions"); } self.events - .push_back(AttServiceMessage::Unsubscribe(subnet_id)); + .push_back(AttServiceMessage::Unsubscribe(exact_subnet.subnet_id)); } /// A random subnet has expired. @@ -546,11 +559,11 @@ impl AttestationService { // will unsubscribe from the expired subnet. // If there is no subscription for this subnet,slot it is safe to add one, without // unsubscribing early from a required subnet - if self - .subscriptions - .get(&(**subnet_id, current_slot + 2)) - .is_none() - { + let subnet = ExactSubnet { + subnet_id: **subnet_id, + slot: current_slot + 2, + }; + if self.subscriptions.get(&subnet).is_none() { // set an unsubscribe event let duration_to_next_slot = self .beacon_chain @@ -563,7 +576,7 @@ impl AttestationService { // Set the unsubscription timeout let unsubscription_duration = duration_to_next_slot + slot_duration * 2; self.unsubscriptions - .insert_at((**subnet_id, current_slot + 2), unsubscription_duration); + .insert_at(subnet, unsubscription_duration); } // as the long lasting subnet subscription is being removed, remove the subnet_id from @@ -581,28 +594,28 @@ impl Stream for AttestationService { fn poll(&mut self) -> Poll, Self::Error> { // process any peer discovery events - while let Async::Ready(Some((subnet_id, target_slot))) = + while let Async::Ready(Some(exact_subnet)) = self.discover_peers.poll().map_err(|e| { error!(self.log, "Failed to check for peer discovery requests"; "error"=> format!("{}", e)); })? { - self.handle_discover_peers(subnet_id, target_slot); + self.handle_discover_peers(exact_subnet); } // process any subscription events - while let Async::Ready(Some((subnet_id, target_slot))) = self.subscriptions.poll().map_err(|e| { + while let Async::Ready(Some(exact_subnet)) = self.subscriptions.poll().map_err(|e| { error!(self.log, "Failed to check for subnet subscription times"; "error"=> format!("{}", e)); })? { - self.handle_subscriptions(subnet_id, target_slot); + self.handle_subscriptions(exact_subnet); } // process any un-subscription events - while let Async::Ready(Some((subnet_id, target_slot))) = self.unsubscriptions.poll().map_err(|e| { + while let Async::Ready(Some(exact_subnet)) = self.unsubscriptions.poll().map_err(|e| { error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> format!("{}", e)); })? { - self.handle_unsubscriptions(subnet_id, target_slot); + self.handle_unsubscriptions(exact_subnet); } // process any random subnet expiries diff --git a/beacon_node/network/src/attestation_service/process.rs b/beacon_node/network/src/attestation_service/process.rs deleted file mode 100644 index 4995b3a0b..000000000 --- a/beacon_node/network/src/attestation_service/process.rs +++ /dev/null @@ -1,55 +0,0 @@ - - /// Process a gossip message declaring a new attestation. - /// - /// Not currently implemented. - pub fn on_attestation_gossip(&mut self, _peer_id: PeerId, _msg: Attestation) { - // TODO: Handle subnet gossip - /* - match self.chain.process_attestation(msg.clone()) { - Ok(outcome) => match outcome { - AttestationProcessingOutcome::Processed => { - debug!( - self.log, - "Processed attestation"; - "source" => "gossip", - "peer" => format!("{:?}",peer_id), - "block_root" => format!("{}", msg.data.beacon_block_root), - "slot" => format!("{}", msg.data.slot), - ); - } - AttestationProcessingOutcome::UnknownHeadBlock { beacon_block_root } => { - // TODO: Maintain this attestation and re-process once sync completes - trace!( - self.log, - "Attestation for unknown block"; - "peer_id" => format!("{:?}", peer_id), - "block" => format!("{}", beacon_block_root) - ); - // we don't know the block, get the sync manager to handle the block lookup - self.send_to_sync(SyncMessage::UnknownBlockHash(peer_id, beacon_block_root)); - } - AttestationProcessingOutcome::FutureEpoch { .. } - | AttestationProcessingOutcome::PastEpoch { .. } - | AttestationProcessingOutcome::UnknownTargetRoot { .. } - | AttestationProcessingOutcome::FinalizedSlot { .. } => {} // ignore the attestation - AttestationProcessingOutcome::Invalid { .. } - | AttestationProcessingOutcome::EmptyAggregationBitfield { .. } - | AttestationProcessingOutcome::AttestsToFutureBlock { .. } - | AttestationProcessingOutcome::InvalidSignature - | AttestationProcessingOutcome::NoCommitteeForSlotAndIndex { .. } - | AttestationProcessingOutcome::BadTargetEpoch { .. } => { - // the peer has sent a bad attestation. Remove them. - self.network.disconnect(peer_id, GoodbyeReason::Fault); - } - }, - Err(_) => { - // error is logged during the processing therefore no error is logged here - trace!( - self.log, - "Erroneous gossip attestation ssz"; - "ssz" => format!("0x{}", hex::encode(msg.as_ssz_bytes())), - ); - } - }; - */ - } diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index f3c5f74ca..de2727436 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -8,7 +8,7 @@ pub mod processor; use crate::error; use crate::service::NetworkMessage; -use beacon_chain::{BeaconChain, BeaconChainTypes}; +use beacon_chain::{AttestationType, BeaconChain, BeaconChainTypes}; use eth2_libp2p::{ rpc::{RPCError, RPCErrorResponse, RPCRequest, RPCResponse, RequestId, ResponseTermination}, MessageId, PeerId, PubsubData, PubsubMessage, RPCEvent, @@ -16,7 +16,7 @@ use eth2_libp2p::{ use futures::future::Future; use futures::stream::Stream; use processor::Processor; -use slog::{crit, debug, o, trace, warn}; +use slog::{debug, o, trace, warn}; use std::sync::Arc; use tokio::sync::mpsc; use types::EthSpec; @@ -218,12 +218,43 @@ impl Router { gossip_message: PubsubMessage, ) { match gossip_message.data { - PubsubData::BeaconBlock(block) => { - if self.processor.should_forward_block(&block) { + // Attestations should never reach the router. + PubsubData::AggregateAndProofAttestation(aggregate_and_proof) => { + if self + .processor + .should_forward_aggregate_attestation(&aggregate_and_proof) + { self.propagate_message(id, peer_id.clone()); } - self.processor.on_block_gossip(peer_id, block); + self.processor.process_attestation_gossip( + peer_id, + aggregate_and_proof.message.aggregate, + AttestationType::Aggregated, + ); } + PubsubData::Attestation(subnet_attestation) => { + if self + .processor + .should_forward_attestation(&subnet_attestation.1) + { + self.propagate_message(id, peer_id.clone()); + } + self.processor.process_attestation_gossip( + peer_id, + subnet_attestation.1, + AttestationType::Unaggregated { should_store: true }, + ); + } + PubsubData::BeaconBlock(block) => match self.processor.should_forward_block(block) { + Ok(verified_block) => { + self.propagate_message(id, peer_id.clone()); + self.processor.on_block_gossip(peer_id, verified_block); + } + Err(e) => { + warn!(self.log, "Could not verify block for gossip"; + "error" => format!("{:?}", e)); + } + }, PubsubData::VoluntaryExit(_exit) => { // TODO: Apply more sophisticated validation self.propagate_message(id, peer_id.clone()); @@ -242,19 +273,6 @@ impl Router { // TODO: Handle attester slashings debug!(self.log, "Received an attester slashing"; "peer_id" => format!("{}", peer_id) ); } - // Attestations should never reach the router. - PubsubData::AggregateAndProofAttestation(_agg_attestation) => { - crit!( - self.log, - "Attestations should always be handled by the attestation service" - ); - } - PubsubData::Attestation(_boxed_subnet_attestation) => { - crit!( - self.log, - "Attestations should always be handled by the attestation service" - ); - } } } diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index a6c5edccd..4711cf4ce 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -1,6 +1,9 @@ use crate::service::NetworkMessage; use crate::sync::SyncMessage; -use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; +use beacon_chain::{ + AttestationProcessingOutcome, AttestationType, BeaconChain, BeaconChainTypes, BlockError, + BlockProcessingOutcome, GossipVerifiedBlock, +}; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; @@ -9,7 +12,9 @@ use ssz::Encode; use std::sync::Arc; use store::Store; use tokio::sync::{mpsc, oneshot}; -use types::{Attestation, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{ + Attestation, Epoch, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, Slot, +}; //TODO: Rate limit requests @@ -468,18 +473,11 @@ impl Processor { /// Template function to be called on a block to determine if the block should be propagated /// across the network. - pub fn should_forward_block(&mut self, _block: &Box>) -> bool { - // TODO: Propagate error once complete - // self.chain.should_forward_block(block).is_ok() - true - } - - /// Template function to be called on an attestation to determine if the attestation should be propagated - /// across the network. - pub fn _should_forward_attestation(&mut self, _attestation: &Attestation) -> bool { - // TODO: Propagate error once complete - //self.chain.should_forward_attestation(attestation).is_ok() - true + pub fn should_forward_block( + &mut self, + block: Box>, + ) -> Result, BlockError> { + self.chain.verify_block_for_gossip(*block) } /// Process a gossip message declaring a new block. @@ -490,9 +488,10 @@ impl Processor { pub fn on_block_gossip( &mut self, peer_id: PeerId, - block: Box>, + verified_block: GossipVerifiedBlock, ) -> bool { - match BlockProcessingOutcome::shim(self.chain.process_block(*block.clone())) { + let block = Box::new(verified_block.block.clone()); + match BlockProcessingOutcome::shim(self.chain.process_block(verified_block)) { Ok(outcome) => match outcome { BlockProcessingOutcome::Processed { .. } => { trace!(self.log, "Gossipsub block processed"; @@ -551,6 +550,79 @@ impl Processor { // TODO: Update with correct block gossip checking true } + + /// Verifies the Aggregate attestation before propagating. + pub fn should_forward_aggregate_attestation( + &self, + _aggregate_and_proof: &Box>, + ) -> bool { + // TODO: Implement + true + } + + /// Verifies the attestation before propagating. + pub fn should_forward_attestation(&self, _aggregate: &Attestation) -> bool { + // TODO: Implement + true + } + + /// Process a new attestation received from gossipsub. + pub fn process_attestation_gossip( + &mut self, + peer_id: PeerId, + msg: Attestation, + attestation_type: AttestationType, + ) { + match self + .chain + .process_attestation(msg.clone(), attestation_type) + { + Ok(outcome) => match outcome { + AttestationProcessingOutcome::Processed => { + debug!( + self.log, + "Processed attestation"; + "source" => "gossip", + "peer" => format!("{:?}",peer_id), + "block_root" => format!("{}", msg.data.beacon_block_root), + "slot" => format!("{}", msg.data.slot), + ); + } + AttestationProcessingOutcome::UnknownHeadBlock { beacon_block_root } => { + // TODO: Maintain this attestation and re-process once sync completes + trace!( + self.log, + "Attestation for unknown block"; + "peer_id" => format!("{:?}", peer_id), + "block" => format!("{}", beacon_block_root) + ); + // we don't know the block, get the sync manager to handle the block lookup + self.send_to_sync(SyncMessage::UnknownBlockHash(peer_id, beacon_block_root)); + } + AttestationProcessingOutcome::FutureEpoch { .. } + | AttestationProcessingOutcome::PastEpoch { .. } + | AttestationProcessingOutcome::UnknownTargetRoot { .. } + | AttestationProcessingOutcome::FinalizedSlot { .. } => {} // ignore the attestation + AttestationProcessingOutcome::Invalid { .. } + | AttestationProcessingOutcome::EmptyAggregationBitfield { .. } + | AttestationProcessingOutcome::AttestsToFutureBlock { .. } + | AttestationProcessingOutcome::InvalidSignature + | AttestationProcessingOutcome::NoCommitteeForSlotAndIndex { .. } + | AttestationProcessingOutcome::BadTargetEpoch { .. } => { + // the peer has sent a bad attestation. Remove them. + self.network.disconnect(peer_id, GoodbyeReason::Fault); + } + }, + Err(_) => { + // error is logged during the processing therefore no error is logged here + trace!( + self.log, + "Erroneous gossip attestation ssz"; + "ssz" => format!("0x{}", hex::encode(msg.as_ssz_bytes())), + ); + } + }; + } } /// Build a `StatusMessage` representing the state of the given `beacon_chain`. diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index cdee58f7b..ab8c2d3a1 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -274,11 +274,6 @@ fn spawn_service( AttServiceMessage::DiscoverPeers(subnet_id) => { service.libp2p.swarm.peers_request(subnet_id); }, - AttServiceMessage::Propagate(source, message_id) => { - service.libp2p - .swarm - .propagate_message(&source, message_id); - } } } @@ -317,13 +312,16 @@ fn spawn_service( match message.data { // attestation information gets processed in the attestation service - PubsubData::AggregateAndProofAttestation(signed_aggregate_and_proof) => { - service.attestation_service.handle_aggregate_attestation(id, source, *signed_aggregate_and_proof); - }, - PubsubData::Attestation(subnet_and_attestation) => { - let subnet = subnet_and_attestation.0; - let attestation = subnet_and_attestation.1; - service.attestation_service.handle_unaggregated_attestation(id, source, subnet, attestation); + PubsubData::Attestation(ref subnet_and_attestation) => { + let subnet = &subnet_and_attestation.0; + let attestation = &subnet_and_attestation.1; + // checks if we have an aggregator for the slot. If so, we process + // the attestation + if service.attestation_service.should_process_attestation(&id, &source, subnet, attestation) { + service.router_send + .try_send(RouterMessage::PubsubMessage(id, source, message)) + .map_err(|_| { debug!(log, "Failed to send pubsub message to router");})?; + } } _ => { // all else is sent to the router