Add global metrics registry, pass to BeaconState
This commit is contained in:
parent
e756a0aaa4
commit
345f7d5f18
@ -13,6 +13,7 @@ failure_derive = "0.1"
|
||||
hashing = { path = "../../eth2/utils/hashing" }
|
||||
fork_choice = { path = "../../eth2/fork_choice" }
|
||||
parking_lot = "0.7"
|
||||
prometheus = "^0.6"
|
||||
log = "0.4"
|
||||
operation_pool = { path = "../../eth2/operation_pool" }
|
||||
env_logger = "0.6"
|
||||
|
@ -1,5 +1,6 @@
|
||||
use crate::checkpoint::CheckPoint;
|
||||
use crate::errors::{BeaconChainError as Error, BlockProductionError};
|
||||
use crate::metrics::Metrics;
|
||||
use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY};
|
||||
use fork_choice::{ForkChoice, ForkChoiceError};
|
||||
use log::{debug, trace};
|
||||
@ -96,6 +97,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
|
||||
pub state: RwLock<BeaconState<T::EthSpec>>,
|
||||
pub spec: ChainSpec,
|
||||
pub fork_choice: RwLock<T::ForkChoice>,
|
||||
pub metrics: Metrics,
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
@ -138,6 +140,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
canonical_head,
|
||||
spec,
|
||||
fork_choice: RwLock::new(fork_choice),
|
||||
metrics: Metrics::new()?,
|
||||
})
|
||||
}
|
||||
|
||||
@ -169,6 +172,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
state: RwLock::new(p.state),
|
||||
spec,
|
||||
fork_choice: RwLock::new(fork_choice),
|
||||
metrics: Metrics::new()?,
|
||||
}))
|
||||
}
|
||||
|
||||
@ -621,6 +625,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
/// Will accept blocks from prior slots, however it will reject any block from a future slot.
|
||||
pub fn process_block(&self, block: BeaconBlock) -> Result<BlockProcessingOutcome, Error> {
|
||||
debug!("Processing block with slot {}...", block.slot);
|
||||
self.metrics.blocks_processed.inc();
|
||||
|
||||
let block_root = block.block_header().canonical_root();
|
||||
|
||||
@ -704,6 +709,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
self.update_state(state)?;
|
||||
}
|
||||
|
||||
self.metrics.valid_blocks_processed.inc();
|
||||
|
||||
Ok(BlockProcessingOutcome::ValidBlock(ValidBlock::Processed))
|
||||
}
|
||||
|
||||
@ -716,6 +723,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
randao_reveal: Signature,
|
||||
) -> Result<(BeaconBlock, BeaconState<T::EthSpec>), BlockProductionError> {
|
||||
debug!("Producing block at slot {}...", self.state.read().slot);
|
||||
self.metrics.block_production_requests.inc();
|
||||
|
||||
let mut state = self.state.read().clone();
|
||||
|
||||
@ -766,6 +774,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
|
||||
block.state_root = state_root;
|
||||
|
||||
self.metrics.block_production_successes.inc();
|
||||
|
||||
Ok((block, state))
|
||||
}
|
||||
|
||||
|
@ -1,3 +1,4 @@
|
||||
use crate::metrics::Error as MetricsError;
|
||||
use fork_choice::ForkChoiceError;
|
||||
use state_processing::BlockProcessingError;
|
||||
use state_processing::SlotProcessingError;
|
||||
@ -25,10 +26,17 @@ pub enum BeaconChainError {
|
||||
MissingBeaconBlock(Hash256),
|
||||
MissingBeaconState(Hash256),
|
||||
SlotProcessingError(SlotProcessingError),
|
||||
MetricsError(String),
|
||||
}
|
||||
|
||||
easy_from_to!(SlotProcessingError, BeaconChainError);
|
||||
|
||||
impl From<MetricsError> for BeaconChainError {
|
||||
fn from(e: MetricsError) -> BeaconChainError {
|
||||
BeaconChainError::MetricsError(format!("{:?}", e))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum BlockProductionError {
|
||||
UnableToGetBlockRootFromState,
|
||||
|
@ -1,6 +1,7 @@
|
||||
mod beacon_chain;
|
||||
mod checkpoint;
|
||||
mod errors;
|
||||
mod metrics;
|
||||
mod persisted_beacon_chain;
|
||||
|
||||
pub use self::beacon_chain::{
|
||||
|
59
beacon_node/beacon_chain/src/metrics.rs
Normal file
59
beacon_node/beacon_chain/src/metrics.rs
Normal file
@ -0,0 +1,59 @@
|
||||
pub use prometheus::Error;
|
||||
use prometheus::{IntCounter, Opts, Registry};
|
||||
|
||||
pub struct Metrics {
|
||||
pub blocks_processed: IntCounter,
|
||||
pub valid_blocks_processed: IntCounter,
|
||||
pub block_production_requests: IntCounter,
|
||||
pub block_production_successes: IntCounter,
|
||||
pub attestation_production_requests: IntCounter,
|
||||
pub attestation_production_successes: IntCounter,
|
||||
}
|
||||
|
||||
impl Metrics {
|
||||
pub fn new() -> Result<Self, Error> {
|
||||
Ok(Self {
|
||||
blocks_processed: {
|
||||
let opts = Opts::new("blocks_processed", "total_blocks_processed");
|
||||
IntCounter::with_opts(opts)?
|
||||
},
|
||||
valid_blocks_processed: {
|
||||
let opts = Opts::new("valid_blocks_processed", "total_valid_blocks_processed");
|
||||
IntCounter::with_opts(opts)?
|
||||
},
|
||||
block_production_requests: {
|
||||
let opts = Opts::new("block_production_requests", "attempts_to_produce_new_block");
|
||||
IntCounter::with_opts(opts)?
|
||||
},
|
||||
block_production_successes: {
|
||||
let opts = Opts::new("block_production_successes", "blocks_successfully_produced");
|
||||
IntCounter::with_opts(opts)?
|
||||
},
|
||||
attestation_production_requests: {
|
||||
let opts = Opts::new(
|
||||
"attestation_production_requests",
|
||||
"total_attestation_production_requests",
|
||||
);
|
||||
IntCounter::with_opts(opts)?
|
||||
},
|
||||
attestation_production_successes: {
|
||||
let opts = Opts::new(
|
||||
"attestation_production_successes",
|
||||
"total_attestation_production_successes",
|
||||
);
|
||||
IntCounter::with_opts(opts)?
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
pub fn register(&self, registry: &Registry) -> Result<(), Error> {
|
||||
registry.register(Box::new(self.blocks_processed.clone()))?;
|
||||
registry.register(Box::new(self.valid_blocks_processed.clone()))?;
|
||||
registry.register(Box::new(self.block_production_requests.clone()))?;
|
||||
registry.register(Box::new(self.block_production_successes.clone()))?;
|
||||
registry.register(Box::new(self.attestation_production_requests.clone()))?;
|
||||
registry.register(Box::new(self.attestation_production_successes.clone()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -11,6 +11,7 @@ store = { path = "../store" }
|
||||
http_server = { path = "../http_server" }
|
||||
rpc = { path = "../rpc" }
|
||||
fork_choice = { path = "../../eth2/fork_choice" }
|
||||
prometheus = "^0.6"
|
||||
types = { path = "../../eth2/types" }
|
||||
tree_hash = { path = "../../eth2/utils/tree_hash" }
|
||||
slot_clock = { path = "../../eth2/utils/slot_clock" }
|
||||
|
@ -10,6 +10,7 @@ use beacon_chain_types::InitialiseBeaconChain;
|
||||
use exit_future::Signal;
|
||||
use futures::{future::Future, Stream};
|
||||
use network::Service as NetworkService;
|
||||
use prometheus::Registry;
|
||||
use slog::{error, info, o};
|
||||
use slot_clock::SlotClock;
|
||||
use std::marker::PhantomData;
|
||||
@ -54,10 +55,16 @@ where
|
||||
log: slog::Logger,
|
||||
executor: &TaskExecutor,
|
||||
) -> error::Result<Self> {
|
||||
let metrics_registry = Registry::new();
|
||||
let store = Arc::new(store);
|
||||
|
||||
// Load a `BeaconChain` from the store, or create a new one if it does not exist.
|
||||
let beacon_chain = Arc::new(T::initialise_beacon_chain(store, log.clone()));
|
||||
// Registry all beacon chain metrics with the global registry.
|
||||
beacon_chain
|
||||
.metrics
|
||||
.register(&metrics_registry)
|
||||
.expect("Failed to registry metrics");
|
||||
|
||||
if beacon_chain.read_slot_clock().is_none() {
|
||||
panic!("Cannot start client before genesis!")
|
||||
@ -121,6 +128,7 @@ where
|
||||
executor,
|
||||
network_send,
|
||||
beacon_chain.clone(),
|
||||
metrics_registry,
|
||||
&log,
|
||||
))
|
||||
} else {
|
||||
|
@ -1,5 +1,6 @@
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use iron::typemap::Key;
|
||||
use prometheus::Registry;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
|
||||
@ -10,3 +11,9 @@ pub struct BeaconChainKey<T> {
|
||||
impl<T: BeaconChainTypes + 'static> Key for BeaconChainKey<T> {
|
||||
type Value = Arc<BeaconChain<T>>;
|
||||
}
|
||||
|
||||
pub struct MetricsRegistryKey;
|
||||
|
||||
impl Key for MetricsRegistryKey {
|
||||
type Value = Registry;
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use futures::Future;
|
||||
use iron::prelude::*;
|
||||
use network::NetworkMessage;
|
||||
use prometheus::Registry;
|
||||
use router::Router;
|
||||
use slog::{info, o, warn};
|
||||
use std::sync::Arc;
|
||||
@ -31,13 +32,14 @@ impl Default for HttpServerConfig {
|
||||
/// Build the `iron` HTTP server, defining the core routes.
|
||||
pub fn create_iron_http_server<T: BeaconChainTypes + 'static>(
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
metrics_registry: Registry,
|
||||
) -> Iron<Router> {
|
||||
let mut router = Router::new();
|
||||
|
||||
// A `GET` request to `/metrics` is handled by the `metrics` module.
|
||||
router.get(
|
||||
"/metrics",
|
||||
metrics::build_handler(beacon_chain.clone()),
|
||||
metrics::build_handler(beacon_chain.clone(), metrics_registry),
|
||||
"metrics",
|
||||
);
|
||||
|
||||
@ -53,6 +55,7 @@ pub fn start_service<T: BeaconChainTypes + 'static>(
|
||||
executor: &TaskExecutor,
|
||||
_network_chan: crossbeam_channel::Sender<NetworkMessage>,
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
metrics_registry: Registry,
|
||||
log: &slog::Logger,
|
||||
) -> exit_future::Signal {
|
||||
let log = log.new(o!("Service"=>"HTTP"));
|
||||
@ -63,7 +66,7 @@ pub fn start_service<T: BeaconChainTypes + 'static>(
|
||||
let (shutdown_trigger, wait_for_shutdown) = exit_future::signal();
|
||||
|
||||
// Create an `iron` http, without starting it yet.
|
||||
let iron = create_iron_http_server(beacon_chain);
|
||||
let iron = create_iron_http_server(beacon_chain, metrics_registry);
|
||||
|
||||
// Create a HTTP server future.
|
||||
//
|
||||
|
@ -1,4 +1,7 @@
|
||||
use crate::{key::BeaconChainKey, map_persistent_err_to_500};
|
||||
use crate::{
|
||||
key::{BeaconChainKey, MetricsRegistryKey},
|
||||
map_persistent_err_to_500,
|
||||
};
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use iron::prelude::*;
|
||||
use iron::{status::Status, Handler, IronResult, Request, Response};
|
||||
@ -11,10 +14,12 @@ use types::Slot;
|
||||
/// Yields a handler for the metrics endpoint.
|
||||
pub fn build_handler<T: BeaconChainTypes + 'static>(
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
metrics_registry: Registry,
|
||||
) -> impl Handler {
|
||||
let mut chain = Chain::new(handle_metrics::<T>);
|
||||
|
||||
chain.link(Read::<BeaconChainKey<T>>::both(beacon_chain));
|
||||
chain.link(Read::<MetricsRegistryKey>::both(metrics_registry));
|
||||
|
||||
chain
|
||||
}
|
||||
@ -27,7 +32,9 @@ fn handle_metrics<T: BeaconChainTypes + 'static>(req: &mut Request) -> IronResul
|
||||
.get::<Read<BeaconChainKey<T>>>()
|
||||
.map_err(map_persistent_err_to_500)?;
|
||||
|
||||
let r = Registry::new();
|
||||
let r = req
|
||||
.get::<Read<MetricsRegistryKey>>()
|
||||
.map_err(map_persistent_err_to_500)?;
|
||||
|
||||
let present_slot = beacon_chain
|
||||
.slot_clock
|
||||
|
Loading…
Reference in New Issue
Block a user