From cbda0a2f0a3f13ba94e49d8f4eaa760c5b7aaf60 Mon Sep 17 00:00:00 2001 From: tim gretler Date: Mon, 7 Mar 2022 06:30:17 +0000 Subject: [PATCH] Add log debounce to work processor (#3045) ## Issue Addressed #3010 ## Proposed Changes - move log debounce time latch to `./common/logging` - add timelatch to limit logging for `attestations_delay_queue` and `queued_block_roots` ## Additional Info - Is a separate crate for the time latch preferred? - `elapsed()` could take `LOG_DEBOUNCE_INTERVAL ` as an argument to allow for different granularity. --- beacon_node/network/Cargo.toml | 2 +- .../network/src/beacon_processor/mod.rs | 25 +--------- .../work_reprocessing_queue.rs | 47 ++++++++++++------- common/logging/src/lib.rs | 23 +++++++++ 4 files changed, 55 insertions(+), 42 deletions(-) diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 1c7506483..96458da0a 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -11,7 +11,6 @@ matches = "0.1.8" exit-future = "0.2.0" slog-term = "2.6.0" slog-async = "2.5.0" -logging = { path = "../../common/logging" } environment = { path = "../../lighthouse/environment" } [dependencies] @@ -35,6 +34,7 @@ fnv = "1.0.7" rlp = "0.5.0" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } +logging = { path = "../../common/logging" } task_executor = { path = "../../common/task_executor" } igd = "0.11.1" itertools = "0.10.0" diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 7c3d482fa..2389afdb4 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -47,13 +47,14 @@ use lighthouse_network::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage}, Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, }; +use logging::TimeLatch; use slog::{crit, debug, error, trace, warn, Logger}; use std::collections::VecDeque; use std::fmt; use std::pin::Pin; use std::sync::{Arc, Weak}; use std::task::Context; -use std::time::{Duration, Instant}; +use std::time::Duration; use std::{cmp, collections::HashSet}; use task_executor::TaskExecutor; use tokio::sync::{mpsc, oneshot}; @@ -159,9 +160,6 @@ const MANAGER_TASK_NAME: &str = "beacon_processor_manager"; /// The name of the worker tokio tasks. const WORKER_TASK_NAME: &str = "beacon_processor_worker"; -/// The minimum interval between log messages indicating that a queue is full. -const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30); - /// The `MAX_..._BATCH_SIZE` variables define how many attestations can be included in a single /// batch. /// @@ -742,25 +740,6 @@ impl Work { } } -/// Provides de-bounce functionality for logging. -#[derive(Default)] -struct TimeLatch(Option); - -impl TimeLatch { - /// Only returns true once every `LOG_DEBOUNCE_INTERVAL`. - fn elapsed(&mut self) -> bool { - let now = Instant::now(); - - let is_elapsed = self.0.map_or(false, |elapse_time| now > elapse_time); - - if is_elapsed || self.0.is_none() { - self.0 = Some(now + LOG_DEBOUNCE_INTERVAL); - } - - is_elapsed - } -} - /// Unifies all the messages processed by the `BeaconProcessor`. enum InboundEvent { /// A worker has completed a task and is free. diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 299e71c8d..33c15cf06 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -17,6 +17,7 @@ use fnv::FnvHashMap; use futures::task::Poll; use futures::{Stream, StreamExt}; use lighthouse_network::{MessageId, PeerId}; +use logging::TimeLatch; use slog::{crit, debug, error, warn, Logger}; use slot_clock::SlotClock; use std::collections::{HashMap, HashSet}; @@ -133,6 +134,8 @@ struct ReprocessQueue { /* Aux */ /// Next attestation id, used for both aggregated and unaggregated attestations next_attestation: usize, + early_block_debounce: TimeLatch, + attestation_delay_debounce: TimeLatch, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -223,6 +226,8 @@ pub fn spawn_reprocess_scheduler( queued_unaggregates: FnvHashMap::default(), awaiting_attestations_per_root: HashMap::new(), next_attestation: 0, + early_block_debounce: TimeLatch::default(), + attestation_delay_debounce: TimeLatch::default(), }; executor.spawn( @@ -261,12 +266,14 @@ impl ReprocessQueue { if let Some(duration_till_slot) = slot_clock.duration_to_slot(block_slot) { // Check to ensure this won't over-fill the queue. if self.queued_block_roots.len() >= MAXIMUM_QUEUED_BLOCKS { - warn!( - log, - "Early blocks queue is full"; - "queue_size" => MAXIMUM_QUEUED_BLOCKS, - "msg" => "check system clock" - ); + if self.early_block_debounce.elapsed() { + warn!( + log, + "Early blocks queue is full"; + "queue_size" => MAXIMUM_QUEUED_BLOCKS, + "msg" => "check system clock" + ); + } // Drop the block. return; } @@ -306,12 +313,14 @@ impl ReprocessQueue { } InboundEvent::Msg(UnknownBlockAggregate(queued_aggregate)) => { if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS { - error!( - log, - "Aggregate attestation delay queue is full"; - "queue_size" => MAXIMUM_QUEUED_ATTESTATIONS, - "msg" => "check system clock" - ); + if self.attestation_delay_debounce.elapsed() { + error!( + log, + "Aggregate attestation delay queue is full"; + "queue_size" => MAXIMUM_QUEUED_ATTESTATIONS, + "msg" => "check system clock" + ); + } // Drop the attestation. return; } @@ -337,12 +346,14 @@ impl ReprocessQueue { } InboundEvent::Msg(UnknownBlockUnaggregate(queued_unaggregate)) => { if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS { - error!( - log, - "Attestation delay queue is full"; - "queue_size" => MAXIMUM_QUEUED_ATTESTATIONS, - "msg" => "check system clock" - ); + if self.attestation_delay_debounce.elapsed() { + error!( + log, + "Attestation delay queue is full"; + "queue_size" => MAXIMUM_QUEUED_ATTESTATIONS, + "msg" => "check system clock" + ); + } // Drop the attestation. return; } diff --git a/common/logging/src/lib.rs b/common/logging/src/lib.rs index eab8e326b..85c425574 100644 --- a/common/logging/src/lib.rs +++ b/common/logging/src/lib.rs @@ -7,9 +7,13 @@ use lighthouse_metrics::{ use slog::Logger; use slog_term::Decorator; use std::io::{Result, Write}; +use std::time::{Duration, Instant}; pub const MAX_MESSAGE_WIDTH: usize = 40; +/// The minimum interval between log messages indicating that a queue is full. +const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30); + lazy_static! { pub static ref INFOS_TOTAL: MetricsResult = try_create_int_counter("info_total", "Count of infos logged"); @@ -187,6 +191,25 @@ fn is_ascii_control(character: &u8) -> bool { ) } +/// Provides de-bounce functionality for logging. +#[derive(Default)] +pub struct TimeLatch(Option); + +impl TimeLatch { + /// Only returns true once every `LOG_DEBOUNCE_INTERVAL`. + pub fn elapsed(&mut self) -> bool { + let now = Instant::now(); + + let is_elapsed = self.0.map_or(false, |elapse_time| now > elapse_time); + + if is_elapsed || self.0.is_none() { + self.0 = Some(now + LOG_DEBOUNCE_INTERVAL); + } + + is_elapsed + } +} + /// Return a logger suitable for test usage. /// /// By default no logs will be printed, but they can be enabled via