diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 6433b94e2..9445799d5 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -35,6 +35,8 @@ pub struct Client { pub network: Arc>, /// Signal to terminate the RPC server. pub rpc_exit_signal: Option, + /// Signal to terminate the HTTP server. + pub http_exit_signal: Option, /// Signal to terminate the slot timer. pub slot_timer_exit_signal: Option, /// The clients logger. @@ -109,13 +111,13 @@ impl Client { // Start the `http_server` service. // // Note: presently we are ignoring the config and _always_ starting a HTTP server. - http_server::start_service( + let http_exit_signal = Some(http_server::start_service( &config.http_conf, executor, network_send, beacon_chain.clone(), &log, - ); + )); let (slot_timer_exit_signal, exit) = exit_future::signal(); if let Ok(Some(duration_to_next_slot)) = beacon_chain.slot_clock.duration_to_next_slot() { @@ -146,6 +148,7 @@ impl Client { Ok(Client { _config: config, _beacon_chain: beacon_chain, + http_exit_signal, rpc_exit_signal, slot_timer_exit_signal: Some(slot_timer_exit_signal), log, diff --git a/beacon_node/http_server/Cargo.toml b/beacon_node/http_server/Cargo.toml index 5d5d7e492..6f4579d17 100644 --- a/beacon_node/http_server/Cargo.toml +++ b/beacon_node/http_server/Cargo.toml @@ -19,6 +19,7 @@ protos = { path = "../../protos" } fork_choice = { path = "../../eth2/fork_choice" } grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] } protobuf = "2.0.2" +prometheus = "^0.6" clap = "2.32.0" store = { path = "../store" } dirs = "1.0.3" diff --git a/beacon_node/http_server/src/lib.rs b/beacon_node/http_server/src/lib.rs index b230f924f..b2c3a86fc 100644 --- a/beacon_node/http_server/src/lib.rs +++ b/beacon_node/http_server/src/lib.rs @@ -1,13 +1,16 @@ +mod prometheus_handler; + use beacon_chain::BeaconChain; use futures::Future; +use iron::prelude::*; +use iron::{status::Status, Handler, IronResult, Request, Response}; use network::NetworkMessage; +use prometheus_handler::PrometheusHandler; +use router::Router; use slog::{info, o, warn}; use std::sync::Arc; use tokio::runtime::TaskExecutor; use types::EthSpec; -use iron::prelude::*; -use iron::{status::Status, Handler, IronResult, Request, Response}; -use router::Router; #[derive(PartialEq, Clone, Debug)] pub struct HttpServerConfig { @@ -42,13 +45,25 @@ impl Handler for IndexHandler { } } -pub fn create_iron_http_server() -> Iron { +pub fn create_iron_http_server( + beacon_chain: Arc>, +) -> Iron +where + T: store::Store + 'static, + U: slot_clock::SlotClock + 'static, + F: fork_choice::ForkChoice + 'static, + E: EthSpec + 'static, +{ let index_handler = IndexHandler { message: "Hello world".to_string(), }; + let prom_handler = PrometheusHandler { + beacon_chain: beacon_chain, + }; let mut router = Router::new(); router.get("/", index_handler, "index"); + router.get("/prometheus/", prom_handler, "prometheus"); Iron::new(router) } @@ -56,16 +71,16 @@ pub fn start_service( config: &HttpServerConfig, executor: &TaskExecutor, _network_chan: crossbeam_channel::Sender, - _beacon_chain: Arc>, + beacon_chain: Arc>, log: &slog::Logger, ) -> exit_future::Signal where - T: store::Store, - U: slot_clock::SlotClock, - F: fork_choice::ForkChoice, - E: EthSpec, + T: store::Store + 'static, + U: slot_clock::SlotClock + 'static, + F: fork_choice::ForkChoice + 'static, + E: EthSpec + 'static, { - let log = log.new(o!("Service"=>"RPC")); + let log = log.new(o!("Service"=>"HTTP")); // Create: // - `shutdown_trigger` a one-shot to shut down this service. @@ -73,9 +88,14 @@ where let (shutdown_trigger, wait_for_shutdown) = exit_future::signal(); // Create an `iron` http, without starting it yet. - let iron = create_iron_http_server(); + let iron = create_iron_http_server(beacon_chain); - let spawn_rpc = { + // Create a HTTP server future. + // + // 1. Start the HTTP server + // 2. Build an exit future that will shutdown the server when requested. + // 3. Return the exit future, so the caller may shutdown the service when desired. + let http_service = { // Start the HTTP server let server_start_result = iron.http(config.listen_address.clone()); @@ -102,13 +122,16 @@ where // // See: https://docs.rs/iron/0.6.0/iron/struct.Listening.html#impl match server.close() { - _=> () + _ => (), }; } info!(log, "HTTP server shutdown complete."); Ok(()) }) }; - executor.spawn(spawn_rpc); + + // Attach the HTTP server to the executor. + executor.spawn(http_service); + shutdown_trigger } diff --git a/beacon_node/http_server/src/prometheus_handler.rs b/beacon_node/http_server/src/prometheus_handler.rs new file mode 100644 index 000000000..cf577a9eb --- /dev/null +++ b/beacon_node/http_server/src/prometheus_handler.rs @@ -0,0 +1,43 @@ +use beacon_chain::BeaconChain; +use iron::{status::Status, Handler, IronResult, Request, Response}; +use prometheus::{IntCounter, Encoder, Opts, Registry, TextEncoder}; +use std::sync::Arc; +use types::EthSpec; + +pub struct PrometheusHandler { + pub beacon_chain: Arc>, +} + +impl PrometheusHandler where E: EthSpec {} + +impl Handler for PrometheusHandler +where + E: EthSpec + 'static, + U: slot_clock::SlotClock + Send + Sync + 'static, + T: Send + Sync + 'static, + F: Send + Sync + 'static, +{ + fn handle(&self, _: &mut Request) -> IronResult { + // Create a Counter. + let counter_opts = Opts::new("present_slot", "direct_slot_clock_reading"); + let counter = IntCounter::with_opts(counter_opts).unwrap(); + + // Create a Registry and register Counter. + let r = Registry::new(); + r.register(Box::new(counter.clone())).unwrap(); + + if let Ok(Some(slot)) = self.beacon_chain.slot_clock.present_slot() { + counter.inc_by(slot.as_u64() as i64); + } + + // Gather the metrics. + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + let metric_families = r.gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + let prom_string = String::from_utf8(buffer).unwrap(); + + Ok(Response::with((Status::Ok, prom_string))) + } +}