Avoid penalizing peers for delays during processing (#2894)

## Issue Addressed

NA

## Proposed Changes

We have observed occasions were under-resourced nodes will receive messages that were valid *at the time*, but later become invalidated due to long waits for a `BeaconProcessor` worker.

In this PR, we will check to see if the message was valid *at the time of receipt*. If it was initially valid but invalid now, we just ignore the message without penalizing the peer.

## Additional Info

NA
This commit is contained in:
Paul Hauner 2022-01-12 02:36:24 +00:00
parent b656007963
commit 61f60bdf03
4 changed files with 95 additions and 36 deletions

View File

@ -452,7 +452,7 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
// MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance). // MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance).
// //
// We do not queue future attestations for later processing. // We do not queue future attestations for later processing.
verify_propagation_slot_range(chain, attestation)?; verify_propagation_slot_range(&chain.slot_clock, attestation)?;
// Check the attestation's epoch matches its target. // Check the attestation's epoch matches its target.
if attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()) if attestation.data.slot.epoch(T::EthSpec::slots_per_epoch())
@ -716,7 +716,7 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> {
// MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance). // MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance).
// //
// We do not queue future attestations for later processing. // We do not queue future attestations for later processing.
verify_propagation_slot_range(chain, attestation)?; verify_propagation_slot_range(&chain.slot_clock, attestation)?;
// Check to ensure that the attestation is "unaggregated". I.e., it has exactly one // Check to ensure that the attestation is "unaggregated". I.e., it has exactly one
// aggregation bit set. // aggregation bit set.
@ -1019,14 +1019,13 @@ fn verify_head_block_is_known<T: BeaconChainTypes>(
/// to the current slot of the `chain`. /// to the current slot of the `chain`.
/// ///
/// Accounts for `MAXIMUM_GOSSIP_CLOCK_DISPARITY`. /// Accounts for `MAXIMUM_GOSSIP_CLOCK_DISPARITY`.
pub fn verify_propagation_slot_range<T: BeaconChainTypes>( pub fn verify_propagation_slot_range<S: SlotClock, E: EthSpec>(
chain: &BeaconChain<T>, slot_clock: &S,
attestation: &Attestation<T::EthSpec>, attestation: &Attestation<E>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let attestation_slot = attestation.data.slot; let attestation_slot = attestation.data.slot;
let latest_permissible_slot = chain let latest_permissible_slot = slot_clock
.slot_clock
.now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) .now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY)
.ok_or(BeaconChainError::UnableToReadSlot)?; .ok_or(BeaconChainError::UnableToReadSlot)?;
if attestation_slot > latest_permissible_slot { if attestation_slot > latest_permissible_slot {
@ -1037,11 +1036,10 @@ pub fn verify_propagation_slot_range<T: BeaconChainTypes>(
} }
// Taking advantage of saturating subtraction on `Slot`. // Taking advantage of saturating subtraction on `Slot`.
let earliest_permissible_slot = chain let earliest_permissible_slot = slot_clock
.slot_clock
.now_with_past_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) .now_with_past_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY)
.ok_or(BeaconChainError::UnableToReadSlot)? .ok_or(BeaconChainError::UnableToReadSlot)?
- T::EthSpec::slots_per_epoch(); - E::slots_per_epoch();
if attestation_slot < earliest_permissible_slot { if attestation_slot < earliest_permissible_slot {
return Err(Error::PastSlot { return Err(Error::PastSlot {
attestation_slot, attestation_slot,

View File

@ -273,7 +273,7 @@ impl<T: BeaconChainTypes> VerifiedSyncContribution<T> {
let subcommittee_index = contribution.subcommittee_index as usize; let subcommittee_index = contribution.subcommittee_index as usize;
// Ensure sync committee contribution is within the MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance. // Ensure sync committee contribution is within the MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance.
verify_propagation_slot_range(chain, contribution)?; verify_propagation_slot_range(&chain.slot_clock, contribution)?;
// Validate subcommittee index. // Validate subcommittee index.
if contribution.subcommittee_index >= SYNC_COMMITTEE_SUBNET_COUNT { if contribution.subcommittee_index >= SYNC_COMMITTEE_SUBNET_COUNT {
@ -428,7 +428,7 @@ impl VerifiedSyncCommitteeMessage {
// MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance). // MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance).
// //
// We do not queue future sync committee messages for later processing. // We do not queue future sync committee messages for later processing.
verify_propagation_slot_range(chain, &sync_message)?; verify_propagation_slot_range(&chain.slot_clock, &sync_message)?;
// Ensure the `subnet_id` is valid for the given validator. // Ensure the `subnet_id` is valid for the given validator.
let pubkey = chain let pubkey = chain
@ -516,14 +516,13 @@ impl VerifiedSyncCommitteeMessage {
/// to the current slot of the `chain`. /// to the current slot of the `chain`.
/// ///
/// Accounts for `MAXIMUM_GOSSIP_CLOCK_DISPARITY`. /// Accounts for `MAXIMUM_GOSSIP_CLOCK_DISPARITY`.
pub fn verify_propagation_slot_range<T: BeaconChainTypes, U: SlotData>( pub fn verify_propagation_slot_range<S: SlotClock, U: SlotData>(
chain: &BeaconChain<T>, slot_clock: &S,
sync_contribution: &U, sync_contribution: &U,
) -> Result<(), Error> { ) -> Result<(), Error> {
let message_slot = sync_contribution.get_slot(); let message_slot = sync_contribution.get_slot();
let latest_permissible_slot = chain let latest_permissible_slot = slot_clock
.slot_clock
.now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) .now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY)
.ok_or(BeaconChainError::UnableToReadSlot)?; .ok_or(BeaconChainError::UnableToReadSlot)?;
if message_slot > latest_permissible_slot { if message_slot > latest_permissible_slot {
@ -533,8 +532,7 @@ pub fn verify_propagation_slot_range<T: BeaconChainTypes, U: SlotData>(
}); });
} }
let earliest_permissible_slot = chain let earliest_permissible_slot = slot_clock
.slot_clock
.now_with_past_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) .now_with_past_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY)
.ok_or(BeaconChainError::UnableToReadSlot)?; .ok_or(BeaconChainError::UnableToReadSlot)?;

View File

@ -2,9 +2,9 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::store::Error; use beacon_chain::store::Error;
use beacon_chain::{ use beacon_chain::{
attestation_verification::{Error as AttnError, VerifiedAttestation}, attestation_verification::{self, Error as AttnError, VerifiedAttestation},
observed_operations::ObservationOutcome, observed_operations::ObservationOutcome,
sync_committee_verification::Error as SyncCommitteeError, sync_committee_verification::{self, Error as SyncCommitteeError},
validator_monitor::get_block_delay_ms, validator_monitor::get_block_delay_ms,
BeaconChainError, BeaconChainTypes, BlockError, ExecutionPayloadError, ForkChoiceError, BeaconChainError, BeaconChainTypes, BlockError, ExecutionPayloadError, ForkChoiceError,
GossipVerifiedBlock, GossipVerifiedBlock,
@ -19,7 +19,7 @@ use tokio::sync::mpsc;
use types::{ use types::{
Attestation, AttesterSlashing, EthSpec, Hash256, IndexedAttestation, ProposerSlashing, Attestation, AttesterSlashing, EthSpec, Hash256, IndexedAttestation, ProposerSlashing,
SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit,
SubnetId, SyncCommitteeMessage, SyncSubnetId, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId,
}; };
use super::{ use super::{
@ -100,12 +100,7 @@ enum FailedAtt<T: EthSpec> {
impl<T: EthSpec> FailedAtt<T> { impl<T: EthSpec> FailedAtt<T> {
pub fn beacon_block_root(&self) -> &Hash256 { pub fn beacon_block_root(&self) -> &Hash256 {
match self { &self.attestation().data.beacon_block_root
FailedAtt::Unaggregate { attestation, .. } => &attestation.data.beacon_block_root,
FailedAtt::Aggregate { attestation, .. } => {
&attestation.message.aggregate.data.beacon_block_root
}
}
} }
pub fn kind(&self) -> &'static str { pub fn kind(&self) -> &'static str {
@ -114,6 +109,13 @@ impl<T: EthSpec> FailedAtt<T> {
FailedAtt::Aggregate { .. } => "aggregated", FailedAtt::Aggregate { .. } => "aggregated",
} }
} }
pub fn attestation(&self) -> &Attestation<T> {
match self {
FailedAtt::Unaggregate { attestation, .. } => attestation,
FailedAtt::Aggregate { attestation, .. } => &attestation.message.aggregate,
}
}
} }
/// Items required to verify a batch of unaggregated gossip attestations. /// Items required to verify a batch of unaggregated gossip attestations.
@ -410,6 +412,7 @@ impl<T: BeaconChainTypes> Worker<T> {
}, },
reprocess_tx, reprocess_tx,
error, error,
seen_timestamp,
); );
} }
} }
@ -608,6 +611,7 @@ impl<T: BeaconChainTypes> Worker<T> {
}, },
reprocess_tx, reprocess_tx,
error, error,
seen_timestamp,
); );
} }
} }
@ -1117,6 +1121,7 @@ impl<T: BeaconChainTypes> Worker<T> {
subnet_id: SyncSubnetId, subnet_id: SyncSubnetId,
seen_timestamp: Duration, seen_timestamp: Duration,
) { ) {
let message_slot = sync_signature.slot;
let sync_signature = match self let sync_signature = match self
.chain .chain
.verify_sync_committee_message_for_gossip(sync_signature, subnet_id) .verify_sync_committee_message_for_gossip(sync_signature, subnet_id)
@ -1128,6 +1133,8 @@ impl<T: BeaconChainTypes> Worker<T> {
message_id, message_id,
"sync_signature", "sync_signature",
e, e,
message_slot,
seen_timestamp,
); );
return; return;
} }
@ -1177,6 +1184,7 @@ impl<T: BeaconChainTypes> Worker<T> {
sync_contribution: SignedContributionAndProof<T::EthSpec>, sync_contribution: SignedContributionAndProof<T::EthSpec>,
seen_timestamp: Duration, seen_timestamp: Duration,
) { ) {
let contribution_slot = sync_contribution.message.contribution.slot;
let sync_contribution = match self let sync_contribution = match self
.chain .chain
.verify_sync_contribution_for_gossip(sync_contribution) .verify_sync_contribution_for_gossip(sync_contribution)
@ -1189,6 +1197,8 @@ impl<T: BeaconChainTypes> Worker<T> {
message_id, message_id,
"sync_contribution", "sync_contribution",
e, e,
contribution_slot,
seen_timestamp,
); );
return; return;
} }
@ -1232,6 +1242,7 @@ impl<T: BeaconChainTypes> Worker<T> {
failed_att: FailedAtt<T::EthSpec>, failed_att: FailedAtt<T::EthSpec>,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>, reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
error: AttnError, error: AttnError,
seen_timestamp: Duration,
) { ) {
let beacon_block_root = failed_att.beacon_block_root(); let beacon_block_root = failed_att.beacon_block_root();
let attestation_type = failed_att.kind(); let attestation_type = failed_att.kind();
@ -1239,8 +1250,7 @@ impl<T: BeaconChainTypes> Worker<T> {
match &error { match &error {
AttnError::FutureEpoch { .. } AttnError::FutureEpoch { .. }
| AttnError::PastEpoch { .. } | AttnError::PastEpoch { .. }
| AttnError::FutureSlot { .. } | AttnError::FutureSlot { .. } => {
| AttnError::PastSlot { .. } => {
/* /*
* These errors can be triggered by a mismatch between our slot and the peer. * These errors can be triggered by a mismatch between our slot and the peer.
* *
@ -1262,6 +1272,24 @@ impl<T: BeaconChainTypes> Worker<T> {
// Do not propagate these messages. // Do not propagate these messages.
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
} }
AttnError::PastSlot { .. } => {
// Produce a slot clock frozen at the time we received the message from the
// network.
let seen_clock = &self.chain.slot_clock.freeze_at(seen_timestamp);
let hindsight_verification =
attestation_verification::verify_propagation_slot_range(
seen_clock,
failed_att.attestation(),
);
// Only penalize the peer if it would have been invalid at the moment we received
// it.
if hindsight_verification.is_err() {
self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError);
}
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
}
AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => { AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => {
/* /*
* These errors are caused by invalid signatures. * These errors are caused by invalid signatures.
@ -1625,6 +1653,8 @@ impl<T: BeaconChainTypes> Worker<T> {
message_id: MessageId, message_id: MessageId,
message_type: &str, message_type: &str,
error: SyncCommitteeError, error: SyncCommitteeError,
sync_committee_message_slot: Slot,
seen_timestamp: Duration,
) { ) {
metrics::register_sync_committee_error(&error); metrics::register_sync_committee_error(&error);
@ -1650,10 +1680,7 @@ impl<T: BeaconChainTypes> Worker<T> {
// Do not propagate these messages. // Do not propagate these messages.
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
} }
SyncCommitteeError::PastSlot { SyncCommitteeError::PastSlot { .. } => {
message_slot,
earliest_permissible_slot,
} => {
/* /*
* This error can be triggered by a mismatch between our slot and the peer. * This error can be triggered by a mismatch between our slot and the peer.
* *
@ -1667,12 +1694,34 @@ impl<T: BeaconChainTypes> Worker<T> {
"type" => ?message_type, "type" => ?message_type,
); );
// We tolerate messages that were just one slot late. // Compute the slot when we received the message.
if *message_slot + 1 < *earliest_permissible_slot { let received_slot = self
.chain
.slot_clock
.slot_of(seen_timestamp)
.unwrap_or_else(|| self.chain.slot_clock.genesis_slot());
// The message is "excessively" late if it was more than one slot late.
let excessively_late = received_slot > sync_committee_message_slot + 1;
// This closure will lazily produce a slot clock frozen at the time we received the
// message from the network and return a bool indicating if the message was invalid
// at the time of receipt too.
let invalid_in_hindsight = || {
let seen_clock = &self.chain.slot_clock.freeze_at(seen_timestamp);
let hindsight_verification =
sync_committee_verification::verify_propagation_slot_range(
seen_clock,
&sync_committee_message_slot,
);
hindsight_verification.is_err()
};
// Penalize the peer if the message was more than one slot late
if excessively_late && invalid_in_hindsight() {
self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError); self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError);
} }
// Do not propagate these messages.
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
} }
SyncCommitteeError::EmptyAggregationBitfield => { SyncCommitteeError::EmptyAggregationBitfield => {

View File

@ -112,4 +112,18 @@ pub trait SlotClock: Send + Sync + Sized + Clone {
Duration::from_secs(duration_into_slot.as_secs() % seconds_per_slot) Duration::from_secs(duration_into_slot.as_secs() % seconds_per_slot)
}) })
} }
/// Produces a *new* slot clock with the same configuration of `self`, except that clock is
/// "frozen" at the `freeze_at` time.
///
/// This is useful for observing the slot clock at arbitrary fixed points in time.
fn freeze_at(&self, freeze_at: Duration) -> ManualSlotClock {
let slot_clock = ManualSlotClock::new(
self.genesis_slot(),
self.genesis_duration(),
self.slot_duration(),
);
slot_clock.set_current_time(freeze_at);
slot_clock
}
} }