lighthouse/beacon_node/websocket_server/src/lib.rs

118 lines
3.2 KiB
Rust
Raw Normal View History

use beacon_chain::events::{EventHandler, EventKind};
use futures::Future;
use slog::{debug, error, info, warn, Logger};
use std::marker::PhantomData;
2019-09-14 14:34:03 +00:00
use std::thread;
use tokio::runtime::TaskExecutor;
2019-09-14 14:34:03 +00:00
use types::EthSpec;
use ws::{Sender, WebSocket};
2019-09-15 02:57:46 +00:00
mod config;
2019-09-14 14:34:03 +00:00
2019-09-15 02:57:46 +00:00
pub use config::Config;
2019-09-14 14:34:03 +00:00
/// Handle used to push messages to the clients of the websocket server.
///
/// The handle is either backed by a real `ws::Sender` (as returned by `start_server`) or is a
/// "dummy" (see `WebSocketSender::dummy`) for which every send is a no-op.
pub struct WebSocketSender<T: EthSpec> {
    /// `Some` when a server was started, `None` for a dummy sender.
    sender: Option<Sender>,
    /// Ties the sender to an `EthSpec` without storing a value of that type.
    _phantom: PhantomData<T>,
}
impl<T: EthSpec> WebSocketSender<T> {
    /// Creates a dummy websocket server that never starts and where all future calls are no-ops.
    pub fn dummy() -> Self {
        Self {
            sender: None,
            _phantom: PhantomData,
        }
    }

    /// Sends `string` through the underlying websocket sender.
    ///
    /// Returns `Ok(())` without doing anything when this is a dummy sender (no inner sender).
    ///
    /// # Errors
    ///
    /// Returns a description of the underlying error if the inner sender fails to send.
    pub fn send_string(&self, string: String) -> Result<(), String> {
        match &self.sender {
            Some(sender) => sender
                .send(string)
                .map_err(|e| format!("Unable to broadcast to websocket clients: {:?}", e)),
            // Dummy sender: nothing to deliver to.
            None => Ok(()),
        }
    }
}
impl<T: EthSpec> EventHandler<T> for WebSocketSender<T> {
fn register(&self, kind: EventKind<T>) -> Result<(), String> {
self.send_string(
serde_json::to_string(&kind)
.map_err(|e| format!("Unable to serialize event: {:?}", e))?,
)
2019-09-14 14:34:03 +00:00
}
}
pub fn start_server<T: EthSpec>(
config: &Config,
executor: &TaskExecutor,
log: &Logger,
) -> Result<(WebSocketSender<T>, exit_future::Signal), String> {
2019-09-14 14:34:03 +00:00
let server_string = format!("{}:{}", config.listen_address, config.port);
info!(
log,
"Websocket server starting";
"listen_address" => &server_string
);
// Create a server that simply ignores any incoming messages.
let server = WebSocket::new(|_| |_| Ok(()))
.map_err(|e| format!("Failed to initialize websocket server: {:?}", e))?;
let broadcaster = server.broadcaster();
// Produce a signal/channel that can gracefully shutdown the websocket server.
let exit_signal = {
let (exit_signal, exit) = exit_future::signal();
let log_inner = log.clone();
let broadcaster_inner = server.broadcaster();
let exit_future = exit.and_then(move |_| {
if let Err(e) = broadcaster_inner.shutdown() {
warn!(
log_inner,
"Websocket server errored on shutdown";
"error" => format!("{:?}", e)
);
} else {
info!(log_inner, "Websocket server shutdown");
}
Ok(())
});
// Place a future on the executor that will shutdown the websocket server when the
// application exits.
executor.spawn(exit_future);
exit_signal
};
2019-09-14 14:34:03 +00:00
let log_inner = log.clone();
let _handle = thread::spawn(move || match server.listen(server_string) {
Ok(_) => {
debug!(
2019-09-14 14:34:03 +00:00
log_inner,
"Websocket server thread stopped";
2019-09-14 14:34:03 +00:00
);
}
Err(e) => {
error!(
log_inner,
"Websocket server failed to start";
"error" => format!("{:?}", e)
);
}
});
Ok((
WebSocketSender {
sender: Some(broadcaster),
_phantom: PhantomData,
},
exit_signal,
))
2019-09-14 14:34:03 +00:00
}