Gracefully shutdown the websocket server

This commit is contained in:
Paul Hauner 2019-09-15 09:32:27 -04:00
parent 33e62fb843
commit 1b497e2e24
No known key found for this signature in database
GPG Key ID: 303E4494BB28068C
2 changed files with 54 additions and 13 deletions

View File

@ -64,6 +64,8 @@ where
pub slot_timer_exit_signal: Option<Signal>,
/// Signal to terminate the API
pub api_exit_signal: Option<Signal>,
/// Signal to terminate the websocket server
pub websocket_exit_signal: Option<Signal>,
/// The clients logger.
log: slog::Logger,
/*
@ -182,11 +184,17 @@ where
InteropEth1ChainBackend::new(String::new()).map_err(|e| format!("{:?}", e))?;
// Start the websocket server.
let websocket_sender: WebSocketSender<E> = if client_config.websocket_server.enabled {
websocket_server::start_server(&client_config.websocket_server, &log)?
} else {
WebSocketSender::dummy()
};
let (websocket_sender, websocket_exit_signal): (WebSocketSender<E>, Option<_>) =
if client_config.websocket_server.enabled {
let (sender, exit) = websocket_server::start_server(
&client_config.websocket_server,
executor,
&log,
)?;
(sender, Some(exit))
} else {
(WebSocketSender::dummy(), None)
};
let beacon_chain: Arc<BeaconChain<RuntimeBeaconChainTypes<S, E>>> = Arc::new(
beacon_chain_builder
@ -278,6 +286,7 @@ where
rpc_exit_signal,
slot_timer_exit_signal: Some(slot_timer_exit_signal),
api_exit_signal,
websocket_exit_signal,
log,
network,
})

View File

@ -1,7 +1,9 @@
use beacon_chain::events::{EventHandler, EventKind};
use slog::{error, info, Logger};
use futures::Future;
use slog::{debug, error, info, warn, Logger};
use std::marker::PhantomData;
use std::thread;
use tokio::runtime::TaskExecutor;
use types::EthSpec;
use ws::{Sender, WebSocket};
@ -45,8 +47,9 @@ impl<T: EthSpec> EventHandler<T> for WebSocketSender<T> {
pub fn start_server<T: EthSpec>(
config: &Config,
executor: &TaskExecutor,
log: &Logger,
) -> Result<WebSocketSender<T>, String> {
) -> Result<(WebSocketSender<T>, exit_future::Signal), String> {
let server_string = format!("{}:{}", config.listen_address, config.port);
info!(
@ -61,12 +64,38 @@ pub fn start_server<T: EthSpec>(
let broadcaster = server.broadcaster();
// Produce a signal/channel that can gracefully shutdown the websocket server.
let exit_signal = {
let (exit_signal, exit) = exit_future::signal();
let log_inner = log.clone();
let broadcaster_inner = server.broadcaster();
let exit_future = exit.and_then(move |_| {
if let Err(e) = broadcaster_inner.shutdown() {
warn!(
log_inner,
"Websocket server errored on shutdown";
"error" => format!("{:?}", e)
);
} else {
info!(log_inner, "Websocket server shutdown");
}
Ok(())
});
// Place a future on the executor that will shutdown the websocket server when the
// application exits.
executor.spawn(exit_future);
exit_signal
};
let log_inner = log.clone();
let _handle = thread::spawn(move || match server.listen(server_string) {
Ok(_) => {
info!(
debug!(
log_inner,
"Websocket server stopped";
"Websocket server thread stopped";
);
}
Err(e) => {
@ -78,8 +107,11 @@ pub fn start_server<T: EthSpec>(
}
});
Ok(WebSocketSender {
sender: Some(broadcaster),
_phantom: PhantomData,
})
Ok((
WebSocketSender {
sender: Some(broadcaster),
_phantom: PhantomData,
},
exit_signal,
))
}