added debounce to log (#4269)

## Issue Addressed

[#4259](https://github.com/sigp/lighthouse/issues/4259)

## Proposed Changes

debounce spammy `Unable to send message to the beacon processor` log messages

## Additional Info

We could potentially debounce other logs that have the potential to be "spammy". 

After some feedback we decided to additionally add the following change:

create a newtype wrapper around `mpsc::Sender<BeaconWorkEvent<T>>`. When there is an error on the try_send method on the wrapper, we increase a counter metric with one label per work type.
This commit is contained in:
Eitan Seri-Levi 2023-06-30 01:13:03 +00:00
parent 1aff082eea
commit edd093293a
7 changed files with 50 additions and 17 deletions

View File

@ -750,6 +750,24 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
} }
} }
pub struct BeaconProcessorSend<T: BeaconChainTypes>(pub mpsc::Sender<WorkEvent<T>>);
impl<T: BeaconChainTypes> BeaconProcessorSend<T> {
pub fn try_send(&self, message: WorkEvent<T>) -> Result<(), Box<TrySendError<WorkEvent<T>>>> {
let work_type = message.work_type();
match self.0.try_send(message) {
Ok(res) => Ok(res),
Err(e) => {
metrics::inc_counter_vec(
&metrics::BEACON_PROCESSOR_SEND_ERROR_PER_WORK_TYPE,
&[work_type],
);
Err(Box::new(e))
}
}
}
}
/// A consensus message (or multiple) from the network that requires processing. /// A consensus message (or multiple) from the network that requires processing.
#[derive(Derivative)] #[derive(Derivative)]
#[derivative(Debug(bound = "T: BeaconChainTypes"))] #[derivative(Debug(bound = "T: BeaconChainTypes"))]

View File

@ -279,6 +279,12 @@ lazy_static! {
"Gossipsub light_client_optimistic_update errors per error type", "Gossipsub light_client_optimistic_update errors per error type",
&["type"] &["type"]
); );
pub static ref BEACON_PROCESSOR_SEND_ERROR_PER_WORK_TYPE: Result<IntCounterVec> =
try_create_int_counter_vec(
"beacon_processor_send_error_per_work_type",
"Total number of beacon processor send error per work type",
&["type"]
);
/* /*

View File

@ -6,7 +6,8 @@
#![allow(clippy::unit_arg)] #![allow(clippy::unit_arg)]
use crate::beacon_processor::{ use crate::beacon_processor::{
BeaconProcessor, InvalidBlockStorage, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN, BeaconProcessor, BeaconProcessorSend, InvalidBlockStorage, WorkEvent as BeaconWorkEvent,
MAX_WORK_EVENT_QUEUE_LEN,
}; };
use crate::error; use crate::error;
use crate::service::{NetworkMessage, RequestId}; use crate::service::{NetworkMessage, RequestId};
@ -19,6 +20,7 @@ use lighthouse_network::rpc::*;
use lighthouse_network::{ use lighthouse_network::{
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response, MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
}; };
use logging::TimeLatch;
use slog::{debug, o, trace}; use slog::{debug, o, trace};
use slog::{error, warn}; use slog::{error, warn};
use std::cmp; use std::cmp;
@ -39,9 +41,11 @@ pub struct Router<T: BeaconChainTypes> {
/// A network context to return and handle RPC requests. /// A network context to return and handle RPC requests.
network: HandlerNetworkContext<T::EthSpec>, network: HandlerNetworkContext<T::EthSpec>,
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain. /// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>, beacon_processor_send: BeaconProcessorSend<T>,
/// The `Router` logger. /// The `Router` logger.
log: slog::Logger, log: slog::Logger,
/// Provides de-bounce functionality for logging.
logger_debounce: TimeLatch,
} }
/// Types of messages the router can receive. /// Types of messages the router can receive.
@ -100,7 +104,7 @@ impl<T: BeaconChainTypes> Router<T> {
beacon_chain.clone(), beacon_chain.clone(),
network_globals.clone(), network_globals.clone(),
network_send.clone(), network_send.clone(),
beacon_processor_send.clone(), BeaconProcessorSend(beacon_processor_send.clone()),
sync_logger, sync_logger,
); );
@ -124,8 +128,9 @@ impl<T: BeaconChainTypes> Router<T> {
chain: beacon_chain, chain: beacon_chain,
sync_send, sync_send,
network: HandlerNetworkContext::new(network_send, log.clone()), network: HandlerNetworkContext::new(network_send, log.clone()),
beacon_processor_send, beacon_processor_send: BeaconProcessorSend(beacon_processor_send),
log: message_handler_log, log: message_handler_log,
logger_debounce: TimeLatch::default(),
}; };
// spawn handler task and move the message handler instance into the spawned thread // spawn handler task and move the message handler instance into the spawned thread
@ -479,12 +484,15 @@ impl<T: BeaconChainTypes> Router<T> {
self.beacon_processor_send self.beacon_processor_send
.try_send(work) .try_send(work)
.unwrap_or_else(|e| { .unwrap_or_else(|e| {
let work_type = match &e { let work_type = match &*e {
mpsc::error::TrySendError::Closed(work) mpsc::error::TrySendError::Closed(work)
| mpsc::error::TrySendError::Full(work) => work.work_type(), | mpsc::error::TrySendError::Full(work) => work.work_type(),
}; };
error!(&self.log, "Unable to send message to the beacon processor";
"error" => %e, "type" => work_type) if self.logger_debounce.elapsed() {
error!(&self.log, "Unable to send message to the beacon processor";
"error" => %e, "type" => work_type)
}
}) })
} }
} }

View File

@ -1,5 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use crate::beacon_processor::BeaconProcessorSend;
use crate::service::RequestId; use crate::service::RequestId;
use crate::sync::manager::RequestId as SyncId; use crate::sync::manager::RequestId as SyncId;
use crate::NetworkMessage; use crate::NetworkMessage;
@ -54,7 +55,7 @@ impl TestRig {
SyncNetworkContext::new( SyncNetworkContext::new(
network_tx, network_tx,
globals, globals,
beacon_processor_tx, BeaconProcessorSend(beacon_processor_tx),
log.new(slog::o!("component" => "network_context")), log.new(slog::o!("component" => "network_context")),
) )
}; };

View File

@ -38,7 +38,7 @@ use super::block_lookups::BlockLookups;
use super::network_context::SyncNetworkContext; use super::network_context::SyncNetworkContext;
use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::beacon_processor::{BeaconProcessorSend, ChainSegmentProcessId};
use crate::service::NetworkMessage; use crate::service::NetworkMessage;
use crate::status::ToStatusMessage; use crate::status::ToStatusMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState};
@ -188,7 +188,7 @@ pub fn spawn<T: BeaconChainTypes>(
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>, network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>, network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>, beacon_processor_send: BeaconProcessorSend<T>,
log: slog::Logger, log: slog::Logger,
) -> mpsc::UnboundedSender<SyncMessage<T::EthSpec>> { ) -> mpsc::UnboundedSender<SyncMessage<T::EthSpec>> {
assert!( assert!(

View File

@ -3,7 +3,7 @@
use super::manager::{Id, RequestId as SyncRequestId}; use super::manager::{Id, RequestId as SyncRequestId};
use super::range_sync::{BatchId, ChainId}; use super::range_sync::{BatchId, ChainId};
use crate::beacon_processor::WorkEvent; use crate::beacon_processor::BeaconProcessorSend;
use crate::service::{NetworkMessage, RequestId}; use crate::service::{NetworkMessage, RequestId};
use crate::status::ToStatusMessage; use crate::status::ToStatusMessage;
use beacon_chain::{BeaconChainTypes, EngineState}; use beacon_chain::{BeaconChainTypes, EngineState};
@ -37,7 +37,7 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
execution_engine_state: EngineState, execution_engine_state: EngineState,
/// Channel to send work to the beacon processor. /// Channel to send work to the beacon processor.
beacon_processor_send: mpsc::Sender<WorkEvent<T>>, beacon_processor_send: BeaconProcessorSend<T>,
/// Logger for the `SyncNetworkContext`. /// Logger for the `SyncNetworkContext`.
log: slog::Logger, log: slog::Logger,
@ -47,7 +47,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn new( pub fn new(
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>, network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>, network_globals: Arc<NetworkGlobals<T::EthSpec>>,
beacon_processor_send: mpsc::Sender<WorkEvent<T>>, beacon_processor_send: BeaconProcessorSend<T>,
log: slog::Logger, log: slog::Logger,
) -> Self { ) -> Self {
Self { Self {
@ -278,12 +278,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}) })
} }
pub fn processor_channel_if_enabled(&self) -> Option<&mpsc::Sender<WorkEvent<T>>> { pub fn processor_channel_if_enabled(&self) -> Option<&BeaconProcessorSend<T>> {
self.is_execution_engine_online() self.is_execution_engine_online()
.then_some(&self.beacon_processor_send) .then_some(&self.beacon_processor_send)
} }
pub fn processor_channel(&self) -> &mpsc::Sender<WorkEvent<T>> { pub fn processor_channel(&self) -> &BeaconProcessorSend<T> {
&self.beacon_processor_send &self.beacon_processor_send
} }

View File

@ -375,7 +375,7 @@ mod tests {
use crate::NetworkMessage; use crate::NetworkMessage;
use super::*; use super::*;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::beacon_processor::{BeaconProcessorSend, WorkEvent as BeaconWorkEvent};
use beacon_chain::builder::Witness; use beacon_chain::builder::Witness;
use beacon_chain::eth1_chain::CachingEth1Backend; use beacon_chain::eth1_chain::CachingEth1Backend;
use beacon_chain::parking_lot::RwLock; use beacon_chain::parking_lot::RwLock;
@ -603,7 +603,7 @@ mod tests {
let cx = SyncNetworkContext::new( let cx = SyncNetworkContext::new(
network_tx, network_tx,
globals.clone(), globals.clone(),
beacon_processor_tx, BeaconProcessorSend(beacon_processor_tx),
log.new(o!("component" => "network_context")), log.new(o!("component" => "network_context")),
); );
let test_rig = TestRig { let test_rig = TestRig {