//! Provides the `BeaconProcessor`, a multi-threaded processor for messages received on the network
//! that need to be processed by the `BeaconChain`.
//!
//! Uses `tokio` tasks (instead of raw threads) to provide the following:
//!
//! - A "manager" task, which either spawns worker tasks or enqueues work.
//! - One or more "worker" tasks which perform time-intensive work on the `BeaconChain`.
//! - A task managing the scheduling of work that needs to be re-processed.
//!
//! ## Purpose
//!
//! The purpose of the `BeaconProcessor` is to provide two things:
//!
//! 1. Moving long-running, blocking tasks off the main `tokio` executor.
//! 2. A fixed-length buffer for consensus messages.
//!
//! (1) ensures that we don't clog up the networking stack with long-running tasks, potentially
//! causing timeouts. (2) means that we can easily and explicitly reject messages when we're
//! overloaded and also distribute load across time.
//!
//! ## Detail
//!
//! There is a single "manager" task which listens to three event channels. These events are
//! either:
//!
//! - A new parcel of work (work event).
//! - An indication that a worker has finished a parcel of work (worker idle).
//! - A parcel of work that is ready to be re-processed (work event).
//!
//! Then, there is a maximum of `n` "worker" blocking threads, where `n` is the CPU count.
//!
//! Whenever the manager receives a new parcel of work, it is either:
//!
//! - Provided to a newly-spawned worker task (if we are not already at `n` workers).
//! - Added to a queue.
//!
//! Whenever the manager receives a notification that a worker has finished a parcel of work, it
//! checks the queues to see if there are more parcels of work that can be spawned in a new worker
//! task.

use crate::sync::manager::BlockProcessType;
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::parking_lot::Mutex;
use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock, NotifyExecutionLayer};
use derivative::Derivative;
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
use lighthouse_network::rpc::LightClientBootstrapRequest;
use lighthouse_network::{
    rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage},
    Client, MessageId, NetworkGlobals, PeerId, PeerRequestId,
};
use logging::TimeLatch;
use slog::{crit, debug, error, trace, warn, Logger};
use std::collections::VecDeque;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::task::Context;
use std::time::Duration;
use std::{cmp, collections::HashSet};
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
    Attestation, AttesterSlashing, Hash256, LightClientFinalityUpdate, LightClientOptimisticUpdate,
    ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBlobSidecar,
    SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId,
    SyncCommitteeMessage, SyncSubnetId,
};
use work_reprocessing_queue::{
    spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock,
    QueuedUnaggregate, ReadyWork,
};
use worker::{Toolbox, Worker};

mod tests;
mod work_reprocessing_queue;
mod worker;

use crate::beacon_processor::work_reprocessing_queue::{
    QueuedBackfillBatch, QueuedGossipBlock, ReprocessQueueMessage,
};
pub use
worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage}; /// The maximum size of the channel for work events to the `BeaconProcessor`. /// /// Setting this too low will cause consensus messages to be dropped. pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384; /// The maximum size of the channel for idle events to the `BeaconProcessor`. /// /// Setting this too low will prevent new workers from being spawned. It *should* only need to be /// set to the CPU count, but we set it high to be safe. const MAX_IDLE_QUEUE_LEN: usize = 16_384; /// The maximum size of the channel for re-processing work events. const MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * MAX_WORK_EVENT_QUEUE_LEN / 4; /// The maximum number of queued `Attestation` objects that will be stored before we start dropping /// them. const MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN: usize = 16_384; /// The maximum number of queued `Attestation` objects that will be stored before we start dropping /// them. const MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 8_192; /// The maximum number of queued `SignedAggregateAndProof` objects that will be stored before we /// start dropping them. const MAX_AGGREGATED_ATTESTATION_QUEUE_LEN: usize = 4_096; /// The maximum number of queued `SignedAggregateAndProof` objects that will be stored before we /// start dropping them. const MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 1_024; /// The maximum number of queued `SignedBeaconBlock` objects received on gossip that will be stored /// before we start dropping them. const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024; /// The maximum number of queued `SignedBlobSidecar` objects received on gossip that /// will be stored before we start dropping them. const MAX_GOSSIP_BLOB_QUEUE_LEN: usize = 1_024; /// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but /// within acceptable clock disparity) that will be queued before we start dropping them. const MAX_DELAYED_BLOCK_QUEUE_LEN: usize = 1_024; /// The maximum number of queued `SignedVoluntaryExit` objects received on gossip that will be stored /// before we start dropping them. const MAX_GOSSIP_EXIT_QUEUE_LEN: usize = 4_096; /// The maximum number of queued `ProposerSlashing` objects received on gossip that will be stored /// before we start dropping them. const MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN: usize = 4_096; /// The maximum number of queued `AttesterSlashing` objects received on gossip that will be stored /// before we start dropping them. const MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN: usize = 4_096; /// The maximum number of queued `LightClientFinalityUpdate` objects received on gossip that will be stored /// before we start dropping them. const MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN: usize = 1_024; /// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored /// before we start dropping them. const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024; /// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored /// for reprocessing before we start dropping them. const MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN: usize = 128; /// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping /// them. const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048; /// The maximum number of queued `SignedContributionAndProof` objects that will be stored before we /// start dropping them. 
const MAX_SYNC_CONTRIBUTION_QUEUE_LEN: usize = 1024; /// The maximum number of queued `SignedBeaconBlock` objects received from the network RPC that /// will be stored before we start dropping them. const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024; const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024 * 4; /// The maximum number of queued `Vec` objects received during syncing that will /// be stored before we start dropping them. const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64; /// The maximum number of queued `StatusMessage` objects received from the network RPC that will be /// stored before we start dropping them. const MAX_STATUS_QUEUE_LEN: usize = 1_024; /// The maximum number of queued `BlocksByRangeRequest` objects received from the network RPC that /// will be stored before we start dropping them. const MAX_BLOCKS_BY_RANGE_QUEUE_LEN: usize = 1_024; const MAX_BLOBS_BY_RANGE_QUEUE_LEN: usize = 1_024; /// The maximum number of queued `BlocksByRootRequest` objects received from the network RPC that /// will be stored before we start dropping them. const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024; const MAX_BLOCK_AND_BLOBS_BY_ROOTS_QUEUE_LEN: usize = 1_024; /// Maximum number of `SignedBlsToExecutionChange` messages to queue before dropping them. /// /// This value is set high to accommodate the large spike that is expected immediately after Capella /// is activated. const MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN: usize = 16_384; /// The maximum number of queued `LightClientBootstrapRequest` objects received from the network RPC that /// will be stored before we start dropping them. const MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN: usize = 1_024; /// The name of the manager tokio task. const MANAGER_TASK_NAME: &str = "beacon_processor_manager"; /// The name of the worker tokio tasks. const WORKER_TASK_NAME: &str = "beacon_processor_worker"; /// The `MAX_..._BATCH_SIZE` variables define how many attestations can be included in a single /// batch. /// /// Choosing these values is difficult since there is a trade-off between: /// /// - It is faster to verify one large batch than multiple smaller batches. /// - "Poisoning" attacks have a larger impact as the batch size increases. /// /// Poisoning occurs when an invalid signature is included in a batch of attestations. A single /// invalid signature causes the entire batch to fail. When a batch fails, we fall-back to /// individually verifying each attestation signature. const MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64; const MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64; /// Unique IDs used for metrics and testing. 
pub const WORKER_FREED: &str = "worker_freed"; pub const NOTHING_TO_DO: &str = "nothing_to_do"; pub const GOSSIP_ATTESTATION: &str = "gossip_attestation"; pub const GOSSIP_ATTESTATION_BATCH: &str = "gossip_attestation_batch"; pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate"; pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch"; pub const GOSSIP_BLOCK: &str = "gossip_block"; pub const GOSSIP_BLOBS_SIDECAR: &str = "gossip_blobs_sidecar"; pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block"; pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit"; pub const GOSSIP_PROPOSER_SLASHING: &str = "gossip_proposer_slashing"; pub const GOSSIP_ATTESTER_SLASHING: &str = "gossip_attester_slashing"; pub const GOSSIP_SYNC_SIGNATURE: &str = "gossip_sync_signature"; pub const GOSSIP_SYNC_CONTRIBUTION: &str = "gossip_sync_contribution"; pub const GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update"; pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update"; pub const RPC_BLOCK: &str = "rpc_block"; pub const RPC_BLOB: &str = "rpc_blob"; pub const CHAIN_SEGMENT: &str = "chain_segment"; pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill"; pub const STATUS_PROCESSING: &str = "status_processing"; pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request"; pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request"; pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request"; pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request"; pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap"; pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update"; pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change"; /// A simple first-in-first-out queue with a maximum length. struct FifoQueue { queue: VecDeque, max_length: usize, } impl FifoQueue { /// Create a new, empty queue with the given length. pub fn new(max_length: usize) -> Self { Self { queue: VecDeque::default(), max_length, } } /// Add a new item to the queue. /// /// Drops `item` if the queue is full. pub fn push(&mut self, item: T, item_desc: &str, log: &Logger) { if self.queue.len() == self.max_length { error!( log, "Work queue is full"; "msg" => "the system has insufficient resources for load", "queue_len" => self.max_length, "queue" => item_desc, ) } else { self.queue.push_back(item); } } /// Remove the next item from the queue. pub fn pop(&mut self) -> Option { self.queue.pop_front() } /// Returns the current length of the queue. pub fn len(&self) -> usize { self.queue.len() } } /// A simple last-in-first-out queue with a maximum length. struct LifoQueue { queue: VecDeque, max_length: usize, } impl LifoQueue { /// Create a new, empty queue with the given length. pub fn new(max_length: usize) -> Self { Self { queue: VecDeque::default(), max_length, } } /// Add a new item to the front of the queue. /// /// If the queue is full, the item at the back of the queue is dropped. pub fn push(&mut self, item: T) { if self.queue.len() == self.max_length { self.queue.pop_back(); } self.queue.push_front(item); } /// Remove the next item from the queue. pub fn pop(&mut self) -> Option { self.queue.pop_front() } /// Returns `true` if the queue is full. 
pub fn is_full(&self) -> bool { self.queue.len() >= self.max_length } /// Returns the current length of the queue. pub fn len(&self) -> usize { self.queue.len() } } /// A handle that sends a message on the provided channel to a receiver when it gets dropped. /// /// The receiver task is responsible for removing the provided `entry` from the `DuplicateCache` /// and perform any other necessary cleanup. pub struct DuplicateCacheHandle { entry: Hash256, cache: DuplicateCache, } impl Drop for DuplicateCacheHandle { fn drop(&mut self) { self.cache.remove(&self.entry); } } /// A simple cache for detecting duplicate block roots across multiple threads. #[derive(Clone, Default)] pub struct DuplicateCache { inner: Arc>>, } impl DuplicateCache { /// Checks if the given block_root exists and inserts it into the cache if /// it doesn't exist. /// /// Returns a `Some(DuplicateCacheHandle)` if the block_root was successfully /// inserted and `None` if the block root already existed in the cache. /// /// The handle removes the entry from the cache when it is dropped. This ensures that any unclean /// shutdowns in the worker tasks does not leave inconsistent state in the cache. pub fn check_and_insert(&self, block_root: Hash256) -> Option { let mut inner = self.inner.lock(); if inner.insert(block_root) { Some(DuplicateCacheHandle { entry: block_root, cache: self.clone(), }) } else { None } } /// Remove the given block_root from the cache. pub fn remove(&self, block_root: &Hash256) { let mut inner = self.inner.lock(); inner.remove(block_root); } } /// An event to be processed by the manager task. #[derive(Derivative)] #[derivative(Debug(bound = "T: BeaconChainTypes"))] pub struct WorkEvent { drop_during_sync: bool, work: Work, } impl WorkEvent { /// Create a new `Work` event for some unaggregated attestation. pub fn unaggregated_attestation( message_id: MessageId, peer_id: PeerId, attestation: Attestation, subnet_id: SubnetId, should_import: bool, seen_timestamp: Duration, ) -> Self { Self { drop_during_sync: true, work: Work::GossipAttestation { message_id, peer_id, attestation: Box::new(attestation), subnet_id, should_import, seen_timestamp, }, } } /// Create a new `Work` event for some aggregated attestation. pub fn aggregated_attestation( message_id: MessageId, peer_id: PeerId, aggregate: SignedAggregateAndProof, seen_timestamp: Duration, ) -> Self { Self { drop_during_sync: true, work: Work::GossipAggregate { message_id, peer_id, aggregate: Box::new(aggregate), seen_timestamp, }, } } /// Create a new `Work` event for some block. pub fn gossip_beacon_block( message_id: MessageId, peer_id: PeerId, peer_client: Client, block: Arc>, seen_timestamp: Duration, ) -> Self { Self { drop_during_sync: false, work: Work::GossipBlock { message_id, peer_id, peer_client, block, seen_timestamp, }, } } /// Create a new `Work` event for some blobs sidecar. pub fn gossip_signed_blob_sidecar( message_id: MessageId, peer_id: PeerId, peer_client: Client, blob_index: u64, signed_blob: SignedBlobSidecar, seen_timestamp: Duration, ) -> Self { Self { drop_during_sync: false, work: Work::GossipSignedBlobSidecar { message_id, peer_id, peer_client, blob_index, signed_blob: Box::new(signed_blob), seen_timestamp, }, } } /// Create a new `Work` event for some sync committee signature. 
pub fn gossip_sync_signature( message_id: MessageId, peer_id: PeerId, sync_signature: SyncCommitteeMessage, subnet_id: SyncSubnetId, seen_timestamp: Duration, ) -> Self { Self { drop_during_sync: true, work: Work::GossipSyncSignature { message_id, peer_id, sync_signature: Box::new(sync_signature), subnet_id, seen_timestamp, }, } } /// Create a new `Work` event for some sync committee contribution. pub fn gossip_sync_contribution( message_id: MessageId, peer_id: PeerId, sync_contribution: SignedContributionAndProof, seen_timestamp: Duration, ) -> Self { Self { drop_during_sync: true, work: Work::GossipSyncContribution { message_id, peer_id, sync_contribution: Box::new(sync_contribution), seen_timestamp, }, } } /// Create a new `Work` event for some exit. pub fn gossip_voluntary_exit( message_id: MessageId, peer_id: PeerId, voluntary_exit: Box, ) -> Self { Self { drop_during_sync: false, work: Work::GossipVoluntaryExit { message_id, peer_id, voluntary_exit, }, } } /// Create a new `Work` event for some proposer slashing. pub fn gossip_proposer_slashing( message_id: MessageId, peer_id: PeerId, proposer_slashing: Box, ) -> Self { Self { drop_during_sync: false, work: Work::GossipProposerSlashing { message_id, peer_id, proposer_slashing, }, } } /// Create a new `Work` event for some light client finality update. pub fn gossip_light_client_finality_update( message_id: MessageId, peer_id: PeerId, light_client_finality_update: Box>, seen_timestamp: Duration, ) -> Self { Self { drop_during_sync: true, work: Work::GossipLightClientFinalityUpdate { message_id, peer_id, light_client_finality_update, seen_timestamp, }, } } /// Create a new `Work` event for some light client optimistic update. pub fn gossip_light_client_optimistic_update( message_id: MessageId, peer_id: PeerId, light_client_optimistic_update: Box>, seen_timestamp: Duration, ) -> Self { Self { drop_during_sync: true, work: Work::GossipLightClientOptimisticUpdate { message_id, peer_id, light_client_optimistic_update, seen_timestamp, }, } } /// Create a new `Work` event for some attester slashing. pub fn gossip_attester_slashing( message_id: MessageId, peer_id: PeerId, attester_slashing: Box>, ) -> Self { Self { drop_during_sync: false, work: Work::GossipAttesterSlashing { message_id, peer_id, attester_slashing, }, } } /// Create a new `Work` event for some BLS to execution change. pub fn gossip_bls_to_execution_change( message_id: MessageId, peer_id: PeerId, bls_to_execution_change: Box, ) -> Self { Self { drop_during_sync: false, work: Work::GossipBlsToExecutionChange { message_id, peer_id, bls_to_execution_change, }, } } /// Create a new `Work` event for some block, where the result from computation (if any) is /// sent to the other side of `result_tx`. pub fn rpc_beacon_block( block_root: Hash256, block: BlockWrapper, seen_timestamp: Duration, process_type: BlockProcessType, ) -> Self { Self { drop_during_sync: false, work: Work::RpcBlock { block_root, block, seen_timestamp, process_type, should_process: true, }, } } pub fn rpc_blobs( block_root: Hash256, blobs: FixedBlobSidecarList, seen_timestamp: Duration, process_type: BlockProcessType, ) -> Self { Self { drop_during_sync: false, work: Work::RpcBlobs { block_root, blobs, seen_timestamp, process_type, }, } } /// Create a new work event to import `blocks` as a beacon chain segment. 
pub fn chain_segment( process_id: ChainSegmentProcessId, blocks: Vec>, ) -> Self { Self { drop_during_sync: false, work: Work::ChainSegment { process_id, blocks }, } } /// Create a new work event to process `StatusMessage`s from the RPC network. pub fn status_message(peer_id: PeerId, message: StatusMessage) -> Self { Self { drop_during_sync: false, work: Work::Status { peer_id, message }, } } /// Create a new work event to process `BlocksByRangeRequest`s from the RPC network. pub fn blocks_by_range_request( peer_id: PeerId, request_id: PeerRequestId, request: BlocksByRangeRequest, ) -> Self { Self { drop_during_sync: false, work: Work::BlocksByRangeRequest { peer_id, request_id, request, }, } } /// Create a new work event to process `BlocksByRootRequest`s from the RPC network. pub fn blocks_by_roots_request( peer_id: PeerId, request_id: PeerRequestId, request: BlocksByRootRequest, ) -> Self { Self { drop_during_sync: false, work: Work::BlocksByRootsRequest { peer_id, request_id, request, }, } } pub fn blobs_by_range_request( peer_id: PeerId, request_id: PeerRequestId, request: BlobsByRangeRequest, ) -> Self { Self { drop_during_sync: false, work: Work::BlobsByRangeRequest { peer_id, request_id, request, }, } } /// Create a new work event to process `LightClientBootstrap`s from the RPC network. pub fn lightclient_bootstrap_request( peer_id: PeerId, request_id: PeerRequestId, request: LightClientBootstrapRequest, ) -> Self { Self { drop_during_sync: true, work: Work::LightClientBootstrapRequest { peer_id, request_id, request, }, } } pub fn blobs_by_root_request( peer_id: PeerId, request_id: PeerRequestId, request: BlobsByRootRequest, ) -> Self { Self { drop_during_sync: false, work: Work::BlobsByRootsRequest { peer_id, request_id, request, }, } } /// Get a `str` representation of the type of work this `WorkEvent` contains. pub fn work_type(&self) -> &'static str { self.work.str_id() } } impl std::convert::From> for WorkEvent { fn from(ready_work: ReadyWork) -> Self { match ready_work { ReadyWork::GossipBlock(QueuedGossipBlock { peer_id, block, seen_timestamp, }) => Self { drop_during_sync: false, work: Work::DelayedImportBlock { peer_id, block, seen_timestamp, }, }, ReadyWork::RpcBlock(QueuedRpcBlock { block_root, block, seen_timestamp, process_type, should_process, }) => Self { drop_during_sync: false, work: Work::RpcBlock { block_root, block, seen_timestamp, process_type, should_process, }, }, ReadyWork::Unaggregate(QueuedUnaggregate { peer_id, message_id, attestation, subnet_id, should_import, seen_timestamp, }) => Self { drop_during_sync: true, work: Work::UnknownBlockAttestation { message_id, peer_id, attestation, subnet_id, should_import, seen_timestamp, }, }, ReadyWork::Aggregate(QueuedAggregate { peer_id, message_id, attestation, seen_timestamp, }) => Self { drop_during_sync: true, work: Work::UnknownBlockAggregate { message_id, peer_id, aggregate: attestation, seen_timestamp, }, }, ReadyWork::LightClientUpdate(QueuedLightClientUpdate { peer_id, message_id, light_client_optimistic_update, seen_timestamp, .. }) => Self { drop_during_sync: true, work: Work::UnknownLightClientOptimisticUpdate { message_id, peer_id, light_client_optimistic_update, seen_timestamp, }, }, ReadyWork::BackfillSync(QueuedBackfillBatch { process_id, blocks }) => { WorkEvent::chain_segment(process_id, blocks) } } } } /// A consensus message (or multiple) from the network that requires processing. 
#[derive(Derivative)] #[derivative(Debug(bound = "T: BeaconChainTypes"))] pub enum Work { GossipAttestation { message_id: MessageId, peer_id: PeerId, attestation: Box>, subnet_id: SubnetId, should_import: bool, seen_timestamp: Duration, }, UnknownBlockAttestation { message_id: MessageId, peer_id: PeerId, attestation: Box>, subnet_id: SubnetId, should_import: bool, seen_timestamp: Duration, }, GossipAttestationBatch { packages: Vec>, }, GossipAggregate { message_id: MessageId, peer_id: PeerId, aggregate: Box>, seen_timestamp: Duration, }, UnknownBlockAggregate { message_id: MessageId, peer_id: PeerId, aggregate: Box>, seen_timestamp: Duration, }, UnknownLightClientOptimisticUpdate { message_id: MessageId, peer_id: PeerId, light_client_optimistic_update: Box>, seen_timestamp: Duration, }, GossipAggregateBatch { packages: Vec>, }, GossipBlock { message_id: MessageId, peer_id: PeerId, peer_client: Client, block: Arc>, seen_timestamp: Duration, }, GossipSignedBlobSidecar { message_id: MessageId, peer_id: PeerId, peer_client: Client, blob_index: u64, signed_blob: Box>, seen_timestamp: Duration, }, DelayedImportBlock { peer_id: PeerId, block: Box>, seen_timestamp: Duration, }, GossipVoluntaryExit { message_id: MessageId, peer_id: PeerId, voluntary_exit: Box, }, GossipProposerSlashing { message_id: MessageId, peer_id: PeerId, proposer_slashing: Box, }, GossipAttesterSlashing { message_id: MessageId, peer_id: PeerId, attester_slashing: Box>, }, GossipSyncSignature { message_id: MessageId, peer_id: PeerId, sync_signature: Box, subnet_id: SyncSubnetId, seen_timestamp: Duration, }, GossipSyncContribution { message_id: MessageId, peer_id: PeerId, sync_contribution: Box>, seen_timestamp: Duration, }, GossipLightClientFinalityUpdate { message_id: MessageId, peer_id: PeerId, light_client_finality_update: Box>, seen_timestamp: Duration, }, GossipLightClientOptimisticUpdate { message_id: MessageId, peer_id: PeerId, light_client_optimistic_update: Box>, seen_timestamp: Duration, }, RpcBlock { block_root: Hash256, block: BlockWrapper, seen_timestamp: Duration, process_type: BlockProcessType, should_process: bool, }, RpcBlobs { block_root: Hash256, blobs: FixedBlobSidecarList, seen_timestamp: Duration, process_type: BlockProcessType, }, ChainSegment { process_id: ChainSegmentProcessId, blocks: Vec>, }, Status { peer_id: PeerId, message: StatusMessage, }, BlocksByRangeRequest { peer_id: PeerId, request_id: PeerRequestId, request: BlocksByRangeRequest, }, BlocksByRootsRequest { peer_id: PeerId, request_id: PeerRequestId, request: BlocksByRootRequest, }, BlobsByRangeRequest { peer_id: PeerId, request_id: PeerRequestId, request: BlobsByRangeRequest, }, GossipBlsToExecutionChange { message_id: MessageId, peer_id: PeerId, bls_to_execution_change: Box, }, LightClientBootstrapRequest { peer_id: PeerId, request_id: PeerRequestId, request: LightClientBootstrapRequest, }, BlobsByRootsRequest { peer_id: PeerId, request_id: PeerRequestId, request: BlobsByRootRequest, }, } impl Work { /// Provides a `&str` that uniquely identifies each enum variant. fn str_id(&self) -> &'static str { match self { Work::GossipAttestation { .. } => GOSSIP_ATTESTATION, Work::GossipAttestationBatch { .. } => GOSSIP_ATTESTATION_BATCH, Work::GossipAggregate { .. } => GOSSIP_AGGREGATE, Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH, Work::GossipBlock { .. } => GOSSIP_BLOCK, Work::GossipSignedBlobSidecar { .. } => GOSSIP_BLOBS_SIDECAR, Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK, Work::GossipVoluntaryExit { .. 
} => GOSSIP_VOLUNTARY_EXIT, Work::GossipProposerSlashing { .. } => GOSSIP_PROPOSER_SLASHING, Work::GossipAttesterSlashing { .. } => GOSSIP_ATTESTER_SLASHING, Work::GossipSyncSignature { .. } => GOSSIP_SYNC_SIGNATURE, Work::GossipSyncContribution { .. } => GOSSIP_SYNC_CONTRIBUTION, Work::GossipLightClientFinalityUpdate { .. } => GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE, Work::GossipLightClientOptimisticUpdate { .. } => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE, Work::RpcBlock { .. } => RPC_BLOCK, Work::RpcBlobs { .. } => RPC_BLOB, Work::ChainSegment { process_id: ChainSegmentProcessId::BackSyncBatchId { .. }, .. } => CHAIN_SEGMENT_BACKFILL, Work::ChainSegment { .. } => CHAIN_SEGMENT, Work::Status { .. } => STATUS_PROCESSING, Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST, Work::BlocksByRootsRequest { .. } => BLOCKS_BY_ROOTS_REQUEST, Work::BlobsByRangeRequest { .. } => BLOBS_BY_RANGE_REQUEST, Work::BlobsByRootsRequest { .. } => BLOBS_BY_ROOTS_REQUEST, Work::LightClientBootstrapRequest { .. } => LIGHT_CLIENT_BOOTSTRAP_REQUEST, Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE, Work::GossipBlsToExecutionChange { .. } => GOSSIP_BLS_TO_EXECUTION_CHANGE, } } } /// Unifies all the messages processed by the `BeaconProcessor`. enum InboundEvent { /// A worker has completed a task and is free. WorkerIdle, /// There is new work to be done. WorkEvent(WorkEvent), /// A work event that was queued for re-processing has become ready. ReprocessingWork(WorkEvent), } /// Combines the various incoming event streams for the `BeaconProcessor` into a single stream. /// /// This struct has a similar purpose to `tokio::select!`, however it allows for more fine-grained /// control (specifically in the ordering of event processing). struct InboundEvents { /// Used by workers when they finish a task. idle_rx: mpsc::Receiver<()>, /// Used by upstream processes to send new work to the `BeaconProcessor`. event_rx: mpsc::Receiver>, /// Used internally for queuing work ready to be re-processed. reprocess_work_rx: mpsc::Receiver>, } impl Stream for InboundEvents { type Item = InboundEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Always check for idle workers before anything else. This allows us to ensure that a big // stream of new events doesn't suppress the processing of existing events. match self.idle_rx.poll_recv(cx) { Poll::Ready(Some(())) => { return Poll::Ready(Some(InboundEvent::WorkerIdle)); } Poll::Ready(None) => { return Poll::Ready(None); } Poll::Pending => {} } // Poll for delayed blocks before polling for new work. It might be the case that a delayed // block is required to successfully process some new work. match self.reprocess_work_rx.poll_recv(cx) { Poll::Ready(Some(ready_work)) => { return Poll::Ready(Some(InboundEvent::ReprocessingWork(ready_work.into()))); } Poll::Ready(None) => { return Poll::Ready(None); } Poll::Pending => {} } match self.event_rx.poll_recv(cx) { Poll::Ready(Some(event)) => { return Poll::Ready(Some(InboundEvent::WorkEvent(event))); } Poll::Ready(None) => { return Poll::Ready(None); } Poll::Pending => {} } Poll::Pending } } /// Defines if and where we will store the SSZ files of invalid blocks. 
#[derive(Clone)]
pub enum InvalidBlockStorage {
    Enabled(PathBuf),
    Disabled,
}

/// A multi-threaded processor for messages received on the network that need to be processed by
/// the `BeaconChain`.
///
/// See the module-level documentation for more information.
pub struct BeaconProcessor<T: BeaconChainTypes> {
    pub beacon_chain: Weak<BeaconChain<T>>,
    pub network_tx: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
    pub sync_tx: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
    pub network_globals: Arc<NetworkGlobals<T::EthSpec>>,
    pub executor: TaskExecutor,
    pub max_workers: usize,
    pub current_workers: usize,
    pub importing_blocks: DuplicateCache,
    pub invalid_block_storage: InvalidBlockStorage,
    pub log: Logger,
}

impl<T: BeaconChainTypes> BeaconProcessor<T> {
    /// Spawns the "manager" task which checks the `event_rx` channel for messages containing new
    /// work which will be:
    ///
    /// - Performed immediately, if a worker is available.
    /// - Queued for later processing, if no worker is currently available.
    ///
    /// Only `self.max_workers` workers will ever be spawned at one time. Each worker is a `tokio`
    /// task started with `spawn_blocking`.
    ///
    /// The optional `work_journal_tx` allows an outside process to receive a log of all work
    /// events processed by `self`. This should only be used during testing.
    pub fn spawn_manager(
        mut self,
        event_rx: mpsc::Receiver<WorkEvent<T>>,
        work_journal_tx: Option<mpsc::Sender<&'static str>>,
    ) {
        // Used by workers to communicate that they have finished a task.
        let (idle_tx, idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN);

        // Using LIFO queues for attestations since validator profits rely upon getting fresh
        // attestations into blocks. Additionally, later attestations contain more information than
        // earlier ones, so we consider them more valuable.
        let mut aggregate_queue = LifoQueue::new(MAX_AGGREGATED_ATTESTATION_QUEUE_LEN);
        let mut aggregate_debounce = TimeLatch::default();
        let mut attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN);
        let mut attestation_debounce = TimeLatch::default();
        let mut unknown_block_aggregate_queue =
            LifoQueue::new(MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN);
        let mut unknown_block_attestation_queue =
            LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN);

        let mut sync_message_queue = LifoQueue::new(MAX_SYNC_MESSAGE_QUEUE_LEN);
        let mut sync_contribution_queue = LifoQueue::new(MAX_SYNC_CONTRIBUTION_QUEUE_LEN);

        // Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have
        // a strong feeling about queue type for exits.
        let mut gossip_voluntary_exit_queue = FifoQueue::new(MAX_GOSSIP_EXIT_QUEUE_LEN);

        // Using FIFO queues for slashings to prevent attackers from flushing real slashings out
        // of the queues with lots of junk messages.
        let mut gossip_proposer_slashing_queue =
            FifoQueue::new(MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN);
        let mut gossip_attester_slashing_queue =
            FifoQueue::new(MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN);

        // Using FIFO queues for light client updates to maintain sequence order.
        let mut finality_update_queue = FifoQueue::new(MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN);
        let mut optimistic_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN);
        let mut unknown_light_client_update_queue =
            FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN);

        // Using a FIFO queue since blocks need to be imported sequentially.
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); let mut rpc_blob_queue = FifoQueue::new(MAX_RPC_BLOB_QUEUE_LEN); let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); let mut gossip_blob_queue = FifoQueue::new(MAX_GOSSIP_BLOB_QUEUE_LEN); let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN); let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN); let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN); let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN); let mut blbroots_queue = FifoQueue::new(MAX_BLOCK_AND_BLOBS_BY_ROOTS_QUEUE_LEN); let mut blbrange_queue = FifoQueue::new(MAX_BLOBS_BY_RANGE_QUEUE_LEN); let mut gossip_bls_to_execution_change_queue = FifoQueue::new(MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN); let mut lcbootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN); let chain = match self.beacon_chain.upgrade() { Some(chain) => chain, // No need to proceed any further if the beacon chain has been dropped, the client // is shutting down. None => return, }; // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to // receive them back once they are ready (`ready_work_rx`). let (ready_work_tx, ready_work_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN); let work_reprocessing_tx = spawn_reprocess_scheduler( ready_work_tx, &self.executor, chain.slot_clock.clone(), self.log.clone(), ); let executor = self.executor.clone(); // The manager future will run on the core executor and delegate tasks to worker // threads on the blocking executor. let manager_future = async move { let mut inbound_events = InboundEvents { idle_rx, event_rx, reprocess_work_rx: ready_work_rx, }; let enable_backfill_rate_limiting = chain.config.enable_backfill_rate_limiting; loop { let work_event = match inbound_events.next().await { Some(InboundEvent::WorkerIdle) => { self.current_workers = self.current_workers.saturating_sub(1); None } Some(InboundEvent::WorkEvent(event)) if enable_backfill_rate_limiting => { match QueuedBackfillBatch::try_from(event) { Ok(backfill_batch) => { match work_reprocessing_tx .try_send(ReprocessQueueMessage::BackfillSync(backfill_batch)) { Err(e) => { warn!( self.log, "Unable to queue backfill work event. Will try to process now."; "error" => %e ); match e { TrySendError::Full(reprocess_queue_message) | TrySendError::Closed(reprocess_queue_message) => { match reprocess_queue_message { ReprocessQueueMessage::BackfillSync( backfill_batch, ) => Some(backfill_batch.into()), other => { crit!( self.log, "Unexpected queue message type"; "message_type" => other.as_ref() ); // This is an unhandled exception, drop the message. continue; } } } } } Ok(..) => { // backfill work sent to "reprocessing" queue. Process the next event. 
continue; } } } Err(event) => Some(event), } } Some(InboundEvent::WorkEvent(event)) | Some(InboundEvent::ReprocessingWork(event)) => Some(event), None => { debug!( self.log, "Gossip processor stopped"; "msg" => "stream ended" ); break; } }; let _event_timer = metrics::start_timer(&metrics::BEACON_PROCESSOR_EVENT_HANDLING_SECONDS); if let Some(event) = &work_event { metrics::inc_counter_vec( &metrics::BEACON_PROCESSOR_WORK_EVENTS_RX_COUNT, &[event.work.str_id()], ); } else { metrics::inc_counter(&metrics::BEACON_PROCESSOR_IDLE_EVENTS_TOTAL); } if let Some(work_journal_tx) = &work_journal_tx { let id = work_event .as_ref() .map(|event| event.work.str_id()) .unwrap_or(WORKER_FREED); // We don't care if this message was successfully sent, we only use the journal // during testing. let _ = work_journal_tx.try_send(id); } let can_spawn = self.current_workers < self.max_workers; let drop_during_sync = work_event .as_ref() .map_or(false, |event| event.drop_during_sync); match work_event { // There is no new work event, but we are able to spawn a new worker. // // We don't check the `work.drop_during_sync` here. We assume that if it made // it into the queue at any point then we should process it. None if can_spawn => { let toolbox = Toolbox { idle_tx: idle_tx.clone(), work_reprocessing_tx: work_reprocessing_tx.clone(), }; // Check for chain segments first, they're the most efficient way to get // blocks into the system. if let Some(item) = chain_segment_queue.pop() { self.spawn_worker(item, toolbox); // Sync block and blob segments have the same priority as normal chain // segments. This here might change depending on how batch processing // evolves. } else if let Some(item) = rpc_block_queue.pop() { self.spawn_worker(item, toolbox); } else if let Some(item) = rpc_blob_queue.pop() { self.spawn_worker(item, toolbox); // Check delayed blocks before gossip blocks, the gossip blocks might rely // on the delayed ones. } else if let Some(item) = delayed_block_queue.pop() { self.spawn_worker(item, toolbox); // Check gossip blocks before gossip attestations, since a block might be // required to verify some attestations. } else if let Some(item) = gossip_block_queue.pop() { self.spawn_worker(item, toolbox); } else if let Some(item) = gossip_blob_queue.pop() { self.spawn_worker(item, toolbox); // Check the aggregates, *then* the unaggregates since we assume that // aggregates are more valuable to local validators and effectively give us // more information with less signature verification time. } else if aggregate_queue.len() > 0 { let batch_size = cmp::min(aggregate_queue.len(), MAX_GOSSIP_AGGREGATE_BATCH_SIZE); if batch_size < 2 { // One single aggregate is in the queue, process it individually. if let Some(item) = aggregate_queue.pop() { self.spawn_worker(item, toolbox); } } else { // Collect two or more aggregates into a batch, so they can take // advantage of batch signature verification. // // Note: this will convert the `Work::GossipAggregate` item into a // `Work::GossipAggregateBatch` item. let mut packages = Vec::with_capacity(batch_size); for _ in 0..batch_size { if let Some(item) = aggregate_queue.pop() { match item { Work::GossipAggregate { message_id, peer_id, aggregate, seen_timestamp, } => { packages.push(GossipAggregatePackage::new( message_id, peer_id, aggregate, seen_timestamp, )); } _ => { error!(self.log, "Invalid item in aggregate queue") } } } } // Process all aggregates with a single worker. 
self.spawn_worker(Work::GossipAggregateBatch { packages }, toolbox) } // Check the unaggregated attestation queue. // // Potentially use batching. } else if attestation_queue.len() > 0 { let batch_size = cmp::min( attestation_queue.len(), MAX_GOSSIP_ATTESTATION_BATCH_SIZE, ); if batch_size < 2 { // One single attestation is in the queue, process it individually. if let Some(item) = attestation_queue.pop() { self.spawn_worker(item, toolbox); } } else { // Collect two or more attestations into a batch, so they can take // advantage of batch signature verification. // // Note: this will convert the `Work::GossipAttestation` item into a // `Work::GossipAttestationBatch` item. let mut packages = Vec::with_capacity(batch_size); for _ in 0..batch_size { if let Some(item) = attestation_queue.pop() { match item { Work::GossipAttestation { message_id, peer_id, attestation, subnet_id, should_import, seen_timestamp, } => { packages.push(GossipAttestationPackage::new( message_id, peer_id, attestation, subnet_id, should_import, seen_timestamp, )); } _ => error!( self.log, "Invalid item in attestation queue" ), } } } // Process all attestations with a single worker. self.spawn_worker( Work::GossipAttestationBatch { packages }, toolbox, ) } // Check sync committee messages after attestations as their rewards are lesser // and they don't influence fork choice. } else if let Some(item) = sync_contribution_queue.pop() { self.spawn_worker(item, toolbox); } else if let Some(item) = sync_message_queue.pop() { self.spawn_worker(item, toolbox); // Aggregates and unaggregates queued for re-processing are older and we // care about fresher ones, so check those first. } else if let Some(item) = unknown_block_aggregate_queue.pop() { self.spawn_worker(item, toolbox); } else if let Some(item) = unknown_block_attestation_queue.pop() { self.spawn_worker(item, toolbox); // Check RPC methods next. Status messages are needed for sync so // prioritize them over syncing requests from other peers (BlocksByRange // and BlocksByRoot) } else if let Some(item) = status_queue.pop() { self.spawn_worker(item, toolbox); } else if let Some(item) = bbrange_queue.pop() { self.spawn_worker(item, toolbox); } else if let Some(item) = bbroots_queue.pop() { self.spawn_worker(item, toolbox); } else if let Some(item) = blbrange_queue.pop() { self.spawn_worker(item, toolbox); } else if let Some(item) = blbroots_queue.pop() { self.spawn_worker(item, toolbox); // Check slashings after all other consensus messages so we prioritize // following head. // // Check attester slashings before proposer slashings since they have the // potential to slash multiple validators at once. } else if let Some(item) = gossip_attester_slashing_queue.pop() { self.spawn_worker(item, toolbox); } else if let Some(item) = gossip_proposer_slashing_queue.pop() { self.spawn_worker(item, toolbox); // Check exits and address changes late since our validators don't get // rewards from them. } else if let Some(item) = gossip_voluntary_exit_queue.pop() { self.spawn_worker(item, toolbox); } else if let Some(item) = gossip_bls_to_execution_change_queue.pop() { self.spawn_worker(item, toolbox); // Handle backfill sync chain segments. } else if let Some(item) = backfill_chain_segment.pop() { self.spawn_worker(item, toolbox); // This statement should always be the final else statement. } else if let Some(item) = lcbootstrap_queue.pop() { self.spawn_worker(item, toolbox); } else { // Let the journal know that a worker is freed and there's nothing else // for it to do. 
if let Some(work_journal_tx) = &work_journal_tx { // We don't care if this message was successfully sent, we only use the journal // during testing. let _ = work_journal_tx.try_send(NOTHING_TO_DO); } } } // There is no new work event and we are unable to spawn a new worker. // // I cannot see any good reason why this would happen. None => { warn!( self.log, "Unexpected gossip processor condition"; "msg" => "no new work and cannot spawn worker" ); } // The chain is syncing and this event should be dropped during sync. Some(work_event) if self.network_globals.sync_state.read().is_syncing() && drop_during_sync => { let work_id = work_event.work.str_id(); metrics::inc_counter_vec( &metrics::BEACON_PROCESSOR_WORK_EVENTS_IGNORED_COUNT, &[work_id], ); trace!( self.log, "Gossip processor skipping work"; "msg" => "chain is syncing", "work_id" => work_id ); } // There is a new work event and the chain is not syncing. Process it or queue // it. Some(WorkEvent { work, .. }) => { let work_id = work.str_id(); let toolbox = Toolbox { idle_tx: idle_tx.clone(), work_reprocessing_tx: work_reprocessing_tx.clone(), }; match work { _ if can_spawn => self.spawn_worker(work, toolbox), Work::GossipAttestation { .. } => attestation_queue.push(work), // Attestation batches are formed internally within the // `BeaconProcessor`, they are not sent from external services. Work::GossipAttestationBatch { .. } => crit!( self.log, "Unsupported inbound event"; "type" => "GossipAttestationBatch" ), Work::GossipAggregate { .. } => aggregate_queue.push(work), // Aggregate batches are formed internally within the `BeaconProcessor`, // they are not sent from external services. Work::GossipAggregateBatch { .. } => crit!( self.log, "Unsupported inbound event"; "type" => "GossipAggregateBatch" ), Work::GossipBlock { .. } => { gossip_block_queue.push(work, work_id, &self.log) } Work::GossipSignedBlobSidecar { .. } => { gossip_blob_queue.push(work, work_id, &self.log) } Work::DelayedImportBlock { .. } => { delayed_block_queue.push(work, work_id, &self.log) } Work::GossipVoluntaryExit { .. } => { gossip_voluntary_exit_queue.push(work, work_id, &self.log) } Work::GossipProposerSlashing { .. } => { gossip_proposer_slashing_queue.push(work, work_id, &self.log) } Work::GossipAttesterSlashing { .. } => { gossip_attester_slashing_queue.push(work, work_id, &self.log) } Work::GossipSyncSignature { .. } => sync_message_queue.push(work), Work::GossipSyncContribution { .. } => { sync_contribution_queue.push(work) } Work::GossipLightClientFinalityUpdate { .. } => { finality_update_queue.push(work, work_id, &self.log) } Work::GossipLightClientOptimisticUpdate { .. } => { optimistic_update_queue.push(work, work_id, &self.log) } Work::RpcBlock { .. } => rpc_block_queue.push(work, work_id, &self.log), Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id, &self.log), Work::ChainSegment { ref process_id, .. } => match process_id { ChainSegmentProcessId::RangeBatchId { .. } | ChainSegmentProcessId::ParentLookup { .. } => { chain_segment_queue.push(work, work_id, &self.log) } ChainSegmentProcessId::BackSyncBatchId { .. } => { backfill_chain_segment.push(work, work_id, &self.log) } }, Work::Status { .. } => status_queue.push(work, work_id, &self.log), Work::BlocksByRangeRequest { .. } => { bbrange_queue.push(work, work_id, &self.log) } Work::BlocksByRootsRequest { .. } => { bbroots_queue.push(work, work_id, &self.log) } Work::BlobsByRangeRequest { .. } => { blbrange_queue.push(work, work_id, &self.log) } Work::LightClientBootstrapRequest { .. 
} => { lcbootstrap_queue.push(work, work_id, &self.log) } Work::UnknownBlockAttestation { .. } => { unknown_block_attestation_queue.push(work) } Work::UnknownBlockAggregate { .. } => { unknown_block_aggregate_queue.push(work) } Work::GossipBlsToExecutionChange { .. } => { gossip_bls_to_execution_change_queue.push(work, work_id, &self.log) } Work::BlobsByRootsRequest { .. } => { blbroots_queue.push(work, work_id, &self.log) } Work::UnknownLightClientOptimisticUpdate { .. } => { unknown_light_client_update_queue.push(work, work_id, &self.log) } } } } metrics::set_gauge( &metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_TOTAL, self.current_workers as i64, ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_QUEUE_TOTAL, attestation_queue.len() as i64, ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL, aggregate_queue.len() as i64, ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_SYNC_MESSAGE_QUEUE_TOTAL, sync_message_queue.len() as i64, ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_SYNC_CONTRIBUTION_QUEUE_TOTAL, sync_contribution_queue.len() as i64, ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_QUEUE_TOTAL, gossip_block_queue.len() as i64, ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL, rpc_block_queue.len() as i64, ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_RPC_BLOB_QUEUE_TOTAL, rpc_blob_queue.len() as i64, ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL, chain_segment_queue.len() as i64, ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_QUEUE_TOTAL, backfill_chain_segment.len() as i64, ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_EXIT_QUEUE_TOTAL, gossip_voluntary_exit_queue.len() as i64, ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_QUEUE_TOTAL, gossip_proposer_slashing_queue.len() as i64, ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_QUEUE_TOTAL, gossip_attester_slashing_queue.len() as i64, ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_QUEUE_TOTAL, gossip_bls_to_execution_change_queue.len() as i64, ); if aggregate_queue.is_full() && aggregate_debounce.elapsed() { error!( self.log, "Aggregate attestation queue full"; "msg" => "the system has insufficient resources for load", "queue_len" => aggregate_queue.max_length, ) } if attestation_queue.is_full() && attestation_debounce.elapsed() { error!( self.log, "Attestation queue full"; "msg" => "the system has insufficient resources for load", "queue_len" => attestation_queue.max_length, ) } } }; // Spawn on the core executor. executor.spawn(manager_future, MANAGER_TASK_NAME); } /// Spawns a blocking worker thread to process some `Work`. /// /// Sends an message on `idle_tx` when the work is complete and the task is stopping. fn spawn_worker(&mut self, work: Work, toolbox: Toolbox) { let idle_tx = toolbox.idle_tx; let work_reprocessing_tx = toolbox.work_reprocessing_tx; let work_id = work.str_id(); let worker_timer = metrics::start_timer_vec(&metrics::BEACON_PROCESSOR_WORKER_TIME, &[work_id]); metrics::inc_counter(&metrics::BEACON_PROCESSOR_WORKERS_SPAWNED_TOTAL); metrics::inc_counter_vec( &metrics::BEACON_PROCESSOR_WORK_EVENTS_STARTED_COUNT, &[work.str_id()], ); // Wrap the `idle_tx` in a struct that will fire the idle message whenever it is dropped. // // This helps ensure that the worker is always freed in the case of an early exit or panic. 
// As such, this instantiation should happen as early in the function as possible. let send_idle_on_drop = SendOnDrop { tx: idle_tx, _worker_timer: worker_timer, log: self.log.clone(), }; let worker_id = self.current_workers; self.current_workers = self.current_workers.saturating_add(1); let chain = if let Some(chain) = self.beacon_chain.upgrade() { chain } else { debug!( self.log, "Beacon chain dropped, shutting down"; ); return; }; let executor = self.executor.clone(); let worker = Worker { chain, network_tx: self.network_tx.clone(), sync_tx: self.sync_tx.clone(), log: self.log.clone(), }; let duplicate_cache = self.importing_blocks.clone(); trace!( self.log, "Spawning beacon processor worker"; "work" => work_id, "worker" => worker_id, ); let task_spawner = TaskSpawner { executor: executor.clone(), send_idle_on_drop, }; let sub_executor = executor; match work { /* * Individual unaggregated attestation verification. */ Work::GossipAttestation { message_id, peer_id, attestation, subnet_id, should_import, seen_timestamp, } => task_spawner.spawn_blocking(move || { worker.process_gossip_attestation( message_id, peer_id, attestation, subnet_id, should_import, Some(work_reprocessing_tx), seen_timestamp, ) }), /* * Batched unaggregated attestation verification. */ Work::GossipAttestationBatch { packages } => task_spawner.spawn_blocking(|| { worker.process_gossip_attestation_batch(packages, Some(work_reprocessing_tx)) }), /* * Individual aggregated attestation verification. */ Work::GossipAggregate { message_id, peer_id, aggregate, seen_timestamp, } => task_spawner.spawn_blocking(move || { worker.process_gossip_aggregate( message_id, peer_id, aggregate, Some(work_reprocessing_tx), seen_timestamp, ) }), /* * Batched aggregated attestation verification. */ Work::GossipAggregateBatch { packages } => task_spawner.spawn_blocking(|| { worker.process_gossip_aggregate_batch(packages, Some(work_reprocessing_tx)) }), /* * Verification for beacon blocks received on gossip. */ Work::GossipBlock { message_id, peer_id, peer_client, block, seen_timestamp, } => { let invalid_block_storage = self.invalid_block_storage.clone(); task_spawner.spawn_async(async move { worker .process_gossip_block( message_id, peer_id, peer_client, block.into(), work_reprocessing_tx, duplicate_cache, invalid_block_storage, seen_timestamp, ) .await }) } /* * Verification for blobs sidecars received on gossip. */ Work::GossipSignedBlobSidecar { message_id, peer_id, peer_client, blob_index, signed_blob, seen_timestamp, } => task_spawner.spawn_async(async move { worker .process_gossip_blob( message_id, peer_id, peer_client, blob_index, *signed_blob, seen_timestamp, ) .await }), /* * Import for blocks that we received earlier than their intended slot. */ Work::DelayedImportBlock { peer_id, block, seen_timestamp, } => { let invalid_block_storage = self.invalid_block_storage.clone(); task_spawner.spawn_async(worker.process_gossip_verified_block( peer_id, *block, work_reprocessing_tx, invalid_block_storage, seen_timestamp, )) } /* * Voluntary exits received on gossip. */ Work::GossipVoluntaryExit { message_id, peer_id, voluntary_exit, } => task_spawner.spawn_blocking(move || { worker.process_gossip_voluntary_exit(message_id, peer_id, *voluntary_exit) }), /* * Proposer slashings received on gossip. */ Work::GossipProposerSlashing { message_id, peer_id, proposer_slashing, } => task_spawner.spawn_blocking(move || { worker.process_gossip_proposer_slashing(message_id, peer_id, *proposer_slashing) }), /* * Attester slashings received on gossip. 
*/ Work::GossipAttesterSlashing { message_id, peer_id, attester_slashing, } => task_spawner.spawn_blocking(move || { worker.process_gossip_attester_slashing(message_id, peer_id, *attester_slashing) }), /* * Sync committee message verification. */ Work::GossipSyncSignature { message_id, peer_id, sync_signature, subnet_id, seen_timestamp, } => task_spawner.spawn_blocking(move || { worker.process_gossip_sync_committee_signature( message_id, peer_id, *sync_signature, subnet_id, seen_timestamp, ) }), /* * Sync contribution verification. */ Work::GossipSyncContribution { message_id, peer_id, sync_contribution, seen_timestamp, } => task_spawner.spawn_blocking(move || { worker.process_sync_committee_contribution( message_id, peer_id, *sync_contribution, seen_timestamp, ) }), /* * BLS to execution change verification. */ Work::GossipBlsToExecutionChange { message_id, peer_id, bls_to_execution_change, } => task_spawner.spawn_blocking(move || { worker.process_gossip_bls_to_execution_change( message_id, peer_id, *bls_to_execution_change, ) }), /* * Light client finality update verification. */ Work::GossipLightClientFinalityUpdate { message_id, peer_id, light_client_finality_update, seen_timestamp, } => task_spawner.spawn_blocking(move || { worker.process_gossip_finality_update( message_id, peer_id, *light_client_finality_update, seen_timestamp, ) }), /* * Light client optimistic update verification. */ Work::GossipLightClientOptimisticUpdate { message_id, peer_id, light_client_optimistic_update, seen_timestamp, } => task_spawner.spawn_blocking(move || { worker.process_gossip_optimistic_update( message_id, peer_id, *light_client_optimistic_update, Some(work_reprocessing_tx), seen_timestamp, ) }), /* * Verification for beacon blocks received during syncing via RPC. */ Work::RpcBlock { block_root, block, seen_timestamp, process_type, should_process, } => task_spawner.spawn_async(worker.process_rpc_block( block_root, block, seen_timestamp, process_type, work_reprocessing_tx, duplicate_cache, should_process, )), Work::RpcBlobs { block_root, blobs, seen_timestamp, process_type, } => task_spawner.spawn_async(worker.process_rpc_blobs( block_root, blobs, seen_timestamp, process_type, )), /* * Verification for a chain segment (multiple blocks). */ Work::ChainSegment { process_id, blocks } => { let notify_execution_layer = if self .network_globals .sync_state .read() .is_syncing_finalized() { NotifyExecutionLayer::No } else { NotifyExecutionLayer::Yes }; task_spawner.spawn_async(async move { worker .process_chain_segment(process_id, blocks, notify_execution_layer) .await }) } /* * Processing of Status Messages. */ Work::Status { peer_id, message } => { task_spawner.spawn_blocking(move || worker.process_status(peer_id, message)) } /* * Processing of range syncing requests from other peers. */ Work::BlocksByRangeRequest { peer_id, request_id, request, } => task_spawner.spawn_blocking_with_manual_send_idle(move |send_idle_on_drop| { worker.handle_blocks_by_range_request( sub_executor, send_idle_on_drop, peer_id, request_id, request, ) }), /* * Processing of blocks by roots requests from other peers. 
*/ Work::BlocksByRootsRequest { peer_id, request_id, request, } => task_spawner.spawn_blocking_with_manual_send_idle(move |send_idle_on_drop| { worker.handle_blocks_by_root_request( sub_executor, send_idle_on_drop, peer_id, request_id, request, ) }), Work::BlobsByRangeRequest { peer_id, request_id, request, } => task_spawner.spawn_blocking_with_manual_send_idle(move |send_idle_on_drop| { worker.handle_blobs_by_range_request( send_idle_on_drop, peer_id, request_id, request, ) }), Work::BlobsByRootsRequest { peer_id, request_id, request, } => task_spawner.spawn_blocking_with_manual_send_idle(move |send_idle_on_drop| { worker.handle_blobs_by_root_request(send_idle_on_drop, peer_id, request_id, request) }), /* * Processing of lightclient bootstrap requests from other peers. */ Work::LightClientBootstrapRequest { peer_id, request_id, request, } => task_spawner.spawn_blocking(move || { worker.handle_light_client_bootstrap(peer_id, request_id, request) }), Work::UnknownBlockAttestation { message_id, peer_id, attestation, subnet_id, should_import, seen_timestamp, } => task_spawner.spawn_blocking(move || { worker.process_gossip_attestation( message_id, peer_id, attestation, subnet_id, should_import, None, // Do not allow this attestation to be re-processed beyond this point. seen_timestamp, ) }), Work::UnknownBlockAggregate { message_id, peer_id, aggregate, seen_timestamp, } => task_spawner.spawn_blocking(move || { worker.process_gossip_aggregate( message_id, peer_id, aggregate, None, seen_timestamp, ) }), Work::UnknownLightClientOptimisticUpdate { message_id, peer_id, light_client_optimistic_update, seen_timestamp, } => task_spawner.spawn_blocking(move || { worker.process_gossip_optimistic_update( message_id, peer_id, *light_client_optimistic_update, None, seen_timestamp, ) }), }; } } /// Spawns tasks that are either: /// /// - Blocking (i.e. intensive methods that shouldn't run on the core `tokio` executor) /// - Async (i.e. `async` methods) /// /// Takes a `SendOnDrop` and ensures it is dropped after the task completes. This frees the beacon /// processor worker so a new task can be started. struct TaskSpawner { executor: TaskExecutor, send_idle_on_drop: SendOnDrop, } impl TaskSpawner { /// Spawn an async task, dropping the `SendOnDrop` after the task has completed. fn spawn_async(self, task: impl Future + Send + 'static) { self.executor.spawn( async { task.await; drop(self.send_idle_on_drop) }, WORKER_TASK_NAME, ) } /// Spawn a blocking task, dropping the `SendOnDrop` after the task has completed. fn spawn_blocking(self, task: F) where F: FnOnce() + Send + 'static, { self.executor.spawn_blocking( || { task(); drop(self.send_idle_on_drop) }, WORKER_TASK_NAME, ) } /// Spawn a blocking task, passing the `SendOnDrop` into the task. /// /// ## Notes /// /// Users must ensure the `SendOnDrop` is dropped at the appropriate time! pub fn spawn_blocking_with_manual_send_idle(self, task: F) where F: FnOnce(SendOnDrop) + Send + 'static, { self.executor.spawn_blocking( || { task(self.send_idle_on_drop); }, WORKER_TASK_NAME, ) } } /// This struct will send a message on `self.tx` when it is dropped. An error will be logged on /// `self.log` if the send fails (this happens when the node is shutting down). /// /// ## Purpose /// /// This is useful for ensuring that a worker-freed message is still sent if a worker panics. 
///
/// The Rust docs for `Drop` state that `Drop` is called during an unwind in a panic:
///
/// https://doc.rust-lang.org/std/ops/trait.Drop.html#panics
pub struct SendOnDrop {
    tx: mpsc::Sender<()>,
    // The field is unused, but it's here to ensure the timer is dropped once the task has
    // finished.
    _worker_timer: Option<metrics::HistogramTimer>,
    log: Logger,
}

impl Drop for SendOnDrop {
    fn drop(&mut self) {
        if let Err(e) = self.tx.try_send(()) {
            warn!(
                self.log,
                "Unable to free worker";
                "msg" => "did not free worker, shutdown may be underway",
                "error" => %e
            )
        }
    }
}
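
// Illustrative sketches of the queue and cache semantics defined above. These tests are
// examples only (they are not part of the original processing logic): they assume the generic
// `FifoQueue<T>`/`LifoQueue<T>` definitions and the `DuplicateCache` from this module, and use
// a discard `slog` logger so the "queue full" error path can run without producing output.
#[cfg(test)]
mod queue_semantics_examples {
    use super::*;

    #[test]
    fn full_fifo_rejects_new_items_while_full_lifo_evicts_old_ones() {
        let log = Logger::root(slog::Discard, slog::o!());

        // A full FIFO queue drops the *incoming* item (and logs an error).
        let mut fifo = FifoQueue::new(2);
        fifo.push(1u8, "example", &log);
        fifo.push(2u8, "example", &log);
        fifo.push(3u8, "example", &log); // dropped: the queue already holds `max_length` items
        assert_eq!(fifo.pop(), Some(1));
        assert_eq!(fifo.len(), 1);

        // A full LIFO queue evicts the *oldest* item, keeping the freshest ones.
        let mut lifo = LifoQueue::new(2);
        lifo.push(1u8);
        lifo.push(2u8);
        lifo.push(3u8); // evicts `1`
        assert!(lifo.is_full());
        assert_eq!(lifo.pop(), Some(3));
        assert_eq!(lifo.pop(), Some(2));
    }

    #[test]
    fn duplicate_cache_frees_the_entry_when_the_handle_is_dropped() {
        let cache = DuplicateCache::default();
        let block_root = Hash256::zero();

        // The first insertion returns a handle; a concurrent duplicate is rejected.
        let handle = cache.check_and_insert(block_root).expect("first insert succeeds");
        assert!(cache.check_and_insert(block_root).is_none());

        // Dropping the handle (e.g. on worker panic) removes the entry again.
        drop(handle);
        assert!(cache.check_and_insert(block_root).is_some());
    }
}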
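
// A sketch of the `SendOnDrop` mechanism: dropping the guard (either normally or during a
// panic unwind) is what signals the manager that a worker slot has been freed. This example is
// illustrative only; it constructs the guard directly with a discard logger and assumes the
// `_worker_timer` field is an `Option` (as in the struct definition above) and that the
// `tokio` receiver's `try_recv` is available.
#[cfg(test)]
mod send_on_drop_example {
    use super::*;

    #[test]
    fn dropping_the_guard_signals_an_idle_worker() {
        let (tx, mut rx) = mpsc::channel::<()>(1);
        let guard = SendOnDrop {
            tx,
            _worker_timer: None,
            log: Logger::root(slog::Discard, slog::o!()),
        };

        // Nothing is sent while the guard (i.e. the worker) is still alive.
        assert!(rx.try_recv().is_err());

        // Dropping the guard fires the idle message, freeing the worker slot.
        drop(guard);
        assert!(rx.try_recv().is_ok());
    }
}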
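
// The "fixed-length buffer" described in the module documentation boils down to bounded
// `tokio` channels plus `try_send`: when the `BeaconProcessor` is overloaded, senders receive
// an explicit `TrySendError::Full` instead of queueing unbounded work. A minimal sketch of
// that rejection pattern, using a tiny capacity instead of `MAX_WORK_EVENT_QUEUE_LEN`:
#[cfg(test)]
mod bounded_channel_example {
    use super::*;

    #[test]
    fn try_send_rejects_work_when_the_buffer_is_full() {
        // Keep `_rx` alive so the channel is full rather than closed.
        let (tx, _rx) = mpsc::channel::<&'static str>(1);

        // The first message fits in the buffer.
        assert!(tx.try_send("work").is_ok());

        // The second is rejected immediately rather than blocking the sender.
        assert!(matches!(tx.try_send("more work"), Err(TrySendError::Full(_))));
    }
}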