Process network attestations (#966)

* Start updating types

* WIP

* Signature hacking

* Existing EF tests passing with fake_crypto

* Updates

* Delete outdated API spec

* The refactor continues

* It compiles

* WIP test fixes

* All release tests passing bar genesis state parsing

* Update and test YamlConfig

* Update to spec v0.10 compatible BLS

* Updates to BLS EF tests

* Add EF test for AggregateVerify

And delete unused hash2curve tests for uncompressed points

* Update EF tests to v0.10.1

* Use optional block root correctly in block proc

* Use genesis fork in deposit domain. All tests pass

* Cargo fmt

* Fast aggregate verify test

* Update REST API docs

* Cargo fmt

* Fix unused import

* Bump spec tags to v0.10.1

* Add `seconds_per_eth1_block` to chainspec

* Update to timestamp based eth1 voting scheme

* Return None from `get_votes_to_consider` if block cache is empty

* Handle overflows in `is_candidate_block`
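
  The overflow concern comes from the spec's candidate-block window, which adds and multiplies raw u64 timestamps. A minimal sketch of an overflow-safe check, assuming spec v0.10 semantics (the `Eth1Block` struct, the constant values, and the function shape here are illustrative stand-ins, not the crate's actual items):

    struct Eth1Block {
        timestamp: u64,
    }

    const SECONDS_PER_ETH1_BLOCK: u64 = 14;
    const ETH1_FOLLOW_DISTANCE: u64 = 1024;

    /// Overflow-safe version of the spec's `is_candidate_block`: a block is a
    /// candidate if its timestamp lies in
    /// [period_start - 2 * follow_secs, period_start - follow_secs].
    fn is_candidate_block(block: &Eth1Block, period_start: u64) -> bool {
        // checked_add/checked_mul turn overflow into None, so a pathological
        // timestamp disqualifies the block instead of panicking.
        SECONDS_PER_ETH1_BLOCK
            .checked_mul(ETH1_FOLLOW_DISTANCE)
            .and_then(|follow_secs| {
                let lower = block.timestamp.checked_add(follow_secs)?;
                let upper = block.timestamp.checked_add(follow_secs.checked_mul(2)?)?;
                Some(lower <= period_start && upper >= period_start)
            })
            .unwrap_or(false)
    }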

* Revert to failing tests

* Fix eth1 data sets test

* Choose default vote according to spec
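
  A minimal sketch of the default-vote rule, assuming the v0.10 behaviour (fall back to the eth1_data of the most recent candidate block, else to the state's current eth1_data); `Eth1Data` is a placeholder type, not the crate's:

    #[derive(Clone)]
    struct Eth1Data;

    /// `votes_to_consider` is ordered oldest-to-newest, so `last()` is the
    /// eth1_data of the most recent candidate block.
    fn default_vote(votes_to_consider: &[Eth1Data], state_eth1_data: &Eth1Data) -> Eth1Data {
        votes_to_consider
            .last()
            .cloned()
            .unwrap_or_else(|| state_eth1_data.clone())
    }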

* Fix collect_valid_votes tests

* Fix `get_votes_to_consider` to choose all eligible blocks

* Uncomment winning_vote tests

* Add comments; remove unused code

* Reduce seconds_per_eth1_block for simulation

* Addressed review comments

* Add test for default vote case

* Fix logs

* Remove unused functions

* Meter default eth1 votes

* Fix comments

* Address review comments; remove unused dependency

* Add first attempt at attestation proc. re-write

* Add version 2 of attestation processing

* Minor fixes

* Add validator pubkey cache

* Make get_indexed_attestation take a committee

* Link signature processing into new attn verification

* First working version

* Ensure pubkey cache is updated

* Add more metrics, slight optimizations

* Clone committee cache during attestation processing

* Update shuffling cache during block processing

* Remove old commented-out code

* Fix shuffling cache insert bug

* Use indexed attestation in fork choice

* Restructure attn processing, add metrics

* Add more detailed metrics

* Tidy, fix failing tests

* Fix failing tests, tidy

* Disable/delete two outdated tests

* Add new Pubkeys struct to signature_sets

* Refactor with functional approach

* Update beacon chain

* Remove decompressed member from pubkey bytes

* Add hashmap for indices lookup
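
  A minimal sketch of the cache's shape under these commits, with placeholder key types (the real cache stores BLS keys and is persisted to the datadir):

    use std::collections::HashMap;

    // Placeholder key types for illustration only.
    type PublicKey = Vec<u8>;       // decompressed key, ready for verification
    type PublicKeyBytes = [u8; 48]; // compressed/serialized form

    struct ValidatorPubkeyCache {
        /// Decompressed keys, indexed by validator index. The validator
        /// registry is append-only, so a Vec stays in sync with the state.
        pubkeys: Vec<PublicKey>,
        /// Reverse lookup from serialized key to validator index.
        indices: HashMap<PublicKeyBytes, usize>,
    }

    impl ValidatorPubkeyCache {
        fn get(&self, validator_index: usize) -> Option<&PublicKey> {
            self.pubkeys.get(validator_index)
        }

        fn get_index(&self, pubkey: &PublicKeyBytes) -> Option<usize> {
            self.indices.get(pubkey).copied()
        }
    }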

* Add state cache, remove store cache

* Only build the head committee cache

* Change `get_attesting_indices` to use Vec
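
  A minimal sketch of the reworked lookup, modelling the bitfield as a bool slice (the real type is an SSZ BitList): taking the committee as input avoids recomputing shuffling, and returning a Vec avoids building a set for what is already a duplicate-free committee:

    /// Indices of committee members whose aggregation bit is set, in
    /// committee order.
    fn get_attesting_indices(
        committee: &[u64],
        aggregation_bits: &[bool],
    ) -> Result<Vec<u64>, &'static str> {
        if committee.len() != aggregation_bits.len() {
            return Err("bitfield length does not match committee size");
        }
        Ok(committee
            .iter()
            .zip(aggregation_bits)
            .filter(|(_, bit)| **bit)
            .map(|(validator_index, _)| *validator_index)
            .collect())
    }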

* Fix failing test

* Tidy

* Add pubkey cache persistence file

* Add more comments

* Integrate persistence file into builder

* Add pubkey cache tests

* Add data_dir to beacon chain builder

* Remove Option in pubkey cache persistence file

* Ensure consistency between datadir/data_dir

* Fix failing network test

* Tidy

* Fix todos

* Improve tests

* Fix compile error

* Fix compile error from merge

* Split up block processing metrics

* Tidy

* Refactor get_pubkey_from_state

* Remove commented-out code

* Rename state_cache -> checkpoint_cache

* Rename Checkpoint -> Snapshot

* Tidy, add comments

* Tidy up find_head function

* Change some checkpoint -> snapshot

* Add tests

* Expose max_len

* Remove dead code

* Tidy

* Fix bug

* Add sync-speed metric

* Add first attempt at VerifiableBlock

* Start integrating into beacon chain

* Integrate VerifiableBlock

* Rename VerifiableBlock -> PartialBlockVerification

* Add start of typed methods

* Add progress

* Add further progress

* Rename structs

* Add full block verification to block_processing.rs

* Further beacon chain integration

* Update checks for gossip

* Add todo

* Start adding segment verification

* Add passing chain segment test

* Initial integration with batch sync

* Minor changes

* Tidy, add more error checking

* Start adding chain_segment tests

* Finish invalid signature tests

* Include single and gossip verified blocks in tests

* Add gossip verification tests

* Start adding docs

* Finish adding comments to block_processing.rs

* Rename block_processing.rs -> block_verification
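
  The renamed module is built around staged verification types. A minimal sketch of the typestate idea, with placeholder structs standing in for the real chain types (`GossipVerifiedBlock`, `FullyVerifiedBlock`, `IntoFullyVerifiedBlock`, and `verify_block_for_gossip` appear in the diff below; the other names are illustrative):

    struct SignedBeaconBlock;
    struct GossipVerifiedBlock(SignedBeaconBlock);
    struct FullyVerifiedBlock(SignedBeaconBlock);

    struct BeaconChain;

    impl BeaconChain {
        /// Cheap checks only: enough to decide whether to re-gossip the block.
        fn verify_block_for_gossip(&self, block: SignedBeaconBlock) -> Result<GossipVerifiedBlock, ()> {
            Ok(GossipVerifiedBlock(block))
        }

        /// Full state-transition and signature verification happens here.
        fn into_fully_verified(&self, block: GossipVerifiedBlock) -> Result<FullyVerifiedBlock, ()> {
            Ok(FullyVerifiedBlock(block.0))
        }

        /// Import accepts only a FullyVerifiedBlock, so the compiler enforces
        /// that the verification stages run in order.
        fn import_block(&self, _block: FullyVerifiedBlock) -> Result<(), ()> {
            Ok(())
        }
    }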

* Start removing old block processing code

* Fix beacon_chain compilation

* Fix project-wide compile errors

* Remove old code

* Fix bug with beacon proposer index

* Fix shim for BlockProcessingError

* Only process one epoch at a time

* Fix loop in chain segment processing

* Add caching for state.eth1_data_votes

* Add BeaconChain::validator_pubkey

* Revert "Add caching for state.eth1_data_votes"

This reverts commit cd73dcd6434fb8d8e6bf30c5356355598ea7b78e.

* Allow for clock disparity

* Ensure errors are returned during batch processing

* Add block gossip verification

* Connect attestation processing to beacon chain

* Optimistically subscribe to subnets on the same slot

Co-authored-by: Michael Sproul <michael@sigmaprime.io>
Co-authored-by: Michael Sproul <micsproul@gmail.com>
Co-authored-by: pawan <pawandhananjay@gmail.com>
Co-authored-by: Paul Hauner <paul@paulhauner.com>
Age Manning 2020-03-27 14:10:56 +11:00
parent 0d45250f80
commit cf2cb26caa
9 changed files with 270 additions and 332 deletions

View File

@@ -2,7 +2,7 @@ use crate::block_verification::{
check_block_relevancy, get_block_root, signature_verify_chain_segment, BlockError,
FullyVerifiedBlock, GossipVerifiedBlock, IntoFullyVerifiedBlock,
};
use crate::errors::{AttestationDropReason, BeaconChainError as Error, BlockProductionError};
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
use crate::events::{EventHandler, EventKind};
use crate::fork_choice::{Error as ForkChoiceError, ForkChoice};
@@ -1203,90 +1203,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
/// Check that the `aggregator_index` in an aggregate attestation is as it should be.
// TODO: Check for optimisation/relevance
fn check_attestation_aggregator(
&self,
signed_aggregate_and_proof: &SignedAggregateAndProof<T::EthSpec>,
indexed_attestation: &IndexedAttestation<T::EthSpec>,
state: &BeaconState<T::EthSpec>,
) -> Result<(), AttestationDropReason> {
let aggregate_and_proof = &signed_aggregate_and_proof.message;
let attestation = &aggregate_and_proof.aggregate;
// Check that aggregator index is part of the committee attesting (quick).
if !indexed_attestation
.attesting_indices
.contains(&aggregate_and_proof.aggregator_index)
{
return Err(AttestationDropReason::AggregatorNotInAttestingIndices);
}
// Check that the aggregator is allowed to be aggregating (medium, one hash).
else if !state
.is_aggregator(
attestation.data.slot,
attestation.data.index,
&aggregate_and_proof.selection_proof,
&self.spec,
)
.unwrap_or(false)
{
return Err(AttestationDropReason::AggregatorNotSelected);
}
// Check that the signature is valid and the aggregator's selection proof is valid (slow-ish). Two sig verifications
if let Ok(Some(pubkey)) =
self.validator_pubkey(aggregate_and_proof.aggregator_index as usize)
{
if !signed_aggregate_and_proof.is_valid(&pubkey, &state.fork, &self.spec) {
Err(AttestationDropReason::AggregatorSignatureInvalid)
} else {
Ok(())
}
} else {
Err(AttestationDropReason::AggregatorNotInAttestingIndices)
}
}
/// Check that an attestation's slot doesn't make it ineligible for gossip.
fn check_attestation_slot_for_gossip(
&self,
attestation: &Attestation<T::EthSpec>,
) -> Result<(), AttestationDropReason> {
// `now_low_slot` is the slot of the current time minus MAXIMUM_GOSSIP_CLOCK_DISPARITY
// `now_high_slot` is the slot of the current time plus MAXIMUM_GOSSIP_CLOCK_DISPARITY
let (now_low_slot, now_high_slot) = self
.slot_clock
.now_duration()
.and_then(|now| {
let maximum_clock_disparity =
Duration::from_millis(self.spec.maximum_gossip_clock_disparity_millis);
let now_low_duration = now.checked_sub(maximum_clock_disparity)?;
let now_high_duration = now.checked_add(maximum_clock_disparity)?;
Some((
self.slot_clock.slot_of(now_low_duration)?,
self.slot_clock.slot_of(now_high_duration)?,
))
})
.ok_or_else(|| AttestationDropReason::SlotClockError)?;
let min_slot = attestation.data.slot;
let max_slot = min_slot + self.spec.attestation_propagation_slot_range;
if now_high_slot < min_slot {
Err(AttestationDropReason::TooNew {
attestation_slot: min_slot,
now: now_high_slot,
})
} else if now_low_slot > max_slot {
Err(AttestationDropReason::TooOld {
attestation_slot: min_slot,
now: now_low_slot,
})
} else {
Ok(())
}
}
/// Accept some exit and queue it for inclusion in an appropriate block.
pub fn process_voluntary_exit(
&self,

View File

@@ -209,7 +209,7 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
/// A wrapper around a `SignedBeaconBlock` that indicates it has been approved for re-gossiping on
/// the p2p network.
pub struct GossipVerifiedBlock<T: BeaconChainTypes> {
block: SignedBeaconBlock<T::EthSpec>,
pub block: SignedBeaconBlock<T::EthSpec>,
block_root: Hash256,
parent: BeaconSnapshot<T::EthSpec>,
}

View File

@@ -94,27 +94,3 @@ easy_from_to!(BlockProcessingError, BlockProductionError);
easy_from_to!(BeaconStateError, BlockProductionError);
easy_from_to!(SlotProcessingError, BlockProductionError);
easy_from_to!(Eth1ChainError, BlockProductionError);
/// A reason for not propagating an attestation (single or aggregate).
#[derive(Debug, PartialEq)]
pub enum AttestationDropReason {
SlotClockError,
TooNew { attestation_slot: Slot, now: Slot },
TooOld { attestation_slot: Slot, now: Slot },
NoValidationState(BeaconChainError),
BlockUnknown(Hash256),
BadIndexedAttestation(AttestationValidationError),
AggregatorNotInAttestingIndices,
AggregatorNotSelected,
AggregatorSignatureInvalid,
SignatureInvalid,
}
/// A reason for not propagating a block.
#[derive(Debug, PartialEq)]
pub enum BlockDropReason {
SlotClockError,
TooNew { block_slot: Slot, now: Slot },
// FIXME(sproul): add detail here
ValidationFailure,
}

View File

@@ -25,7 +25,7 @@ pub use self::beacon_chain::{
};
pub use self::beacon_snapshot::BeaconSnapshot;
pub use self::errors::{BeaconChainError, BlockProductionError};
pub use block_verification::{BlockError, BlockProcessingOutcome};
pub use block_verification::{BlockError, BlockProcessingOutcome, GossipVerifiedBlock};
pub use eth1_chain::{Eth1Chain, Eth1ChainBackend};
pub use events::EventHandler;
pub use fork_choice::ForkChoice;

View File

@@ -10,10 +10,10 @@ use rand::seq::SliceRandom;
use rest_types::ValidatorSubscription;
use slog::{crit, debug, error, o, warn};
use slot_clock::SlotClock;
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use types::{Attestation, EthSpec, SignedAggregateAndProof, Slot, SubnetId};
use types::{Attestation, EthSpec, Slot, SubnetId};
/// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the
/// slot is less than this number, skip the peer discovery process.
@@ -44,8 +44,15 @@ pub enum AttServiceMessage {
EnrRemove(SubnetId),
/// Discover peers for a particular subnet.
DiscoverPeers(SubnetId),
/// Propagate an attestation if it's deemed valid.
Propagate(PeerId, MessageId),
}
/// A particular subnet at a given slot.
#[derive(PartialEq, Eq, Hash, Clone)]
struct ExactSubnet {
/// The `SubnetId` associated with this subnet.
pub subnet_id: SubnetId,
/// The `Slot` associated with this subnet.
pub slot: Slot,
}
pub struct AttestationService<T: BeaconChainTypes> {
@@ -62,13 +69,16 @@ pub struct AttestationService<T: BeaconChainTypes> {
random_subnets: HashSetDelay<SubnetId>,
/// A collection of timeouts for when to start searching for peers for a particular shard.
discover_peers: HashSetDelay<(SubnetId, Slot)>,
discover_peers: HashSetDelay<ExactSubnet>,
/// A collection of timeouts for when to subscribe to a shard subnet.
subscriptions: HashSetDelay<(SubnetId, Slot)>,
subscriptions: HashSetDelay<ExactSubnet>,
/// A collection of timeouts for when to unsubscribe from a shard subnet.
unsubscriptions: HashSetDelay<(SubnetId, Slot)>,
unsubscriptions: HashSetDelay<ExactSubnet>,
/// A mapping indicating the number of known aggregate validators for a given `ExactSubnet`.
_aggregate_validators_on_subnet: HashMap<ExactSubnet, usize>,
/// A collection of seen validators. These dictate how many random subnets we should be
/// subscribed to. As these time out, we unsubscribe for the required random subnets and update
@@ -114,6 +124,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
discover_peers: HashSetDelay::new(default_timeout),
subscriptions: HashSetDelay::new(default_timeout),
unsubscriptions: HashSetDelay::new(default_timeout),
_aggregate_validators_on_subnet: HashMap::new(),
known_validators: HashSetDelay::new(last_seen_val_timeout),
log,
}
@@ -146,42 +157,45 @@ impl<T: BeaconChainTypes> AttestationService<T> {
subscription.attestation_committee_index
% self.beacon_chain.spec.attestation_subnet_count,
);
let exact_subnet = ExactSubnet {
subnet_id,
slot: subscription.slot,
};
// determine if we should run a discovery lookup request and request it if required
if let Err(e) = self.discover_peers_request(subnet_id, subscription.slot) {
if let Err(e) = self.discover_peers_request(exact_subnet.clone()) {
warn!(self.log, "Discovery lookup request error"; "error" => e);
}
// determine if the validator is an aggregator. If so, we subscribe to the subnet and
// if successful add the validator to a mapping of known aggregators for that exact
// subnet.
// NOTE: There is a chance that a fork occurs between now and when the validator needs
// to aggregate attestations. If this happens, the signature will no longer be valid
// and it could be likely the validator no longer needs to aggregate. More
// sophisticated logic should be added using known future forks.
// TODO: Implement
// set the subscription timer to subscribe to the next subnet if required
if let Err(e) = self.subscribe_to_subnet(subnet_id, subscription.slot) {
if let Err(e) = self.subscribe_to_subnet(exact_subnet) {
warn!(self.log, "Subscription to subnet error"; "error" => e);
return Err(());
}
}
Ok(())
}
/// Handles un-aggregated attestations from the network.
pub fn handle_unaggregated_attestation(
/// Checks if we have subscribed aggregate validators for the subnet. If not, checks the gossip
/// verification, re-propagates and returns false.
pub fn should_process_attestation(
&mut self,
message_id: MessageId,
peer_id: PeerId,
subnet: SubnetId,
attestation: Attestation<T::EthSpec>,
) {
// TODO: Handle attestation processing
self.events
.push_back(AttServiceMessage::Propagate(peer_id, message_id));
}
/// Handles aggregate attestations from the network.
pub fn handle_aggregate_attestation(
&mut self,
message_id: MessageId,
peer_id: PeerId,
attestation: SignedAggregateAndProof<T::EthSpec>,
) {
// TODO: Handle attestation processing
self.events
.push_back(AttServiceMessage::Propagate(peer_id, message_id));
_message_id: &MessageId,
_peer_id: &PeerId,
_subnet: &SubnetId,
_attestation: &Attestation<T::EthSpec>,
) -> bool {
// TODO: Correctly handle validation aggregator checks
true
}
/* Internal private functions */
@@ -191,11 +205,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
///
/// If there is sufficient time and no other request exists, queues a peer discovery request
/// for the required subnet.
fn discover_peers_request(
&mut self,
subnet_id: SubnetId,
subscription_slot: Slot,
) -> Result<(), &'static str> {
fn discover_peers_request(&mut self, exact_subnet: ExactSubnet) -> Result<(), &'static str> {
let current_slot = self
.beacon_chain
.slot_clock
@@ -204,13 +214,9 @@ impl<T: BeaconChainTypes> AttestationService<T> {
let slot_duration = self.beacon_chain.slot_clock.slot_duration();
// if there is enough time to perform a discovery lookup
if subscription_slot >= current_slot.saturating_add(MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD) {
if exact_subnet.slot >= current_slot.saturating_add(MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD) {
// check if a discovery request already exists
if self
.discover_peers
.get(&(subnet_id, subscription_slot))
.is_some()
{
if self.discover_peers.get(&exact_subnet).is_some() {
// already a request queued, end
return Ok(());
}
@@ -219,7 +225,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
if self
.events
.iter()
.find(|event| event == &&AttServiceMessage::DiscoverPeers(subnet_id))
.find(|event| event == &&AttServiceMessage::DiscoverPeers(exact_subnet.subnet_id))
.is_some()
{
// already queued a discovery event
@@ -227,12 +233,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
}
// if the slot is more than epoch away, add an event to start looking for peers
if subscription_slot
if exact_subnet.slot
< current_slot.saturating_add(TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD)
{
// then instantly add a discovery request
self.events
.push_back(AttServiceMessage::DiscoverPeers(subnet_id));
.push_back(AttServiceMessage::DiscoverPeers(exact_subnet.subnet_id));
} else {
// Queue the discovery event to be executed for
// TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD
@@ -245,7 +251,8 @@ impl<T: BeaconChainTypes> AttestationService<T> {
.ok_or_else(|| "Unable to determine duration to next slot")?;
// The -1 is done here to exclude the current slot duration, as we will use
// `duration_to_next_slot`.
let slots_until_discover = subscription_slot
let slots_until_discover = exact_subnet
.slot
.saturating_sub(current_slot)
.saturating_sub(1u64)
.saturating_sub(TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD);
@@ -254,7 +261,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
};
self.discover_peers
.insert_at((subnet_id, subscription_slot), duration_to_discover);
.insert_at(exact_subnet, duration_to_discover);
}
} else {
// TODO: Send the time frame needed to have a peer connected, so that we can
@@ -270,11 +277,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
/// subnet is required for the given slot.
///
/// If required, adds a subscription event and an associated unsubscription event.
fn subscribe_to_subnet(
&mut self,
subnet_id: SubnetId,
subscription_slot: Slot,
) -> Result<(), &'static str> {
fn subscribe_to_subnet(&mut self, exact_subnet: ExactSubnet) -> Result<(), &'static str> {
// initialise timing variables
let current_slot = self
.beacon_chain
@@ -282,26 +285,29 @@ impl<T: BeaconChainTypes> AttestationService<T> {
.now()
.ok_or_else(|| "Could not get the current slot")?;
// Ignore a subscription to the current slot.
if current_slot >= subscription_slot {
return Err("Could not subscribe to current slot, insufficient time");
}
let slot_duration = self.beacon_chain.slot_clock.slot_duration();
let advance_subscription_duration = slot_duration
.checked_div(ADVANCE_SUBSCRIBE_TIME)
.expect("ADVANCE_SUBSCRIPTION_TIME cannot be too large");
// Calculate the duration to the subscription event and the duration to the end event.
// There are two main cases. Attempting to subscribe to the current slot and all others.
let (duration_to_subscribe, expected_end_subscription_duration) = {
let duration_to_next_slot = self
.beacon_chain
.slot_clock
.duration_to_next_slot()
.ok_or_else(|| "Unable to determine duration to next slot")?;
if current_slot >= exact_subnet.slot {
(Duration::from_secs(0), duration_to_next_slot)
} else {
let slot_duration = self.beacon_chain.slot_clock.slot_duration();
let advance_subscription_duration = slot_duration
.checked_div(ADVANCE_SUBSCRIBE_TIME)
.expect("ADVANCE_SUBSCRIPTION_TIME cannot be too large");
// calculate the time to subscribe to the subnet
let duration_to_subscribe = {
// The -1 is done here to exclude the current slot duration, as we will use
// `duration_to_next_slot`.
let slots_until_subscribe = subscription_slot
let slots_until_subscribe = exact_subnet
.slot
.saturating_sub(current_slot)
.saturating_sub(1u64);
@@ -309,7 +315,9 @@ impl<T: BeaconChainTypes> AttestationService<T> {
.checked_add(slot_duration)
.ok_or_else(|| "Overflow in adding slot_duration attestation time")?
.checked_mul(slots_until_subscribe.as_u64() as u32)
.ok_or_else(|| "Overflow in multiplying number of slots in attestation time")?
.ok_or_else(|| {
"Overflow in multiplying number of slots in attestation time"
})?
.checked_sub(advance_subscription_duration)
.unwrap_or_else(|| Duration::from_secs(0))
};
@@ -319,6 +327,10 @@ impl<T: BeaconChainTypes> AttestationService<T> {
+ slot_duration
+ std::cmp::min(advance_subscription_duration, duration_to_next_slot);
(duration_to_subscribe, expected_end_subscription_duration)
}
};
// Checks on current subscriptions
// Note: We may be connected to a long-lived random subnet. In this case we still add the
// subscription timeout and check this case when the timeout fires. This is because a
@@ -326,24 +338,25 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// in-active. This case is checked on the subscription event (see `handle_subscriptions`).
// Return if we already have a subscription for this subnet_id and slot
if self.subscriptions.contains(&(subnet_id, subscription_slot)) {
if self.subscriptions.contains(&exact_subnet) {
return Ok(());
}
// We are not currently subscribed and have no waiting subscription, create one
self.subscriptions
.insert_at((subnet_id, subscription_slot), duration_to_subscribe);
.insert_at(exact_subnet.clone(), duration_to_subscribe);
// if there is an unsubscription event for the slot prior, we remove it to prevent
// unsubscriptions immediately after the subscription. We also want to minimize
// subscription churn and maintain a consecutive subnet subscriptions.
self.unsubscriptions
.remove(&(subnet_id, subscription_slot.saturating_sub(1u64)));
let to_remove_subnet = ExactSubnet {
subnet_id: exact_subnet.subnet_id,
slot: exact_subnet.slot.saturating_sub(1u64),
};
self.unsubscriptions.remove(&to_remove_subnet);
// add an unsubscription event to remove ourselves from the subnet once completed
self.unsubscriptions.insert_at(
(subnet_id, subscription_slot),
expected_end_subscription_duration,
);
self.unsubscriptions
.insert_at(exact_subnet, expected_end_subscription_duration);
Ok(())
}
@@ -394,9 +407,9 @@ impl<T: BeaconChainTypes> AttestationService<T> {
for subnet_id in to_subscribe_subnets {
// remove this subnet from any immediate subscription/un-subscription events
self.subscriptions
.retain(|(map_subnet_id, _)| map_subnet_id != &subnet_id);
.retain(|exact_subnet| exact_subnet.subnet_id != subnet_id);
self.unsubscriptions
.retain(|(map_subnet_id, _)| map_subnet_id != &subnet_id);
.retain(|exact_subnet| exact_subnet.subnet_id != subnet_id);
// insert a new random subnet
self.random_subnets.insert(subnet_id);
@@ -423,10 +436,10 @@ impl<T: BeaconChainTypes> AttestationService<T> {
/* A collection of functions that handle the various timeouts */
/// Request a discovery query to find peers for a particular subnet.
fn handle_discover_peers(&mut self, subnet_id: SubnetId, target_slot: Slot) {
debug!(self.log, "Searching for peers for subnet"; "subnet" => *subnet_id, "target_slot" => target_slot);
fn handle_discover_peers(&mut self, exact_subnet: ExactSubnet) {
debug!(self.log, "Searching for peers for subnet"; "subnet" => *exact_subnet.subnet_id, "target_slot" => exact_subnet.slot);
self.events
.push_back(AttServiceMessage::DiscoverPeers(subnet_id));
.push_back(AttServiceMessage::DiscoverPeers(exact_subnet.subnet_id));
}
/// A queued subscription is ready.
@@ -434,9 +447,9 @@ impl<T: BeaconChainTypes> AttestationService<T> {
/// We add subscriptions events even if we are already subscribed to a random subnet (as these
/// can be unsubscribed at any time by inactive validators). If we are
/// still subscribed at the time the event fires, we don't re-subscribe.
fn handle_subscriptions(&mut self, subnet_id: SubnetId, target_slot: Slot) {
fn handle_subscriptions(&mut self, exact_subnet: ExactSubnet) {
// Check if the subnet currently exists as a long-lasting random subnet
if let Some(expiry) = self.random_subnets.get(&subnet_id) {
if let Some(expiry) = self.random_subnets.get(&exact_subnet.subnet_id) {
// we are subscribed via a random subnet, if this is to expire during the time we need
// to be subscribed, just extend the expiry
let slot_duration = self.beacon_chain.slot_clock.slot_duration();
@@ -449,13 +462,13 @@ impl<T: BeaconChainTypes> AttestationService<T> {
if expiry < &(Instant::now() + expected_end_subscription_duration) {
self.random_subnets
.update_timeout(&subnet_id, expected_end_subscription_duration);
.update_timeout(&exact_subnet.subnet_id, expected_end_subscription_duration);
}
} else {
// we are also not un-subscribing from a subnet if the next slot requires us to be
// subscribed. Therefore there could be the case that we are already still subscribed
// to the required subnet. In which case we do not issue another subscription request.
let topic_kind = &GossipKind::CommitteeIndex(subnet_id);
let topic_kind = &GossipKind::CommitteeIndex(exact_subnet.subnet_id);
if self
.network_globals
.gossipsub_subscriptions
@@ -465,9 +478,9 @@ impl<T: BeaconChainTypes> AttestationService<T> {
.is_none()
{
// we are not already subscribed
debug!(self.log, "Subscribing to subnet"; "subnet" => *subnet_id, "target_slot" => target_slot.as_u64());
debug!(self.log, "Subscribing to subnet"; "subnet" => *exact_subnet.subnet_id, "target_slot" => exact_subnet.slot.as_u64());
self.events
.push_back(AttServiceMessage::Subscribe(subnet_id));
.push_back(AttServiceMessage::Subscribe(exact_subnet.subnet_id));
}
}
}
@@ -476,20 +489,20 @@ impl<T: BeaconChainTypes> AttestationService<T> {
///
/// Unsubscription events are added, even if we are subscribed to long-lived random subnets. If
/// a random subnet is present, we do not unsubscribe from it.
fn handle_unsubscriptions(&mut self, subnet_id: SubnetId, target_slot: Slot) {
fn handle_unsubscriptions(&mut self, exact_subnet: ExactSubnet) {
// Check if the subnet currently exists as a long-lasting random subnet
if self.random_subnets.contains(&subnet_id) {
if self.random_subnets.contains(&exact_subnet.subnet_id) {
return;
}
debug!(self.log, "Unsubscribing from subnet"; "subnet" => *subnet_id, "processed_slot" => target_slot.as_u64());
debug!(self.log, "Unsubscribing from subnet"; "subnet" => *exact_subnet.subnet_id, "processed_slot" => exact_subnet.slot.as_u64());
// various logic checks
if self.subscriptions.contains(&(subnet_id, target_slot)) {
if self.subscriptions.contains(&exact_subnet) {
crit!(self.log, "Unsubscribing from a subnet in subscriptions");
}
self.events
.push_back(AttServiceMessage::Unsubscribe(subnet_id));
.push_back(AttServiceMessage::Unsubscribe(exact_subnet.subnet_id));
}
/// A random subnet has expired.
@@ -546,11 +559,11 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// will unsubscribe from the expired subnet.
// If there is no subscription for this subnet,slot it is safe to add one, without
// unsubscribing early from a required subnet
if self
.subscriptions
.get(&(**subnet_id, current_slot + 2))
.is_none()
{
let subnet = ExactSubnet {
subnet_id: **subnet_id,
slot: current_slot + 2,
};
if self.subscriptions.get(&subnet).is_none() {
// set an unsubscribe event
let duration_to_next_slot = self
.beacon_chain
@@ -563,7 +576,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// Set the unsubscription timeout
let unsubscription_duration = duration_to_next_slot + slot_duration * 2;
self.unsubscriptions
.insert_at((**subnet_id, current_slot + 2), unsubscription_duration);
.insert_at(subnet, unsubscription_duration);
}
// as the long lasting subnet subscription is being removed, remove the subnet_id from
@@ -581,28 +594,28 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// process any peer discovery events
while let Async::Ready(Some((subnet_id, target_slot))) =
while let Async::Ready(Some(exact_subnet)) =
self.discover_peers.poll().map_err(|e| {
error!(self.log, "Failed to check for peer discovery requests"; "error"=> format!("{}", e));
})?
{
self.handle_discover_peers(subnet_id, target_slot);
self.handle_discover_peers(exact_subnet);
}
// process any subscription events
while let Async::Ready(Some((subnet_id, target_slot))) = self.subscriptions.poll().map_err(|e| {
while let Async::Ready(Some(exact_subnet)) = self.subscriptions.poll().map_err(|e| {
error!(self.log, "Failed to check for subnet subscription times"; "error"=> format!("{}", e));
})?
{
self.handle_subscriptions(subnet_id, target_slot);
self.handle_subscriptions(exact_subnet);
}
// process any un-subscription events
while let Async::Ready(Some((subnet_id, target_slot))) = self.unsubscriptions.poll().map_err(|e| {
while let Async::Ready(Some(exact_subnet)) = self.unsubscriptions.poll().map_err(|e| {
error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> format!("{}", e));
})?
{
self.handle_unsubscriptions(subnet_id, target_slot);
self.handle_unsubscriptions(exact_subnet);
}
// process any random subnet expiries

View File

@@ -1,55 +0,0 @@
/// Process a gossip message declaring a new attestation.
///
/// Not currently implemented.
pub fn on_attestation_gossip(&mut self, _peer_id: PeerId, _msg: Attestation<T::EthSpec>) {
// TODO: Handle subnet gossip
/*
match self.chain.process_attestation(msg.clone()) {
Ok(outcome) => match outcome {
AttestationProcessingOutcome::Processed => {
debug!(
self.log,
"Processed attestation";
"source" => "gossip",
"peer" => format!("{:?}",peer_id),
"block_root" => format!("{}", msg.data.beacon_block_root),
"slot" => format!("{}", msg.data.slot),
);
}
AttestationProcessingOutcome::UnknownHeadBlock { beacon_block_root } => {
// TODO: Maintain this attestation and re-process once sync completes
trace!(
self.log,
"Attestation for unknown block";
"peer_id" => format!("{:?}", peer_id),
"block" => format!("{}", beacon_block_root)
);
// we don't know the block, get the sync manager to handle the block lookup
self.send_to_sync(SyncMessage::UnknownBlockHash(peer_id, beacon_block_root));
}
AttestationProcessingOutcome::FutureEpoch { .. }
| AttestationProcessingOutcome::PastEpoch { .. }
| AttestationProcessingOutcome::UnknownTargetRoot { .. }
| AttestationProcessingOutcome::FinalizedSlot { .. } => {} // ignore the attestation
AttestationProcessingOutcome::Invalid { .. }
| AttestationProcessingOutcome::EmptyAggregationBitfield { .. }
| AttestationProcessingOutcome::AttestsToFutureBlock { .. }
| AttestationProcessingOutcome::InvalidSignature
| AttestationProcessingOutcome::NoCommitteeForSlotAndIndex { .. }
| AttestationProcessingOutcome::BadTargetEpoch { .. } => {
// the peer has sent a bad attestation. Remove them.
self.network.disconnect(peer_id, GoodbyeReason::Fault);
}
},
Err(_) => {
// error is logged during the processing therefore no error is logged here
trace!(
self.log,
"Erroneous gossip attestation ssz";
"ssz" => format!("0x{}", hex::encode(msg.as_ssz_bytes())),
);
}
};
*/
}

View File

@@ -8,7 +8,7 @@ pub mod processor;
use crate::error;
use crate::service::NetworkMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use beacon_chain::{AttestationType, BeaconChain, BeaconChainTypes};
use eth2_libp2p::{
rpc::{RPCError, RPCErrorResponse, RPCRequest, RPCResponse, RequestId, ResponseTermination},
MessageId, PeerId, PubsubData, PubsubMessage, RPCEvent,
@@ -16,7 +16,7 @@ use eth2_libp2p::{
use futures::future::Future;
use futures::stream::Stream;
use processor::Processor;
use slog::{crit, debug, o, trace, warn};
use slog::{debug, o, trace, warn};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::EthSpec;
@@ -218,12 +218,43 @@ impl<T: BeaconChainTypes> Router<T> {
gossip_message: PubsubMessage<T::EthSpec>,
) {
match gossip_message.data {
PubsubData::BeaconBlock(block) => {
if self.processor.should_forward_block(&block) {
// Attestations should never reach the router.
PubsubData::AggregateAndProofAttestation(aggregate_and_proof) => {
if self
.processor
.should_forward_aggregate_attestation(&aggregate_and_proof)
{
self.propagate_message(id, peer_id.clone());
}
self.processor.on_block_gossip(peer_id, block);
self.processor.process_attestation_gossip(
peer_id,
aggregate_and_proof.message.aggregate,
AttestationType::Aggregated,
);
}
PubsubData::Attestation(subnet_attestation) => {
if self
.processor
.should_forward_attestation(&subnet_attestation.1)
{
self.propagate_message(id, peer_id.clone());
}
self.processor.process_attestation_gossip(
peer_id,
subnet_attestation.1,
AttestationType::Unaggregated { should_store: true },
);
}
PubsubData::BeaconBlock(block) => match self.processor.should_forward_block(block) {
Ok(verified_block) => {
self.propagate_message(id, peer_id.clone());
self.processor.on_block_gossip(peer_id, verified_block);
}
Err(e) => {
warn!(self.log, "Could not verify block for gossip";
"error" => format!("{:?}", e));
}
},
PubsubData::VoluntaryExit(_exit) => {
// TODO: Apply more sophisticated validation
self.propagate_message(id, peer_id.clone());
@@ -242,19 +273,6 @@ impl<T: BeaconChainTypes> Router<T> {
// TODO: Handle attester slashings
debug!(self.log, "Received an attester slashing"; "peer_id" => format!("{}", peer_id) );
}
// Attestations should never reach the router.
PubsubData::AggregateAndProofAttestation(_agg_attestation) => {
crit!(
self.log,
"Attestations should always be handled by the attestation service"
);
}
PubsubData::Attestation(_boxed_subnet_attestation) => {
crit!(
self.log,
"Attestations should always be handled by the attestation service"
);
}
}
}

View File

@@ -1,6 +1,9 @@
use crate::service::NetworkMessage;
use crate::sync::SyncMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
use beacon_chain::{
AttestationProcessingOutcome, AttestationType, BeaconChain, BeaconChainTypes, BlockError,
BlockProcessingOutcome, GossipVerifiedBlock,
};
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId};
use eth2_libp2p::PeerId;
@@ -9,7 +12,9 @@ use ssz::Encode;
use std::sync::Arc;
use store::Store;
use tokio::sync::{mpsc, oneshot};
use types::{Attestation, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
use types::{
Attestation, Epoch, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, Slot,
};
//TODO: Rate limit requests
@@ -468,18 +473,11 @@ impl<T: BeaconChainTypes> Processor<T> {
/// Template function to be called on a block to determine if the block should be propagated
/// across the network.
pub fn should_forward_block(&mut self, _block: &Box<SignedBeaconBlock<T::EthSpec>>) -> bool {
// TODO: Propagate error once complete
// self.chain.should_forward_block(block).is_ok()
true
}
/// Template function to be called on an attestation to determine if the attestation should be propagated
/// across the network.
pub fn _should_forward_attestation(&mut self, _attestation: &Attestation<T::EthSpec>) -> bool {
// TODO: Propagate error once complete
//self.chain.should_forward_attestation(attestation).is_ok()
true
pub fn should_forward_block(
&mut self,
block: Box<SignedBeaconBlock<T::EthSpec>>,
) -> Result<GossipVerifiedBlock<T>, BlockError> {
self.chain.verify_block_for_gossip(*block)
}
/// Process a gossip message declaring a new block.
@@ -490,9 +488,10 @@ impl<T: BeaconChainTypes> Processor<T> {
pub fn on_block_gossip(
&mut self,
peer_id: PeerId,
block: Box<SignedBeaconBlock<T::EthSpec>>,
verified_block: GossipVerifiedBlock<T>,
) -> bool {
match BlockProcessingOutcome::shim(self.chain.process_block(*block.clone())) {
let block = Box::new(verified_block.block.clone());
match BlockProcessingOutcome::shim(self.chain.process_block(verified_block)) {
Ok(outcome) => match outcome {
BlockProcessingOutcome::Processed { .. } => {
trace!(self.log, "Gossipsub block processed";
@@ -551,6 +550,79 @@ impl<T: BeaconChainTypes> Processor<T> {
// TODO: Update with correct block gossip checking
true
}
/// Verifies the Aggregate attestation before propagating.
pub fn should_forward_aggregate_attestation(
&self,
_aggregate_and_proof: &Box<SignedAggregateAndProof<T::EthSpec>>,
) -> bool {
// TODO: Implement
true
}
/// Verifies the attestation before propagating.
pub fn should_forward_attestation(&self, _aggregate: &Attestation<T::EthSpec>) -> bool {
// TODO: Implement
true
}
/// Process a new attestation received from gossipsub.
pub fn process_attestation_gossip(
&mut self,
peer_id: PeerId,
msg: Attestation<T::EthSpec>,
attestation_type: AttestationType,
) {
match self
.chain
.process_attestation(msg.clone(), attestation_type)
{
Ok(outcome) => match outcome {
AttestationProcessingOutcome::Processed => {
debug!(
self.log,
"Processed attestation";
"source" => "gossip",
"peer" => format!("{:?}",peer_id),
"block_root" => format!("{}", msg.data.beacon_block_root),
"slot" => format!("{}", msg.data.slot),
);
}
AttestationProcessingOutcome::UnknownHeadBlock { beacon_block_root } => {
// TODO: Maintain this attestation and re-process once sync completes
trace!(
self.log,
"Attestation for unknown block";
"peer_id" => format!("{:?}", peer_id),
"block" => format!("{}", beacon_block_root)
);
// we don't know the block, get the sync manager to handle the block lookup
self.send_to_sync(SyncMessage::UnknownBlockHash(peer_id, beacon_block_root));
}
AttestationProcessingOutcome::FutureEpoch { .. }
| AttestationProcessingOutcome::PastEpoch { .. }
| AttestationProcessingOutcome::UnknownTargetRoot { .. }
| AttestationProcessingOutcome::FinalizedSlot { .. } => {} // ignore the attestation
AttestationProcessingOutcome::Invalid { .. }
| AttestationProcessingOutcome::EmptyAggregationBitfield { .. }
| AttestationProcessingOutcome::AttestsToFutureBlock { .. }
| AttestationProcessingOutcome::InvalidSignature
| AttestationProcessingOutcome::NoCommitteeForSlotAndIndex { .. }
| AttestationProcessingOutcome::BadTargetEpoch { .. } => {
// the peer has sent a bad attestation. Remove them.
self.network.disconnect(peer_id, GoodbyeReason::Fault);
}
},
Err(_) => {
// error is logged during the processing therefore no error is logged here
trace!(
self.log,
"Erroneous gossip attestation ssz";
"ssz" => format!("0x{}", hex::encode(msg.as_ssz_bytes())),
);
}
};
}
}
/// Build a `StatusMessage` representing the state of the given `beacon_chain`.

View File

@@ -274,11 +274,6 @@ fn spawn_service<T: BeaconChainTypes>(
AttServiceMessage::DiscoverPeers(subnet_id) => {
service.libp2p.swarm.peers_request(subnet_id);
},
AttServiceMessage::Propagate(source, message_id) => {
service.libp2p
.swarm
.propagate_message(&source, message_id);
}
}
}
@@ -317,13 +312,16 @@ fn spawn_service<T: BeaconChainTypes>(
match message.data {
// attestation information gets processed in the attestation service
PubsubData::AggregateAndProofAttestation(signed_aggregate_and_proof) => {
service.attestation_service.handle_aggregate_attestation(id, source, *signed_aggregate_and_proof);
},
PubsubData::Attestation(subnet_and_attestation) => {
let subnet = subnet_and_attestation.0;
let attestation = subnet_and_attestation.1;
service.attestation_service.handle_unaggregated_attestation(id, source, subnet, attestation);
PubsubData::Attestation(ref subnet_and_attestation) => {
let subnet = &subnet_and_attestation.0;
let attestation = &subnet_and_attestation.1;
// checks if we have an aggregator for the slot. If so, we process
// the attestation
if service.attestation_service.should_process_attestation(&id, &source, subnet, attestation) {
service.router_send
.try_send(RouterMessage::PubsubMessage(id, source, message))
.map_err(|_| { debug!(log, "Failed to send pubsub message to router");})?;
}
}
_ => {
// all else is sent to the router