diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index 519fa3596..34ae88a9e 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -36,6 +36,8 @@ pub enum Libp2pEvent { Behaviour(BehaviourEvent), /// A new listening address has been established. NewListenAddr(Multiaddr), + /// We reached zero listening addresses. + ZeroListeners, } /// The configuration and state of the libp2p components for the beacon node. @@ -283,10 +285,17 @@ impl Service { debug!(self.log, "Listen address expired"; "multiaddr" => multiaddr.to_string()) } SwarmEvent::ListenerClosed { addresses, reason } => { - crit!(self.log, "Listener closed"; "addresses" => format!("{:?}", addresses), "reason" => format!("{:?}", reason)) + crit!(self.log, "Listener closed"; "addresses" => format!("{:?}", addresses), "reason" => format!("{:?}", reason)); + if Swarm::listeners(&self.swarm).count() == 0 { + return Libp2pEvent::ZeroListeners; + } } SwarmEvent::ListenerError { error } => { - warn!(self.log, "Listener error"; "error" => format!("{:?}", error.to_string())) + // this is non fatal, but we still check + warn!(self.log, "Listener error"; "error" => format!("{:?}", error.to_string())); + if Swarm::listeners(&self.swarm).count() == 0 { + return Libp2pEvent::ZeroListeners; + } } SwarmEvent::Dialing(peer_id) => { debug!(self.log, "Dialing peer"; "peer_id" => peer_id.to_string()); diff --git a/beacon_node/eth2_libp2p/tests/common/mod.rs b/beacon_node/eth2_libp2p/tests/common/mod.rs index d76c0aa92..db5d13ded 100644 --- a/beacon_node/eth2_libp2p/tests/common/mod.rs +++ b/beacon_node/eth2_libp2p/tests/common/mod.rs @@ -94,8 +94,13 @@ pub async fn build_libp2p_instance(boot_nodes: Vec, log: slog::Logger) -> L // launch libp2p service let (signal, exit) = exit_future::signal(); - let executor = - environment::TaskExecutor::new(tokio::runtime::Handle::current(), exit, log.clone()); + let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + let executor = environment::TaskExecutor::new( + tokio::runtime::Handle::current(), + exit, + log.clone(), + shutdown_tx, + ); Libp2pInstance( LibP2PService::new(executor, &config, EnrForkId::default(), &log) .await diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index f85953e1b..873354bbc 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -169,6 +169,7 @@ fn spawn_service( mut service: NetworkService, ) -> error::Result<()> { let mut exit_rx = executor.exit(); + let mut shutdown_sender = executor.shutdown_sender(); // spawn on the current executor executor.spawn_without_exit(async move { @@ -376,6 +377,12 @@ fn spawn_service( Libp2pEvent::NewListenAddr(multiaddr) => { service.network_globals.listen_multiaddrs.write().push(multiaddr); } + Libp2pEvent::ZeroListeners => { + let _ = shutdown_sender.send("All listeners are closed. Unable to listen").await.map_err(|e| { + warn!(service.log, "failed to send a shutdown signal"; "error" => e.to_string() + ) + }); + } } } } diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index af8e1ddde..8658a6324 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -40,7 +40,13 @@ mod tests { let runtime = Runtime::new().unwrap(); let (signal, exit) = exit_future::signal(); - let executor = environment::TaskExecutor::new(runtime.handle().clone(), exit, log.clone()); + let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + let executor = environment::TaskExecutor::new( + runtime.handle().clone(), + exit, + log.clone(), + shutdown_tx, + ); let mut config = NetworkConfig::default(); config.libp2p_port = 21212; diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index d487971d1..caccce4e6 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -110,7 +110,7 @@ impl SyncNetworkContext { } pub fn report_peer(&mut self, peer_id: PeerId, action: PeerAction) { - debug!(self.log, "Sync reporting peer"; "peer_id" => peer_id.to_string(), "action"=> action.to_string()); + debug!(self.log, "Sync reporting peer"; "peer_id" => peer_id.to_string(), "action" => action.to_string()); self.network_send .send(NetworkMessage::ReportPeer { peer_id, action }) .unwrap_or_else(|_| { diff --git a/lighthouse/environment/src/executor.rs b/lighthouse/environment/src/executor.rs index f7d06cc51..00b1d4b15 100644 --- a/lighthouse/environment/src/executor.rs +++ b/lighthouse/environment/src/executor.rs @@ -1,4 +1,5 @@ use crate::metrics; +use futures::channel::mpsc::Sender; use futures::prelude::*; use slog::{debug, trace}; use tokio::runtime::Handle; @@ -10,6 +11,12 @@ pub struct TaskExecutor { pub(crate) handle: Handle, /// The receiver exit future which on receiving shuts down the task pub(crate) exit: exit_future::Exit, + /// Sender given to tasks, so that if they encounter a state in which execution cannot + /// continue they can request that everything shuts down. + /// + /// The task must provide a reason for shutting down. + pub(crate) signal_tx: Sender<&'static str>, + pub(crate) log: slog::Logger, } @@ -18,8 +25,18 @@ impl TaskExecutor { /// /// Note: this function is mainly useful in tests. A `TaskExecutor` should be normally obtained from /// a [`RuntimeContext`](struct.RuntimeContext.html) - pub fn new(handle: Handle, exit: exit_future::Exit, log: slog::Logger) -> Self { - Self { handle, exit, log } + pub fn new( + handle: Handle, + exit: exit_future::Exit, + log: slog::Logger, + signal_tx: Sender<&'static str>, + ) -> Self { + Self { + handle, + exit, + signal_tx, + log, + } } /// Spawn a future on the tokio runtime wrapped in an `exit_future::Exit`. The task is canceled @@ -51,7 +68,7 @@ impl TaskExecutor { /// Spawn a future on the tokio runtime. This function does not wrap the task in an `exit_future::Exit` /// like [spawn](#method.spawn). - /// The caller of this function is responsible for wrapping up the task with an `exit_future::Exit` to + /// The caller of this function is responsible for wrapping up the task with an `exit_future::Exit` to /// ensure that the task gets canceled appropriately. /// This function generates prometheus metrics on number of tasks and task duration. /// @@ -121,6 +138,11 @@ impl TaskExecutor { self.exit.clone() } + /// Get a channel to request shutting down. + pub fn shutdown_sender(&self) -> Sender<&'static str> { + self.signal_tx.clone() + } + /// Returns a reference to the logger. pub fn log(&self) -> &slog::Logger { &self.log diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index affd022da..549eeb089 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -9,7 +9,11 @@ use eth2_config::Eth2Config; use eth2_testnet_config::Eth2TestnetConfig; -use futures::channel::oneshot; +use futures::channel::{ + mpsc::{channel, Receiver, Sender}, + oneshot, +}; +use futures::{future, StreamExt}; pub use executor::TaskExecutor; use slog::{info, o, Drain, Level, Logger}; @@ -260,10 +264,13 @@ impl EnvironmentBuilder { /// Consumes the builder, returning an `Environment`. pub fn build(self) -> Result, String> { let (signal, exit) = exit_future::signal(); + let (signal_tx, signal_rx) = channel(1); Ok(Environment { runtime: self .runtime .ok_or_else(|| "Cannot build environment without runtime".to_string())?, + signal_tx, + signal_rx: Some(signal_rx), signal: Some(signal), exit, log: self @@ -295,6 +302,7 @@ impl RuntimeContext { Self { executor: TaskExecutor { handle: self.executor.handle.clone(), + signal_tx: self.executor.signal_tx.clone(), exit: self.executor.exit.clone(), log: self.executor.log.new(o!("service" => service_name)), }, @@ -318,6 +326,10 @@ impl RuntimeContext { /// validator client, or to run tests that involve logging and async task execution. pub struct Environment { runtime: Runtime, + /// Receiver side of an internal shutdown signal. + signal_rx: Option>, + /// Sender to request shutting down. + signal_tx: Sender<&'static str>, signal: Option, exit: exit_future::Exit, log: Logger, @@ -340,6 +352,7 @@ impl Environment { RuntimeContext { executor: TaskExecutor { exit: self.exit.clone(), + signal_tx: self.signal_tx.clone(), handle: self.runtime().handle().clone(), log: self.log.clone(), }, @@ -353,6 +366,7 @@ impl Environment { RuntimeContext { executor: TaskExecutor { exit: self.exit.clone(), + signal_tx: self.signal_tx.clone(), handle: self.runtime().handle().clone(), log: self.log.new(o!("service" => service_name)), }, @@ -361,8 +375,20 @@ impl Environment { } } - /// Block the current thread until Ctrl+C is received. - pub fn block_until_ctrl_c(&mut self) -> Result<(), String> { + /// Block the current thread until a shutdown signal is received. + /// + /// This can be either the user Ctrl-C'ing or a task requesting to shutdown. + pub fn block_until_shutdown_requested(&mut self) -> Result<(), String> { + // future of a task requesting to shutdown + let mut rx = self + .signal_rx + .take() + .ok_or("Inner shutdown already received")?; + let inner_shutdown = + async move { rx.next().await.ok_or("Internal shutdown channel exhausted") }; + futures::pin_mut!(inner_shutdown); + + // setup for handling a Ctrl-C let (ctrlc_send, ctrlc_oneshot) = oneshot::channel(); let ctrlc_send_c = RefCell::new(Some(ctrlc_send)); ctrlc::set_handler(move || { @@ -372,10 +398,18 @@ impl Environment { }) .map_err(|e| format!("Could not set ctrlc handler: {:?}", e))?; - // Block this thread until Crtl+C is pressed. - self.runtime() - .block_on(ctrlc_oneshot) - .map_err(|e| format!("Ctrlc oneshot failed: {:?}", e)) + // Block this thread until a shutdown signal is received. + match self + .runtime() + .block_on(future::select(inner_shutdown, ctrlc_oneshot)) + { + future::Either::Left((Ok(reason), _)) => { + info!(self.log, "Internal shutdown received"; "reason" => reason); + Ok(()) + } + future::Either::Left((Err(e), _)) => Err(e.into()), + future::Either::Right((x, _)) => x.map_err(|e| format!("Ctrlc oneshot failed: {}", e)), + } } /// Shutdown the `tokio` runtime when all tasks are idle. diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index 2742f6c57..7ad191ea9 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -300,8 +300,8 @@ fn run( return Err("No subcommand supplied.".into()); } - // Block this thread until Crtl+C is pressed. - environment.block_until_ctrl_c()?; + // Block this thread until we get a ctrl-c or a task sends a shutdown signal. + environment.block_until_shutdown_requested()?; info!(log, "Shutting down.."); environment.fire_signal(); diff --git a/scripts/local_testnet/second_beacon_node.sh b/scripts/local_testnet/second_beacon_node.sh index ddba99b46..c7b7aadf3 100755 --- a/scripts/local_testnet/second_beacon_node.sh +++ b/scripts/local_testnet/second_beacon_node.sh @@ -16,5 +16,6 @@ exec lighthouse \ --testnet-dir $TESTNET_DIR \ --dummy-eth1 \ --http \ + --port 9902 \ --http-port 6052 \ --boot-nodes $(cat $BEACON_DIR/beacon/network/enr.dat)