Add log debounce to work processor (#3045)

## Issue Addressed

#3010 

## Proposed Changes

- move log debounce time latch to `./common/logging`
- add timelatch to limit logging for `attestations_delay_queue` and `queued_block_roots`

## Additional Info

- Is a separate crate for the time latch preferred? 
- `elapsed()` could take `LOG_DEBOUNCE_INTERVAL ` as an argument to allow for different granularity.
This commit is contained in:
tim gretler 2022-03-07 06:30:17 +00:00
parent 1829250ee4
commit cbda0a2f0a
4 changed files with 55 additions and 42 deletions

View File

@ -11,7 +11,6 @@ matches = "0.1.8"
exit-future = "0.2.0" exit-future = "0.2.0"
slog-term = "2.6.0" slog-term = "2.6.0"
slog-async = "2.5.0" slog-async = "2.5.0"
logging = { path = "../../common/logging" }
environment = { path = "../../lighthouse/environment" } environment = { path = "../../lighthouse/environment" }
[dependencies] [dependencies]
@ -35,6 +34,7 @@ fnv = "1.0.7"
rlp = "0.5.0" rlp = "0.5.0"
lazy_static = "1.4.0" lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" } lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
logging = { path = "../../common/logging" }
task_executor = { path = "../../common/task_executor" } task_executor = { path = "../../common/task_executor" }
igd = "0.11.1" igd = "0.11.1"
itertools = "0.10.0" itertools = "0.10.0"

View File

@ -47,13 +47,14 @@ use lighthouse_network::{
rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage}, rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage},
Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, Client, MessageId, NetworkGlobals, PeerId, PeerRequestId,
}; };
use logging::TimeLatch;
use slog::{crit, debug, error, trace, warn, Logger}; use slog::{crit, debug, error, trace, warn, Logger};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fmt; use std::fmt;
use std::pin::Pin; use std::pin::Pin;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::task::Context; use std::task::Context;
use std::time::{Duration, Instant}; use std::time::Duration;
use std::{cmp, collections::HashSet}; use std::{cmp, collections::HashSet};
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
@ -159,9 +160,6 @@ const MANAGER_TASK_NAME: &str = "beacon_processor_manager";
/// The name of the worker tokio tasks. /// The name of the worker tokio tasks.
const WORKER_TASK_NAME: &str = "beacon_processor_worker"; const WORKER_TASK_NAME: &str = "beacon_processor_worker";
/// The minimum interval between log messages indicating that a queue is full.
const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
/// The `MAX_..._BATCH_SIZE` variables define how many attestations can be included in a single /// The `MAX_..._BATCH_SIZE` variables define how many attestations can be included in a single
/// batch. /// batch.
/// ///
@ -742,25 +740,6 @@ impl<T: BeaconChainTypes> Work<T> {
} }
} }
/// Provides de-bounce functionality for logging.
#[derive(Default)]
struct TimeLatch(Option<Instant>);
impl TimeLatch {
/// Only returns true once every `LOG_DEBOUNCE_INTERVAL`.
fn elapsed(&mut self) -> bool {
let now = Instant::now();
let is_elapsed = self.0.map_or(false, |elapse_time| now > elapse_time);
if is_elapsed || self.0.is_none() {
self.0 = Some(now + LOG_DEBOUNCE_INTERVAL);
}
is_elapsed
}
}
/// Unifies all the messages processed by the `BeaconProcessor`. /// Unifies all the messages processed by the `BeaconProcessor`.
enum InboundEvent<T: BeaconChainTypes> { enum InboundEvent<T: BeaconChainTypes> {
/// A worker has completed a task and is free. /// A worker has completed a task and is free.

View File

@ -17,6 +17,7 @@ use fnv::FnvHashMap;
use futures::task::Poll; use futures::task::Poll;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use lighthouse_network::{MessageId, PeerId}; use lighthouse_network::{MessageId, PeerId};
use logging::TimeLatch;
use slog::{crit, debug, error, warn, Logger}; use slog::{crit, debug, error, warn, Logger};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
@ -133,6 +134,8 @@ struct ReprocessQueue<T: BeaconChainTypes> {
/* Aux */ /* Aux */
/// Next attestation id, used for both aggregated and unaggregated attestations /// Next attestation id, used for both aggregated and unaggregated attestations
next_attestation: usize, next_attestation: usize,
early_block_debounce: TimeLatch,
attestation_delay_debounce: TimeLatch,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -223,6 +226,8 @@ pub fn spawn_reprocess_scheduler<T: BeaconChainTypes>(
queued_unaggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(),
awaiting_attestations_per_root: HashMap::new(), awaiting_attestations_per_root: HashMap::new(),
next_attestation: 0, next_attestation: 0,
early_block_debounce: TimeLatch::default(),
attestation_delay_debounce: TimeLatch::default(),
}; };
executor.spawn( executor.spawn(
@ -261,12 +266,14 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
if let Some(duration_till_slot) = slot_clock.duration_to_slot(block_slot) { if let Some(duration_till_slot) = slot_clock.duration_to_slot(block_slot) {
// Check to ensure this won't over-fill the queue. // Check to ensure this won't over-fill the queue.
if self.queued_block_roots.len() >= MAXIMUM_QUEUED_BLOCKS { if self.queued_block_roots.len() >= MAXIMUM_QUEUED_BLOCKS {
warn!( if self.early_block_debounce.elapsed() {
log, warn!(
"Early blocks queue is full"; log,
"queue_size" => MAXIMUM_QUEUED_BLOCKS, "Early blocks queue is full";
"msg" => "check system clock" "queue_size" => MAXIMUM_QUEUED_BLOCKS,
); "msg" => "check system clock"
);
}
// Drop the block. // Drop the block.
return; return;
} }
@ -306,12 +313,14 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
} }
InboundEvent::Msg(UnknownBlockAggregate(queued_aggregate)) => { InboundEvent::Msg(UnknownBlockAggregate(queued_aggregate)) => {
if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS { if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS {
error!( if self.attestation_delay_debounce.elapsed() {
log, error!(
"Aggregate attestation delay queue is full"; log,
"queue_size" => MAXIMUM_QUEUED_ATTESTATIONS, "Aggregate attestation delay queue is full";
"msg" => "check system clock" "queue_size" => MAXIMUM_QUEUED_ATTESTATIONS,
); "msg" => "check system clock"
);
}
// Drop the attestation. // Drop the attestation.
return; return;
} }
@ -337,12 +346,14 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
} }
InboundEvent::Msg(UnknownBlockUnaggregate(queued_unaggregate)) => { InboundEvent::Msg(UnknownBlockUnaggregate(queued_unaggregate)) => {
if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS { if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS {
error!( if self.attestation_delay_debounce.elapsed() {
log, error!(
"Attestation delay queue is full"; log,
"queue_size" => MAXIMUM_QUEUED_ATTESTATIONS, "Attestation delay queue is full";
"msg" => "check system clock" "queue_size" => MAXIMUM_QUEUED_ATTESTATIONS,
); "msg" => "check system clock"
);
}
// Drop the attestation. // Drop the attestation.
return; return;
} }

View File

@ -7,9 +7,13 @@ use lighthouse_metrics::{
use slog::Logger; use slog::Logger;
use slog_term::Decorator; use slog_term::Decorator;
use std::io::{Result, Write}; use std::io::{Result, Write};
use std::time::{Duration, Instant};
pub const MAX_MESSAGE_WIDTH: usize = 40; pub const MAX_MESSAGE_WIDTH: usize = 40;
/// The minimum interval between log messages indicating that a queue is full.
const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
lazy_static! { lazy_static! {
pub static ref INFOS_TOTAL: MetricsResult<IntCounter> = pub static ref INFOS_TOTAL: MetricsResult<IntCounter> =
try_create_int_counter("info_total", "Count of infos logged"); try_create_int_counter("info_total", "Count of infos logged");
@ -187,6 +191,25 @@ fn is_ascii_control(character: &u8) -> bool {
) )
} }
/// Provides de-bounce functionality for logging.
#[derive(Default)]
pub struct TimeLatch(Option<Instant>);
impl TimeLatch {
/// Only returns true once every `LOG_DEBOUNCE_INTERVAL`.
pub fn elapsed(&mut self) -> bool {
let now = Instant::now();
let is_elapsed = self.0.map_or(false, |elapse_time| now > elapse_time);
if is_elapsed || self.0.is_none() {
self.0 = Some(now + LOG_DEBOUNCE_INTERVAL);
}
is_elapsed
}
}
/// Return a logger suitable for test usage. /// Return a logger suitable for test usage.
/// ///
/// By default no logs will be printed, but they can be enabled via /// By default no logs will be printed, but they can be enabled via