Add more unix signal handlers (#2486)

## Issue Addressed

Resolves #2114 

Swapped out the ctrlc crate for tokio signals to hook register handlers for SIGPIPE and SIGHUP along with SIGTERM and SIGINT.

## Proposed Changes

- Swap out the ctrlc crate for tokio signals for unix signal handing
- Register signals for SIGPIPE and SHIGUP that trigger the same shutdown procedure as SIGTERM and SIGINT

## Additional Info

I tested these changes against the examples in the original issue and noticed some interesting behavior on my machine. When running `lighthouse bn --network pyrmont |& tee -a pyrmont_bn.log` or `lighthouse bn --network pyrmont 2>&1 | tee -a pyrmont_bn.log` none of the above signals are sent to the lighthouse program in a way I was able to observe. 

The only time it seems that the signal gets sent to the lighthouse program is if there is no redirection of stderr to stdout. I'm not as familiar with the details of how unix signals work in linux with a redirect like that so I'm not sure if this is a bug in the program or expected behavior.

Signals are correctly received without the redirection and if the above signals are sent directly to the program with something like `kill`.
This commit is contained in:
Mason Stallmo 2021-08-30 05:19:34 +00:00
parent 99737c551a
commit bc14d1d73d
4 changed files with 111 additions and 12 deletions

1
Cargo.lock generated
View File

@ -562,7 +562,6 @@ dependencies = [
"clap", "clap",
"clap_utils", "clap_utils",
"client", "client",
"ctrlc",
"directory", "directory",
"dirs", "dirs",
"environment", "environment",

View File

@ -25,7 +25,6 @@ rand = "0.7.3"
slog = { version = "2.5.2", features = ["max_level_trace", "release_max_level_trace"] } slog = { version = "2.5.2", features = ["max_level_trace", "release_max_level_trace"] }
slog-term = "2.6.0" slog-term = "2.6.0"
slog-async = "2.5.0" slog-async = "2.5.0"
ctrlc = { version = "3.1.6", features = ["termination"] }
tokio = { version = "1.10.0", features = ["time"] } tokio = { version = "1.10.0", features = ["time"] }
exit-future = "0.2.0" exit-future = "0.2.0"
dirs = "3.0.1" dirs = "3.0.1"

View File

@ -5,18 +5,20 @@ authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
tokio = { version = "1.10.0", features = ["macros", "rt", "rt-multi-thread" ] } tokio = { version = "1.10.0", features = ["macros", "rt", "rt-multi-thread", "signal" ] }
slog = { version = "2.5.2", features = ["max_level_trace"] } slog = { version = "2.5.2", features = ["max_level_trace"] }
sloggers = "1.0.1" sloggers = "1.0.1"
types = { "path" = "../../consensus/types" } types = { path = "../../consensus/types" }
eth2_config = { "path" = "../../common/eth2_config" } eth2_config = { path = "../../common/eth2_config" }
task_executor = { "path" = "../../common/task_executor" } task_executor = { path = "../../common/task_executor" }
eth2_network_config = { path = "../../common/eth2_network_config" } eth2_network_config = { path = "../../common/eth2_network_config" }
logging = { path = "../../common/logging" } logging = { path = "../../common/logging" }
slog-term = "2.6.0" slog-term = "2.6.0"
slog-async = "2.5.0" slog-async = "2.5.0"
ctrlc = { version = "3.1.6", features = ["termination"] }
futures = "0.3.7" futures = "0.3.7"
parking_lot = "0.11.0" parking_lot = "0.11.0"
slog-json = "2.3.0" slog-json = "2.3.0"
exit-future = "0.2.0" exit-future = "0.2.0"
[target.'cfg(not(target_family = "unix"))'.dependencies]
ctrlc = { version = "3.1.6", features = ["termination"] }

View File

@ -9,15 +9,11 @@
use eth2_config::Eth2Config; use eth2_config::Eth2Config;
use eth2_network_config::Eth2NetworkConfig; use eth2_network_config::Eth2NetworkConfig;
use futures::channel::{ use futures::channel::mpsc::{channel, Receiver, Sender};
mpsc::{channel, Receiver, Sender},
oneshot,
};
use futures::{future, StreamExt}; use futures::{future, StreamExt};
use slog::{error, info, o, warn, Drain, Level, Logger}; use slog::{error, info, o, warn, Drain, Level, Logger};
use sloggers::{null::NullLoggerBuilder, Build}; use sloggers::{null::NullLoggerBuilder, Build};
use std::cell::RefCell;
use std::ffi::OsStr; use std::ffi::OsStr;
use std::fs::{rename as FsRename, OpenOptions}; use std::fs::{rename as FsRename, OpenOptions};
use std::path::PathBuf; use std::path::PathBuf;
@ -27,6 +23,16 @@ use task_executor::{ShutdownReason, TaskExecutor};
use tokio::runtime::{Builder as RuntimeBuilder, Runtime}; use tokio::runtime::{Builder as RuntimeBuilder, Runtime};
use types::{EthSpec, MainnetEthSpec, MinimalEthSpec}; use types::{EthSpec, MainnetEthSpec, MinimalEthSpec};
#[cfg(target_family = "unix")]
use {
futures::Future,
std::{pin::Pin, task::Context, task::Poll},
tokio::signal::unix::{signal, Signal, SignalKind},
};
#[cfg(not(target_family = "unix"))]
use {futures::channel::oneshot, std::cell::RefCell};
const LOG_CHANNEL_SIZE: usize = 2048; const LOG_CHANNEL_SIZE: usize = 2048;
/// The maximum time in seconds the client will wait for all internal tasks to shutdown. /// The maximum time in seconds the client will wait for all internal tasks to shutdown.
const MAXIMUM_SHUTDOWN_TIME: u64 = 15; const MAXIMUM_SHUTDOWN_TIME: u64 = 15;
@ -340,6 +346,73 @@ impl<E: EthSpec> Environment<E> {
/// Block the current thread until a shutdown signal is received. /// Block the current thread until a shutdown signal is received.
/// ///
/// This can be either the user Ctrl-C'ing or a task requesting to shutdown. /// This can be either the user Ctrl-C'ing or a task requesting to shutdown.
#[cfg(target_family = "unix")]
pub fn block_until_shutdown_requested(&mut self) -> Result<ShutdownReason, String> {
// future of a task requesting to shutdown
let mut rx = self
.signal_rx
.take()
.ok_or("Inner shutdown already received")?;
let inner_shutdown =
async move { rx.next().await.ok_or("Internal shutdown channel exhausted") };
futures::pin_mut!(inner_shutdown);
match self.runtime().block_on(async {
let mut handles = vec![];
// setup for handling SIGTERM
match signal(SignalKind::terminate()) {
Ok(terminate_stream) => {
let terminate = SignalFuture::new(terminate_stream, "Received SIGTERM");
handles.push(terminate);
}
Err(e) => error!(self.log, "Could not register SIGTERM handler"; "error" => e),
};
// setup for handling SIGINT
match signal(SignalKind::interrupt()) {
Ok(interrupt_stream) => {
let interrupt = SignalFuture::new(interrupt_stream, "Received SIGINT");
handles.push(interrupt);
}
Err(e) => error!(self.log, "Could not register SIGINT handler"; "error" => e),
}
// setup for handling a SIGHUP
match signal(SignalKind::hangup()) {
Ok(hup_stream) => {
let hup = SignalFuture::new(hup_stream, "Received SIGHUP");
handles.push(hup);
}
Err(e) => error!(self.log, "Could not register SIGHUP handler"; "error" => e),
}
// setup for handling a SIGPIPE
match signal(SignalKind::pipe()) {
Ok(pipe_stream) => {
let pipe = SignalFuture::new(pipe_stream, "Received SIGPIPE");
handles.push(pipe);
}
Err(e) => error!(self.log, "Could not register SIGPIPE handler"; "error" => e),
}
future::select(inner_shutdown, future::select_all(handles.into_iter())).await
}) {
future::Either::Left((Ok(reason), _)) => {
info!(self.log, "Internal shutdown received"; "reason" => reason.message());
Ok(reason)
}
future::Either::Left((Err(e), _)) => Err(e.into()),
future::Either::Right(((res, _, _), _)) => {
res.ok_or_else(|| "Handler channel closed".to_string())
}
}
}
/// Block the current thread until a shutdown signal is received.
///
/// This can be either the user Ctrl-C'ing or a task requesting to shutdown.
#[cfg(not(target_family = "unix"))]
pub fn block_until_shutdown_requested(&mut self) -> Result<ShutdownReason, String> { pub fn block_until_shutdown_requested(&mut self) -> Result<ShutdownReason, String> {
// future of a task requesting to shutdown // future of a task requesting to shutdown
let mut rx = self let mut rx = self
@ -419,3 +492,29 @@ pub fn null_logger() -> Result<Logger, String> {
.build() .build()
.map_err(|e| format!("Failed to start null logger: {:?}", e)) .map_err(|e| format!("Failed to start null logger: {:?}", e))
} }
#[cfg(target_family = "unix")]
struct SignalFuture {
signal: Signal,
message: &'static str,
}
#[cfg(target_family = "unix")]
impl SignalFuture {
pub fn new(signal: Signal, message: &'static str) -> SignalFuture {
SignalFuture { signal, message }
}
}
#[cfg(target_family = "unix")]
impl Future for SignalFuture {
type Output = Option<ShutdownReason>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.signal.poll_recv(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(_)) => Poll::Ready(Some(ShutdownReason::Success(self.message))),
Poll::Ready(None) => Poll::Ready(None),
}
}
}