//! Provides the `BeaconProcessor`, a multi-threaded processor for messages received on the network
//! that need to be processed by the `BeaconChain`.
//!
//! Uses `tokio` tasks (instead of raw threads) to provide the following tasks:
//!
//! - A "manager" task, which either spawns worker tasks or enqueues work.
//! - One or more "worker" tasks which perform time-intensive work on the `BeaconChain`.
//! - A task managing the scheduling of work that needs to be re-processed.
//!
//! ## Purpose
//!
//! The purpose of the `BeaconProcessor` is to provide two things:
//!
//! 1. Moving long-running, blocking tasks off the main `tokio` executor.
//! 2. A fixed-length buffer for consensus messages.
//!
//! (1) ensures that we don't clog up the networking stack with long-running tasks, potentially
//! causing timeouts. (2) means that we can easily and explicitly reject messages when we're
//! overloaded and also distribute load across time.
//!
//! ## Detail
//!
//! There is a single "manager" task which listens to three event channels. These events are
//! either:
//!
//! - A new parcel of work (work event).
//! - Indication that a worker has finished a parcel of work (worker idle).
//! - A parcel of work that is ready for re-processing (work event).
//!
//! Then, there is a maximum of `n` "worker" blocking threads, where `n` is the CPU count.
//!
//! Whenever the manager receives a new parcel of work, it is either:
//!
//! - Provided to a newly-spawned worker task (if we are not already at `n` workers).
//! - Added to a queue.
//!
//! Whenever the manager receives a notification that a worker has finished a parcel of work, it
//! checks the queues to see if there are more parcels of work that can be spawned in a new worker
//! task.
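//!
//! A minimal sketch of the manager loop's shape (illustrative only; `spawn_worker`, the single
//! `queue`, `current_workers` and `max_workers` are hypothetical stand-ins for the many
//! prioritised queues and bookkeeping defined below in this module):
//!
//! ```ignore
//! loop {
//!     match inbound_events.next().await {
//!         // A worker finished: free a slot, then drain the queues (not shown).
//!         Some(InboundEvent::WorkerIdle) => current_workers -= 1,
//!         // New work: run it now if a slot is free, otherwise queue it.
//!         Some(InboundEvent::WorkEvent(event)) if current_workers < max_workers => {
//!             spawn_worker(event);
//!             current_workers += 1;
//!         }
//!         Some(InboundEvent::WorkEvent(event)) => queue.push(event),
//!         _ => break,
//!     }
//! }
//! ```
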
use crate::sync::manager::BlockProcessType;
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::parking_lot::Mutex;
use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock, NotifyExecutionLayer};
use derivative::Derivative;
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
use lighthouse_network::rpc::LightClientBootstrapRequest;
use lighthouse_network::{
    rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage},
    Client, MessageId, NetworkGlobals, PeerId, PeerRequestId,
};
use logging::TimeLatch;
use slog::{crit, debug, error, trace, warn, Logger};
use std::collections::VecDeque;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::task::Context;
use std::time::Duration;
use std::{cmp, collections::HashSet};
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
    Attestation, AttesterSlashing, Hash256, LightClientFinalityUpdate, LightClientOptimisticUpdate,
    ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBlobSidecar,
    SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId,
    SyncCommitteeMessage, SyncSubnetId,
};
use work_reprocessing_queue::{
    spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock,
    QueuedUnaggregate, ReadyWork,
};

use worker::{Toolbox, Worker};

mod tests;
mod work_reprocessing_queue;
mod worker;

use crate::beacon_processor::work_reprocessing_queue::{
    QueuedBackfillBatch, QueuedGossipBlock, ReprocessQueueMessage,
};
pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage};
/// The maximum size of the channel for work events to the `BeaconProcessor`.
///
/// Setting this too low will cause consensus messages to be dropped.
pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;

/// The maximum size of the channel for idle events to the `BeaconProcessor`.
///
/// Setting this too low will prevent new workers from being spawned. It *should* only need to be
/// set to the CPU count, but we set it high to be safe.
const MAX_IDLE_QUEUE_LEN: usize = 16_384;

/// The maximum size of the channel for re-processing work events.
const MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * MAX_WORK_EVENT_QUEUE_LEN / 4;

/// The maximum number of queued `Attestation` objects that will be stored before we start dropping
/// them.
const MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN: usize = 16_384;

/// The maximum number of queued `Attestation` objects that will be stored for re-processing before
/// we start dropping them.
const MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 8_192;

/// The maximum number of queued `SignedAggregateAndProof` objects that will be stored before we
/// start dropping them.
const MAX_AGGREGATED_ATTESTATION_QUEUE_LEN: usize = 4_096;

/// The maximum number of queued `SignedAggregateAndProof` objects that will be stored for
/// re-processing before we start dropping them.
const MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `SignedBeaconBlock` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `SignedBlobSidecar` objects received on gossip that
/// will be stored before we start dropping them.
const MAX_GOSSIP_BLOB_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but
/// within acceptable clock disparity) that will be queued before we start dropping them.
const MAX_DELAYED_BLOCK_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `SignedVoluntaryExit` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_EXIT_QUEUE_LEN: usize = 4_096;

/// The maximum number of queued `ProposerSlashing` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN: usize = 4_096;

/// The maximum number of queued `AttesterSlashing` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN: usize = 4_096;

/// The maximum number of queued `LightClientFinalityUpdate` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored
/// for reprocessing before we start dropping them.
const MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN: usize = 128;

/// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping
/// them.
const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048;

/// The maximum number of queued `SignedContributionAndProof` objects that will be stored before we
/// start dropping them.
const MAX_SYNC_CONTRIBUTION_QUEUE_LEN: usize = 1024;

/// The maximum number of queued `SignedBeaconBlock` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued blob lists received from the network RPC that will be stored
/// before we start dropping them.
const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024 * 4;

/// The maximum number of queued `Vec<SignedBeaconBlock>` objects received during syncing that will
/// be stored before we start dropping them.
const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64;

/// The maximum number of queued `StatusMessage` objects received from the network RPC that will be
/// stored before we start dropping them.
const MAX_STATUS_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `BlocksByRangeRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_BLOCKS_BY_RANGE_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `BlobsByRangeRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_BLOBS_BY_RANGE_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `BlocksByRootRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `BlobsByRootRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_BLOCK_AND_BLOBS_BY_ROOTS_QUEUE_LEN: usize = 1_024;

/// Maximum number of `SignedBlsToExecutionChange` messages to queue before dropping them.
///
/// This value is set high to accommodate the large spike that is expected immediately after Capella
/// is activated.
const MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN: usize = 16_384;

/// The maximum number of queued `LightClientBootstrapRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN: usize = 1_024;

/// The name of the manager tokio task.
const MANAGER_TASK_NAME: &str = "beacon_processor_manager";

/// The name of the worker tokio tasks.
const WORKER_TASK_NAME: &str = "beacon_processor_worker";

/// The `MAX_..._BATCH_SIZE` variables define how many attestations can be included in a single
/// batch.
///
/// Choosing these values is difficult since there is a trade-off between:
///
/// - It is faster to verify one large batch than multiple smaller batches.
/// - "Poisoning" attacks have a larger impact as the batch size increases.
///
/// Poisoning occurs when an invalid signature is included in a batch of attestations. A single
/// invalid signature causes the entire batch to fail. When a batch fails, we fall back to
/// individually verifying each attestation signature.
const MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64;
const MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64;
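
// A minimal sketch (illustrative only; the `verify_batch`/`verify_one` closures are hypothetical
// stand-ins for the real signature-verification code) of the fall-back behaviour described above:
// try the whole batch first, then verify individually if the batch fails.
#[allow(dead_code)]
fn batch_verify_with_fallback<T>(
    items: &[T],
    verify_batch: impl Fn(&[T]) -> bool,
    verify_one: impl Fn(&T) -> bool,
) -> Vec<bool> {
    if verify_batch(items) {
        vec![true; items.len()]
    } else {
        // A single invalid ("poisoned") signature fails the whole batch, so fall back to
        // checking each item on its own.
        items.iter().map(verify_one).collect()
    }
}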

/// Unique IDs used for metrics and testing.
pub const WORKER_FREED: &str = "worker_freed";
pub const NOTHING_TO_DO: &str = "nothing_to_do";
pub const GOSSIP_ATTESTATION: &str = "gossip_attestation";
pub const GOSSIP_ATTESTATION_BATCH: &str = "gossip_attestation_batch";
pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate";
pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch";
pub const GOSSIP_BLOCK: &str = "gossip_block";
pub const GOSSIP_BLOCK_AND_BLOBS_SIDECAR: &str = "gossip_block_and_blobs_sidecar";
pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block";
pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit";
pub const GOSSIP_PROPOSER_SLASHING: &str = "gossip_proposer_slashing";
pub const GOSSIP_ATTESTER_SLASHING: &str = "gossip_attester_slashing";
pub const GOSSIP_SYNC_SIGNATURE: &str = "gossip_sync_signature";
pub const GOSSIP_SYNC_CONTRIBUTION: &str = "gossip_sync_contribution";
pub const GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update";
pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update";
pub const RPC_BLOCK: &str = "rpc_block";
pub const RPC_BLOB: &str = "rpc_blob";
pub const CHAIN_SEGMENT: &str = "chain_segment";
pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill";
pub const STATUS_PROCESSING: &str = "status_processing";
pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request";
pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request";
pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request";
pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap";
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update";
pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change";
/// A simple first-in-first-out queue with a maximum length.
struct FifoQueue<T> {
    queue: VecDeque<T>,
    max_length: usize,
}

impl<T> FifoQueue<T> {
    /// Create a new, empty queue with the given length.
    pub fn new(max_length: usize) -> Self {
        Self {
            queue: VecDeque::default(),
            max_length,
        }
    }

    /// Add a new item to the queue.
    ///
    /// Drops `item` if the queue is full.
    pub fn push(&mut self, item: T, item_desc: &str, log: &Logger) {
        if self.queue.len() == self.max_length {
            error!(
                log,
                "Work queue is full";
                "msg" => "the system has insufficient resources for load",
                "queue_len" => self.max_length,
                "queue" => item_desc,
            )
        } else {
            self.queue.push_back(item);
        }
    }

    /// Remove the next item from the queue.
    pub fn pop(&mut self) -> Option<T> {
        self.queue.pop_front()
    }

    /// Returns the current length of the queue.
    pub fn len(&self) -> usize {
        self.queue.len()
    }
}

/// A simple last-in-first-out queue with a maximum length.
struct LifoQueue<T> {
    queue: VecDeque<T>,
    max_length: usize,
}

impl<T> LifoQueue<T> {
    /// Create a new, empty queue with the given length.
    pub fn new(max_length: usize) -> Self {
        Self {
            queue: VecDeque::default(),
            max_length,
        }
    }

    /// Add a new item to the front of the queue.
    ///
    /// If the queue is full, the item at the back of the queue is dropped.
    pub fn push(&mut self, item: T) {
        if self.queue.len() == self.max_length {
            self.queue.pop_back();
        }
        self.queue.push_front(item);
    }

    /// Remove the next item from the queue.
    pub fn pop(&mut self) -> Option<T> {
        self.queue.pop_front()
    }

    /// Returns `true` if the queue is full.
    pub fn is_full(&self) -> bool {
        self.queue.len() >= self.max_length
    }

    /// Returns the current length of the queue.
    pub fn len(&self) -> usize {
        self.queue.len()
    }
}
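
// Illustrative sketch of the LIFO eviction policy above: when the queue is full, the item at
// the back (the oldest) is dropped so the freshest work is retained. Not called anywhere; kept
// only as documentation of the intended behaviour.
#[allow(dead_code)]
fn lifo_queue_drops_oldest_example() {
    let mut queue = LifoQueue::new(2);
    queue.push(1);
    queue.push(2);
    queue.push(3); // `1` falls off the back.
    assert_eq!(queue.pop(), Some(3));
    assert_eq!(queue.pop(), Some(2));
    assert_eq!(queue.pop(), None);
}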
/// A handle that removes its `entry` from the `DuplicateCache` when it is dropped.
///
/// This ensures the cache is cleaned up even if a worker task returns early or shuts down
/// uncleanly.
pub struct DuplicateCacheHandle {
    entry: Hash256,
    cache: DuplicateCache,
}

impl Drop for DuplicateCacheHandle {
    fn drop(&mut self) {
        self.cache.remove(&self.entry);
    }
}

/// A simple cache for detecting duplicate block roots across multiple threads.
#[derive(Clone, Default)]
pub struct DuplicateCache {
    inner: Arc<Mutex<HashSet<Hash256>>>,
}

impl DuplicateCache {
    /// Checks if the given block_root exists and inserts it into the cache if
    /// it doesn't exist.
    ///
    /// Returns a `Some(DuplicateCacheHandle)` if the block_root was successfully
    /// inserted and `None` if the block root already existed in the cache.
    ///
    /// The handle removes the entry from the cache when it is dropped. This ensures that any
    /// unclean shutdowns in the worker tasks do not leave inconsistent state in the cache.
    pub fn check_and_insert(&self, block_root: Hash256) -> Option<DuplicateCacheHandle> {
        let mut inner = self.inner.lock();
        if inner.insert(block_root) {
            Some(DuplicateCacheHandle {
                entry: block_root,
                cache: self.clone(),
            })
        } else {
            None
        }
    }

    /// Remove the given block_root from the cache.
    pub fn remove(&self, block_root: &Hash256) {
        let mut inner = self.inner.lock();
        inner.remove(block_root);
    }
}
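
// Illustrative usage sketch of the cache and RAII handle defined above: the entry is removed
// automatically when `_handle` is dropped, even if the caller returns early. Not used by the
// processor itself.
#[allow(dead_code)]
fn duplicate_cache_usage_example(cache: &DuplicateCache, block_root: Hash256) -> bool {
    match cache.check_and_insert(block_root) {
        Some(_handle) => {
            // ... import the block here; `_handle` removes `block_root` from the cache on drop.
            true
        }
        // Another worker is already importing this block root.
        None => false,
    }
}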
/// An event to be processed by the manager task.
#[derive(Derivative)]
#[derivative(Debug(bound = "T: BeaconChainTypes"))]
pub struct WorkEvent<T: BeaconChainTypes> {
    drop_during_sync: bool,
    work: Work<T>,
}

impl<T: BeaconChainTypes> WorkEvent<T> {
    /// Create a new `Work` event for some unaggregated attestation.
    pub fn unaggregated_attestation(
        message_id: MessageId,
        peer_id: PeerId,
        attestation: Attestation<T::EthSpec>,
        subnet_id: SubnetId,
        should_import: bool,
        seen_timestamp: Duration,
    ) -> Self {
        Self {
            drop_during_sync: true,
            work: Work::GossipAttestation {
                message_id,
                peer_id,
                attestation: Box::new(attestation),
                subnet_id,
                should_import,
                seen_timestamp,
            },
        }
    }

    /// Create a new `Work` event for some aggregated attestation.
    pub fn aggregated_attestation(
        message_id: MessageId,
        peer_id: PeerId,
        aggregate: SignedAggregateAndProof<T::EthSpec>,
        seen_timestamp: Duration,
    ) -> Self {
        Self {
            drop_during_sync: true,
            work: Work::GossipAggregate {
                message_id,
                peer_id,
                aggregate: Box::new(aggregate),
                seen_timestamp,
            },
        }
    }

    /// Create a new `Work` event for some block.
    pub fn gossip_beacon_block(
        message_id: MessageId,
        peer_id: PeerId,
        peer_client: Client,
        block: Arc<SignedBeaconBlock<T::EthSpec>>,
        seen_timestamp: Duration,
    ) -> Self {
        Self {
            drop_during_sync: false,
            work: Work::GossipBlock {
                message_id,
                peer_id,
                peer_client,
                block,
                seen_timestamp,
            },
        }
    }

    /// Create a new `Work` event for some blobs sidecar.
    pub fn gossip_signed_blob_sidecar(
        message_id: MessageId,
        peer_id: PeerId,
        peer_client: Client,
        blob_index: u64,
        signed_blob: SignedBlobSidecar<T::EthSpec>,
        seen_timestamp: Duration,
    ) -> Self {
        Self {
            drop_during_sync: false,
            work: Work::GossipSignedBlobSidecar {
                message_id,
                peer_id,
                peer_client,
                blob_index,
                signed_blob: Box::new(signed_blob),
                seen_timestamp,
            },
        }
    }

    /// Create a new `Work` event for some sync committee signature.
    pub fn gossip_sync_signature(
        message_id: MessageId,
        peer_id: PeerId,
        sync_signature: SyncCommitteeMessage,
        subnet_id: SyncSubnetId,
        seen_timestamp: Duration,
    ) -> Self {
        Self {
            drop_during_sync: true,
            work: Work::GossipSyncSignature {
                message_id,
                peer_id,
                sync_signature: Box::new(sync_signature),
                subnet_id,
                seen_timestamp,
            },
        }
    }

    /// Create a new `Work` event for some sync committee contribution.
    pub fn gossip_sync_contribution(
        message_id: MessageId,
        peer_id: PeerId,
        sync_contribution: SignedContributionAndProof<T::EthSpec>,
        seen_timestamp: Duration,
    ) -> Self {
        Self {
            drop_during_sync: true,
            work: Work::GossipSyncContribution {
                message_id,
                peer_id,
                sync_contribution: Box::new(sync_contribution),
                seen_timestamp,
            },
        }
    }

    /// Create a new `Work` event for some exit.
    pub fn gossip_voluntary_exit(
        message_id: MessageId,
        peer_id: PeerId,
        voluntary_exit: Box<SignedVoluntaryExit>,
    ) -> Self {
        Self {
            drop_during_sync: false,
            work: Work::GossipVoluntaryExit {
                message_id,
                peer_id,
                voluntary_exit,
            },
        }
    }

    /// Create a new `Work` event for some proposer slashing.
    pub fn gossip_proposer_slashing(
        message_id: MessageId,
        peer_id: PeerId,
        proposer_slashing: Box<ProposerSlashing>,
    ) -> Self {
        Self {
            drop_during_sync: false,
            work: Work::GossipProposerSlashing {
                message_id,
                peer_id,
                proposer_slashing,
            },
        }
    }

    /// Create a new `Work` event for some light client finality update.
    pub fn gossip_light_client_finality_update(
        message_id: MessageId,
        peer_id: PeerId,
        light_client_finality_update: Box<LightClientFinalityUpdate<T::EthSpec>>,
        seen_timestamp: Duration,
    ) -> Self {
        Self {
            drop_during_sync: true,
            work: Work::GossipLightClientFinalityUpdate {
                message_id,
                peer_id,
                light_client_finality_update,
                seen_timestamp,
            },
        }
    }

    /// Create a new `Work` event for some light client optimistic update.
    pub fn gossip_light_client_optimistic_update(
        message_id: MessageId,
        peer_id: PeerId,
        light_client_optimistic_update: Box<LightClientOptimisticUpdate<T::EthSpec>>,
        seen_timestamp: Duration,
    ) -> Self {
        Self {
            drop_during_sync: true,
            work: Work::GossipLightClientOptimisticUpdate {
                message_id,
                peer_id,
                light_client_optimistic_update,
                seen_timestamp,
            },
        }
    }

    /// Create a new `Work` event for some attester slashing.
    pub fn gossip_attester_slashing(
        message_id: MessageId,
        peer_id: PeerId,
        attester_slashing: Box<AttesterSlashing<T::EthSpec>>,
    ) -> Self {
        Self {
            drop_during_sync: false,
            work: Work::GossipAttesterSlashing {
                message_id,
                peer_id,
                attester_slashing,
            },
        }
    }

    /// Create a new `Work` event for some BLS to execution change.
    pub fn gossip_bls_to_execution_change(
        message_id: MessageId,
        peer_id: PeerId,
        bls_to_execution_change: Box<SignedBlsToExecutionChange>,
    ) -> Self {
        Self {
            drop_during_sync: false,
            work: Work::GossipBlsToExecutionChange {
                message_id,
                peer_id,
                bls_to_execution_change,
            },
        }
    }
    /// Create a new `Work` event for some block, where the result from computation (if any) is
    /// sent to the other side of `result_tx`.
    pub fn rpc_beacon_block(
        block_root: Hash256,
        block: BlockWrapper<T::EthSpec>,
        seen_timestamp: Duration,
        process_type: BlockProcessType,
    ) -> Self {
        Self {
            drop_during_sync: false,
            work: Work::RpcBlock {
                block_root,
                block,
                seen_timestamp,
                process_type,
                should_process: true,
            },
        }
    }

    /// Create a new `Work` event for some blobs received via RPC.
    pub fn rpc_blobs(
        block_root: Hash256,
        blobs: FixedBlobSidecarList<T::EthSpec>,
        seen_timestamp: Duration,
        process_type: BlockProcessType,
    ) -> Self {
        Self {
            drop_during_sync: false,
            work: Work::RpcBlobs {
                block_root,
                blobs,
                seen_timestamp,
                process_type,
            },
        }
    }

    /// Create a new work event to import `blocks` as a beacon chain segment.
    pub fn chain_segment(
        process_id: ChainSegmentProcessId,
        blocks: Vec<BlockWrapper<T::EthSpec>>,
    ) -> Self {
        Self {
            drop_during_sync: false,
            work: Work::ChainSegment { process_id, blocks },
        }
    }

    /// Create a new work event to process `StatusMessage`s from the RPC network.
    pub fn status_message(peer_id: PeerId, message: StatusMessage) -> Self {
        Self {
            drop_during_sync: false,
            work: Work::Status { peer_id, message },
        }
    }

    /// Create a new work event to process `BlocksByRangeRequest`s from the RPC network.
    pub fn blocks_by_range_request(
        peer_id: PeerId,
        request_id: PeerRequestId,
        request: BlocksByRangeRequest,
    ) -> Self {
        Self {
            drop_during_sync: false,
            work: Work::BlocksByRangeRequest {
                peer_id,
                request_id,
                request,
            },
        }
    }

    /// Create a new work event to process `BlocksByRootRequest`s from the RPC network.
    pub fn blocks_by_roots_request(
        peer_id: PeerId,
        request_id: PeerRequestId,
        request: BlocksByRootRequest,
    ) -> Self {
        Self {
            drop_during_sync: false,
            work: Work::BlocksByRootsRequest {
                peer_id,
                request_id,
                request,
            },
        }
    }

    /// Create a new work event to process `BlobsByRangeRequest`s from the RPC network.
    pub fn blobs_by_range_request(
        peer_id: PeerId,
        request_id: PeerRequestId,
        request: BlobsByRangeRequest,
    ) -> Self {
        Self {
            drop_during_sync: false,
            work: Work::BlobsByRangeRequest {
                peer_id,
                request_id,
                request,
            },
        }
    }

    /// Create a new work event to process `LightClientBootstrap`s from the RPC network.
    pub fn lightclient_bootstrap_request(
        peer_id: PeerId,
        request_id: PeerRequestId,
        request: LightClientBootstrapRequest,
    ) -> Self {
        Self {
            drop_during_sync: true,
            work: Work::LightClientBootstrapRequest {
                peer_id,
                request_id,
                request,
            },
        }
    }

    /// Create a new work event to process `BlobsByRootRequest`s from the RPC network.
    pub fn blobs_by_root_request(
        peer_id: PeerId,
        request_id: PeerRequestId,
        request: BlobsByRootRequest,
    ) -> Self {
        Self {
            drop_during_sync: false,
            work: Work::BlobsByRootsRequest {
                peer_id,
                request_id,
                request,
            },
        }
    }

    /// Get a `str` representation of the type of work this `WorkEvent` contains.
    pub fn work_type(&self) -> &'static str {
        self.work.str_id()
    }
}
impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
    fn from(ready_work: ReadyWork<T>) -> Self {
        match ready_work {
            ReadyWork::GossipBlock(QueuedGossipBlock {
                peer_id,
                block,
                seen_timestamp,
            }) => Self {
                drop_during_sync: false,
                work: Work::DelayedImportBlock {
                    peer_id,
                    block,
                    seen_timestamp,
                },
            },
            ReadyWork::RpcBlock(QueuedRpcBlock {
                block_root,
                block,
                seen_timestamp,
                process_type,
                should_process,
            }) => Self {
                drop_during_sync: false,
                work: Work::RpcBlock {
                    block_root,
                    block,
                    seen_timestamp,
                    process_type,
                    should_process,
                },
            },
            ReadyWork::Unaggregate(QueuedUnaggregate {
                peer_id,
                message_id,
                attestation,
                subnet_id,
                should_import,
                seen_timestamp,
            }) => Self {
                drop_during_sync: true,
                work: Work::UnknownBlockAttestation {
                    message_id,
                    peer_id,
                    attestation,
                    subnet_id,
                    should_import,
                    seen_timestamp,
                },
            },
            ReadyWork::Aggregate(QueuedAggregate {
                peer_id,
                message_id,
                attestation,
                seen_timestamp,
            }) => Self {
                drop_during_sync: true,
                work: Work::UnknownBlockAggregate {
                    message_id,
                    peer_id,
                    aggregate: attestation,
                    seen_timestamp,
                },
            },
            ReadyWork::LightClientUpdate(QueuedLightClientUpdate {
                peer_id,
                message_id,
                light_client_optimistic_update,
                seen_timestamp,
                ..
            }) => Self {
                drop_during_sync: true,
                work: Work::UnknownLightClientOptimisticUpdate {
                    message_id,
                    peer_id,
                    light_client_optimistic_update,
                    seen_timestamp,
                },
            },
            ReadyWork::BackfillSync(QueuedBackfillBatch { process_id, blocks }) => {
                WorkEvent::chain_segment(process_id, blocks)
            }
        }
    }
}
/// A consensus message (or multiple) from the network that requires processing.
#[derive(Derivative)]
#[derivative(Debug(bound = "T: BeaconChainTypes"))]
pub enum Work<T: BeaconChainTypes> {
    GossipAttestation {
        message_id: MessageId,
        peer_id: PeerId,
        attestation: Box<Attestation<T::EthSpec>>,
        subnet_id: SubnetId,
        should_import: bool,
        seen_timestamp: Duration,
    },
    UnknownBlockAttestation {
        message_id: MessageId,
        peer_id: PeerId,
        attestation: Box<Attestation<T::EthSpec>>,
        subnet_id: SubnetId,
        should_import: bool,
        seen_timestamp: Duration,
    },
    GossipAttestationBatch {
        packages: Vec<GossipAttestationPackage<T::EthSpec>>,
    },
    GossipAggregate {
        message_id: MessageId,
        peer_id: PeerId,
        aggregate: Box<SignedAggregateAndProof<T::EthSpec>>,
        seen_timestamp: Duration,
    },
    UnknownBlockAggregate {
        message_id: MessageId,
        peer_id: PeerId,
        aggregate: Box<SignedAggregateAndProof<T::EthSpec>>,
        seen_timestamp: Duration,
    },
    UnknownLightClientOptimisticUpdate {
        message_id: MessageId,
        peer_id: PeerId,
        light_client_optimistic_update: Box<LightClientOptimisticUpdate<T::EthSpec>>,
        seen_timestamp: Duration,
    },
    GossipAggregateBatch {
        packages: Vec<GossipAggregatePackage<T::EthSpec>>,
    },
    GossipBlock {
        message_id: MessageId,
        peer_id: PeerId,
        peer_client: Client,
        block: Arc<SignedBeaconBlock<T::EthSpec>>,
        seen_timestamp: Duration,
    },
    GossipSignedBlobSidecar {
        message_id: MessageId,
        peer_id: PeerId,
        peer_client: Client,
        blob_index: u64,
        signed_blob: Box<SignedBlobSidecar<T::EthSpec>>,
        seen_timestamp: Duration,
    },
    DelayedImportBlock {
        peer_id: PeerId,
        block: Box<GossipVerifiedBlock<T>>,
        seen_timestamp: Duration,
    },
    GossipVoluntaryExit {
        message_id: MessageId,
        peer_id: PeerId,
        voluntary_exit: Box<SignedVoluntaryExit>,
    },
    GossipProposerSlashing {
        message_id: MessageId,
        peer_id: PeerId,
        proposer_slashing: Box<ProposerSlashing>,
    },
    GossipAttesterSlashing {
        message_id: MessageId,
        peer_id: PeerId,
        attester_slashing: Box<AttesterSlashing<T::EthSpec>>,
    },
    GossipSyncSignature {
        message_id: MessageId,
        peer_id: PeerId,
        sync_signature: Box<SyncCommitteeMessage>,
        subnet_id: SyncSubnetId,
        seen_timestamp: Duration,
    },
    GossipSyncContribution {
        message_id: MessageId,
        peer_id: PeerId,
        sync_contribution: Box<SignedContributionAndProof<T::EthSpec>>,
        seen_timestamp: Duration,
    },
    GossipLightClientFinalityUpdate {
        message_id: MessageId,
        peer_id: PeerId,
        light_client_finality_update: Box<LightClientFinalityUpdate<T::EthSpec>>,
        seen_timestamp: Duration,
    },
    GossipLightClientOptimisticUpdate {
        message_id: MessageId,
        peer_id: PeerId,
        light_client_optimistic_update: Box<LightClientOptimisticUpdate<T::EthSpec>>,
        seen_timestamp: Duration,
    },
    RpcBlock {
        block_root: Hash256,
        block: BlockWrapper<T::EthSpec>,
        seen_timestamp: Duration,
        process_type: BlockProcessType,
        should_process: bool,
    },
    RpcBlobs {
        block_root: Hash256,
        blobs: FixedBlobSidecarList<T::EthSpec>,
        seen_timestamp: Duration,
        process_type: BlockProcessType,
    },
    ChainSegment {
        process_id: ChainSegmentProcessId,
        blocks: Vec<BlockWrapper<T::EthSpec>>,
    },
    Status {
        peer_id: PeerId,
        message: StatusMessage,
    },
    BlocksByRangeRequest {
        peer_id: PeerId,
        request_id: PeerRequestId,
        request: BlocksByRangeRequest,
    },
    BlocksByRootsRequest {
        peer_id: PeerId,
        request_id: PeerRequestId,
        request: BlocksByRootRequest,
    },
    BlobsByRangeRequest {
        peer_id: PeerId,
        request_id: PeerRequestId,
        request: BlobsByRangeRequest,
    },
    GossipBlsToExecutionChange {
        message_id: MessageId,
        peer_id: PeerId,
        bls_to_execution_change: Box<SignedBlsToExecutionChange>,
    },
    LightClientBootstrapRequest {
        peer_id: PeerId,
        request_id: PeerRequestId,
        request: LightClientBootstrapRequest,
    },
    BlobsByRootsRequest {
        peer_id: PeerId,
        request_id: PeerRequestId,
        request: BlobsByRootRequest,
    },
}
impl<T: BeaconChainTypes> Work<T> {
    /// Provides a `&str` that uniquely identifies each enum variant.
    fn str_id(&self) -> &'static str {
        match self {
            Work::GossipAttestation { .. } => GOSSIP_ATTESTATION,
            Work::GossipAttestationBatch { .. } => GOSSIP_ATTESTATION_BATCH,
            Work::GossipAggregate { .. } => GOSSIP_AGGREGATE,
            Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH,
            Work::GossipBlock { .. } => GOSSIP_BLOCK,
            Work::GossipSignedBlobSidecar { .. } => GOSSIP_BLOCK_AND_BLOBS_SIDECAR,
            Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK,
            Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT,
            Work::GossipProposerSlashing { .. } => GOSSIP_PROPOSER_SLASHING,
            Work::GossipAttesterSlashing { .. } => GOSSIP_ATTESTER_SLASHING,
            Work::GossipSyncSignature { .. } => GOSSIP_SYNC_SIGNATURE,
            Work::GossipSyncContribution { .. } => GOSSIP_SYNC_CONTRIBUTION,
            Work::GossipLightClientFinalityUpdate { .. } => GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE,
            Work::GossipLightClientOptimisticUpdate { .. } => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
            Work::RpcBlock { .. } => RPC_BLOCK,
            Work::RpcBlobs { .. } => RPC_BLOB,
            Work::ChainSegment {
                process_id: ChainSegmentProcessId::BackSyncBatchId { .. },
                ..
            } => CHAIN_SEGMENT_BACKFILL,
            Work::ChainSegment { .. } => CHAIN_SEGMENT,
            Work::Status { .. } => STATUS_PROCESSING,
            Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST,
            Work::BlocksByRootsRequest { .. } => BLOCKS_BY_ROOTS_REQUEST,
            Work::BlobsByRangeRequest { .. } => BLOBS_BY_RANGE_REQUEST,
            Work::BlobsByRootsRequest { .. } => BLOBS_BY_ROOTS_REQUEST,
            Work::LightClientBootstrapRequest { .. } => LIGHT_CLIENT_BOOTSTRAP_REQUEST,
            Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
            Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
            Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE,
            Work::GossipBlsToExecutionChange { .. } => GOSSIP_BLS_TO_EXECUTION_CHANGE,
        }
    }
}
/// Unifies all the messages processed by the `BeaconProcessor`.
enum InboundEvent<T: BeaconChainTypes> {
    /// A worker has completed a task and is free.
    WorkerIdle,
    /// There is new work to be done.
    WorkEvent(WorkEvent<T>),
    /// A work event that was queued for re-processing has become ready.
    ReprocessingWork(WorkEvent<T>),
}

/// Combines the various incoming event streams for the `BeaconProcessor` into a single stream.
///
/// This struct has a similar purpose to `tokio::select!`, however it allows for more fine-grained
/// control (specifically in the ordering of event processing).
struct InboundEvents<T: BeaconChainTypes> {
    /// Used by workers when they finish a task.
    idle_rx: mpsc::Receiver<()>,
    /// Used by upstream processes to send new work to the `BeaconProcessor`.
    event_rx: mpsc::Receiver<WorkEvent<T>>,
    /// Used internally for queuing work ready to be re-processed.
    reprocess_work_rx: mpsc::Receiver<ReadyWork<T>>,
}

impl<T: BeaconChainTypes> Stream for InboundEvents<T> {
    type Item = InboundEvent<T>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Always check for idle workers before anything else. This allows us to ensure that a big
        // stream of new events doesn't suppress the processing of existing events.
        match self.idle_rx.poll_recv(cx) {
            Poll::Ready(Some(())) => {
                return Poll::Ready(Some(InboundEvent::WorkerIdle));
            }
            Poll::Ready(None) => {
                return Poll::Ready(None);
            }
            Poll::Pending => {}
        }

        // Poll for delayed blocks before polling for new work. It might be the case that a delayed
        // block is required to successfully process some new work.
        match self.reprocess_work_rx.poll_recv(cx) {
            Poll::Ready(Some(ready_work)) => {
                return Poll::Ready(Some(InboundEvent::ReprocessingWork(ready_work.into())));
            }
            Poll::Ready(None) => {
                return Poll::Ready(None);
            }
            Poll::Pending => {}
        }

        match self.event_rx.poll_recv(cx) {
            Poll::Ready(Some(event)) => {
                return Poll::Ready(Some(InboundEvent::WorkEvent(event)));
            }
            Poll::Ready(None) => {
                return Poll::Ready(None);
            }
            Poll::Pending => {}
        }

        Poll::Pending
    }
}

/// Defines if and where we will store the SSZ files of invalid blocks.
#[derive(Clone)]
pub enum InvalidBlockStorage {
    Enabled(PathBuf),
    Disabled,
}
/// A multi-threaded processor for messages received on the network
/// that need to be processed by the `BeaconChain`.
///
/// See module level documentation for more information.
pub struct BeaconProcessor<T: BeaconChainTypes> {
    pub beacon_chain: Weak<BeaconChain<T>>,
    pub network_tx: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
    pub sync_tx: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
    pub network_globals: Arc<NetworkGlobals<T::EthSpec>>,
    pub executor: TaskExecutor,
    pub max_workers: usize,
    pub current_workers: usize,
    pub importing_blocks: DuplicateCache,
    pub invalid_block_storage: InvalidBlockStorage,
    pub log: Logger,
}
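
// A minimal wiring sketch, assuming the caller has already constructed a `BeaconProcessor`
// and the work-event channel pair (construction elided): spawn the manager, then feed it work
// events. Illustrative only; not called by the processor itself.
#[allow(dead_code)]
fn example_spawn_and_send<T: BeaconChainTypes>(
    processor: BeaconProcessor<T>,
    event_tx: mpsc::Sender<WorkEvent<T>>,
    event_rx: mpsc::Receiver<WorkEvent<T>>,
    peer_id: PeerId,
    status: StatusMessage,
) {
    // Spawn the manager task; `None` disables the testing-only work journal.
    processor.spawn_manager(event_rx, None);
    // Queue a `Status` message for processing; it is dropped if the channel is full.
    let _ = event_tx.try_send(WorkEvent::status_message(peer_id, status));
}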
impl<T: BeaconChainTypes> BeaconProcessor<T> {
    /// Spawns the "manager" task which checks the receiver end of the provided `event_rx` channel
    /// for messages which contain some new work which will be:
    ///
    /// - Performed immediately, if a worker is available.
    /// - Queued for later processing, if no worker is currently available.
    ///
    /// Only `self.max_workers` will ever be spawned at one time. Each worker is a `tokio` task
    /// started with `spawn_blocking`.
    ///
    /// The optional `work_journal_tx` allows for an outside process to receive a log of all work
    /// events processed by `self`. This should only be used during testing.
    pub fn spawn_manager(
        mut self,
        event_rx: mpsc::Receiver<WorkEvent<T>>,
        work_journal_tx: Option<mpsc::Sender<&'static str>>,
    ) {
        // Used by workers to communicate that they have finished a task.
        let (idle_tx, idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN);

        // Using LIFO queues for attestations since validator profits rely upon getting fresh
        // attestations into blocks. Additionally, later attestations contain more information than
        // earlier ones, so we consider them more valuable.
        let mut aggregate_queue = LifoQueue::new(MAX_AGGREGATED_ATTESTATION_QUEUE_LEN);
        let mut aggregate_debounce = TimeLatch::default();
        let mut attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN);
        let mut attestation_debounce = TimeLatch::default();
        let mut unknown_block_aggregate_queue =
            LifoQueue::new(MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN);
        let mut unknown_block_attestation_queue =
            LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN);

        let mut sync_message_queue = LifoQueue::new(MAX_SYNC_MESSAGE_QUEUE_LEN);
        let mut sync_contribution_queue = LifoQueue::new(MAX_SYNC_CONTRIBUTION_QUEUE_LEN);

        // Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have
        // a strong feeling about queue type for exits.
        let mut gossip_voluntary_exit_queue = FifoQueue::new(MAX_GOSSIP_EXIT_QUEUE_LEN);

        // Using a FIFO queue for slashing to prevent people from flushing their slashings from the
        // queues with lots of junk messages.
        let mut gossip_proposer_slashing_queue =
            FifoQueue::new(MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN);
        let mut gossip_attester_slashing_queue =
            FifoQueue::new(MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN);

        // Using a FIFO queue for light client updates to maintain sequence order.
        let mut finality_update_queue = FifoQueue::new(MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN);
        let mut optimistic_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN);
        let mut unknown_light_client_update_queue =
            FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN);

        // Using a FIFO queue since blocks need to be imported sequentially.
        let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
        let mut rpc_blob_queue = FifoQueue::new(MAX_RPC_BLOB_QUEUE_LEN);
        let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
        let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
        let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
        let mut gossip_blob_queue = FifoQueue::new(MAX_GOSSIP_BLOB_QUEUE_LEN);
        let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN);

        let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN);
        let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN);
        let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN);
        let mut blbroots_queue = FifoQueue::new(MAX_BLOCK_AND_BLOBS_BY_ROOTS_QUEUE_LEN);
        let mut blbrange_queue = FifoQueue::new(MAX_BLOBS_BY_RANGE_QUEUE_LEN);

        let mut gossip_bls_to_execution_change_queue =
            FifoQueue::new(MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN);

        let mut lcbootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN);
        let chain = match self.beacon_chain.upgrade() {
            Some(chain) => chain,
            // No need to proceed any further if the beacon chain has been dropped, the client
            // is shutting down.
            None => return,
        };

        // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
        // receive them back once they are ready (`ready_work_rx`).
        let (ready_work_tx, ready_work_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN);
        let work_reprocessing_tx = spawn_reprocess_scheduler(
            ready_work_tx,
            &self.executor,
            chain.slot_clock.clone(),
            self.log.clone(),
        );

        let executor = self.executor.clone();

        // The manager future will run on the core executor and delegate tasks to worker
        // threads on the blocking executor.
        let manager_future = async move {
            let mut inbound_events = InboundEvents {
                idle_rx,
                event_rx,
                reprocess_work_rx: ready_work_rx,
            };

            let enable_backfill_rate_limiting = chain.config.enable_backfill_rate_limiting;
            loop {
                let work_event = match inbound_events.next().await {
                    Some(InboundEvent::WorkerIdle) => {
                        self.current_workers = self.current_workers.saturating_sub(1);
                        None
                    }
                    Some(InboundEvent::WorkEvent(event)) if enable_backfill_rate_limiting => {
                        match QueuedBackfillBatch::try_from(event) {
                            Ok(backfill_batch) => {
                                match work_reprocessing_tx
                                    .try_send(ReprocessQueueMessage::BackfillSync(backfill_batch))
                                {
                                    Err(e) => {
                                        warn!(
                                            self.log,
                                            "Unable to queue backfill work event. Will try to process now.";
                                            "error" => %e
                                        );
                                        match e {
                                            TrySendError::Full(reprocess_queue_message)
                                            | TrySendError::Closed(reprocess_queue_message) => {
                                                match reprocess_queue_message {
                                                    ReprocessQueueMessage::BackfillSync(
                                                        backfill_batch,
                                                    ) => Some(backfill_batch.into()),
                                                    other => {
                                                        crit!(
                                                            self.log,
                                                            "Unexpected queue message type";
                                                            "message_type" => other.as_ref()
                                                        );
                                                        // This is an unhandled exception, drop the message.
                                                        continue;
                                                    }
                                                }
                                            }
                                        }
                                    }
                                    Ok(..) => {
                                        // Backfill work sent to the "reprocessing" queue. Process the next event.
                                        continue;
                                    }
                                }
                            }
                            Err(event) => Some(event),
                        }
                    }
                    Some(InboundEvent::WorkEvent(event))
                    | Some(InboundEvent::ReprocessingWork(event)) => Some(event),
                    None => {
                        debug!(
                            self.log,
                            "Gossip processor stopped";
                            "msg" => "stream ended"
                        );
                        break;
                    }
                };
                let _event_timer =
                    metrics::start_timer(&metrics::BEACON_PROCESSOR_EVENT_HANDLING_SECONDS);
                if let Some(event) = &work_event {
                    metrics::inc_counter_vec(
                        &metrics::BEACON_PROCESSOR_WORK_EVENTS_RX_COUNT,
                        &[event.work.str_id()],
                    );
                } else {
                    metrics::inc_counter(&metrics::BEACON_PROCESSOR_IDLE_EVENTS_TOTAL);
                }

                if let Some(work_journal_tx) = &work_journal_tx {
                    let id = work_event
                        .as_ref()
                        .map(|event| event.work.str_id())
                        .unwrap_or(WORKER_FREED);

                    // We don't care if this message was successfully sent, we only use the journal
                    // during testing.
                    let _ = work_journal_tx.try_send(id);
                }

                let can_spawn = self.current_workers < self.max_workers;
                let drop_during_sync = work_event
                    .as_ref()
                    .map_or(false, |event| event.drop_during_sync);
                match work_event {
                    // There is no new work event, but we are able to spawn a new worker.
                    //
                    // We don't check the `work.drop_during_sync` here. We assume that if it made
                    // it into the queue at any point then we should process it.
                    None if can_spawn => {
                        let toolbox = Toolbox {
                            idle_tx: idle_tx.clone(),
                            work_reprocessing_tx: work_reprocessing_tx.clone(),
                        };

                        // Check for chain segments first, they're the most efficient way to get
                        // blocks into the system.
                        if let Some(item) = chain_segment_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        // Sync block and blob segments have the same priority as normal chain
                        // segments. This here might change depending on how batch processing
                        // evolves.
                        } else if let Some(item) = rpc_block_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        } else if let Some(item) = rpc_blob_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        // Check delayed blocks before gossip blocks, the gossip blocks might rely
                        // on the delayed ones.
                        } else if let Some(item) = delayed_block_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        // Check gossip blocks before gossip attestations, since a block might be
                        // required to verify some attestations.
                        } else if let Some(item) = gossip_block_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        } else if let Some(item) = gossip_blob_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        // Check the aggregates, *then* the unaggregates since we assume that
                        // aggregates are more valuable to local validators and effectively give us
                        // more information with less signature verification time.
                        } else if aggregate_queue.len() > 0 {
                            let batch_size =
                                cmp::min(aggregate_queue.len(), MAX_GOSSIP_AGGREGATE_BATCH_SIZE);

                            if batch_size < 2 {
                                // One single aggregate is in the queue, process it individually.
                                if let Some(item) = aggregate_queue.pop() {
                                    self.spawn_worker(item, toolbox);
                                }
                            } else {
                                // Collect two or more aggregates into a batch, so they can take
                                // advantage of batch signature verification.
                                //
                                // Note: this will convert the `Work::GossipAggregate` item into a
                                // `Work::GossipAggregateBatch` item.
                                let mut packages = Vec::with_capacity(batch_size);
                                for _ in 0..batch_size {
                                    if let Some(item) = aggregate_queue.pop() {
                                        match item {
                                            Work::GossipAggregate {
                                                message_id,
                                                peer_id,
                                                aggregate,
                                                seen_timestamp,
                                            } => {
                                                packages.push(GossipAggregatePackage::new(
                                                    message_id,
                                                    peer_id,
                                                    aggregate,
                                                    seen_timestamp,
                                                ));
                                            }
                                            _ => {
                                                error!(self.log, "Invalid item in aggregate queue")
                                            }
                                        }
                                    }
                                }

                                // Process all aggregates with a single worker.
                                self.spawn_worker(Work::GossipAggregateBatch { packages }, toolbox)
                            }
                        // Check the unaggregated attestation queue.
                        //
                        // Potentially use batching.
                        } else if attestation_queue.len() > 0 {
                            let batch_size = cmp::min(
                                attestation_queue.len(),
                                MAX_GOSSIP_ATTESTATION_BATCH_SIZE,
                            );

                            if batch_size < 2 {
                                // One single attestation is in the queue, process it individually.
                                if let Some(item) = attestation_queue.pop() {
                                    self.spawn_worker(item, toolbox);
                                }
                            } else {
                                // Collect two or more attestations into a batch, so they can take
                                // advantage of batch signature verification.
                                //
                                // Note: this will convert the `Work::GossipAttestation` item into a
                                // `Work::GossipAttestationBatch` item.
                                let mut packages = Vec::with_capacity(batch_size);
                                for _ in 0..batch_size {
                                    if let Some(item) = attestation_queue.pop() {
                                        match item {
                                            Work::GossipAttestation {
                                                message_id,
                                                peer_id,
                                                attestation,
                                                subnet_id,
                                                should_import,
                                                seen_timestamp,
                                            } => {
                                                packages.push(GossipAttestationPackage::new(
                                                    message_id,
                                                    peer_id,
                                                    attestation,
                                                    subnet_id,
                                                    should_import,
                                                    seen_timestamp,
                                                ));
                                            }
                                            _ => error!(
                                                self.log,
                                                "Invalid item in attestation queue"
                                            ),
                                        }
                                    }
                                }

                                // Process all attestations with a single worker.
                                self.spawn_worker(
                                    Work::GossipAttestationBatch { packages },
                                    toolbox,
                                )
                            }
                        // Check sync committee messages after attestations as their rewards are lesser
                        // and they don't influence fork choice.
                        } else if let Some(item) = sync_contribution_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        } else if let Some(item) = sync_message_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        // Aggregates and unaggregates queued for re-processing are older and we
                        // care about fresher ones, so check those first.
                        } else if let Some(item) = unknown_block_aggregate_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        } else if let Some(item) = unknown_block_attestation_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        // Check RPC methods next. Status messages are needed for sync so
                        // prioritize them over syncing requests from other peers (BlocksByRange
                        // and BlocksByRoot)
                        } else if let Some(item) = status_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        } else if let Some(item) = bbrange_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        } else if let Some(item) = bbroots_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        } else if let Some(item) = blbrange_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        } else if let Some(item) = blbroots_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        // Check slashings after all other consensus messages so we prioritize
                        // following head.
                        //
                        // Check attester slashings before proposer slashings since they have the
                        // potential to slash multiple validators at once.
                        } else if let Some(item) = gossip_attester_slashing_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        } else if let Some(item) = gossip_proposer_slashing_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        // Check exits and address changes late since our validators don't get
                        // rewards from them.
                        } else if let Some(item) = gossip_voluntary_exit_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        } else if let Some(item) = gossip_bls_to_execution_change_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        // Handle backfill sync chain segments.
                        } else if let Some(item) = backfill_chain_segment.pop() {
                            self.spawn_worker(item, toolbox);
                        // This statement should always be the final else statement.
                        } else if let Some(item) = lcbootstrap_queue.pop() {
                            self.spawn_worker(item, toolbox);
                        } else {
                            // Let the journal know that a worker is freed and there's nothing else
                            // for it to do.
                            if let Some(work_journal_tx) = &work_journal_tx {
                                // We don't care if this message was successfully sent, we only use the journal
                                // during testing.
                                let _ = work_journal_tx.try_send(NOTHING_TO_DO);
                            }
                        }
                    }
                    // There is no new work event and we are unable to spawn a new worker.
                    //
                    // I cannot see any good reason why this would happen.
                    None => {
                        warn!(
                            self.log,
                            "Unexpected gossip processor condition";
                            "msg" => "no new work and cannot spawn worker"
                        );
                    }
                    // The chain is syncing and this event should be dropped during sync.
                    Some(work_event)
                        if self.network_globals.sync_state.read().is_syncing()
                            && drop_during_sync =>
                    {
                        let work_id = work_event.work.str_id();
                        metrics::inc_counter_vec(
                            &metrics::BEACON_PROCESSOR_WORK_EVENTS_IGNORED_COUNT,
                            &[work_id],
                        );
                        trace!(
                            self.log,
                            "Gossip processor skipping work";
                            "msg" => "chain is syncing",
                            "work_id" => work_id
                        );
                    }
// There is a new work event and the chain is not syncing. Process it or queue
|
|
// it.
|
|
Some(WorkEvent { work, .. }) => {
|
|
let work_id = work.str_id();
|
|
let toolbox = Toolbox {
|
|
idle_tx: idle_tx.clone(),
|
|
work_reprocessing_tx: work_reprocessing_tx.clone(),
|
|
};
|
|
|
|
match work {
|
|
_ if can_spawn => self.spawn_worker(work, toolbox),
|
|
Work::GossipAttestation { .. } => attestation_queue.push(work),
|
|
// Attestation batches are formed internally within the
|
|
// `BeaconProcessor`, they are not sent from external services.
|
|
Work::GossipAttestationBatch { .. } => crit!(
|
|
self.log,
|
|
"Unsupported inbound event";
|
|
"type" => "GossipAttestationBatch"
|
|
),
|
|
Work::GossipAggregate { .. } => aggregate_queue.push(work),
|
|
// Aggregate batches are formed internally within the `BeaconProcessor`,
|
|
// they are not sent from external services.
|
|
Work::GossipAggregateBatch { .. } => crit!(
|
|
self.log,
|
|
"Unsupported inbound event";
|
|
"type" => "GossipAggregateBatch"
|
|
),
|
|
Work::GossipBlock { .. } => {
|
|
gossip_block_queue.push(work, work_id, &self.log)
|
|
}
|
|
Work::GossipSignedBlobSidecar { .. } => {
|
|
gossip_blob_queue.push(work, work_id, &self.log)
|
|
}
|
|
Work::DelayedImportBlock { .. } => {
|
|
delayed_block_queue.push(work, work_id, &self.log)
|
|
}
|
|
Work::GossipVoluntaryExit { .. } => {
|
|
gossip_voluntary_exit_queue.push(work, work_id, &self.log)
|
|
}
|
|
Work::GossipProposerSlashing { .. } => {
|
|
gossip_proposer_slashing_queue.push(work, work_id, &self.log)
|
|
}
|
|
Work::GossipAttesterSlashing { .. } => {
|
|
gossip_attester_slashing_queue.push(work, work_id, &self.log)
|
|
}
|
|
Work::GossipSyncSignature { .. } => sync_message_queue.push(work),
|
|
Work::GossipSyncContribution { .. } => {
|
|
sync_contribution_queue.push(work)
|
|
}
|
|
Work::GossipLightClientFinalityUpdate { .. } => {
|
|
finality_update_queue.push(work, work_id, &self.log)
|
|
}
|
|
Work::GossipLightClientOptimisticUpdate { .. } => {
|
|
optimistic_update_queue.push(work, work_id, &self.log)
|
|
}
|
|
Work::RpcBlock { .. } => rpc_block_queue.push(work, work_id, &self.log),
|
|
Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id, &self.log),
|
|
Work::ChainSegment { ref process_id, .. } => match process_id {
|
|
ChainSegmentProcessId::RangeBatchId { .. }
|
|
| ChainSegmentProcessId::ParentLookup { .. } => {
|
|
chain_segment_queue.push(work, work_id, &self.log)
|
|
}
|
|
ChainSegmentProcessId::BackSyncBatchId { .. } => {
|
|
backfill_chain_segment.push(work, work_id, &self.log)
|
|
}
|
|
},
|
|
Work::Status { .. } => status_queue.push(work, work_id, &self.log),
|
|
Work::BlocksByRangeRequest { .. } => {
|
|
bbrange_queue.push(work, work_id, &self.log)
|
|
}
|
|
Work::BlocksByRootsRequest { .. } => {
|
|
bbroots_queue.push(work, work_id, &self.log)
|
|
}
|
|
Work::BlobsByRangeRequest { .. } => {
|
|
blbrange_queue.push(work, work_id, &self.log)
|
|
}
|
|
Work::LightClientBootstrapRequest { .. } => {
|
|
lcbootstrap_queue.push(work, work_id, &self.log)
|
|
}
|
|
Work::UnknownBlockAttestation { .. } => {
|
|
unknown_block_attestation_queue.push(work)
|
|
}
|
|
Work::UnknownBlockAggregate { .. } => {
|
|
unknown_block_aggregate_queue.push(work)
|
|
}
|
|
Work::GossipBlsToExecutionChange { .. } => {
|
|
gossip_bls_to_execution_change_queue.push(work, work_id, &self.log)
|
|
}
|
|
Work::BlobsByRootsRequest { .. } => {
|
|
blbroots_queue.push(work, work_id, &self.log)
|
|
}
|
|
Work::UnknownLightClientOptimisticUpdate { .. } => {
|
|
unknown_light_client_update_queue.push(work, work_id, &self.log)
|
|
}
|
|
}
|
|
}
|
|
}
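                // When a worker slot is free (`can_spawn`) the event above is handed straight
                // to `spawn_worker`; otherwise it is parked on the queue matching its `Work`
                // variant. The batch variants are rejected because they are only ever produced
                // internally by the processor itself.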

                metrics::set_gauge(
                    &metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_TOTAL,
                    self.current_workers as i64,
                );
                metrics::set_gauge(
                    &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_QUEUE_TOTAL,
                    attestation_queue.len() as i64,
                );
                metrics::set_gauge(
                    &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL,
                    aggregate_queue.len() as i64,
                );
                metrics::set_gauge(
                    &metrics::BEACON_PROCESSOR_SYNC_MESSAGE_QUEUE_TOTAL,
                    sync_message_queue.len() as i64,
                );
                metrics::set_gauge(
                    &metrics::BEACON_PROCESSOR_SYNC_CONTRIBUTION_QUEUE_TOTAL,
                    sync_contribution_queue.len() as i64,
                );
                metrics::set_gauge(
                    &metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_QUEUE_TOTAL,
                    gossip_block_queue.len() as i64,
                );
                metrics::set_gauge(
                    &metrics::BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL,
                    rpc_block_queue.len() as i64,
                );
                metrics::set_gauge(
                    &metrics::BEACON_PROCESSOR_RPC_BLOB_QUEUE_TOTAL,
                    rpc_blob_queue.len() as i64,
                );
                metrics::set_gauge(
                    &metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL,
                    chain_segment_queue.len() as i64,
                );
                metrics::set_gauge(
                    &metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_QUEUE_TOTAL,
                    backfill_chain_segment.len() as i64,
                );
                metrics::set_gauge(
                    &metrics::BEACON_PROCESSOR_EXIT_QUEUE_TOTAL,
                    gossip_voluntary_exit_queue.len() as i64,
                );
                metrics::set_gauge(
                    &metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_QUEUE_TOTAL,
                    gossip_proposer_slashing_queue.len() as i64,
                );
                metrics::set_gauge(
                    &metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_QUEUE_TOTAL,
                    gossip_attester_slashing_queue.len() as i64,
                );
                metrics::set_gauge(
                    &metrics::BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_QUEUE_TOTAL,
                    gossip_bls_to_execution_change_queue.len() as i64,
                );
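                // These gauges are refreshed on every pass of the manager loop, so scraped
                // values reflect the instantaneous queue depths at the end of an iteration
                // rather than a rolling aggregate.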

                if aggregate_queue.is_full() && aggregate_debounce.elapsed() {
                    error!(
                        self.log,
                        "Aggregate attestation queue full";
                        "msg" => "the system has insufficient resources for load",
                        "queue_len" => aggregate_queue.max_length,
                    )
                }

                if attestation_queue.is_full() && attestation_debounce.elapsed() {
                    error!(
                        self.log,
                        "Attestation queue full";
                        "msg" => "the system has insufficient resources for load",
                        "queue_len" => attestation_queue.max_length,
                    )
                }
            }
        };

        // Spawn on the core executor.
        executor.spawn(manager_future, MANAGER_TASK_NAME);
    }

    /// Spawns a blocking worker thread to process some `Work`.
    ///
    /// Sends a message on `idle_tx` when the work is complete and the task is stopping.
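    ///
    /// The idle message is sent via a [`SendOnDrop`] guard, so the worker is freed even if
    /// the task returns early or panics.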
    fn spawn_worker(&mut self, work: Work<T>, toolbox: Toolbox<T>) {
        let idle_tx = toolbox.idle_tx;
        let work_reprocessing_tx = toolbox.work_reprocessing_tx;

        let work_id = work.str_id();
        let worker_timer =
            metrics::start_timer_vec(&metrics::BEACON_PROCESSOR_WORKER_TIME, &[work_id]);
        metrics::inc_counter(&metrics::BEACON_PROCESSOR_WORKERS_SPAWNED_TOTAL);
        metrics::inc_counter_vec(
            &metrics::BEACON_PROCESSOR_WORK_EVENTS_STARTED_COUNT,
            &[work.str_id()],
        );

        // Wrap the `idle_tx` in a struct that will fire the idle message whenever it is dropped.
        //
        // This helps ensure that the worker is always freed in the case of an early exit or panic.
        // As such, this instantiation should happen as early in the function as possible.
        let send_idle_on_drop = SendOnDrop {
            tx: idle_tx,
            _worker_timer: worker_timer,
            log: self.log.clone(),
        };

        let worker_id = self.current_workers;
        self.current_workers = self.current_workers.saturating_add(1);

        let chain = if let Some(chain) = self.beacon_chain.upgrade() {
            chain
        } else {
            debug!(
                self.log,
                "Beacon chain dropped, shutting down";
            );
            return;
        };

        let executor = self.executor.clone();

        let worker = Worker {
            chain,
            network_tx: self.network_tx.clone(),
            sync_tx: self.sync_tx.clone(),
            log: self.log.clone(),
        };

        let duplicate_cache = self.importing_blocks.clone();

        trace!(
            self.log,
            "Spawning beacon processor worker";
            "work" => work_id,
            "worker" => worker_id,
        );

        let task_spawner = TaskSpawner {
            executor: executor.clone(),
            send_idle_on_drop,
        };

        let sub_executor = executor;
        match work {
            /*
             * Individual unaggregated attestation verification.
             */
            Work::GossipAttestation {
                message_id,
                peer_id,
                attestation,
                subnet_id,
                should_import,
                seen_timestamp,
            } => task_spawner.spawn_blocking(move || {
                worker.process_gossip_attestation(
                    message_id,
                    peer_id,
                    attestation,
                    subnet_id,
                    should_import,
                    Some(work_reprocessing_tx),
                    seen_timestamp,
                )
            }),
            /*
             * Batched unaggregated attestation verification.
             */
            Work::GossipAttestationBatch { packages } => task_spawner.spawn_blocking(|| {
                worker.process_gossip_attestation_batch(packages, Some(work_reprocessing_tx))
            }),
            /*
             * Individual aggregated attestation verification.
             */
            Work::GossipAggregate {
                message_id,
                peer_id,
                aggregate,
                seen_timestamp,
            } => task_spawner.spawn_blocking(move || {
                worker.process_gossip_aggregate(
                    message_id,
                    peer_id,
                    aggregate,
                    Some(work_reprocessing_tx),
                    seen_timestamp,
                )
            }),
            /*
             * Batched aggregated attestation verification.
             */
            Work::GossipAggregateBatch { packages } => task_spawner.spawn_blocking(|| {
                worker.process_gossip_aggregate_batch(packages, Some(work_reprocessing_tx))
            }),
            /*
             * Verification for beacon blocks received on gossip.
             */
            Work::GossipBlock {
                message_id,
                peer_id,
                peer_client,
                block,
                seen_timestamp,
            } => {
                let invalid_block_storage = self.invalid_block_storage.clone();
                task_spawner.spawn_async(async move {
                    worker
                        .process_gossip_block(
                            message_id,
                            peer_id,
                            peer_client,
                            block.into(),
                            work_reprocessing_tx,
                            duplicate_cache,
                            invalid_block_storage,
                            seen_timestamp,
                        )
                        .await
                })
            }
            /*
             * Verification for blobs sidecars received on gossip.
             */
            Work::GossipSignedBlobSidecar {
                message_id,
                peer_id,
                peer_client,
                blob_index,
                signed_blob,
                seen_timestamp,
            } => task_spawner.spawn_async(async move {
                worker
                    .process_gossip_blob(
                        message_id,
                        peer_id,
                        peer_client,
                        blob_index,
                        *signed_blob,
                        seen_timestamp,
                    )
                    .await
            }),
            /*
             * Import for blocks that we received earlier than their intended slot.
             */
            Work::DelayedImportBlock {
                peer_id,
                block,
                seen_timestamp,
            } => {
                let invalid_block_storage = self.invalid_block_storage.clone();
                task_spawner.spawn_async(worker.process_gossip_verified_block(
                    peer_id,
                    *block,
                    work_reprocessing_tx,
                    invalid_block_storage,
                    seen_timestamp,
                ))
            }
            /*
             * Voluntary exits received on gossip.
             */
            Work::GossipVoluntaryExit {
                message_id,
                peer_id,
                voluntary_exit,
            } => task_spawner.spawn_blocking(move || {
                worker.process_gossip_voluntary_exit(message_id, peer_id, *voluntary_exit)
            }),
            /*
             * Proposer slashings received on gossip.
             */
            Work::GossipProposerSlashing {
                message_id,
                peer_id,
                proposer_slashing,
            } => task_spawner.spawn_blocking(move || {
                worker.process_gossip_proposer_slashing(message_id, peer_id, *proposer_slashing)
            }),
            /*
             * Attester slashings received on gossip.
             */
            Work::GossipAttesterSlashing {
                message_id,
                peer_id,
                attester_slashing,
            } => task_spawner.spawn_blocking(move || {
                worker.process_gossip_attester_slashing(message_id, peer_id, *attester_slashing)
            }),
            /*
             * Sync committee message verification.
             */
            Work::GossipSyncSignature {
                message_id,
                peer_id,
                sync_signature,
                subnet_id,
                seen_timestamp,
            } => task_spawner.spawn_blocking(move || {
                worker.process_gossip_sync_committee_signature(
                    message_id,
                    peer_id,
                    *sync_signature,
                    subnet_id,
                    seen_timestamp,
                )
            }),
            /*
             * Sync contribution verification.
             */
            Work::GossipSyncContribution {
                message_id,
                peer_id,
                sync_contribution,
                seen_timestamp,
            } => task_spawner.spawn_blocking(move || {
                worker.process_sync_committee_contribution(
                    message_id,
                    peer_id,
                    *sync_contribution,
                    seen_timestamp,
                )
            }),
            /*
             * BLS to execution change verification.
             */
            Work::GossipBlsToExecutionChange {
                message_id,
                peer_id,
                bls_to_execution_change,
            } => task_spawner.spawn_blocking(move || {
                worker.process_gossip_bls_to_execution_change(
                    message_id,
                    peer_id,
                    *bls_to_execution_change,
                )
            }),
            /*
             * Light client finality update verification.
             */
            Work::GossipLightClientFinalityUpdate {
                message_id,
                peer_id,
                light_client_finality_update,
                seen_timestamp,
            } => task_spawner.spawn_blocking(move || {
                worker.process_gossip_finality_update(
                    message_id,
                    peer_id,
                    *light_client_finality_update,
                    seen_timestamp,
                )
            }),
            /*
             * Light client optimistic update verification.
             */
            Work::GossipLightClientOptimisticUpdate {
                message_id,
                peer_id,
                light_client_optimistic_update,
                seen_timestamp,
            } => task_spawner.spawn_blocking(move || {
                worker.process_gossip_optimistic_update(
                    message_id,
                    peer_id,
                    *light_client_optimistic_update,
                    Some(work_reprocessing_tx),
                    seen_timestamp,
                )
            }),
            /*
             * Verification for beacon blocks received during syncing via RPC.
             */
            Work::RpcBlock {
                block_root,
                block,
                seen_timestamp,
                process_type,
                should_process,
            } => task_spawner.spawn_async(worker.process_rpc_block(
                block_root,
                block,
                seen_timestamp,
                process_type,
                work_reprocessing_tx,
                duplicate_cache,
                should_process,
            )),
            Work::RpcBlobs {
                block_root,
                blobs,
                seen_timestamp,
                process_type,
            } => task_spawner.spawn_async(worker.process_rpc_blobs(
                block_root,
                blobs,
                seen_timestamp,
                process_type,
            )),
            /*
             * Verification for a chain segment (multiple blocks).
             */
            Work::ChainSegment { process_id, blocks } => {
                let notify_execution_layer = if self
                    .network_globals
                    .sync_state
                    .read()
                    .is_syncing_finalized()
                {
                    NotifyExecutionLayer::No
                } else {
                    NotifyExecutionLayer::Yes
                };
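                // Assumption (rationale not stated here): skipping execution layer
                // notification while finalized sync is in progress avoids sending the EL
                // payloads for blocks that are already ancestors of a finalized checkpoint.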

                task_spawner.spawn_async(async move {
                    worker
                        .process_chain_segment(process_id, blocks, notify_execution_layer)
                        .await
                })
            }
            /*
             * Processing of Status Messages.
             */
            Work::Status { peer_id, message } => {
                task_spawner.spawn_blocking(move || worker.process_status(peer_id, message))
            }
            /*
             * Processing of range syncing requests from other peers.
             */
            Work::BlocksByRangeRequest {
                peer_id,
                request_id,
                request,
            } => task_spawner.spawn_blocking_with_manual_send_idle(move |send_idle_on_drop| {
                worker.handle_blocks_by_range_request(
                    sub_executor,
                    send_idle_on_drop,
                    peer_id,
                    request_id,
                    request,
                )
            }),
            /*
             * Processing of blocks by roots requests from other peers.
             */
            Work::BlocksByRootsRequest {
                peer_id,
                request_id,
                request,
            } => task_spawner.spawn_blocking_with_manual_send_idle(move |send_idle_on_drop| {
                worker.handle_blocks_by_root_request(
                    sub_executor,
                    send_idle_on_drop,
                    peer_id,
                    request_id,
                    request,
                )
            }),
            Work::BlobsByRangeRequest {
                peer_id,
                request_id,
                request,
            } => task_spawner.spawn_blocking_with_manual_send_idle(move |send_idle_on_drop| {
                worker.handle_blobs_by_range_request(
                    send_idle_on_drop,
                    peer_id,
                    request_id,
                    request,
                )
            }),

            Work::BlobsByRootsRequest {
                peer_id,
                request_id,
                request,
            } => task_spawner.spawn_blocking_with_manual_send_idle(move |send_idle_on_drop| {
                worker.handle_blobs_by_root_request(send_idle_on_drop, peer_id, request_id, request)
            }),

            /*
             * Processing of light client bootstrap requests from other peers.
             */
            Work::LightClientBootstrapRequest {
                peer_id,
                request_id,
                request,
            } => task_spawner.spawn_blocking(move || {
                worker.handle_light_client_bootstrap(peer_id, request_id, request)
            }),
            Work::UnknownBlockAttestation {
                message_id,
                peer_id,
                attestation,
                subnet_id,
                should_import,
                seen_timestamp,
            } => task_spawner.spawn_blocking(move || {
                worker.process_gossip_attestation(
                    message_id,
                    peer_id,
                    attestation,
                    subnet_id,
                    should_import,
                    None, // Do not allow this attestation to be re-processed beyond this point.
                    seen_timestamp,
                )
            }),
            Work::UnknownBlockAggregate {
                message_id,
                peer_id,
                aggregate,
                seen_timestamp,
            } => task_spawner.spawn_blocking(move || {
                worker.process_gossip_aggregate(
                    message_id,
                    peer_id,
                    aggregate,
                    None,
                    seen_timestamp,
                )
            }),
            Work::UnknownLightClientOptimisticUpdate {
                message_id,
                peer_id,
                light_client_optimistic_update,
                seen_timestamp,
            } => task_spawner.spawn_blocking(move || {
                worker.process_gossip_optimistic_update(
                    message_id,
                    peer_id,
                    *light_client_optimistic_update,
                    None,
                    seen_timestamp,
                )
            }),
        };
    }
}

/// Spawns tasks that are either:
///
/// - Blocking (i.e. intensive methods that shouldn't run on the core `tokio` executor)
/// - Async (i.e. `async` methods)
///
/// Takes a `SendOnDrop` and ensures it is dropped after the task completes. This frees the beacon
/// processor worker so a new task can be started.
struct TaskSpawner {
    executor: TaskExecutor,
    send_idle_on_drop: SendOnDrop,
}

impl TaskSpawner {
    /// Spawn an async task, dropping the `SendOnDrop` after the task has completed.
    fn spawn_async(self, task: impl Future<Output = ()> + Send + 'static) {
        self.executor.spawn(
            async {
                task.await;
                drop(self.send_idle_on_drop)
            },
            WORKER_TASK_NAME,
        )
    }

    /// Spawn a blocking task, dropping the `SendOnDrop` after the task has completed.
    fn spawn_blocking<F>(self, task: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.executor.spawn_blocking(
            || {
                task();
                drop(self.send_idle_on_drop)
            },
            WORKER_TASK_NAME,
        )
    }

    /// Spawn a blocking task, passing the `SendOnDrop` into the task.
    ///
    /// ## Notes
    ///
    /// Users must ensure the `SendOnDrop` is dropped at the appropriate time!
    pub fn spawn_blocking_with_manual_send_idle<F>(self, task: F)
    where
        F: FnOnce(SendOnDrop) + Send + 'static,
    {
        self.executor.spawn_blocking(
            || {
                task(self.send_idle_on_drop);
            },
            WORKER_TASK_NAME,
        )
    }
}
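
// Illustrative sketch only: a hypothetical `Work::Example` variant would be wired into
// `spawn_worker` above using one of the three flavours, e.g.
//
//     Work::Example { data } => task_spawner.spawn_blocking(move || {
//         worker.process_example(data)
//     }),
//
// where `Work::Example` and `process_example` are placeholders for illustration and are
// not part of this module.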

/// This struct will send a message on `self.tx` when it is dropped. An error will be logged on
/// `self.log` if the send fails (this happens when the node is shutting down).
///
/// ## Purpose
///
/// This is useful for ensuring that a worker-freed message is still sent if a worker panics.
///
/// The Rust docs for `Drop` state that `Drop` is called during an unwind in a panic:
///
/// https://doc.rust-lang.org/std/ops/trait.Drop.html#panics
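///
/// Sending uses `try_send`, so a full or closed channel never blocks in `drop`; failures are
/// only logged (see the `Drop` impl below).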
pub struct SendOnDrop {
    tx: mpsc::Sender<()>,
    // The field is unused, but it's here to ensure the timer is dropped once the task has finished.
    _worker_timer: Option<metrics::HistogramTimer>,
    log: Logger,
}

impl Drop for SendOnDrop {
    fn drop(&mut self) {
        if let Err(e) = self.tx.try_send(()) {
            warn!(
                self.log,
                "Unable to free worker";
                "msg" => "did not free worker, shutdown may be underway",
                "error" => %e
            )
        }
    }
}