Process gossip blocks on the GossipProcessor (#1523)

## Issue Addressed

NA

## Proposed Changes

Moves beacon block processing over to the newly-added `GossipProcessor`. This moves the task off the core executor onto the blocking one.

## Additional Info

- With this PR, gossip blocks are being ignored during sync.
This commit is contained in:
Paul Hauner 2020-08-17 09:20:27 +00:00
parent 61d5b592cb
commit f85485884f
12 changed files with 845 additions and 454 deletions

View File

@ -1,10 +1,11 @@
use crate::metrics;
use crate::router::processor::FUTURE_SLOT_TOLERANCE;
use crate::sync::manager::SyncMessage;
use crate::sync::range_sync::{BatchId, ChainId};
use crate::sync::{BatchId, BatchProcessResult, ChainId};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, ChainSegmentResult};
use eth2_libp2p::PeerId;
use slog::{debug, error, trace, warn};
use std::sync::{Arc, Weak};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{EthSpec, SignedBeaconBlock};
@ -17,85 +18,71 @@ pub enum ProcessId {
ParentLookup(PeerId),
}
/// The result of a block processing request.
// TODO: When correct batch error handling occurs, we will include an error type.
#[derive(Debug)]
pub enum BatchProcessResult {
/// The batch was completed successfully.
Success,
/// The batch processing failed.
Failed,
/// The batch processing failed but managed to import at least one block.
Partial,
}
/// Spawns a thread handling the block processing of a request: range syncing or parent lookup.
pub fn spawn_block_processor<T: BeaconChainTypes>(
chain: Weak<BeaconChain<T>>,
pub fn handle_chain_segment<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
process_id: ProcessId,
downloaded_blocks: Vec<SignedBeaconBlock<T::EthSpec>>,
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
log: slog::Logger,
) {
std::thread::spawn(move || {
match process_id {
// this a request from the range sync
ProcessId::RangeBatchId(chain_id, batch_id) => {
let len = downloaded_blocks.len();
let start_slot = if len > 0 {
downloaded_blocks[0].message.slot.as_u64()
} else {
0
};
let end_slot = if len > 0 {
downloaded_blocks[len - 1].message.slot.as_u64()
} else {
0
};
match process_id {
// this a request from the range sync
ProcessId::RangeBatchId(chain_id, batch_id) => {
let len = downloaded_blocks.len();
let start_slot = if len > 0 {
downloaded_blocks[0].message.slot.as_u64()
} else {
0
};
let end_slot = if len > 0 {
downloaded_blocks[len - 1].message.slot.as_u64()
} else {
0
};
debug!(log, "Processing batch"; "id" => *batch_id, "blocks" => downloaded_blocks.len(), "start_slot" => start_slot, "end_slot" => end_slot);
let result = match process_blocks(chain, downloaded_blocks.iter(), &log) {
(_, Ok(_)) => {
debug!(log, "Batch processed"; "id" => *batch_id , "start_slot" => start_slot, "end_slot" => end_slot);
BatchProcessResult::Success
}
(imported_blocks, Err(e)) if imported_blocks > 0 => {
debug!(log, "Batch processing failed but imported some blocks";
debug!(log, "Processing batch"; "id" => *batch_id, "blocks" => downloaded_blocks.len(), "start_slot" => start_slot, "end_slot" => end_slot);
let result = match process_blocks(chain, downloaded_blocks.iter(), &log) {
(_, Ok(_)) => {
debug!(log, "Batch processed"; "id" => *batch_id , "start_slot" => start_slot, "end_slot" => end_slot);
BatchProcessResult::Success
}
(imported_blocks, Err(e)) if imported_blocks > 0 => {
debug!(log, "Batch processing failed but imported some blocks";
"id" => *batch_id, "error" => e, "imported_blocks"=> imported_blocks);
BatchProcessResult::Partial
}
(_, Err(e)) => {
debug!(log, "Batch processing failed"; "id" => *batch_id, "error" => e);
BatchProcessResult::Failed
}
};
BatchProcessResult::Partial
}
(_, Err(e)) => {
debug!(log, "Batch processing failed"; "id" => *batch_id, "error" => e);
BatchProcessResult::Failed
}
};
let msg = SyncMessage::BatchProcessed {
chain_id,
batch_id,
downloaded_blocks,
result,
};
sync_send.send(msg).unwrap_or_else(|_| {
debug!(
log,
"Block processor could not inform range sync result. Likely shutting down."
);
});
}
// this a parent lookup request from the sync manager
ProcessId::ParentLookup(peer_id) => {
let msg = SyncMessage::BatchProcessed {
chain_id,
batch_id,
downloaded_blocks,
result,
};
sync_send.send(msg).unwrap_or_else(|_| {
debug!(
log, "Processing parent lookup";
"last_peer_id" => format!("{}", peer_id),
"blocks" => downloaded_blocks.len()
log,
"Block processor could not inform range sync result. Likely shutting down."
);
// parent blocks are ordered from highest slot to lowest, so we need to process in
// reverse
match process_blocks(chain, downloaded_blocks.iter().rev(), &log) {
(_, Err(e)) => {
warn!(log, "Parent lookup failed"; "last_peer_id" => format!("{}", peer_id), "error" => e);
sync_send
});
}
// this a parent lookup request from the sync manager
ProcessId::ParentLookup(peer_id) => {
debug!(
log, "Processing parent lookup";
"last_peer_id" => format!("{}", peer_id),
"blocks" => downloaded_blocks.len()
);
// parent blocks are ordered from highest slot to lowest, so we need to process in
// reverse
match process_blocks(chain, downloaded_blocks.iter().rev(), &log) {
(_, Err(e)) => {
warn!(log, "Parent lookup failed"; "last_peer_id" => format!("{}", peer_id), "error" => e);
sync_send
.send(SyncMessage::ParentLookupFailed(peer_id))
.unwrap_or_else(|_| {
// on failure, inform to downvote the peer
@ -104,14 +91,13 @@ pub fn spawn_block_processor<T: BeaconChainTypes>(
"Block processor could not inform parent lookup result. Likely shutting down."
);
});
}
(_, Ok(_)) => {
debug!(log, "Parent lookup processed successfully");
}
}
(_, Ok(_)) => {
debug!(log, "Parent lookup processed successfully");
}
}
}
});
}
}
/// Helper function to process blocks batches which only consumes the chain and blocks to process.
@ -120,43 +106,39 @@ fn process_blocks<
T: BeaconChainTypes,
I: Iterator<Item = &'a SignedBeaconBlock<T::EthSpec>>,
>(
chain: Weak<BeaconChain<T>>,
chain: Arc<BeaconChain<T>>,
downloaded_blocks: I,
log: &slog::Logger,
) -> (usize, Result<(), String>) {
if let Some(chain) = chain.upgrade() {
let blocks = downloaded_blocks.cloned().collect::<Vec<_>>();
let (imported_blocks, r) = match chain.process_chain_segment(blocks) {
ChainSegmentResult::Successful { imported_blocks } => {
if imported_blocks == 0 {
debug!(log, "All blocks already known");
} else {
debug!(
log, "Imported blocks from network";
"count" => imported_blocks,
);
// Batch completed successfully with at least one block, run fork choice.
run_fork_choice(chain, log);
}
(imported_blocks, Ok(()))
let blocks = downloaded_blocks.cloned().collect::<Vec<_>>();
match chain.process_chain_segment(blocks) {
ChainSegmentResult::Successful { imported_blocks } => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL);
if imported_blocks == 0 {
debug!(log, "All blocks already known");
} else {
debug!(
log, "Imported blocks from network";
"count" => imported_blocks,
);
// Batch completed successfully with at least one block, run fork choice.
run_fork_choice(chain, log);
}
ChainSegmentResult::Failed {
imported_blocks,
error,
} => {
let r = handle_failed_chain_segment(error, log);
if imported_blocks > 0 {
run_fork_choice(chain, log);
}
(imported_blocks, r)
}
};
return (imported_blocks, r);
(imported_blocks, Ok(()))
}
ChainSegmentResult::Failed {
imported_blocks,
error,
} => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL);
let r = handle_failed_chain_segment(error, log);
if imported_blocks > 0 {
run_fork_choice(chain, log);
}
(imported_blocks, r)
}
}
(0, Ok(()))
}
/// Runs fork-choice on a given chain. This is used during block processing after one successful

View File

@ -1,4 +1,4 @@
//! Provides the `GossipProcessor`, a mutli-threaded processor for messages received on the network
//! Provides the `BeaconProcessor`, a mutli-threaded processor for messages received on the network
//! that need to be processed by the `BeaconChain`.
//!
//! Uses `tokio` tasks (instead of raw threads) to provide the following tasks:
@ -8,7 +8,7 @@
//!
//! ## Purpose
//!
//! The purpose of the `GossipProcessor` is to provide two things:
//! The purpose of the `BeaconProcessor` is to provide two things:
//!
//! 1. Moving long-running, blocking tasks off the main `tokio` executor.
//! 2. A fixed-length buffer for consensus messages.
@ -38,23 +38,29 @@
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::{
attestation_verification::Error as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
ForkChoiceError,
BlockError, ForkChoiceError,
};
use chain_segment::handle_chain_segment;
use environment::TaskExecutor;
use eth2_libp2p::{MessageId, NetworkGlobals, PeerId};
use slog::{crit, debug, error, trace, warn, Logger};
use slog::{crit, debug, error, info, trace, warn, Logger};
use ssz::Encode;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SubnetId};
use tokio::sync::{mpsc, oneshot};
use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId};
/// The maximum size of the channel for work events to the `GossipProcessor`.
mod chain_segment;
pub use chain_segment::ProcessId;
/// The maximum size of the channel for work events to the `BeaconProcessor`.
///
/// Setting this too low will cause consensus messages to be dropped.
const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;
pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;
/// The maximum size of the channel for idle events to the `GossipProcessor`.
/// The maximum size of the channel for idle events to the `BeaconProcessor`.
///
/// Setting this too low will prevent new workers from being spawned. It *should* only need to be
/// set to the CPU count, but we set it high to be safe.
@ -68,6 +74,18 @@ const MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN: usize = 16_384;
/// start dropping them.
const MAX_AGGREGATED_ATTESTATION_QUEUE_LEN: usize = 1_024;
/// The maximum number of queued `SignedBeaconBlock` objects received on gossip that will be stored
/// before we start dropping them.
const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024;
/// The maximum number of queued `SignedBeaconBlock` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024;
/// The maximum number of queued `Vec<SignedBeaconBlock>` objects received during syncing that will
/// be stored before we start dropping them.
const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64;
/// The name of the manager tokio task.
const MANAGER_TASK_NAME: &str = "beacon_gossip_processor_manager";
/// The name of the worker tokio tasks.
@ -76,16 +94,56 @@ const WORKER_TASK_NAME: &str = "beacon_gossip_processor_worker";
/// The minimum interval between log messages indicating that a queue is full.
const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
/// A queued item from gossip, awaiting processing.
struct QueueItem<T> {
message_id: MessageId,
peer_id: PeerId,
item: T,
/// Used to send/receive results from a rpc block import in a blocking task.
pub type BlockResultSender<E> = oneshot::Sender<Result<Hash256, BlockError<E>>>;
pub type BlockResultReceiver<E> = oneshot::Receiver<Result<Hash256, BlockError<E>>>;
/// A simple first-in-first-out queue with a maximum length.
struct FifoQueue<T> {
queue: VecDeque<T>,
max_length: usize,
}
impl<T> FifoQueue<T> {
/// Create a new, empty queue with the given length.
pub fn new(max_length: usize) -> Self {
Self {
queue: VecDeque::default(),
max_length,
}
}
/// Add a new item to the queue.
///
/// Drops `item` if the queue is full.
pub fn push(&mut self, item: T, item_desc: &str, log: &Logger) {
if self.queue.len() == self.max_length {
error!(
log,
"Block queue full";
"msg" => "the system has insufficient resources for load",
"queue_len" => self.max_length,
"queue" => item_desc,
)
} else {
self.queue.push_back(item);
}
}
/// Remove the next item from the queue.
pub fn pop(&mut self) -> Option<T> {
self.queue.pop_front()
}
/// Returns the current length of the queue.
pub fn len(&self) -> usize {
self.queue.len()
}
}
/// A simple last-in-first-out queue with a maximum length.
struct LifoQueue<T> {
queue: VecDeque<QueueItem<T>>,
queue: VecDeque<T>,
max_length: usize,
}
@ -98,8 +156,10 @@ impl<T> LifoQueue<T> {
}
}
/// Add a new item to the queue.
pub fn push(&mut self, item: QueueItem<T>) {
/// Add a new item to the front of the queue.
///
/// If the queue is full, the item at the back of the queue is dropped.
pub fn push(&mut self, item: T) {
if self.queue.len() == self.max_length {
self.queue.pop_back();
}
@ -107,7 +167,7 @@ impl<T> LifoQueue<T> {
}
/// Remove the next item from the queue.
pub fn pop(&mut self) -> Option<QueueItem<T>> {
pub fn pop(&mut self) -> Option<T> {
self.queue.pop_front()
}
@ -123,10 +183,9 @@ impl<T> LifoQueue<T> {
}
/// An event to be processed by the manager task.
#[derive(Debug, PartialEq)]
#[derive(Debug)]
pub struct WorkEvent<E: EthSpec> {
message_id: MessageId,
peer_id: PeerId,
drop_during_sync: bool,
work: Work<E>,
}
@ -140,9 +199,14 @@ impl<E: EthSpec> WorkEvent<E> {
should_import: bool,
) -> Self {
Self {
message_id,
peer_id,
work: Work::Attestation(Box::new((attestation, subnet_id, should_import))),
drop_during_sync: true,
work: Work::GossipAttestation {
message_id,
peer_id,
attestation: Box::new(attestation),
subnet_id,
should_import,
},
}
}
@ -153,18 +217,92 @@ impl<E: EthSpec> WorkEvent<E> {
aggregate: SignedAggregateAndProof<E>,
) -> Self {
Self {
message_id,
peer_id,
work: Work::Aggregate(Box::new(aggregate)),
drop_during_sync: true,
work: Work::GossipAggregate {
message_id,
peer_id,
aggregate: Box::new(aggregate),
},
}
}
/// Create a new `Work` event for some block.
pub fn gossip_beacon_block(
message_id: MessageId,
peer_id: PeerId,
block: Box<SignedBeaconBlock<E>>,
) -> Self {
Self {
drop_during_sync: false,
work: Work::GossipBlock {
message_id,
peer_id,
block,
},
}
}
/// Create a new `Work` event for some block, where the result from computation (if any) is
/// sent to the other side of `result_tx`.
pub fn rpc_beacon_block(block: Box<SignedBeaconBlock<E>>) -> (Self, BlockResultReceiver<E>) {
let (result_tx, result_rx) = oneshot::channel();
let event = Self {
drop_during_sync: false,
work: Work::RpcBlock { block, result_tx },
};
(event, result_rx)
}
/// Create a new work event to import `blocks` as a beacon chain segment.
pub fn chain_segment(process_id: ProcessId, blocks: Vec<SignedBeaconBlock<E>>) -> Self {
Self {
drop_during_sync: false,
work: Work::ChainSegment { process_id, blocks },
}
}
}
/// A consensus message from gossip which requires processing.
#[derive(Debug, PartialEq)]
/// A consensus message (or multiple) from the network that requires processing.
#[derive(Debug)]
pub enum Work<E: EthSpec> {
Attestation(Box<(Attestation<E>, SubnetId, bool)>),
Aggregate(Box<SignedAggregateAndProof<E>>),
GossipAttestation {
message_id: MessageId,
peer_id: PeerId,
attestation: Box<Attestation<E>>,
subnet_id: SubnetId,
should_import: bool,
},
GossipAggregate {
message_id: MessageId,
peer_id: PeerId,
aggregate: Box<SignedAggregateAndProof<E>>,
},
GossipBlock {
message_id: MessageId,
peer_id: PeerId,
block: Box<SignedBeaconBlock<E>>,
},
RpcBlock {
block: Box<SignedBeaconBlock<E>>,
result_tx: BlockResultSender<E>,
},
ChainSegment {
process_id: ProcessId,
blocks: Vec<SignedBeaconBlock<E>>,
},
}
impl<E: EthSpec> Work<E> {
/// Provides a `&str` that uniquely identifies each enum variant.
fn str_id(&self) -> &'static str {
match self {
Work::GossipAttestation { .. } => "gossip_attestation",
Work::GossipAggregate { .. } => "gossip_aggregate",
Work::GossipBlock { .. } => "gossip_block",
Work::RpcBlock { .. } => "rpc_block",
Work::ChainSegment { .. } => "chain_segment",
}
}
}
/// Provides de-bounce functionality for logging.
@ -190,8 +328,8 @@ impl TimeLatch {
/// that need to be processed by the `BeaconChain`
///
/// See module level documentation for more information.
pub struct GossipProcessor<T: BeaconChainTypes> {
pub beacon_chain: Arc<BeaconChain<T>>,
pub struct BeaconProcessor<T: BeaconChainTypes> {
pub beacon_chain: Weak<BeaconChain<T>>,
pub network_tx: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
pub sync_tx: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
pub network_globals: Arc<NetworkGlobals<T::EthSpec>>,
@ -201,7 +339,7 @@ pub struct GossipProcessor<T: BeaconChainTypes> {
pub log: Logger,
}
impl<T: BeaconChainTypes> GossipProcessor<T> {
impl<T: BeaconChainTypes> BeaconProcessor<T> {
/// Spawns the "manager" task which checks the receiver end of the returned `Sender` for
/// messages which contain some new work which will be:
///
@ -210,9 +348,7 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
///
/// Only `self.max_workers` will ever be spawned at one time. Each worker is a `tokio` task
/// started with `spawn_blocking`.
pub fn spawn_manager(mut self) -> mpsc::Sender<WorkEvent<T::EthSpec>> {
let (event_tx, mut event_rx) =
mpsc::channel::<WorkEvent<T::EthSpec>>(MAX_WORK_EVENT_QUEUE_LEN);
pub fn spawn_manager(mut self, mut event_rx: mpsc::Receiver<WorkEvent<T::EthSpec>>) {
let (idle_tx, mut idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN);
let mut aggregate_queue = LifoQueue::new(MAX_AGGREGATED_ATTESTATION_QUEUE_LEN);
@ -221,6 +357,12 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
let mut attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN);
let mut attestation_debounce = TimeLatch::default();
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let executor = self.executor.clone();
// The manager future will run on the non-blocking executor and delegate tasks to worker
@ -236,7 +378,6 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
// A worker has finished some work.
new_idle_opt = idle_rx.recv() => {
if new_idle_opt.is_some() {
metrics::inc_counter(&metrics::GOSSIP_PROCESSOR_IDLE_EVENTS_TOTAL);
self.current_workers = self.current_workers.saturating_sub(1);
None
} else {
@ -254,7 +395,6 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
// There is a new piece of work to be handled.
new_work_event_opt = event_rx.recv() => {
if let Some(new_work_event) = new_work_event_opt {
metrics::inc_counter(&metrics::GOSSIP_PROCESSOR_WORK_EVENTS_TOTAL);
Some(new_work_event)
} else {
// Exit if all event senders have been dropped.
@ -271,30 +411,47 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
};
let _event_timer =
metrics::start_timer(&metrics::GOSSIP_PROCESSOR_EVENT_HANDLING_SECONDS);
metrics::start_timer(&metrics::BEACON_PROCESSOR_EVENT_HANDLING_SECONDS);
if let Some(event) = &work_event {
metrics::inc_counter_vec(
&metrics::BEACON_PROCESSOR_WORK_EVENTS_RX_COUNT,
&[event.work.str_id()],
);
} else {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_IDLE_EVENTS_TOTAL);
}
let can_spawn = self.current_workers < self.max_workers;
let drop_during_sync = work_event
.as_ref()
.map_or(false, |event| event.drop_during_sync);
match work_event {
// There is no new work event, but we are able to spawn a new worker.
//
// We don't check the `work.drop_during_sync` here. We assume that if it made
// it into the queue at any point then we should process it.
None if can_spawn => {
// Check the aggregates, *then* the unaggregates since we assume that
// aggregates are more valuable to local validators and effectively
// give us more information with less signature verification time.
if let Some(item) = aggregate_queue.pop() {
self.spawn_worker(
idle_tx.clone(),
item.message_id,
item.peer_id,
Work::Aggregate(item.item),
);
// Check for chain segments first, they're the most efficient way to get
// blocks into the system.
if let Some(item) = chain_segment_queue.pop() {
self.spawn_worker(idle_tx.clone(), item);
// Check sync blocks before gossip blocks, since we've already explicitly
// requested these blocks.
} else if let Some(item) = rpc_block_queue.pop() {
self.spawn_worker(idle_tx.clone(), item);
// Check gossip blocks before gossip attestations, since a block might be
// required to verify some attestations.
} else if let Some(item) = gossip_block_queue.pop() {
self.spawn_worker(idle_tx.clone(), item);
// Check the aggregates, *then* the unaggregates
// since we assume that aggregates are more valuable to local validators
// and effectively give us more information with less signature
// verification time.
} else if let Some(item) = aggregate_queue.pop() {
self.spawn_worker(idle_tx.clone(), item);
} else if let Some(item) = attestation_queue.pop() {
self.spawn_worker(
idle_tx.clone(),
item.message_id,
item.peer_id,
Work::Attestation(item.item),
);
self.spawn_worker(idle_tx.clone(), item);
}
}
// There is no new work event and we are unable to spawn a new worker.
@ -307,54 +464,65 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
"msg" => "no new work and cannot spawn worker"
);
}
// There is a new work event, but the chain is syncing. Ignore it.
Some(WorkEvent { .. })
if self.network_globals.sync_state.read().is_syncing() =>
// The chain is syncing and this event should be dropped during sync.
Some(work_event)
if self.network_globals.sync_state.read().is_syncing()
&& drop_during_sync =>
{
metrics::inc_counter(&metrics::GOSSIP_PROCESSOR_WORK_EVENTS_IGNORED_TOTAL);
let work_id = work_event.work.str_id();
metrics::inc_counter_vec(
&metrics::BEACON_PROCESSOR_WORK_EVENTS_IGNORED_COUNT,
&[work_id],
);
trace!(
self.log,
"Gossip processor skipping work";
"msg" => "chain is syncing"
"msg" => "chain is syncing",
"work_id" => work_id
);
}
// There is a new work event and the chain is not syncing. Process it.
Some(WorkEvent {
message_id,
peer_id,
work,
}) => match work {
Work::Attestation(_) if can_spawn => {
self.spawn_worker(idle_tx.clone(), message_id, peer_id, work)
Some(WorkEvent { work, .. }) => {
let work_id = work.str_id();
match work {
_ if can_spawn => self.spawn_worker(idle_tx.clone(), work),
Work::GossipAttestation { .. } => attestation_queue.push(work),
Work::GossipAggregate { .. } => aggregate_queue.push(work),
Work::GossipBlock { .. } => {
gossip_block_queue.push(work, work_id, &self.log)
}
Work::RpcBlock { .. } => rpc_block_queue.push(work, work_id, &self.log),
Work::ChainSegment { .. } => {
chain_segment_queue.push(work, work_id, &self.log)
}
}
Work::Attestation(attestation) => attestation_queue.push(QueueItem {
message_id,
peer_id,
item: attestation,
}),
Work::Aggregate(_) if can_spawn => {
self.spawn_worker(idle_tx.clone(), message_id, peer_id, work)
}
Work::Aggregate(aggregate) => aggregate_queue.push(QueueItem {
message_id,
peer_id,
item: aggregate,
}),
},
}
}
metrics::set_gauge(
&metrics::GOSSIP_PROCESSOR_WORKERS_ACTIVE_TOTAL,
&metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_TOTAL,
self.current_workers as i64,
);
metrics::set_gauge(
&metrics::GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_QUEUE_TOTAL,
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_QUEUE_TOTAL,
attestation_queue.len() as i64,
);
metrics::set_gauge(
&metrics::GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL,
&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL,
aggregate_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_QUEUE_TOTAL,
gossip_block_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL,
rpc_block_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL,
chain_segment_queue.len() as i64,
);
if aggregate_queue.is_full() && aggregate_debounce.elapsed() {
error!(
@ -378,62 +546,77 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
// Spawn on the non-blocking executor.
executor.spawn(manager_future, MANAGER_TASK_NAME);
event_tx
}
/// Spawns a blocking worker thread to process some `Work`.
///
/// Sends an message on `idle_tx` when the work is complete and the task is stopping.
fn spawn_worker(
&mut self,
mut idle_tx: mpsc::Sender<()>,
message_id: MessageId,
peer_id: PeerId,
work: Work<T::EthSpec>,
) {
let worker_timer = metrics::start_timer(&metrics::GOSSIP_PROCESSOR_WORKER_TIME);
metrics::inc_counter(&metrics::GOSSIP_PROCESSOR_WORKERS_SPAWNED_TOTAL);
fn spawn_worker(&mut self, mut idle_tx: mpsc::Sender<()>, work: Work<T::EthSpec>) {
let work_id = work.str_id();
let worker_timer =
metrics::start_timer_vec(&metrics::BEACON_PROCESSOR_WORKER_TIME, &[work_id]);
metrics::inc_counter(&metrics::BEACON_PROCESSOR_WORKERS_SPAWNED_TOTAL);
metrics::inc_counter_vec(
&metrics::BEACON_PROCESSOR_WORK_EVENTS_STARTED_COUNT,
&[work.str_id()],
);
let worker_id = self.current_workers;
self.current_workers = self.current_workers.saturating_add(1);
let chain = self.beacon_chain.clone();
let chain = if let Some(chain) = self.beacon_chain.upgrade() {
chain
} else {
debug!(
self.log,
"Beacon chain dropped, shutting down";
);
return;
};
let network_tx = self.network_tx.clone();
let sync_tx = self.sync_tx.clone();
let log = self.log.clone();
let executor = self.executor.clone();
trace!(
self.log,
"Spawning beacon processor worker";
"work" => work_id,
"worker" => worker_id,
);
executor.spawn_blocking(
move || {
let _worker_timer = worker_timer;
let inner_log = log.clone();
// We use this closure pattern to avoid using a `return` that prevents the
// `idle_tx` message from sending.
let handler = || {
let log = inner_log.clone();
match work {
/*
* Unaggregated attestation verification.
*/
Work::Attestation(boxed_tuple) => {
let (attestation, subnet_id, should_import) = *boxed_tuple;
let _attestation_timer = metrics::start_timer(
&metrics::GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_WORKER_TIME,
);
metrics::inc_counter(
&metrics::GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL,
);
Work::GossipAttestation {
message_id,
peer_id,
attestation,
subnet_id,
should_import,
} => {
let beacon_block_root = attestation.data.beacon_block_root;
let attestation = match chain
.verify_unaggregated_attestation_for_gossip(attestation, subnet_id)
.verify_unaggregated_attestation_for_gossip(*attestation, subnet_id)
{
Ok(attestation) => attestation,
Err(e) => {
handle_attestation_verification_failure(
&log,
sync_tx,
peer_id.clone(),
peer_id,
beacon_block_root,
"unaggregated",
e,
@ -451,7 +634,7 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
}
metrics::inc_counter(
&metrics::GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL,
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL,
);
if let Err(e) = chain.apply_attestation_to_fork_choice(&attestation) {
@ -484,44 +667,44 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
"beacon_block_root" => format!("{:?}", beacon_block_root)
)
}
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL,
);
}
/*
* Aggregated attestation verification.
*/
Work::Aggregate(boxed_aggregate) => {
let _attestation_timer = metrics::start_timer(
&metrics::GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_WORKER_TIME,
);
metrics::inc_counter(
&metrics::GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL,
);
Work::GossipAggregate {
message_id,
peer_id,
aggregate,
} => {
let beacon_block_root =
boxed_aggregate.message.aggregate.data.beacon_block_root;
aggregate.message.aggregate.data.beacon_block_root;
let aggregate = match chain
.verify_aggregated_attestation_for_gossip(*boxed_aggregate)
{
Ok(aggregate) => aggregate,
Err(e) => {
handle_attestation_verification_failure(
&log,
sync_tx,
peer_id.clone(),
beacon_block_root,
"aggregated",
e,
);
return;
}
};
let aggregate =
match chain.verify_aggregated_attestation_for_gossip(*aggregate) {
Ok(aggregate) => aggregate,
Err(e) => {
handle_attestation_verification_failure(
&log,
sync_tx,
peer_id,
beacon_block_root,
"aggregated",
e,
);
return;
}
};
// Indicate to the `Network` service that this message is valid and can be
// propagated on the gossip network.
propagate_gossip_message(network_tx, message_id, peer_id.clone(), &log);
metrics::inc_counter(
&metrics::GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL,
&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL,
);
if let Err(e) = chain.apply_attestation_to_fork_choice(&aggregate) {
@ -554,11 +737,151 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
"beacon_block_root" => format!("{:?}", beacon_block_root)
)
}
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL,
);
}
/*
* Verification for beacon blocks received on gossip.
*/
Work::GossipBlock {
message_id,
peer_id,
block,
} => {
let verified_block = match chain.verify_block_for_gossip(*block) {
Ok(verified_block) => {
info!(
log,
"New block received";
"slot" => verified_block.block.slot(),
"hash" => verified_block.block_root.to_string()
);
propagate_gossip_message(
network_tx,
message_id,
peer_id.clone(),
&log,
);
verified_block
}
Err(BlockError::ParentUnknown(block)) => {
send_sync_message(
sync_tx,
SyncMessage::UnknownBlock(peer_id, block),
&log,
);
return;
}
Err(e) => {
warn!(
log,
"Could not verify block for gossip";
"error" => format!("{:?}", e)
);
return;
}
};
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL,
);
let block = Box::new(verified_block.block.clone());
match chain.process_block(verified_block) {
Ok(_block_root) => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL,
);
trace!(
log,
"Gossipsub block processed";
"peer_id" => peer_id.to_string()
);
// TODO: It would be better if we can run this _after_ we publish the block to
// reduce block propagation latency.
//
// The `MessageHandler` would be the place to put this, however it doesn't seem
// to have a reference to the `BeaconChain`. I will leave this for future
// works.
match chain.fork_choice() {
Ok(()) => trace!(
log,
"Fork choice success";
"location" => "block gossip"
),
Err(e) => error!(
log,
"Fork choice failed";
"error" => format!("{:?}", e),
"location" => "block gossip"
),
}
}
Err(BlockError::ParentUnknown { .. }) => {
// Inform the sync manager to find parents for this block
// This should not occur. It should be checked by `should_forward_block`
error!(
log,
"Block with unknown parent attempted to be processed";
"peer_id" => peer_id.to_string()
);
send_sync_message(
sync_tx,
SyncMessage::UnknownBlock(peer_id, block),
&log,
);
}
other => {
debug!(
log,
"Invalid gossip beacon block";
"outcome" => format!("{:?}", other),
"block root" => format!("{}", block.canonical_root()),
"block slot" => block.slot()
);
trace!(
log,
"Invalid gossip beacon block ssz";
"ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())),
);
}
};
}
/*
* Verification for beacon blocks received during syncing via RPC.
*/
Work::RpcBlock { block, result_tx } => {
let block_result = chain.process_block(*block);
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL,
);
if result_tx.send(block_result).is_err() {
crit!(log, "Failed return sync block result");
}
}
/*
* Verification for a chain segment (multiple blocks).
*/
Work::ChainSegment { process_id, blocks } => {
handle_chain_segment(chain, process_id, blocks, sync_tx, log)
}
};
};
handler();
trace!(
log,
"Beacon processor worker done";
"work" => work_id,
"worker" => worker_id,
);
idle_tx.try_send(()).unwrap_or_else(|e| {
crit!(
log,
@ -596,6 +919,19 @@ fn propagate_gossip_message<E: EthSpec>(
});
}
/// Send a message to `sync_tx`.
///
/// Creates a log if there is an interal error.
fn send_sync_message<E: EthSpec>(
sync_tx: mpsc::UnboundedSender<SyncMessage<E>>,
message: SyncMessage<E>,
log: &Logger,
) {
sync_tx
.send(message)
.unwrap_or_else(|_| error!(log, "Could not send message to the sync service"));
}
/// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the
/// network.
pub fn handle_attestation_verification_failure<E: EthSpec>(

View File

@ -6,6 +6,7 @@ pub mod error;
pub mod service;
mod attestation_service;
mod beacon_processor;
mod metrics;
mod persisted_dht;
mod router;

View File

@ -49,64 +49,101 @@ lazy_static! {
/*
* Gossip processor
*/
pub static ref GOSSIP_PROCESSOR_WORKERS_SPAWNED_TOTAL: Result<IntCounter> = try_create_int_counter(
"gossip_processor_workers_spawned_total",
pub static ref BEACON_PROCESSOR_WORK_EVENTS_RX_COUNT: Result<IntCounterVec> = try_create_int_counter_vec(
"beacon_processor_work_events_rx_count",
"Count of work events received (but not necessarily processed)",
&["type"]
);
pub static ref BEACON_PROCESSOR_WORK_EVENTS_IGNORED_COUNT: Result<IntCounterVec> = try_create_int_counter_vec(
"beacon_processor_work_events_ignored_count",
"Count of work events purposefully ignored",
&["type"]
);
pub static ref BEACON_PROCESSOR_WORK_EVENTS_STARTED_COUNT: Result<IntCounterVec> = try_create_int_counter_vec(
"beacon_processor_work_events_started_count",
"Count of work events which have been started by a worker",
&["type"]
);
pub static ref BEACON_PROCESSOR_WORKER_TIME: Result<HistogramVec> = try_create_histogram_vec(
"beacon_processor_worker_time",
"Time taken for a worker to fully process some parcel of work.",
&["type"]
);
pub static ref BEACON_PROCESSOR_WORKERS_SPAWNED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_workers_spawned_total",
"The number of workers ever spawned by the gossip processing pool."
);
pub static ref GOSSIP_PROCESSOR_WORKERS_ACTIVE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"gossip_processor_workers_active_total",
pub static ref BEACON_PROCESSOR_WORKERS_ACTIVE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_workers_active_total",
"Count of active workers in the gossip processing pool."
);
pub static ref GOSSIP_PROCESSOR_WORK_EVENTS_TOTAL: Result<IntCounter> = try_create_int_counter(
"gossip_processor_work_events_total",
"Count of work events processed by the gossip processor manager."
);
pub static ref GOSSIP_PROCESSOR_WORK_EVENTS_IGNORED_TOTAL: Result<IntCounter> = try_create_int_counter(
"gossip_processor_work_events_ignored_total",
"Count of work events processed by the gossip processor manager."
);
pub static ref GOSSIP_PROCESSOR_IDLE_EVENTS_TOTAL: Result<IntCounter> = try_create_int_counter(
"gossip_processor_idle_events_total",
pub static ref BEACON_PROCESSOR_IDLE_EVENTS_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_idle_events_total",
"Count of idle events processed by the gossip processor manager."
);
pub static ref GOSSIP_PROCESSOR_EVENT_HANDLING_SECONDS: Result<Histogram> = try_create_histogram(
"gossip_processor_event_handling_seconds",
pub static ref BEACON_PROCESSOR_EVENT_HANDLING_SECONDS: Result<Histogram> = try_create_histogram(
"beacon_processor_event_handling_seconds",
"Time spent handling a new message and allocating it to a queue or worker."
);
pub static ref GOSSIP_PROCESSOR_WORKER_TIME: Result<Histogram> = try_create_histogram(
"gossip_processor_worker_time",
"Time taken for a worker to fully process some parcel of work."
// Gossip blocks.
pub static ref BEACON_PROCESSOR_GOSSIP_BLOCK_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_gossip_block_queue_total",
"Count of blocks from gossip waiting to be verified."
);
pub static ref GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"gossip_processor_unaggregated_attestation_queue_total",
pub static ref BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_gossip_block_verified_total",
"Total number of gossip blocks verified for propagation."
);
pub static ref BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_gossip_block_imported_total",
"Total number of gossip blocks imported to fork choice, etc."
);
// Rpc blocks.
pub static ref BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_rpc_block_queue_total",
"Count of blocks from the rpc waiting to be verified."
);
pub static ref BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_rpc_block_imported_total",
"Total number of gossip blocks imported to fork choice, etc."
);
// Chain segments.
pub static ref BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_chain_segment_queue_total",
"Count of chain segments from the rpc waiting to be verified."
);
pub static ref BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_chain_segment_success_total",
"Total number of chain segments successfully processed."
);
pub static ref BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_chain_segment_failed_total",
"Total number of chain segments that failed processing."
);
// Unaggregated attestations.
pub static ref BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_unaggregated_attestation_queue_total",
"Count of unagg. attestations waiting to be processed."
);
pub static ref GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_WORKER_TIME: Result<Histogram> = try_create_histogram(
"gossip_processor_unaggregated_attestation_worker_time",
"Time taken for a worker to fully process an unaggregated attestation."
);
pub static ref GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
"gossip_processor_unaggregated_attestation_verified_total",
pub static ref BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_unaggregated_attestation_verified_total",
"Total number of unaggregated attestations verified for gossip."
);
pub static ref GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
"gossip_processor_unaggregated_attestation_imported_total",
pub static ref BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_unaggregated_attestation_imported_total",
"Total number of unaggregated attestations imported to fork choice, etc."
);
pub static ref GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"gossip_processor_aggregated_attestation_queue_total",
// Aggregated attestations.
pub static ref BEACON_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_aggregated_attestation_queue_total",
"Count of agg. attestations waiting to be processed."
);
pub static ref GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_WORKER_TIME: Result<Histogram> = try_create_histogram(
"gossip_processor_aggregated_attestation_worker_time",
"Time taken for a worker to fully process an aggregated attestation."
);
pub static ref GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
"gossip_processor_aggregated_attestation_verified_total",
pub static ref BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_aggregated_attestation_verified_total",
"Total number of aggregated attestations verified for gossip."
);
pub static ref GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
"gossip_processor_aggregated_attestation_imported_total",
pub static ref BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_aggregated_attestation_imported_total",
"Total number of aggregated attestations imported to fork choice, etc."
);

View File

@ -5,19 +5,18 @@
//! syncing-related responses to the Sync manager.
#![allow(clippy::unit_arg)]
pub mod gossip_processor;
pub mod processor;
use crate::error;
use crate::service::NetworkMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::{
rpc::{RPCError, RequestId},
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
};
use futures::prelude::*;
use processor::Processor;
use slog::{debug, info, o, trace, warn};
use slog::{debug, o, trace, warn};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::EthSpec;
@ -229,21 +228,7 @@ impl<T: BeaconChainTypes> Router<T> {
);
}
PubsubMessage::BeaconBlock(block) => {
match self.processor.should_forward_block(block) {
Ok(verified_block) => {
info!(self.log, "New block received"; "slot" => verified_block.block.slot(), "hash" => verified_block.block_root.to_string());
self.propagate_message(id, peer_id.clone());
self.processor.on_block_gossip(peer_id, verified_block);
}
Err(BlockError::ParentUnknown(block)) => {
self.processor.on_unknown_parent(peer_id, block);
}
Err(e) => {
// performing a parent lookup
warn!(self.log, "Could not verify block for gossip";
"error" => format!("{:?}", e));
}
}
self.processor.on_block_gossip(id, peer_id, block);
}
PubsubMessage::VoluntaryExit(exit) => {
debug!(self.log, "Received a voluntary exit"; "peer_id" => format!("{}", peer_id));

View File

@ -1,17 +1,15 @@
use super::gossip_processor::{GossipProcessor, WorkEvent as GossipWorkEvent};
use crate::beacon_processor::{
BeaconProcessor, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN,
};
use crate::service::NetworkMessage;
use crate::sync::{PeerSyncInfo, SyncMessage};
use beacon_chain::{
observed_operations::ObservationOutcome, BeaconChain, BeaconChainTypes, BlockError,
GossipVerifiedBlock,
};
use beacon_chain::{observed_operations::ObservationOutcome, BeaconChain, BeaconChainTypes};
use eth2_libp2p::rpc::*;
use eth2_libp2p::{
MessageId, NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response,
};
use itertools::process_results;
use slog::{debug, error, o, trace, warn};
use ssz::Encode;
use state_processing::SigVerifiedOp;
use std::cmp;
use std::sync::Arc;
@ -36,8 +34,8 @@ pub struct Processor<T: BeaconChainTypes> {
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
/// A network context to return and handle RPC requests.
network: HandlerNetworkContext<T::EthSpec>,
/// A multi-threaded, non-blocking processor for consensus gossip messages.
gossip_processor_send: mpsc::Sender<GossipWorkEvent<T::EthSpec>>,
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
/// The `RPCHandler` logger.
log: slog::Logger,
}
@ -52,6 +50,8 @@ impl<T: BeaconChainTypes> Processor<T> {
log: &slog::Logger,
) -> Self {
let sync_logger = log.new(o!("service"=> "sync"));
let (beacon_processor_send, beacon_processor_receive) =
mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN);
// spawn the sync thread
let sync_send = crate::sync::manager::spawn(
@ -59,11 +59,12 @@ impl<T: BeaconChainTypes> Processor<T> {
beacon_chain.clone(),
network_globals.clone(),
network_send.clone(),
beacon_processor_send.clone(),
sync_logger,
);
let gossip_processor_send = GossipProcessor {
beacon_chain: beacon_chain.clone(),
BeaconProcessor {
beacon_chain: Arc::downgrade(&beacon_chain),
network_tx: network_send.clone(),
sync_tx: sync_send.clone(),
network_globals,
@ -72,13 +73,13 @@ impl<T: BeaconChainTypes> Processor<T> {
current_workers: 0,
log: log.clone(),
}
.spawn_manager();
.spawn_manager(beacon_processor_receive);
Processor {
chain: beacon_chain,
sync_send,
network: HandlerNetworkContext::new(network_send, log.clone()),
gossip_processor_send,
beacon_processor_send,
log: log.clone(),
}
}
@ -513,23 +514,6 @@ impl<T: BeaconChainTypes> Processor<T> {
}
}
/// Template function to be called on a block to determine if the block should be propagated
/// across the network.
pub fn should_forward_block(
&mut self,
block: Box<SignedBeaconBlock<T::EthSpec>>,
) -> Result<GossipVerifiedBlock<T>, BlockError<T::EthSpec>> {
self.chain.verify_block_for_gossip(*block)
}
pub fn on_unknown_parent(
&mut self,
peer_id: PeerId,
block: Box<SignedBeaconBlock<T::EthSpec>>,
) {
self.send_to_sync(SyncMessage::UnknownBlock(peer_id, block));
}
/// Process a gossip message declaring a new block.
///
/// Attempts to apply to block to the beacon chain. May queue the block for later processing.
@ -537,65 +521,22 @@ impl<T: BeaconChainTypes> Processor<T> {
/// Returns a `bool` which, if `true`, indicates we should forward the block to our peers.
pub fn on_block_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
verified_block: GossipVerifiedBlock<T>,
) -> bool {
let block = Box::new(verified_block.block.clone());
match self.chain.process_block(verified_block) {
Ok(_block_root) => {
trace!(
self.log,
"Gossipsub block processed";
"peer_id" => peer_id.to_string()
);
// TODO: It would be better if we can run this _after_ we publish the block to
// reduce block propagation latency.
//
// The `MessageHandler` would be the place to put this, however it doesn't seem
// to have a reference to the `BeaconChain`. I will leave this for future
// works.
match self.chain.fork_choice() {
Ok(()) => trace!(
self.log,
"Fork choice success";
"location" => "block gossip"
),
Err(e) => error!(
self.log,
"Fork choice failed";
"error" => format!("{:?}", e),
"location" => "block gossip"
),
}
}
Err(BlockError::ParentUnknown { .. }) => {
// Inform the sync manager to find parents for this block
// This should not occur. It should be checked by `should_forward_block`
block: Box<SignedBeaconBlock<T::EthSpec>>,
) {
self.beacon_processor_send
.try_send(BeaconWorkEvent::gossip_beacon_block(
message_id, peer_id, block,
))
.unwrap_or_else(|e| {
error!(
self.log,
"Block with unknown parent attempted to be processed";
"peer_id" => peer_id.to_string()
);
self.send_to_sync(SyncMessage::UnknownBlock(peer_id, block));
}
other => {
warn!(
self.log,
"Invalid gossip beacon block";
"outcome" => format!("{:?}", other),
"block root" => format!("{}", block.canonical_root()),
"block slot" => block.slot()
);
trace!(
self.log,
"Invalid gossip beacon block ssz";
"ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())),
);
}
}
// TODO: Update with correct block gossip checking
true
&self.log,
"Unable to send to gossip processor";
"type" => "block gossip",
"error" => e.to_string(),
)
})
}
pub fn on_unaggregated_attestation_gossip(
@ -606,8 +547,8 @@ impl<T: BeaconChainTypes> Processor<T> {
subnet_id: SubnetId,
should_process: bool,
) {
self.gossip_processor_send
.try_send(GossipWorkEvent::unaggregated_attestation(
self.beacon_processor_send
.try_send(BeaconWorkEvent::unaggregated_attestation(
message_id,
peer_id,
unaggregated_attestation,
@ -630,8 +571,8 @@ impl<T: BeaconChainTypes> Processor<T> {
peer_id: PeerId,
aggregate: SignedAggregateAndProof<T::EthSpec>,
) {
self.gossip_processor_send
.try_send(GossipWorkEvent::aggregated_attestation(
self.beacon_processor_send
.try_send(BeaconWorkEvent::aggregated_attestation(
message_id, peer_id, aggregate,
))
.unwrap_or_else(|e| {

View File

@ -33,11 +33,11 @@
//! if an attestation references an unknown block) this manager can search for the block and
//! subsequently search for parents if needed.
use super::block_processor::{spawn_block_processor, BatchProcessResult, ProcessId};
use super::network_context::SyncNetworkContext;
use super::peer_sync_info::{PeerSyncInfo, PeerSyncType};
use super::range_sync::{BatchId, ChainId, RangeSync, EPOCHS_PER_BATCH};
use super::RequestId;
use crate::beacon_processor::{ProcessId, WorkEvent as BeaconWorkEvent};
use crate::service::NetworkMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError};
use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, GoodbyeReason};
@ -109,6 +109,18 @@ pub enum SyncMessage<T: EthSpec> {
ParentLookupFailed(PeerId),
}
/// The result of processing a multiple blocks (a chain segment).
// TODO: When correct batch error handling occurs, we will include an error type.
#[derive(Debug)]
pub enum BatchProcessResult {
/// The batch was completed successfully.
Success,
/// The batch processing failed.
Failed,
/// The batch processing failed but managed to import at least one block.
Partial,
}
/// Maintains a sequential list of parents to lookup and the lookup's current state.
struct ParentRequests<T: EthSpec> {
/// The blocks that have currently been downloaded.
@ -158,8 +170,8 @@ pub struct SyncManager<T: BeaconChainTypes> {
/// The logger for the import manager.
log: Logger,
/// The sending part of input_channel
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
}
/// Object representing a single block lookup request.
@ -187,6 +199,7 @@ pub fn spawn<T: BeaconChainTypes>(
beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
log: slog::Logger,
) -> mpsc::UnboundedSender<SyncMessage<T::EthSpec>> {
assert!(
@ -201,7 +214,7 @@ pub fn spawn<T: BeaconChainTypes>(
range_sync: RangeSync::new(
beacon_chain.clone(),
network_globals.clone(),
sync_send.clone(),
beacon_processor_send.clone(),
log.clone(),
),
network: SyncNetworkContext::new(network_send, network_globals.clone(), log.clone()),
@ -211,7 +224,7 @@ pub fn spawn<T: BeaconChainTypes>(
parent_queue: SmallVec::new(),
single_block_lookups: FnvHashMap::default(),
log: log.clone(),
sync_send: sync_send.clone(),
beacon_processor_send,
};
// spawn the sync manager thread
@ -300,7 +313,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
/// There are two reasons we could have received a BlocksByRoot response
/// - We requested a single hash and have received a response for the single_block_lookup
/// - We are looking up parent blocks in parent lookup search
fn blocks_by_root_response(
async fn blocks_by_root_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
@ -318,7 +331,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
single_block_hash = Some(block_request.hash);
}
if let Some(block_hash) = single_block_hash {
self.single_block_lookup_response(peer_id, block, block_hash);
self.single_block_lookup_response(peer_id, block, block_hash)
.await;
return;
}
@ -340,7 +354,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// add the block to response
parent_request.downloaded_blocks.push(block);
// queue for processing
self.process_parent_request(parent_request);
self.process_parent_request(parent_request).await;
}
None => {
// this is a stream termination
@ -381,10 +395,40 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
async fn process_block_async(
&mut self,
block: SignedBeaconBlock<T::EthSpec>,
) -> Option<Result<Hash256, BlockError<T::EthSpec>>> {
let (event, rx) = BeaconWorkEvent::rpc_beacon_block(Box::new(block));
match self.beacon_processor_send.try_send(event) {
Ok(_) => {}
Err(e) => {
error!(
self.log,
"Failed to send sync block to processor";
"error" => format!("{:?}", e)
);
return None;
}
}
match rx.await {
Ok(block_result) => Some(block_result),
Err(_) => {
warn!(
self.log,
"Sync block not processed";
"msg" => "likely due to system resource exhaustion"
);
None
}
}
}
/// Processes the response obtained from a single block lookup search. If the block is
/// processed or errors, the search ends. If the blocks parent is unknown, a block parent
/// lookup search is started.
fn single_block_lookup_response(
async fn single_block_lookup_response(
&mut self,
peer_id: PeerId,
block: SignedBeaconBlock<T::EthSpec>,
@ -399,8 +443,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
return;
}
let block_result = match self.process_block_async(block.clone()).await {
Some(block_result) => block_result,
None => return,
};
// we have the correct block, try and process it
match self.chain.process_block(block.clone()) {
match block_result {
Ok(block_root) => {
info!(self.log, "Processed block"; "block" => format!("{}", block_root));
@ -599,7 +648,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// manager
/// A new block has been received for a parent lookup query, process it.
fn process_parent_request(&mut self, mut parent_request: ParentRequests<T::EthSpec>) {
async fn process_parent_request(&mut self, mut parent_request: ParentRequests<T::EthSpec>) {
// verify the last added block is the parent of the last requested block
if parent_request.downloaded_blocks.len() < 2 {
@ -652,7 +701,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
.downloaded_blocks
.pop()
.expect("There is always at least one block in the queue");
match self.chain.process_block(newest_block.clone()) {
let block_result = match self.process_block_async(newest_block.clone()).await {
Some(block_result) => block_result,
None => return,
};
match block_result {
Err(BlockError::ParentUnknown { .. }) => {
// need to keep looking for parents
// add the block back to the queue and continue the search
@ -660,13 +715,23 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.request_parent(parent_request);
}
Ok(_) | Err(BlockError::BlockIsAlreadyKnown { .. }) => {
spawn_block_processor(
Arc::downgrade(&self.chain),
ProcessId::ParentLookup(parent_request.last_submitted_peer.clone()),
parent_request.downloaded_blocks,
self.sync_send.clone(),
self.log.clone(),
);
let process_id =
ProcessId::ParentLookup(parent_request.last_submitted_peer.clone());
let blocks = parent_request.downloaded_blocks;
match self
.beacon_processor_send
.try_send(BeaconWorkEvent::chain_segment(process_id, blocks))
{
Ok(_) => {}
Err(e) => {
error!(
self.log,
"Failed to send chain segment to processor";
"error" => format!("{:?}", e)
);
}
}
}
Err(outcome) => {
// all else we consider the chain a failure and downvote the peer that sent
@ -760,7 +825,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
request_id,
beacon_block,
} => {
self.blocks_by_root_response(peer_id, request_id, beacon_block.map(|b| *b));
self.blocks_by_root_response(peer_id, request_id, beacon_block.map(|b| *b))
.await;
}
SyncMessage::UnknownBlock(peer_id, block) => {
self.add_unknown_block(peer_id, *block);

View File

@ -1,14 +1,14 @@
//! Syncing for lighthouse.
//!
//! Stores the various syncing methods for the beacon chain.
mod block_processor;
pub mod manager;
mod network_context;
mod peer_sync_info;
mod range_sync;
pub use manager::SyncMessage;
pub use manager::{BatchProcessResult, SyncMessage};
pub use peer_sync_info::PeerSyncInfo;
pub use range_sync::{BatchId, ChainId};
/// Type of id of rpc requests sent by sync
pub type RequestId = usize;

View File

@ -1,11 +1,12 @@
use super::batch::{Batch, BatchId, PendingBatches};
use crate::sync::block_processor::{spawn_block_processor, BatchProcessResult, ProcessId};
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::{RequestId, SyncMessage};
use crate::beacon_processor::ProcessId;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
use crate::sync::RequestId;
use crate::sync::{network_context::SyncNetworkContext, BatchProcessResult};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::{PeerAction, PeerId};
use rand::prelude::*;
use slog::{crit, debug, warn};
use slog::{crit, debug, error, warn};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::mpsc;
@ -84,9 +85,8 @@ pub struct SyncingChain<T: BeaconChainTypes> {
/// The current processing batch, if any.
current_processing_batch: Option<Batch<T::EthSpec>>,
/// A send channel to the sync manager. This is given to the batch processor thread to report
/// back once batch processing has completed.
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
/// A reference to the underlying beacon chain.
chain: Arc<BeaconChain<T>>,
@ -111,7 +111,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
target_head_slot: Slot,
target_head_root: Hash256,
peer_id: PeerId,
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
chain: Arc<BeaconChain<T>>,
log: slog::Logger,
) -> Self {
@ -131,7 +131,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
to_be_processed_id: BatchId(1),
state: ChainSyncingState::Stopped,
current_processing_batch: None,
sync_send,
beacon_processor_send,
chain,
log,
}
@ -255,18 +255,23 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
}
/// Sends a batch to the batch processor.
/// Sends a batch to the beacon processor for async processing in a queue.
fn process_batch(&mut self, mut batch: Batch<T::EthSpec>) {
let downloaded_blocks = std::mem::replace(&mut batch.downloaded_blocks, Vec::new());
let blocks = std::mem::replace(&mut batch.downloaded_blocks, Vec::new());
let process_id = ProcessId::RangeBatchId(self.id, batch.id);
self.current_processing_batch = Some(batch);
spawn_block_processor(
Arc::downgrade(&self.chain.clone()),
process_id,
downloaded_blocks,
self.sync_send.clone(),
self.log.clone(),
);
if let Err(e) = self
.beacon_processor_send
.try_send(BeaconWorkEvent::chain_segment(process_id, blocks))
{
error!(
self.log,
"Failed to send chain segment to processor";
"msg" => "process_batch",
"error" => format!("{:?}", e)
);
}
}
/// The block processor has completed processing a batch. This function handles the result

View File

@ -4,7 +4,7 @@
//! with this struct to to simplify the logic of the other layers of sync.
use super::chain::{ChainSyncingState, SyncingChain};
use crate::sync::manager::SyncMessage;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::PeerSyncInfo;
use beacon_chain::{BeaconChain, BeaconChainTypes};
@ -302,7 +302,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
target_head: Hash256,
target_slot: Slot,
peer_id: PeerId,
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
) {
let chain_id = rand::random();
self.finalized_chains.push(SyncingChain::new(
@ -311,7 +311,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
target_slot,
target_head,
peer_id,
sync_send,
beacon_processor_send,
self.beacon_chain.clone(),
self.log.clone(),
));
@ -326,7 +326,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
target_head: Hash256,
target_slot: Slot,
peer_id: PeerId,
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
) {
// remove the peer from any other head chains
@ -342,7 +342,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
target_slot,
target_head,
peer_id,
sync_send,
beacon_processor_send,
self.beacon_chain.clone(),
self.log.clone(),
);

View File

@ -43,9 +43,9 @@ use super::chain::{ChainId, ProcessingResult};
use super::chain_collection::{ChainCollection, RangeSyncState};
use super::sync_type::RangeSyncType;
use super::BatchId;
use crate::sync::block_processor::BatchProcessResult;
use crate::sync::manager::SyncMessage;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::BatchProcessResult;
use crate::sync::PeerSyncInfo;
use crate::sync::RequestId;
use beacon_chain::{BeaconChain, BeaconChainTypes};
@ -69,9 +69,8 @@ pub struct RangeSync<T: BeaconChainTypes> {
/// finalized chain(s) complete, these peer's get STATUS'ed to update their head slot before
/// the head chains are formed and downloaded.
awaiting_head_peers: HashSet<PeerId>,
/// The sync manager channel, allowing the batch processor thread to callback the sync task
/// once complete.
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
/// The syncing logger.
log: slog::Logger,
}
@ -80,14 +79,14 @@ impl<T: BeaconChainTypes> RangeSync<T> {
pub fn new(
beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
log: slog::Logger,
) -> Self {
RangeSync {
beacon_chain: beacon_chain.clone(),
chains: ChainCollection::new(beacon_chain, network_globals, log.clone()),
awaiting_head_peers: HashSet::new(),
sync_send,
beacon_processor_send,
log,
}
}
@ -181,7 +180,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
remote_info.finalized_root,
remote_finalized_slot,
peer_id,
self.sync_send.clone(),
self.beacon_processor_send.clone(),
);
self.chains.update_finalized(network);
// update the global sync state
@ -228,7 +227,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
remote_info.head_root,
remote_info.head_slot,
peer_id,
self.sync_send.clone(),
self.beacon_processor_send.clone(),
);
}
self.chains.update_finalized(network);

View File

@ -57,8 +57,8 @@
use prometheus::{HistogramOpts, HistogramTimer, Opts};
pub use prometheus::{
Encoder, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntGauge, IntGaugeVec, Result,
TextEncoder,
Encoder, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge,
IntGaugeVec, Result, TextEncoder,
};
/// Collect all the metrics for reporting.
@ -66,7 +66,7 @@ pub fn gather() -> Vec<prometheus::proto::MetricFamily> {
prometheus::gather()
}
/// Attempts to crate an `IntCounter`, returning `Err` if the registry does not accept the counter
/// Attempts to create an `IntCounter`, returning `Err` if the registry does not accept the counter
/// (potentially due to naming conflict).
pub fn try_create_int_counter(name: &str, help: &str) -> Result<IntCounter> {
let opts = Opts::new(name, help);
@ -75,7 +75,7 @@ pub fn try_create_int_counter(name: &str, help: &str) -> Result<IntCounter> {
Ok(counter)
}
/// Attempts to crate an `IntGauge`, returning `Err` if the registry does not accept the counter
/// Attempts to create an `IntGauge`, returning `Err` if the registry does not accept the counter
/// (potentially due to naming conflict).
pub fn try_create_int_gauge(name: &str, help: &str) -> Result<IntGauge> {
let opts = Opts::new(name, help);
@ -84,7 +84,7 @@ pub fn try_create_int_gauge(name: &str, help: &str) -> Result<IntGauge> {
Ok(gauge)
}
/// Attempts to crate a `Gauge`, returning `Err` if the registry does not accept the counter
/// Attempts to create a `Gauge`, returning `Err` if the registry does not accept the counter
/// (potentially due to naming conflict).
pub fn try_create_float_gauge(name: &str, help: &str) -> Result<Gauge> {
let opts = Opts::new(name, help);
@ -93,7 +93,7 @@ pub fn try_create_float_gauge(name: &str, help: &str) -> Result<Gauge> {
Ok(gauge)
}
/// Attempts to crate a `Histogram`, returning `Err` if the registry does not accept the counter
/// Attempts to create a `Histogram`, returning `Err` if the registry does not accept the counter
/// (potentially due to naming conflict).
pub fn try_create_histogram(name: &str, help: &str) -> Result<Histogram> {
let opts = HistogramOpts::new(name, help);
@ -102,7 +102,7 @@ pub fn try_create_histogram(name: &str, help: &str) -> Result<Histogram> {
Ok(histogram)
}
/// Attempts to crate a `HistogramVec`, returning `Err` if the registry does not accept the counter
/// Attempts to create a `HistogramVec`, returning `Err` if the registry does not accept the counter
/// (potentially due to naming conflict).
pub fn try_create_histogram_vec(
name: &str,
@ -115,7 +115,7 @@ pub fn try_create_histogram_vec(
Ok(histogram_vec)
}
/// Attempts to crate a `IntGaugeVec`, returning `Err` if the registry does not accept the gauge
/// Attempts to create a `IntGaugeVec`, returning `Err` if the registry does not accept the gauge
/// (potentially due to naming conflict).
pub fn try_create_int_gauge_vec(
name: &str,
@ -128,7 +128,7 @@ pub fn try_create_int_gauge_vec(
Ok(counter_vec)
}
/// Attempts to crate a `GaugeVec`, returning `Err` if the registry does not accept the gauge
/// Attempts to create a `GaugeVec`, returning `Err` if the registry does not accept the gauge
/// (potentially due to naming conflict).
pub fn try_create_float_gauge_vec(
name: &str,
@ -141,6 +141,20 @@ pub fn try_create_float_gauge_vec(
Ok(counter_vec)
}
/// Attempts to create a `IntGaugeVec`, returning `Err` if the registry does not accept the gauge
/// (potentially due to naming conflict).
pub fn try_create_int_counter_vec(
name: &str,
help: &str,
label_names: &[&str],
) -> Result<IntCounterVec> {
let opts = Opts::new(name, help);
let counter_vec = IntCounterVec::new(opts, label_names)?;
prometheus::register(Box::new(counter_vec.clone()))?;
Ok(counter_vec)
}
/// If `int_gauge_vec.is_ok()`, returns a gauge with the given `name`.
pub fn get_int_gauge(int_gauge_vec: &Result<IntGaugeVec>, name: &[&str]) -> Option<IntGauge> {
if let Ok(int_gauge_vec) = int_gauge_vec {
Some(int_gauge_vec.get_metric_with_label_values(name).ok()?)
@ -149,6 +163,26 @@ pub fn get_int_gauge(int_gauge_vec: &Result<IntGaugeVec>, name: &[&str]) -> Opti
}
}
/// If `int_counter_vec.is_ok()`, returns a counter with the given `name`.
pub fn get_int_counter(
int_counter_vec: &Result<IntCounterVec>,
name: &[&str],
) -> Option<IntCounter> {
if let Ok(int_counter_vec) = int_counter_vec {
Some(int_counter_vec.get_metric_with_label_values(name).ok()?)
} else {
None
}
}
/// Increments the `int_counter_vec` with the given `name`.
pub fn inc_counter_vec(int_counter_vec: &Result<IntCounterVec>, name: &[&str]) {
if let Some(counter) = get_int_counter(int_counter_vec, name) {
counter.inc()
}
}
/// If `histogram_vec.is_ok()`, returns a histogram with the given `name`.
pub fn get_histogram(histogram_vec: &Result<HistogramVec>, name: &[&str]) -> Option<Histogram> {
if let Ok(histogram_vec) = histogram_vec {
Some(histogram_vec.get_metric_with_label_values(name).ok()?)
@ -157,6 +191,11 @@ pub fn get_histogram(histogram_vec: &Result<HistogramVec>, name: &[&str]) -> Opt
}
}
/// Starts a timer on `vec` with the given `name`.
pub fn start_timer_vec(vec: &Result<HistogramVec>, name: &[&str]) -> Option<HistogramTimer> {
get_histogram(vec, name).map(|h| h.start_timer())
}
/// Starts a timer for the given `Histogram`, stopping when it gets dropped or given to `stop_timer(..)`.
pub fn start_timer(histogram: &Result<Histogram>) -> Option<HistogramTimer> {
if let Ok(histogram) = histogram {