Logging via the HTTP API (#4074)

This PR adds the ability to read the Lighthouse logs from the HTTP API for both the BN and the VC. 

This is done in such a way to as minimize any kind of performance hit by adding this feature.

The current design creates a tokio broadcast channel and mixes is into a form of slog drain that combines with our main global logger drain, only if the http api is enabled. 

The drain gets the logs, checks the log level and drops them if they are below INFO. If they are INFO or higher, it sends them via a broadcast channel only if there are users subscribed to the HTTP API channel. If not, it drops the logs. 

If there are more than one subscriber, the channel clones the log records and converts them to json in their independent HTTP API tasks. 

Co-authored-by: Michael Sproul <micsproul@gmail.com>
This commit is contained in:
Age Manning 2023-05-22 05:57:08 +00:00
parent c27f2bf9c6
commit aa1ed787e9
23 changed files with 1040 additions and 411 deletions

800
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -6,7 +6,6 @@ edition = "2021"
[dev-dependencies]
serde_yaml = "0.8.13"
logging = { path = "../../common/logging" }
state_processing = { path = "../../consensus/state_processing" }
operation_pool = { path = "../operation_pool" }
tokio = "1.14.0"
@ -17,6 +16,7 @@ store = { path = "../store" }
network = { path = "../network" }
timer = { path = "../timer" }
lighthouse_network = { path = "../lighthouse_network" }
logging = { path = "../../common/logging" }
parking_lot = "0.12.0"
types = { path = "../../consensus/types" }
eth2_config = { path = "../../common/eth2_config" }

View File

@ -478,6 +478,7 @@ where
network_globals: None,
eth1_service: Some(genesis_service.eth1_service.clone()),
log: context.log().clone(),
sse_logging_components: runtime_context.sse_logging_components.clone(),
});
// Discard the error from the oneshot.
@ -698,6 +699,7 @@ where
network_senders: self.network_senders.clone(),
network_globals: self.network_globals.clone(),
eth1_service: self.eth1_service.clone(),
sse_logging_components: runtime_context.sse_logging_components.clone(),
log: log.clone(),
});

View File

@ -36,11 +36,11 @@ tree_hash = "0.5.0"
sysinfo = "0.26.5"
system_health = { path = "../../common/system_health" }
directory = { path = "../../common/directory" }
logging = { path = "../../common/logging" }
ethereum_serde_utils = "0.5.0"
operation_pool = { path = "../operation_pool" }
sensitive_url = { path = "../../common/sensitive_url" }
unused_port = {path = "../../common/unused_port"}
logging = { path = "../../common/logging" }
store = { path = "../store" }
[dev-dependencies]
@ -51,4 +51,4 @@ genesis = { path = "../genesis" }
[[test]]
name = "bn_http_api_tests"
path = "tests/main.rs"
path = "tests/main.rs"

View File

@ -36,6 +36,7 @@ use eth2::types::{
};
use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage};
use lighthouse_version::version_with_platform;
use logging::SSELoggingComponents;
use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage};
use operation_pool::ReceivedPreCapella;
use parking_lot::RwLock;
@ -108,6 +109,7 @@ pub struct Context<T: BeaconChainTypes> {
pub network_senders: Option<NetworkSenders<T::EthSpec>>,
pub network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>,
pub eth1_service: Option<eth1::Service>,
pub sse_logging_components: Option<SSELoggingComponents>,
pub log: Logger,
}
@ -448,6 +450,9 @@ pub fn serve<T: BeaconChainTypes>(
let inner_ctx = ctx.clone();
let log_filter = warp::any().map(move || inner_ctx.log.clone());
let inner_components = ctx.sse_logging_components.clone();
let sse_component_filter = warp::any().map(move || inner_components.clone());
// Create a `warp` filter that provides access to local system information.
let system_info = Arc::new(RwLock::new(sysinfo::System::new()));
{
@ -3729,6 +3734,44 @@ pub fn serve<T: BeaconChainTypes>(
},
);
// Subscribe to logs via Server Side Events
// /lighthouse/logs
let lighthouse_log_events = warp::path("lighthouse")
.and(warp::path("logs"))
.and(warp::path::end())
.and(sse_component_filter)
.and_then(|sse_component: Option<SSELoggingComponents>| {
blocking_response_task(move || {
if let Some(logging_components) = sse_component {
// Build a JSON stream
let s =
BroadcastStream::new(logging_components.sender.subscribe()).map(|msg| {
match msg {
Ok(data) => {
// Serialize to json
match data.to_json_string() {
// Send the json as a Server Side Event
Ok(json) => Ok(Event::default().data(json)),
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to serialize to JSON {}", e),
)),
}
}
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to receive event {}", e),
)),
}
});
Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s)))
} else {
Err(warp_utils::reject::custom_server_error(
"SSE Logging is not enabled".to_string(),
))
}
})
});
// Define the ultimate set of routes that will be provided to the server.
// Use `uor` rather than `or` in order to simplify types (see `UnifyingOrFilter`).
let routes = warp::get()
@ -3796,6 +3839,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(get_lighthouse_block_packing_efficiency)
.uor(get_lighthouse_merge_readiness)
.uor(get_events)
.uor(lighthouse_log_events.boxed())
.recover(warp_utils::reject::handle_rejection),
)
.boxed()

View File

@ -198,6 +198,7 @@ pub async fn create_api_server_on_port<T: BeaconChainTypes>(
network_senders: Some(network_senders),
network_globals: Some(network_globals),
eth1_service: Some(eth1_service),
sse_logging_components: None,
log,
});

View File

@ -1081,7 +1081,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.long("gui")
.hidden(true)
.help("Enable the graphical user interface and all its requirements. \
This is equivalent to --http and --validator-monitor-auto.")
This enables --http and --validator-monitor-auto and enables SSE logging.")
.takes_value(false)
)
.arg(

View File

@ -679,3 +679,31 @@ Caveats:
This is because the state _prior_ to the `start_epoch` needs to be loaded from the database, and
loading a state on a boundary is most efficient.
### `/lighthouse/logs`
This is a Server Side Event subscription endpoint. This allows a user to read
the Lighthouse logs directly from the HTTP API endpoint. This currently
exposes INFO and higher level logs. It is only enabled when the `--gui` flag is set in the CLI.
Example:
```bash
curl -N "http://localhost:5052/lighthouse/logs"
```
Should provide an output that emits log events as they occur:
```json
{
"data": {
"time": "Mar 13 15:28:41",
"level": "INFO",
"msg": "Syncing",
"service": "slot_notifier",
"est_time": "1 hr 27 mins",
"speed": "5.33 slots/sec",
"distance": "28141 slots (3 days 21 hrs)",
"peers": "8"
}
}
```

View File

@ -578,3 +578,33 @@ The following fields may be omitted or nullified to obtain default values:
### Example Response Body
*No data is included in the response body.*
## `GET /lighthouse/logs`
Provides a subscription to receive logs as Server Side Events. Currently the
logs emitted are INFO level or higher.
### HTTP Specification
| Property | Specification |
|-------------------|--------------------------------------------|
| Path | `/lighthouse/logs` |
| Method | GET |
| Required Headers | None |
| Typical Responses | 200 |
### Example Response Body
```json
{
"data": {
"time": "Mar 13 15:26:53",
"level": "INFO",
"msg": "Connected to beacon node(s)",
"service": "notifier",
"synced": 1,
"available": 1,
"total": 1
}
}
```

View File

@ -10,6 +10,13 @@ test_logger = [] # Print log output to stderr when running tests instead of drop
[dependencies]
slog = "2.5.2"
slog-term = "2.6.0"
tokio = { version = "1.26.0", features = ["sync"] }
lighthouse_metrics = { path = "../lighthouse_metrics" }
lazy_static = "1.4.0"
sloggers = { version = "2.1.1", features = ["json"] }
slog-async = "2.7.0"
take_mut = "0.2.2"
parking_lot = "0.12.1"
serde = "1.0.153"
serde_json = "1.0.94"
chrono = "0.4.23"

View File

@ -0,0 +1,309 @@
//! An object that can be used to pass through a channel and be cloned. It can therefore be used
//! via the broadcast channel.
use parking_lot::Mutex;
use serde::ser::SerializeMap;
use serde::serde_if_integer128;
use serde::Serialize;
use slog::{BorrowedKV, Key, Level, OwnedKVList, Record, RecordStatic, Serializer, SingleKV, KV};
use std::cell::RefCell;
use std::fmt;
use std::fmt::Write;
use std::sync::Arc;
use take_mut::take;
thread_local! {
static TL_BUF: RefCell<String> = RefCell::new(String::with_capacity(128))
}
/// Serialized record.
#[derive(Clone)]
pub struct AsyncRecord {
msg: String,
level: Level,
location: Box<slog::RecordLocation>,
tag: String,
logger_values: OwnedKVList,
kv: Arc<Mutex<dyn KV + Send>>,
}
impl AsyncRecord {
/// Serializes a `Record` and an `OwnedKVList`.
pub fn from(record: &Record, logger_values: &OwnedKVList) -> Self {
let mut ser = ToSendSerializer::new();
record
.kv()
.serialize(record, &mut ser)
.expect("`ToSendSerializer` can't fail");
AsyncRecord {
msg: fmt::format(*record.msg()),
level: record.level(),
location: Box::new(*record.location()),
tag: String::from(record.tag()),
logger_values: logger_values.clone(),
kv: Arc::new(Mutex::new(ser.finish())),
}
}
pub fn to_json_string(&self) -> Result<String, String> {
serde_json::to_string(&self).map_err(|e| format!("{:?}", e))
}
}
pub struct ToSendSerializer {
kv: Box<dyn KV + Send>,
}
impl ToSendSerializer {
fn new() -> Self {
ToSendSerializer { kv: Box::new(()) }
}
fn finish(self) -> Box<dyn KV + Send> {
self.kv
}
}
impl Serializer for ToSendSerializer {
fn emit_bool(&mut self, key: Key, val: bool) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_unit(&mut self, key: Key) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, ()))));
Ok(())
}
fn emit_none(&mut self, key: Key) -> slog::Result {
let val: Option<()> = None;
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_char(&mut self, key: Key, val: char) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_u8(&mut self, key: Key, val: u8) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_i8(&mut self, key: Key, val: i8) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_u16(&mut self, key: Key, val: u16) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_i16(&mut self, key: Key, val: i16) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_u32(&mut self, key: Key, val: u32) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_i32(&mut self, key: Key, val: i32) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_f32(&mut self, key: Key, val: f32) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_u64(&mut self, key: Key, val: u64) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_i64(&mut self, key: Key, val: i64) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_f64(&mut self, key: Key, val: f64) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
#[cfg(integer128)]
fn emit_u128(&mut self, key: Key, val: u128) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
#[cfg(integer128)]
fn emit_i128(&mut self, key: Key, val: i128) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_usize(&mut self, key: Key, val: usize) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_isize(&mut self, key: Key, val: isize) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_str(&mut self, key: Key, val: &str) -> slog::Result {
let val = val.to_owned();
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
let val = fmt::format(*val);
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
}
impl Serialize for AsyncRecord {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
// Get the current time
let dt = chrono::Local::now().format("%b %e %T").to_string();
let rs = RecordStatic {
location: &self.location,
level: self.level,
tag: &self.tag,
};
let mut map_serializer = SerdeSerializer::new(serializer)?;
// Serialize the time and log level first
map_serializer.serialize_entry("time", &dt)?;
map_serializer.serialize_entry("level", self.level.as_short_str())?;
let kv = self.kv.lock();
// Convoluted pattern to avoid binding `format_args!` to a temporary.
// See: https://stackoverflow.com/questions/56304313/cannot-use-format-args-due-to-temporary-value-is-freed-at-the-end-of-this-state
let mut f = |msg: std::fmt::Arguments| {
map_serializer.serialize_entry("msg", &msg.to_string())?;
let record = Record::new(&rs, &msg, BorrowedKV(&(*kv)));
self.logger_values
.serialize(&record, &mut map_serializer)
.map_err(serde::ser::Error::custom)?;
record
.kv()
.serialize(&record, &mut map_serializer)
.map_err(serde::ser::Error::custom)
};
f(format_args!("{}", self.msg))?;
map_serializer.end()
}
}
struct SerdeSerializer<S: serde::Serializer> {
/// Current state of map serializing: `serde::Serializer::MapState`
ser_map: S::SerializeMap,
}
impl<S: serde::Serializer> SerdeSerializer<S> {
fn new(ser: S) -> Result<Self, S::Error> {
let ser_map = ser.serialize_map(None)?;
Ok(SerdeSerializer { ser_map })
}
fn serialize_entry<K, V>(&mut self, key: K, value: V) -> Result<(), S::Error>
where
K: serde::Serialize,
V: serde::Serialize,
{
self.ser_map.serialize_entry(&key, &value)
}
/// Finish serialization, and return the serializer
fn end(self) -> Result<S::Ok, S::Error> {
self.ser_map.end()
}
}
// NOTE: This is borrowed from slog_json
macro_rules! impl_m(
($s:expr, $key:expr, $val:expr) => ({
let k_s: &str = $key.as_ref();
$s.ser_map.serialize_entry(k_s, $val)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("serde serialization error: {}", e)))?;
Ok(())
});
);
impl<S> slog::Serializer for SerdeSerializer<S>
where
S: serde::Serializer,
{
fn emit_bool(&mut self, key: Key, val: bool) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_unit(&mut self, key: Key) -> slog::Result {
impl_m!(self, key, &())
}
fn emit_char(&mut self, key: Key, val: char) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_none(&mut self, key: Key) -> slog::Result {
let val: Option<()> = None;
impl_m!(self, key, &val)
}
fn emit_u8(&mut self, key: Key, val: u8) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_i8(&mut self, key: Key, val: i8) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_u16(&mut self, key: Key, val: u16) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_i16(&mut self, key: Key, val: i16) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_usize(&mut self, key: Key, val: usize) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_isize(&mut self, key: Key, val: isize) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_u32(&mut self, key: Key, val: u32) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_i32(&mut self, key: Key, val: i32) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_f32(&mut self, key: Key, val: f32) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_u64(&mut self, key: Key, val: u64) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_i64(&mut self, key: Key, val: i64) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_f64(&mut self, key: Key, val: f64) -> slog::Result {
impl_m!(self, key, &val)
}
serde_if_integer128! {
fn emit_u128(&mut self, key: Key, val: u128) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_i128(&mut self, key: Key, val: i128) -> slog::Result {
impl_m!(self, key, &val)
}
}
fn emit_str(&mut self, key: Key, val: &str) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
TL_BUF.with(|buf| {
let mut buf = buf.borrow_mut();
buf.write_fmt(*val).unwrap();
let res = { || impl_m!(self, key, &*buf) }();
buf.clear();
res
})
}
}

View File

@ -11,6 +11,10 @@ use std::time::{Duration, Instant};
pub const MAX_MESSAGE_WIDTH: usize = 40;
pub mod async_record;
mod sse_logging_components;
pub use sse_logging_components::SSELoggingComponents;
/// The minimum interval between log messages indicating that a queue is full.
const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);

View File

@ -0,0 +1,46 @@
//! This module provides an implementation of `slog::Drain` that optionally writes to a channel if
//! there are subscribers to a HTTP SSE stream.
use crate::async_record::AsyncRecord;
use slog::{Drain, OwnedKVList, Record};
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use tokio::sync::broadcast::Sender;
/// Default log level for SSE Events.
// NOTE: Made this a constant. Debug level seems to be pretty intense. Can make this
// configurable later if needed.
const LOG_LEVEL: slog::Level = slog::Level::Info;
/// The components required in the HTTP API task to receive logged events.
#[derive(Clone)]
pub struct SSELoggingComponents {
/// The channel to receive events from.
pub sender: Arc<AssertUnwindSafe<Sender<AsyncRecord>>>,
}
impl SSELoggingComponents {
/// Create a new SSE drain.
pub fn new(channel_size: usize) -> Self {
let (sender, _receiver) = tokio::sync::broadcast::channel(channel_size);
let sender = Arc::new(AssertUnwindSafe(sender));
SSELoggingComponents { sender }
}
}
impl Drain for SSELoggingComponents {
type Ok = ();
type Err = &'static str;
fn log(&self, record: &Record, logger_values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
if record.level().is_at_least(LOG_LEVEL) {
// Attempt to send the logs
match self.sender.send(AsyncRecord::from(record, logger_values)) {
Ok(_num_sent) => {} // Everything got sent
Err(_err) => {} // There are no subscribers, do nothing
}
}
Ok(())
}
}

View File

@ -881,6 +881,7 @@ fn run<T: EthSpec>(
max_log_number: 0,
compression: false,
is_restricted: true,
sse_logging: false, // No SSE Logging in LCLI
})
.map_err(|e| format!("should start logger: {:?}", e))?
.build()

View File

@ -12,6 +12,7 @@ use eth2_network_config::Eth2NetworkConfig;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::{future, StreamExt};
use logging::SSELoggingComponents;
use serde_derive::{Deserialize, Serialize};
use slog::{error, info, o, warn, Drain, Duplicate, Level, Logger};
use sloggers::{file::FileLoggerBuilder, types::Format, types::Severity, Build};
@ -36,6 +37,7 @@ use {futures::channel::oneshot, std::cell::RefCell};
pub use task_executor::test_utils::null_logger;
const LOG_CHANNEL_SIZE: usize = 2048;
const SSE_LOG_CHANNEL_SIZE: usize = 2048;
/// The maximum time in seconds the client will wait for all internal tasks to shutdown.
const MAXIMUM_SHUTDOWN_TIME: u64 = 15;
@ -57,6 +59,7 @@ pub struct LoggerConfig {
pub max_log_number: usize,
pub compression: bool,
pub is_restricted: bool,
pub sse_logging: bool,
}
impl Default for LoggerConfig {
fn default() -> Self {
@ -72,14 +75,54 @@ impl Default for LoggerConfig {
max_log_number: 5,
compression: false,
is_restricted: true,
sse_logging: false,
}
}
}
/// An execution context that can be used by a service.
///
/// Distinct from an `Environment` because a `Context` is not able to give a mutable reference to a
/// `Runtime`, instead it only has access to a `Runtime`.
#[derive(Clone)]
pub struct RuntimeContext<E: EthSpec> {
pub executor: TaskExecutor,
pub eth_spec_instance: E,
pub eth2_config: Eth2Config,
pub eth2_network_config: Option<Arc<Eth2NetworkConfig>>,
pub sse_logging_components: Option<SSELoggingComponents>,
}
impl<E: EthSpec> RuntimeContext<E> {
/// Returns a sub-context of this context.
///
/// The generated service will have the `service_name` in all it's logs.
pub fn service_context(&self, service_name: String) -> Self {
Self {
executor: self.executor.clone_with_name(service_name),
eth_spec_instance: self.eth_spec_instance.clone(),
eth2_config: self.eth2_config.clone(),
eth2_network_config: self.eth2_network_config.clone(),
sse_logging_components: self.sse_logging_components.clone(),
}
}
/// Returns the `eth2_config` for this service.
pub fn eth2_config(&self) -> &Eth2Config {
&self.eth2_config
}
/// Returns a reference to the logger for this service.
pub fn log(&self) -> &slog::Logger {
self.executor.log()
}
}
/// Builds an `Environment`.
pub struct EnvironmentBuilder<E: EthSpec> {
runtime: Option<Arc<Runtime>>,
log: Option<Logger>,
sse_logging_components: Option<SSELoggingComponents>,
eth_spec_instance: E,
eth2_config: Eth2Config,
eth2_network_config: Option<Eth2NetworkConfig>,
@ -91,6 +134,7 @@ impl EnvironmentBuilder<MinimalEthSpec> {
Self {
runtime: None,
log: None,
sse_logging_components: None,
eth_spec_instance: MinimalEthSpec,
eth2_config: Eth2Config::minimal(),
eth2_network_config: None,
@ -104,6 +148,7 @@ impl EnvironmentBuilder<MainnetEthSpec> {
Self {
runtime: None,
log: None,
sse_logging_components: None,
eth_spec_instance: MainnetEthSpec,
eth2_config: Eth2Config::mainnet(),
eth2_network_config: None,
@ -117,6 +162,7 @@ impl EnvironmentBuilder<GnosisEthSpec> {
Self {
runtime: None,
log: None,
sse_logging_components: None,
eth_spec_instance: GnosisEthSpec,
eth2_config: Eth2Config::gnosis(),
eth2_network_config: None,
@ -265,7 +311,7 @@ impl<E: EthSpec> EnvironmentBuilder<E> {
.build()
.map_err(|e| format!("Unable to build file logger: {}", e))?;
let log = Logger::root(Duplicate::new(stdout_logger, file_logger).fuse(), o!());
let mut log = Logger::root(Duplicate::new(stdout_logger, file_logger).fuse(), o!());
info!(
log,
@ -273,6 +319,14 @@ impl<E: EthSpec> EnvironmentBuilder<E> {
"path" => format!("{:?}", path)
);
// If the http API is enabled, we may need to send logs to be consumed by subscribers.
if config.sse_logging {
let sse_logger = SSELoggingComponents::new(SSE_LOG_CHANNEL_SIZE);
self.sse_logging_components = Some(sse_logger.clone());
log = Logger::root(Duplicate::new(log, sse_logger).fuse(), o!());
}
self.log = Some(log);
Ok(self)
@ -315,6 +369,7 @@ impl<E: EthSpec> EnvironmentBuilder<E> {
signal: Some(signal),
exit,
log: self.log.ok_or("Cannot build environment without log")?,
sse_logging_components: self.sse_logging_components,
eth_spec_instance: self.eth_spec_instance,
eth2_config: self.eth2_config,
eth2_network_config: self.eth2_network_config.map(Arc::new),
@ -322,42 +377,6 @@ impl<E: EthSpec> EnvironmentBuilder<E> {
}
}
/// An execution context that can be used by a service.
///
/// Distinct from an `Environment` because a `Context` is not able to give a mutable reference to a
/// `Runtime`, instead it only has access to a `Runtime`.
#[derive(Clone)]
pub struct RuntimeContext<E: EthSpec> {
pub executor: TaskExecutor,
pub eth_spec_instance: E,
pub eth2_config: Eth2Config,
pub eth2_network_config: Option<Arc<Eth2NetworkConfig>>,
}
impl<E: EthSpec> RuntimeContext<E> {
/// Returns a sub-context of this context.
///
/// The generated service will have the `service_name` in all it's logs.
pub fn service_context(&self, service_name: String) -> Self {
Self {
executor: self.executor.clone_with_name(service_name),
eth_spec_instance: self.eth_spec_instance.clone(),
eth2_config: self.eth2_config.clone(),
eth2_network_config: self.eth2_network_config.clone(),
}
}
/// Returns the `eth2_config` for this service.
pub fn eth2_config(&self) -> &Eth2Config {
&self.eth2_config
}
/// Returns a reference to the logger for this service.
pub fn log(&self) -> &slog::Logger {
self.executor.log()
}
}
/// An environment where Lighthouse services can run. Used to start a production beacon node or
/// validator client, or to run tests that involve logging and async task execution.
pub struct Environment<E: EthSpec> {
@ -369,6 +388,7 @@ pub struct Environment<E: EthSpec> {
signal: Option<exit_future::Signal>,
exit: exit_future::Exit,
log: Logger,
sse_logging_components: Option<SSELoggingComponents>,
eth_spec_instance: E,
pub eth2_config: Eth2Config,
pub eth2_network_config: Option<Arc<Eth2NetworkConfig>>,
@ -395,6 +415,7 @@ impl<E: EthSpec> Environment<E> {
eth_spec_instance: self.eth_spec_instance.clone(),
eth2_config: self.eth2_config.clone(),
eth2_network_config: self.eth2_network_config.clone(),
sse_logging_components: self.sse_logging_components.clone(),
}
}
@ -410,6 +431,7 @@ impl<E: EthSpec> Environment<E> {
eth_spec_instance: self.eth_spec_instance.clone(),
eth2_config: self.eth2_config.clone(),
eth2_network_config: self.eth2_network_config.clone(),
sse_logging_components: self.sse_logging_components.clone(),
}
}

View File

@ -483,6 +483,16 @@ fn run<E: EthSpec>(
};
}
let sse_logging = {
if let Some(bn_matches) = matches.subcommand_matches("beacon_node") {
bn_matches.is_present("gui")
} else if let Some(vc_matches) = matches.subcommand_matches("validator_client") {
vc_matches.is_present("http")
} else {
false
}
};
let logger_config = LoggerConfig {
path: log_path,
debug_level: String::from(debug_level),
@ -495,6 +505,7 @@ fn run<E: EthSpec>(
max_log_number: logfile_max_number,
compression: logfile_compress,
is_restricted: logfile_restricted,
sse_logging,
};
let builder = environment_builder.initialize_logger(logger_config.clone())?;

View File

@ -72,6 +72,7 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
max_log_number: 0,
compression: false,
is_restricted: true,
sse_logging: false,
})?
.multi_threaded_tokio_runtime()?
.build()?;

View File

@ -54,6 +54,7 @@ pub fn run_no_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
max_log_number: 0,
compression: false,
is_restricted: true,
sse_logging: false,
})?
.multi_threaded_tokio_runtime()?
.build()?;

View File

@ -58,6 +58,7 @@ fn syncing_sim(
max_log_number: 0,
compression: false,
is_restricted: true,
sse_logging: false,
})?
.multi_threaded_tokio_runtime()?
.build()?;

View File

@ -25,6 +25,7 @@ bincode = "1.3.1"
serde_json = "1.0.58"
slog = { version = "2.5.2", features = ["max_level_trace", "release_max_level_trace"] }
tokio = { version = "1.14.0", features = ["time"] }
tokio-stream = { version = "0.1.3", features = ["sync"] }
futures = "0.3.7"
dirs = "3.0.1"
directory = { path = "../common/directory" }
@ -61,4 +62,5 @@ url = "2.2.2"
malloc_utils = { path = "../common/malloc_utils" }
sysinfo = "0.26.5"
system_health = { path = "../common/system_health" }
logging = { path = "../common/logging" }

View File

@ -18,6 +18,7 @@ use eth2::lighthouse_vc::{
types::{self as api_types, GenericResponse, Graffiti, PublicKey, PublicKeyBytes},
};
use lighthouse_version::version_with_platform;
use logging::SSELoggingComponents;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use slog::{crit, info, warn, Logger};
@ -31,6 +32,7 @@ use std::sync::Arc;
use sysinfo::{System, SystemExt};
use system_health::observe_system_health_vc;
use task_executor::TaskExecutor;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{ChainSpec, ConfigAndPreset, EthSpec};
use validator_dir::Builder as ValidatorDirBuilder;
use warp::{
@ -39,6 +41,7 @@ use warp::{
response::Response,
StatusCode,
},
sse::Event,
Filter,
};
@ -73,6 +76,7 @@ pub struct Context<T: SlotClock, E: EthSpec> {
pub spec: ChainSpec,
pub config: Config,
pub log: Logger,
pub sse_logging_components: Option<SSELoggingComponents>,
pub slot_clock: T,
pub _phantom: PhantomData<E>,
}
@ -201,6 +205,10 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let api_token_path_inner = api_token_path.clone();
let api_token_path_filter = warp::any().map(move || api_token_path_inner.clone());
// Filter for SEE Logging events
let inner_components = ctx.sse_logging_components.clone();
let sse_component_filter = warp::any().map(move || inner_components.clone());
// Create a `warp` filter that provides access to local system information.
let system_info = Arc::new(RwLock::new(sysinfo::System::new()));
{
@ -1021,6 +1029,49 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
})
});
// Subscribe to get VC logs via Server side events
// /lighthouse/logs
let get_log_events = warp::path("lighthouse")
.and(warp::path("logs"))
.and(warp::path::end())
.and(sse_component_filter)
.and_then(|sse_component: Option<SSELoggingComponents>| {
warp_utils::task::blocking_task(move || {
if let Some(logging_components) = sse_component {
// Build a JSON stream
let s =
BroadcastStream::new(logging_components.sender.subscribe()).map(|msg| {
match msg {
Ok(data) => {
// Serialize to json
match data.to_json_string() {
// Send the json as a Server Sent Event
Ok(json) => Event::default().json_data(json).map_err(|e| {
warp_utils::reject::server_sent_event_error(format!(
"{:?}",
e
))
}),
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to serialize to JSON {}", e),
)),
}
}
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to receive event {}", e),
)),
}
});
Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s)))
} else {
Err(warp_utils::reject::custom_server_error(
"SSE Logging is not enabled".to_string(),
))
}
})
});
let routes = warp::any()
.and(authorization_header_filter)
// Note: it is critical that the `authorization_header_filter` is applied to all routes.
@ -1061,8 +1112,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.or(delete_std_remotekeys),
)),
)
// The auth route is the only route that is allowed to be accessed without the API token.
.or(warp::get().and(get_auth))
// The auth route and logs are the only routes that are allowed to be accessed without the API token.
.or(warp::get().and(get_auth.or(get_log_events.boxed())))
// Maps errors into HTTP responses.
.recover(warp_utils::reject::handle_rejection)
// Add a `Server` header.

View File

@ -134,7 +134,8 @@ impl ApiTester {
listen_port: 0,
allow_origin: None,
},
log: log.clone(),
sse_logging_components: None,
log,
slot_clock: slot_clock.clone(),
_phantom: PhantomData,
});

View File

@ -576,6 +576,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
graffiti_flag: self.config.graffiti,
spec: self.context.eth2_config.spec.clone(),
config: self.config.http_api.clone(),
sse_logging_components: self.context.sse_logging_components.clone(),
slot_clock: self.slot_clock.clone(),
log: log.clone(),
_phantom: PhantomData,