diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index ba8243cb3..c62bbdeb3 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -36,22 +36,22 @@ //! task. use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; -use beacon_chain::{ - attestation_verification::Error as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, - BlockError, ForkChoiceError, -}; -use chain_segment::handle_chain_segment; +use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; use environment::TaskExecutor; use eth2_libp2p::{MessageId, NetworkGlobals, PeerId}; -use slog::{crit, debug, error, info, trace, warn, Logger}; -use ssz::Encode; +use slog::{crit, debug, error, trace, warn, Logger}; use std::collections::VecDeque; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use tokio::sync::{mpsc, oneshot}; -use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId}; +use types::{ + Attestation, AttesterSlashing, EthSpec, Hash256, ProposerSlashing, SignedAggregateAndProof, + SignedBeaconBlock, SignedVoluntaryExit, SubnetId, +}; +use worker::Worker; mod chain_segment; +mod worker; pub use chain_segment::ProcessId; @@ -78,6 +78,18 @@ const MAX_AGGREGATED_ATTESTATION_QUEUE_LEN: usize = 1_024; /// before we start dropping them. const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024; +/// The maximum number of queued `SignedVoluntaryExit` objects received on gossip that will be stored +/// before we start dropping them. +const MAX_GOSSIP_EXIT_QUEUE_LEN: usize = 4_096; + +/// The maximum number of queued `ProposerSlashing` objects received on gossip that will be stored +/// before we start dropping them. +const MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN: usize = 4_096; + +/// The maximum number of queued `AttesterSlashing` objects received on gossip that will be stored +/// before we start dropping them. +const MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN: usize = 4_096; + /// The maximum number of queued `SignedBeaconBlock` objects received from the network RPC that /// will be stored before we start dropping them. const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024; @@ -242,6 +254,54 @@ impl WorkEvent { } } + /// Create a new `Work` event for some exit. + pub fn gossip_voluntary_exit( + message_id: MessageId, + peer_id: PeerId, + voluntary_exit: Box, + ) -> Self { + Self { + drop_during_sync: false, + work: Work::GossipVoluntaryExit { + message_id, + peer_id, + voluntary_exit, + }, + } + } + + /// Create a new `Work` event for some proposer slashing. + pub fn gossip_proposer_slashing( + message_id: MessageId, + peer_id: PeerId, + proposer_slashing: Box, + ) -> Self { + Self { + drop_during_sync: false, + work: Work::GossipProposerSlashing { + message_id, + peer_id, + proposer_slashing, + }, + } + } + + /// Create a new `Work` event for some attester slashing. + pub fn gossip_attester_slashing( + message_id: MessageId, + peer_id: PeerId, + attester_slashing: Box>, + ) -> Self { + Self { + drop_during_sync: false, + work: Work::GossipAttesterSlashing { + message_id, + peer_id, + attester_slashing, + }, + } + } + /// Create a new `Work` event for some block, where the result from computation (if any) is /// sent to the other side of `result_tx`. pub fn rpc_beacon_block(block: Box>) -> (Self, BlockResultReceiver) { @@ -282,6 +342,21 @@ pub enum Work { peer_id: PeerId, block: Box>, }, + GossipVoluntaryExit { + message_id: MessageId, + peer_id: PeerId, + voluntary_exit: Box, + }, + GossipProposerSlashing { + message_id: MessageId, + peer_id: PeerId, + proposer_slashing: Box, + }, + GossipAttesterSlashing { + message_id: MessageId, + peer_id: PeerId, + attester_slashing: Box>, + }, RpcBlock { block: Box>, result_tx: BlockResultSender, @@ -299,6 +374,9 @@ impl Work { Work::GossipAttestation { .. } => "gossip_attestation", Work::GossipAggregate { .. } => "gossip_aggregate", Work::GossipBlock { .. } => "gossip_block", + Work::GossipVoluntaryExit { .. } => "gossip_voluntary_exit", + Work::GossipProposerSlashing { .. } => "gossip_proposer_slashing", + Work::GossipAttesterSlashing { .. } => "gossip_attester_slashing", Work::RpcBlock { .. } => "rpc_block", Work::ChainSegment { .. } => "chain_segment", } @@ -351,21 +429,33 @@ impl BeaconProcessor { pub fn spawn_manager(mut self, mut event_rx: mpsc::Receiver>) { let (idle_tx, mut idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN); + // Using LIFO queues for attestations since validator profits rely upon getting fresh + // attestations into blocks. Additionally, later attestations contain more information than + // earlier ones, so we consider them more valuable. let mut aggregate_queue = LifoQueue::new(MAX_AGGREGATED_ATTESTATION_QUEUE_LEN); let mut aggregate_debounce = TimeLatch::default(); - let mut attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN); let mut attestation_debounce = TimeLatch::default(); - let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); + // Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have + // a strong feeling about queue type for exits. + let mut gossip_voluntary_exit_queue = FifoQueue::new(MAX_GOSSIP_EXIT_QUEUE_LEN); + // Using a FIFO queue for slashing to prevent people from flushing their slashings from the + // queues with lots of junk messages. + let mut gossip_proposer_slashing_queue = + FifoQueue::new(MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN); + let mut gossip_attester_slashing_queue = + FifoQueue::new(MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN); + + // Using a FIFO queue since blocks need to be imported sequentially. let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); - let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); + let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); let executor = self.executor.clone(); - // The manager future will run on the non-blocking executor and delegate tasks to worker + // The manager future will run on the core executor and delegate tasks to worker // threads on the blocking executor. let manager_future = async move { loop { @@ -452,6 +542,18 @@ impl BeaconProcessor { self.spawn_worker(idle_tx.clone(), item); } else if let Some(item) = attestation_queue.pop() { self.spawn_worker(idle_tx.clone(), item); + // Check slashings after all other consensus messages so we prioritize + // following head. + // + // Check attester slashings before proposer slashings since they have the + // potential to slash multiple validators at once. + } else if let Some(item) = gossip_attester_slashing_queue.pop() { + self.spawn_worker(idle_tx.clone(), item); + } else if let Some(item) = gossip_proposer_slashing_queue.pop() { + self.spawn_worker(idle_tx.clone(), item); + // Check exits last since our validators don't get rewards from them. + } else if let Some(item) = gossip_voluntary_exit_queue.pop() { + self.spawn_worker(idle_tx.clone(), item); } } // There is no new work event and we are unable to spawn a new worker. @@ -491,6 +593,15 @@ impl BeaconProcessor { Work::GossipBlock { .. } => { gossip_block_queue.push(work, work_id, &self.log) } + Work::GossipVoluntaryExit { .. } => { + gossip_voluntary_exit_queue.push(work, work_id, &self.log) + } + Work::GossipProposerSlashing { .. } => { + gossip_proposer_slashing_queue.push(work, work_id, &self.log) + } + Work::GossipAttesterSlashing { .. } => { + gossip_attester_slashing_queue.push(work, work_id, &self.log) + } Work::RpcBlock { .. } => rpc_block_queue.push(work, work_id, &self.log), Work::ChainSegment { .. } => { chain_segment_queue.push(work, work_id, &self.log) @@ -523,6 +634,18 @@ impl BeaconProcessor { &metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL, chain_segment_queue.len() as i64, ); + metrics::set_gauge( + &metrics::BEACON_PROCESSOR_EXIT_QUEUE_TOTAL, + gossip_voluntary_exit_queue.len() as i64, + ); + metrics::set_gauge( + &metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_QUEUE_TOTAL, + gossip_proposer_slashing_queue.len() as i64, + ); + metrics::set_gauge( + &metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_QUEUE_TOTAL, + gossip_attester_slashing_queue.len() as i64, + ); if aggregate_queue.is_full() && aggregate_debounce.elapsed() { error!( @@ -544,7 +667,7 @@ impl BeaconProcessor { } }; - // Spawn on the non-blocking executor. + // Spawn on the core executor. executor.spawn(manager_future, MANAGER_TASK_NAME); } @@ -574,11 +697,16 @@ impl BeaconProcessor { return; }; - let network_tx = self.network_tx.clone(); - let sync_tx = self.sync_tx.clone(); let log = self.log.clone(); let executor = self.executor.clone(); + let worker = Worker { + chain, + network_tx: self.network_tx.clone(), + sync_tx: self.sync_tx.clone(), + log: self.log.clone(), + }; + trace!( self.log, "Spawning beacon processor worker"; @@ -589,298 +717,85 @@ impl BeaconProcessor { executor.spawn_blocking( move || { let _worker_timer = worker_timer; - let inner_log = log.clone(); - // We use this closure pattern to avoid using a `return` that prevents the - // `idle_tx` message from sending. - let handler = || { - let log = inner_log.clone(); - match work { - /* - * Unaggregated attestation verification. - */ - Work::GossipAttestation { - message_id, - peer_id, - attestation, - subnet_id, - should_import, - } => { - let beacon_block_root = attestation.data.beacon_block_root; - - let attestation = match chain - .verify_unaggregated_attestation_for_gossip(*attestation, subnet_id) - { - Ok(attestation) => attestation, - Err(e) => { - handle_attestation_verification_failure( - &log, - sync_tx, - peer_id, - beacon_block_root, - "unaggregated", - e, - ); - return; - } - }; - - // Indicate to the `Network` service that this message is valid and can be - // propagated on the gossip network. - propagate_gossip_message(network_tx, message_id, peer_id.clone(), &log); - - if !should_import { - return; - } - - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL, - ); - - if let Err(e) = chain.apply_attestation_to_fork_choice(&attestation) { - match e { - BeaconChainError::ForkChoiceError( - ForkChoiceError::InvalidAttestation(e), - ) => debug!( - log, - "Attestation invalid for fork choice"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) - ), - e => error!( - log, - "Error applying attestation to fork choice"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) - ), - } - } - - if let Err(e) = chain.add_to_naive_aggregation_pool(attestation) { - debug!( - log, - "Attestation invalid for agg pool"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) - ) - } - - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL, - ); - } - /* - * Aggregated attestation verification. - */ - Work::GossipAggregate { - message_id, - peer_id, - aggregate, - } => { - let beacon_block_root = - aggregate.message.aggregate.data.beacon_block_root; - - let aggregate = - match chain.verify_aggregated_attestation_for_gossip(*aggregate) { - Ok(aggregate) => aggregate, - Err(e) => { - handle_attestation_verification_failure( - &log, - sync_tx, - peer_id, - beacon_block_root, - "aggregated", - e, - ); - return; - } - }; - - // Indicate to the `Network` service that this message is valid and can be - // propagated on the gossip network. - propagate_gossip_message(network_tx, message_id, peer_id.clone(), &log); - - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL, - ); - - if let Err(e) = chain.apply_attestation_to_fork_choice(&aggregate) { - match e { - BeaconChainError::ForkChoiceError( - ForkChoiceError::InvalidAttestation(e), - ) => debug!( - log, - "Aggregate invalid for fork choice"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) - ), - e => error!( - log, - "Error applying aggregate to fork choice"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) - ), - } - } - - if let Err(e) = chain.add_to_block_inclusion_pool(aggregate) { - debug!( - log, - "Attestation invalid for op pool"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) - ) - } - - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL, - ); - } - /* - * Verification for beacon blocks received on gossip. - */ - Work::GossipBlock { - message_id, - peer_id, - block, - } => { - let verified_block = match chain.verify_block_for_gossip(*block) { - Ok(verified_block) => { - info!( - log, - "New block received"; - "slot" => verified_block.block.slot(), - "hash" => verified_block.block_root.to_string() - ); - propagate_gossip_message( - network_tx, - message_id, - peer_id.clone(), - &log, - ); - verified_block - } - Err(BlockError::ParentUnknown(block)) => { - send_sync_message( - sync_tx, - SyncMessage::UnknownBlock(peer_id, block), - &log, - ); - return; - } - Err(BlockError::BlockIsAlreadyKnown) => { - debug!( - log, - "Gossip block is already known"; - ); - return; - } - Err(e) => { - warn!( - log, - "Could not verify block for gossip"; - "error" => format!("{:?}", e) - ); - return; - } - }; - - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL, - ); - - let block = Box::new(verified_block.block.clone()); - match chain.process_block(verified_block) { - Ok(_block_root) => { - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL, - ); - - trace!( - log, - "Gossipsub block processed"; - "peer_id" => peer_id.to_string() - ); - - // TODO: It would be better if we can run this _after_ we publish the block to - // reduce block propagation latency. - // - // The `MessageHandler` would be the place to put this, however it doesn't seem - // to have a reference to the `BeaconChain`. I will leave this for future - // works. - match chain.fork_choice() { - Ok(()) => trace!( - log, - "Fork choice success"; - "location" => "block gossip" - ), - Err(e) => error!( - log, - "Fork choice failed"; - "error" => format!("{:?}", e), - "location" => "block gossip" - ), - } - } - Err(BlockError::ParentUnknown { .. }) => { - // Inform the sync manager to find parents for this block - // This should not occur. It should be checked by `should_forward_block` - error!( - log, - "Block with unknown parent attempted to be processed"; - "peer_id" => peer_id.to_string() - ); - send_sync_message( - sync_tx, - SyncMessage::UnknownBlock(peer_id, block), - &log, - ); - } - other => { - debug!( - log, - "Invalid gossip beacon block"; - "outcome" => format!("{:?}", other), - "block root" => format!("{}", block.canonical_root()), - "block slot" => block.slot() - ); - trace!( - log, - "Invalid gossip beacon block ssz"; - "ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())), - ); - } - }; - } - /* - * Verification for beacon blocks received during syncing via RPC. - */ - Work::RpcBlock { block, result_tx } => { - let block_result = chain.process_block(*block); - - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL, - ); - - if result_tx.send(block_result).is_err() { - crit!(log, "Failed return sync block result"); - } - } - /* - * Verification for a chain segment (multiple blocks). - */ - Work::ChainSegment { process_id, blocks } => { - handle_chain_segment(chain, process_id, blocks, sync_tx, log) - } - }; + match work { + /* + * Unaggregated attestation verification. + */ + Work::GossipAttestation { + message_id, + peer_id, + attestation, + subnet_id, + should_import, + } => worker.process_gossip_attestation( + message_id, + peer_id, + *attestation, + subnet_id, + should_import, + ), + /* + * Aggregated attestation verification. + */ + Work::GossipAggregate { + message_id, + peer_id, + aggregate, + } => worker.process_gossip_aggregate(message_id, peer_id, *aggregate), + /* + * Verification for beacon blocks received on gossip. + */ + Work::GossipBlock { + message_id, + peer_id, + block, + } => worker.process_gossip_block(message_id, peer_id, *block), + /* + * Voluntary exits received on gossip. + */ + Work::GossipVoluntaryExit { + message_id, + peer_id, + voluntary_exit, + } => worker.process_gossip_voluntary_exit(message_id, peer_id, *voluntary_exit), + /* + * Proposer slashings received on gossip. + */ + Work::GossipProposerSlashing { + message_id, + peer_id, + proposer_slashing, + } => worker.process_gossip_proposer_slashing( + message_id, + peer_id, + *proposer_slashing, + ), + /* + * Attester slashings received on gossip. + */ + Work::GossipAttesterSlashing { + message_id, + peer_id, + attester_slashing, + } => worker.process_gossip_attester_slashing( + message_id, + peer_id, + *attester_slashing, + ), + /* + * Verification for beacon blocks received during syncing via RPC. + */ + Work::RpcBlock { block, result_tx } => { + worker.process_rpc_block(*block, result_tx) + } + /* + * Verification for a chain segment (multiple blocks). + */ + Work::ChainSegment { process_id, blocks } => { + worker.process_chain_segment(process_id, blocks) + } }; - handler(); trace!( log, @@ -902,300 +817,3 @@ impl BeaconProcessor { ); } } - -/// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on -/// the gossip network. -/// -/// Creates a log if there is an interal error. -fn propagate_gossip_message( - network_tx: mpsc::UnboundedSender>, - message_id: MessageId, - peer_id: PeerId, - log: &Logger, -) { - network_tx - .send(NetworkMessage::Validate { - propagation_source: peer_id, - message_id, - }) - .unwrap_or_else(|_| { - warn!( - log, - "Could not send propagation request to the network service" - ) - }); -} - -/// Send a message to `sync_tx`. -/// -/// Creates a log if there is an interal error. -fn send_sync_message( - sync_tx: mpsc::UnboundedSender>, - message: SyncMessage, - log: &Logger, -) { - sync_tx - .send(message) - .unwrap_or_else(|_| error!(log, "Could not send message to the sync service")); -} - -/// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the -/// network. -pub fn handle_attestation_verification_failure( - log: &Logger, - sync_tx: mpsc::UnboundedSender>, - peer_id: PeerId, - beacon_block_root: Hash256, - attestation_type: &str, - error: AttnError, -) { - metrics::register_attestation_error(&error); - match &error { - AttnError::FutureEpoch { .. } - | AttnError::PastEpoch { .. } - | AttnError::FutureSlot { .. } - | AttnError::PastSlot { .. } => { - /* - * These errors can be triggered by a mismatch between our slot and the peer. - * - * - * The peer has published an invalid consensus message, _only_ if we trust our own clock. - */ - } - AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => { - /* - * These errors are caused by invalid signatures. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::EmptyAggregationBitfield => { - /* - * The aggregate had no signatures and is therefore worthless. - * - * Whilst we don't gossip this attestation, this act is **not** a clear - * violation of the spec nor indication of fault. - * - * This may change soon. Reference: - * - * https://github.com/ethereum/eth2.0-specs/pull/1732 - */ - } - AttnError::AggregatorPubkeyUnknown(_) => { - /* - * The aggregator index was higher than any known validator index. This is - * possible in two cases: - * - * 1. The attestation is malformed - * 2. The attestation attests to a beacon_block_root that we do not know. - * - * It should be impossible to reach (2) without triggering - * `AttnError::UnknownHeadBlock`, so we can safely assume the peer is - * faulty. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::AggregatorNotInCommittee { .. } => { - /* - * The aggregator index was higher than any known validator index. This is - * possible in two cases: - * - * 1. The attestation is malformed - * 2. The attestation attests to a beacon_block_root that we do not know. - * - * It should be impossible to reach (2) without triggering - * `AttnError::UnknownHeadBlock`, so we can safely assume the peer is - * faulty. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::AttestationAlreadyKnown { .. } => { - /* - * The aggregate attestation has already been observed on the network or in - * a block. - * - * The peer is not necessarily faulty. - */ - trace!( - log, - "Attestation already known"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root), - "type" => format!("{:?}", attestation_type), - ); - return; - } - AttnError::AggregatorAlreadyKnown(_) => { - /* - * There has already been an aggregate attestation seen from this - * aggregator index. - * - * The peer is not necessarily faulty. - */ - trace!( - log, - "Aggregator already known"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root), - "type" => format!("{:?}", attestation_type), - ); - return; - } - AttnError::PriorAttestationKnown { .. } => { - /* - * We have already seen an attestation from this validator for this epoch. - * - * The peer is not necessarily faulty. - */ - trace!( - log, - "Prior attestation known"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root), - "type" => format!("{:?}", attestation_type), - ); - return; - } - AttnError::ValidatorIndexTooHigh(_) => { - /* - * The aggregator index (or similar field) was higher than the maximum - * possible number of validators. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::UnknownHeadBlock { beacon_block_root } => { - // Note: its a little bit unclear as to whether or not this block is unknown or - // just old. See: - // - // https://github.com/sigp/lighthouse/issues/1039 - - // TODO: Maintain this attestation and re-process once sync completes - debug!( - log, - "Attestation for unknown block"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root) - ); - // we don't know the block, get the sync manager to handle the block lookup - sync_tx - .send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root)) - .unwrap_or_else(|_| { - warn!( - log, - "Failed to send to sync service"; - "msg" => "UnknownBlockHash" - ) - }); - return; - } - AttnError::UnknownTargetRoot(_) => { - /* - * The block indicated by the target root is not known to us. - * - * We should always get `AttnError::UnknwonHeadBlock` before we get this - * error, so this means we can get this error if: - * - * 1. The target root does not represent a valid block. - * 2. We do not have the target root in our DB. - * - * For (2), we should only be processing attestations when we should have - * all the available information. Note: if we do a weak-subjectivity sync - * it's possible that this situation could occur, but I think it's - * unlikely. For now, we will declare this to be an invalid message> - * - * The peer has published an invalid consensus message. - */ - } - AttnError::BadTargetEpoch => { - /* - * The aggregator index (or similar field) was higher than the maximum - * possible number of validators. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::NoCommitteeForSlotAndIndex { .. } => { - /* - * It is not possible to attest this the given committee in the given slot. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::NotExactlyOneAggregationBitSet(_) => { - /* - * The unaggregated attestation doesn't have only one signature. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::AttestsToFutureBlock { .. } => { - /* - * The beacon_block_root is from a higher slot than the attestation. - * - * The peer has published an invalid consensus message. - */ - } - - AttnError::InvalidSubnetId { received, expected } => { - /* - * The attestation was received on an incorrect subnet id. - */ - debug!( - log, - "Received attestation on incorrect subnet"; - "expected" => format!("{:?}", expected), - "received" => format!("{:?}", received), - ) - } - AttnError::Invalid(_) => { - /* - * The attestation failed the state_processing verification. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::TooManySkippedSlots { - head_block_slot, - attestation_slot, - } => { - /* - * The attestation references a head block that is too far behind the attestation slot. - * - * The message is not necessarily invalid, but we choose to ignore it. - */ - debug!( - log, - "Rejected long skip slot attestation"; - "head_block_slot" => head_block_slot, - "attestation_slot" => attestation_slot, - ) - } - AttnError::BeaconChainError(e) => { - /* - * Lighthouse hit an unexpected error whilst processing the attestation. It - * should be impossible to trigger a `BeaconChainError` from the network, - * so we have a bug. - * - * It's not clear if the message is invalid/malicious. - */ - error!( - log, - "Unable to validate aggregate"; - "peer_id" => peer_id.to_string(), - "error" => format!("{:?}", e), - ); - } - } - - debug!( - log, - "Invalid attestation from network"; - "reason" => format!("{:?}", error), - "block" => format!("{}", beacon_block_root), - "peer_id" => peer_id.to_string(), - "type" => format!("{:?}", attestation_type), - ); -} diff --git a/beacon_node/network/src/beacon_processor/worker.rs b/beacon_node/network/src/beacon_processor/worker.rs new file mode 100644 index 000000000..7d6ba3d42 --- /dev/null +++ b/beacon_node/network/src/beacon_processor/worker.rs @@ -0,0 +1,726 @@ +use super::{ + chain_segment::{handle_chain_segment, ProcessId}, + BlockResultSender, +}; +use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; +use beacon_chain::{ + attestation_verification::Error as AttnError, observed_operations::ObservationOutcome, + BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, +}; +use eth2_libp2p::{MessageId, PeerId}; +use slog::{crit, debug, error, info, trace, warn, Logger}; +use ssz::Encode; +use std::sync::Arc; +use tokio::sync::mpsc; +use types::{ + Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof, + SignedBeaconBlock, SignedVoluntaryExit, SubnetId, +}; + +/// Contains the context necessary to import blocks, attestations, etc to the beacon chain. +pub struct Worker { + pub chain: Arc>, + pub network_tx: mpsc::UnboundedSender>, + pub sync_tx: mpsc::UnboundedSender>, + pub log: Logger, +} + +impl Worker { + /// Process the unaggregated attestation received from the gossip network and: + /// + /// - If it passes gossip propagation criteria, tell the network thread to forward it. + /// - Attempt to apply it to fork choice. + /// - Attempt to add it to the naive aggregation pool. + /// + /// Raises a log if there are errors. + pub fn process_gossip_attestation( + self, + message_id: MessageId, + peer_id: PeerId, + attestation: Attestation, + subnet_id: SubnetId, + should_import: bool, + ) { + let beacon_block_root = attestation.data.beacon_block_root; + + let attestation = match self + .chain + .verify_unaggregated_attestation_for_gossip(attestation, subnet_id) + { + Ok(attestation) => attestation, + Err(e) => { + self.handle_attestation_verification_failure( + peer_id, + beacon_block_root, + "unaggregated", + e, + ); + return; + } + }; + + // Indicate to the `Network` service that this message is valid and can be + // propagated on the gossip network. + self.propagate_gossip_message(message_id, peer_id.clone()); + + if !should_import { + return; + } + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL); + + if let Err(e) = self.chain.apply_attestation_to_fork_choice(&attestation) { + match e { + BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => { + debug!( + self.log, + "Attestation invalid for fork choice"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ) + } + e => error!( + self.log, + "Error applying attestation to fork choice"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ), + } + } + + if let Err(e) = self.chain.add_to_naive_aggregation_pool(attestation) { + debug!( + self.log, + "Attestation invalid for agg pool"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ) + } + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL); + } + + /// Process the aggregated attestation received from the gossip network and: + /// + /// - If it passes gossip propagation criteria, tell the network thread to forward it. + /// - Attempt to apply it to fork choice. + /// - Attempt to add it to the block inclusion pool. + /// + /// Raises a log if there are errors. + pub fn process_gossip_aggregate( + self, + message_id: MessageId, + peer_id: PeerId, + aggregate: SignedAggregateAndProof, + ) { + let beacon_block_root = aggregate.message.aggregate.data.beacon_block_root; + + let aggregate = match self + .chain + .verify_aggregated_attestation_for_gossip(aggregate) + { + Ok(aggregate) => aggregate, + Err(e) => { + self.handle_attestation_verification_failure( + peer_id, + beacon_block_root, + "aggregated", + e, + ); + return; + } + }; + + // Indicate to the `Network` service that this message is valid and can be + // propagated on the gossip network. + self.propagate_gossip_message(message_id, peer_id.clone()); + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL); + + if let Err(e) = self.chain.apply_attestation_to_fork_choice(&aggregate) { + match e { + BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => { + debug!( + self.log, + "Aggregate invalid for fork choice"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ) + } + e => error!( + self.log, + "Error applying aggregate to fork choice"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ), + } + } + + if let Err(e) = self.chain.add_to_block_inclusion_pool(aggregate) { + debug!( + self.log, + "Attestation invalid for op pool"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ) + } + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL); + } + + /// Process the beacon block received from the gossip network and: + /// + /// - If it passes gossip propagation criteria, tell the network thread to forward it. + /// - Attempt to add it to the beacon chain, informing the sync thread if more blocks need to + /// be downloaded. + /// + /// Raises a log if there are errors. + pub fn process_gossip_block( + self, + message_id: MessageId, + peer_id: PeerId, + block: SignedBeaconBlock, + ) { + let verified_block = match self.chain.verify_block_for_gossip(block) { + Ok(verified_block) => { + info!( + self.log, + "New block received"; + "slot" => verified_block.block.slot(), + "hash" => verified_block.block_root.to_string() + ); + self.propagate_gossip_message(message_id, peer_id.clone()); + verified_block + } + Err(BlockError::ParentUnknown(block)) => { + self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block)); + return; + } + Err(BlockError::BlockIsAlreadyKnown) => { + debug!( + self.log, + "Gossip block is already known"; + ); + return; + } + Err(e) => { + warn!( + self.log, + "Could not verify block for gossip"; + "error" => format!("{:?}", e) + ); + return; + } + }; + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL); + + let block = Box::new(verified_block.block.clone()); + match self.chain.process_block(verified_block) { + Ok(_block_root) => { + metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); + + trace!( + self.log, + "Gossipsub block processed"; + "peer_id" => peer_id.to_string() + ); + + // TODO: It would be better if we can run this _after_ we publish the block to + // reduce block propagation latency. + // + // The `MessageHandler` would be the place to put this, however it doesn't seem + // to have a reference to the `BeaconChain`. I will leave this for future + // works. + match self.chain.fork_choice() { + Ok(()) => trace!( + self.log, + "Fork choice success"; + "location" => "block gossip" + ), + Err(e) => error!( + self.log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "block gossip" + ), + } + } + Err(BlockError::ParentUnknown { .. }) => { + // Inform the sync manager to find parents for this block + // This should not occur. It should be checked by `should_forward_block` + error!( + self.log, + "Block with unknown parent attempted to be processed"; + "peer_id" => peer_id.to_string() + ); + self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block)); + } + other => { + debug!( + self.log, + "Invalid gossip beacon block"; + "outcome" => format!("{:?}", other), + "block root" => format!("{}", block.canonical_root()), + "block slot" => block.slot() + ); + trace!( + self.log, + "Invalid gossip beacon block ssz"; + "ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())), + ); + } + }; + } + + pub fn process_gossip_voluntary_exit( + self, + message_id: MessageId, + peer_id: PeerId, + voluntary_exit: SignedVoluntaryExit, + ) { + let validator_index = voluntary_exit.message.validator_index; + + let exit = match self.chain.verify_voluntary_exit_for_gossip(voluntary_exit) { + Ok(ObservationOutcome::New(exit)) => exit, + Ok(ObservationOutcome::AlreadyKnown) => { + debug!( + self.log, + "Dropping exit for already exiting validator"; + "validator_index" => validator_index, + "peer" => peer_id.to_string() + ); + return; + } + Err(e) => { + debug!( + self.log, + "Dropping invalid exit"; + "validator_index" => validator_index, + "peer" => peer_id.to_string(), + "error" => format!("{:?}", e) + ); + return; + } + }; + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_EXIT_VERIFIED_TOTAL); + + self.propagate_gossip_message(message_id, peer_id); + + self.chain.import_voluntary_exit(exit); + debug!(self.log, "Successfully imported voluntary exit"); + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_EXIT_IMPORTED_TOTAL); + } + + pub fn process_gossip_proposer_slashing( + self, + message_id: MessageId, + peer_id: PeerId, + proposer_slashing: ProposerSlashing, + ) { + let validator_index = proposer_slashing.signed_header_1.message.proposer_index; + + let slashing = match self + .chain + .verify_proposer_slashing_for_gossip(proposer_slashing) + { + Ok(ObservationOutcome::New(slashing)) => slashing, + Ok(ObservationOutcome::AlreadyKnown) => { + debug!( + self.log, + "Dropping proposer slashing"; + "reason" => "Already seen a proposer slashing for that validator", + "validator_index" => validator_index, + "peer" => peer_id.to_string() + ); + return; + } + Err(e) => { + debug!( + self.log, + "Dropping invalid proposer slashing"; + "validator_index" => validator_index, + "peer" => peer_id.to_string(), + "error" => format!("{:?}", e) + ); + return; + } + }; + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_VERIFIED_TOTAL); + + self.propagate_gossip_message(message_id, peer_id); + + self.chain.import_proposer_slashing(slashing); + debug!(self.log, "Successfully imported proposer slashing"); + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_IMPORTED_TOTAL); + } + + pub fn process_gossip_attester_slashing( + self, + message_id: MessageId, + peer_id: PeerId, + attester_slashing: AttesterSlashing, + ) { + let slashing = match self + .chain + .verify_attester_slashing_for_gossip(attester_slashing) + { + Ok(ObservationOutcome::New(slashing)) => slashing, + Ok(ObservationOutcome::AlreadyKnown) => { + debug!( + self.log, + "Dropping attester slashing"; + "reason" => "Slashings already known for all slashed validators", + "peer" => peer_id.to_string() + ); + return; + } + Err(e) => { + debug!( + self.log, + "Dropping invalid attester slashing"; + "peer" => peer_id.to_string(), + "error" => format!("{:?}", e) + ); + return; + } + }; + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_VERIFIED_TOTAL); + + self.propagate_gossip_message(message_id, peer_id); + + if let Err(e) = self.chain.import_attester_slashing(slashing) { + debug!(self.log, "Error importing attester slashing"; "error" => format!("{:?}", e)); + metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_ERROR_TOTAL); + } else { + debug!(self.log, "Successfully imported attester slashing"); + metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_IMPORTED_TOTAL); + } + } + + /// Attempt to process a block received from a direct RPC request, returning the processing + /// result on the `result_tx` channel. + /// + /// Raises a log if there are errors publishing the result to the channel. + pub fn process_rpc_block( + self, + block: SignedBeaconBlock, + result_tx: BlockResultSender, + ) { + let block_result = self.chain.process_block(block); + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL); + + if result_tx.send(block_result).is_err() { + crit!(self.log, "Failed return sync block result"); + } + } + + /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync + /// thread if more blocks are needed to process it. + pub fn process_chain_segment( + self, + process_id: ProcessId, + blocks: Vec>, + ) { + handle_chain_segment(self.chain, process_id, blocks, self.sync_tx, self.log) + } + + /// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on + /// the gossip network. + /// + /// Creates a log if there is an interal error. + fn propagate_gossip_message(&self, message_id: MessageId, peer_id: PeerId) { + self.network_tx + .send(NetworkMessage::Validate { + propagation_source: peer_id, + message_id, + }) + .unwrap_or_else(|_| { + warn!( + self.log, + "Could not send propagation request to the network service" + ) + }); + } + + /// Send a message to `sync_tx`. + /// + /// Creates a log if there is an interal error. + fn send_sync_message(&self, message: SyncMessage) { + self.sync_tx + .send(message) + .unwrap_or_else(|_| error!(self.log, "Could not send message to the sync service")); + } + + /// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the + /// network. + pub fn handle_attestation_verification_failure( + &self, + peer_id: PeerId, + beacon_block_root: Hash256, + attestation_type: &str, + error: AttnError, + ) { + metrics::register_attestation_error(&error); + match &error { + AttnError::FutureEpoch { .. } + | AttnError::PastEpoch { .. } + | AttnError::FutureSlot { .. } + | AttnError::PastSlot { .. } => { + /* + * These errors can be triggered by a mismatch between our slot and the peer. + * + * + * The peer has published an invalid consensus message, _only_ if we trust our own clock. + */ + } + AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => { + /* + * These errors are caused by invalid signatures. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::EmptyAggregationBitfield => { + /* + * The aggregate had no signatures and is therefore worthless. + * + * Whilst we don't gossip this attestation, this act is **not** a clear + * violation of the spec nor indication of fault. + * + * This may change soon. Reference: + * + * https://github.com/ethereum/eth2.0-specs/pull/1732 + */ + } + AttnError::AggregatorPubkeyUnknown(_) => { + /* + * The aggregator index was higher than any known validator index. This is + * possible in two cases: + * + * 1. The attestation is malformed + * 2. The attestation attests to a beacon_block_root that we do not know. + * + * It should be impossible to reach (2) without triggering + * `AttnError::UnknownHeadBlock`, so we can safely assume the peer is + * faulty. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::AggregatorNotInCommittee { .. } => { + /* + * The aggregator index was higher than any known validator index. This is + * possible in two cases: + * + * 1. The attestation is malformed + * 2. The attestation attests to a beacon_block_root that we do not know. + * + * It should be impossible to reach (2) without triggering + * `AttnError::UnknownHeadBlock`, so we can safely assume the peer is + * faulty. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::AttestationAlreadyKnown { .. } => { + /* + * The aggregate attestation has already been observed on the network or in + * a block. + * + * The peer is not necessarily faulty. + */ + trace!( + self.log, + "Attestation already known"; + "peer_id" => peer_id.to_string(), + "block" => format!("{}", beacon_block_root), + "type" => format!("{:?}", attestation_type), + ); + return; + } + AttnError::AggregatorAlreadyKnown(_) => { + /* + * There has already been an aggregate attestation seen from this + * aggregator index. + * + * The peer is not necessarily faulty. + */ + trace!( + self.log, + "Aggregator already known"; + "peer_id" => peer_id.to_string(), + "block" => format!("{}", beacon_block_root), + "type" => format!("{:?}", attestation_type), + ); + return; + } + AttnError::PriorAttestationKnown { .. } => { + /* + * We have already seen an attestation from this validator for this epoch. + * + * The peer is not necessarily faulty. + */ + trace!( + self.log, + "Prior attestation known"; + "peer_id" => peer_id.to_string(), + "block" => format!("{}", beacon_block_root), + "type" => format!("{:?}", attestation_type), + ); + return; + } + AttnError::ValidatorIndexTooHigh(_) => { + /* + * The aggregator index (or similar field) was higher than the maximum + * possible number of validators. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::UnknownHeadBlock { beacon_block_root } => { + // Note: its a little bit unclear as to whether or not this block is unknown or + // just old. See: + // + // https://github.com/sigp/lighthouse/issues/1039 + + // TODO: Maintain this attestation and re-process once sync completes + debug!( + self.log, + "Attestation for unknown block"; + "peer_id" => peer_id.to_string(), + "block" => format!("{}", beacon_block_root) + ); + // we don't know the block, get the sync manager to handle the block lookup + self.sync_tx + .send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root)) + .unwrap_or_else(|_| { + warn!( + self.log, + "Failed to send to sync service"; + "msg" => "UnknownBlockHash" + ) + }); + return; + } + AttnError::UnknownTargetRoot(_) => { + /* + * The block indicated by the target root is not known to us. + * + * We should always get `AttnError::UnknwonHeadBlock` before we get this + * error, so this means we can get this error if: + * + * 1. The target root does not represent a valid block. + * 2. We do not have the target root in our DB. + * + * For (2), we should only be processing attestations when we should have + * all the available information. Note: if we do a weak-subjectivity sync + * it's possible that this situation could occur, but I think it's + * unlikely. For now, we will declare this to be an invalid message> + * + * The peer has published an invalid consensus message. + */ + } + AttnError::BadTargetEpoch => { + /* + * The aggregator index (or similar field) was higher than the maximum + * possible number of validators. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::NoCommitteeForSlotAndIndex { .. } => { + /* + * It is not possible to attest this the given committee in the given slot. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::NotExactlyOneAggregationBitSet(_) => { + /* + * The unaggregated attestation doesn't have only one signature. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::AttestsToFutureBlock { .. } => { + /* + * The beacon_block_root is from a higher slot than the attestation. + * + * The peer has published an invalid consensus message. + */ + } + + AttnError::InvalidSubnetId { received, expected } => { + /* + * The attestation was received on an incorrect subnet id. + */ + debug!( + self.log, + "Received attestation on incorrect subnet"; + "expected" => format!("{:?}", expected), + "received" => format!("{:?}", received), + ) + } + AttnError::Invalid(_) => { + /* + * The attestation failed the state_processing verification. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::TooManySkippedSlots { + head_block_slot, + attestation_slot, + } => { + /* + * The attestation references a head block that is too far behind the attestation slot. + * + * The message is not necessarily invalid, but we choose to ignore it. + */ + debug!( + self.log, + "Rejected long skip slot attestation"; + "head_block_slot" => head_block_slot, + "attestation_slot" => attestation_slot, + ) + } + AttnError::BeaconChainError(e) => { + /* + * Lighthouse hit an unexpected error whilst processing the attestation. It + * should be impossible to trigger a `BeaconChainError` from the network, + * so we have a bug. + * + * It's not clear if the message is invalid/malicious. + */ + error!( + self.log, + "Unable to validate aggregate"; + "peer_id" => peer_id.to_string(), + "error" => format!("{:?}", e), + ); + } + } + + debug!( + self.log, + "Invalid attestation from network"; + "reason" => format!("{:?}", error), + "block" => format!("{}", beacon_block_root), + "peer_id" => peer_id.to_string(), + "type" => format!("{:?}", attestation_type), + ); + } +} diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 18064d798..e178f531d 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -98,6 +98,49 @@ lazy_static! { "beacon_processor_gossip_block_imported_total", "Total number of gossip blocks imported to fork choice, etc." ); + // Gossip Exits. + pub static ref BEACON_PROCESSOR_EXIT_QUEUE_TOTAL: Result = try_create_int_gauge( + "beacon_processor_exit_queue_total", + "Count of exits from gossip waiting to be verified." + ); + pub static ref BEACON_PROCESSOR_EXIT_VERIFIED_TOTAL: Result = try_create_int_counter( + "beacon_processor_exit_verified_total", + "Total number of voluntary exits verified for propagation." + ); + pub static ref BEACON_PROCESSOR_EXIT_IMPORTED_TOTAL: Result = try_create_int_counter( + "beacon_processor_exit_imported_total", + "Total number of voluntary exits imported to the op pool." + ); + // Gossip proposer slashings. + pub static ref BEACON_PROCESSOR_PROPOSER_SLASHING_QUEUE_TOTAL: Result = try_create_int_gauge( + "beacon_processor_proposer_slashing_queue_total", + "Count of proposer slashings from gossip waiting to be verified." + ); + pub static ref BEACON_PROCESSOR_PROPOSER_SLASHING_VERIFIED_TOTAL: Result = try_create_int_counter( + "beacon_processor_proposer_slashing_verified_total", + "Total number of proposer slashings verified for propagation." + ); + pub static ref BEACON_PROCESSOR_PROPOSER_SLASHING_IMPORTED_TOTAL: Result = try_create_int_counter( + "beacon_processor_proposer_slashing_imported_total", + "Total number of proposer slashings imported to the op pool." + ); + // Gossip attester slashings. + pub static ref BEACON_PROCESSOR_ATTESTER_SLASHING_QUEUE_TOTAL: Result = try_create_int_gauge( + "beacon_processor_attester_slashing_queue_total", + "Count of attester slashings from gossip waiting to be verified." + ); + pub static ref BEACON_PROCESSOR_ATTESTER_SLASHING_VERIFIED_TOTAL: Result = try_create_int_counter( + "beacon_processor_attester_slashing_verified_total", + "Total number of attester slashings verified for propagation." + ); + pub static ref BEACON_PROCESSOR_ATTESTER_SLASHING_IMPORTED_TOTAL: Result = try_create_int_counter( + "beacon_processor_attester_slashing_imported_total", + "Total number of attester slashings imported to the op pool." + ); + pub static ref BEACON_PROCESSOR_ATTESTER_SLASHING_ERROR_TOTAL: Result = try_create_int_counter( + "beacon_processor_attester_slashing_error_total", + "Total number of attester slashings that raised an error during processing." + ); // Rpc blocks. pub static ref BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL: Result = try_create_int_gauge( "beacon_processor_rpc_block_queue_total", diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index c3a729ce7..0fa2494ff 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -16,7 +16,7 @@ use eth2_libp2p::{ }; use futures::prelude::*; use processor::Processor; -use slog::{debug, o, trace, warn}; +use slog::{debug, o, trace}; use std::sync::Arc; use tokio::sync::mpsc; use types::EthSpec; @@ -26,8 +26,6 @@ use types::EthSpec; /// passing them to the internal message processor. The message processor spawns a syncing thread /// which manages which blocks need to be requested and processed. pub struct Router { - /// A channel to the network service to allow for gossip propagation. - network_send: mpsc::UnboundedSender>, /// Access to the peer db for logging. network_globals: Arc>, /// Processes validated and decoded messages from the network. Has direct access to the @@ -89,13 +87,12 @@ impl Router { executor.clone(), beacon_chain, network_globals.clone(), - network_send.clone(), + network_send, &log, ); // generate the Message handler let mut handler = Router { - network_send, network_globals, processor, log: message_handler_log, @@ -232,13 +229,7 @@ impl Router { } PubsubMessage::VoluntaryExit(exit) => { debug!(self.log, "Received a voluntary exit"; "peer_id" => format!("{}", peer_id)); - if let Some(verified_exit) = self - .processor - .verify_voluntary_exit_for_gossip(&peer_id, *exit) - { - self.propagate_message(id, peer_id); - self.processor.import_verified_voluntary_exit(verified_exit); - } + self.processor.on_voluntary_exit_gossip(id, peer_id, exit); } PubsubMessage::ProposerSlashing(proposer_slashing) => { debug!( @@ -246,14 +237,8 @@ impl Router { "Received a proposer slashing"; "peer_id" => format!("{}", peer_id) ); - if let Some(verified_proposer_slashing) = self - .processor - .verify_proposer_slashing_for_gossip(&peer_id, *proposer_slashing) - { - self.propagate_message(id, peer_id); - self.processor - .import_verified_proposer_slashing(verified_proposer_slashing); - } + self.processor + .on_proposer_slashing_gossip(id, peer_id, proposer_slashing); } PubsubMessage::AttesterSlashing(attester_slashing) => { debug!( @@ -261,30 +246,9 @@ impl Router { "Received a attester slashing"; "peer_id" => format!("{}", peer_id) ); - if let Some(verified_attester_slashing) = self - .processor - .verify_attester_slashing_for_gossip(&peer_id, *attester_slashing) - { - self.propagate_message(id, peer_id); - self.processor - .import_verified_attester_slashing(verified_attester_slashing); - } + self.processor + .on_attester_slashing_gossip(id, peer_id, attester_slashing); } } } - - /// Informs the network service that the message should be forwarded to other peers (is valid). - fn propagate_message(&mut self, message_id: MessageId, propagation_source: PeerId) { - self.network_send - .send(NetworkMessage::Validate { - propagation_source, - message_id, - }) - .unwrap_or_else(|_| { - warn!( - self.log, - "Could not send propagation request to the network service" - ) - }); - } } diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 3a663badd..099652123 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -3,14 +3,13 @@ use crate::beacon_processor::{ }; use crate::service::NetworkMessage; use crate::sync::{PeerSyncInfo, SyncMessage}; -use beacon_chain::{observed_operations::ObservationOutcome, BeaconChain, BeaconChainTypes}; +use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::rpc::*; use eth2_libp2p::{ MessageId, NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response, }; use itertools::process_results; use slog::{debug, error, o, trace, warn}; -use state_processing::SigVerifiedOp; use std::cmp; use std::sync::Arc; use tokio::sync::mpsc; @@ -585,140 +584,70 @@ impl Processor { }) } - /// Verify a voluntary exit before gossiping or processing it. - /// - /// Errors are logged at debug level. - pub fn verify_voluntary_exit_for_gossip( - &self, - peer_id: &PeerId, - voluntary_exit: SignedVoluntaryExit, - ) -> Option> { - let validator_index = voluntary_exit.message.validator_index; - - match self.chain.verify_voluntary_exit_for_gossip(voluntary_exit) { - Ok(ObservationOutcome::New(sig_verified_exit)) => Some(sig_verified_exit), - Ok(ObservationOutcome::AlreadyKnown) => { - debug!( - self.log, - "Dropping exit for already exiting validator"; - "validator_index" => validator_index, - "peer" => peer_id.to_string() - ); - None - } - Err(e) => { - debug!( - self.log, - "Dropping invalid exit"; - "validator_index" => validator_index, - "peer" => peer_id.to_string(), - "error" => format!("{:?}", e) - ); - None - } - } - } - - /// Import a verified exit into the op pool. - pub fn import_verified_voluntary_exit( - &self, - verified_voluntary_exit: SigVerifiedOp, + pub fn on_voluntary_exit_gossip( + &mut self, + message_id: MessageId, + peer_id: PeerId, + voluntary_exit: Box, ) { - self.chain.import_voluntary_exit(verified_voluntary_exit); - debug!(self.log, "Successfully imported voluntary exit"); + self.beacon_processor_send + .try_send(BeaconWorkEvent::gossip_voluntary_exit( + message_id, + peer_id, + voluntary_exit, + )) + .unwrap_or_else(|e| { + error!( + &self.log, + "Unable to send to gossip processor"; + "type" => "voluntary exit gossip", + "error" => e.to_string(), + ) + }) } - /// Verify a proposer slashing before gossiping or processing it. - /// - /// Errors are logged at debug level. - pub fn verify_proposer_slashing_for_gossip( - &self, - peer_id: &PeerId, - proposer_slashing: ProposerSlashing, - ) -> Option> { - let validator_index = proposer_slashing.signed_header_1.message.proposer_index; - - match self - .chain - .verify_proposer_slashing_for_gossip(proposer_slashing) - { - Ok(ObservationOutcome::New(verified_slashing)) => Some(verified_slashing), - Ok(ObservationOutcome::AlreadyKnown) => { - debug!( - self.log, - "Dropping proposer slashing"; - "reason" => "Already seen a proposer slashing for that validator", - "validator_index" => validator_index, - "peer" => peer_id.to_string() - ); - None - } - Err(e) => { - debug!( - self.log, - "Dropping invalid proposer slashing"; - "validator_index" => validator_index, - "peer" => peer_id.to_string(), - "error" => format!("{:?}", e) - ); - None - } - } - } - - /// Import a verified proposer slashing into the op pool. - pub fn import_verified_proposer_slashing( - &self, - proposer_slashing: SigVerifiedOp, + pub fn on_proposer_slashing_gossip( + &mut self, + message_id: MessageId, + peer_id: PeerId, + proposer_slashing: Box, ) { - self.chain.import_proposer_slashing(proposer_slashing); - debug!(self.log, "Successfully imported proposer slashing"); + self.beacon_processor_send + .try_send(BeaconWorkEvent::gossip_proposer_slashing( + message_id, + peer_id, + proposer_slashing, + )) + .unwrap_or_else(|e| { + error!( + &self.log, + "Unable to send to gossip processor"; + "type" => "proposer slashing gossip", + "error" => e.to_string(), + ) + }) } - /// Verify an attester slashing before gossiping or processing it. - /// - /// Errors are logged at debug level. - pub fn verify_attester_slashing_for_gossip( - &self, - peer_id: &PeerId, - attester_slashing: AttesterSlashing, - ) -> Option>> { - match self - .chain - .verify_attester_slashing_for_gossip(attester_slashing) - { - Ok(ObservationOutcome::New(verified_slashing)) => Some(verified_slashing), - Ok(ObservationOutcome::AlreadyKnown) => { - debug!( - self.log, - "Dropping attester slashing"; - "reason" => "Slashings already known for all slashed validators", - "peer" => peer_id.to_string() - ); - None - } - Err(e) => { - debug!( - self.log, - "Dropping invalid attester slashing"; - "peer" => peer_id.to_string(), - "error" => format!("{:?}", e) - ); - None - } - } - } - - /// Import a verified attester slashing into the op pool. - pub fn import_verified_attester_slashing( - &self, - attester_slashing: SigVerifiedOp>, + pub fn on_attester_slashing_gossip( + &mut self, + message_id: MessageId, + peer_id: PeerId, + attester_slashing: Box>, ) { - if let Err(e) = self.chain.import_attester_slashing(attester_slashing) { - debug!(self.log, "Error importing attester slashing"; "error" => format!("{:?}", e)); - } else { - debug!(self.log, "Successfully imported attester slashing"); - } + self.beacon_processor_send + .try_send(BeaconWorkEvent::gossip_attester_slashing( + message_id, + peer_id, + attester_slashing, + )) + .unwrap_or_else(|e| { + error!( + &self.log, + "Unable to send to gossip processor"; + "type" => "attester slashing gossip", + "error" => e.to_string(), + ) + }) } }