Merge branch 'block-processing-times' into bootstrap

This commit is contained in:
Paul Hauner 2019-08-14 12:32:16 +10:00
commit f2dedfac50
No known key found for this signature in database
GPG Key ID: 303E4494BB28068C
39 changed files with 854 additions and 764 deletions

View File

@ -11,6 +11,7 @@ members = [
"eth2/utils/eth2_interop_keypairs", "eth2/utils/eth2_interop_keypairs",
"eth2/utils/logging", "eth2/utils/logging",
"eth2/utils/eth2_hashing", "eth2/utils/eth2_hashing",
"eth2/utils/lighthouse_metrics",
"eth2/utils/merkle_proof", "eth2/utils/merkle_proof",
"eth2/utils/int_to_bytes", "eth2/utils/int_to_bytes",
"eth2/utils/serde_hex", "eth2/utils/serde_hex",
@ -25,7 +26,6 @@ members = [
"beacon_node", "beacon_node",
"beacon_node/store", "beacon_node/store",
"beacon_node/client", "beacon_node/client",
"beacon_node/http_server",
"beacon_node/rest_api", "beacon_node/rest_api",
"beacon_node/network", "beacon_node/network",
"beacon_node/eth2-libp2p", "beacon_node/eth2-libp2p",

View File

@ -7,7 +7,8 @@ edition = "2018"
[dependencies] [dependencies]
store = { path = "../store" } store = { path = "../store" }
parking_lot = "0.7" parking_lot = "0.7"
prometheus = "^0.6" lazy_static = "1.3.0"
lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" }
log = "0.4" log = "0.4"
operation_pool = { path = "../../eth2/operation_pool" } operation_pool = { path = "../../eth2/operation_pool" }
serde = "1.0" serde = "1.0"

View File

@ -2,7 +2,7 @@ use crate::checkpoint::CheckPoint;
use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::fork_choice::{Error as ForkChoiceError, ForkChoice}; use crate::fork_choice::{Error as ForkChoiceError, ForkChoice};
use crate::iter::{ReverseBlockRootIterator, ReverseStateRootIterator}; use crate::iter::{ReverseBlockRootIterator, ReverseStateRootIterator};
use crate::metrics::Metrics; use crate::metrics;
use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY}; use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY};
use lmd_ghost::LmdGhost; use lmd_ghost::LmdGhost;
use log::trace; use log::trace;
@ -106,8 +106,6 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// A state-machine that is updated with information from the network and chooses a canonical /// A state-machine that is updated with information from the network and chooses a canonical
/// head block. /// head block.
pub fork_choice: ForkChoice<T>, pub fork_choice: ForkChoice<T>,
/// Stores metrics about this `BeaconChain`.
pub metrics: Metrics,
/// Logging to CLI, etc. /// Logging to CLI, etc.
log: Logger, log: Logger,
} }
@ -157,7 +155,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
canonical_head, canonical_head,
genesis_block_root, genesis_block_root,
fork_choice: ForkChoice::new(store.clone(), &genesis_block, genesis_block_root), fork_choice: ForkChoice::new(store.clone(), &genesis_block, genesis_block_root),
metrics: Metrics::new()?,
store, store,
log, log,
}) })
@ -195,7 +192,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
canonical_head: RwLock::new(p.canonical_head), canonical_head: RwLock::new(p.canonical_head),
state: RwLock::new(p.state), state: RwLock::new(p.state),
genesis_block_root: p.genesis_block_root, genesis_block_root: p.genesis_block_root,
metrics: Metrics::new()?,
store, store,
log, log,
})) }))
@ -203,6 +199,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Attempt to save this instance to `self.store`. /// Attempt to save this instance to `self.store`.
pub fn persist(&self) -> Result<(), Error> { pub fn persist(&self) -> Result<(), Error> {
let timer = metrics::start_timer(&metrics::PERSIST_CHAIN);
let p: PersistedBeaconChain<T> = PersistedBeaconChain { let p: PersistedBeaconChain<T> = PersistedBeaconChain {
canonical_head: self.canonical_head.read().clone(), canonical_head: self.canonical_head.read().clone(),
op_pool: PersistedOperationPool::from_operation_pool(&self.op_pool), op_pool: PersistedOperationPool::from_operation_pool(&self.op_pool),
@ -213,6 +211,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let key = Hash256::from_slice(&BEACON_CHAIN_DB_KEY.as_bytes()); let key = Hash256::from_slice(&BEACON_CHAIN_DB_KEY.as_bytes());
self.store.put(&key, &p)?; self.store.put(&key, &p)?;
metrics::stop_timer(timer);
Ok(()) Ok(())
} }
@ -472,8 +472,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
state: &BeaconState<T::EthSpec>, state: &BeaconState<T::EthSpec>,
) -> Result<AttestationData, Error> { ) -> Result<AttestationData, Error> {
// Collect some metrics. // Collect some metrics.
self.metrics.attestation_production_requests.inc(); metrics::inc_counter(&metrics::ATTESTATION_PRODUCTION_REQUESTS);
let timer = self.metrics.attestation_production_times.start_timer(); let timer = metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_TIMES);
let slots_per_epoch = T::EthSpec::slots_per_epoch(); let slots_per_epoch = T::EthSpec::slots_per_epoch();
let current_epoch_start_slot = state.current_epoch().start_slot(slots_per_epoch); let current_epoch_start_slot = state.current_epoch().start_slot(slots_per_epoch);
@ -520,8 +520,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}; };
// Collect some metrics. // Collect some metrics.
self.metrics.attestation_production_successes.inc(); metrics::inc_counter(&metrics::ATTESTATION_PRODUCTION_SUCCESSES);
timer.observe_duration(); metrics::stop_timer(timer);
Ok(AttestationData { Ok(AttestationData {
beacon_block_root: head_block_root, beacon_block_root: head_block_root,
@ -547,11 +547,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self, &self,
attestation: Attestation<T::EthSpec>, attestation: Attestation<T::EthSpec>,
) -> Result<AttestationProcessingOutcome, Error> { ) -> Result<AttestationProcessingOutcome, Error> {
metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_REQUESTS);
let timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_TIMES);
// From the store, load the attestation's "head block". // From the store, load the attestation's "head block".
// //
// An honest validator would have set this block to be the head of the chain (i.e., the // An honest validator would have set this block to be the head of the chain (i.e., the
// result of running fork choice). // result of running fork choice).
if let Some(attestation_head_block) = self let result = if let Some(attestation_head_block) = self
.store .store
.get::<BeaconBlock<T::EthSpec>>(&attestation.data.beacon_block_root)? .get::<BeaconBlock<T::EthSpec>>(&attestation.data.beacon_block_root)?
{ {
@ -657,7 +660,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(AttestationProcessingOutcome::UnknownHeadBlock { Ok(AttestationProcessingOutcome::UnknownHeadBlock {
beacon_block_root: attestation.data.beacon_block_root, beacon_block_root: attestation.data.beacon_block_root,
}) })
};
metrics::stop_timer(timer);
if let Ok(AttestationProcessingOutcome::Processed) = &result {
metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_SUCCESSES);
} }
result
} }
/// Verifies the `attestation` against the `state` to which it is attesting. /// Verifies the `attestation` against the `state` to which it is attesting.
@ -684,9 +695,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
state: &BeaconState<T::EthSpec>, state: &BeaconState<T::EthSpec>,
block: &BeaconBlock<T::EthSpec>, block: &BeaconBlock<T::EthSpec>,
) -> Result<AttestationProcessingOutcome, Error> { ) -> Result<AttestationProcessingOutcome, Error> {
self.metrics.attestation_processing_requests.inc();
let timer = self.metrics.attestation_processing_times.start_timer();
// Find the highest between: // Find the highest between:
// //
// - The highest valid finalized epoch we've ever seen (i.e., the head). // - The highest valid finalized epoch we've ever seen (i.e., the head).
@ -696,7 +704,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
state.finalized_checkpoint.epoch, state.finalized_checkpoint.epoch,
); );
let result = if block.slot <= finalized_epoch.start_slot(T::EthSpec::slots_per_epoch()) { // A helper function to allow attestation processing to be metered.
let verify_attestation_for_state = |state, attestation, spec, verify_signatures| {
let timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_CORE);
let result = verify_attestation_for_state(state, attestation, spec, verify_signatures);
metrics::stop_timer(timer);
result
};
if block.slot <= finalized_epoch.start_slot(T::EthSpec::slots_per_epoch()) {
// Ignore any attestation where the slot of `data.beacon_block_root` is equal to or // Ignore any attestation where the slot of `data.beacon_block_root` is equal to or
// prior to the finalized epoch. // prior to the finalized epoch.
// //
@ -730,14 +748,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.insert_attestation(attestation, state, &self.spec)?; .insert_attestation(attestation, state, &self.spec)?;
// Update the metrics. // Update the metrics.
self.metrics.attestation_processing_successes.inc(); metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_SUCCESSES);
Ok(AttestationProcessingOutcome::Processed) Ok(AttestationProcessingOutcome::Processed)
}; }
timer.observe_duration();
result
} }
/// Accept some deposit and queue it for inclusion in an appropriate block. /// Accept some deposit and queue it for inclusion in an appropriate block.
@ -786,8 +800,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self, &self,
block: BeaconBlock<T::EthSpec>, block: BeaconBlock<T::EthSpec>,
) -> Result<BlockProcessingOutcome, Error> { ) -> Result<BlockProcessingOutcome, Error> {
self.metrics.block_processing_requests.inc(); metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS);
let timer = self.metrics.block_processing_times.start_timer(); let full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
let finalized_slot = self let finalized_slot = self
.state .state
@ -804,8 +818,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(BlockProcessingOutcome::GenesisBlock); return Ok(BlockProcessingOutcome::GenesisBlock);
} }
let block_root_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_BLOCK_ROOT);
let block_root = block.canonical_root(); let block_root = block.canonical_root();
metrics::stop_timer(block_root_timer);
if block_root == self.genesis_block_root { if block_root == self.genesis_block_root {
return Ok(BlockProcessingOutcome::GenesisBlock); return Ok(BlockProcessingOutcome::GenesisBlock);
} }
@ -825,6 +843,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(BlockProcessingOutcome::BlockIsAlreadyKnown); return Ok(BlockProcessingOutcome::BlockIsAlreadyKnown);
} }
// Records the time taken to load the block and state from the database during block
// processing.
let db_read_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_READ);
// Load the blocks parent block from the database, returning invalid if that block is not // Load the blocks parent block from the database, returning invalid if that block is not
// found. // found.
let parent_block: BeaconBlock<T::EthSpec> = match self.store.get(&block.parent_root)? { let parent_block: BeaconBlock<T::EthSpec> = match self.store.get(&block.parent_root)? {
@ -844,15 +866,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.get(&parent_state_root)? .get(&parent_state_root)?
.ok_or_else(|| Error::DBInconsistent(format!("Missing state {}", parent_state_root)))?; .ok_or_else(|| Error::DBInconsistent(format!("Missing state {}", parent_state_root)))?;
metrics::stop_timer(db_read_timer);
let catchup_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_CATCHUP_STATE);
// Transition the parent state to the block slot. // Transition the parent state to the block slot.
let mut state: BeaconState<T::EthSpec> = parent_state; let mut state: BeaconState<T::EthSpec> = parent_state;
for _ in state.slot.as_u64()..block.slot.as_u64() { for _ in state.slot.as_u64()..block.slot.as_u64() {
per_slot_processing(&mut state, &self.spec)?; per_slot_processing(&mut state, &self.spec)?;
} }
metrics::stop_timer(catchup_timer);
let commitee_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_COMMITTEE);
state.build_committee_cache(RelativeEpoch::Previous, &self.spec)?; state.build_committee_cache(RelativeEpoch::Previous, &self.spec)?;
state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; state.build_committee_cache(RelativeEpoch::Current, &self.spec)?;
metrics::stop_timer(commitee_timer);
let core_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_CORE);
// Apply the received block to its parent state (which has been transitioned into this // Apply the received block to its parent state (which has been transitioned into this
// slot). // slot).
match per_block_processing(&mut state, &block, &self.spec) { match per_block_processing(&mut state, &block, &self.spec) {
@ -863,16 +897,29 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
_ => {} _ => {}
} }
metrics::stop_timer(core_timer);
let state_root_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_STATE_ROOT);
let state_root = state.canonical_root(); let state_root = state.canonical_root();
if block.state_root != state_root { if block.state_root != state_root {
return Ok(BlockProcessingOutcome::StateRootMismatch); return Ok(BlockProcessingOutcome::StateRootMismatch);
} }
metrics::stop_timer(state_root_timer);
let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE);
// Store the block and state. // Store the block and state.
self.store.put(&block_root, &block)?; self.store.put(&block_root, &block)?;
self.store.put(&state_root, &state)?; self.store.put(&state_root, &state)?;
metrics::stop_timer(db_write_timer);
let fork_choice_register_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE_REGISTER);
// Register the new block with the fork choice service. // Register the new block with the fork choice service.
if let Err(e) = self.fork_choice.process_block(&state, &block, block_root) { if let Err(e) = self.fork_choice.process_block(&state, &block, block_root) {
error!( error!(
@ -884,6 +931,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) )
} }
metrics::stop_timer(fork_choice_register_timer);
let find_head_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE_FIND_HEAD);
// Execute the fork choice algorithm, enthroning a new head if discovered. // Execute the fork choice algorithm, enthroning a new head if discovered.
// //
// Note: in the future we may choose to run fork-choice less often, potentially based upon // Note: in the future we may choose to run fork-choice less often, potentially based upon
@ -896,11 +948,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) )
}; };
self.metrics.block_processing_successes.inc(); metrics::stop_timer(find_head_timer);
self.metrics
.operations_per_block_attestation metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
.observe(block.body.attestations.len() as f64); metrics::observe(
timer.observe_duration(); &metrics::OPERATIONS_PER_BLOCK_ATTESTATION,
block.body.attestations.len() as f64,
);
metrics::stop_timer(full_timer);
Ok(BlockProcessingOutcome::Processed { block_root }) Ok(BlockProcessingOutcome::Processed { block_root })
} }
@ -935,8 +990,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
produce_at_slot: Slot, produce_at_slot: Slot,
randao_reveal: Signature, randao_reveal: Signature,
) -> Result<(BeaconBlock<T::EthSpec>, BeaconState<T::EthSpec>), BlockProductionError> { ) -> Result<(BeaconBlock<T::EthSpec>, BeaconState<T::EthSpec>), BlockProductionError> {
self.metrics.block_production_requests.inc(); metrics::inc_counter(&metrics::BLOCK_PRODUCTION_REQUESTS);
let timer = self.metrics.block_production_times.start_timer(); let timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_TIMES);
// If required, transition the new state to the present slot. // If required, transition the new state to the present slot.
while state.slot < produce_at_slot { while state.slot < produce_at_slot {
@ -988,28 +1043,25 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block.state_root = state_root; block.state_root = state_root;
self.metrics.block_production_successes.inc(); metrics::inc_counter(&metrics::BLOCK_PRODUCTION_SUCCESSES);
timer.observe_duration(); metrics::stop_timer(timer);
Ok((block, state)) Ok((block, state))
} }
/// Execute the fork choice algorithm and enthrone the result as the canonical head. /// Execute the fork choice algorithm and enthrone the result as the canonical head.
pub fn fork_choice(&self) -> Result<(), Error> { pub fn fork_choice(&self) -> Result<(), Error> {
self.metrics.fork_choice_requests.inc(); metrics::inc_counter(&metrics::FORK_CHOICE_REQUESTS);
// Start fork choice metrics timer. // Start fork choice metrics timer.
let timer = self.metrics.fork_choice_times.start_timer(); let timer = metrics::start_timer(&metrics::FORK_CHOICE_TIMES);
// Determine the root of the block that is the head of the chain. // Determine the root of the block that is the head of the chain.
let beacon_block_root = self.fork_choice.find_head(&self)?; let beacon_block_root = self.fork_choice.find_head(&self)?;
// End fork choice metrics timer.
timer.observe_duration();
// If a new head was chosen. // If a new head was chosen.
if beacon_block_root != self.head().beacon_block_root { let result = if beacon_block_root != self.head().beacon_block_root {
self.metrics.fork_choice_changed_head.inc(); metrics::inc_counter(&metrics::FORK_CHOICE_CHANGED_HEAD);
let beacon_block: BeaconBlock<T::EthSpec> = self let beacon_block: BeaconBlock<T::EthSpec> = self
.store .store
@ -1027,7 +1079,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If we switched to a new chain (instead of building atop the present chain). // If we switched to a new chain (instead of building atop the present chain).
if self.head().beacon_block_root != beacon_block.parent_root { if self.head().beacon_block_root != beacon_block.parent_root {
self.metrics.fork_choice_reorg_count.inc(); metrics::inc_counter(&metrics::FORK_CHOICE_REORG_COUNT);
warn!( warn!(
self.log, self.log,
"Beacon chain re-org"; "Beacon chain re-org";
@ -1071,11 +1123,22 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
} }
} else { } else {
Ok(()) Ok(())
};
// End fork choice metrics timer.
metrics::stop_timer(timer);
if let Err(_) = result {
metrics::inc_counter(&metrics::FORK_CHOICE_ERRORS);
} }
result
} }
/// Update the canonical head to `new_head`. /// Update the canonical head to `new_head`.
fn update_canonical_head(&self, new_head: CheckPoint<T::EthSpec>) -> Result<(), Error> { fn update_canonical_head(&self, new_head: CheckPoint<T::EthSpec>) -> Result<(), Error> {
let timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES);
// Update the checkpoint that stores the head of the chain at the time it received the // Update the checkpoint that stores the head of the chain at the time it received the
// block. // block.
*self.canonical_head.write() = new_head; *self.canonical_head.write() = new_head;
@ -1102,6 +1165,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Save `self` to `self.store`. // Save `self` to `self.store`.
self.persist()?; self.persist()?;
metrics::stop_timer(timer);
Ok(()) Ok(())
} }
@ -1129,6 +1194,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.fork_choice self.fork_choice
.process_finalization(&finalized_block, finalized_block_root)?; .process_finalization(&finalized_block, finalized_block_root)?;
let finalized_state = self
.store
.get::<BeaconState<T::EthSpec>>(&finalized_block.state_root)?
.ok_or_else(|| Error::MissingBeaconState(finalized_block.state_root))?;
self.op_pool.prune_all(&finalized_state, &self.spec);
Ok(()) Ok(())
} }
} }

View File

@ -1,5 +1,4 @@
use crate::fork_choice::Error as ForkChoiceError; use crate::fork_choice::Error as ForkChoiceError;
use crate::metrics::Error as MetricsError;
use state_processing::per_block_processing::errors::{ use state_processing::per_block_processing::errors::{
AttestationValidationError, IndexedAttestationValidationError, AttestationValidationError, IndexedAttestationValidationError,
}; };
@ -34,7 +33,6 @@ pub enum BeaconChainError {
MissingBeaconBlock(Hash256), MissingBeaconBlock(Hash256),
MissingBeaconState(Hash256), MissingBeaconState(Hash256),
SlotProcessingError(SlotProcessingError), SlotProcessingError(SlotProcessingError),
MetricsError(String),
NoStateForAttestation { NoStateForAttestation {
beacon_block_root: Hash256, beacon_block_root: Hash256,
}, },
@ -44,12 +42,6 @@ pub enum BeaconChainError {
easy_from_to!(SlotProcessingError, BeaconChainError); easy_from_to!(SlotProcessingError, BeaconChainError);
impl From<MetricsError> for BeaconChainError {
fn from(e: MetricsError) -> BeaconChainError {
BeaconChainError::MetricsError(format!("{:?}", e))
}
}
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum BlockProductionError { pub enum BlockProductionError {
UnableToGetBlockRootFromState, UnableToGetBlockRootFromState,

View File

@ -1,4 +1,4 @@
use crate::{BeaconChain, BeaconChainTypes}; use crate::{metrics, BeaconChain, BeaconChainTypes};
use lmd_ghost::LmdGhost; use lmd_ghost::LmdGhost;
use state_processing::common::get_attesting_indices; use state_processing::common::get_attesting_indices;
use std::sync::Arc; use std::sync::Arc;
@ -46,6 +46,8 @@ impl<T: BeaconChainTypes> ForkChoice<T> {
} }
pub fn find_head(&self, chain: &BeaconChain<T>) -> Result<Hash256> { pub fn find_head(&self, chain: &BeaconChain<T>) -> Result<Hash256> {
let timer = metrics::start_timer(&metrics::FORK_CHOICE_FIND_HEAD_TIMES);
let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch());
// From the specification: // From the specification:
@ -97,9 +99,14 @@ impl<T: BeaconChainTypes> ForkChoice<T> {
.map(|v| v.effective_balance) .map(|v| v.effective_balance)
}; };
self.backend let result = self
.backend
.find_head(start_block_slot, start_block_root, weight) .find_head(start_block_slot, start_block_root, weight)
.map_err(Into::into) .map_err(Into::into);
metrics::stop_timer(timer);
result
} }
/// Process all attestations in the given `block`. /// Process all attestations in the given `block`.
@ -112,6 +119,7 @@ impl<T: BeaconChainTypes> ForkChoice<T> {
block: &BeaconBlock<T::EthSpec>, block: &BeaconBlock<T::EthSpec>,
block_root: Hash256, block_root: Hash256,
) -> Result<()> { ) -> Result<()> {
let timer = metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_BLOCK_TIMES);
// Note: we never count the block as a latest message, only attestations. // Note: we never count the block as a latest message, only attestations.
// //
// I (Paul H) do not have an explicit reference to this, but I derive it from this // I (Paul H) do not have an explicit reference to this, but I derive it from this
@ -136,6 +144,8 @@ impl<T: BeaconChainTypes> ForkChoice<T> {
// a block that has the majority of votes applied to it. // a block that has the majority of votes applied to it.
self.backend.process_block(block, block_root)?; self.backend.process_block(block, block_root)?;
metrics::stop_timer(timer);
Ok(()) Ok(())
} }
@ -148,6 +158,8 @@ impl<T: BeaconChainTypes> ForkChoice<T> {
attestation: &Attestation<T::EthSpec>, attestation: &Attestation<T::EthSpec>,
block: &BeaconBlock<T::EthSpec>, block: &BeaconBlock<T::EthSpec>,
) -> Result<()> { ) -> Result<()> {
let timer = metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES);
let block_hash = attestation.data.beacon_block_root; let block_hash = attestation.data.beacon_block_root;
// Ignore any attestations to the zero hash. // Ignore any attestations to the zero hash.
@ -175,6 +187,8 @@ impl<T: BeaconChainTypes> ForkChoice<T> {
} }
} }
metrics::stop_timer(timer);
Ok(()) Ok(())
} }

View File

@ -1,3 +1,7 @@
#![recursion_limit = "128"] // For lazy-static
#[macro_use]
extern crate lazy_static;
mod beacon_chain; mod beacon_chain;
mod checkpoint; mod checkpoint;
mod errors; mod errors;
@ -13,6 +17,7 @@ pub use self::beacon_chain::{
pub use self::checkpoint::CheckPoint; pub use self::checkpoint::CheckPoint;
pub use self::errors::{BeaconChainError, BlockProductionError}; pub use self::errors::{BeaconChainError, BlockProductionError};
pub use lmd_ghost; pub use lmd_ghost;
pub use metrics::scrape_for_metrics;
pub use parking_lot; pub use parking_lot;
pub use slot_clock; pub use slot_clock;
pub use state_processing::per_block_processing::errors::{ pub use state_processing::per_block_processing::errors::{

View File

@ -1,143 +1,276 @@
pub use prometheus::Error; use crate::{BeaconChain, BeaconChainTypes};
use prometheus::{Histogram, HistogramOpts, IntCounter, Opts, Registry}; pub use lighthouse_metrics::*;
use types::{BeaconState, Epoch, Hash256, Slot};
pub struct Metrics { lazy_static! {
pub block_processing_requests: IntCounter, /*
pub block_processing_successes: IntCounter, * Block Processing
pub block_processing_times: Histogram, */
pub block_production_requests: IntCounter, pub static ref BLOCK_PROCESSING_REQUESTS: Result<IntCounter> = try_create_int_counter(
pub block_production_successes: IntCounter, "beacon_block_processing_requests_total",
pub block_production_times: Histogram, "Count of blocks submitted for processing"
pub attestation_production_requests: IntCounter, );
pub attestation_production_successes: IntCounter, pub static ref BLOCK_PROCESSING_SUCCESSES: Result<IntCounter> = try_create_int_counter(
pub attestation_production_times: Histogram, "beacon_block_processing_successes_total",
pub attestation_processing_requests: IntCounter, "Count of blocks processed without error"
pub attestation_processing_successes: IntCounter, );
pub attestation_processing_times: Histogram, pub static ref BLOCK_PROCESSING_TIMES: Result<Histogram> =
pub fork_choice_requests: IntCounter, try_create_histogram("beacon_block_processing_seconds", "Full runtime of block processing");
pub fork_choice_changed_head: IntCounter, pub static ref BLOCK_PROCESSING_BLOCK_ROOT: Result<Histogram> = try_create_histogram(
pub fork_choice_reorg_count: IntCounter, "beacon_block_processing_block_root_seconds",
pub fork_choice_times: Histogram, "Time spent calculating the block root when processing a block."
pub operations_per_block_attestation: Histogram, );
pub static ref BLOCK_PROCESSING_DB_READ: Result<Histogram> = try_create_histogram(
"beacon_block_processing_db_read_seconds",
"Time spent loading block and state from DB for block processing"
);
pub static ref BLOCK_PROCESSING_CATCHUP_STATE: Result<Histogram> = try_create_histogram(
"beacon_block_processing_catch_up_state_seconds",
"Time spent skipping slots on a state before processing a block."
);
pub static ref BLOCK_PROCESSING_COMMITTEE: Result<Histogram> = try_create_histogram(
"beacon_block_processing_committee_building_seconds",
"Time spent building/obtaining committees for block processing."
);
pub static ref BLOCK_PROCESSING_CORE: Result<Histogram> = try_create_histogram(
"beacon_block_processing_core_seconds",
"Time spent doing the core per_block_processing state processing."
);
pub static ref BLOCK_PROCESSING_STATE_ROOT: Result<Histogram> = try_create_histogram(
"beacon_block_processing_state_root_seconds",
"Time spent calculating the state root when processing a block."
);
pub static ref BLOCK_PROCESSING_DB_WRITE: Result<Histogram> = try_create_histogram(
"beacon_block_processing_db_write_seconds",
"Time spent writing a newly processed block and state to DB"
);
pub static ref BLOCK_PROCESSING_FORK_CHOICE_REGISTER: Result<Histogram> = try_create_histogram(
"beacon_block_processing_fork_choice_register_seconds",
"Time spent registering the new block with fork choice (but not finding head)"
);
pub static ref BLOCK_PROCESSING_FORK_CHOICE_FIND_HEAD: Result<Histogram> = try_create_histogram(
"beacon_block_processing_fork_choice_find_head_seconds",
"Time spent finding the new head after processing a new block"
);
/*
* Block Production
*/
pub static ref BLOCK_PRODUCTION_REQUESTS: Result<IntCounter> = try_create_int_counter(
"beacon_block_production_requests_total",
"Count of all block production requests"
);
pub static ref BLOCK_PRODUCTION_SUCCESSES: Result<IntCounter> = try_create_int_counter(
"beacon_block_production_successes_total",
"Count of blocks successfully produced."
);
pub static ref BLOCK_PRODUCTION_TIMES: Result<Histogram> =
try_create_histogram("beacon_block_production_seconds", "Full runtime of block production");
/*
* Block Statistics
*/
pub static ref OPERATIONS_PER_BLOCK_ATTESTATION: Result<Histogram> = try_create_histogram(
"beacon_operations_per_block_attestation_total",
"Number of attestations in a block"
);
/*
* Attestation Processing
*/
pub static ref ATTESTATION_PROCESSING_REQUESTS: Result<IntCounter> = try_create_int_counter(
"beacon_attestation_processing_requests_total",
"Count of all attestations submitted for processing"
);
pub static ref ATTESTATION_PROCESSING_SUCCESSES: Result<IntCounter> = try_create_int_counter(
"beacon_attestation_processing_successes_total",
"total_attestation_processing_successes"
);
pub static ref ATTESTATION_PROCESSING_TIMES: Result<Histogram> = try_create_histogram(
"beacon_attestation_processing_seconds",
"Full runtime of attestation processing"
);
pub static ref ATTESTATION_PROCESSING_CORE: Result<Histogram> = try_create_histogram(
"beacon_attestation_processing_core_seconds",
"Time spent on the core spec processing of attestation processing"
);
/*
* Attestation Production
*/
pub static ref ATTESTATION_PRODUCTION_REQUESTS: Result<IntCounter> = try_create_int_counter(
"beacon_attestation_production_requests_total",
"Count of all attestation production requests"
);
pub static ref ATTESTATION_PRODUCTION_SUCCESSES: Result<IntCounter> = try_create_int_counter(
"beacon_attestation_production_successes_total",
"Count of attestations processed without error"
);
pub static ref ATTESTATION_PRODUCTION_TIMES: Result<Histogram> = try_create_histogram(
"beacon_attestation_production_seconds",
"Full runtime of attestation production"
);
/*
* Fork Choice
*/
pub static ref FORK_CHOICE_REQUESTS: Result<IntCounter> = try_create_int_counter(
"beacon_fork_choice_requests_total",
"Count of occasions where fork choice has tried to find a head"
);
pub static ref FORK_CHOICE_ERRORS: Result<IntCounter> = try_create_int_counter(
"beacon_fork_choice_errors_total",
"Count of occasions where fork choice has returned an error when trying to find a head"
);
pub static ref FORK_CHOICE_CHANGED_HEAD: Result<IntCounter> = try_create_int_counter(
"beacon_fork_choice_changed_head_total",
"Count of occasions fork choice has found a new head"
);
pub static ref FORK_CHOICE_REORG_COUNT: Result<IntCounter> = try_create_int_counter(
"beacon_fork_choice_reorg_total",
"Count of occasions fork choice has switched to a different chain"
);
pub static ref FORK_CHOICE_TIMES: Result<Histogram> =
try_create_histogram("beacon_fork_choice_seconds", "Full runtime of fork choice");
pub static ref FORK_CHOICE_FIND_HEAD_TIMES: Result<Histogram> =
try_create_histogram("beacon_fork_choice_find_head_seconds", "Full runtime of fork choice find_head function");
pub static ref FORK_CHOICE_PROCESS_BLOCK_TIMES: Result<Histogram> = try_create_histogram(
"beacon_fork_choice_process_block_seconds",
"Time taken to add a block and all attestations to fork choice"
);
pub static ref FORK_CHOICE_PROCESS_ATTESTATION_TIMES: Result<Histogram> = try_create_histogram(
"beacon_fork_choice_process_attestation_seconds",
"Time taken to add an attestation to fork choice"
);
/*
* Persisting BeaconChain to disk
*/
pub static ref PERSIST_CHAIN: Result<Histogram> =
try_create_histogram("beacon_persist_chain", "Time taken to update the canonical head");
/*
* Chain Head
*/
pub static ref UPDATE_HEAD_TIMES: Result<Histogram> =
try_create_histogram("beacon_update_head_seconds", "Time taken to update the canonical head");
pub static ref HEAD_STATE_SLOT: Result<IntGauge> =
try_create_int_gauge("beacon_head_state_slot", "Slot of the block at the head of the chain");
pub static ref HEAD_STATE_ROOT: Result<IntGauge> =
try_create_int_gauge("beacon_head_state_root", "Root of the block at the head of the chain");
pub static ref HEAD_STATE_LATEST_BLOCK_SLOT: Result<IntGauge> =
try_create_int_gauge("beacon_head_state_latest_block_slot", "Latest block slot at the head of the chain");
pub static ref HEAD_STATE_CURRENT_JUSTIFIED_ROOT: Result<IntGauge> =
try_create_int_gauge("beacon_head_state_current_justified_root", "Current justified root at the head of the chain");
pub static ref HEAD_STATE_CURRENT_JUSTIFIED_EPOCH: Result<IntGauge> =
try_create_int_gauge("beacon_head_state_current_justified_epoch", "Current justified epoch at the head of the chain");
pub static ref HEAD_STATE_PREVIOUS_JUSTIFIED_ROOT: Result<IntGauge> =
try_create_int_gauge("beacon_head_state_previous_justified_root", "Previous justified root at the head of the chain");
pub static ref HEAD_STATE_PREVIOUS_JUSTIFIED_EPOCH: Result<IntGauge> =
try_create_int_gauge("beacon_head_state_previous_justified_epoch", "Previous justified epoch at the head of the chain");
pub static ref HEAD_STATE_FINALIZED_ROOT: Result<IntGauge> =
try_create_int_gauge("beacon_head_state_finalized_root", "Finalized root at the head of the chain");
pub static ref HEAD_STATE_FINALIZED_EPOCH: Result<IntGauge> =
try_create_int_gauge("beacon_head_state_finalized_epoch", "Finalized epoch at the head of the chain");
pub static ref HEAD_STATE_SHARDS: Result<IntGauge> =
try_create_int_gauge("beacon_head_state_shard_total", "Count of shards in the beacon chain");
pub static ref HEAD_STATE_TOTAL_VALIDATORS: Result<IntGauge> =
try_create_int_gauge("beacon_head_state_total_validators_total", "Count of validators at the head of the chain");
pub static ref HEAD_STATE_ACTIVE_VALIDATORS: Result<IntGauge> =
try_create_int_gauge("beacon_head_state_active_validators_total", "Count of active validators at the head of the chain");
pub static ref HEAD_STATE_VALIDATOR_BALANCES: Result<IntGauge> =
try_create_int_gauge("beacon_head_state_validator_balances_total", "Sum of all validator balances at the head of the chain");
pub static ref HEAD_STATE_SLASHED_VALIDATORS: Result<IntGauge> =
try_create_int_gauge("beacon_head_state_slashed_validators_total", "Count of all slashed validators at the head of the chain");
pub static ref HEAD_STATE_WITHDRAWN_VALIDATORS: Result<IntGauge> =
try_create_int_gauge("beacon_head_state_withdrawn_validators_total", "Sum of all validator balances at the head of the chain");
pub static ref HEAD_STATE_ETH1_DEPOSIT_INDEX: Result<IntGauge> =
try_create_int_gauge("beacon_head_state_eth1_deposit_index", "Eth1 deposit index at the head of the chain");
} }
impl Metrics { /// Scrape the `beacon_chain` for metrics that are not constantly updated (e.g., the present slot,
pub fn new() -> Result<Self, Error> { /// head state info, etc) and update the Prometheus `DEFAULT_REGISTRY`.
Ok(Self { pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {
block_processing_requests: { scrape_head_state::<T>(
let opts = Opts::new("block_processing_requests", "total_blocks_processed"); &beacon_chain.head().beacon_state,
IntCounter::with_opts(opts)? beacon_chain.head().beacon_state_root,
},
block_processing_successes: {
let opts = Opts::new("block_processing_successes", "total_valid_blocks_processed");
IntCounter::with_opts(opts)?
},
block_processing_times: {
let opts = HistogramOpts::new("block_processing_times", "block_processing_time");
Histogram::with_opts(opts)?
},
block_production_requests: {
let opts = Opts::new("block_production_requests", "attempts_to_produce_new_block");
IntCounter::with_opts(opts)?
},
block_production_successes: {
let opts = Opts::new("block_production_successes", "blocks_successfully_produced");
IntCounter::with_opts(opts)?
},
block_production_times: {
let opts = HistogramOpts::new("block_production_times", "block_production_time");
Histogram::with_opts(opts)?
},
attestation_production_requests: {
let opts = Opts::new(
"attestation_production_requests",
"total_attestation_production_requests",
); );
IntCounter::with_opts(opts)? }
},
attestation_production_successes: { /// Scrape the given `state` assuming it's the head state, updating the `DEFAULT_REGISTRY`.
let opts = Opts::new( fn scrape_head_state<T: BeaconChainTypes>(state: &BeaconState<T::EthSpec>, state_root: Hash256) {
"attestation_production_successes", set_gauge_by_slot(&HEAD_STATE_SLOT, state.slot);
"total_attestation_production_successes", set_gauge_by_hash(&HEAD_STATE_ROOT, state_root);
); set_gauge_by_slot(
IntCounter::with_opts(opts)? &HEAD_STATE_LATEST_BLOCK_SLOT,
}, state.latest_block_header.slot,
attestation_production_times: { );
let opts = HistogramOpts::new( set_gauge_by_hash(
"attestation_production_times", &HEAD_STATE_CURRENT_JUSTIFIED_ROOT,
"attestation_production_time", state.current_justified_checkpoint.root,
); );
Histogram::with_opts(opts)? set_gauge_by_epoch(
}, &HEAD_STATE_CURRENT_JUSTIFIED_EPOCH,
attestation_processing_requests: { state.current_justified_checkpoint.epoch,
let opts = Opts::new( );
"attestation_processing_requests", set_gauge_by_hash(
"total_attestation_processing_requests", &HEAD_STATE_PREVIOUS_JUSTIFIED_ROOT,
); state.previous_justified_checkpoint.root,
IntCounter::with_opts(opts)? );
}, set_gauge_by_epoch(
attestation_processing_successes: { &HEAD_STATE_PREVIOUS_JUSTIFIED_EPOCH,
let opts = Opts::new( state.previous_justified_checkpoint.epoch,
"attestation_processing_successes", );
"total_attestation_processing_successes", set_gauge_by_hash(&HEAD_STATE_FINALIZED_ROOT, state.finalized_checkpoint.root);
); set_gauge_by_epoch(
IntCounter::with_opts(opts)? &HEAD_STATE_FINALIZED_EPOCH,
}, state.finalized_checkpoint.epoch,
attestation_processing_times: { );
let opts = HistogramOpts::new( set_gauge_by_usize(&HEAD_STATE_SHARDS, state.previous_crosslinks.len());
"attestation_processing_times", set_gauge_by_usize(&HEAD_STATE_TOTAL_VALIDATORS, state.validators.len());
"attestation_processing_time", set_gauge_by_u64(
); &HEAD_STATE_VALIDATOR_BALANCES,
Histogram::with_opts(opts)? state.balances.iter().fold(0_u64, |acc, i| acc + i),
}, );
fork_choice_requests: { set_gauge_by_usize(
let opts = Opts::new("fork_choice_requests", "total_times_fork_choice_called"); &HEAD_STATE_ACTIVE_VALIDATORS,
IntCounter::with_opts(opts)? state
}, .validators
fork_choice_changed_head: { .iter()
let opts = Opts::new( .filter(|v| v.is_active_at(state.current_epoch()))
"fork_choice_changed_head", .count(),
"total_times_fork_choice_chose_a_new_head", );
); set_gauge_by_usize(
IntCounter::with_opts(opts)? &HEAD_STATE_SLASHED_VALIDATORS,
}, state.validators.iter().filter(|v| v.slashed).count(),
fork_choice_reorg_count: { );
let opts = Opts::new("fork_choice_reorg_count", "number_of_reorgs"); set_gauge_by_usize(
IntCounter::with_opts(opts)? &HEAD_STATE_WITHDRAWN_VALIDATORS,
}, state
fork_choice_times: { .validators
let opts = HistogramOpts::new("fork_choice_time", "total_time_to_run_fork_choice"); .iter()
Histogram::with_opts(opts)? .filter(|v| v.is_withdrawable_at(state.current_epoch()))
}, .count(),
operations_per_block_attestation: { );
let opts = HistogramOpts::new( set_gauge_by_u64(&HEAD_STATE_ETH1_DEPOSIT_INDEX, state.eth1_deposit_index);
"operations_per_block_attestation", }
"count_of_attestations_per_block",
); fn set_gauge_by_slot(gauge: &Result<IntGauge>, value: Slot) {
Histogram::with_opts(opts)? set_gauge(gauge, value.as_u64() as i64);
}, }
})
} fn set_gauge_by_epoch(gauge: &Result<IntGauge>, value: Epoch) {
set_gauge(gauge, value.as_u64() as i64);
pub fn register(&self, registry: &Registry) -> Result<(), Error> { }
registry.register(Box::new(self.block_processing_requests.clone()))?;
registry.register(Box::new(self.block_processing_successes.clone()))?; fn set_gauge_by_hash(gauge: &Result<IntGauge>, value: Hash256) {
registry.register(Box::new(self.block_processing_times.clone()))?; set_gauge(gauge, value.to_low_u64_le() as i64);
registry.register(Box::new(self.block_production_requests.clone()))?; }
registry.register(Box::new(self.block_production_successes.clone()))?;
registry.register(Box::new(self.block_production_times.clone()))?; fn set_gauge_by_usize(gauge: &Result<IntGauge>, value: usize) {
registry.register(Box::new(self.attestation_production_requests.clone()))?; set_gauge(gauge, value as i64);
registry.register(Box::new(self.attestation_production_successes.clone()))?; }
registry.register(Box::new(self.attestation_production_times.clone()))?;
registry.register(Box::new(self.attestation_processing_requests.clone()))?; fn set_gauge_by_u64(gauge: &Result<IntGauge>, value: u64) {
registry.register(Box::new(self.attestation_processing_successes.clone()))?; set_gauge(gauge, value as i64);
registry.register(Box::new(self.attestation_processing_times.clone()))?;
registry.register(Box::new(self.fork_choice_requests.clone()))?;
registry.register(Box::new(self.fork_choice_changed_head.clone()))?;
registry.register(Box::new(self.fork_choice_reorg_count.clone()))?;
registry.register(Box::new(self.fork_choice_times.clone()))?;
registry.register(Box::new(self.operations_per_block_attestation.clone()))?;
Ok(())
}
} }

View File

@ -7,7 +7,6 @@ edition = "2018"
[dependencies] [dependencies]
beacon_chain = { path = "../beacon_chain" } beacon_chain = { path = "../beacon_chain" }
network = { path = "../network" } network = { path = "../network" }
http_server = { path = "../http_server" }
rpc = { path = "../rpc" } rpc = { path = "../rpc" }
rest_api = { path = "../rest_api" } rest_api = { path = "../rest_api" }
prometheus = "^0.6" prometheus = "^0.6"

View File

@ -1,6 +1,5 @@
use crate::Eth2Config; use crate::Eth2Config;
use clap::ArgMatches; use clap::ArgMatches;
use http_server::HttpServerConfig;
use network::NetworkConfig; use network::NetworkConfig;
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use slog::{info, o, Drain}; use slog::{info, o, Drain};
@ -25,7 +24,6 @@ pub struct Config {
pub genesis_state: GenesisState, pub genesis_state: GenesisState,
pub network: network::NetworkConfig, pub network: network::NetworkConfig,
pub rpc: rpc::RPCConfig, pub rpc: rpc::RPCConfig,
pub http: HttpServerConfig,
pub rest_api: rest_api::ApiConfig, pub rest_api: rest_api::ApiConfig,
} }
@ -61,7 +59,6 @@ impl Default for Config {
db_name: "chain_db".to_string(), db_name: "chain_db".to_string(),
network: NetworkConfig::new(), network: NetworkConfig::new(),
rpc: rpc::RPCConfig::default(), rpc: rpc::RPCConfig::default(),
http: HttpServerConfig::default(),
rest_api: rest_api::ApiConfig::default(), rest_api: rest_api::ApiConfig::default(),
spec_constants: TESTNET_SPEC_CONSTANTS.into(), spec_constants: TESTNET_SPEC_CONSTANTS.into(),
genesis_state: GenesisState::RecentGenesis { genesis_state: GenesisState::RecentGenesis {
@ -145,7 +142,6 @@ impl Config {
self.network.apply_cli_args(args)?; self.network.apply_cli_args(args)?;
self.rpc.apply_cli_args(args)?; self.rpc.apply_cli_args(args)?;
self.http.apply_cli_args(args)?;
self.rest_api.apply_cli_args(args)?; self.rest_api.apply_cli_args(args)?;
if let Some(log_file) = args.value_of("logfile") { if let Some(log_file) = args.value_of("logfile") {

View File

@ -11,7 +11,6 @@ use beacon_chain::BeaconChain;
use exit_future::Signal; use exit_future::Signal;
use futures::{future::Future, Stream}; use futures::{future::Future, Stream};
use network::Service as NetworkService; use network::Service as NetworkService;
use prometheus::Registry;
use slog::{error, info, o}; use slog::{error, info, o};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::marker::PhantomData; use std::marker::PhantomData;
@ -37,8 +36,6 @@ pub struct Client<T: BeaconChainTypes> {
pub network: Arc<NetworkService<T>>, pub network: Arc<NetworkService<T>>,
/// Signal to terminate the RPC server. /// Signal to terminate the RPC server.
pub rpc_exit_signal: Option<Signal>, pub rpc_exit_signal: Option<Signal>,
/// Signal to terminate the HTTP server.
pub http_exit_signal: Option<Signal>,
/// Signal to terminate the slot timer. /// Signal to terminate the slot timer.
pub slot_timer_exit_signal: Option<Signal>, pub slot_timer_exit_signal: Option<Signal>,
/// Signal to terminate the API /// Signal to terminate the API
@ -61,7 +58,6 @@ where
log: slog::Logger, log: slog::Logger,
executor: &TaskExecutor, executor: &TaskExecutor,
) -> error::Result<Self> { ) -> error::Result<Self> {
let metrics_registry = Registry::new();
let store = Arc::new(store); let store = Arc::new(store);
let seconds_per_slot = eth2_config.spec.seconds_per_slot; let seconds_per_slot = eth2_config.spec.seconds_per_slot;
@ -72,11 +68,6 @@ where
eth2_config.spec.clone(), eth2_config.spec.clone(),
log.clone(), log.clone(),
)?); )?);
// Registry all beacon chain metrics with the global registry.
beacon_chain
.metrics
.register(&metrics_registry)
.expect("Failed to registry metrics");
if beacon_chain.read_slot_clock().is_none() { if beacon_chain.read_slot_clock().is_none() {
panic!("Cannot start client before genesis!") panic!("Cannot start client before genesis!")
@ -125,29 +116,13 @@ where
None None
}; };
// Start the `http_server` service.
//
// Note: presently we are ignoring the config and _always_ starting a HTTP server.
let http_exit_signal = if client_config.http.enabled {
Some(http_server::start_service(
&client_config.http,
executor,
network_send,
beacon_chain.clone(),
client_config.db_path().expect("unable to read datadir"),
metrics_registry,
&log,
))
} else {
None
};
// Start the `rest_api` service // Start the `rest_api` service
let api_exit_signal = if client_config.rest_api.enabled { let api_exit_signal = if client_config.rest_api.enabled {
match rest_api::start_server( match rest_api::start_server(
&client_config.rest_api, &client_config.rest_api,
executor, executor,
beacon_chain.clone(), beacon_chain.clone(),
client_config.db_path().expect("unable to read datadir"),
&log, &log,
) { ) {
Ok(s) => Some(s), Ok(s) => Some(s),
@ -189,7 +164,6 @@ where
Ok(Client { Ok(Client {
_client_config: client_config, _client_config: client_config,
beacon_chain, beacon_chain,
http_exit_signal,
rpc_exit_signal, rpc_exit_signal,
slot_timer_exit_signal: Some(slot_timer_exit_signal), slot_timer_exit_signal: Some(slot_timer_exit_signal),
api_exit_signal, api_exit_signal,

View File

@ -26,3 +26,5 @@ smallvec = "0.6.10"
fnv = "1.0.6" fnv = "1.0.6"
unsigned-varint = "0.2.2" unsigned-varint = "0.2.2"
bytes = "0.4.12" bytes = "0.4.12"
lazy_static = "1.3.0"
lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" }

View File

@ -1,3 +1,4 @@
use crate::metrics;
use crate::{error, NetworkConfig}; use crate::{error, NetworkConfig};
/// This manages the discovery and management of peers. /// This manages the discovery and management of peers.
/// ///
@ -159,10 +160,16 @@ where
fn inject_connected(&mut self, peer_id: PeerId, _endpoint: ConnectedPoint) { fn inject_connected(&mut self, peer_id: PeerId, _endpoint: ConnectedPoint) {
self.connected_peers.insert(peer_id); self.connected_peers.insert(peer_id);
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
metrics::set_gauge(&metrics::PEERS_CONNECTED, self.connected_peers() as i64);
} }
fn inject_disconnected(&mut self, peer_id: &PeerId, _endpoint: ConnectedPoint) { fn inject_disconnected(&mut self, peer_id: &PeerId, _endpoint: ConnectedPoint) {
self.connected_peers.remove(peer_id); self.connected_peers.remove(peer_id);
metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);
metrics::set_gauge(&metrics::PEERS_CONNECTED, self.connected_peers() as i64);
} }
fn inject_replaced( fn inject_replaced(
@ -217,6 +224,7 @@ where
} }
Discv5Event::SocketUpdated(socket) => { Discv5Event::SocketUpdated(socket) => {
info!(self.log, "Address updated"; "IP" => format!("{}",socket.ip())); info!(self.log, "Address updated"; "IP" => format!("{}",socket.ip()));
metrics::inc_counter(&metrics::ADDRESS_UPDATE_COUNT);
let mut address = Multiaddr::from(socket.ip()); let mut address = Multiaddr::from(socket.ip());
address.push(Protocol::Tcp(self.tcp_port)); address.push(Protocol::Tcp(self.tcp_port));
let enr = self.discovery.local_enr(); let enr = self.discovery.local_enr();

View File

@ -2,10 +2,14 @@
/// all required libp2p functionality. /// all required libp2p functionality.
/// ///
/// This crate builds and manages the libp2p services required by the beacon node. /// This crate builds and manages the libp2p services required by the beacon node.
#[macro_use]
extern crate lazy_static;
pub mod behaviour; pub mod behaviour;
mod config; mod config;
mod discovery; mod discovery;
pub mod error; pub mod error;
mod metrics;
pub mod rpc; pub mod rpc;
mod service; mod service;

View File

@ -0,0 +1,20 @@
pub use lighthouse_metrics::*;
lazy_static! {
pub static ref ADDRESS_UPDATE_COUNT: Result<IntCounter> = try_create_int_counter(
"libp2p_address_update_total",
"Count of libp2p socked updated events (when our view of our IP address has changed)"
);
pub static ref PEERS_CONNECTED: Result<IntGauge> = try_create_int_gauge(
"libp2p_peer_connected_peers_total",
"Count of libp2p peers currently connected"
);
pub static ref PEER_CONNECT_EVENT_COUNT: Result<IntCounter> = try_create_int_counter(
"libp2p_peer_connect_event_total",
"Count of libp2p peer connect events (not the current number of connected peers)"
);
pub static ref PEER_DISCONNECT_EVENT_COUNT: Result<IntCounter> = try_create_int_counter(
"libp2p_peer_disconnect_event_total",
"Count of libp2p peer disconnect events"
);
}

View File

@ -1,23 +0,0 @@
[package]
name = "http_server"
version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018"
[dependencies]
beacon_chain = { path = "../beacon_chain" }
iron = "^0.6"
router = "^0.6"
network = { path = "../network" }
types = { path = "../../eth2/types" }
slot_clock = { path = "../../eth2/utils/slot_clock" }
persistent = "^0.4"
prometheus = { version = "^0.6", features = ["process"] }
clap = "2.32.0"
futures = "0.1.23"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
slog = { version = "^2.2.3" , features = ["max_level_trace"] }
tokio = "0.1.17"
exit-future = "0.1.4"

View File

@ -1,71 +0,0 @@
use crate::{key::BeaconChainKey, map_persistent_err_to_500};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use iron::prelude::*;
use iron::{
headers::{CacheControl, CacheDirective, ContentType},
status::Status,
AfterMiddleware, Handler, IronResult, Request, Response,
};
use persistent::Read;
use router::Router;
use serde_json::json;
use std::sync::Arc;
/// Yields a handler for the HTTP API.
pub fn build_handler<T: BeaconChainTypes + 'static>(
beacon_chain: Arc<BeaconChain<T>>,
) -> impl Handler {
let mut router = Router::new();
router.get("/node/fork", handle_fork::<T>, "fork");
let mut chain = Chain::new(router);
// Insert `BeaconChain` so it may be accessed in a request.
chain.link(Read::<BeaconChainKey<T>>::both(beacon_chain.clone()));
// Set the content-type headers.
chain.link_after(SetJsonContentType);
// Set the cache headers.
chain.link_after(SetCacheDirectives);
chain
}
/// Sets the `cache-control` headers on _all_ responses, unless they are already set.
struct SetCacheDirectives;
impl AfterMiddleware for SetCacheDirectives {
fn after(&self, _req: &mut Request, mut resp: Response) -> IronResult<Response> {
// This is run for every requests, AFTER all handlers have been executed
if resp.headers.get::<CacheControl>() == None {
resp.headers.set(CacheControl(vec![
CacheDirective::NoCache,
CacheDirective::NoStore,
]));
}
Ok(resp)
}
}
/// Sets the `content-type` headers on _all_ responses, unless they are already set.
struct SetJsonContentType;
impl AfterMiddleware for SetJsonContentType {
fn after(&self, _req: &mut Request, mut resp: Response) -> IronResult<Response> {
if resp.headers.get::<ContentType>() == None {
resp.headers.set(ContentType::json());
}
Ok(resp)
}
}
fn handle_fork<T: BeaconChainTypes + 'static>(req: &mut Request) -> IronResult<Response> {
let beacon_chain = req
.get::<Read<BeaconChainKey<T>>>()
.map_err(map_persistent_err_to_500)?;
let response = json!({
"fork": beacon_chain.head().beacon_state.fork,
"network_id": beacon_chain.spec.network_id
});
Ok(Response::with((Status::Ok, response.to_string())))
}

View File

@ -1,33 +0,0 @@
use crate::metrics::LocalMetrics;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use iron::typemap::Key;
use prometheus::Registry;
use std::marker::PhantomData;
use std::path::PathBuf;
use std::sync::Arc;
pub struct BeaconChainKey<T> {
_phantom: PhantomData<T>,
}
impl<T: BeaconChainTypes + 'static> Key for BeaconChainKey<T> {
type Value = Arc<BeaconChain<T>>;
}
pub struct MetricsRegistryKey;
impl Key for MetricsRegistryKey {
type Value = Registry;
}
pub struct LocalMetricsKey;
impl Key for LocalMetricsKey {
type Value = LocalMetrics;
}
pub struct DBPathKey;
impl Key for DBPathKey {
type Value = PathBuf;
}

View File

@ -1,145 +0,0 @@
mod api;
mod key;
mod metrics;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use clap::ArgMatches;
use futures::Future;
use iron::prelude::*;
use network::NetworkMessage;
use prometheus::Registry;
use router::Router;
use serde_derive::{Deserialize, Serialize};
use slog::{info, o, warn};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::runtime::TaskExecutor;
use tokio::sync::mpsc;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct HttpServerConfig {
pub enabled: bool,
pub listen_address: String,
pub listen_port: String,
}
impl Default for HttpServerConfig {
fn default() -> Self {
Self {
enabled: false,
listen_address: "127.0.0.1".to_string(),
listen_port: "5052".to_string(),
}
}
}
impl HttpServerConfig {
pub fn apply_cli_args(&mut self, args: &ArgMatches) -> Result<(), &'static str> {
if args.is_present("http") {
self.enabled = true;
}
if let Some(listen_address) = args.value_of("http-address") {
self.listen_address = listen_address.to_string();
}
if let Some(listen_port) = args.value_of("http-port") {
self.listen_port = listen_port.to_string();
}
Ok(())
}
}
/// Build the `iron` HTTP server, defining the core routes.
pub fn create_iron_http_server<T: BeaconChainTypes + 'static>(
beacon_chain: Arc<BeaconChain<T>>,
db_path: PathBuf,
metrics_registry: Registry,
) -> Iron<Router> {
let mut router = Router::new();
// A `GET` request to `/metrics` is handled by the `metrics` module.
router.get(
"/metrics",
metrics::build_handler(beacon_chain.clone(), db_path, metrics_registry),
"metrics",
);
// Any request to all other endpoints is handled by the `api` module.
router.any("/*", api::build_handler(beacon_chain.clone()), "api");
Iron::new(router)
}
/// Start the HTTP service on the tokio `TaskExecutor`.
pub fn start_service<T: BeaconChainTypes + 'static>(
config: &HttpServerConfig,
executor: &TaskExecutor,
_network_chan: mpsc::UnboundedSender<NetworkMessage>,
beacon_chain: Arc<BeaconChain<T>>,
db_path: PathBuf,
metrics_registry: Registry,
log: &slog::Logger,
) -> exit_future::Signal {
let log = log.new(o!("Service"=>"HTTP"));
// Create:
// - `shutdown_trigger` a one-shot to shut down this service.
// - `wait_for_shutdown` a future that will wait until someone calls shutdown.
let (shutdown_trigger, wait_for_shutdown) = exit_future::signal();
// Create an `iron` http, without starting it yet.
let iron = create_iron_http_server(beacon_chain, db_path, metrics_registry);
// Create a HTTP server future.
//
// 1. Start the HTTP server
// 2. Build an exit future that will shutdown the server when requested.
// 3. Return the exit future, so the caller may shutdown the service when desired.
let http_service = {
let listen_address = format!("{}:{}", config.listen_address, config.listen_port);
// Start the HTTP server
let server_start_result = iron.http(listen_address.clone());
if server_start_result.is_ok() {
info!(log, "HTTP server running on {}", listen_address);
} else {
warn!(log, "HTTP server failed to start on {}", listen_address);
}
// Build a future that will shutdown the HTTP server when the `shutdown_trigger` is
// triggered.
wait_for_shutdown.and_then(move |_| {
info!(log, "HTTP server shutting down");
if let Ok(mut server) = server_start_result {
// According to the documentation, `server.close()` "doesn't work" and the server
// keeps listening.
//
// It is being called anyway, because it seems like the right thing to do. If you
// know this has negative side-effects, please create an issue to discuss.
//
// See: https://docs.rs/iron/0.6.0/iron/struct.Listening.html#impl
match server.close() {
_ => (),
};
}
info!(log, "HTTP server shutdown complete.");
Ok(())
})
};
// Attach the HTTP server to the executor.
executor.spawn(http_service);
shutdown_trigger
}
/// Helper function for mapping a failure to read state to a 500 server error.
fn map_persistent_err_to_500(e: persistent::PersistentError) -> iron::error::IronError {
iron::error::IronError {
error: Box::new(e),
response: iron::Response::with(iron::status::Status::InternalServerError),
}
}

View File

@ -1,72 +0,0 @@
use crate::{
key::{BeaconChainKey, DBPathKey, LocalMetricsKey, MetricsRegistryKey},
map_persistent_err_to_500,
};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use iron::prelude::*;
use iron::{status::Status, Handler, IronResult, Request, Response};
use persistent::Read;
use prometheus::{Encoder, Registry, TextEncoder};
use std::path::PathBuf;
use std::sync::Arc;
pub use local_metrics::LocalMetrics;
mod local_metrics;
/// Yields a handler for the metrics endpoint.
pub fn build_handler<T: BeaconChainTypes + 'static>(
beacon_chain: Arc<BeaconChain<T>>,
db_path: PathBuf,
metrics_registry: Registry,
) -> impl Handler {
let mut chain = Chain::new(handle_metrics::<T>);
let local_metrics = LocalMetrics::new().unwrap();
local_metrics.register(&metrics_registry).unwrap();
chain.link(Read::<BeaconChainKey<T>>::both(beacon_chain));
chain.link(Read::<MetricsRegistryKey>::both(metrics_registry));
chain.link(Read::<LocalMetricsKey>::both(local_metrics));
chain.link(Read::<DBPathKey>::both(db_path));
chain
}
/// Handle a request for Prometheus metrics.
///
/// Returns a text string containing all metrics.
fn handle_metrics<T: BeaconChainTypes + 'static>(req: &mut Request) -> IronResult<Response> {
let beacon_chain = req
.get::<Read<BeaconChainKey<T>>>()
.map_err(map_persistent_err_to_500)?;
let r = req
.get::<Read<MetricsRegistryKey>>()
.map_err(map_persistent_err_to_500)?;
let local_metrics = req
.get::<Read<LocalMetricsKey>>()
.map_err(map_persistent_err_to_500)?;
let db_path = req
.get::<Read<DBPathKey>>()
.map_err(map_persistent_err_to_500)?;
// Update metrics that are calculated on each scrape.
local_metrics.update(&beacon_chain, &db_path);
let mut buffer = vec![];
let encoder = TextEncoder::new();
// Gather `DEFAULT_REGISTRY` metrics.
encoder.encode(&prometheus::gather(), &mut buffer).unwrap();
// Gather metrics from our registry.
let metric_families = r.gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
let prom_string = String::from_utf8(buffer).unwrap();
Ok(Response::with((Status::Ok, prom_string)))
}

View File

@ -1,154 +0,0 @@
use beacon_chain::{BeaconChain, BeaconChainTypes};
use prometheus::{IntGauge, Opts, Registry};
use slot_clock::SlotClock;
use std::fs;
use std::path::PathBuf;
use types::{EthSpec, Slot};
// If set to `true` will iterate and sum the balances of all validators in the state for each
// scrape.
const SHOULD_SUM_VALIDATOR_BALANCES: bool = true;
pub struct LocalMetrics {
present_slot: IntGauge,
present_epoch: IntGauge,
best_slot: IntGauge,
best_beacon_block_root: IntGauge,
justified_beacon_block_root: IntGauge,
finalized_beacon_block_root: IntGauge,
validator_count: IntGauge,
justified_epoch: IntGauge,
finalized_epoch: IntGauge,
validator_balances_sum: IntGauge,
database_size: IntGauge,
}
impl LocalMetrics {
/// Create a new instance.
pub fn new() -> Result<Self, prometheus::Error> {
Ok(Self {
present_slot: {
let opts = Opts::new("present_slot", "slot_at_time_of_scrape");
IntGauge::with_opts(opts)?
},
present_epoch: {
let opts = Opts::new("present_epoch", "epoch_at_time_of_scrape");
IntGauge::with_opts(opts)?
},
best_slot: {
let opts = Opts::new("best_slot", "slot_of_block_at_chain_head");
IntGauge::with_opts(opts)?
},
best_beacon_block_root: {
let opts = Opts::new("best_beacon_block_root", "root_of_block_at_chain_head");
IntGauge::with_opts(opts)?
},
justified_beacon_block_root: {
let opts = Opts::new(
"justified_beacon_block_root",
"root_of_block_at_justified_head",
);
IntGauge::with_opts(opts)?
},
finalized_beacon_block_root: {
let opts = Opts::new(
"finalized_beacon_block_root",
"root_of_block_at_finalized_head",
);
IntGauge::with_opts(opts)?
},
validator_count: {
let opts = Opts::new("validator_count", "number_of_validators");
IntGauge::with_opts(opts)?
},
justified_epoch: {
let opts = Opts::new("justified_epoch", "state_justified_epoch");
IntGauge::with_opts(opts)?
},
finalized_epoch: {
let opts = Opts::new("finalized_epoch", "state_finalized_epoch");
IntGauge::with_opts(opts)?
},
validator_balances_sum: {
let opts = Opts::new("validator_balances_sum", "sum_of_all_validator_balances");
IntGauge::with_opts(opts)?
},
database_size: {
let opts = Opts::new("database_size", "size_of_on_disk_db_in_mb");
IntGauge::with_opts(opts)?
},
})
}
/// Registry this instance with the `registry`.
pub fn register(&self, registry: &Registry) -> Result<(), prometheus::Error> {
registry.register(Box::new(self.present_slot.clone()))?;
registry.register(Box::new(self.present_epoch.clone()))?;
registry.register(Box::new(self.best_slot.clone()))?;
registry.register(Box::new(self.best_beacon_block_root.clone()))?;
registry.register(Box::new(self.justified_beacon_block_root.clone()))?;
registry.register(Box::new(self.finalized_beacon_block_root.clone()))?;
registry.register(Box::new(self.validator_count.clone()))?;
registry.register(Box::new(self.finalized_epoch.clone()))?;
registry.register(Box::new(self.justified_epoch.clone()))?;
registry.register(Box::new(self.validator_balances_sum.clone()))?;
registry.register(Box::new(self.database_size.clone()))?;
Ok(())
}
/// Update the metrics in `self` to the latest values.
pub fn update<T: BeaconChainTypes>(&self, beacon_chain: &BeaconChain<T>, db_path: &PathBuf) {
let state = &beacon_chain.head().beacon_state;
let present_slot = beacon_chain
.slot_clock
.present_slot()
.unwrap_or_else(|_| None)
.unwrap_or_else(|| Slot::new(0));
self.present_slot.set(present_slot.as_u64() as i64);
self.present_epoch
.set(present_slot.epoch(T::EthSpec::slots_per_epoch()).as_u64() as i64);
self.best_slot.set(state.slot.as_u64() as i64);
self.best_beacon_block_root
.set(beacon_chain.head().beacon_block_root.to_low_u64_le() as i64);
self.justified_beacon_block_root.set(
beacon_chain
.head()
.beacon_state
.current_justified_checkpoint
.root
.to_low_u64_le() as i64,
);
self.finalized_beacon_block_root.set(
beacon_chain
.head()
.beacon_state
.finalized_checkpoint
.root
.to_low_u64_le() as i64,
);
self.validator_count.set(state.validators.len() as i64);
self.justified_epoch
.set(state.current_justified_checkpoint.epoch.as_u64() as i64);
self.finalized_epoch
.set(state.finalized_checkpoint.epoch.as_u64() as i64);
if SHOULD_SUM_VALIDATOR_BALANCES {
self.validator_balances_sum
.set(state.balances.iter().sum::<u64>() as i64);
}
let db_size = if let Ok(iter) = fs::read_dir(db_path) {
iter.filter_map(Result::ok)
.map(size_of_dir_entry)
.fold(0_u64, |sum, val| sum + val)
} else {
0
};
self.database_size.set(db_size as i64);
}
}
fn size_of_dir_entry(dir: fs::DirEntry) -> u64 {
dir.metadata().map(|m| m.len()).unwrap_or(0)
}

View File

@ -18,8 +18,12 @@ state_processing = { path = "../../eth2/state_processing" }
types = { path = "../../eth2/types" } types = { path = "../../eth2/types" }
clap = "2.32.0" clap = "2.32.0"
http = "^0.1.17" http = "^0.1.17"
prometheus = { version = "^0.6", features = ["process"] }
hyper = "0.12.32" hyper = "0.12.32"
futures = "0.1" futures = "0.1"
exit-future = "0.1.3" exit-future = "0.1.3"
tokio = "0.1.17" tokio = "0.1.17"
url = "2.0" url = "2.0"
lazy_static = "1.3.0"
lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" }
slot_clock = { path = "../../eth2/utils/slot_clock" }

View File

@ -18,7 +18,7 @@ impl Default for Config {
Config { Config {
enabled: true, // rest_api enabled by default enabled: true, // rest_api enabled by default
listen_address: Ipv4Addr::new(127, 0, 0, 1), listen_address: Ipv4Addr::new(127, 0, 0, 1),
port: 1248, port: 5052,
} }
} }
} }

View File

@ -1,8 +1,10 @@
extern crate futures; #[macro_use]
extern crate hyper; extern crate lazy_static;
mod beacon; mod beacon;
mod config; mod config;
mod helpers; mod helpers;
mod metrics;
mod node; mod node;
mod spec; mod spec;
mod url_query; mod url_query;
@ -13,6 +15,8 @@ use hyper::rt::Future;
use hyper::service::service_fn_ok; use hyper::service::service_fn_ok;
use hyper::{Body, Method, Response, Server, StatusCode}; use hyper::{Body, Method, Response, Server, StatusCode};
use slog::{info, o, warn}; use slog::{info, o, warn};
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::TaskExecutor; use tokio::runtime::TaskExecutor;
use url_query::UrlQuery; use url_query::UrlQuery;
@ -68,6 +72,7 @@ pub fn start_server<T: BeaconChainTypes + Clone + 'static>(
config: &ApiConfig, config: &ApiConfig,
executor: &TaskExecutor, executor: &TaskExecutor,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
db_path: PathBuf,
log: &slog::Logger, log: &slog::Logger,
) -> Result<exit_future::Signal, hyper::Error> { ) -> Result<exit_future::Signal, hyper::Error> {
let log = log.new(o!("Service" => "Api")); let log = log.new(o!("Service" => "Api"));
@ -81,6 +86,8 @@ pub fn start_server<T: BeaconChainTypes + Clone + 'static>(
Ok(()) Ok(())
}); });
let db_path = DBPath(db_path);
// Get the address to bind to // Get the address to bind to
let bind_addr = (config.listen_address, config.port).into(); let bind_addr = (config.listen_address, config.port).into();
@ -91,12 +98,17 @@ pub fn start_server<T: BeaconChainTypes + Clone + 'static>(
let service = move || { let service = move || {
let log = server_log.clone(); let log = server_log.clone();
let beacon_chain = server_bc.clone(); let beacon_chain = server_bc.clone();
let db_path = db_path.clone();
// Create a simple handler for the router, inject our stateful objects into the request. // Create a simple handler for the router, inject our stateful objects into the request.
service_fn_ok(move |mut req| { service_fn_ok(move |mut req| {
metrics::inc_counter(&metrics::REQUEST_COUNT);
let timer = metrics::start_timer(&metrics::REQUEST_RESPONSE_TIME);
req.extensions_mut().insert::<slog::Logger>(log.clone()); req.extensions_mut().insert::<slog::Logger>(log.clone());
req.extensions_mut() req.extensions_mut()
.insert::<Arc<BeaconChain<T>>>(beacon_chain.clone()); .insert::<Arc<BeaconChain<T>>>(beacon_chain.clone());
req.extensions_mut().insert::<DBPath>(db_path.clone());
let path = req.uri().path().to_string(); let path = req.uri().path().to_string();
@ -109,6 +121,7 @@ pub fn start_server<T: BeaconChainTypes + Clone + 'static>(
} }
(&Method::GET, "/beacon/state") => beacon::get_state::<T>(req), (&Method::GET, "/beacon/state") => beacon::get_state::<T>(req),
(&Method::GET, "/beacon/state_root") => beacon::get_state_root::<T>(req), (&Method::GET, "/beacon/state_root") => beacon::get_state_root::<T>(req),
(&Method::GET, "/metrics") => metrics::get_prometheus::<T>(req),
(&Method::GET, "/node/version") => node::get_version(req), (&Method::GET, "/node/version") => node::get_version(req),
(&Method::GET, "/node/genesis_time") => node::get_genesis_time::<T>(req), (&Method::GET, "/node/genesis_time") => node::get_genesis_time::<T>(req),
(&Method::GET, "/spec") => spec::get_spec::<T>(req), (&Method::GET, "/spec") => spec::get_spec::<T>(req),
@ -116,9 +129,10 @@ pub fn start_server<T: BeaconChainTypes + Clone + 'static>(
_ => Err(ApiError::MethodNotAllowed(path.clone())), _ => Err(ApiError::MethodNotAllowed(path.clone())),
}; };
match result { let response = match result {
// Return the `hyper::Response`. // Return the `hyper::Response`.
Ok(response) => { Ok(response) => {
metrics::inc_counter(&metrics::SUCCESS_COUNT);
slog::debug!(log, "Request successful: {:?}", path); slog::debug!(log, "Request successful: {:?}", path);
response response
} }
@ -127,7 +141,11 @@ pub fn start_server<T: BeaconChainTypes + Clone + 'static>(
slog::debug!(log, "Request failure: {:?}", path); slog::debug!(log, "Request failure: {:?}", path);
e.into() e.into()
} }
} };
metrics::stop_timer(timer);
response
}) })
}; };
@ -160,3 +178,14 @@ fn success_response(body: Body) -> Response<Body> {
.body(body) .body(body)
.expect("We should always be able to make response from the success body.") .expect("We should always be able to make response from the success body.")
} }
#[derive(Clone)]
pub struct DBPath(PathBuf);
impl Deref for DBPath {
type Target = PathBuf;
fn deref(&self) -> &Self::Target {
&self.0
}
}

View File

@ -0,0 +1,69 @@
use crate::{success_response, ApiError, ApiResult, DBPath};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use hyper::{Body, Request};
use prometheus::{Encoder, TextEncoder};
use std::sync::Arc;
pub use lighthouse_metrics::*;
lazy_static! {
pub static ref REQUEST_RESPONSE_TIME: Result<Histogram> = try_create_histogram(
"http_server_request_duration_seconds",
"Time taken to build a response to a HTTP request"
);
pub static ref REQUEST_COUNT: Result<IntCounter> = try_create_int_counter(
"http_server_request_total",
"Total count of HTTP requests received"
);
pub static ref SUCCESS_COUNT: Result<IntCounter> = try_create_int_counter(
"http_server_success_total",
"Total count of HTTP 200 responses sent"
);
}
/// Returns the full set of Prometheus metrics for the Beacon Node application.
///
/// # Note
///
/// This is a HTTP handler method.
pub fn get_prometheus<T: BeaconChainTypes + 'static>(req: Request<Body>) -> ApiResult {
let mut buffer = vec![];
let encoder = TextEncoder::new();
let beacon_chain = req
.extensions()
.get::<Arc<BeaconChain<T>>>()
.ok_or_else(|| ApiError::ServerError("Beacon chain extension missing".to_string()))?;
let db_path = req
.extensions()
.get::<DBPath>()
.ok_or_else(|| ApiError::ServerError("DBPath extension missing".to_string()))?;
// There are two categories of metrics:
//
// - Dynamically updated: things like histograms and event counters that are updated on the
// fly.
// - Statically updated: things which are only updated at the time of the scrape (used where we
// can avoid cluttering up code with metrics calls).
//
// The `lighthouse_metrics` crate has a `DEFAULT_REGISTRY` global singleton (via `lazy_static`)
// which keeps the state of all the metrics. Dynamically updated things will already be
// up-to-date in the registry (because they update themselves) however statically updated
// things need to be "scraped".
//
// We proceed by, first updating all the static metrics using `scrape_for_metrics(..)`. Then,
// using `lighthouse_metrics::gather(..)` to collect the global `DEFAULT_REGISTRY` metrics into
// a string that can be returned via HTTP.
slot_clock::scrape_for_metrics::<T::EthSpec, T::SlotClock>(&beacon_chain.slot_clock);
store::scrape_for_metrics(&db_path);
beacon_chain::scrape_for_metrics(&beacon_chain);
encoder
.encode(&lighthouse_metrics::gather(), &mut buffer)
.unwrap();
String::from_utf8(buffer)
.map(|string| success_response(Body::from(string)))
.map_err(|e| ApiError::ServerError(format!("Failed to encode prometheus info: {:?}", e)))
}

View File

@ -128,28 +128,6 @@ fn main() {
.help("Listen port for RPC endpoint.") .help("Listen port for RPC endpoint.")
.takes_value(true), .takes_value(true),
) )
/*
* HTTP server parameters.
*/
.arg(
Arg::with_name("http")
.long("http")
.help("Enable the HTTP server.")
.takes_value(false),
)
.arg(
Arg::with_name("http-address")
.long("http-address")
.value_name("Address")
.help("Listen address for the HTTP server.")
.takes_value(true),
)
.arg(
Arg::with_name("http-port")
.long("http-port")
.help("Listen port for the HTTP server.")
.takes_value(true),
)
/* Client related arguments */ /* Client related arguments */
.arg( .arg(
Arg::with_name("api") Arg::with_name("api")

View File

@ -15,3 +15,5 @@ eth2_ssz = "0.1"
eth2_ssz_derive = "0.1" eth2_ssz_derive = "0.1"
tree_hash = "0.1" tree_hash = "0.1"
types = { path = "../../eth2/types" } types = { path = "../../eth2/types" }
lazy_static = "1.3.0"
lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" }

View File

@ -9,10 +9,26 @@ impl<T: EthSpec> StoreItem for BeaconBlock<T> {
} }
fn as_store_bytes(&self) -> Vec<u8> { fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes() let timer = metrics::start_timer(&metrics::BEACON_BLOCK_WRITE_TIMES);
let bytes = self.as_ssz_bytes();
metrics::stop_timer(timer);
metrics::inc_counter(&metrics::BEACON_BLOCK_WRITE_COUNT);
metrics::inc_counter_by(&metrics::BEACON_BLOCK_WRITE_BYTES, bytes.len() as i64);
bytes
} }
fn from_store_bytes(bytes: &mut [u8]) -> Result<Self, Error> { fn from_store_bytes(bytes: &mut [u8]) -> Result<Self, Error> {
Self::from_ssz_bytes(bytes).map_err(Into::into) let timer = metrics::start_timer(&metrics::BEACON_BLOCK_READ_TIMES);
let len = bytes.len();
let result = Self::from_ssz_bytes(bytes).map_err(Into::into);
metrics::stop_timer(timer);
metrics::inc_counter(&metrics::BEACON_BLOCK_READ_COUNT);
metrics::inc_counter_by(&metrics::BEACON_BLOCK_READ_BYTES, len as i64);
result
} }
} }

View File

@ -53,12 +53,29 @@ impl<T: EthSpec> StoreItem for BeaconState<T> {
} }
fn as_store_bytes(&self) -> Vec<u8> { fn as_store_bytes(&self) -> Vec<u8> {
let timer = metrics::start_timer(&metrics::BEACON_STATE_WRITE_TIMES);
let container = StorageContainer::new(self); let container = StorageContainer::new(self);
container.as_ssz_bytes() let bytes = container.as_ssz_bytes();
metrics::stop_timer(timer);
metrics::inc_counter(&metrics::BEACON_STATE_WRITE_COUNT);
metrics::inc_counter_by(&metrics::BEACON_STATE_WRITE_BYTES, bytes.len() as i64);
bytes
} }
fn from_store_bytes(bytes: &mut [u8]) -> Result<Self, Error> { fn from_store_bytes(bytes: &mut [u8]) -> Result<Self, Error> {
let timer = metrics::start_timer(&metrics::BEACON_STATE_READ_TIMES);
let len = bytes.len();
let container = StorageContainer::from_ssz_bytes(bytes)?; let container = StorageContainer::from_ssz_bytes(bytes)?;
container.try_into() let result = container.try_into();
metrics::stop_timer(timer);
metrics::inc_counter(&metrics::BEACON_STATE_READ_COUNT);
metrics::inc_counter_by(&metrics::BEACON_STATE_READ_BYTES, len as i64);
result
} }
} }

View File

@ -1,4 +1,5 @@
use super::*; use super::*;
use crate::metrics;
use db_key::Key; use db_key::Key;
use leveldb::database::kv::KV; use leveldb::database::kv::KV;
use leveldb::database::Database; use leveldb::database::Database;
@ -62,15 +63,27 @@ impl Store for LevelDB {
fn get_bytes(&self, col: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error> { fn get_bytes(&self, col: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
let column_key = Self::get_key_for_col(col, key); let column_key = Self::get_key_for_col(col, key);
self.db metrics::inc_counter(&metrics::DISK_DB_READ_COUNT);
let result = self
.db
.get(self.read_options(), column_key) .get(self.read_options(), column_key)
.map_err(Into::into) .map_err(Into::into);
if let Ok(Some(bytes)) = &result {
metrics::inc_counter_by(&metrics::DISK_DB_READ_BYTES, bytes.len() as i64)
}
result
} }
/// Store some `value` in `column`, indexed with `key`. /// Store some `value` in `column`, indexed with `key`.
fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> { fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
let column_key = Self::get_key_for_col(col, key); let column_key = Self::get_key_for_col(col, key);
metrics::inc_counter(&metrics::DISK_DB_WRITE_COUNT);
metrics::inc_counter_by(&metrics::DISK_DB_WRITE_BYTES, val.len() as i64);
self.db self.db
.put(self.write_options(), column_key, val) .put(self.write_options(), column_key, val)
.map_err(Into::into) .map_err(Into::into)
@ -80,6 +93,8 @@ impl Store for LevelDB {
fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error> { fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error> {
let column_key = Self::get_key_for_col(col, key); let column_key = Self::get_key_for_col(col, key);
metrics::inc_counter(&metrics::DISK_DB_EXISTS_COUNT);
self.db self.db
.get(self.read_options(), column_key) .get(self.read_options(), column_key)
.map_err(Into::into) .map_err(Into::into)
@ -89,6 +104,9 @@ impl Store for LevelDB {
/// Removes `key` from `column`. /// Removes `key` from `column`.
fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> { fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> {
let column_key = Self::get_key_for_col(col, key); let column_key = Self::get_key_for_col(col, key);
metrics::inc_counter(&metrics::DISK_DB_DELETE_COUNT);
self.db self.db
.delete(self.write_options(), column_key) .delete(self.write_options(), column_key)
.map_err(Into::into) .map_err(Into::into)

View File

@ -7,18 +7,22 @@
//! //!
//! Provides a simple API for storing/retrieving all types that sometimes needs type-hints. See //! Provides a simple API for storing/retrieving all types that sometimes needs type-hints. See
//! tests for implementation examples. //! tests for implementation examples.
#[macro_use]
extern crate lazy_static;
mod block_at_slot; mod block_at_slot;
mod errors; mod errors;
mod impls; mod impls;
mod leveldb_store; mod leveldb_store;
mod memory_store; mod memory_store;
mod metrics;
pub mod iter; pub mod iter;
pub use self::leveldb_store::LevelDB as DiskStore; pub use self::leveldb_store::LevelDB as DiskStore;
pub use self::memory_store::MemoryStore; pub use self::memory_store::MemoryStore;
pub use errors::Error; pub use errors::Error;
pub use metrics::scrape_for_metrics;
pub use types::*; pub use types::*;
/// An object capable of storing and retrieving objects implementing `StoreItem`. /// An object capable of storing and retrieving objects implementing `StoreItem`.

View File

@ -0,0 +1,106 @@
pub use lighthouse_metrics::{set_gauge, try_create_int_gauge, *};
use std::fs;
use std::path::PathBuf;
lazy_static! {
/*
* General
*/
pub static ref DISK_DB_SIZE: Result<IntGauge> =
try_create_int_gauge("store_disk_db_size", "Size of the on-disk database (bytes)");
pub static ref DISK_DB_WRITE_BYTES: Result<IntCounter> = try_create_int_counter(
"store_disk_db_write_bytes_total",
"Number of bytes attempted to be written to the on-disk DB"
);
pub static ref DISK_DB_READ_BYTES: Result<IntCounter> = try_create_int_counter(
"store_disk_db_read_bytes_total",
"Number of bytes read from the on-disk DB"
);
pub static ref DISK_DB_READ_COUNT: Result<IntCounter> = try_create_int_counter(
"store_disk_db_read_count_total",
"Total number of reads to the on-disk DB"
);
pub static ref DISK_DB_WRITE_COUNT: Result<IntCounter> = try_create_int_counter(
"store_disk_db_write_count_total",
"Total number of writes to the on-disk DB"
);
pub static ref DISK_DB_EXISTS_COUNT: Result<IntCounter> = try_create_int_counter(
"store_disk_db_exists_count_total",
"Total number of checks if a key is in the on-disk DB"
);
pub static ref DISK_DB_DELETE_COUNT: Result<IntCounter> = try_create_int_counter(
"store_disk_db_delete_count_total",
"Total number of deletions from the on-disk DB"
);
/*
* Beacon State
*/
pub static ref BEACON_STATE_READ_TIMES: Result<Histogram> = try_create_histogram(
"store_beacon_state_read_overhead_seconds",
"Overhead on reading a beacon state from the DB (e.g., decoding)"
);
pub static ref BEACON_STATE_READ_COUNT: Result<IntCounter> = try_create_int_counter(
"store_beacon_state_read_total",
"Total number of beacon state reads from the DB"
);
pub static ref BEACON_STATE_READ_BYTES: Result<IntCounter> = try_create_int_counter(
"store_beacon_state_read_bytes_total",
"Total number of beacon state bytes read from the DB"
);
pub static ref BEACON_STATE_WRITE_TIMES: Result<Histogram> = try_create_histogram(
"store_beacon_state_write_overhead_seconds",
"Overhead on writing a beacon state to the DB (e.g., encoding)"
);
pub static ref BEACON_STATE_WRITE_COUNT: Result<IntCounter> = try_create_int_counter(
"store_beacon_state_write_total",
"Total number of beacon state writes the DB"
);
pub static ref BEACON_STATE_WRITE_BYTES: Result<IntCounter> = try_create_int_counter(
"store_beacon_state_write_bytes_total",
"Total number of beacon state bytes written to the DB"
);
/*
* Beacon Block
*/
pub static ref BEACON_BLOCK_READ_TIMES: Result<Histogram> = try_create_histogram(
"store_beacon_block_read_overhead_seconds",
"Overhead on reading a beacon block from the DB (e.g., decoding)"
);
pub static ref BEACON_BLOCK_READ_COUNT: Result<IntCounter> = try_create_int_counter(
"store_beacon_block_read_total",
"Total number of beacon block reads from the DB"
);
pub static ref BEACON_BLOCK_READ_BYTES: Result<IntCounter> = try_create_int_counter(
"store_beacon_block_read_bytes_total",
"Total number of beacon block bytes read from the DB"
);
pub static ref BEACON_BLOCK_WRITE_TIMES: Result<Histogram> = try_create_histogram(
"store_beacon_block_write_overhead_seconds",
"Overhead on writing a beacon block to the DB (e.g., encoding)"
);
pub static ref BEACON_BLOCK_WRITE_COUNT: Result<IntCounter> = try_create_int_counter(
"store_beacon_block_write_total",
"Total number of beacon block writes the DB"
);
pub static ref BEACON_BLOCK_WRITE_BYTES: Result<IntCounter> = try_create_int_counter(
"store_beacon_block_write_bytes_total",
"Total number of beacon block bytes written to the DB"
);
}
/// Updates the global metrics registry with store-related information.
pub fn scrape_for_metrics(db_path: &PathBuf) {
let db_size = if let Ok(iter) = fs::read_dir(db_path) {
iter.filter_map(std::result::Result::ok)
.map(size_of_dir_entry)
.fold(0_u64, |sum, val| sum + val)
} else {
0
};
set_gauge(&DISK_DB_SIZE, db_size as i64);
}
fn size_of_dir_entry(dir: fs::DirEntry) -> u64 {
dir.metadata().map(|m| m.len()).unwrap_or(0)
}

View File

@ -78,14 +78,6 @@ enabled = false
listen_address = "127.0.0.1" listen_address = "127.0.0.1"
port = 5051 port = 5051
#
# Legacy HTTP server configuration. To be removed.
#
[http]
enabled = false
listen_address = "127.0.0.1"
listen_port = "5052"
# #
# RESTful HTTP API server configuration. # RESTful HTTP API server configuration.
# #
@ -95,4 +87,4 @@ enabled = true
# The listen port for the HTTP server. # The listen port for the HTTP server.
listen_address = "127.0.0.1" listen_address = "127.0.0.1"
# The listen port for the HTTP server. # The listen port for the HTTP server.
port = 1248 port = 5052

View File

@ -0,0 +1,11 @@
[package]
name = "lighthouse_metrics"
version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
lazy_static = "1.3.0"
prometheus = "^0.6"

View File

@ -0,0 +1,72 @@
use prometheus::{HistogramOpts, HistogramTimer, Opts};
pub use prometheus::{Histogram, IntCounter, IntGauge, Result};
pub fn gather() -> Vec<prometheus::proto::MetricFamily> {
prometheus::gather()
}
pub fn try_create_int_counter(name: &str, help: &str) -> Result<IntCounter> {
let opts = Opts::new(name, help);
let counter = IntCounter::with_opts(opts)?;
prometheus::register(Box::new(counter.clone()))?;
Ok(counter)
}
pub fn try_create_int_gauge(name: &str, help: &str) -> Result<IntGauge> {
let opts = Opts::new(name, help);
let gauge = IntGauge::with_opts(opts)?;
prometheus::register(Box::new(gauge.clone()))?;
Ok(gauge)
}
pub fn try_create_histogram(name: &str, help: &str) -> Result<Histogram> {
let opts = HistogramOpts::new(name, help);
let histogram = Histogram::with_opts(opts)?;
prometheus::register(Box::new(histogram.clone()))?;
Ok(histogram)
}
pub fn start_timer(histogram: &Result<Histogram>) -> Option<HistogramTimer> {
if let Ok(histogram) = histogram {
Some(histogram.start_timer())
} else {
None
}
}
pub fn stop_timer(timer: Option<HistogramTimer>) {
timer.map(|t| t.observe_duration());
}
pub fn inc_counter(counter: &Result<IntCounter>) {
if let Ok(counter) = counter {
counter.inc();
}
}
pub fn inc_counter_by(counter: &Result<IntCounter>, value: i64) {
if let Ok(counter) = counter {
counter.inc_by(value);
}
}
pub fn set_gauge(gauge: &Result<IntGauge>, value: i64) {
if let Ok(gauge) = gauge {
gauge.set(value);
}
}
pub fn observe(histogram: &Result<Histogram>, value: f64) {
if let Ok(histogram) = histogram {
histogram.observe(value);
}
}
#[cfg(test)]
mod tests {
#[test]
fn it_works() {
assert_eq!(2 + 2, 4);
}
}

View File

@ -6,3 +6,5 @@ edition = "2018"
[dependencies] [dependencies]
types = { path = "../../types" } types = { path = "../../types" }
lazy_static = "1.3.0"
lighthouse_metrics = { path = "../lighthouse_metrics" }

View File

@ -1,9 +1,15 @@
#[macro_use]
extern crate lazy_static;
mod metrics;
mod system_time_slot_clock; mod system_time_slot_clock;
mod testing_slot_clock; mod testing_slot_clock;
use std::time::Duration;
pub use crate::system_time_slot_clock::{Error as SystemTimeSlotClockError, SystemTimeSlotClock}; pub use crate::system_time_slot_clock::{Error as SystemTimeSlotClockError, SystemTimeSlotClock};
pub use crate::testing_slot_clock::{Error as TestingSlotClockError, TestingSlotClock}; pub use crate::testing_slot_clock::{Error as TestingSlotClockError, TestingSlotClock};
use std::time::Duration; pub use metrics::scrape_for_metrics;
pub use types::Slot; pub use types::Slot;
pub trait SlotClock: Send + Sync + Sized { pub trait SlotClock: Send + Sync + Sized {
@ -17,4 +23,6 @@ pub trait SlotClock: Send + Sync + Sized {
fn present_slot(&self) -> Result<Option<Slot>, Self::Error>; fn present_slot(&self) -> Result<Option<Slot>, Self::Error>;
fn duration_to_next_slot(&self) -> Result<Option<Duration>, Self::Error>; fn duration_to_next_slot(&self) -> Result<Option<Duration>, Self::Error>;
fn slot_duration_millis(&self) -> u64;
} }

View File

@ -0,0 +1,32 @@
use crate::SlotClock;
pub use lighthouse_metrics::*;
use types::{EthSpec, Slot};
lazy_static! {
pub static ref PRESENT_SLOT: Result<IntGauge> =
try_create_int_gauge("slotclock_present_slot", "The present wall-clock slot");
pub static ref PRESENT_EPOCH: Result<IntGauge> =
try_create_int_gauge("slotclock_present_epoch", "The present wall-clock epoch");
pub static ref SLOTS_PER_EPOCH: Result<IntGauge> =
try_create_int_gauge("slotclock_slots_per_epoch", "Slots per epoch (constant)");
pub static ref MILLISECONDS_PER_SLOT: Result<IntGauge> = try_create_int_gauge(
"slotclock_slot_time_milliseconds",
"The duration in milliseconds between each slot"
);
}
/// Update the global metrics `DEFAULT_REGISTRY` with info from the slot clock.
pub fn scrape_for_metrics<T: EthSpec, U: SlotClock>(clock: &U) {
let present_slot = match clock.present_slot() {
Ok(Some(slot)) => slot,
_ => Slot::new(0),
};
set_gauge(&PRESENT_SLOT, present_slot.as_u64() as i64);
set_gauge(
&PRESENT_EPOCH,
present_slot.epoch(T::slots_per_epoch()).as_u64() as i64,
);
set_gauge(&SLOTS_PER_EPOCH, T::slots_per_epoch() as i64);
set_gauge(&MILLISECONDS_PER_SLOT, clock.slot_duration_millis() as i64);
}

View File

@ -52,6 +52,10 @@ impl SlotClock for SystemTimeSlotClock {
fn duration_to_next_slot(&self) -> Result<Option<Duration>, Error> { fn duration_to_next_slot(&self) -> Result<Option<Duration>, Error> {
duration_to_next_slot(self.genesis_seconds, self.slot_duration_seconds) duration_to_next_slot(self.genesis_seconds, self.slot_duration_seconds)
} }
fn slot_duration_millis(&self) -> u64 {
self.slot_duration_seconds * 1000
}
} }
impl From<SystemTimeError> for Error { impl From<SystemTimeError> for Error {

View File

@ -40,6 +40,10 @@ impl SlotClock for TestingSlotClock {
fn duration_to_next_slot(&self) -> Result<Option<Duration>, Error> { fn duration_to_next_slot(&self) -> Result<Option<Duration>, Error> {
Ok(Some(Duration::from_secs(1))) Ok(Some(Duration::from_secs(1)))
} }
fn slot_duration_millis(&self) -> u64 {
0
}
} }
#[cfg(test)] #[cfg(test)]