Maintain attestations that reference unknown blocks (#2319)

## Issue Addressed

#635 

## Proposed Changes
- Keep attestations that reference a block we have not seen for 30secs before being re processed
- If we do import the block before that time elapses, it is reprocessed in that moment
- The first time it fails, do nothing wrt to gossipsub propagation or peer downscoring. If after being re processed it fails, downscore with a `LowToleranceError` and ignore the message.
This commit is contained in:
divma 2021-07-14 05:24:08 +00:00
parent 9656ffee7c
commit 304fb05e44
15 changed files with 1267 additions and 356 deletions

View File

@ -375,7 +375,7 @@ impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
pub fn verify(
signed_aggregate: SignedAggregateAndProof<T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<Self, Error> {
) -> Result<Self, (Error, SignedAggregateAndProof<T::EthSpec>)> {
Self::verify_slashable(signed_aggregate, chain)
.map(|verified_aggregate| {
if let Some(slasher) = chain.slasher.as_ref() {
@ -383,7 +383,9 @@ impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
}
verified_aggregate
})
.map_err(|slash_info| process_slash_info(slash_info, chain))
.map_err(|(slash_info, original_aggregate)| {
(process_slash_info(slash_info, chain), original_aggregate)
})
}
/// Run the checks that happen before an indexed attestation is constructed.
@ -509,17 +511,31 @@ impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
}
/// Verify the attestation, producing extra information about whether it might be slashable.
// NOTE: Clippy considers the return too complex. This tuple is not used elsewhere so it is not
// worth creating an alias.
#[allow(clippy::type_complexity)]
pub fn verify_slashable(
signed_aggregate: SignedAggregateAndProof<T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<Self, AttestationSlashInfo<T, Error>> {
) -> Result<
Self,
(
AttestationSlashInfo<T, Error>,
SignedAggregateAndProof<T::EthSpec>,
),
> {
use AttestationSlashInfo::*;
let attestation = &signed_aggregate.message.aggregate;
let aggregator_index = signed_aggregate.message.aggregator_index;
let attestation_root = match Self::verify_early_checks(&signed_aggregate, chain) {
Ok(root) => root,
Err(e) => return Err(SignatureNotChecked(signed_aggregate.message.aggregate, e)),
Err(e) => {
return Err((
SignatureNotChecked(signed_aggregate.message.aggregate.clone(), e),
signed_aggregate,
))
}
};
let indexed_attestation =
@ -546,7 +562,12 @@ impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
.map_err(|e| BeaconChainError::from(e).into())
}) {
Ok(indexed_attestation) => indexed_attestation,
Err(e) => return Err(SignatureNotChecked(signed_aggregate.message.aggregate, e)),
Err(e) => {
return Err((
SignatureNotChecked(signed_aggregate.message.aggregate.clone(), e),
signed_aggregate,
))
}
};
// Ensure that all signatures are valid.
@ -560,11 +581,11 @@ impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
}
})
{
return Err(SignatureInvalid(e));
return Err((SignatureInvalid(e), signed_aggregate));
}
if let Err(e) = Self::verify_late_checks(&signed_aggregate, attestation_root, chain) {
return Err(SignatureValid(indexed_attestation, e));
return Err((SignatureValid(indexed_attestation, e), signed_aggregate));
}
Ok(VerifiedAggregatedAttestation {
@ -715,7 +736,7 @@ impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<T> {
attestation: Attestation<T::EthSpec>,
subnet_id: Option<SubnetId>,
chain: &BeaconChain<T>,
) -> Result<Self, Error> {
) -> Result<Self, (Error, Attestation<T::EthSpec>)> {
Self::verify_slashable(attestation, subnet_id, chain)
.map(|verified_unaggregated| {
if let Some(slasher) = chain.slasher.as_ref() {
@ -723,26 +744,31 @@ impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<T> {
}
verified_unaggregated
})
.map_err(|slash_info| process_slash_info(slash_info, chain))
.map_err(|(slash_info, original_attestation)| {
(process_slash_info(slash_info, chain), original_attestation)
})
}
/// Verify the attestation, producing extra information about whether it might be slashable.
// NOTE: Clippy considers the return too complex. This tuple is not used elsewhere so it is not
// worth creating an alias.
#[allow(clippy::type_complexity)]
pub fn verify_slashable(
attestation: Attestation<T::EthSpec>,
subnet_id: Option<SubnetId>,
chain: &BeaconChain<T>,
) -> Result<Self, AttestationSlashInfo<T, Error>> {
) -> Result<Self, (AttestationSlashInfo<T, Error>, Attestation<T::EthSpec>)> {
use AttestationSlashInfo::*;
if let Err(e) = Self::verify_early_checks(&attestation, chain) {
return Err(SignatureNotChecked(attestation, e));
return Err((SignatureNotChecked(attestation.clone(), e), attestation));
}
let (indexed_attestation, committees_per_slot) =
match obtain_indexed_attestation_and_committees_per_slot(chain, &attestation) {
Ok(x) => x,
Err(e) => {
return Err(SignatureNotChecked(attestation, e));
return Err((SignatureNotChecked(attestation.clone(), e), attestation));
}
};
@ -754,16 +780,21 @@ impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<T> {
chain,
) {
Ok(t) => t,
Err(e) => return Err(SignatureNotCheckedIndexed(indexed_attestation, e)),
Err(e) => {
return Err((
SignatureNotCheckedIndexed(indexed_attestation, e),
attestation,
))
}
};
// The aggregate signature of the attestation is valid.
if let Err(e) = verify_attestation_signature(chain, &indexed_attestation) {
return Err(SignatureInvalid(e));
return Err((SignatureInvalid(e), attestation));
}
if let Err(e) = Self::verify_late_checks(&attestation, validator_index, chain) {
return Err(SignatureValid(indexed_attestation, e));
return Err((SignatureValid(indexed_attestation, e), attestation));
}
Ok(Self {

View File

@ -1225,7 +1225,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
unaggregated_attestation: Attestation<T::EthSpec>,
subnet_id: Option<SubnetId>,
) -> Result<VerifiedUnaggregatedAttestation<T>, AttestationError> {
) -> Result<VerifiedUnaggregatedAttestation<T>, (AttestationError, Attestation<T::EthSpec>)>
{
metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_REQUESTS);
let _timer =
metrics::start_timer(&metrics::UNAGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES);
@ -1249,7 +1250,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn verify_aggregated_attestation_for_gossip(
&self,
signed_aggregate: SignedAggregateAndProof<T::EthSpec>,
) -> Result<VerifiedAggregatedAttestation<T>, AttestationError> {
) -> Result<
VerifiedAggregatedAttestation<T>,
(AttestationError, SignedAggregateAndProof<T::EthSpec>),
> {
metrics::inc_counter(&metrics::AGGREGATED_ATTESTATION_PROCESSING_REQUESTS);
let _timer =
metrics::start_timer(&metrics::AGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES);

View File

@ -223,7 +223,7 @@ fn aggregated_gossip_verification() {
.expect(&format!(
"{} should error during verify_aggregated_attestation_for_gossip",
$desc
)),
)).0,
$( $error ) |+ $( if $guard )?
),
"case: {}",
@ -606,7 +606,7 @@ fn unaggregated_gossip_verification() {
.expect(&format!(
"{} should error during verify_unaggregated_attestation_for_gossip",
$desc
)),
)).0,
$( $error ) |+ $( if $guard )?
),
"case: {}",

View File

@ -332,7 +332,7 @@ fn epoch_boundary_state_attestation_processing() {
{
checked_pre_fin = true;
assert!(matches!(
res.err().unwrap(),
res.err().unwrap().0,
AttnError::PastSlot {
attestation_slot,
earliest_permissible_slot,

View File

@ -537,7 +537,7 @@ fn attestations_with_increasing_slots() {
if expected_attestation_slot < expected_earliest_permissible_slot {
assert!(matches!(
res.err().unwrap(),
res.err().unwrap().0,
AttnError::PastSlot {
attestation_slot,
earliest_permissible_slot,

View File

@ -1790,8 +1790,8 @@ pub fn serve<T: BeaconChainTypes>(
let mut failures = Vec::new();
// Verify that all messages in the post are valid before processing further
for (index, aggregate) in aggregates.as_slice().iter().enumerate() {
match chain.verify_aggregated_attestation_for_gossip(aggregate.clone()) {
for (index, aggregate) in aggregates.into_iter().enumerate() {
match chain.verify_aggregated_attestation_for_gossip(aggregate) {
Ok(verified_aggregate) => {
messages.push(PubsubMessage::AggregateAndProofAttestation(Box::new(
verified_aggregate.aggregate().clone(),
@ -1816,8 +1816,8 @@ pub fn serve<T: BeaconChainTypes>(
// It's reasonably likely that two different validators produce
// identical aggregates, especially if they're using the same beacon
// node.
Err(AttnError::AttestationAlreadyKnown(_)) => continue,
Err(e) => {
Err((AttnError::AttestationAlreadyKnown(_), _)) => continue,
Err((e, aggregate)) => {
error!(log,
"Failure verifying aggregate and proofs";
"error" => format!("{:?}", e),

View File

@ -1,210 +0,0 @@
//! Provides a mechanism which queues blocks for later processing when they arrive too early.
//!
//! When the `beacon_processor::Worker` imports a block that is acceptably early (i.e., within the
//! gossip propagation tolerance) it will send it to this queue where it will be placed in a
//! `DelayQueue` until the slot arrives. Once the block has been determined to be ready, it will be
//! sent back out on a channel to be processed by the `BeaconProcessor` again.
//!
//! There is the edge-case where the slot arrives before this queue manages to process it. In that
//! case, the block will be sent off for immediate processing (skipping the `DelayQueue`).
use super::MAX_DELAYED_BLOCK_QUEUE_LEN;
use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock};
use eth2_libp2p::PeerId;
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
use slog::{crit, debug, error, Logger};
use slot_clock::SlotClock;
use std::collections::HashSet;
use std::pin::Pin;
use std::task::Context;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::time::error::Error as TimeError;
use tokio_util::time::DelayQueue;
const TASK_NAME: &str = "beacon_processor_block_delay_queue";
/// Queue blocks for re-processing with an `ADDITIONAL_DELAY` after the slot starts. This is to
/// account for any slight drift in the system clock.
const ADDITIONAL_DELAY: Duration = Duration::from_millis(5);
/// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that
/// we signature-verify blocks before putting them in the queue *should* protect against this, but
/// it's nice to have extra protection.
const MAXIMUM_QUEUED_BLOCKS: usize = 16;
/// A block that arrived early and has been queued for later import.
pub struct QueuedBlock<T: BeaconChainTypes> {
pub peer_id: PeerId,
pub block: GossipVerifiedBlock<T>,
pub seen_timestamp: Duration,
}
/// Unifies the different messages processed by the block delay queue.
enum InboundEvent<T: BeaconChainTypes> {
/// A block that has been received early that we should queue for later processing.
EarlyBlock(QueuedBlock<T>),
/// A block that was queued for later processing and is ready for import.
ReadyBlock(QueuedBlock<T>),
/// The `DelayQueue` returned an error.
DelayQueueError(TimeError),
}
/// Combines the `DelayQueue` and `Receiver` streams into a single stream.
///
/// This struct has a similar purpose to `tokio::select!`, however it allows for more fine-grained
/// control (specifically in the ordering of event processing).
struct InboundEvents<T: BeaconChainTypes> {
pub delay_queue: DelayQueue<QueuedBlock<T>>,
early_blocks_rx: Receiver<QueuedBlock<T>>,
}
impl<T: BeaconChainTypes> Stream for InboundEvents<T> {
type Item = InboundEvent<T>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Poll for expired blocks *before* we try to process new blocks.
//
// The sequential nature of blockchains means it is generally better to try and import all
// existing blocks before new ones.
match self.delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(queued_block))) => {
return Poll::Ready(Some(InboundEvent::ReadyBlock(queued_block.into_inner())));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e)));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
match self.early_blocks_rx.poll_recv(cx) {
Poll::Ready(Some(queued_block)) => {
return Poll::Ready(Some(InboundEvent::EarlyBlock(queued_block)));
}
Poll::Ready(None) => {
return Poll::Ready(None);
}
Poll::Pending => {}
}
Poll::Pending
}
}
/// Spawn a queue which will accept blocks via the returned `Sender`, potentially queue them until
/// their slot arrives, then send them back out via `ready_blocks_tx`.
pub fn spawn_block_delay_queue<T: BeaconChainTypes>(
ready_blocks_tx: Sender<QueuedBlock<T>>,
executor: &TaskExecutor,
slot_clock: T::SlotClock,
log: Logger,
) -> Sender<QueuedBlock<T>> {
let (early_blocks_tx, early_blocks_rx): (_, Receiver<QueuedBlock<_>>) =
mpsc::channel(MAX_DELAYED_BLOCK_QUEUE_LEN);
let queue_future = async move {
let mut queued_block_roots = HashSet::new();
let mut inbound_events = InboundEvents {
early_blocks_rx,
delay_queue: DelayQueue::new(),
};
loop {
match inbound_events.next().await {
// Some block has been indicated as "early" and should be processed when the
// appropriate slot arrives.
Some(InboundEvent::EarlyBlock(early_block)) => {
let block_slot = early_block.block.block.slot();
let block_root = early_block.block.block_root;
// Don't add the same block to the queue twice. This prevents DoS attacks.
if queued_block_roots.contains(&block_root) {
continue;
}
if let Some(duration_till_slot) = slot_clock.duration_to_slot(block_slot) {
// Check to ensure this won't over-fill the queue.
if queued_block_roots.len() >= MAXIMUM_QUEUED_BLOCKS {
error!(
log,
"Early blocks queue is full";
"queue_size" => MAXIMUM_QUEUED_BLOCKS,
"msg" => "check system clock"
);
// Drop the block.
continue;
}
queued_block_roots.insert(block_root);
// Queue the block until the start of the appropriate slot, plus
// `ADDITIONAL_DELAY`.
inbound_events
.delay_queue
.insert(early_block, duration_till_slot + ADDITIONAL_DELAY);
} else {
// If there is no duration till the next slot, check to see if the slot
// has already arrived. If it has already arrived, send it out for
// immediate processing.
//
// If we can't read the slot or the slot hasn't arrived, simply drop the
// block.
//
// This logic is slightly awkward since `SlotClock::duration_to_slot`
// doesn't distinguish between a slot that has already arrived and an
// error reading the slot clock.
if let Some(now) = slot_clock.now() {
if block_slot <= now && ready_blocks_tx.try_send(early_block).is_err() {
error!(
log,
"Failed to send block";
);
}
}
}
}
// A block that was queued for later processing is now ready to be processed.
Some(InboundEvent::ReadyBlock(ready_block)) => {
let block_root = ready_block.block.block_root;
if !queued_block_roots.remove(&block_root) {
// Log an error to alert that we've made a bad assumption about how this
// program works, but still process the block anyway.
error!(
log,
"Unknown block in delay queue";
"block_root" => ?block_root
);
}
if ready_blocks_tx.try_send(ready_block).is_err() {
error!(
log,
"Failed to pop queued block";
);
}
}
Some(InboundEvent::DelayQueueError(e)) => crit!(
log,
"Failed to poll block delay queue";
"e" => ?e
),
None => {
debug!(
log,
"Block delay queue stopped";
"msg" => "shutting down"
);
break;
}
}
}
};
executor.spawn(queue_future, TASK_NAME);
early_blocks_tx
}

View File

@ -5,6 +5,7 @@
//!
//! - A "manager" task, which either spawns worker tasks or enqueues work.
//! - One or more "worker" tasks which perform time-intensive work on the `BeaconChain`.
//! - A task managing the scheduling of work that needs to be re-processed.
//!
//! ## Purpose
//!
@ -19,10 +20,12 @@
//!
//! ## Detail
//!
//! There is a single "manager" thread who listens to two event channels. These events are either:
//! There is a single "manager" thread who listens to three event channels. These events are
//! either:
//!
//! - A new parcel of work (work event).
//! - Indication that a worker has finished a parcel of work (worker idle).
//! - A work ready for reprocessing (work event).
//!
//! Then, there is a maximum of `n` "worker" blocking threads, where `n` is the CPU count.
//!
@ -37,7 +40,6 @@
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, GossipVerifiedBlock};
use block_delay_queue::{spawn_block_delay_queue, QueuedBlock};
use eth2_libp2p::{
rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage},
MessageId, NetworkGlobals, PeerId, PeerRequestId,
@ -57,11 +59,14 @@ use types::{
Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedVoluntaryExit, SubnetId,
};
use work_reprocessing_queue::{
spawn_reprocess_scheduler, QueuedAggregate, QueuedBlock, QueuedUnaggregate, ReadyWork,
};
use worker::{Toolbox, Worker};
mod block_delay_queue;
mod tests;
mod work_reprocessing_queue;
mod worker;
pub use worker::ProcessId;
@ -77,14 +82,25 @@ pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;
/// set to the CPU count, but we set it high to be safe.
const MAX_IDLE_QUEUE_LEN: usize = 16_384;
/// The maximum size of the channel for re-processing work events.
const MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 16_384;
/// The maximum number of queued `Attestation` objects that will be stored before we start dropping
/// them.
const MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN: usize = 16_384;
/// The maximum number of queued `Attestation` objects that will be stored before we start dropping
/// them.
const MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 8_192;
/// The maximum number of queued `SignedAggregateAndProof` objects that will be stored before we
/// start dropping them.
const MAX_AGGREGATED_ATTESTATION_QUEUE_LEN: usize = 1_024;
/// The maximum number of queued `SignedAggregateAndProof` objects that will be stored before we
/// start dropping them.
const MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 1_024;
/// The maximum number of queued `SignedBeaconBlock` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024;
@ -127,6 +143,7 @@ const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024;
/// The name of the manager tokio task.
const MANAGER_TASK_NAME: &str = "beacon_processor_manager";
/// The name of the worker tokio tasks.
const WORKER_TASK_NAME: &str = "beacon_processor_worker";
@ -148,6 +165,8 @@ pub const CHAIN_SEGMENT: &str = "chain_segment";
pub const STATUS_PROCESSING: &str = "status_processing";
pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request";
pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
/// Used to send/receive results from a rpc block import in a blocking task.
pub type BlockResultSender<E> = oneshot::Sender<Result<Hash256, BlockError<E>>>;
@ -308,22 +327,6 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
}
}
/// Create a new `Work` event for some block that was delayed for later processing.
pub fn delayed_import_beacon_block(
peer_id: PeerId,
block: Box<GossipVerifiedBlock<T>>,
seen_timestamp: Duration,
) -> Self {
Self {
drop_during_sync: false,
work: Work::DelayedImportBlock {
peer_id,
block,
seen_timestamp,
},
}
}
/// Create a new `Work` event for some exit.
pub fn gossip_voluntary_exit(
message_id: MessageId,
@ -442,6 +445,57 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
}
}
impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
fn from(ready_work: ReadyWork<T>) -> Self {
match ready_work {
ReadyWork::Block(QueuedBlock {
peer_id,
block,
seen_timestamp,
}) => Self {
drop_during_sync: false,
work: Work::DelayedImportBlock {
peer_id,
block: Box::new(block),
seen_timestamp,
},
},
ReadyWork::Unaggregate(QueuedUnaggregate {
peer_id,
message_id,
attestation,
subnet_id,
should_import,
seen_timestamp,
}) => Self {
drop_during_sync: true,
work: Work::UnknownBlockAttestation {
message_id,
peer_id,
attestation,
subnet_id,
should_import,
seen_timestamp,
},
},
ReadyWork::Aggregate(QueuedAggregate {
peer_id,
message_id,
attestation,
seen_timestamp,
}) => Self {
drop_during_sync: true,
work: Work::UnknownBlockAggregate {
message_id,
peer_id,
aggregate: attestation,
seen_timestamp,
},
},
}
}
}
/// A consensus message (or multiple) from the network that requires processing.
#[derive(Debug)]
pub enum Work<T: BeaconChainTypes> {
@ -453,12 +507,26 @@ pub enum Work<T: BeaconChainTypes> {
should_import: bool,
seen_timestamp: Duration,
},
UnknownBlockAttestation {
message_id: MessageId,
peer_id: PeerId,
attestation: Box<Attestation<T::EthSpec>>,
subnet_id: SubnetId,
should_import: bool,
seen_timestamp: Duration,
},
GossipAggregate {
message_id: MessageId,
peer_id: PeerId,
aggregate: Box<SignedAggregateAndProof<T::EthSpec>>,
seen_timestamp: Duration,
},
UnknownBlockAggregate {
message_id: MessageId,
peer_id: PeerId,
aggregate: Box<SignedAggregateAndProof<T::EthSpec>>,
seen_timestamp: Duration,
},
GossipBlock {
message_id: MessageId,
peer_id: PeerId,
@ -525,6 +593,8 @@ impl<T: BeaconChainTypes> Work<T> {
Work::Status { .. } => STATUS_PROCESSING,
Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST,
Work::BlocksByRootsRequest { .. } => BLOCKS_BY_ROOTS_REQUEST,
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
}
}
}
@ -554,8 +624,8 @@ enum InboundEvent<T: BeaconChainTypes> {
WorkerIdle,
/// There is new work to be done.
WorkEvent(WorkEvent<T>),
/// A block that was delayed for import at a later slot has become ready.
QueuedBlock(Box<QueuedBlock<T>>),
/// A work event that was queued for re-processing has become ready.
ReprocessingWork(WorkEvent<T>),
}
/// Combines the various incoming event streams for the `BeaconProcessor` into a single stream.
@ -567,8 +637,8 @@ struct InboundEvents<T: BeaconChainTypes> {
idle_rx: mpsc::Receiver<()>,
/// Used by upstream processes to send new work to the `BeaconProcessor`.
event_rx: mpsc::Receiver<WorkEvent<T>>,
/// Used internally for queuing blocks for processing once their slot arrives.
post_delay_block_queue_rx: mpsc::Receiver<QueuedBlock<T>>,
/// Used internally for queuing work ready to be re-processed.
reprocess_work_rx: mpsc::Receiver<ReadyWork<T>>,
}
impl<T: BeaconChainTypes> Stream for InboundEvents<T> {
@ -589,9 +659,9 @@ impl<T: BeaconChainTypes> Stream for InboundEvents<T> {
// Poll for delayed blocks before polling for new work. It might be the case that a delayed
// block is required to successfully process some new work.
match self.post_delay_block_queue_rx.poll_recv(cx) {
Poll::Ready(Some(queued_block)) => {
return Poll::Ready(Some(InboundEvent::QueuedBlock(Box::new(queued_block))));
match self.reprocess_work_rx.poll_recv(cx) {
Poll::Ready(Some(ready_work)) => {
return Poll::Ready(Some(InboundEvent::ReprocessingWork(ready_work.into())));
}
Poll::Ready(None) => {
return Poll::Ready(None);
@ -643,7 +713,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
pub fn spawn_manager(
mut self,
event_rx: mpsc::Receiver<WorkEvent<T>>,
work_journal_tx: Option<mpsc::Sender<String>>,
work_journal_tx: Option<mpsc::Sender<&'static str>>,
) {
// Used by workers to communicate that they are finished a task.
let (idle_tx, idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN);
@ -655,6 +725,10 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
let mut aggregate_debounce = TimeLatch::default();
let mut attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN);
let mut attestation_debounce = TimeLatch::default();
let mut unknown_block_aggregate_queue =
LifoQueue::new(MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN);
let mut unknown_block_attestation_queue =
LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN);
// Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have
// a strong feeling about queue type for exits.
@ -677,14 +751,13 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN);
let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN);
// The delayed block queues are used to re-queue blocks for processing at a later time if
// they're received early.
let (post_delay_block_queue_tx, post_delay_block_queue_rx) =
mpsc::channel(MAX_DELAYED_BLOCK_QUEUE_LEN);
let pre_delay_block_queue_tx = {
// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
// receive them back once they are ready (`ready_work_rx`).
let (ready_work_tx, ready_work_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN);
let work_reprocessing_tx = {
if let Some(chain) = self.beacon_chain.upgrade() {
spawn_block_delay_queue(
post_delay_block_queue_tx,
spawn_reprocess_scheduler(
ready_work_tx,
&self.executor,
chain.slot_clock.clone(),
self.log.clone(),
@ -704,7 +777,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
let mut inbound_events = InboundEvents {
idle_rx,
event_rx,
post_delay_block_queue_rx,
reprocess_work_rx: ready_work_rx,
};
loop {
@ -713,14 +786,8 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
self.current_workers = self.current_workers.saturating_sub(1);
None
}
Some(InboundEvent::WorkEvent(event)) => Some(event),
Some(InboundEvent::QueuedBlock(queued_block)) => {
Some(WorkEvent::delayed_import_beacon_block(
queued_block.peer_id,
Box::new(queued_block.block),
queued_block.seen_timestamp,
))
}
Some(InboundEvent::WorkEvent(event))
| Some(InboundEvent::ReprocessingWork(event)) => Some(event),
None => {
debug!(
self.log,
@ -750,7 +817,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
// We don't care if this message was successfully sent, we only use the journal
// during testing.
let _ = work_journal_tx.try_send(id.to_string());
let _ = work_journal_tx.try_send(id);
}
let can_spawn = self.current_workers < self.max_workers;
@ -766,7 +833,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
None if can_spawn => {
let toolbox = Toolbox {
idle_tx: idle_tx.clone(),
delayed_block_tx: pre_delay_block_queue_tx.clone(),
work_reprocessing_tx: work_reprocessing_tx.clone(),
};
// Check for chain segments first, they're the most efficient way to get
@ -792,6 +859,12 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
self.spawn_worker(item, toolbox);
} else if let Some(item) = attestation_queue.pop() {
self.spawn_worker(item, toolbox);
// Aggregates and unaggregates queued for re-processing are older and we
// care about fresher ones, so check those first.
} else if let Some(item) = unknown_block_aggregate_queue.pop() {
self.spawn_worker(item, toolbox);
} else if let Some(item) = unknown_block_attestation_queue.pop() {
self.spawn_worker(item, toolbox);
// Check RPC methods next. Status messages are needed for sync so
// prioritize them over syncing requests from other peers (BlocksByRange
// and BlocksByRoot)
@ -820,7 +893,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
if let Some(work_journal_tx) = &work_journal_tx {
// We don't care if this message was successfully sent, we only use the journal
// during testing.
let _ = work_journal_tx.try_send(NOTHING_TO_DO.to_string());
let _ = work_journal_tx.try_send(NOTHING_TO_DO);
}
}
}
@ -857,7 +930,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
let work_id = work.str_id();
let toolbox = Toolbox {
idle_tx: idle_tx.clone(),
delayed_block_tx: pre_delay_block_queue_tx.clone(),
work_reprocessing_tx: work_reprocessing_tx.clone(),
};
match work {
@ -890,6 +963,12 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::BlocksByRootsRequest { .. } => {
bbroots_queue.push(work, work_id, &self.log)
}
Work::UnknownBlockAttestation { .. } => {
unknown_block_attestation_queue.push(work)
}
Work::UnknownBlockAggregate { .. } => {
unknown_block_aggregate_queue.push(work)
}
}
}
}
@ -960,7 +1039,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
/// Sends an message on `idle_tx` when the work is complete and the task is stopping.
fn spawn_worker(&mut self, work: Work<T>, toolbox: Toolbox<T>) {
let idle_tx = toolbox.idle_tx;
let delayed_block_tx = toolbox.delayed_block_tx;
let work_reprocessing_tx = toolbox.work_reprocessing_tx;
// Wrap the `idle_tx` in a struct that will fire the idle message whenever it is dropped.
//
@ -1031,6 +1110,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
*attestation,
subnet_id,
should_import,
Some(work_reprocessing_tx),
seen_timestamp,
),
/*
@ -1045,6 +1125,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
message_id,
peer_id,
*aggregate,
Some(work_reprocessing_tx),
seen_timestamp,
),
/*
@ -1059,7 +1140,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
message_id,
peer_id,
*block,
delayed_block_tx,
work_reprocessing_tx,
seen_timestamp,
),
/*
@ -1069,7 +1150,12 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
peer_id,
block,
seen_timestamp,
} => worker.process_gossip_verified_block(peer_id, *block, seen_timestamp),
} => worker.process_gossip_verified_block(
peer_id,
*block,
work_reprocessing_tx,
seen_timestamp,
),
/*
* Voluntary exits received on gossip.
*/
@ -1106,7 +1192,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
* Verification for beacon blocks received during syncing via RPC.
*/
Work::RpcBlock { block, result_tx } => {
worker.process_rpc_block(*block, result_tx)
worker.process_rpc_block(*block, result_tx, work_reprocessing_tx)
}
/*
* Verification for a chain segment (multiple blocks).
@ -1134,6 +1220,34 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
request_id,
request,
} => worker.handle_blocks_by_root_request(peer_id, request_id, request),
Work::UnknownBlockAttestation {
message_id,
peer_id,
attestation,
subnet_id,
should_import,
seen_timestamp,
} => worker.process_gossip_attestation(
message_id,
peer_id,
*attestation,
subnet_id,
should_import,
None, // Do not allow this attestation to be re-processed beyond this point.
seen_timestamp,
),
Work::UnknownBlockAggregate {
message_id,
peer_id,
aggregate,
seen_timestamp,
} => worker.process_gossip_aggregate(
message_id,
peer_id,
*aggregate,
None,
seen_timestamp,
),
};
trace!(

View File

@ -1,6 +1,7 @@
#![cfg(not(debug_assertions))] // Tests are too slow in debug.
#![cfg(test)]
use crate::beacon_processor::work_reprocessing_queue::QUEUED_ATTESTATION_DELAY;
use crate::beacon_processor::*;
use crate::{service::NetworkMessage, sync::SyncMessage};
use beacon_chain::test_utils::{
@ -42,11 +43,13 @@ struct TestRig {
chain: Arc<BeaconChain<T>>,
next_block: SignedBeaconBlock<E>,
attestations: Vec<(Attestation<E>, SubnetId)>,
next_block_attestations: Vec<(Attestation<E>, SubnetId)>,
next_block_aggregate_attestations: Vec<SignedAggregateAndProof<E>>,
attester_slashing: AttesterSlashing<E>,
proposer_slashing: ProposerSlashing,
voluntary_exit: SignedVoluntaryExit,
beacon_processor_tx: mpsc::Sender<WorkEvent<T>>,
work_journal_rx: mpsc::Receiver<String>,
work_journal_rx: mpsc::Receiver<&'static str>,
_network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
_sync_rx: mpsc::UnboundedReceiver<SyncMessage<E>>,
environment: Option<Environment<E>>,
@ -90,7 +93,7 @@ impl TestRig {
"precondition: current slot is one after head"
);
let (next_block, _next_state) =
let (next_block, next_state) =
harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap());
let head_state_root = head.beacon_state_root();
@ -111,6 +114,35 @@ impl TestRig {
"precondition: attestations for testing"
);
let next_block_attestations = harness
.get_unaggregated_attestations(
&AttestationStrategy::AllValidators,
&next_state,
next_block.state_root(),
next_block.canonical_root(),
next_block.slot(),
)
.into_iter()
.flatten()
.collect::<Vec<_>>();
let next_block_aggregate_attestations = harness
.make_attestations(
&harness.get_all_validators(),
&next_state,
next_block.state_root(),
next_block.canonical_root().into(),
next_block.slot(),
)
.into_iter()
.filter_map(|(_, aggregate_opt)| aggregate_opt)
.collect::<Vec<_>>();
assert!(
!next_block_attestations.is_empty(),
"precondition: attestation for next block are not empty"
);
let attester_slashing = harness.make_attester_slashing(vec![0, 1]);
let proposer_slashing = harness.make_proposer_slashing(2);
let voluntary_exit = harness.make_voluntary_exit(3, harness.chain.epoch().unwrap());
@ -174,6 +206,8 @@ impl TestRig {
chain,
next_block,
attestations,
next_block_attestations,
next_block_aggregate_attestations,
attester_slashing,
proposer_slashing,
voluntary_exit,
@ -185,6 +219,10 @@ impl TestRig {
}
}
pub fn head_root(&self) -> Hash256 {
self.chain.head().unwrap().beacon_block_root
}
pub fn enqueue_gossip_block(&self) {
self.beacon_processor_tx
.try_send(WorkEvent::gossip_beacon_block(
@ -196,6 +234,11 @@ impl TestRig {
.unwrap();
}
pub fn enqueue_rpc_block(&self) {
let (event, _rx) = WorkEvent::rpc_beacon_block(Box::new(self.next_block.clone()));
self.beacon_processor_tx.try_send(event).unwrap();
}
pub fn enqueue_unaggregated_attestation(&self) {
let (attestation, subnet_id) = self.attestations.first().unwrap().clone();
self.beacon_processor_tx
@ -240,6 +283,36 @@ impl TestRig {
.unwrap();
}
pub fn enqueue_next_block_unaggregated_attestation(&self) {
let (attestation, subnet_id) = self.next_block_attestations.first().unwrap().clone();
self.beacon_processor_tx
.try_send(WorkEvent::unaggregated_attestation(
junk_message_id(),
junk_peer_id(),
attestation,
subnet_id,
true,
Duration::from_secs(0),
))
.unwrap();
}
pub fn enqueue_next_block_aggregated_attestation(&self) {
let aggregate = self
.next_block_aggregate_attestations
.first()
.unwrap()
.clone();
self.beacon_processor_tx
.try_send(WorkEvent::aggregated_attestation(
junk_message_id(),
junk_peer_id(),
aggregate,
Duration::from_secs(0),
))
.unwrap();
}
fn runtime(&mut self) -> Arc<Runtime> {
self.environment
.as_mut()
@ -265,27 +338,37 @@ impl TestRig {
})
}
/// Assert that the `BeaconProcessor` event journal is as `expected`.
/// Checks that the `BeaconProcessor` event journal contains the `expected` events in the given
/// order with a matching number of `WORKER_FREED` events in between. `NOTHING_TO_DO` events
/// are ignored.
///
/// ## Note
///
/// We won't attempt to listen for any more than `expected.len()` events. As such, it makes sense
/// to use the `NOTHING_TO_DO` event to ensure that execution has completed.
pub fn assert_event_journal(&mut self, expected: &[&str]) {
let events = self.runtime().block_on(async {
let mut events = vec![];
/// Given the described logic, `expected` must not contain `WORKER_FREED` or `NOTHING_TO_DO`
/// events.
pub fn assert_event_journal_contains_ordered(&mut self, expected: &[&str]) {
assert!(expected
.iter()
.all(|ev| ev != &WORKER_FREED && ev != &NOTHING_TO_DO));
let (events, worker_freed_remaining) = self.runtime().block_on(async {
let mut events = Vec::with_capacity(expected.len());
let mut worker_freed_remaining = expected.len();
let drain_future = async {
loop {
match self.work_journal_rx.recv().await {
Some(event) => {
events.push(event);
// Break as soon as we collect the desired number of events.
if events.len() >= expected.len() {
Some(event) if event == WORKER_FREED => {
worker_freed_remaining -= 1;
if worker_freed_remaining == 0 {
// Break when all expected events are finished.
break;
}
}
Some(event) if event == NOTHING_TO_DO => {
// Ignore these.
}
Some(event) => {
events.push(event);
}
None => break,
}
}
@ -294,9 +377,53 @@ impl TestRig {
// Drain the expected number of events from the channel, or time out and give up.
tokio::select! {
_ = tokio::time::sleep(STANDARD_TIMEOUT) => panic!(
"timeout ({:?}) expired waiting for events. expected {:?} but got {:?}",
"Timeout ({:?}) expired waiting for events. Expected {:?} but got {:?} waiting for {} `WORKER_FREED` events.",
STANDARD_TIMEOUT,
expected,
events,
worker_freed_remaining,
),
_ = drain_future => {},
}
(events, worker_freed_remaining)
});
assert_eq!(events, expected);
assert_eq!(worker_freed_remaining, 0);
}
pub fn assert_event_journal(&mut self, expected: &[&str]) {
self.assert_event_journal_with_timeout(expected, STANDARD_TIMEOUT);
}
/// Assert that the `BeaconProcessor` event journal is as `expected`.
///
/// ## Note
///
/// We won't attempt to listen for any more than `expected.len()` events. As such, it makes sense
/// to use the `NOTHING_TO_DO` event to ensure that execution has completed.
pub fn assert_event_journal_with_timeout(&mut self, expected: &[&str], timeout: Duration) {
let events = self.runtime().block_on(async {
let mut events = Vec::with_capacity(expected.len());
let drain_future = async {
while let Some(event) = self.work_journal_rx.recv().await {
events.push(event);
// Break as soon as we collect the desired number of events.
if events.len() >= expected.len() {
break;
}
}
};
// Drain the expected number of events from the channel, or time out and give up.
tokio::select! {
_ = tokio::time::sleep(timeout) => panic!(
"Timeout ({:?}) expired waiting for events. Expected {:?} but got {:?}",
timeout,
expected,
events
),
_ = drain_future => {},
@ -305,13 +432,7 @@ impl TestRig {
events
});
assert_eq!(
events,
expected
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<_>>()
);
assert_eq!(events, expected);
}
}
@ -353,18 +474,18 @@ fn import_gossip_block_acceptably_early() {
// processing.
//
// If this causes issues we might be able to make the block delay queue add a longer delay for
// processing, instead of just MAXIMUM_GOSSIP_CLOCK_DISPARITY. Speak to @paulhauner if this test
// processing, instead of just ADDITIONAL_QUEUED_BLOCK_DELAY. Speak to @paulhauner if this test
// starts failing.
rig.chain.slot_clock.set_slot(rig.next_block.slot().into());
assert!(
rig.chain.head().unwrap().beacon_block_root != rig.next_block.canonical_root(),
rig.head_root() != rig.next_block.canonical_root(),
"block not yet imported"
);
rig.assert_event_journal(&[DELAYED_IMPORT_BLOCK, WORKER_FREED, NOTHING_TO_DO]);
assert_eq!(
rig.chain.head().unwrap().beacon_block_root,
rig.head_root(),
rig.next_block.canonical_root(),
"block should be imported and become head"
);
@ -395,12 +516,12 @@ fn import_gossip_block_unacceptably_early() {
rig.assert_event_journal(&[GOSSIP_BLOCK, WORKER_FREED, NOTHING_TO_DO]);
// Waiting for 5 seconds is a bit arbtirary, however it *should* be long enough to ensure the
// Waiting for 5 seconds is a bit arbitrary, however it *should* be long enough to ensure the
// block isn't imported.
rig.assert_no_events_for(Duration::from_secs(5));
assert!(
rig.chain.head().unwrap().beacon_block_root != rig.next_block.canonical_root(),
rig.head_root() != rig.next_block.canonical_root(),
"block should not be imported"
);
}
@ -421,7 +542,7 @@ fn import_gossip_block_at_current_slot() {
rig.assert_event_journal(&[GOSSIP_BLOCK, WORKER_FREED, NOTHING_TO_DO]);
assert_eq!(
rig.chain.head().unwrap().beacon_block_root,
rig.head_root(),
rig.next_block.canonical_root(),
"block should be imported and become head"
);
@ -445,6 +566,207 @@ fn import_gossip_attestation() {
);
}
enum BlockImportMethod {
Gossip,
Rpc,
}
/// Ensure that attestations that reference an unknown block get properly re-queued and
/// re-processed upon importing the block.
fn attestation_to_unknown_block_processed(import_method: BlockImportMethod) {
let mut rig = TestRig::new(SMALL_CHAIN);
// Send the attestation but not the block, and check that it was not imported.
let initial_attns = rig.chain.naive_aggregation_pool.read().num_attestations();
rig.enqueue_next_block_unaggregated_attestation();
rig.assert_event_journal(&[GOSSIP_ATTESTATION, WORKER_FREED, NOTHING_TO_DO]);
assert_eq!(
rig.chain.naive_aggregation_pool.read().num_attestations(),
initial_attns,
"Attestation should not have been included."
);
// Send the block and ensure that the attestation is received back and imported.
let block_event = match import_method {
BlockImportMethod::Gossip => {
rig.enqueue_gossip_block();
GOSSIP_BLOCK
}
BlockImportMethod::Rpc => {
rig.enqueue_rpc_block();
RPC_BLOCK
}
};
rig.assert_event_journal_contains_ordered(&[block_event, UNKNOWN_BLOCK_ATTESTATION]);
// Run fork choice, since it isn't run when processing an RPC block. At runtime it is the
// responsibility of the sync manager to do this.
rig.chain.fork_choice().unwrap();
assert_eq!(
rig.head_root(),
rig.next_block.canonical_root(),
"Block should be imported and become head."
);
assert_eq!(
rig.chain.naive_aggregation_pool.read().num_attestations(),
initial_attns + 1,
"Attestation should have been included."
);
}
#[test]
fn attestation_to_unknown_block_processed_after_gossip_block() {
attestation_to_unknown_block_processed(BlockImportMethod::Gossip)
}
#[test]
fn attestation_to_unknown_block_processed_after_rpc_block() {
attestation_to_unknown_block_processed(BlockImportMethod::Rpc)
}
/// Ensure that attestations that reference an unknown block get properly re-queued and
/// re-processed upon importing the block.
fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod) {
let mut rig = TestRig::new(SMALL_CHAIN);
// Empty the op pool.
rig.chain
.op_pool
.prune_attestations(u64::max_value().into());
assert_eq!(rig.chain.op_pool.num_attestations(), 0);
// Send the attestation but not the block, and check that it was not imported.
let initial_attns = rig.chain.op_pool.num_attestations();
rig.enqueue_next_block_aggregated_attestation();
rig.assert_event_journal(&[GOSSIP_AGGREGATE, WORKER_FREED, NOTHING_TO_DO]);
assert_eq!(
rig.chain.op_pool.num_attestations(),
initial_attns,
"Attestation should not have been included."
);
// Send the block and ensure that the attestation is received back and imported.
let block_event = match import_method {
BlockImportMethod::Gossip => {
rig.enqueue_gossip_block();
GOSSIP_BLOCK
}
BlockImportMethod::Rpc => {
rig.enqueue_rpc_block();
RPC_BLOCK
}
};
rig.assert_event_journal_contains_ordered(&[block_event, UNKNOWN_BLOCK_AGGREGATE]);
// Run fork choice, since it isn't run when processing an RPC block. At runtime it is the
// responsibility of the sync manager to do this.
rig.chain.fork_choice().unwrap();
assert_eq!(
rig.head_root(),
rig.next_block.canonical_root(),
"Block should be imported and become head."
);
assert_eq!(
rig.chain.op_pool.num_attestations(),
initial_attns + 1,
"Attestation should have been included."
);
}
#[test]
fn aggregate_attestation_to_unknown_block_processed_after_gossip_block() {
aggregate_attestation_to_unknown_block(BlockImportMethod::Gossip)
}
#[test]
fn aggregate_attestation_to_unknown_block_processed_after_rpc_block() {
aggregate_attestation_to_unknown_block(BlockImportMethod::Rpc)
}
/// Ensure that attestations that reference an unknown block get properly re-queued and re-processed
/// when the block is not seen.
#[test]
fn requeue_unknown_block_gossip_attestation_without_import() {
let mut rig = TestRig::new(SMALL_CHAIN);
// Send the attestation but not the block, and check that it was not imported.
let initial_attns = rig.chain.naive_aggregation_pool.read().num_attestations();
rig.enqueue_next_block_unaggregated_attestation();
rig.assert_event_journal(&[GOSSIP_ATTESTATION, WORKER_FREED, NOTHING_TO_DO]);
assert_eq!(
rig.chain.naive_aggregation_pool.read().num_attestations(),
initial_attns,
"Attestation should not have been included."
);
// Ensure that the attestation is received back but not imported.
rig.assert_event_journal_with_timeout(
&[UNKNOWN_BLOCK_ATTESTATION, WORKER_FREED, NOTHING_TO_DO],
Duration::from_secs(1) + QUEUED_ATTESTATION_DELAY,
);
assert_eq!(
rig.chain.naive_aggregation_pool.read().num_attestations(),
initial_attns,
"Attestation should not have been included."
);
}
/// Ensure that aggregate that reference an unknown block get properly re-queued and re-processed
/// when the block is not seen.
#[test]
fn requeue_unknown_block_gossip_aggregated_attestation_without_import() {
let mut rig = TestRig::new(SMALL_CHAIN);
// Send the attestation but not the block, and check that it was not imported.
let initial_attns = rig.chain.op_pool.num_attestations();
rig.enqueue_next_block_aggregated_attestation();
rig.assert_event_journal(&[GOSSIP_AGGREGATE, WORKER_FREED, NOTHING_TO_DO]);
assert_eq!(
rig.chain.naive_aggregation_pool.read().num_attestations(),
initial_attns,
"Attestation should not have been included."
);
// Ensure that the attestation is received back but not imported.
rig.assert_event_journal_with_timeout(
&[UNKNOWN_BLOCK_AGGREGATE, WORKER_FREED, NOTHING_TO_DO],
Duration::from_secs(1) + QUEUED_ATTESTATION_DELAY,
);
assert_eq!(
rig.chain.op_pool.num_attestations(),
initial_attns,
"Attestation should not have been included."
);
}
/// Ensure a bunch of valid operations can be imported.
#[test]
fn import_misc_gossip_ops() {

View File

@ -0,0 +1,500 @@
//! Provides a mechanism which queues work for later processing.
//!
//! When the `beacon_processor::Worker` imports a block that is acceptably early (i.e., within the
//! gossip propagation tolerance) it will send it to this queue where it will be placed in a
//! `DelayQueue` until the slot arrives. Once the block has been determined to be ready, it will be
//! sent back out on a channel to be processed by the `BeaconProcessor` again.
//!
//! There is the edge-case where the slot arrives before this queue manages to process it. In that
//! case, the block will be sent off for immediate processing (skipping the `DelayQueue`).
//!
//! Aggregated and unaggregated attestations that failed verification due to referencing an unknown
//! block will be re-queued until their block is imported, or until they expire.
use super::MAX_SCHEDULED_WORK_QUEUE_LEN;
use crate::metrics;
use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY};
use eth2_libp2p::{MessageId, PeerId};
use fnv::FnvHashMap;
use futures::task::Poll;
use futures::{Stream, StreamExt};
use slog::{crit, debug, error, Logger};
use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::task::Context;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::time::error::Error as TimeError;
use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey};
use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SubnetId};
const TASK_NAME: &str = "beacon_processor_reprocess_queue";
const BLOCKS: &str = "blocks";
const ATTESTATIONS: &str = "attestations";
/// Queue blocks for re-processing with an `ADDITIONAL_QUEUED_BLOCK_DELAY` after the slot starts.
/// This is to account for any slight drift in the system clock.
const ADDITIONAL_QUEUED_BLOCK_DELAY: Duration = Duration::from_millis(5);
/// For how long to queue aggregated and unaggregated attestations for re-processing.
pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12);
/// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that
/// we signature-verify blocks before putting them in the queue *should* protect against this, but
/// it's nice to have extra protection.
const MAXIMUM_QUEUED_BLOCKS: usize = 16;
/// How many attestations we keep before new ones get dropped.
const MAXIMUM_QUEUED_ATTESTATIONS: usize = 1_024;
/// Messages that the scheduler can receive.
pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
/// A block that has been received early and we should queue for later processing.
EarlyBlock(QueuedBlock<T>),
/// A block that was successfully processed. We use this to handle attestations for unknown
/// blocks.
BlockImported(Hash256),
/// An unaggregated attestation that references an unknown block.
UnknownBlockUnaggregate(QueuedUnaggregate<T::EthSpec>),
/// An aggregated attestation that references an unknown block.
UnknownBlockAggregate(QueuedAggregate<T::EthSpec>),
}
/// Events sent by the scheduler once they are ready for re-processing.
pub enum ReadyWork<T: BeaconChainTypes> {
Block(QueuedBlock<T>),
Unaggregate(QueuedUnaggregate<T::EthSpec>),
Aggregate(QueuedAggregate<T::EthSpec>),
}
/// An Attestation for which the corresponding block was not seen while processing, queued for
/// later.
pub struct QueuedUnaggregate<T: EthSpec> {
pub peer_id: PeerId,
pub message_id: MessageId,
pub attestation: Box<Attestation<T>>,
pub subnet_id: SubnetId,
pub should_import: bool,
pub seen_timestamp: Duration,
}
/// An aggregated attestation for which the corresponding block was not seen while processing, queued for
/// later.
pub struct QueuedAggregate<T: EthSpec> {
pub peer_id: PeerId,
pub message_id: MessageId,
pub attestation: Box<SignedAggregateAndProof<T>>,
pub seen_timestamp: Duration,
}
/// A block that arrived early and has been queued for later import.
pub struct QueuedBlock<T: BeaconChainTypes> {
pub peer_id: PeerId,
pub block: GossipVerifiedBlock<T>,
pub seen_timestamp: Duration,
}
/// Unifies the different messages processed by the block delay queue.
enum InboundEvent<T: BeaconChainTypes> {
/// A block that was queued for later processing and is ready for import.
ReadyBlock(QueuedBlock<T>),
/// An aggregated or unaggregated attestation is ready for re-processing.
ReadyAttestation(QueuedAttestationId),
/// A `DelayQueue` returned an error.
DelayQueueError(TimeError, &'static str),
/// A message sent to the `ReprocessQueue`
Msg(ReprocessQueueMessage<T>),
}
/// Manages scheduling works that need to be later re-processed.
struct ReprocessQueue<T: BeaconChainTypes> {
/// Receiver of messages relevant to schedule works for reprocessing.
work_reprocessing_rx: Receiver<ReprocessQueueMessage<T>>,
/// Sender of works once they become ready
ready_work_tx: Sender<ReadyWork<T>>,
/* Queues */
/// Queue to manage scheduled early blocks.
block_delay_queue: DelayQueue<QueuedBlock<T>>,
/// Queue to manage scheduled attestations.
attestations_delay_queue: DelayQueue<QueuedAttestationId>,
/* Queued items */
/// Queued blocks.
queued_block_roots: HashSet<Hash256>,
/// Queued aggregated attestations.
queued_aggregates: FnvHashMap<usize, (QueuedAggregate<T::EthSpec>, DelayKey)>,
/// Queued attestations.
queued_unaggregates: FnvHashMap<usize, (QueuedUnaggregate<T::EthSpec>, DelayKey)>,
/// Attestations (aggregated and unaggregated) per root.
awaiting_attestations_per_root: HashMap<Hash256, Vec<QueuedAttestationId>>,
/* Aux */
/// Next attestation id, used for both aggregated and unaggregated attestations
next_attestation: usize,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum QueuedAttestationId {
Aggregate(usize),
Unaggregate(usize),
}
impl<T: EthSpec> QueuedAggregate<T> {
pub fn beacon_block_root(&self) -> &Hash256 {
&self.attestation.message.aggregate.data.beacon_block_root
}
}
impl<T: EthSpec> QueuedUnaggregate<T> {
pub fn beacon_block_root(&self) -> &Hash256 {
&self.attestation.data.beacon_block_root
}
}
impl<T: BeaconChainTypes> Stream for ReprocessQueue<T> {
type Item = InboundEvent<T>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// NOTE: implementing `Stream` is not necessary but allows to maintain the future selection
// order fine-grained and separate from the logic of handling each message, which is nice.
// Poll for expired blocks *before* we try to process new blocks.
//
// The sequential nature of blockchains means it is generally better to try and import all
// existing blocks before new ones.
match self.block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(queued_block))) => {
return Poll::Ready(Some(InboundEvent::ReadyBlock(queued_block.into_inner())));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "block_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
match self.attestations_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(attestation_id))) => {
return Poll::Ready(Some(InboundEvent::ReadyAttestation(
attestation_id.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "attestations_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
// Last empty the messages channel.
match self.work_reprocessing_rx.poll_recv(cx) {
Poll::Ready(Some(message)) => return Poll::Ready(Some(InboundEvent::Msg(message))),
Poll::Ready(None) | Poll::Pending => {}
}
Poll::Pending
}
}
/// Starts the job that manages scheduling works that need re-processing. The returned `Sender`
/// gives the communicating channel to receive those works. Once a work is ready, it is sent back
/// via `ready_work_tx`.
pub fn spawn_reprocess_scheduler<T: BeaconChainTypes>(
ready_work_tx: Sender<ReadyWork<T>>,
executor: &TaskExecutor,
slot_clock: T::SlotClock,
log: Logger,
) -> Sender<ReprocessQueueMessage<T>> {
let (work_reprocessing_tx, work_reprocessing_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN);
// Basic sanity check.
assert!(ADDITIONAL_QUEUED_BLOCK_DELAY < MAXIMUM_GOSSIP_CLOCK_DISPARITY);
let mut queue = ReprocessQueue {
work_reprocessing_rx,
ready_work_tx,
block_delay_queue: DelayQueue::new(),
attestations_delay_queue: DelayQueue::new(),
queued_block_roots: HashSet::new(),
queued_aggregates: FnvHashMap::default(),
queued_unaggregates: FnvHashMap::default(),
awaiting_attestations_per_root: HashMap::new(),
next_attestation: 0,
};
executor.spawn(
async move {
while let Some(msg) = queue.next().await {
queue.handle_message(msg, &slot_clock, &log);
}
debug!(
log,
"Re-process queue stopped";
"msg" => "shutting down"
);
},
TASK_NAME,
);
work_reprocessing_tx
}
impl<T: BeaconChainTypes> ReprocessQueue<T> {
fn handle_message(&mut self, msg: InboundEvent<T>, slot_clock: &T::SlotClock, log: &Logger) {
use ReprocessQueueMessage::*;
match msg {
// Some block has been indicated as "early" and should be processed when the
// appropriate slot arrives.
InboundEvent::Msg(EarlyBlock(early_block)) => {
let block_slot = early_block.block.block.slot();
let block_root = early_block.block.block_root;
// Don't add the same block to the queue twice. This prevents DoS attacks.
if self.queued_block_roots.contains(&block_root) {
return;
}
if let Some(duration_till_slot) = slot_clock.duration_to_slot(block_slot) {
// Check to ensure this won't over-fill the queue.
if self.queued_block_roots.len() >= MAXIMUM_QUEUED_BLOCKS {
error!(
log,
"Early blocks queue is full";
"queue_size" => MAXIMUM_QUEUED_BLOCKS,
"msg" => "check system clock"
);
// Drop the block.
return;
}
self.queued_block_roots.insert(block_root);
// Queue the block until the start of the appropriate slot, plus
// `ADDITIONAL_QUEUED_BLOCK_DELAY`.
self.block_delay_queue.insert(
early_block,
duration_till_slot + ADDITIONAL_QUEUED_BLOCK_DELAY,
);
} else {
// If there is no duration till the next slot, check to see if the slot
// has already arrived. If it has already arrived, send it out for
// immediate processing.
//
// If we can't read the slot or the slot hasn't arrived, simply drop the
// block.
//
// This logic is slightly awkward since `SlotClock::duration_to_slot`
// doesn't distinguish between a slot that has already arrived and an
// error reading the slot clock.
if let Some(now) = slot_clock.now() {
if block_slot <= now
&& self
.ready_work_tx
.try_send(ReadyWork::Block(early_block))
.is_err()
{
error!(
log,
"Failed to send block";
);
}
}
}
}
InboundEvent::Msg(UnknownBlockAggregate(queued_aggregate)) => {
if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS {
error!(
log,
"Aggregate attestation delay queue is full";
"queue_size" => MAXIMUM_QUEUED_ATTESTATIONS,
"msg" => "check system clock"
);
// Drop the attestation.
return;
}
let att_id = QueuedAttestationId::Aggregate(self.next_attestation);
// Register the delay.
let delay_key = self
.attestations_delay_queue
.insert(att_id, QUEUED_ATTESTATION_DELAY);
// Register this attestation for the corresponding root.
self.awaiting_attestations_per_root
.entry(*queued_aggregate.beacon_block_root())
.or_default()
.push(att_id);
// Store the attestation and its info.
self.queued_aggregates
.insert(self.next_attestation, (queued_aggregate, delay_key));
self.next_attestation += 1;
}
InboundEvent::Msg(UnknownBlockUnaggregate(queued_unaggregate)) => {
if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS {
error!(
log,
"Attestation delay queue is full";
"queue_size" => MAXIMUM_QUEUED_ATTESTATIONS,
"msg" => "check system clock"
);
// Drop the attestation.
return;
}
let att_id = QueuedAttestationId::Unaggregate(self.next_attestation);
// Register the delay.
let delay_key = self
.attestations_delay_queue
.insert(att_id, QUEUED_ATTESTATION_DELAY);
// Register this attestation for the corresponding root.
self.awaiting_attestations_per_root
.entry(*queued_unaggregate.beacon_block_root())
.or_default()
.push(att_id);
// Store the attestation and its info.
self.queued_unaggregates
.insert(self.next_attestation, (queued_unaggregate, delay_key));
self.next_attestation += 1;
}
InboundEvent::Msg(BlockImported(root)) => {
// Unqueue the attestations we have for this root, if any.
if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&root) {
for id in queued_ids {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS,
);
if let Some((work, delay_key)) = match id {
QueuedAttestationId::Aggregate(id) => self
.queued_aggregates
.remove(&id)
.map(|(aggregate, delay_key)| {
(ReadyWork::Aggregate(aggregate), delay_key)
}),
QueuedAttestationId::Unaggregate(id) => self
.queued_unaggregates
.remove(&id)
.map(|(unaggregate, delay_key)| {
(ReadyWork::Unaggregate(unaggregate), delay_key)
}),
} {
// Remove the delay.
self.attestations_delay_queue.remove(&delay_key);
// Send the work.
if self.ready_work_tx.try_send(work).is_err() {
error!(
log,
"Failed to send scheduled attestation";
);
}
} else {
// There is a mismatch between the attestation ids registered for this
// root and the queued attestations. This should never happen.
error!(
log,
"Unknown queued attestation for block root";
"block_root" => ?root,
"att_id" => ?id,
);
}
}
}
}
// A block that was queued for later processing is now ready to be processed.
InboundEvent::ReadyBlock(ready_block) => {
let block_root = ready_block.block.block_root;
if !self.queued_block_roots.remove(&block_root) {
// Log an error to alert that we've made a bad assumption about how this
// program works, but still process the block anyway.
error!(
log,
"Unknown block in delay queue";
"block_root" => ?block_root
);
}
if self
.ready_work_tx
.try_send(ReadyWork::Block(ready_block))
.is_err()
{
error!(
log,
"Failed to pop queued block";
);
}
}
InboundEvent::DelayQueueError(e, queue_name) => {
crit!(
log,
"Failed to poll queue";
"queue" => queue_name,
"e" => ?e
)
}
InboundEvent::ReadyAttestation(queued_id) => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS,
);
if let Some((root, work)) = match queued_id {
QueuedAttestationId::Aggregate(id) => {
self.queued_aggregates
.remove(&id)
.map(|(aggregate, _delay_key)| {
(
*aggregate.beacon_block_root(),
ReadyWork::Aggregate(aggregate),
)
})
}
QueuedAttestationId::Unaggregate(id) => self
.queued_unaggregates
.remove(&id)
.map(|(unaggregate, _delay_key)| {
(
*unaggregate.beacon_block_root(),
ReadyWork::Unaggregate(unaggregate),
)
}),
} {
if self.ready_work_tx.try_send(work).is_err() {
error!(
log,
"Failed to send scheduled attestation";
);
}
if let Some(queued_atts) = self.awaiting_attestations_per_root.get_mut(&root) {
if let Some(index) = queued_atts.iter().position(|&id| id == queued_id) {
queued_atts.swap_remove(index);
}
}
}
}
}
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[BLOCKS],
self.block_delay_queue.len() as i64,
);
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[ATTESTATIONS],
self.attestations_delay_queue.len() as i64,
);
}
}

View File

@ -13,11 +13,48 @@ use ssz::Encode;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use types::{
Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof,
Attestation, AttesterSlashing, EthSpec, Hash256, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedVoluntaryExit, SubnetId,
};
use super::{super::block_delay_queue::QueuedBlock, Worker};
use super::{
super::work_reprocessing_queue::{
QueuedAggregate, QueuedBlock, QueuedUnaggregate, ReprocessQueueMessage,
},
Worker,
};
/// Data for an aggregated or unaggregated attestation that failed verification.
enum FailedAtt<T: EthSpec> {
Unaggregate {
attestation: Box<Attestation<T>>,
subnet_id: SubnetId,
should_import: bool,
seen_timestamp: Duration,
},
Aggregate {
attestation: Box<SignedAggregateAndProof<T>>,
seen_timestamp: Duration,
},
}
impl<T: EthSpec> FailedAtt<T> {
pub fn root(&self) -> &Hash256 {
match self {
FailedAtt::Unaggregate { attestation, .. } => &attestation.data.beacon_block_root,
FailedAtt::Aggregate { attestation, .. } => {
&attestation.message.aggregate.data.beacon_block_root
}
}
}
pub fn kind(&self) -> &'static str {
match self {
FailedAtt::Unaggregate { .. } => "unaggregated",
FailedAtt::Aggregate { .. } => "aggregated",
}
}
}
impl<T: BeaconChainTypes> Worker<T> {
/* Auxiliary functions */
@ -59,6 +96,7 @@ impl<T: BeaconChainTypes> Worker<T> {
/// - Attempt to add it to the naive aggregation pool.
///
/// Raises a log if there are errors.
#[allow(clippy::too_many_arguments)]
pub fn process_gossip_attestation(
self,
message_id: MessageId,
@ -66,6 +104,7 @@ impl<T: BeaconChainTypes> Worker<T> {
attestation: Attestation<T::EthSpec>,
subnet_id: SubnetId,
should_import: bool,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
seen_timestamp: Duration,
) {
let beacon_block_root = attestation.data.beacon_block_root;
@ -75,12 +114,17 @@ impl<T: BeaconChainTypes> Worker<T> {
.verify_unaggregated_attestation_for_gossip(attestation, Some(subnet_id))
{
Ok(attestation) => attestation,
Err(e) => {
Err((e, attestation)) => {
self.handle_attestation_verification_failure(
peer_id,
message_id,
beacon_block_root,
"unaggregated",
FailedAtt::Unaggregate {
attestation: Box::new(attestation),
subnet_id,
should_import,
seen_timestamp,
},
reprocess_tx,
e,
);
return;
@ -153,6 +197,7 @@ impl<T: BeaconChainTypes> Worker<T> {
message_id: MessageId,
peer_id: PeerId,
aggregate: SignedAggregateAndProof<T::EthSpec>,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
seen_timestamp: Duration,
) {
let beacon_block_root = aggregate.message.aggregate.data.beacon_block_root;
@ -162,13 +207,16 @@ impl<T: BeaconChainTypes> Worker<T> {
.verify_aggregated_attestation_for_gossip(aggregate)
{
Ok(aggregate) => aggregate,
Err(e) => {
Err((e, attestation)) => {
// Report the failure to gossipsub
self.handle_attestation_verification_failure(
peer_id,
message_id,
beacon_block_root,
"aggregated",
FailedAtt::Aggregate {
attestation: Box::new(attestation),
seen_timestamp,
},
reprocess_tx,
e,
);
return;
@ -238,7 +286,7 @@ impl<T: BeaconChainTypes> Worker<T> {
message_id: MessageId,
peer_id: PeerId,
block: SignedBeaconBlock<T::EthSpec>,
delayed_import_tx: mpsc::Sender<QueuedBlock<T>>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
seen_duration: Duration,
) {
// Log metrics to track delay from other nodes on the network.
@ -361,12 +409,12 @@ impl<T: BeaconChainTypes> Worker<T> {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_REQUEUED_TOTAL);
if delayed_import_tx
.try_send(QueuedBlock {
if reprocess_tx
.try_send(ReprocessQueueMessage::EarlyBlock(QueuedBlock {
peer_id,
block: verified_block,
seen_timestamp: seen_duration,
})
}))
.is_err()
{
error!(
@ -378,7 +426,12 @@ impl<T: BeaconChainTypes> Worker<T> {
)
}
}
Ok(_) => self.process_gossip_verified_block(peer_id, verified_block, seen_duration),
Ok(_) => self.process_gossip_verified_block(
peer_id,
verified_block,
reprocess_tx,
seen_duration,
),
Err(e) => {
error!(
self.log,
@ -399,24 +452,34 @@ impl<T: BeaconChainTypes> Worker<T> {
self,
peer_id: PeerId,
verified_block: GossipVerifiedBlock<T>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
// This value is not used presently, but it might come in handy for debugging.
_seen_duration: Duration,
) {
let block = Box::new(verified_block.block.clone());
match self.chain.process_block(verified_block) {
Ok(_block_root) => {
Ok(block_root) => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
if reprocess_tx
.try_send(ReprocessQueueMessage::BlockImported(block_root))
.is_err()
{
error!(
self.log,
"Failed to inform block import";
"source" => "gossip",
"block_root" => %block_root,
)
};
trace!(
self.log,
"Gossipsub block processed";
"peer_id" => %peer_id
);
// The `MessageHandler` would be the place to put this, however it doesn't seem
// to have a reference to the `BeaconChain`. I will leave this for future
// works.
match self.chain.fork_choice() {
Ok(()) => trace!(
self.log,
@ -627,14 +690,16 @@ impl<T: BeaconChainTypes> Worker<T> {
/// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the
/// network.
pub fn handle_attestation_verification_failure(
fn handle_attestation_verification_failure(
&self,
peer_id: PeerId,
message_id: MessageId,
beacon_block_root: Hash256,
attestation_type: &str,
failed_att: FailedAtt<T::EthSpec>,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
error: AttnError,
) {
let beacon_block_root = failed_att.root();
let attestation_type = failed_att.kind();
metrics::register_attestation_error(&error);
match &error {
AttnError::FutureEpoch { .. }
@ -796,20 +861,15 @@ impl<T: BeaconChainTypes> Worker<T> {
self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError);
}
AttnError::UnknownHeadBlock { beacon_block_root } => {
// Note: its a little bit unclear as to whether or not this block is unknown or
// just old. See:
//
// https://github.com/sigp/lighthouse/issues/1039
// TODO: Maintain this attestation and re-process once sync completes
// TODO: We then score based on whether we can download the block and re-process.
trace!(
self.log,
"Attestation for unknown block";
"peer_id" => %peer_id,
"block" => %beacon_block_root
);
// we don't know the block, get the sync manager to handle the block lookup
if let Some(sender) = reprocess_tx {
// We don't know the block, get the sync manager to handle the block lookup, and
// send the attestation to be scheduled for re-processing.
self.sync_tx
.send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root))
.unwrap_or_else(|_| {
@ -819,7 +879,58 @@ impl<T: BeaconChainTypes> Worker<T> {
"msg" => "UnknownBlockHash"
)
});
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
let msg = match failed_att {
FailedAtt::Aggregate {
attestation,
seen_timestamp,
} => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_REQUEUED_TOTAL,
);
ReprocessQueueMessage::UnknownBlockAggregate(QueuedAggregate {
peer_id,
message_id,
attestation,
seen_timestamp,
})
}
FailedAtt::Unaggregate {
attestation,
subnet_id,
should_import,
seen_timestamp,
} => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_REQUEUED_TOTAL,
);
ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate {
peer_id,
message_id,
attestation,
subnet_id,
should_import,
seen_timestamp,
})
}
};
if sender.try_send(msg).is_err() {
error!(
self.log,
"Failed to send attestation for re-processing";
)
}
} else {
// We shouldn't make any further attempts to process this attestation.
// Downscore the peer.
self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError);
self.propagate_validation_result(
message_id,
peer_id,
MessageAcceptance::Ignore,
);
}
return;
}
AttnError::UnknownTargetRoot(_) => {
@ -879,7 +990,6 @@ impl<T: BeaconChainTypes> Worker<T> {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError);
}
AttnError::InvalidSubnetId { received, expected } => {
/*
* The attestation was received on an incorrect subnet id.

View File

@ -1,4 +1,4 @@
use super::QueuedBlock;
use super::work_reprocessing_queue::ReprocessQueueMessage;
use crate::{service::NetworkMessage, sync::SyncMessage};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use slog::{error, Logger};
@ -46,5 +46,5 @@ impl<T: BeaconChainTypes> Worker<T> {
/// Contains the necessary items for a worker to do their job.
pub struct Toolbox<T: BeaconChainTypes> {
pub idle_tx: mpsc::Sender<()>,
pub delayed_block_tx: mpsc::Sender<QueuedBlock<T>>,
pub work_reprocessing_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
}

View File

@ -1,4 +1,4 @@
use super::Worker;
use super::{super::work_reprocessing_queue::ReprocessQueueMessage, Worker};
use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE;
use crate::beacon_processor::BlockResultSender;
use crate::metrics;
@ -7,6 +7,7 @@ use crate::sync::{BatchProcessResult, ChainId};
use beacon_chain::{BeaconChainTypes, BlockError, ChainSegmentResult};
use eth2_libp2p::PeerId;
use slog::{crit, debug, error, info, trace, warn};
use tokio::sync::mpsc;
use types::{Epoch, Hash256, SignedBeaconBlock};
/// Id associated to a block processing request, either a batch or a single block.
@ -27,6 +28,7 @@ impl<T: BeaconChainTypes> Worker<T> {
self,
block: SignedBeaconBlock<T::EthSpec>,
result_tx: BlockResultSender<T::EthSpec>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
) {
let slot = block.slot();
let block_result = self.chain.process_block(block);
@ -40,6 +42,18 @@ impl<T: BeaconChainTypes> Worker<T> {
"slot" => slot,
"hash" => %root
);
if reprocess_tx
.try_send(ReprocessQueueMessage::BlockImported(*root))
.is_err()
{
error!(
self.log,
"Failed to inform block import";
"source" => "rpc",
"block_root" => %root,
)
};
}
if result_tx.send(block_result).is_err() {

View File

@ -301,6 +301,10 @@ lazy_static! {
"beacon_processor_unaggregated_attestation_imported_total",
"Total number of unaggregated attestations imported to fork choice, etc."
);
pub static ref BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_REQUEUED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_unaggregated_attestation_requeued_total",
"Total number of unaggregated attestations that referenced an unknown block and were re-queued."
);
// Aggregated attestations.
pub static ref BEACON_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_aggregated_attestation_queue_total",
@ -314,6 +318,10 @@ lazy_static! {
"beacon_processor_aggregated_attestation_imported_total",
"Total number of aggregated attestations imported to fork choice, etc."
);
pub static ref BEACON_PROCESSOR_AGGREGATED_ATTESTATION_REQUEUED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_aggregated_attestation_requeued_total",
"Total number of aggregated attestations that referenced an unknown block and were re-queued."
);
}
lazy_static! {
@ -370,6 +378,24 @@ lazy_static! {
"beacon_block_gossip_slot_start_delay_time",
"Duration between when the block is received and the start of the slot it belongs to.",
);
/*
* Attestation reprocessing queue metrics.
*/
pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"beacon_processor_reprocessing_queue_total",
"Count of items in a reprocessing queue.",
&["type"]
);
pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS: Result<IntCounter> = try_create_int_counter(
"beacon_processor_reprocessing_queue_expired_attestations",
"Number of queued attestations which have expired before a matching block has been found"
);
pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS: Result<IntCounter> = try_create_int_counter(
"beacon_processor_reprocessing_queue_matched_attestations",
"Number of queued attestations where as matching block has been imported"
);
}
pub fn register_attestation_error(error: &AttnError) {

View File

@ -52,7 +52,7 @@ pub trait EthSpec: 'static + Default + Sync + Send + Clone + Debug + PartialEq +
/*
* Misc
*/
type MaxValidatorsPerCommittee: Unsigned + Clone + Sync + Send + Debug + PartialEq + Eq + Unpin;
type MaxValidatorsPerCommittee: Unsigned + Clone + Sync + Send + Debug + PartialEq + Eq;
/*
* Time parameters
*/