diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 26d2c19b5..84d8e1b07 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -750,6 +750,24 @@ impl std::convert::From> for WorkEvent { } } +pub struct BeaconProcessorSend(pub mpsc::Sender>); + +impl BeaconProcessorSend { + pub fn try_send(&self, message: WorkEvent) -> Result<(), Box>>> { + let work_type = message.work_type(); + match self.0.try_send(message) { + Ok(res) => Ok(res), + Err(e) => { + metrics::inc_counter_vec( + &metrics::BEACON_PROCESSOR_SEND_ERROR_PER_WORK_TYPE, + &[work_type], + ); + Err(Box::new(e)) + } + } + } +} + /// A consensus message (or multiple) from the network that requires processing. #[derive(Derivative)] #[derivative(Debug(bound = "T: BeaconChainTypes"))] diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 09caaaa11..27d7dc962 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -279,6 +279,12 @@ lazy_static! { "Gossipsub light_client_optimistic_update errors per error type", &["type"] ); + pub static ref BEACON_PROCESSOR_SEND_ERROR_PER_WORK_TYPE: Result = + try_create_int_counter_vec( + "beacon_processor_send_error_per_work_type", + "Total number of beacon processor send error per work type", + &["type"] + ); /* diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 1b0f1fb41..7a91f2d0b 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -6,7 +6,8 @@ #![allow(clippy::unit_arg)] use crate::beacon_processor::{ - BeaconProcessor, InvalidBlockStorage, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN, + BeaconProcessor, BeaconProcessorSend, InvalidBlockStorage, WorkEvent as BeaconWorkEvent, + MAX_WORK_EVENT_QUEUE_LEN, }; use crate::error; use crate::service::{NetworkMessage, RequestId}; @@ -19,6 +20,7 @@ use lighthouse_network::rpc::*; use lighthouse_network::{ MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response, }; +use logging::TimeLatch; use slog::{debug, o, trace}; use slog::{error, warn}; use std::cmp; @@ -39,9 +41,11 @@ pub struct Router { /// A network context to return and handle RPC requests. network: HandlerNetworkContext, /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. - beacon_processor_send: mpsc::Sender>, + beacon_processor_send: BeaconProcessorSend, /// The `Router` logger. log: slog::Logger, + /// Provides de-bounce functionality for logging. + logger_debounce: TimeLatch, } /// Types of messages the router can receive. @@ -100,7 +104,7 @@ impl Router { beacon_chain.clone(), network_globals.clone(), network_send.clone(), - beacon_processor_send.clone(), + BeaconProcessorSend(beacon_processor_send.clone()), sync_logger, ); @@ -124,8 +128,9 @@ impl Router { chain: beacon_chain, sync_send, network: HandlerNetworkContext::new(network_send, log.clone()), - beacon_processor_send, + beacon_processor_send: BeaconProcessorSend(beacon_processor_send), log: message_handler_log, + logger_debounce: TimeLatch::default(), }; // spawn handler task and move the message handler instance into the spawned thread @@ -479,12 +484,15 @@ impl Router { self.beacon_processor_send .try_send(work) .unwrap_or_else(|e| { - let work_type = match &e { + let work_type = match &*e { mpsc::error::TrySendError::Closed(work) | mpsc::error::TrySendError::Full(work) => work.work_type(), }; - error!(&self.log, "Unable to send message to the beacon processor"; - "error" => %e, "type" => work_type) + + if self.logger_debounce.elapsed() { + error!(&self.log, "Unable to send message to the beacon processor"; + "error" => %e, "type" => work_type) + } }) } } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 5a70944f6..82334db0f 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use crate::beacon_processor::BeaconProcessorSend; use crate::service::RequestId; use crate::sync::manager::RequestId as SyncId; use crate::NetworkMessage; @@ -54,7 +55,7 @@ impl TestRig { SyncNetworkContext::new( network_tx, globals, - beacon_processor_tx, + BeaconProcessorSend(beacon_processor_tx), log.new(slog::o!("component" => "network_context")), ) }; diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 37b63cdba..c24d4c192 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -38,7 +38,7 @@ use super::block_lookups::BlockLookups; use super::network_context::SyncNetworkContext; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; -use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; +use crate::beacon_processor::{BeaconProcessorSend, ChainSegmentProcessId}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState}; @@ -188,7 +188,7 @@ pub fn spawn( beacon_chain: Arc>, network_globals: Arc>, network_send: mpsc::UnboundedSender>, - beacon_processor_send: mpsc::Sender>, + beacon_processor_send: BeaconProcessorSend, log: slog::Logger, ) -> mpsc::UnboundedSender> { assert!( diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 23d42002f..03c466eec 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -3,7 +3,7 @@ use super::manager::{Id, RequestId as SyncRequestId}; use super::range_sync::{BatchId, ChainId}; -use crate::beacon_processor::WorkEvent; +use crate::beacon_processor::BeaconProcessorSend; use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; use beacon_chain::{BeaconChainTypes, EngineState}; @@ -37,7 +37,7 @@ pub struct SyncNetworkContext { execution_engine_state: EngineState, /// Channel to send work to the beacon processor. - beacon_processor_send: mpsc::Sender>, + beacon_processor_send: BeaconProcessorSend, /// Logger for the `SyncNetworkContext`. log: slog::Logger, @@ -47,7 +47,7 @@ impl SyncNetworkContext { pub fn new( network_send: mpsc::UnboundedSender>, network_globals: Arc>, - beacon_processor_send: mpsc::Sender>, + beacon_processor_send: BeaconProcessorSend, log: slog::Logger, ) -> Self { Self { @@ -278,12 +278,12 @@ impl SyncNetworkContext { }) } - pub fn processor_channel_if_enabled(&self) -> Option<&mpsc::Sender>> { + pub fn processor_channel_if_enabled(&self) -> Option<&BeaconProcessorSend> { self.is_execution_engine_online() .then_some(&self.beacon_processor_send) } - pub fn processor_channel(&self) -> &mpsc::Sender> { + pub fn processor_channel(&self) -> &BeaconProcessorSend { &self.beacon_processor_send } diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 0f1c00e50..2c35c57d9 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -375,7 +375,7 @@ mod tests { use crate::NetworkMessage; use super::*; - use crate::beacon_processor::WorkEvent as BeaconWorkEvent; + use crate::beacon_processor::{BeaconProcessorSend, WorkEvent as BeaconWorkEvent}; use beacon_chain::builder::Witness; use beacon_chain::eth1_chain::CachingEth1Backend; use beacon_chain::parking_lot::RwLock; @@ -603,7 +603,7 @@ mod tests { let cx = SyncNetworkContext::new( network_tx, globals.clone(), - beacon_processor_tx, + BeaconProcessorSend(beacon_processor_tx), log.new(o!("component" => "network_context")), ); let test_rig = TestRig {