Add first Server Sent Events API endpoint (#1107)

* Add Server Sent Events API endpoint

* Support both event handlers as a transitory measure

* Fix merge conflicts
This commit is contained in:
Adam Szkoda 2020-06-06 08:39:11 +02:00 committed by GitHub
parent 036096ef61
commit e20a2deebd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 244 additions and 24 deletions

28
Cargo.lock generated
View File

@ -202,6 +202,12 @@ dependencies = [
"webpki-roots 0.19.0",
]
[[package]]
name = "atomic-option"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0db678acb667b525ac40a324fc5f7d3390e29239b31c7327bb8157f5b4fff593"
[[package]]
name = "atty"
version = "0.2.14"
@ -272,6 +278,7 @@ version = "0.1.2"
dependencies = [
"bitvec",
"bls",
"bus",
"environment",
"eth1",
"eth2_config",
@ -478,6 +485,18 @@ version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5356f1d23ee24a1f785a56d1d1a5f0fd5b0f6a0c0fb2412ce11da71649ab78f6"
[[package]]
name = "bus"
version = "2.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1e66e1779f5b1440f1a58220ba3b3ded4427175f0a9fb8d7066521f8b4e8f2b"
dependencies = [
"atomic-option",
"crossbeam-channel",
"num_cpus",
"parking_lot_core 0.7.2",
]
[[package]]
name = "byte-slice-cast"
version = "0.3.5"
@ -616,6 +635,7 @@ name = "client"
version = "0.1.2"
dependencies = [
"beacon_chain",
"bus",
"dirs",
"environment",
"error-chain",
@ -3822,6 +3842,7 @@ dependencies = [
"assert_matches",
"beacon_chain",
"bls",
"bus",
"environment",
"eth2-libp2p",
"eth2_config",
@ -3852,6 +3873,7 @@ dependencies = [
"tokio 0.2.21",
"tree_hash",
"types",
"uhttp_sse",
"url 2.1.1",
"version",
]
@ -5372,6 +5394,12 @@ dependencies = [
"tree_hash_derive",
]
[[package]]
name = "uhttp_sse"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6ff93345ba2206230b1bb1aa3ece1a63dd9443b7531024575d16a0680a59444"
[[package]]
name = "uint"
version = "0.8.3"

View File

@ -47,6 +47,7 @@ bitvec = "0.17.4"
bls = { path = "../../crypto/bls" }
safe_arith = { path = "../../consensus/safe_arith" }
environment = { path = "../../lighthouse/environment" }
bus = "2.2.3"
[dev-dependencies]
lazy_static = "1.4.0"

View File

@ -1,6 +1,10 @@
use bus::Bus;
use parking_lot::Mutex;
use serde_derive::{Deserialize, Serialize};
use slog::{error, Logger};
use std::marker::PhantomData;
use types::{Attestation, Epoch, EthSpec, Hash256, SignedBeaconBlock};
use std::sync::Arc;
use types::{Attestation, Epoch, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockHash};
pub use websocket_server::WebSocketSender;
pub trait EventHandler<T: EthSpec>: Sized + Send + Sync {
@ -18,6 +22,80 @@ impl<T: EthSpec> EventHandler<T> for WebSocketSender<T> {
}
}
pub struct ServerSentEvents<T: EthSpec> {
// Bus<> is itself Sync + Send. We use Mutex<> here only because of the surrounding code does
// not enforce mutability statically (i.e. relies on interior mutability).
head_changed_queue: Arc<Mutex<Bus<SignedBeaconBlockHash>>>,
log: Logger,
_phantom: PhantomData<T>,
}
impl<T: EthSpec> ServerSentEvents<T> {
pub fn new(log: Logger) -> (Self, Arc<Mutex<Bus<SignedBeaconBlockHash>>>) {
let bus = Bus::new(T::slots_per_epoch() as usize);
let mutex = Mutex::new(bus);
let arc = Arc::new(mutex);
let this = Self {
head_changed_queue: arc.clone(),
log: log,
_phantom: PhantomData,
};
(this, arc)
}
}
impl<T: EthSpec> EventHandler<T> for ServerSentEvents<T> {
fn register(&self, kind: EventKind<T>) -> Result<(), String> {
match kind {
EventKind::BeaconHeadChanged {
current_head_beacon_block_root,
..
} => {
let mut guard = self.head_changed_queue.lock();
if let Err(_) = guard.try_broadcast(current_head_beacon_block_root.into()) {
error!(
self.log,
"Head change streaming queue full";
"dropped_change" => format!("{}", current_head_beacon_block_root),
);
}
Ok(())
}
_ => Ok(()),
}
}
}
// An event handler that pushes events to both the websockets handler and the SSE handler.
// Named after the unix `tee` command. Meant as a temporary solution before ditching WebSockets
// completely once SSE functions well enough.
pub struct TeeEventHandler<E: EthSpec> {
websockets_handler: WebSocketSender<E>,
sse_handler: ServerSentEvents<E>,
}
impl<E: EthSpec> TeeEventHandler<E> {
pub fn new(
log: Logger,
websockets_handler: WebSocketSender<E>,
) -> Result<(Self, Arc<Mutex<Bus<SignedBeaconBlockHash>>>), String> {
let (sse_handler, bus) = ServerSentEvents::new(log);
let result = Self {
websockets_handler: websockets_handler,
sse_handler: sse_handler,
};
Ok((result, bus))
}
}
impl<E: EthSpec> EventHandler<E> for TeeEventHandler<E> {
fn register(&self, kind: EventKind<E>) -> Result<(), String> {
self.websockets_handler.register(kind.clone())?;
self.sse_handler.register(kind)?;
Ok(())
}
}
impl<T: EthSpec> EventHandler<T> for NullEventHandler<T> {
fn register(&self, _kind: EventKind<T>) -> Result<(), String> {
Ok(())
@ -30,7 +108,7 @@ impl<T: EthSpec> Default for NullEventHandler<T> {
}
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(
bound = "T: EthSpec",
rename_all = "snake_case",

View File

@ -40,3 +40,4 @@ eth2_ssz = "0.1.2"
lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
time = "0.2.16"
bus = "2.2.3"

View File

@ -1,6 +1,7 @@
use crate::config::{ClientGenesis, Config as ClientConfig};
use crate::notifier::spawn_notifier;
use crate::Client;
use beacon_chain::events::TeeEventHandler;
use beacon_chain::{
builder::{BeaconChainBuilder, Witness},
eth1_chain::{CachingEth1Backend, Eth1Chain},
@ -9,12 +10,14 @@ use beacon_chain::{
store::{HotColdDB, MemoryStore, Store, StoreConfig},
BeaconChain, BeaconChainTypes, Eth1ChainBackend, EventHandler,
};
use bus::Bus;
use environment::RuntimeContext;
use eth1::{Config as Eth1Config, Service as Eth1Service};
use eth2_config::Eth2Config;
use eth2_libp2p::NetworkGlobals;
use genesis::{interop_genesis_state, Eth1GenesisService};
use network::{NetworkConfig, NetworkMessage, NetworkService};
use parking_lot::Mutex;
use slog::info;
use ssz::Decode;
use std::net::SocketAddr;
@ -23,7 +26,10 @@ use std::sync::Arc;
use std::time::Duration;
use timer::spawn_timer;
use tokio::sync::mpsc::UnboundedSender;
use types::{test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec};
use types::{
test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec,
SignedBeaconBlockHash,
};
use websocket_server::{Config as WebSocketConfig, WebSocketSender};
/// Interval between polling the eth1 node for genesis information.
@ -260,6 +266,7 @@ where
mut self,
client_config: &ClientConfig,
eth2_config: &Eth2Config,
events: Arc<Mutex<Bus<SignedBeaconBlockHash>>>,
) -> Result<Self, String> {
let beacon_chain = self
.beacon_chain
@ -296,6 +303,7 @@ where
.create_freezer_db_path()
.map_err(|_| "unable to read freezer DB dir")?,
eth2_config.clone(),
events,
)
.map_err(|e| format!("Failed to start HTTP API: {:?}", e))?;
@ -434,6 +442,51 @@ where
}
}
impl<TStore, TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec>
ClientBuilder<
Witness<
TStore,
TStoreMigrator,
TSlotClock,
TEth1Backend,
TEthSpec,
TeeEventHandler<TEthSpec>,
>,
>
where
TStore: Store<TEthSpec> + 'static,
TStoreMigrator: Migrate<TEthSpec>,
TSlotClock: SlotClock + 'static,
TEth1Backend: Eth1ChainBackend<TEthSpec, TStore> + 'static,
TEthSpec: EthSpec + 'static,
{
/// Specifies that the `BeaconChain` should publish events using the WebSocket server.
pub fn tee_event_handler(
mut self,
config: WebSocketConfig,
) -> Result<(Self, Arc<Mutex<Bus<SignedBeaconBlockHash>>>), String> {
let context = self
.runtime_context
.as_ref()
.ok_or_else(|| "websocket_event_handler requires a runtime_context")?
.service_context("ws".into());
let log = context.log().clone();
let (sender, listening_addr): (WebSocketSender<TEthSpec>, Option<_>) = if config.enabled {
let (sender, listening_addr) =
websocket_server::start_server(context.executor, &config)?;
(sender, Some(listening_addr))
} else {
(WebSocketSender::dummy(), None)
};
self.websocket_listen_addr = listening_addr;
let (tee_event_handler, bus) = TeeEventHandler::new(log, sender)?;
self.event_handler = Some(tee_event_handler);
Ok((self, bus))
}
}
impl<TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, TEventHandler>
ClientBuilder<
Witness<

View File

@ -37,6 +37,8 @@ futures = "0.3.5"
operation_pool = { path = "../operation_pool" }
rayon = "1.3.0"
environment = { path = "../../lighthouse/environment" }
uhttp_sse = "0.5.1"
bus = "2.2.3"
[dev-dependencies]
assert_matches = "1.3.0"

View File

@ -3,16 +3,22 @@ use crate::response_builder::ResponseBuilder;
use crate::validator::get_state_for_epoch;
use crate::{ApiError, ApiResult, UrlQuery};
use beacon_chain::{BeaconChain, BeaconChainTypes, StateSkipConfig};
use hyper::{Body, Request};
use bus::BusReader;
use futures::executor::block_on;
use hyper::body::Bytes;
use hyper::{Body, Request, Response};
use rest_types::{
BlockResponse, CanonicalHeadResponse, Committee, HeadBeaconBlock, StateResponse,
ValidatorRequest, ValidatorResponse,
};
use std::io::Write;
use std::sync::Arc;
use store::Store;
use slog::{error, Logger};
use types::{
AttesterSlashing, BeaconState, EthSpec, Hash256, ProposerSlashing, PublicKeyBytes,
RelativeEpoch, Slot,
RelativeEpoch, SignedBeaconBlockHash, Slot,
};
/// HTTP handler to return a `BeaconBlock` at a given `root` or `slot`.
@ -122,6 +128,48 @@ pub fn get_block_root<T: BeaconChainTypes>(
ResponseBuilder::new(&req)?.body(&root)
}
fn make_sse_response_chunk(new_head_hash: SignedBeaconBlockHash) -> std::io::Result<Bytes> {
let mut buffer = Vec::new();
{
let mut sse_message = uhttp_sse::SseMessage::new(&mut buffer);
let untyped_hash: Hash256 = new_head_hash.into();
write!(sse_message.data()?, "{:?}", untyped_hash)?;
}
let bytes: Bytes = buffer.into();
Ok(bytes)
}
pub fn stream_forks<T: BeaconChainTypes>(
log: Logger,
mut events: BusReader<SignedBeaconBlockHash>,
) -> ApiResult {
let (mut sender, body) = Body::channel();
std::thread::spawn(move || {
while let Ok(new_head_hash) = events.recv() {
let chunk = match make_sse_response_chunk(new_head_hash) {
Ok(chunk) => chunk,
Err(e) => {
error!(log, "Failed to make SSE chunk"; "error" => e.to_string());
sender.abort();
break;
}
};
if let Err(bytes) = block_on(sender.send_data(chunk)) {
error!(log, "Couldn't stream piece {:?}", bytes);
}
}
});
let response = Response::builder()
.status(200)
.header("Content-Type", "text/event-stream")
.header("Connection", "Keep-Alive")
.header("Cache-Control", "no-cache")
.header("Access-Control-Allow-Origin", "*")
.body(body)
.map_err(|e| ApiError::ServerError(format!("Failed to build response: {:?}", e)))?;
Ok(response)
}
/// HTTP handler to return the `Fork` of the current head.
pub fn get_fork<T: BeaconChainTypes>(
req: Request<Body>,

View File

@ -71,6 +71,12 @@ impl From<hyper::error::Error> for ApiError {
}
}
impl From<std::io::Error> for ApiError {
fn from(e: std::io::Error) -> ApiError {
ApiError::ServerError(format!("IO error: {:?}", e))
}
}
impl StdError for ApiError {
fn cause(&self) -> Option<&dyn StdError> {
None

View File

@ -21,6 +21,7 @@ mod url_query;
mod validator;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use bus::Bus;
use client_network::NetworkMessage;
pub use config::ApiEncodingFormat;
use error::{ApiError, ApiResult};
@ -30,12 +31,13 @@ use futures::future::TryFutureExt;
use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Server};
use parking_lot::Mutex;
use slog::{info, warn};
use std::net::SocketAddr;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::mpsc;
use types::SignedBeaconBlockHash;
use url_query::UrlQuery;
pub use crate::helpers::parse_pubkey_bytes;
@ -58,6 +60,7 @@ pub fn start_server<T: BeaconChainTypes>(
db_path: PathBuf,
freezer_db_path: PathBuf,
eth2_config: Eth2Config,
events: Arc<Mutex<Bus<SignedBeaconBlockHash>>>,
) -> Result<SocketAddr, hyper::Error> {
let log = executor.log();
let inner_log = log.clone();
@ -72,6 +75,7 @@ pub fn start_server<T: BeaconChainTypes>(
let network_channel = network_info.network_chan.clone();
let db_path = db_path.clone();
let freezer_db_path = freezer_db_path.clone();
let events = events.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
@ -84,6 +88,7 @@ pub fn start_server<T: BeaconChainTypes>(
log.clone(),
db_path.clone(),
freezer_db_path.clone(),
events.clone(),
)
}))
}
@ -131,14 +136,3 @@ pub fn start_server<T: BeaconChainTypes>(
Ok(actual_listen_addr)
}
#[derive(Clone)]
pub struct DBPath(PathBuf);
impl Deref for DBPath {
type Target = PathBuf;
fn deref(&self) -> &Self::Target {
&self.0
}
}

View File

@ -3,14 +3,16 @@ use crate::{
spec, validator, NetworkChannel,
};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use bus::Bus;
use eth2_config::Eth2Config;
use eth2_libp2p::NetworkGlobals;
use hyper::{Body, Error, Method, Request, Response};
use parking_lot::Mutex;
use slog::debug;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use types::Slot;
use types::{SignedBeaconBlockHash, Slot};
// Allowing more than 7 arguments.
#[allow(clippy::too_many_arguments)]
@ -23,6 +25,7 @@ pub async fn route<T: BeaconChainTypes>(
local_log: slog::Logger,
db_path: PathBuf,
freezer_db_path: PathBuf,
events: Arc<Mutex<Bus<SignedBeaconBlockHash>>>,
) -> Result<Response<Body>, Error> {
metrics::inc_counter(&metrics::REQUEST_COUNT);
let timer = metrics::start_timer(&metrics::REQUEST_RESPONSE_TIME);
@ -63,6 +66,10 @@ pub async fn route<T: BeaconChainTypes>(
(&Method::GET, "/beacon/block") => beacon::get_block::<T>(req, beacon_chain),
(&Method::GET, "/beacon/block_root") => beacon::get_block_root::<T>(req, beacon_chain),
(&Method::GET, "/beacon/fork") => beacon::get_fork::<T>(req, beacon_chain),
(&Method::GET, "/beacon/fork/stream") => {
let reader = events.lock().add_rx();
beacon::stream_forks::<T>(log, reader)
}
(&Method::GET, "/beacon/genesis_time") => beacon::get_genesis_time::<T>(req, beacon_chain),
(&Method::GET, "/beacon/genesis_validators_root") => {
beacon::get_genesis_validators_root::<T>(req, beacon_chain)

View File

@ -10,10 +10,10 @@ pub use client::{Client, ClientBuilder, ClientConfig, ClientGenesis};
pub use config::{get_data_dir, get_eth2_testnet_config, get_testnet_dir};
pub use eth2_config::Eth2Config;
use beacon_chain::events::TeeEventHandler;
use beacon_chain::migrate::{BackgroundMigrator, HotColdDB};
use beacon_chain::{
builder::Witness, eth1_chain::CachingEth1Backend, events::WebSocketSender,
slot_clock::SystemTimeSlotClock,
builder::Witness, eth1_chain::CachingEth1Backend, slot_clock::SystemTimeSlotClock,
};
use clap::ArgMatches;
use config::get_config;
@ -30,7 +30,7 @@ pub type ProductionClient<E> = Client<
SystemTimeSlotClock,
CachingEth1Backend<E, HotColdDB<E>>,
E,
WebSocketSender<E>,
TeeEventHandler<E>,
>,
>;
@ -113,15 +113,17 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
builder.no_eth1_backend()?
};
let builder = builder
let (builder, events) = builder
.system_time_slot_clock()?
.websocket_event_handler(client_config.websocket_server.clone())?
.tee_event_handler(client_config.websocket_server.clone())?;
let builder = builder
.build_beacon_chain()?
.network(&mut client_config.network)?
.notifier()?;
let builder = if client_config.rest_api.enabled {
builder.http_server(&client_config, &http_eth2_config)?
builder.http_server(&client_config, &http_eth2_config, events)?
} else {
builder
};