Add basic prometheus endpoint

This commit is contained in:
Paul Hauner 2019-05-25 17:25:21 +10:00
parent 596ff5178b
commit 85211ebccd
No known key found for this signature in database
GPG Key ID: 303E4494BB28068C
4 changed files with 86 additions and 16 deletions

View File

@ -35,6 +35,8 @@ pub struct Client<T: ClientTypes> {
pub network: Arc<NetworkService<T::EthSpec>>, pub network: Arc<NetworkService<T::EthSpec>>,
/// Signal to terminate the RPC server. /// Signal to terminate the RPC server.
pub rpc_exit_signal: Option<Signal>, pub rpc_exit_signal: Option<Signal>,
/// Signal to terminate the HTTP server.
pub http_exit_signal: Option<Signal>,
/// Signal to terminate the slot timer. /// Signal to terminate the slot timer.
pub slot_timer_exit_signal: Option<Signal>, pub slot_timer_exit_signal: Option<Signal>,
/// The clients logger. /// The clients logger.
@ -109,13 +111,13 @@ impl<TClientType: ClientTypes> Client<TClientType> {
// Start the `http_server` service. // Start the `http_server` service.
// //
// Note: presently we are ignoring the config and _always_ starting a HTTP server. // Note: presently we are ignoring the config and _always_ starting a HTTP server.
http_server::start_service( let http_exit_signal = Some(http_server::start_service(
&config.http_conf, &config.http_conf,
executor, executor,
network_send, network_send,
beacon_chain.clone(), beacon_chain.clone(),
&log, &log,
); ));
let (slot_timer_exit_signal, exit) = exit_future::signal(); let (slot_timer_exit_signal, exit) = exit_future::signal();
if let Ok(Some(duration_to_next_slot)) = beacon_chain.slot_clock.duration_to_next_slot() { if let Ok(Some(duration_to_next_slot)) = beacon_chain.slot_clock.duration_to_next_slot() {
@ -146,6 +148,7 @@ impl<TClientType: ClientTypes> Client<TClientType> {
Ok(Client { Ok(Client {
_config: config, _config: config,
_beacon_chain: beacon_chain, _beacon_chain: beacon_chain,
http_exit_signal,
rpc_exit_signal, rpc_exit_signal,
slot_timer_exit_signal: Some(slot_timer_exit_signal), slot_timer_exit_signal: Some(slot_timer_exit_signal),
log, log,

View File

@ -19,6 +19,7 @@ protos = { path = "../../protos" }
fork_choice = { path = "../../eth2/fork_choice" } fork_choice = { path = "../../eth2/fork_choice" }
grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] } grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] }
protobuf = "2.0.2" protobuf = "2.0.2"
prometheus = "^0.6"
clap = "2.32.0" clap = "2.32.0"
store = { path = "../store" } store = { path = "../store" }
dirs = "1.0.3" dirs = "1.0.3"

View File

@ -1,13 +1,16 @@
mod prometheus_handler;
use beacon_chain::BeaconChain; use beacon_chain::BeaconChain;
use futures::Future; use futures::Future;
use iron::prelude::*;
use iron::{status::Status, Handler, IronResult, Request, Response};
use network::NetworkMessage; use network::NetworkMessage;
use prometheus_handler::PrometheusHandler;
use router::Router;
use slog::{info, o, warn}; use slog::{info, o, warn};
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::TaskExecutor; use tokio::runtime::TaskExecutor;
use types::EthSpec; use types::EthSpec;
use iron::prelude::*;
use iron::{status::Status, Handler, IronResult, Request, Response};
use router::Router;
#[derive(PartialEq, Clone, Debug)] #[derive(PartialEq, Clone, Debug)]
pub struct HttpServerConfig { pub struct HttpServerConfig {
@ -42,13 +45,25 @@ impl Handler for IndexHandler {
} }
} }
pub fn create_iron_http_server() -> Iron<Router> { pub fn create_iron_http_server<T, U, F, E>(
beacon_chain: Arc<BeaconChain<T, U, F, E>>,
) -> Iron<Router>
where
T: store::Store + 'static,
U: slot_clock::SlotClock + 'static,
F: fork_choice::ForkChoice + 'static,
E: EthSpec + 'static,
{
let index_handler = IndexHandler { let index_handler = IndexHandler {
message: "Hello world".to_string(), message: "Hello world".to_string(),
}; };
let prom_handler = PrometheusHandler {
beacon_chain: beacon_chain,
};
let mut router = Router::new(); let mut router = Router::new();
router.get("/", index_handler, "index"); router.get("/", index_handler, "index");
router.get("/prometheus/", prom_handler, "prometheus");
Iron::new(router) Iron::new(router)
} }
@ -56,16 +71,16 @@ pub fn start_service<T, U, F, E>(
config: &HttpServerConfig, config: &HttpServerConfig,
executor: &TaskExecutor, executor: &TaskExecutor,
_network_chan: crossbeam_channel::Sender<NetworkMessage>, _network_chan: crossbeam_channel::Sender<NetworkMessage>,
_beacon_chain: Arc<BeaconChain<T, U, F, E>>, beacon_chain: Arc<BeaconChain<T, U, F, E>>,
log: &slog::Logger, log: &slog::Logger,
) -> exit_future::Signal ) -> exit_future::Signal
where where
T: store::Store, T: store::Store + 'static,
U: slot_clock::SlotClock, U: slot_clock::SlotClock + 'static,
F: fork_choice::ForkChoice, F: fork_choice::ForkChoice + 'static,
E: EthSpec, E: EthSpec + 'static,
{ {
let log = log.new(o!("Service"=>"RPC")); let log = log.new(o!("Service"=>"HTTP"));
// Create: // Create:
// - `shutdown_trigger` a one-shot to shut down this service. // - `shutdown_trigger` a one-shot to shut down this service.
@ -73,9 +88,14 @@ where
let (shutdown_trigger, wait_for_shutdown) = exit_future::signal(); let (shutdown_trigger, wait_for_shutdown) = exit_future::signal();
// Create an `iron` http, without starting it yet. // Create an `iron` http, without starting it yet.
let iron = create_iron_http_server(); let iron = create_iron_http_server(beacon_chain);
let spawn_rpc = { // Create a HTTP server future.
//
// 1. Start the HTTP server
// 2. Build an exit future that will shutdown the server when requested.
// 3. Return the exit future, so the caller may shutdown the service when desired.
let http_service = {
// Start the HTTP server // Start the HTTP server
let server_start_result = iron.http(config.listen_address.clone()); let server_start_result = iron.http(config.listen_address.clone());
@ -102,13 +122,16 @@ where
// //
// See: https://docs.rs/iron/0.6.0/iron/struct.Listening.html#impl // See: https://docs.rs/iron/0.6.0/iron/struct.Listening.html#impl
match server.close() { match server.close() {
_=> () _ => (),
}; };
} }
info!(log, "HTTP server shutdown complete."); info!(log, "HTTP server shutdown complete.");
Ok(()) Ok(())
}) })
}; };
executor.spawn(spawn_rpc);
// Attach the HTTP server to the executor.
executor.spawn(http_service);
shutdown_trigger shutdown_trigger
} }

View File

@ -0,0 +1,43 @@
use beacon_chain::BeaconChain;
use iron::{status::Status, Handler, IronResult, Request, Response};
use prometheus::{IntCounter, Encoder, Opts, Registry, TextEncoder};
use std::sync::Arc;
use types::EthSpec;
pub struct PrometheusHandler<T, U, F, E: EthSpec> {
pub beacon_chain: Arc<BeaconChain<T, U, F, E>>,
}
impl<T, U, F, E> PrometheusHandler<T, U, F, E> where E: EthSpec {}
impl<T, U, F, E> Handler for PrometheusHandler<T, U, F, E>
where
E: EthSpec + 'static,
U: slot_clock::SlotClock + Send + Sync + 'static,
T: Send + Sync + 'static,
F: Send + Sync + 'static,
{
fn handle(&self, _: &mut Request) -> IronResult<Response> {
// Create a Counter.
let counter_opts = Opts::new("present_slot", "direct_slot_clock_reading");
let counter = IntCounter::with_opts(counter_opts).unwrap();
// Create a Registry and register Counter.
let r = Registry::new();
r.register(Box::new(counter.clone())).unwrap();
if let Ok(Some(slot)) = self.beacon_chain.slot_clock.present_slot() {
counter.inc_by(slot.as_u64() as i64);
}
// Gather the metrics.
let mut buffer = vec![];
let encoder = TextEncoder::new();
let metric_families = r.gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
let prom_string = String::from_utf8(buffer).unwrap();
Ok(Response::with((Status::Ok, prom_string)))
}
}