diff --git a/Cargo.lock b/Cargo.lock index 0eb9e0396..13b02ce2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -562,7 +562,6 @@ dependencies = [ "clap", "clap_utils", "client", - "ctrlc", "directory", "dirs", "environment", diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 8678774c2..c4d1a05ed 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -25,7 +25,6 @@ rand = "0.7.3" slog = { version = "2.5.2", features = ["max_level_trace", "release_max_level_trace"] } slog-term = "2.6.0" slog-async = "2.5.0" -ctrlc = { version = "3.1.6", features = ["termination"] } tokio = { version = "1.10.0", features = ["time"] } exit-future = "0.2.0" dirs = "3.0.1" diff --git a/lighthouse/environment/Cargo.toml b/lighthouse/environment/Cargo.toml index f2b020dc5..56a78ada1 100644 --- a/lighthouse/environment/Cargo.toml +++ b/lighthouse/environment/Cargo.toml @@ -5,18 +5,20 @@ authors = ["Paul Hauner "] edition = "2018" [dependencies] -tokio = { version = "1.10.0", features = ["macros", "rt", "rt-multi-thread" ] } +tokio = { version = "1.10.0", features = ["macros", "rt", "rt-multi-thread", "signal" ] } slog = { version = "2.5.2", features = ["max_level_trace"] } sloggers = "1.0.1" -types = { "path" = "../../consensus/types" } -eth2_config = { "path" = "../../common/eth2_config" } -task_executor = { "path" = "../../common/task_executor" } +types = { path = "../../consensus/types" } +eth2_config = { path = "../../common/eth2_config" } +task_executor = { path = "../../common/task_executor" } eth2_network_config = { path = "../../common/eth2_network_config" } logging = { path = "../../common/logging" } slog-term = "2.6.0" slog-async = "2.5.0" -ctrlc = { version = "3.1.6", features = ["termination"] } futures = "0.3.7" parking_lot = "0.11.0" slog-json = "2.3.0" exit-future = "0.2.0" + +[target.'cfg(not(target_family = "unix"))'.dependencies] +ctrlc = { version = "3.1.6", features = ["termination"] } diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index 4e0078554..46badb73c 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -9,15 +9,11 @@ use eth2_config::Eth2Config; use eth2_network_config::Eth2NetworkConfig; -use futures::channel::{ - mpsc::{channel, Receiver, Sender}, - oneshot, -}; +use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::{future, StreamExt}; use slog::{error, info, o, warn, Drain, Level, Logger}; use sloggers::{null::NullLoggerBuilder, Build}; -use std::cell::RefCell; use std::ffi::OsStr; use std::fs::{rename as FsRename, OpenOptions}; use std::path::PathBuf; @@ -27,6 +23,16 @@ use task_executor::{ShutdownReason, TaskExecutor}; use tokio::runtime::{Builder as RuntimeBuilder, Runtime}; use types::{EthSpec, MainnetEthSpec, MinimalEthSpec}; +#[cfg(target_family = "unix")] +use { + futures::Future, + std::{pin::Pin, task::Context, task::Poll}, + tokio::signal::unix::{signal, Signal, SignalKind}, +}; + +#[cfg(not(target_family = "unix"))] +use {futures::channel::oneshot, std::cell::RefCell}; + const LOG_CHANNEL_SIZE: usize = 2048; /// The maximum time in seconds the client will wait for all internal tasks to shutdown. const MAXIMUM_SHUTDOWN_TIME: u64 = 15; @@ -340,6 +346,73 @@ impl Environment { /// Block the current thread until a shutdown signal is received. /// /// This can be either the user Ctrl-C'ing or a task requesting to shutdown. + #[cfg(target_family = "unix")] + pub fn block_until_shutdown_requested(&mut self) -> Result { + // future of a task requesting to shutdown + let mut rx = self + .signal_rx + .take() + .ok_or("Inner shutdown already received")?; + let inner_shutdown = + async move { rx.next().await.ok_or("Internal shutdown channel exhausted") }; + futures::pin_mut!(inner_shutdown); + + match self.runtime().block_on(async { + let mut handles = vec![]; + + // setup for handling SIGTERM + match signal(SignalKind::terminate()) { + Ok(terminate_stream) => { + let terminate = SignalFuture::new(terminate_stream, "Received SIGTERM"); + handles.push(terminate); + } + Err(e) => error!(self.log, "Could not register SIGTERM handler"; "error" => e), + }; + + // setup for handling SIGINT + match signal(SignalKind::interrupt()) { + Ok(interrupt_stream) => { + let interrupt = SignalFuture::new(interrupt_stream, "Received SIGINT"); + handles.push(interrupt); + } + Err(e) => error!(self.log, "Could not register SIGINT handler"; "error" => e), + } + + // setup for handling a SIGHUP + match signal(SignalKind::hangup()) { + Ok(hup_stream) => { + let hup = SignalFuture::new(hup_stream, "Received SIGHUP"); + handles.push(hup); + } + Err(e) => error!(self.log, "Could not register SIGHUP handler"; "error" => e), + } + + // setup for handling a SIGPIPE + match signal(SignalKind::pipe()) { + Ok(pipe_stream) => { + let pipe = SignalFuture::new(pipe_stream, "Received SIGPIPE"); + handles.push(pipe); + } + Err(e) => error!(self.log, "Could not register SIGPIPE handler"; "error" => e), + } + + future::select(inner_shutdown, future::select_all(handles.into_iter())).await + }) { + future::Either::Left((Ok(reason), _)) => { + info!(self.log, "Internal shutdown received"; "reason" => reason.message()); + Ok(reason) + } + future::Either::Left((Err(e), _)) => Err(e.into()), + future::Either::Right(((res, _, _), _)) => { + res.ok_or_else(|| "Handler channel closed".to_string()) + } + } + } + + /// Block the current thread until a shutdown signal is received. + /// + /// This can be either the user Ctrl-C'ing or a task requesting to shutdown. + #[cfg(not(target_family = "unix"))] pub fn block_until_shutdown_requested(&mut self) -> Result { // future of a task requesting to shutdown let mut rx = self @@ -419,3 +492,29 @@ pub fn null_logger() -> Result { .build() .map_err(|e| format!("Failed to start null logger: {:?}", e)) } + +#[cfg(target_family = "unix")] +struct SignalFuture { + signal: Signal, + message: &'static str, +} + +#[cfg(target_family = "unix")] +impl SignalFuture { + pub fn new(signal: Signal, message: &'static str) -> SignalFuture { + SignalFuture { signal, message } + } +} + +#[cfg(target_family = "unix")] +impl Future for SignalFuture { + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.signal.poll_recv(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(_)) => Poll::Ready(Some(ShutdownReason::Success(self.message))), + Poll::Ready(None) => Poll::Ready(None), + } + } +}