diff --git a/Cargo.lock b/Cargo.lock index bca31b1f8..1b0657f24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1607,6 +1607,38 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "discv5" +version = "0.1.0-beta.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f52d2228d51e8f868a37d5b5b25b82c13552b635d5b47c3a5d53855a6fc4f0" +dependencies = [ + "aes-ctr", + "aes-gcm 0.8.0", + "arrayvec", + "digest 0.9.0", + "enr", + "fnv", + "futures 0.3.12", + "hex", + "hkdf", + "k256", + "lazy_static", + "lru_time_cache", + "parking_lot", + "rand 0.7.3", + "rlp 0.5.0", + "sha2 0.9.2", + "smallvec", + "tokio 1.1.0", + "tokio-stream", + "tokio-util 0.6.3", + "tracing", + "tracing-subscriber", + "uint", + "zeroize", +] + [[package]] name = "discv5" version = "0.1.0-beta.3" @@ -1775,9 +1807,9 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f26ecb66b4bdca6c1409b40fb255eefc2bd4f6d135dab3c3124f80ffa2a9661e" +checksum = "17392a012ea30ef05a610aa97dfb49496e71c9f676b27879922ea5bdf60d9d3f" dependencies = [ "atty", "humantime", @@ -1966,7 +1998,7 @@ dependencies = [ "base64 0.13.0", "directory", "dirs 3.0.1", - "discv5", + "discv5 0.1.0-beta.3 (git+https://github.com/sigp/discv5?rev=02d2c896c66f8dc2b848c3996fedcd98e1dfec69)", "error-chain", "eth2_ssz", "eth2_ssz_derive", @@ -2797,7 +2829,7 @@ version = "0.1.0" dependencies = [ "beacon_chain", "bs58 0.4.0", - "discv5", + "discv5 0.1.0-beta.3 (git+https://github.com/sigp/discv5?rev=02d2c896c66f8dc2b848c3996fedcd98e1dfec69)", "environment", "eth1", "eth2", @@ -3657,7 +3689,7 @@ dependencies = [ "clap", "clap_utils", "directory", - "env_logger 0.8.2", + "env_logger 0.8.3", "environment", "eth2_network_config", "futures 0.3.12", @@ -4069,6 +4101,8 @@ name = "network" version = "0.2.0" dependencies = [ "beacon_chain", + "discv5 0.1.0-beta.3 (registry+https://github.com/rust-lang/crates.io-index)", + "environment", "error-chain", "eth2_libp2p", "eth2_ssz", @@ -4104,6 +4138,7 @@ dependencies = [ "tempfile", "tokio 1.1.0", "tokio-stream", + "tokio-util 0.6.3", "tree_hash", "types", ] @@ -5651,7 +5686,7 @@ name = "simulator" version = "0.2.0" dependencies = [ "clap", - "env_logger 0.8.2", + "env_logger 0.8.3", "eth1", "eth1_test_rig", "futures 0.3.12", @@ -5942,7 +5977,7 @@ dependencies = [ "arbitrary", "bls", "criterion", - "env_logger 0.8.2", + "env_logger 0.8.3", "eth2_hashing", "eth2_ssz", "eth2_ssz_types", @@ -6509,9 +6544,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76066865172052eb8796c686f0b441a93df8b08d40a950b062ffb9a426f00edd" +checksum = "1981ad97df782ab506a1f43bf82c967326960d278acf3bf8279809648c3ff3ea" dependencies = [ "futures-core", "pin-project-lite 0.2.4", diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index fe8398718..f6fba1409 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -385,6 +385,7 @@ pub fn signature_verify_chain_segment( /// A wrapper around a `SignedBeaconBlock` that indicates it has been approved for re-gossiping on /// the p2p network. 
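The `GossipVerifiedBlock` wrapper below is an instance of the "proof type" pattern: the only way to obtain one is via the gossip verification path, so holding the value is evidence the checks ran. A minimal, self-contained sketch of the idea (hypothetical names, not Lighthouse's actual API):

```rust
// Hypothetical sketch of the verified-wrapper pattern; `GossipVerified` and
// `verify_for_gossip` are illustrative stand-ins.
mod verification {
    pub struct GossipVerified<B> {
        block: B, // private field: cannot be constructed outside this module
    }

    impl<B> GossipVerified<B> {
        pub fn into_inner(self) -> B {
            self.block
        }
    }

    pub fn verify_for_gossip<B>(
        block: B,
        checks_pass: bool,
    ) -> Result<GossipVerified<B>, &'static str> {
        if checks_pass {
            Ok(GossipVerified { block })
        } else {
            Err("rejected by gossip checks")
        }
    }
}

fn main() {
    // Downstream code accepts only `GossipVerified<_>`, so it cannot be handed
    // an unverified block by mistake.
    let verified = verification::verify_for_gossip("block", true).unwrap();
    println!("importing: {}", verified.into_inner());
}
```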
+#[derive(Debug)] pub struct GossipVerifiedBlock { pub block: SignedBeaconBlock, pub block_root: Hash256, diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 448e60fb1..f0b941a49 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -30,7 +30,7 @@ mod validator_pubkey_cache; pub use self::beacon_chain::{ AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, ChainSegmentResult, - ForkChoiceError, StateSkipConfig, + ForkChoiceError, StateSkipConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY, }; pub use self::beacon_snapshot::BeaconSnapshot; pub use self::chain_config::ChainConfig; diff --git a/beacon_node/beacon_chain/src/naive_aggregation_pool.rs b/beacon_node/beacon_chain/src/naive_aggregation_pool.rs index 79c439751..e303f8973 100644 --- a/beacon_node/beacon_chain/src/naive_aggregation_pool.rs +++ b/beacon_node/beacon_chain/src/naive_aggregation_pool.rs @@ -228,6 +228,11 @@ impl NaiveAggregationPool { outcome } + /// Returns the total number of attestations stored in `self`. + pub fn num_attestations(&self) -> usize { + self.maps.iter().map(|(_, map)| map.len()).sum() + } + /// Returns an aggregated `Attestation` with the given `data`, if any. pub fn get(&self, data: &AttestationData) -> Option> { self.maps.get(&data.slot).and_then(|map| map.get(data)) diff --git a/beacon_node/beacon_chain/src/snapshot_cache.rs b/beacon_node/beacon_chain/src/snapshot_cache.rs index b1531d6b7..aa82692ae 100644 --- a/beacon_node/beacon_chain/src/snapshot_cache.rs +++ b/beacon_node/beacon_chain/src/snapshot_cache.rs @@ -8,6 +8,7 @@ use types::{ pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4; /// This snapshot is to be used for verifying a child of `self.beacon_block`. +#[derive(Debug)] pub struct PreProcessingSnapshot { /// This state is equivalent to the `self.beacon_block.state_root()` state that has been /// advanced forward one slot using `per_slot_processing`. This state is "primed and ready" for diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 5f8470c2f..0f0fad35b 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -14,6 +14,8 @@ exit-future = "0.2.0" slog-term = "2.6.0" slog-async = "2.5.0" logging = { path = "../../common/logging" } +environment = { path = "../../lighthouse/environment" } +discv5 = { version = "0.1.0-beta.3" } [dependencies] beacon_chain = { path = "../beacon_chain" } @@ -31,7 +33,7 @@ tree_hash = "0.1.1" futures = "0.3.7" error-chain = "0.12.4" tokio = { version = "1.1.0", features = ["full"] } -tokio-stream = "0.1.2" +tokio-stream = "0.1.3" parking_lot = "0.11.0" smallvec = "1.6.1" rand = "0.7.3" @@ -46,3 +48,4 @@ num_cpus = "1.13.0" lru_cache = { path = "../../common/lru_cache" } if-addrs = "0.6.4" strum = { version = "0.20"} +tokio-util = { version = "0.6.3", features = ["time"] } diff --git a/beacon_node/network/src/beacon_processor/block_delay_queue.rs b/beacon_node/network/src/beacon_processor/block_delay_queue.rs new file mode 100644 index 000000000..c259d95fd --- /dev/null +++ b/beacon_node/network/src/beacon_processor/block_delay_queue.rs @@ -0,0 +1,210 @@ +//! Provides a mechanism which queues blocks for later processing when they arrive too early. +//! +//! When the `beacon_processor::Worker` imports a block that is acceptably early (i.e., within the +//! gossip propagation tolerance) it will send it to this queue where it will be placed in a +//! `DelayQueue` until the slot arrives. 
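For readers unfamiliar with the primitive used here: `tokio_util::time::DelayQueue` hands items back only once their deadline has passed. A minimal stand-alone sketch (assuming tokio 1.x, tokio-util 0.6 with the `time` feature, and futures 0.3, matching the manifest above):

```rust
use futures::future::poll_fn;
use std::time::Duration;
use tokio_util::time::DelayQueue;

#[tokio::main]
async fn main() {
    let mut queue: DelayQueue<&'static str> = DelayQueue::new();

    // Queue an "early" item until (roughly) when its slot starts.
    queue.insert("early block", Duration::from_millis(50));

    // `poll_expired` yields items whose delay has elapsed; it returns
    // `Poll::Ready(None)` once the queue is empty.
    if let Some(Ok(expired)) = poll_fn(|cx| queue.poll_expired(cx)).await {
        println!("ready for processing: {}", expired.into_inner());
    }
}
```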
Once the block has been determined to be ready, it will be +//! sent back out on a channel to be processed by the `BeaconProcessor` again. +//! +//! There is the edge-case where the slot arrives before this queue manages to process it. In that +//! case, the block will be sent off for immediate processing (skipping the `DelayQueue`). +use super::MAX_DELAYED_BLOCK_QUEUE_LEN; +use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock}; +use eth2_libp2p::PeerId; +use futures::stream::{Stream, StreamExt}; +use futures::task::Poll; +use slog::{crit, debug, error, Logger}; +use slot_clock::SlotClock; +use std::collections::HashSet; +use std::pin::Pin; +use std::task::Context; +use std::time::Duration; +use task_executor::TaskExecutor; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::time::error::Error as TimeError; +use tokio_util::time::DelayQueue; + +const TASK_NAME: &str = "beacon_processor_block_delay_queue"; + +/// Queue blocks for re-processing with an `ADDITIONAL_DELAY` after the slot starts. This is to +/// account for any slight drift in the system clock. +const ADDITIONAL_DELAY: Duration = Duration::from_millis(5); + +/// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that +/// we signature-verify blocks before putting them in the queue *should* protect against this, but +/// it's nice to have extra protection. +const MAXIMUM_QUEUED_BLOCKS: usize = 16; + +/// A block that arrived early and has been queued for later import. +pub struct QueuedBlock { + pub peer_id: PeerId, + pub block: GossipVerifiedBlock, + pub seen_timestamp: Duration, +} + +/// Unifies the different messages processed by the block delay queue. +enum InboundEvent { + /// A block that has been received early that we should queue for later processing. + EarlyBlock(QueuedBlock), + /// A block that was queued for later processing and is ready for import. + ReadyBlock(QueuedBlock), + /// The `DelayQueue` returned an error. + DelayQueueError(TimeError), +} + +/// Combines the `DelayQueue` and `Receiver` streams into a single stream. +/// +/// This struct has a similar purpose to `tokio::select!`, however it allows for more fine-grained +/// control (specifically in the ordering of event processing). +struct InboundEvents { + pub delay_queue: DelayQueue>, + early_blocks_rx: Receiver>, +} + +impl Stream for InboundEvents { + type Item = InboundEvent; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Poll for expired blocks *before* we try to process new blocks. + // + // The sequential nature of blockchains means it is generally better to try and import all + // existing blocks before new ones. + match self.delay_queue.poll_expired(cx) { + Poll::Ready(Some(Ok(queued_block))) => { + return Poll::Ready(Some(InboundEvent::ReadyBlock(queued_block.into_inner()))); + } + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(InboundEvent::DelayQueueError(e))); + } + // `Poll::Ready(None)` means that there are no more entries in the delay queue and we + // will continue to get this result until something else is added into the queue. 
+ Poll::Ready(None) | Poll::Pending => (), + } + + match self.early_blocks_rx.poll_recv(cx) { + Poll::Ready(Some(queued_block)) => { + return Poll::Ready(Some(InboundEvent::EarlyBlock(queued_block))); + } + Poll::Ready(None) => { + return Poll::Ready(None); + } + Poll::Pending => {} + } + + Poll::Pending + } +} + +/// Spawn a queue which will accept blocks via the returned `Sender`, potentially queue them until +/// their slot arrives, then send them back out via `ready_blocks_tx`. +pub fn spawn_block_delay_queue( + ready_blocks_tx: Sender>, + executor: &TaskExecutor, + slot_clock: T::SlotClock, + log: Logger, +) -> Sender> { + let (early_blocks_tx, early_blocks_rx): (_, Receiver>) = + mpsc::channel(MAX_DELAYED_BLOCK_QUEUE_LEN); + + let queue_future = async move { + let mut queued_block_roots = HashSet::new(); + + let mut inbound_events = InboundEvents { + early_blocks_rx, + delay_queue: DelayQueue::new(), + }; + + loop { + match inbound_events.next().await { + // Some block has been indicated as "early" and should be processed when the + // appropriate slot arrives. + Some(InboundEvent::EarlyBlock(early_block)) => { + let block_slot = early_block.block.block.slot(); + let block_root = early_block.block.block_root; + + // Don't add the same block to the queue twice. This prevents DoS attacks. + if queued_block_roots.contains(&block_root) { + continue; + } + + if let Some(duration_till_slot) = slot_clock.duration_to_slot(block_slot) { + // Check to ensure this won't over-fill the queue. + if queued_block_roots.len() >= MAXIMUM_QUEUED_BLOCKS { + error!( + log, + "Early blocks queue is full"; + "queue_size" => MAXIMUM_QUEUED_BLOCKS, + "msg" => "check system clock" + ); + // Drop the block. + continue; + } + + queued_block_roots.insert(block_root); + // Queue the block until the start of the appropriate slot, plus + // `ADDITIONAL_DELAY`. + inbound_events + .delay_queue + .insert(early_block, duration_till_slot + ADDITIONAL_DELAY); + } else { + // If there is no duration till the next slot, check to see if the slot + // has already arrived. If it has already arrived, send it out for + // immediate processing. + // + // If we can't read the slot or the slot hasn't arrived, simply drop the + // block. + // + // This logic is slightly awkward since `SlotClock::duration_to_slot` + // doesn't distinguish between a slot that has already arrived and an + // error reading the slot clock. + if let Some(now) = slot_clock.now() { + if block_slot <= now && ready_blocks_tx.try_send(early_block).is_err() { + error!( + log, + "Failed to send block"; + ); + } + } + } + } + // A block that was queued for later processing is now ready to be processed. + Some(InboundEvent::ReadyBlock(ready_block)) => { + let block_root = ready_block.block.block_root; + + if !queued_block_roots.remove(&block_root) { + // Log an error to alert that we've made a bad assumption about how this + // program works, but still process the block anyway. 
+ error!( + log, + "Unknown block in delay queue"; + "block_root" => ?block_root + ); + } + + if ready_blocks_tx.try_send(ready_block).is_err() { + error!( + log, + "Failed to pop queued block"; + ); + } + } + Some(InboundEvent::DelayQueueError(e)) => crit!( + log, + "Failed to poll block delay queue"; + "e" => ?e + ), + None => { + debug!( + log, + "Block delay queue stopped"; + "msg" => "shutting down" + ); + break; + } + } + } + }; + + executor.spawn(queue_future, TASK_NAME); + + early_blocks_tx +} diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index e42cab146..04c172426 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -36,24 +36,32 @@ //! task. use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; -use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; +use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, GossipVerifiedBlock}; +use block_delay_queue::{spawn_block_delay_queue, QueuedBlock}; use eth2_libp2p::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage}, MessageId, NetworkGlobals, PeerId, PeerRequestId, }; -use slog::{crit, debug, error, trace, warn, Logger}; +use futures::stream::{Stream, StreamExt}; +use futures::task::Poll; +use slog::{debug, error, trace, warn, Logger}; use std::collections::VecDeque; +use std::fmt; +use std::pin::Pin; use std::sync::{Arc, Weak}; +use std::task::Context; use std::time::{Duration, Instant}; use task_executor::TaskExecutor; use tokio::sync::{mpsc, oneshot}; use types::{ - Attestation, AttesterSlashing, EthSpec, Hash256, ProposerSlashing, SignedAggregateAndProof, + Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SubnetId, }; -use worker::Worker; +use worker::{Toolbox, Worker}; +mod block_delay_queue; +mod tests; mod worker; pub use worker::ProcessId; @@ -81,6 +89,10 @@ const MAX_AGGREGATED_ATTESTATION_QUEUE_LEN: usize = 1_024; /// before we start dropping them. const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024; +/// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but +/// within acceptable clock disparity) that will be queued before we start dropping them. +const MAX_DELAYED_BLOCK_QUEUE_LEN: usize = 1_024; + /// The maximum number of queued `SignedVoluntaryExit` objects received on gossip that will be stored /// before we start dropping them. const MAX_GOSSIP_EXIT_QUEUE_LEN: usize = 4_096; @@ -121,6 +133,22 @@ const WORKER_TASK_NAME: &str = "beacon_processor_worker"; /// The minimum interval between log messages indicating that a queue is full. const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30); +/// Unique IDs used for metrics and testing. 
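These IDs double as metric labels and as entries in the optional work journal used by the tests further down: the manager reports each event it handles, and a test asserts on the observed sequence. A toy version of that pattern (the constant and channel here are illustrative):

```rust
use tokio::sync::mpsc;

const GOSSIP_BLOCK: &str = "gossip_block";

#[tokio::main]
async fn main() {
    let (journal_tx, mut journal_rx) = mpsc::channel::<String>(16);

    // The processor records each event it handles; a send error is ignored
    // because the journal only matters during testing.
    let _ = journal_tx.try_send(GOSSIP_BLOCK.to_string());

    // A test then asserts on the exact sequence it observed.
    assert_eq!(journal_rx.recv().await.as_deref(), Some(GOSSIP_BLOCK));
}
```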
+pub const WORKER_FREED: &str = "worker_freed";
+pub const NOTHING_TO_DO: &str = "nothing_to_do";
+pub const GOSSIP_ATTESTATION: &str = "gossip_attestation";
+pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate";
+pub const GOSSIP_BLOCK: &str = "gossip_block";
+pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block";
+pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit";
+pub const GOSSIP_PROPOSER_SLASHING: &str = "gossip_proposer_slashing";
+pub const GOSSIP_ATTESTER_SLASHING: &str = "gossip_attester_slashing";
+pub const RPC_BLOCK: &str = "rpc_block";
+pub const CHAIN_SEGMENT: &str = "chain_segment";
+pub const STATUS_PROCESSING: &str = "status_processing";
+pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request";
+pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
+
 /// Used to send/receive results from a rpc block import in a blocking task.
 pub type BlockResultSender<E> = oneshot::Sender<Result<Hash256, BlockError<E>>>;
 pub type BlockResultReceiver<E> = oneshot::Receiver<Result<Hash256, BlockError<E>>>;
@@ -210,18 +238,23 @@ impl<T> LifoQueue<T> {
 }
 
 /// An event to be processed by the manager task.
-#[derive(Debug)]
-pub struct WorkEvent<E: EthSpec> {
+pub struct WorkEvent<T: BeaconChainTypes> {
     drop_during_sync: bool,
-    work: Work<E>,
+    work: Work<T>,
 }
 
-impl<E: EthSpec> WorkEvent<E> {
+impl<T: BeaconChainTypes> fmt::Debug for WorkEvent<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{:?}", self.work)
+    }
+}
+
+impl<T: BeaconChainTypes> WorkEvent<T> {
     /// Create a new `Work` event for some unaggregated attestation.
     pub fn unaggregated_attestation(
         message_id: MessageId,
         peer_id: PeerId,
-        attestation: Attestation<E>,
+        attestation: Attestation<T::EthSpec>,
         subnet_id: SubnetId,
         should_import: bool,
         seen_timestamp: Duration,
@@ -243,7 +276,7 @@
     pub fn aggregated_attestation(
         message_id: MessageId,
         peer_id: PeerId,
-        aggregate: SignedAggregateAndProof<E>,
+        aggregate: SignedAggregateAndProof<T::EthSpec>,
         seen_timestamp: Duration,
     ) -> Self {
         Self {
@@ -261,7 +294,7 @@
     pub fn gossip_beacon_block(
         message_id: MessageId,
         peer_id: PeerId,
-        block: Box<SignedBeaconBlock<E>>,
+        block: Box<SignedBeaconBlock<T::EthSpec>>,
         seen_timestamp: Duration,
     ) -> Self {
         Self {
@@ -275,6 +308,22 @@
         }
     }
 
+    /// Create a new `Work` event for some block that was delayed for later processing.
+    pub fn delayed_import_beacon_block(
+        peer_id: PeerId,
+        block: Box<GossipVerifiedBlock<T>>,
+        seen_timestamp: Duration,
+    ) -> Self {
+        Self {
+            drop_during_sync: false,
+            work: Work::DelayedImportBlock {
+                peer_id,
+                block,
+                seen_timestamp,
+            },
+        }
+    }
+
     /// Create a new `Work` event for some exit.
     pub fn gossip_voluntary_exit(
         message_id: MessageId,
@@ -311,7 +360,7 @@
     pub fn gossip_attester_slashing(
         message_id: MessageId,
         peer_id: PeerId,
-        attester_slashing: Box<AttesterSlashing<E>>,
+        attester_slashing: Box<AttesterSlashing<T::EthSpec>>,
     ) -> Self {
         Self {
             drop_during_sync: false,
@@ -325,7 +374,9 @@
 
     /// Create a new `Work` event for some block, where the result from computation (if any) is
     /// sent to the other side of `result_tx`.
-    pub fn rpc_beacon_block(block: Box<SignedBeaconBlock<E>>) -> (Self, BlockResultReceiver<E>) {
+    pub fn rpc_beacon_block(
+        block: Box<SignedBeaconBlock<T::EthSpec>>,
+    ) -> (Self, BlockResultReceiver<T::EthSpec>) {
         let (result_tx, result_rx) = oneshot::channel();
         let event = Self {
             drop_during_sync: false,
@@ -335,7 +386,10 @@
     }
 
     /// Create a new work event to import `blocks` as a beacon chain segment.
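The `rpc_beacon_block` constructor above pairs each work item with a `oneshot` channel so the submitter can await the import result. A self-contained sketch of that request/response pattern (the types here are illustrative):

```rust
use tokio::sync::oneshot;

struct RpcBlockWork {
    payload: &'static str,
    result_tx: oneshot::Sender<Result<u64, &'static str>>,
}

#[tokio::main]
async fn main() {
    let (result_tx, result_rx) = oneshot::channel();
    let work = RpcBlockWork { payload: "block", result_tx };

    // A worker task processes the item and reports back through the channel.
    tokio::spawn(async move {
        let outcome = Ok(work.payload.len() as u64);
        let _ = work.result_tx.send(outcome);
    });

    // The submitter awaits the outcome without blocking the processor.
    assert_eq!(result_rx.await.unwrap(), Ok(5));
}
```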
- pub fn chain_segment(process_id: ProcessId, blocks: Vec>) -> Self { + pub fn chain_segment( + process_id: ProcessId, + blocks: Vec>, + ) -> Self { Self { drop_during_sync: false, work: Work::ChainSegment { process_id, blocks }, @@ -390,11 +444,11 @@ impl WorkEvent { /// A consensus message (or multiple) from the network that requires processing. #[derive(Debug)] -pub enum Work { +pub enum Work { GossipAttestation { message_id: MessageId, peer_id: PeerId, - attestation: Box>, + attestation: Box>, subnet_id: SubnetId, should_import: bool, seen_timestamp: Duration, @@ -402,13 +456,18 @@ pub enum Work { GossipAggregate { message_id: MessageId, peer_id: PeerId, - aggregate: Box>, + aggregate: Box>, seen_timestamp: Duration, }, GossipBlock { message_id: MessageId, peer_id: PeerId, - block: Box>, + block: Box>, + seen_timestamp: Duration, + }, + DelayedImportBlock { + peer_id: PeerId, + block: Box>, seen_timestamp: Duration, }, GossipVoluntaryExit { @@ -424,15 +483,15 @@ pub enum Work { GossipAttesterSlashing { message_id: MessageId, peer_id: PeerId, - attester_slashing: Box>, + attester_slashing: Box>, }, RpcBlock { - block: Box>, - result_tx: BlockResultSender, + block: Box>, + result_tx: BlockResultSender, }, ChainSegment { process_id: ProcessId, - blocks: Vec>, + blocks: Vec>, }, Status { peer_id: PeerId, @@ -450,21 +509,22 @@ pub enum Work { }, } -impl Work { +impl Work { /// Provides a `&str` that uniquely identifies each enum variant. fn str_id(&self) -> &'static str { match self { - Work::GossipAttestation { .. } => "gossip_attestation", - Work::GossipAggregate { .. } => "gossip_aggregate", - Work::GossipBlock { .. } => "gossip_block", - Work::GossipVoluntaryExit { .. } => "gossip_voluntary_exit", - Work::GossipProposerSlashing { .. } => "gossip_proposer_slashing", - Work::GossipAttesterSlashing { .. } => "gossip_attester_slashing", - Work::RpcBlock { .. } => "rpc_block", - Work::ChainSegment { .. } => "chain_segment", - Work::Status { .. } => "status_processing", - Work::BlocksByRangeRequest { .. } => "blocks_by_range_request", - Work::BlocksByRootsRequest { .. } => "blocks_by_roots_request", + Work::GossipAttestation { .. } => GOSSIP_ATTESTATION, + Work::GossipAggregate { .. } => GOSSIP_AGGREGATE, + Work::GossipBlock { .. } => GOSSIP_BLOCK, + Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK, + Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT, + Work::GossipProposerSlashing { .. } => GOSSIP_PROPOSER_SLASHING, + Work::GossipAttesterSlashing { .. } => GOSSIP_ATTESTER_SLASHING, + Work::RpcBlock { .. } => RPC_BLOCK, + Work::ChainSegment { .. } => CHAIN_SEGMENT, + Work::Status { .. } => STATUS_PROCESSING, + Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST, + Work::BlocksByRootsRequest { .. } => BLOCKS_BY_ROOTS_REQUEST, } } } @@ -488,6 +548,71 @@ impl TimeLatch { } } +/// Unifies all the messages processed by the `BeaconProcessor`. +enum InboundEvent { + /// A worker has completed a task and is free. + WorkerIdle, + /// There is new work to be done. + WorkEvent(WorkEvent), + /// A block that was delayed for import at a later slot has become ready. + QueuedBlock(Box>), +} + +/// Combines the various incoming event streams for the `BeaconProcessor` into a single stream. +/// +/// This struct has a similar purpose to `tokio::select!`, however it allows for more fine-grained +/// control (specifically in the ordering of event processing). +struct InboundEvents { + /// Used by workers when they finish a task. 
+    idle_rx: mpsc::Receiver<()>,
+    /// Used by upstream processes to send new work to the `BeaconProcessor`.
+    event_rx: mpsc::Receiver<WorkEvent<T>>,
+    /// Used internally for queuing blocks for processing once their slot arrives.
+    post_delay_block_queue_rx: mpsc::Receiver<QueuedBlock<T>>,
+}
+
+impl<T: BeaconChainTypes> Stream for InboundEvents<T> {
+    type Item = InboundEvent<T>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        // Always check for idle workers before anything else. This allows us to ensure that a big
+        // stream of new events doesn't suppress the processing of existing events.
+        match self.idle_rx.poll_recv(cx) {
+            Poll::Ready(Some(())) => {
+                return Poll::Ready(Some(InboundEvent::WorkerIdle));
+            }
+            Poll::Ready(None) => {
+                return Poll::Ready(None);
+            }
+            Poll::Pending => {}
+        }
+
+        // Poll for delayed blocks before polling for new work. It might be the case that a delayed
+        // block is required to successfully process some new work.
+        match self.post_delay_block_queue_rx.poll_recv(cx) {
+            Poll::Ready(Some(queued_block)) => {
+                return Poll::Ready(Some(InboundEvent::QueuedBlock(Box::new(queued_block))));
+            }
+            Poll::Ready(None) => {
+                return Poll::Ready(None);
+            }
+            Poll::Pending => {}
+        }
+
+        match self.event_rx.poll_recv(cx) {
+            Poll::Ready(Some(event)) => {
+                return Poll::Ready(Some(InboundEvent::WorkEvent(event)));
+            }
+            Poll::Ready(None) => {
+                return Poll::Ready(None);
+            }
+            Poll::Pending => {}
+        }
+
+        Poll::Pending
+    }
+}
+
 /// A multi-threaded processor for messages received on the network
 /// that need to be processed by the `BeaconChain`
 ///
@@ -512,8 +637,16 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
     ///
     /// Only `self.max_workers` will ever be spawned at one time. Each worker is a `tokio` task
     /// started with `spawn_blocking`.
-    pub fn spawn_manager(mut self, mut event_rx: mpsc::Receiver<WorkEvent<T::EthSpec>>) {
-        let (idle_tx, mut idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN);
+    ///
+    /// The optional `work_journal_tx` allows for an outside process to receive a log of all work
+    /// events processed by `self`. This should only be used during testing.
+    pub fn spawn_manager(
+        mut self,
+        event_rx: mpsc::Receiver<WorkEvent<T>>,
+        work_journal_tx: Option<mpsc::Sender<String>>,
+    ) {
+        // Used by workers to communicate that they have finished a task.
+        let (idle_tx, idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN);
 
         // Using LIFO queues for attestations since validator profits rely upon getting fresh
         // attestations into blocks. Additionally, later attestations contain more information than
@@ -538,55 +671,63 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
         let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
         let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
         let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
+        let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN);
 
         let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN);
         let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN);
         let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN);
 
+        // The delayed block queues are used to re-queue blocks for processing at a later time if
+        // they're received early.
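An aside on the hand-rolled `Stream` above: polling the receivers in a fixed order is what gives idle workers and delayed blocks priority over new work. `tokio::select!` can approximate this with its `biased;` mode, though the custom stream keeps the three inputs unified behind a single `next().await`. A small illustration of the `biased;` alternative:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (idle_tx, mut idle_rx) = mpsc::channel::<()>(1);
    let (work_tx, mut work_rx) = mpsc::channel::<&'static str>(1);

    idle_tx.send(()).await.unwrap();
    work_tx.send("gossip_block").await.unwrap();

    // With both channels ready, `biased;` polls branches top-to-bottom, so the
    // idle notification always wins (mirroring the ordering in `poll_next`).
    tokio::select! {
        biased;
        Some(()) = idle_rx.recv() => println!("worker freed first"),
        Some(work) = work_rx.recv() => println!("work: {}", work),
    }
}
```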
+ let (post_delay_block_queue_tx, post_delay_block_queue_rx) = + mpsc::channel(MAX_DELAYED_BLOCK_QUEUE_LEN); + let pre_delay_block_queue_tx = { + if let Some(chain) = self.beacon_chain.upgrade() { + spawn_block_delay_queue( + post_delay_block_queue_tx, + &self.executor, + chain.slot_clock.clone(), + self.log.clone(), + ) + } else { + // No need to proceed any further if the beacon chain has been dropped, the client + // is shutting down. + return; + } + }; + let executor = self.executor.clone(); // The manager future will run on the core executor and delegate tasks to worker // threads on the blocking executor. let manager_future = async move { + let mut inbound_events = InboundEvents { + idle_rx, + event_rx, + post_delay_block_queue_rx, + }; + loop { - // Listen to both the event and idle channels, acting on whichever is ready - // first. - // - // Set `work_event = Some(event)` if there is new work to be done. Otherwise sets - // `event = None` if it was a worker becoming idle. - let work_event = tokio::select! { - // A worker has finished some work. - new_idle_opt = idle_rx.recv() => { - if new_idle_opt.is_some() { - self.current_workers = self.current_workers.saturating_sub(1); - None - } else { - // Exit if all idle senders have been dropped. - // - // This shouldn't happen since this function holds a sender. - crit!( - self.log, - "Gossip processor stopped"; - "msg" => "all idle senders dropped" - ); - break - } - }, - // There is a new piece of work to be handled. - new_work_event_opt = event_rx.recv() => { - if let Some(new_work_event) = new_work_event_opt { - Some(new_work_event) - } else { - // Exit if all event senders have been dropped. - // - // This should happen when the client shuts down. - debug!( - self.log, - "Gossip processor stopped"; - "msg" => "all event senders dropped" - ); - break - } + let work_event = match inbound_events.next().await { + Some(InboundEvent::WorkerIdle) => { + self.current_workers = self.current_workers.saturating_sub(1); + None + } + Some(InboundEvent::WorkEvent(event)) => Some(event), + Some(InboundEvent::QueuedBlock(queued_block)) => { + Some(WorkEvent::delayed_import_beacon_block( + queued_block.peer_id, + Box::new(queued_block.block), + queued_block.seen_timestamp, + )) + } + None => { + debug!( + self.log, + "Gossip processor stopped"; + "msg" => "stream ended" + ); + break; } }; @@ -601,6 +742,17 @@ impl BeaconProcessor { metrics::inc_counter(&metrics::BEACON_PROCESSOR_IDLE_EVENTS_TOTAL); } + if let Some(work_journal_tx) = &work_journal_tx { + let id = work_event + .as_ref() + .map(|event| event.work.str_id()) + .unwrap_or(WORKER_FREED); + + // We don't care if this message was successfully sent, we only use the journal + // during testing. + let _ = work_journal_tx.try_send(id.to_string()); + } + let can_spawn = self.current_workers < self.max_workers; let drop_during_sync = work_event .as_ref() @@ -612,46 +764,64 @@ impl BeaconProcessor { // We don't check the `work.drop_during_sync` here. We assume that if it made // it into the queue at any point then we should process it. None if can_spawn => { + let toolbox = Toolbox { + idle_tx: idle_tx.clone(), + delayed_block_tx: pre_delay_block_queue_tx.clone(), + }; + // Check for chain segments first, they're the most efficient way to get // blocks into the system. if let Some(item) = chain_segment_queue.pop() { - self.spawn_worker(idle_tx.clone(), item); + self.spawn_worker(item, toolbox); // Check sync blocks before gossip blocks, since we've already explicitly // requested these blocks. 
} else if let Some(item) = rpc_block_queue.pop() { - self.spawn_worker(idle_tx.clone(), item); + self.spawn_worker(item, toolbox); + // Check delayed blocks before gossip blocks, the gossip blocks might rely + // on the delayed ones. + } else if let Some(item) = delayed_block_queue.pop() { + self.spawn_worker(item, toolbox); // Check gossip blocks before gossip attestations, since a block might be // required to verify some attestations. } else if let Some(item) = gossip_block_queue.pop() { - self.spawn_worker(idle_tx.clone(), item); + self.spawn_worker(item, toolbox); // Check the aggregates, *then* the unaggregates since we assume that // aggregates are more valuable to local validators and effectively give us // more information with less signature verification time. } else if let Some(item) = aggregate_queue.pop() { - self.spawn_worker(idle_tx.clone(), item); + self.spawn_worker(item, toolbox); } else if let Some(item) = attestation_queue.pop() { - self.spawn_worker(idle_tx.clone(), item); + self.spawn_worker(item, toolbox); // Check RPC methods next. Status messages are needed for sync so // prioritize them over syncing requests from other peers (BlocksByRange // and BlocksByRoot) } else if let Some(item) = status_queue.pop() { - self.spawn_worker(idle_tx.clone(), item); + self.spawn_worker(item, toolbox); } else if let Some(item) = bbrange_queue.pop() { - self.spawn_worker(idle_tx.clone(), item); + self.spawn_worker(item, toolbox); } else if let Some(item) = bbroots_queue.pop() { - self.spawn_worker(idle_tx.clone(), item); + self.spawn_worker(item, toolbox); // Check slashings after all other consensus messages so we prioritize // following head. // // Check attester slashings before proposer slashings since they have the // potential to slash multiple validators at once. } else if let Some(item) = gossip_attester_slashing_queue.pop() { - self.spawn_worker(idle_tx.clone(), item); + self.spawn_worker(item, toolbox); } else if let Some(item) = gossip_proposer_slashing_queue.pop() { - self.spawn_worker(idle_tx.clone(), item); + self.spawn_worker(item, toolbox); // Check exits last since our validators don't get rewards from them. } else if let Some(item) = gossip_voluntary_exit_queue.pop() { - self.spawn_worker(idle_tx.clone(), item); + self.spawn_worker(item, toolbox); + // This statement should always be the final else statement. + } else { + // Let the journal know that a worker is freed and there's nothing else + // for it to do. + if let Some(work_journal_tx) = &work_journal_tx { + // We don't care if this message was successfully sent, we only use the journal + // during testing. + let _ = work_journal_tx.try_send(NOTHING_TO_DO.to_string()); + } } } // There is no new work event and we are unable to spawn a new worker. @@ -681,16 +851,25 @@ impl BeaconProcessor { "work_id" => work_id ); } - // There is a new work event and the chain is not syncing. Process it. + // There is a new work event and the chain is not syncing. Process it or queue + // it. Some(WorkEvent { work, .. }) => { let work_id = work.str_id(); + let toolbox = Toolbox { + idle_tx: idle_tx.clone(), + delayed_block_tx: pre_delay_block_queue_tx.clone(), + }; + match work { - _ if can_spawn => self.spawn_worker(idle_tx.clone(), work), + _ if can_spawn => self.spawn_worker(work, toolbox), Work::GossipAttestation { .. } => attestation_queue.push(work), Work::GossipAggregate { .. } => aggregate_queue.push(work), Work::GossipBlock { .. 
} => { gossip_block_queue.push(work, work_id, &self.log) } + Work::DelayedImportBlock { .. } => { + delayed_block_queue.push(work, work_id, &self.log) + } Work::GossipVoluntaryExit { .. } => { gossip_voluntary_exit_queue.push(work, work_id, &self.log) } @@ -779,7 +958,10 @@ impl BeaconProcessor { /// Spawns a blocking worker thread to process some `Work`. /// /// Sends an message on `idle_tx` when the work is complete and the task is stopping. - fn spawn_worker(&mut self, idle_tx: mpsc::Sender<()>, work: Work) { + fn spawn_worker(&mut self, work: Work, toolbox: Toolbox) { + let idle_tx = toolbox.idle_tx; + let delayed_block_tx = toolbox.delayed_block_tx; + // Wrap the `idle_tx` in a struct that will fire the idle message whenever it is dropped. // // This helps ensure that the worker is always freed in the case of an early exit or panic. @@ -873,7 +1055,21 @@ impl BeaconProcessor { peer_id, block, seen_timestamp, - } => worker.process_gossip_block(message_id, peer_id, *block, seen_timestamp), + } => worker.process_gossip_block( + message_id, + peer_id, + *block, + delayed_block_tx, + seen_timestamp, + ), + /* + * Import for blocks that we received earlier than their intended slot. + */ + Work::DelayedImportBlock { + peer_id, + block, + seen_timestamp, + } => worker.process_gossip_verified_block(peer_id, *block, seen_timestamp), /* * Voluntary exits received on gossip. */ diff --git a/beacon_node/network/src/beacon_processor/tests.rs b/beacon_node/network/src/beacon_processor/tests.rs new file mode 100644 index 000000000..69fefe219 --- /dev/null +++ b/beacon_node/network/src/beacon_processor/tests.rs @@ -0,0 +1,499 @@ +#![cfg(not(debug_assertions))] // Tests are too slow in debug. +#![cfg(test)] + +use crate::beacon_processor::*; +use crate::{service::NetworkMessage, sync::SyncMessage}; +use beacon_chain::{ + test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, + BeaconChain, MAXIMUM_GOSSIP_CLOCK_DISPARITY, +}; +use discv5::enr::{CombinedKey, EnrBuilder}; +use environment::{null_logger, Environment, EnvironmentBuilder}; +use eth2_libp2p::{rpc::methods::MetaData, types::EnrBitfield, MessageId, NetworkGlobals, PeerId}; +use slot_clock::SlotClock; +use std::cmp; +use std::iter::Iterator; +use std::sync::Arc; +use std::time::Duration; +use tokio::runtime::Runtime; +use tokio::sync::mpsc; +use types::{ + test_utils::generate_deterministic_keypairs, Attestation, AttesterSlashing, MainnetEthSpec, + ProposerSlashing, SignedBeaconBlock, SignedVoluntaryExit, SubnetId, +}; + +type E = MainnetEthSpec; +type T = EphemeralHarnessType; + +const SLOTS_PER_EPOCH: u64 = 32; +const VALIDATOR_COUNT: usize = SLOTS_PER_EPOCH as usize; +const SMALL_CHAIN: u64 = 2; +const LONG_CHAIN: u64 = SLOTS_PER_EPOCH * 2; + +const TCP_PORT: u16 = 42; +const UDP_PORT: u16 = 42; +const SEQ_NUMBER: u64 = 0; + +/// The default time to wait for `BeaconProcessor` events. +const STANDARD_TIMEOUT: Duration = Duration::from_secs(10); + +/// Provides utilities for testing the `BeaconProcessor`. +struct TestRig { + chain: Arc>, + next_block: SignedBeaconBlock, + attestations: Vec<(Attestation, SubnetId)>, + attester_slashing: AttesterSlashing, + proposer_slashing: ProposerSlashing, + voluntary_exit: SignedVoluntaryExit, + beacon_processor_tx: mpsc::Sender>, + work_journal_rx: mpsc::Receiver, + _network_rx: mpsc::UnboundedReceiver>, + _sync_rx: mpsc::UnboundedReceiver>, + environment: Option>, +} + +/// This custom drop implementation ensures that we shut down the tokio runtime gracefully. 
Without +/// it, tests will hang indefinitely. +impl Drop for TestRig { + fn drop(&mut self) { + // Causes the beacon processor to shutdown. + self.beacon_processor_tx = mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN).0; + self.environment.take().unwrap().shutdown_on_idle(); + } +} + +impl TestRig { + pub fn new(chain_length: u64) -> Self { + let mut harness = BeaconChainHarness::new( + MainnetEthSpec, + generate_deterministic_keypairs(VALIDATOR_COUNT), + ); + + harness.advance_slot(); + + for _ in 0..chain_length { + harness.extend_chain( + 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + harness.advance_slot(); + } + + let head = harness.chain.head().unwrap(); + + assert_eq!( + harness.chain.slot().unwrap(), + head.beacon_block.slot() + 1, + "precondition: current slot is one after head" + ); + + let (next_block, _next_state) = + harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap()); + + let attestations = harness + .get_unaggregated_attestations( + &AttestationStrategy::AllValidators, + &head.beacon_state, + head.beacon_block_root, + harness.chain.slot().unwrap(), + ) + .into_iter() + // .map(|vec| vec.into_iter().map(|(attestation, _subnet_id)| attestation)) + .flatten() + .collect::>(); + + assert!( + !attestations.is_empty(), + "precondition: attestations for testing" + ); + + let attester_slashing = harness.make_attester_slashing(vec![0, 1]); + let proposer_slashing = harness.make_proposer_slashing(2); + let voluntary_exit = harness.make_voluntary_exit(3, harness.chain.epoch().unwrap()); + + // Changing this *after* the chain has been initialized is a bit cheeky, but it shouldn't + // cause issue. + // + // This allows for testing voluntary exits without building out a massive chain. + harness.chain.spec.shard_committee_period = 2; + + let chain = Arc::new(harness.chain); + + let (network_tx, _network_rx) = mpsc::unbounded_channel(); + + let log = null_logger().unwrap(); + + let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN); + let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); + + // Default metadata + let meta_data = MetaData { + seq_number: SEQ_NUMBER, + attnets: EnrBitfield::::default(), + }; + let enr_key = CombinedKey::generate_secp256k1(); + let enr = EnrBuilder::new("v4").build(&enr_key).unwrap(); + let network_globals = Arc::new(NetworkGlobals::new( + enr, + TCP_PORT, + UDP_PORT, + meta_data, + vec![], + &log, + )); + + let mut environment = EnvironmentBuilder::mainnet() + .null_logger() + .unwrap() + .multi_threaded_tokio_runtime() + .unwrap() + .build() + .unwrap(); + + let executor = environment.core_context().executor; + + let (work_journal_tx, work_journal_rx) = mpsc::channel(16_364); + + BeaconProcessor { + beacon_chain: Arc::downgrade(&chain), + network_tx, + sync_tx, + network_globals, + executor, + max_workers: cmp::max(1, num_cpus::get()), + current_workers: 0, + log: log.clone(), + } + .spawn_manager(beacon_processor_rx, Some(work_journal_tx)); + + Self { + chain, + next_block, + attestations, + attester_slashing, + proposer_slashing, + voluntary_exit, + beacon_processor_tx, + work_journal_rx, + _network_rx, + _sync_rx, + environment: Some(environment), + } + } + + pub fn enqueue_gossip_block(&self) { + self.beacon_processor_tx + .try_send(WorkEvent::gossip_beacon_block( + junk_message_id(), + junk_peer_id(), + Box::new(self.next_block.clone()), + Duration::from_secs(0), + )) + .unwrap(); + } + + pub fn enqueue_unaggregated_attestation(&self) { + let (attestation, subnet_id) 
= self.attestations.first().unwrap().clone(); + self.beacon_processor_tx + .try_send(WorkEvent::unaggregated_attestation( + junk_message_id(), + junk_peer_id(), + attestation, + subnet_id, + true, + Duration::from_secs(0), + )) + .unwrap(); + } + + pub fn enqueue_gossip_attester_slashing(&self) { + self.beacon_processor_tx + .try_send(WorkEvent::gossip_attester_slashing( + junk_message_id(), + junk_peer_id(), + Box::new(self.attester_slashing.clone()), + )) + .unwrap(); + } + + pub fn enqueue_gossip_proposer_slashing(&self) { + self.beacon_processor_tx + .try_send(WorkEvent::gossip_proposer_slashing( + junk_message_id(), + junk_peer_id(), + Box::new(self.proposer_slashing.clone()), + )) + .unwrap(); + } + + pub fn enqueue_gossip_voluntary_exit(&self) { + self.beacon_processor_tx + .try_send(WorkEvent::gossip_voluntary_exit( + junk_message_id(), + junk_peer_id(), + Box::new(self.voluntary_exit.clone()), + )) + .unwrap(); + } + + fn runtime(&mut self) -> Arc { + self.environment + .as_mut() + .unwrap() + .core_context() + .executor + .runtime() + .upgrade() + .unwrap() + } + + /// Assert that the `BeaconProcessor` doesn't produce any events in the given `duration`. + pub fn assert_no_events_for(&mut self, duration: Duration) { + self.runtime().block_on(async { + tokio::select! { + _ = tokio::time::sleep(duration) => (), + event = self.work_journal_rx.recv() => panic!( + "received {:?} within {:?} when expecting no events", + event, + duration + ), + } + }) + } + + /// Assert that the `BeaconProcessor` event journal is as `expected`. + /// + /// ## Note + /// + /// We won't attempt to listen for any more than `expected.len()` events. As such, it makes sense + /// to use the `NOTHING_TO_DO` event to ensure that execution has completed. + pub fn assert_event_journal(&mut self, expected: &[&str]) { + let events = self.runtime().block_on(async { + let mut events = vec![]; + + let drain_future = async { + loop { + match self.work_journal_rx.recv().await { + Some(event) => { + events.push(event); + + // Break as soon as we collect the desired number of events. + if events.len() >= expected.len() { + break; + } + } + None => break, + } + } + }; + + // Drain the expected number of events from the channel, or time out and give up. + tokio::select! { + _ = tokio::time::sleep(STANDARD_TIMEOUT) => panic!( + "timeout ({:?}) expired waiting for events. expected {:?} but got {:?}", + STANDARD_TIMEOUT, + expected, + events + ), + _ = drain_future => {}, + } + + events + }); + + assert_eq!( + events, + expected + .into_iter() + .map(|s| s.to_string()) + .collect::>() + ); + } +} + +fn junk_peer_id() -> PeerId { + PeerId::random() +} + +fn junk_message_id() -> MessageId { + MessageId::new(&[]) +} + +/// Blocks that arrive early should be queued for later processing. +#[test] +fn import_gossip_block_acceptably_early() { + let mut rig = TestRig::new(SMALL_CHAIN); + + let slot_start = rig + .chain + .slot_clock + .start_of(rig.next_block.slot()) + .unwrap(); + + rig.chain + .slot_clock + .set_current_time(slot_start - MAXIMUM_GOSSIP_CLOCK_DISPARITY); + + assert_eq!( + rig.chain.slot().unwrap(), + rig.next_block.slot() - 1, + "chain should be at the correct slot" + ); + + rig.enqueue_gossip_block(); + + rig.assert_event_journal(&[GOSSIP_BLOCK, WORKER_FREED, NOTHING_TO_DO]); + + // Note: this section of the code is a bit race-y. We're assuming that we can set the slot clock + // and check the head in the time between the block arrived early and when its due for + // processing. 
+    //
+    // If this causes issues we might be able to make the block delay queue add a longer delay for
+    // processing, instead of just MAXIMUM_GOSSIP_CLOCK_DISPARITY. Speak to @paulhauner if this test
+    // starts failing.
+    rig.chain.slot_clock.set_slot(rig.next_block.slot().into());
+    assert!(
+        rig.chain.head().unwrap().beacon_block_root != rig.next_block.canonical_root(),
+        "block not yet imported"
+    );
+
+    rig.assert_event_journal(&[DELAYED_IMPORT_BLOCK, WORKER_FREED, NOTHING_TO_DO]);
+
+    assert_eq!(
+        rig.chain.head().unwrap().beacon_block_root,
+        rig.next_block.canonical_root(),
+        "block should be imported and become head"
+    );
+}
+
+/// Blocks that are *too* early shouldn't get into the delay queue.
+#[test]
+fn import_gossip_block_unacceptably_early() {
+    let mut rig = TestRig::new(SMALL_CHAIN);
+
+    let slot_start = rig
+        .chain
+        .slot_clock
+        .start_of(rig.next_block.slot())
+        .unwrap();
+
+    rig.chain
+        .slot_clock
+        .set_current_time(slot_start - MAXIMUM_GOSSIP_CLOCK_DISPARITY - Duration::from_millis(1));
+
+    assert_eq!(
+        rig.chain.slot().unwrap(),
+        rig.next_block.slot() - 1,
+        "chain should be at the correct slot"
+    );
+
+    rig.enqueue_gossip_block();
+
+    rig.assert_event_journal(&[GOSSIP_BLOCK, WORKER_FREED, NOTHING_TO_DO]);
+
+    // Waiting for 5 seconds is a bit arbitrary, however it *should* be long enough to ensure the
+    // block isn't imported.
+    rig.assert_no_events_for(Duration::from_secs(5));
+
+    assert!(
+        rig.chain.head().unwrap().beacon_block_root != rig.next_block.canonical_root(),
+        "block should not be imported"
+    );
+}
+
+/// Blocks that arrive on time should be processed normally.
+#[test]
+fn import_gossip_block_at_current_slot() {
+    let mut rig = TestRig::new(SMALL_CHAIN);
+
+    assert_eq!(
+        rig.chain.slot().unwrap(),
+        rig.next_block.slot(),
+        "chain should be at the correct slot"
+    );
+
+    rig.enqueue_gossip_block();
+
+    rig.assert_event_journal(&[GOSSIP_BLOCK, WORKER_FREED, NOTHING_TO_DO]);
+
+    assert_eq!(
+        rig.chain.head().unwrap().beacon_block_root,
+        rig.next_block.canonical_root(),
+        "block should be imported and become head"
+    );
+}
+
+/// Ensure a valid attestation can be imported.
+#[test]
+fn import_gossip_attestation() {
+    let mut rig = TestRig::new(SMALL_CHAIN);
+
+    let initial_attns = rig.chain.naive_aggregation_pool.read().num_attestations();
+
+    rig.enqueue_unaggregated_attestation();
+
+    rig.assert_event_journal(&[GOSSIP_ATTESTATION, WORKER_FREED, NOTHING_TO_DO]);
+
+    assert_eq!(
+        rig.chain.naive_aggregation_pool.read().num_attestations(),
+        initial_attns + 1,
+        "op pool should have one more attestation"
+    );
+}
+
+/// Ensure a bunch of valid operations can be imported.
+#[test]
+fn import_misc_gossip_ops() {
+    // Exits need the long chain so validators aren't too young to exit.
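The tests above steer time by hand (`set_current_time`, `set_slot`) rather than sleeping. A stripped-down illustration of that manual-clock idea (the real `ManualSlotClock` lives in `common/slot_clock`; this is just the concept):

```rust
use std::sync::RwLock;
use std::time::Duration;

struct ManualClock {
    current_time: RwLock<Duration>,
    slot_duration: Duration,
}

impl ManualClock {
    fn set_current_time(&self, time: Duration) {
        *self.current_time.write().unwrap() = time;
    }

    fn now_slot(&self) -> u64 {
        self.current_time.read().unwrap().as_secs() / self.slot_duration.as_secs()
    }
}

fn main() {
    let clock = ManualClock {
        current_time: RwLock::new(Duration::from_secs(0)),
        slot_duration: Duration::from_secs(12),
    };

    // A test can jump to the instant just before a slot boundary...
    clock.set_current_time(Duration::from_secs(24) - Duration::from_millis(500));
    assert_eq!(clock.now_slot(), 1);

    // ...and then over it, without waiting in real time.
    clock.set_current_time(Duration::from_secs(24));
    assert_eq!(clock.now_slot(), 2);
}
```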
+ let mut rig = TestRig::new(LONG_CHAIN); + + /* + * Attester slashing + */ + + let initial_attester_slashings = rig.chain.op_pool.num_attester_slashings(); + + rig.enqueue_gossip_attester_slashing(); + + rig.assert_event_journal(&[GOSSIP_ATTESTER_SLASHING, WORKER_FREED, NOTHING_TO_DO]); + + assert_eq!( + rig.chain.op_pool.num_attester_slashings(), + initial_attester_slashings + 1, + "op pool should have one more attester slashing" + ); + + /* + * Proposer slashing + */ + + let initial_proposer_slashings = rig.chain.op_pool.num_proposer_slashings(); + + rig.enqueue_gossip_proposer_slashing(); + + rig.assert_event_journal(&[GOSSIP_PROPOSER_SLASHING, WORKER_FREED, NOTHING_TO_DO]); + + assert_eq!( + rig.chain.op_pool.num_proposer_slashings(), + initial_proposer_slashings + 1, + "op pool should have one more proposer slashing" + ); + + /* + * Voluntary exit + */ + + let initial_voluntary_exits = rig.chain.op_pool.num_voluntary_exits(); + + rig.enqueue_gossip_voluntary_exit(); + + rig.assert_event_journal(&[GOSSIP_VOLUNTARY_EXIT, WORKER_FREED, NOTHING_TO_DO]); + + assert_eq!( + rig.chain.op_pool.num_voluntary_exits(), + initial_voluntary_exits + 1, + "op pool should have one more exit" + ); +} diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index d9bf65b5e..4b3fd716b 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -4,18 +4,20 @@ use beacon_chain::{ attestation_verification::{Error as AttnError, SignatureVerifiedAttestation}, observed_operations::ObservationOutcome, validator_monitor::get_block_delay_ms, - BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, + BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, GossipVerifiedBlock, }; use eth2_libp2p::{MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use slog::{debug, error, info, trace, warn}; +use slot_clock::SlotClock; use ssz::Encode; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tokio::sync::mpsc; use types::{ Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SubnetId, }; -use super::Worker; +use super::{super::block_delay_queue::QueuedBlock, Worker}; impl Worker { /* Auxiliary functions */ @@ -236,6 +238,7 @@ impl Worker { message_id: MessageId, peer_id: PeerId, block: SignedBeaconBlock, + delayed_import_tx: mpsc::Sender>, seen_duration: Duration, ) { // Log metrics to track delay from other nodes on the network. @@ -324,6 +327,80 @@ impl Worker { &self.chain.slot_clock, ); + let block_slot = verified_block.block.slot(); + let block_root = verified_block.block_root; + + // Try read the current slot to determine if this block should be imported now or after some + // delay. + match self.chain.slot() { + // We only need to do a simple check about the block slot and the current slot since the + // `verify_block_for_gossip` function already ensures that the block is within the + // tolerance for block imports. + Ok(current_slot) if block_slot > current_slot => { + warn!( + self.log, + "Block arrived early"; + "block_slot" => %block_slot, + "block_root" => %block_root, + "msg" => "if this happens consistently, check system clock" + ); + + // Take note of how early this block arrived. 
+                if let Some(duration) = self
+                    .chain
+                    .slot_clock
+                    .start_of(block_slot)
+                    .and_then(|start| start.checked_sub(seen_duration))
+                {
+                    metrics::observe_duration(
+                        &metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_EARLY_SECONDS,
+                        duration,
+                    );
+                }
+
+                metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_REQUEUED_TOTAL);
+
+                if delayed_import_tx
+                    .try_send(QueuedBlock {
+                        peer_id,
+                        block: verified_block,
+                        seen_timestamp: seen_duration,
+                    })
+                    .is_err()
+                {
+                    error!(
+                        self.log,
+                        "Failed to defer block import";
+                        "block_slot" => %block_slot,
+                        "block_root" => %block_root,
+                        "location" => "block gossip"
+                    )
+                }
+            }
+            Ok(_) => self.process_gossip_verified_block(peer_id, verified_block, seen_duration),
+            Err(e) => {
+                error!(
+                    self.log,
+                    "Failed to defer block import";
+                    "error" => ?e,
+                    "block_slot" => %block_slot,
+                    "block_root" => %block_root,
+                    "location" => "block gossip"
+                )
+            }
+        }
+    }
+
+    /// Process the beacon block that has already passed gossip verification.
+    ///
+    /// Raises a log if there are errors.
+    pub fn process_gossip_verified_block(
+        self,
+        peer_id: PeerId,
+        verified_block: GossipVerifiedBlock<T>,
+        // This value is not used presently, but it might come in handy for debugging.
+        _seen_duration: Duration,
+    ) {
         let block = Box::new(verified_block.block.clone());
 
         match self.chain.process_block(verified_block) {
diff --git a/beacon_node/network/src/beacon_processor/worker/mod.rs b/beacon_node/network/src/beacon_processor/worker/mod.rs
index 40a863303..1ac5a863c 100644
--- a/beacon_node/network/src/beacon_processor/worker/mod.rs
+++ b/beacon_node/network/src/beacon_processor/worker/mod.rs
@@ -1,3 +1,4 @@
+use super::QueuedBlock;
 use crate::{service::NetworkMessage, sync::SyncMessage};
 use beacon_chain::{BeaconChain, BeaconChainTypes};
 use slog::{error, Logger};
@@ -41,3 +42,9 @@ impl<T: BeaconChainTypes> Worker<T> {
         });
     }
 }
+
+/// Contains the necessary items for a worker to do their job.
+pub struct Toolbox<T: BeaconChainTypes> {
+    pub idle_tx: mpsc::Sender<()>,
+    pub delayed_block_tx: mpsc::Sender<QueuedBlock<T>>,
+}
diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs
index 11f04ff86..f04cf8d7e 100644
--- a/beacon_node/network/src/metrics.rs
+++ b/beacon_node/network/src/metrics.rs
@@ -215,6 +215,14 @@ lazy_static! {
         "beacon_processor_gossip_block_imported_total",
         "Total number of gossip blocks imported to fork choice, etc."
     );
+    pub static ref BEACON_PROCESSOR_GOSSIP_BLOCK_REQUEUED_TOTAL: Result<IntCounter> = try_create_int_counter(
+        "beacon_processor_gossip_block_requeued_total",
+        "Total number of gossip blocks that arrived early and were re-queued for later processing."
+    );
+    pub static ref BEACON_PROCESSOR_GOSSIP_BLOCK_EARLY_SECONDS: Result<Histogram> = try_create_histogram(
+        "beacon_processor_gossip_block_early_seconds",
+        "Whenever a gossip block is received early, this metric records how early that block was."
+    );
     // Gossip Exits.
     pub static ref BEACON_PROCESSOR_EXIT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
         "beacon_processor_exit_queue_total",
diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs
index d70263060..590d52da6 100644
--- a/beacon_node/network/src/router/processor.rs
+++ b/beacon_node/network/src/router/processor.rs
@@ -26,7 +26,7 @@ pub struct Processor<T: BeaconChainTypes> {
     /// A network context to return and handle RPC requests.
     network: HandlerNetworkContext<T::EthSpec>,
     /// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
-    beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
+    beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
     /// The `RPCHandler` logger.
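For context on the two metrics registered above: `try_create_int_counter` and `try_create_histogram` are Lighthouse's `lighthouse_metrics` wrappers over the `prometheus` crate. A rough stand-alone equivalent using `prometheus` directly:

```rust
use prometheus::{Histogram, HistogramOpts, IntCounter};

fn main() {
    let requeued = IntCounter::new(
        "beacon_processor_gossip_block_requeued_total",
        "Gossip blocks that arrived early and were re-queued.",
    )
    .unwrap();

    let early_seconds = Histogram::with_opts(HistogramOpts::new(
        "beacon_processor_gossip_block_early_seconds",
        "How early a gossip block arrived.",
    ))
    .unwrap();

    // A block arrived 250ms before its slot started.
    requeued.inc();
    early_seconds.observe(0.250);

    assert_eq!(requeued.get(), 1);
    assert_eq!(early_seconds.get_sample_count(), 1);
}
```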
log: slog::Logger, } @@ -64,7 +64,7 @@ impl Processor { current_workers: 0, log: log.clone(), } - .spawn_manager(beacon_processor_receive); + .spawn_manager(beacon_processor_receive, None); Processor { chain: beacon_chain, @@ -309,7 +309,7 @@ impl Processor { )) } - fn send_beacon_processor_work(&mut self, work: BeaconWorkEvent) { + fn send_beacon_processor_work(&mut self, work: BeaconWorkEvent) { self.beacon_processor_send .try_send(work) .unwrap_or_else(|e| { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 88e435422..3b092846f 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -179,7 +179,7 @@ pub struct SyncManager { single_block_lookups: FnvHashMap, /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. - beacon_processor_send: mpsc::Sender>, + beacon_processor_send: mpsc::Sender>, /// The logger for the import manager. log: Logger, @@ -210,7 +210,7 @@ pub fn spawn( beacon_chain: Arc>, network_globals: Arc>, network_send: mpsc::UnboundedSender>, - beacon_processor_send: mpsc::Sender>, + beacon_processor_send: mpsc::Sender>, log: slog::Logger, ) -> mpsc::UnboundedSender> { assert!( diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 7483d1d10..e28f01881 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -96,7 +96,7 @@ pub struct SyncingChain { validated_batches: u64, /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. - beacon_processor_send: Sender>, + beacon_processor_send: Sender>, /// The chain's log. log: slog::Logger, @@ -123,7 +123,7 @@ impl SyncingChain { target_head_slot: Slot, target_head_root: Hash256, peer_id: PeerId, - beacon_processor_send: Sender>, + beacon_processor_send: Sender>, log: &slog::Logger, ) -> Self { let mut peers = FnvHashMap::default(); diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 0111b05a5..89d49121b 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -194,7 +194,7 @@ impl ChainCollection { network: &mut SyncNetworkContext, local: &SyncInfo, awaiting_head_peers: &mut HashMap, - beacon_processor_send: &mpsc::Sender>, + beacon_processor_send: &mpsc::Sender>, ) { // Remove any outdated finalized/head chains self.purge_outdated_chains(local, awaiting_head_peers); @@ -328,7 +328,7 @@ impl ChainCollection { local_epoch: Epoch, local_head_epoch: Epoch, awaiting_head_peers: &mut HashMap, - beacon_processor_send: &mpsc::Sender>, + beacon_processor_send: &mpsc::Sender>, ) { // Include the awaiting head peers for (peer_id, peer_sync_info) in awaiting_head_peers.drain() { @@ -463,7 +463,7 @@ impl ChainCollection { target_head_slot: Slot, peer: PeerId, sync_type: RangeSyncType, - beacon_processor_send: &mpsc::Sender>, + beacon_processor_send: &mpsc::Sender>, network: &mut SyncNetworkContext, ) { let id = SyncingChain::::id(&target_head_root, &target_head_slot); diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 1e48255f1..e291233bd 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -68,7 +68,7 @@ pub struct RangeSync { /// that need to be 
downloaded. chains: ChainCollection, /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. - beacon_processor_send: mpsc::Sender>, + beacon_processor_send: mpsc::Sender>, /// The syncing logger. log: slog::Logger, } @@ -76,7 +76,7 @@ pub struct RangeSync { impl RangeSync { pub fn new( beacon_chain: Arc>, - beacon_processor_send: mpsc::Sender>, + beacon_processor_send: mpsc::Sender>, log: slog::Logger, ) -> Self { RangeSync { diff --git a/common/slot_clock/src/lib.rs b/common/slot_clock/src/lib.rs index 36b4fcfcd..53f00b031 100644 --- a/common/slot_clock/src/lib.rs +++ b/common/slot_clock/src/lib.rs @@ -16,7 +16,7 @@ pub use types::Slot; /// A clock that reports the current slot. /// /// The clock is not required to be monotonically increasing and may go backwards. -pub trait SlotClock: Send + Sync + Sized { +pub trait SlotClock: Send + Sync + Sized + Clone { /// Creates a new slot clock where the first slot is `genesis_slot`, genesis occurred /// `genesis_duration` after the `UNIX_EPOCH` and each slot is `slot_duration` apart. fn new(genesis_slot: Slot, genesis_duration: Duration, slot_duration: Duration) -> Self; diff --git a/common/slot_clock/src/manual_slot_clock.rs b/common/slot_clock/src/manual_slot_clock.rs index ef45e07c1..567a6b4cd 100644 --- a/common/slot_clock/src/manual_slot_clock.rs +++ b/common/slot_clock/src/manual_slot_clock.rs @@ -37,6 +37,10 @@ impl ManualSlotClock { self.genesis_duration + self.slot_duration * slots_since_genesis; } + pub fn set_current_time(&self, duration: Duration) { + *self.current_time.write() = duration; + } + pub fn advance_slot(&self) { self.set_slot(self.now().unwrap().as_u64() + 1) }
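Finally, on the `Clone` bound added to `SlotClock`: the long-lived delay-queue task needs its own handle to the clock while the beacon chain keeps using the original, which only works if clocks are cheaply copyable. A minimal sketch of why the bound is required (the types here are illustrative):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

#[derive(Clone)]
struct SimpleSlotClock {
    genesis: Duration,
    slot_duration: Duration,
}

impl SimpleSlotClock {
    fn now_slot(&self) -> u64 {
        let since_genesis = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .saturating_sub(self.genesis);
        since_genesis.as_secs() / self.slot_duration.as_secs()
    }
}

// The queue task takes ownership of a clone of the clock...
fn spawn_delay_queue<C: Clone + Send + 'static>(clock: C) {
    std::thread::spawn(move || {
        let _clock = clock; // reads slots for the lifetime of the task
    });
}

fn main() {
    let clock = SimpleSlotClock {
        genesis: Duration::from_secs(0),
        slot_duration: Duration::from_secs(12),
    };

    spawn_delay_queue(clock.clone());

    // ...while the caller keeps its own handle.
    println!("current slot: {}", clock.now_slot());
}
```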