Allow TaskExecutor to be used in async tests (#3178)

# Description

Since the `TaskExecutor` currently requires a `Weak<Runtime>`, it's impossible to use it in an async test where the `Runtime` is created outside our scope. Whilst we *could* create a new `Runtime` instance inside the async test, dropping that `Runtime` would cause a panic (you can't drop a `Runtime` in an async context).

To address this issue, this PR creates the `enum Handle`, which supports either:

- A `Weak<Runtime>` (for use in our production code)
- A `Handle` to a runtime (for use in testing)

In theory, there should be no change to the behaviour of our production code (beyond some slightly different descriptions in HTTP 500 errors), or even our tests. If there is no change, you might ask *"why bother?"*. There are two PRs (#3070 and #3175) that are waiting on these fixes to introduce some new tests. Since we've added the EL to the `BeaconChain` (for the merge), we are now doing more async stuff in tests.

I've also added a `RuntimeExecutor` to the `BeaconChainTestHarness`. Whilst that's not immediately useful, it will become useful in the near future with all the new async testing.
This commit is contained in:
Paul Hauner 2022-05-16 08:35:59 +00:00
parent 3f9e83e840
commit 38050fa460
20 changed files with 284 additions and 203 deletions

3
Cargo.lock generated
View File

@ -296,6 +296,7 @@ dependencies = [
"eth2_ssz_derive",
"eth2_ssz_types",
"execution_layer",
"exit-future",
"fork_choice",
"futures",
"genesis",
@ -2531,6 +2532,7 @@ dependencies = [
"slot_clock",
"state_processing",
"store",
"task_executor",
"tokio",
"tokio-stream",
"tree_hash",
@ -6028,6 +6030,7 @@ dependencies = [
"lazy_static",
"lighthouse_metrics",
"slog",
"sloggers",
"tokio",
]

View File

@ -61,6 +61,7 @@ execution_layer = { path = "../execution_layer" }
sensitive_url = { path = "../../common/sensitive_url" }
superstruct = "0.5.0"
hex = "0.4.2"
exit-future = "0.2.0"
[[test]]
name = "beacon_chain_tests"

View File

@ -27,7 +27,7 @@ use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
use task_executor::ShutdownReason;
use task_executor::{ShutdownReason, TaskExecutor};
use types::{
BeaconBlock, BeaconState, ChainSpec, Checkpoint, EthSpec, Graffiti, Hash256, PublicKeyBytes,
Signature, SignedBeaconBlock, Slot,
@ -91,6 +91,7 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
// Pending I/O batch that is constructed during building and should be executed atomically
// alongside `PersistedBeaconChain` storage when `BeaconChainBuilder::build` is called.
pending_io_batch: Vec<KeyValueStoreOp>,
task_executor: Option<TaskExecutor>,
}
impl<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>
@ -129,6 +130,7 @@ where
slasher: None,
validator_monitor: None,
pending_io_batch: vec![],
task_executor: None,
}
}
@ -182,6 +184,13 @@ where
self.log = Some(log);
self
}
/// Sets the task executor.
pub fn task_executor(mut self, task_executor: TaskExecutor) -> Self {
self.task_executor = Some(task_executor);
self
}
/// Attempt to load an existing eth1 cache from the builder's `Store`.
pub fn get_persisted_eth1_backend(&self) -> Result<Option<SszEth1>, String> {
let store = self
@ -919,6 +928,7 @@ mod test {
use std::time::Duration;
use store::config::StoreConfig;
use store::{HotColdDB, MemoryStore};
use task_executor::test_utils::TestRuntime;
use types::{EthSpec, MinimalEthSpec, Slot};
type TestEthSpec = MinimalEthSpec;
@ -952,10 +962,12 @@ mod test {
.expect("should create interop genesis state");
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let runtime = TestRuntime::default();
let chain = BeaconChainBuilder::new(MinimalEthSpec)
.logger(log.clone())
.store(Arc::new(store))
.task_executor(runtime.task_executor.clone())
.genesis_state(genesis_state)
.expect("should build state using recent genesis")
.dummy_eth1_backend()

View File

@ -12,15 +12,12 @@ use crate::{
};
use bls::get_withdrawal_credentials;
use execution_layer::{
test_utils::{
ExecutionBlockGenerator, ExecutionLayerRuntime, MockExecutionLayer, DEFAULT_TERMINAL_BLOCK,
},
test_utils::{ExecutionBlockGenerator, MockExecutionLayer, DEFAULT_TERMINAL_BLOCK},
ExecutionLayer,
};
use futures::channel::mpsc::Receiver;
pub use genesis::{interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH};
use int_to_bytes::int_to_bytes32;
use logging::test_logger;
use merkle_proof::MerkleTree;
use parking_lot::Mutex;
use parking_lot::RwLockWriteGuard;
@ -41,7 +38,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use store::{config::StoreConfig, HotColdDB, ItemStore, LevelDB, MemoryStore};
use task_executor::ShutdownReason;
use task_executor::{test_utils::TestRuntime, ShutdownReason};
use tree_hash::TreeHash;
use types::sync_selection_proof::SyncSelectionProof;
pub use types::test_utils::generate_deterministic_keypairs;
@ -151,8 +148,8 @@ pub struct Builder<T: BeaconChainTypes> {
initial_mutator: Option<BoxedMutator<T::EthSpec, T::HotStore, T::ColdStore>>,
store_mutator: Option<BoxedMutator<T::EthSpec, T::HotStore, T::ColdStore>>,
execution_layer: Option<ExecutionLayer>,
execution_layer_runtime: Option<ExecutionLayerRuntime>,
mock_execution_layer: Option<MockExecutionLayer<T::EthSpec>>,
runtime: TestRuntime,
log: Logger,
}
@ -255,6 +252,9 @@ where
Cold: ItemStore<E>,
{
pub fn new(eth_spec_instance: E) -> Self {
let runtime = TestRuntime::default();
let log = runtime.log.clone();
Self {
eth_spec_instance,
spec: None,
@ -266,8 +266,8 @@ where
store_mutator: None,
execution_layer: None,
mock_execution_layer: None,
execution_layer_runtime: None,
log: test_logger(),
runtime,
log,
}
}
@ -330,8 +330,6 @@ where
"execution layer already defined"
);
let el_runtime = ExecutionLayerRuntime::default();
let urls: Vec<SensitiveUrl> = urls
.iter()
.map(|s| SensitiveUrl::parse(*s))
@ -346,19 +344,19 @@ where
};
let execution_layer = ExecutionLayer::from_config(
config,
el_runtime.task_executor.clone(),
el_runtime.log.clone(),
self.runtime.task_executor.clone(),
self.log.clone(),
)
.unwrap();
self.execution_layer = Some(execution_layer);
self.execution_layer_runtime = Some(el_runtime);
self
}
pub fn mock_execution_layer(mut self) -> Self {
let spec = self.spec.clone().expect("cannot build without spec");
let mock = MockExecutionLayer::new(
self.runtime.task_executor.clone(),
spec.terminal_total_difficulty,
DEFAULT_TERMINAL_BLOCK,
spec.terminal_block_hash,
@ -383,7 +381,7 @@ where
pub fn build(self) -> BeaconChainHarness<BaseHarnessType<E, Hot, Cold>> {
let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1);
let log = test_logger();
let log = self.log;
let spec = self.spec.expect("cannot build without spec");
let seconds_per_slot = spec.seconds_per_slot;
let validator_keypairs = self
@ -395,6 +393,7 @@ where
.custom_spec(spec)
.store(self.store.expect("cannot build without store"))
.store_migrator_config(MigratorConfig::default().blocking())
.task_executor(self.runtime.task_executor.clone())
.execution_layer(self.execution_layer)
.dummy_eth1_backend()
.expect("should build dummy backend")
@ -434,8 +433,8 @@ where
chain: Arc::new(chain),
validator_keypairs,
shutdown_receiver: Arc::new(Mutex::new(shutdown_receiver)),
runtime: self.runtime,
mock_execution_layer: self.mock_execution_layer,
execution_layer_runtime: self.execution_layer_runtime,
rng: make_rng(),
}
}
@ -451,9 +450,9 @@ pub struct BeaconChainHarness<T: BeaconChainTypes> {
pub chain: Arc<BeaconChain<T>>,
pub spec: ChainSpec,
pub shutdown_receiver: Arc<Mutex<Receiver<ShutdownReason>>>,
pub runtime: TestRuntime,
pub mock_execution_layer: Option<MockExecutionLayer<T::EthSpec>>,
pub execution_layer_runtime: Option<ExecutionLayerRuntime>,
pub rng: Mutex<StdRng>,
}

View File

@ -166,6 +166,7 @@ where
let builder = BeaconChainBuilder::new(eth_spec_instance)
.logger(context.log().clone())
.store(store)
.task_executor(context.executor.clone())
.custom_spec(spec.clone())
.chain_config(chain_config)
.graffiti(graffiti)

View File

@ -304,11 +304,7 @@ impl ExecutionLayer {
T: Fn(&'a Self) -> U,
U: Future<Output = Result<V, Error>>,
{
let runtime = self
.executor()
.runtime()
.upgrade()
.ok_or(Error::ShuttingDown)?;
let runtime = self.executor().handle().ok_or(Error::ShuttingDown)?;
// TODO(merge): respect the shutdown signal.
runtime.block_on(generate_future(self))
}
@ -322,11 +318,7 @@ impl ExecutionLayer {
T: Fn(&'a Self) -> U,
U: Future<Output = V>,
{
let runtime = self
.executor()
.runtime()
.upgrade()
.ok_or(Error::ShuttingDown)?;
let runtime = self.executor().handle().ok_or(Error::ShuttingDown)?;
// TODO(merge): respect the shutdown signal.
Ok(runtime.block_on(generate_future(self)))
}
@ -1263,13 +1255,15 @@ impl ExecutionLayer {
mod test {
use super::*;
use crate::test_utils::MockExecutionLayer as GenericMockExecutionLayer;
use task_executor::test_utils::TestRuntime;
use types::MainnetEthSpec;
type MockExecutionLayer = GenericMockExecutionLayer<MainnetEthSpec>;
#[tokio::test]
async fn produce_three_valid_pos_execution_blocks() {
MockExecutionLayer::default_params()
let runtime = TestRuntime::default();
MockExecutionLayer::default_params(runtime.task_executor.clone())
.move_to_terminal_block()
.produce_valid_execution_payload_on_head()
.await
@ -1281,7 +1275,8 @@ mod test {
#[tokio::test]
async fn finds_valid_terminal_block_hash() {
MockExecutionLayer::default_params()
let runtime = TestRuntime::default();
MockExecutionLayer::default_params(runtime.task_executor.clone())
.move_to_block_prior_to_terminal_block()
.with_terminal_block(|spec, el, _| async move {
el.engines().upcheck_not_synced(Logging::Disabled).await;
@ -1300,7 +1295,8 @@ mod test {
#[tokio::test]
async fn verifies_valid_terminal_block_hash() {
MockExecutionLayer::default_params()
let runtime = TestRuntime::default();
MockExecutionLayer::default_params(runtime.task_executor.clone())
.move_to_terminal_block()
.with_terminal_block(|spec, el, terminal_block| async move {
el.engines().upcheck_not_synced(Logging::Disabled).await;
@ -1316,7 +1312,8 @@ mod test {
#[tokio::test]
async fn rejects_invalid_terminal_block_hash() {
MockExecutionLayer::default_params()
let runtime = TestRuntime::default();
MockExecutionLayer::default_params(runtime.task_executor.clone())
.move_to_terminal_block()
.with_terminal_block(|spec, el, terminal_block| async move {
el.engines().upcheck_not_synced(Logging::Disabled).await;
@ -1334,7 +1331,8 @@ mod test {
#[tokio::test]
async fn rejects_unknown_terminal_block_hash() {
MockExecutionLayer::default_params()
let runtime = TestRuntime::default();
MockExecutionLayer::default_params(runtime.task_executor.clone())
.move_to_terminal_block()
.with_terminal_block(|spec, el, _| async move {
el.engines().upcheck_not_synced(Logging::Disabled).await;

View File

@ -2,61 +2,22 @@ use crate::{
test_utils::{MockServer, DEFAULT_TERMINAL_BLOCK, DEFAULT_TERMINAL_DIFFICULTY, JWT_SECRET},
Config, *,
};
use environment::null_logger;
use sensitive_url::SensitiveUrl;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tempfile::NamedTempFile;
use types::{Address, ChainSpec, Epoch, EthSpec, FullPayload, Hash256, Uint256};
pub struct ExecutionLayerRuntime {
pub runtime: Option<Arc<tokio::runtime::Runtime>>,
pub _runtime_shutdown: exit_future::Signal,
pub task_executor: TaskExecutor,
pub log: Logger,
}
impl Default for ExecutionLayerRuntime {
fn default() -> Self {
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
);
let (runtime_shutdown, exit) = exit_future::signal();
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let log = null_logger().unwrap();
let task_executor =
TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx);
Self {
runtime: Some(runtime),
_runtime_shutdown: runtime_shutdown,
task_executor,
log,
}
}
}
impl Drop for ExecutionLayerRuntime {
fn drop(&mut self) {
if let Some(runtime) = self.runtime.take() {
Arc::try_unwrap(runtime).unwrap().shutdown_background()
}
}
}
pub struct MockExecutionLayer<T: EthSpec> {
pub server: MockServer<T>,
pub el: ExecutionLayer,
pub el_runtime: ExecutionLayerRuntime,
pub executor: TaskExecutor,
pub spec: ChainSpec,
}
impl<T: EthSpec> MockExecutionLayer<T> {
pub fn default_params() -> Self {
pub fn default_params(executor: TaskExecutor) -> Self {
Self::new(
executor,
DEFAULT_TERMINAL_DIFFICULTY.into(),
DEFAULT_TERMINAL_BLOCK,
ExecutionBlockHash::zero(),
@ -65,13 +26,13 @@ impl<T: EthSpec> MockExecutionLayer<T> {
}
pub fn new(
executor: TaskExecutor,
terminal_total_difficulty: Uint256,
terminal_block: u64,
terminal_block_hash: ExecutionBlockHash,
terminal_block_hash_activation_epoch: Epoch,
) -> Self {
let el_runtime = ExecutionLayerRuntime::default();
let handle = el_runtime.runtime.as_ref().unwrap().handle();
let handle = executor.handle().unwrap();
let mut spec = T::default_spec();
spec.terminal_total_difficulty = terminal_total_difficulty;
@ -79,7 +40,7 @@ impl<T: EthSpec> MockExecutionLayer<T> {
spec.terminal_block_hash_activation_epoch = terminal_block_hash_activation_epoch;
let server = MockServer::new(
handle,
&handle,
terminal_total_difficulty,
terminal_block,
terminal_block_hash,
@ -97,17 +58,13 @@ impl<T: EthSpec> MockExecutionLayer<T> {
suggested_fee_recipient: Some(Address::repeat_byte(42)),
..Default::default()
};
let el = ExecutionLayer::from_config(
config,
el_runtime.task_executor.clone(),
el_runtime.log.clone(),
)
.unwrap();
let el =
ExecutionLayer::from_config(config, executor.clone(), executor.log().clone()).unwrap();
Self {
server,
el,
el_runtime,
executor,
spec,
}
}

View File

@ -22,7 +22,7 @@ use types::{EthSpec, ExecutionBlockHash, Uint256};
use warp::{http::StatusCode, Filter, Rejection};
pub use execution_block_generator::{generate_pow_block, ExecutionBlockGenerator};
pub use mock_execution_layer::{ExecutionLayerRuntime, MockExecutionLayer};
pub use mock_execution_layer::MockExecutionLayer;
pub const DEFAULT_TERMINAL_DIFFICULTY: u64 = 6400;
pub const DEFAULT_TERMINAL_BLOCK: u64 = 64;

View File

@ -30,6 +30,7 @@ futures = "0.3.8"
execution_layer = {path = "../execution_layer"}
parking_lot = "0.12.0"
safe_arith = {path = "../../consensus/safe_arith"}
task_executor = { path = "../../common/task_executor" }
[dev-dependencies]

View File

@ -20,6 +20,7 @@ use slot_clock::SlotClock;
use state_processing::per_slot_processing;
use std::convert::TryInto;
use std::sync::Arc;
use task_executor::test_utils::TestRuntime;
use tokio::sync::{mpsc, oneshot};
use tokio::time::Duration;
use tree_hash::TreeHash;
@ -63,6 +64,7 @@ struct ApiTester {
network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
local_enr: Enr,
external_peer_id: PeerId,
_runtime: TestRuntime,
}
impl ApiTester {
@ -185,7 +187,7 @@ impl ApiTester {
external_peer_id,
} = create_api_server(chain.clone(), log).await;
tokio::spawn(server);
harness.runtime.task_executor.spawn(server, "api_server");
let client = BeaconNodeHttpClient::new(
SensitiveUrl::parse(&format!(
@ -212,6 +214,7 @@ impl ApiTester {
network_rx,
local_enr,
external_peer_id,
_runtime: harness.runtime,
}
}
@ -263,7 +266,7 @@ impl ApiTester {
external_peer_id,
} = create_api_server(chain.clone(), log).await;
tokio::spawn(server);
harness.runtime.task_executor.spawn(server, "api_server");
let client = BeaconNodeHttpClient::new(
SensitiveUrl::parse(&format!(
@ -290,6 +293,7 @@ impl ApiTester {
network_rx,
local_enr,
external_peer_id,
_runtime: harness.runtime,
}
}

View File

@ -20,7 +20,7 @@ use std::cmp;
use std::iter::Iterator;
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Runtime;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use types::{
Attestation, AttesterSlashing, EthSpec, MainnetEthSpec, ProposerSlashing, SignedBeaconBlock,
@ -324,20 +324,19 @@ impl TestRig {
.unwrap();
}
fn runtime(&mut self) -> Arc<Runtime> {
fn handle(&mut self) -> Handle {
self.environment
.as_mut()
.unwrap()
.core_context()
.executor
.runtime()
.upgrade()
.handle()
.unwrap()
}
/// Assert that the `BeaconProcessor` doesn't produce any events in the given `duration`.
pub fn assert_no_events_for(&mut self, duration: Duration) {
self.runtime().block_on(async {
self.handle().block_on(async {
tokio::select! {
_ = tokio::time::sleep(duration) => (),
event = self.work_journal_rx.recv() => panic!(
@ -360,7 +359,7 @@ impl TestRig {
.iter()
.all(|ev| ev != &WORKER_FREED && ev != &NOTHING_TO_DO));
let (events, worker_freed_remaining) = self.runtime().block_on(async {
let (events, worker_freed_remaining) = self.handle().block_on(async {
let mut events = Vec::with_capacity(expected.len());
let mut worker_freed_remaining = expected.len();
@ -415,7 +414,7 @@ impl TestRig {
/// We won't attempt to listen for any more than `expected.len()` events. As such, it makes sense
/// to use the `NOTHING_TO_DO` event to ensure that execution has completed.
pub fn assert_event_journal_with_timeout(&mut self, expected: &[&str], timeout: Duration) {
let events = self.runtime().block_on(async {
let events = self.handle().block_on(async {
let mut events = Vec::with_capacity(expected.len());
let drain_future = async {

View File

@ -5,9 +5,10 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = "2021"
[dependencies]
tokio = { version = "1.14.0", features = ["rt"] }
tokio = { version = "1.14.0", features = ["rt-multi-thread"] }
slog = "2.5.2"
futures = "0.3.7"
exit-future = "0.2.0"
lazy_static = "1.4.0"
lighthouse_metrics = { path = "../lighthouse_metrics" }
sloggers = { version = "2.1.1", features = ["json"] }

View File

@ -1,10 +1,11 @@
mod metrics;
pub mod test_utils;
use futures::channel::mpsc::Sender;
use futures::prelude::*;
use slog::{crit, debug, o, trace};
use std::sync::Weak;
use tokio::runtime::Runtime;
use tokio::runtime::{Handle, Runtime};
/// Provides a reason when Lighthouse is shut down.
#[derive(Copy, Clone, Debug, PartialEq)]
@ -24,11 +25,51 @@ impl ShutdownReason {
}
}
/// Provides a `Handle` by either:
///
/// 1. Holding a `Weak<Runtime>` and calling `Runtime::handle`.
/// 2. Directly holding a `Handle` and cloning it.
///
/// This enum allows the `TaskExecutor` to work in production where a `Weak<Runtime>` is directly
/// accessible and in testing where the `Runtime` is hidden outside our scope.
#[derive(Clone)]
pub enum HandleProvider {
Runtime(Weak<Runtime>),
Handle(Handle),
}
impl From<Handle> for HandleProvider {
fn from(handle: Handle) -> Self {
HandleProvider::Handle(handle)
}
}
impl From<Weak<Runtime>> for HandleProvider {
fn from(weak_runtime: Weak<Runtime>) -> Self {
HandleProvider::Runtime(weak_runtime)
}
}
impl HandleProvider {
/// Returns a `Handle` to a `Runtime`.
///
/// May return `None` if the weak reference to the `Runtime` has been dropped (this generally
/// means Lighthouse is shutting down).
pub fn handle(&self) -> Option<Handle> {
match self {
HandleProvider::Runtime(weak_runtime) => weak_runtime
.upgrade()
.map(|runtime| runtime.handle().clone()),
HandleProvider::Handle(handle) => Some(handle.clone()),
}
}
}
/// A wrapper over a runtime handle which can spawn async and blocking tasks.
#[derive(Clone)]
pub struct TaskExecutor {
/// The handle to the runtime on which tasks are spawned
runtime: Weak<Runtime>,
handle_provider: HandleProvider,
/// The receiver exit future which on receiving shuts down the task
exit: exit_future::Exit,
/// Sender given to tasks, so that if they encounter a state in which execution cannot
@ -43,16 +84,19 @@ pub struct TaskExecutor {
impl TaskExecutor {
/// Create a new task executor.
///
/// Note: this function is mainly useful in tests. A `TaskExecutor` should be normally obtained from
/// a [`RuntimeContext`](struct.RuntimeContext.html)
pub fn new(
runtime: Weak<Runtime>,
/// ## Note
///
/// This function should only be used during testing. In production, prefer to obtain an
/// instance of `Self` via a `environment::RuntimeContext` (see the `lighthouse/environment`
/// crate).
pub fn new<T: Into<HandleProvider>>(
handle: T,
exit: exit_future::Exit,
log: slog::Logger,
signal_tx: Sender<ShutdownReason>,
) -> Self {
Self {
runtime,
handle_provider: handle.into(),
exit,
signal_tx,
log,
@ -62,7 +106,7 @@ impl TaskExecutor {
/// Clones the task executor adding a service name.
pub fn clone_with_name(&self, service_name: String) -> Self {
TaskExecutor {
runtime: self.runtime.clone(),
handle_provider: self.handle_provider.clone(),
exit: self.exit.clone(),
signal_tx: self.signal_tx.clone(),
log: self.log.new(o!("service" => service_name)),
@ -94,8 +138,8 @@ impl TaskExecutor {
let mut shutdown_sender = self.shutdown_sender();
let log = self.log.clone();
if let Some(runtime) = self.runtime.upgrade() {
runtime.spawn(async move {
if let Some(handle) = self.handle() {
handle.spawn(async move {
let timer = metrics::start_timer_vec(&metrics::TASKS_HISTOGRAM, &[name]);
if let Err(join_error) = task_handle.await {
if let Ok(panic) = join_error.try_into_panic() {
@ -160,8 +204,8 @@ impl TaskExecutor {
});
int_gauge.inc();
if let Some(runtime) = self.runtime.upgrade() {
runtime.spawn(future);
if let Some(handle) = self.handle() {
handle.spawn(future);
} else {
debug!(self.log, "Couldn't spawn task. Runtime shutting down");
}
@ -211,8 +255,8 @@ impl TaskExecutor {
});
int_gauge.inc();
if let Some(runtime) = self.runtime.upgrade() {
Some(runtime.spawn(future))
if let Some(handle) = self.handle() {
Some(handle.spawn(future))
} else {
debug!(self.log, "Couldn't spawn task. Runtime shutting down");
None
@ -242,8 +286,8 @@ impl TaskExecutor {
let timer = metrics::start_timer_vec(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]);
metrics::inc_gauge_vec(&metrics::BLOCKING_TASKS_COUNT, &[name]);
let join_handle = if let Some(runtime) = self.runtime.upgrade() {
runtime.spawn_blocking(task)
let join_handle = if let Some(handle) = self.handle() {
handle.spawn_blocking(task)
} else {
debug!(self.log, "Couldn't spawn task. Runtime shutting down");
return None;
@ -268,8 +312,9 @@ impl TaskExecutor {
Some(future)
}
pub fn runtime(&self) -> Weak<Runtime> {
self.runtime.clone()
/// Returns a `Handle` to the current runtime.
pub fn handle(&self) -> Option<Handle> {
self.handle_provider.handle()
}
/// Returns a copy of the `exit_future::Exit`.

View File

@ -0,0 +1,68 @@
use crate::TaskExecutor;
use slog::Logger;
use sloggers::{null::NullLoggerBuilder, Build};
use std::sync::Arc;
use tokio::runtime;
/// Whilst the `TestRuntime` is not necessarily useful in itself, it provides the necessary
/// components for creating a `TaskExecutor` during tests.
///
/// May create its own runtime or use an existing one.
///
/// ## Warning
///
/// This struct should never be used in production, only testing.
pub struct TestRuntime {
runtime: Option<Arc<tokio::runtime::Runtime>>,
_runtime_shutdown: exit_future::Signal,
pub task_executor: TaskExecutor,
pub log: Logger,
}
impl Default for TestRuntime {
/// If called *inside* an existing runtime, instantiates `Self` using a handle to that runtime. If
/// called *outside* any existing runtime, create a new `Runtime` and keep it alive until the
/// `Self` is dropped.
fn default() -> Self {
let (runtime_shutdown, exit) = exit_future::signal();
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let log = null_logger().unwrap();
let (runtime, handle) = if let Ok(handle) = runtime::Handle::try_current() {
(None, handle)
} else {
let runtime = Arc::new(
runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
);
let handle = runtime.handle().clone();
(Some(runtime), handle)
};
let task_executor = TaskExecutor::new(handle, exit, log.clone(), shutdown_tx);
Self {
runtime,
_runtime_shutdown: runtime_shutdown,
task_executor,
log,
}
}
}
impl Drop for TestRuntime {
fn drop(&mut self) {
if let Some(runtime) = self.runtime.take() {
Arc::try_unwrap(runtime).unwrap().shutdown_background()
}
}
}
pub fn null_logger() -> Result<Logger, String> {
let log_builder = NullLoggerBuilder;
log_builder
.build()
.map_err(|e| format!("Failed to start null logger: {:?}", e))
}

View File

@ -13,9 +13,7 @@ use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::{future, StreamExt};
use slog::{error, info, o, warn, Drain, Duplicate, Level, Logger};
use sloggers::{
file::FileLoggerBuilder, null::NullLoggerBuilder, types::Format, types::Severity, Build,
};
use sloggers::{file::FileLoggerBuilder, types::Format, types::Severity, Build};
use std::fs::create_dir_all;
use std::path::PathBuf;
use std::sync::Arc;
@ -33,6 +31,8 @@ use {
#[cfg(not(target_family = "unix"))]
use {futures::channel::oneshot, std::cell::RefCell};
pub use task_executor::test_utils::null_logger;
const LOG_CHANNEL_SIZE: usize = 2048;
/// The maximum time in seconds the client will wait for all internal tasks to shutdown.
const MAXIMUM_SHUTDOWN_TIME: u64 = 15;
@ -506,13 +506,6 @@ impl<E: EthSpec> Environment<E> {
}
}
pub fn null_logger() -> Result<Logger, String> {
let log_builder = NullLoggerBuilder;
log_builder
.build()
.map_err(|e| format!("Failed to start null logger: {:?}", e))
}
#[cfg(target_family = "unix")]
struct SignalFuture {
signal: Signal,

View File

@ -14,8 +14,8 @@ use slog::{info, warn, Logger};
use slot_clock::SlotClock;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Weak;
use tokio::runtime::Runtime;
use task_executor::TaskExecutor;
use tokio::runtime::Handle;
use types::{EthSpec, PublicKeyBytes};
use validator_dir::Builder as ValidatorDirBuilder;
use warp::Rejection;
@ -59,7 +59,7 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
request: ImportKeystoresRequest,
validator_dir: PathBuf,
validator_store: Arc<ValidatorStore<T, E>>,
runtime: Weak<Runtime>,
task_executor: TaskExecutor,
log: Logger,
) -> Result<ImportKeystoresResponse, Rejection> {
// Check request validity. This is the only cases in which we should return a 4xx code.
@ -122,14 +122,14 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
ImportKeystoreStatus::Error,
format!("slashing protection import failed: {:?}", e),
)
} else if let Some(runtime) = runtime.upgrade() {
} else if let Some(handle) = task_executor.handle() {
// Import the keystore.
match import_single_keystore(
keystore,
password,
validator_dir.clone(),
&validator_store,
runtime,
handle,
) {
Ok(status) => Status::ok(status),
Err(e) => {
@ -159,7 +159,7 @@ fn import_single_keystore<T: SlotClock + 'static, E: EthSpec>(
password: ZeroizeString,
validator_dir_path: PathBuf,
validator_store: &ValidatorStore<T, E>,
runtime: Arc<Runtime>,
handle: Handle,
) -> Result<ImportKeystoreStatus, String> {
// Check if the validator key already exists, erroring if it is a remote signer validator.
let pubkey = keystore
@ -198,7 +198,7 @@ fn import_single_keystore<T: SlotClock + 'static, E: EthSpec>(
let voting_keystore_path = validator_dir.voting_keystore_path();
drop(validator_dir);
runtime
handle
.block_on(validator_store.add_validator_keystore(
voting_keystore_path,
password,
@ -214,7 +214,7 @@ fn import_single_keystore<T: SlotClock + 'static, E: EthSpec>(
pub fn delete<T: SlotClock + 'static, E: EthSpec>(
request: DeleteKeystoresRequest,
validator_store: Arc<ValidatorStore<T, E>>,
runtime: Weak<Runtime>,
task_executor: TaskExecutor,
log: Logger,
) -> Result<DeleteKeystoresResponse, Rejection> {
// Remove from initialized validators.
@ -225,8 +225,11 @@ pub fn delete<T: SlotClock + 'static, E: EthSpec>(
.pubkeys
.iter()
.map(|pubkey_bytes| {
match delete_single_keystore(pubkey_bytes, &mut initialized_validators, runtime.clone())
{
match delete_single_keystore(
pubkey_bytes,
&mut initialized_validators,
task_executor.clone(),
) {
Ok(status) => Status::ok(status),
Err(error) => {
warn!(
@ -244,8 +247,8 @@ pub fn delete<T: SlotClock + 'static, E: EthSpec>(
// Use `update_validators` to update the key cache. It is safe to let the key cache get a bit out
// of date as it resets when it can't be decrypted. We update it just a single time to avoid
// continually resetting it after each key deletion.
if let Some(runtime) = runtime.upgrade() {
runtime
if let Some(handle) = task_executor.handle() {
handle
.block_on(initialized_validators.update_validators())
.map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?;
}
@ -278,14 +281,14 @@ pub fn delete<T: SlotClock + 'static, E: EthSpec>(
fn delete_single_keystore(
pubkey_bytes: &PublicKeyBytes,
initialized_validators: &mut InitializedValidators,
runtime: Weak<Runtime>,
task_executor: TaskExecutor,
) -> Result<DeleteKeystoreStatus, String> {
if let Some(runtime) = runtime.upgrade() {
if let Some(handle) = task_executor.handle() {
let pubkey = pubkey_bytes
.decompress()
.map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?;
match runtime.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, true))
match handle.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, true))
{
Ok(_) => Ok(DeleteKeystoreStatus::Deleted),
Err(e) => match e {

View File

@ -22,8 +22,8 @@ use std::future::Future;
use std::marker::PhantomData;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::{Arc, Weak};
use tokio::runtime::Runtime;
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::{ChainSpec, ConfigAndPreset, EthSpec};
use validator_dir::Builder as ValidatorDirBuilder;
use warp::{
@ -59,7 +59,7 @@ impl From<String> for Error {
///
/// The server will gracefully handle the case where any fields are `None`.
pub struct Context<T: SlotClock, E: EthSpec> {
pub runtime: Weak<Runtime>,
pub task_executor: TaskExecutor,
pub api_secret: ApiSecret,
pub validator_store: Option<Arc<ValidatorStore<T, E>>>,
pub validator_dir: Option<PathBuf>,
@ -161,8 +161,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
})
});
let inner_runtime = ctx.runtime.clone();
let runtime_filter = warp::any().map(move || inner_runtime.clone());
let inner_task_executor = ctx.task_executor.clone();
let task_executor_filter = warp::any().map(move || inner_task_executor.clone());
let inner_validator_dir = ctx.validator_dir.clone();
let validator_dir_filter = warp::any()
@ -290,18 +290,18 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(spec_filter.clone())
.and(signer.clone())
.and(runtime_filter.clone())
.and(task_executor_filter.clone())
.and_then(
|body: Vec<api_types::ValidatorRequest>,
validator_dir: PathBuf,
validator_store: Arc<ValidatorStore<T, E>>,
spec: Arc<ChainSpec>,
signer,
runtime: Weak<Runtime>| {
task_executor: TaskExecutor| {
blocking_signed_json_task(signer, move || {
if let Some(runtime) = runtime.upgrade() {
if let Some(handle) = task_executor.handle() {
let (validators, mnemonic) =
runtime.block_on(create_validators_mnemonic(
handle.block_on(create_validators_mnemonic(
None,
None,
&body,
@ -316,7 +316,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
Ok(api_types::GenericResponse::from(response))
} else {
Err(warp_utils::reject::custom_server_error(
"Runtime shutdown".into(),
"Lighthouse shutting down".into(),
))
}
})
@ -333,16 +333,16 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(spec_filter)
.and(signer.clone())
.and(runtime_filter.clone())
.and(task_executor_filter.clone())
.and_then(
|body: api_types::CreateValidatorsMnemonicRequest,
validator_dir: PathBuf,
validator_store: Arc<ValidatorStore<T, E>>,
spec: Arc<ChainSpec>,
signer,
runtime: Weak<Runtime>| {
task_executor: TaskExecutor| {
blocking_signed_json_task(signer, move || {
if let Some(runtime) = runtime.upgrade() {
if let Some(handle) = task_executor.handle() {
let mnemonic =
mnemonic_from_phrase(body.mnemonic.as_str()).map_err(|e| {
warp_utils::reject::custom_bad_request(format!(
@ -351,7 +351,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
))
})?;
let (validators, _mnemonic) =
runtime.block_on(create_validators_mnemonic(
handle.block_on(create_validators_mnemonic(
Some(mnemonic),
Some(body.key_derivation_path_offset),
&body.validators,
@ -362,7 +362,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
Ok(api_types::GenericResponse::from(validators))
} else {
Err(warp_utils::reject::custom_server_error(
"Runtime shutdown".into(),
"Lighthouse shutting down".into(),
))
}
})
@ -378,13 +378,13 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_dir_filter.clone())
.and(validator_store_filter.clone())
.and(signer.clone())
.and(runtime_filter.clone())
.and(task_executor_filter.clone())
.and_then(
|body: api_types::KeystoreValidatorsPostRequest,
validator_dir: PathBuf,
validator_store: Arc<ValidatorStore<T, E>>,
signer,
runtime: Weak<Runtime>| {
task_executor: TaskExecutor| {
blocking_signed_json_task(signer, move || {
// Check to ensure the password is correct.
let keypair = body
@ -416,8 +416,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let suggested_fee_recipient = body.suggested_fee_recipient;
let validator_def = {
if let Some(runtime) = runtime.upgrade() {
runtime
if let Some(handle) = task_executor.handle() {
handle
.block_on(validator_store.add_validator_keystore(
voting_keystore_path,
voting_password,
@ -433,7 +433,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
})?
} else {
return Err(warp_utils::reject::custom_server_error(
"Runtime shutdown".into(),
"Lighthouse shutting down".into(),
));
}
};
@ -455,14 +455,14 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::body::json())
.and(validator_store_filter.clone())
.and(signer.clone())
.and(runtime_filter.clone())
.and(task_executor_filter.clone())
.and_then(
|body: Vec<api_types::Web3SignerValidatorRequest>,
validator_store: Arc<ValidatorStore<T, E>>,
signer,
runtime: Weak<Runtime>| {
task_executor: TaskExecutor| {
blocking_signed_json_task(signer, move || {
if let Some(runtime) = runtime.upgrade() {
if let Some(handle) = task_executor.handle() {
let web3signers: Vec<ValidatorDefinition> = body
.into_iter()
.map(|web3signer| ValidatorDefinition {
@ -478,14 +478,14 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
},
})
.collect();
runtime.block_on(create_validators_web3signer(
handle.block_on(create_validators_web3signer(
web3signers,
&validator_store,
))?;
Ok(())
} else {
Err(warp_utils::reject::custom_server_error(
"Runtime shutdown".into(),
"Lighthouse shutting down".into(),
))
}
})
@ -500,13 +500,13 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::body::json())
.and(validator_store_filter.clone())
.and(signer.clone())
.and(runtime_filter.clone())
.and(task_executor_filter.clone())
.and_then(
|validator_pubkey: PublicKey,
body: api_types::ValidatorPatchRequest,
validator_store: Arc<ValidatorStore<T, E>>,
signer,
runtime: Weak<Runtime>| {
task_executor: TaskExecutor| {
blocking_signed_json_task(signer, move || {
let initialized_validators_rw_lock = validator_store.initialized_validators();
let mut initialized_validators = initialized_validators_rw_lock.write();
@ -518,8 +518,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
))),
Some(enabled) if enabled == body.enabled => Ok(()),
Some(_) => {
if let Some(runtime) = runtime.upgrade() {
runtime
if let Some(handle) = task_executor.handle() {
handle
.block_on(
initialized_validators
.set_validator_status(&validator_pubkey, body.enabled),
@ -533,7 +533,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
Ok(())
} else {
Err(warp_utils::reject::custom_server_error(
"Runtime shutdown".into(),
"Lighthouse shutting down".into(),
))
}
}
@ -574,12 +574,12 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(signer.clone())
.and(validator_dir_filter)
.and(validator_store_filter.clone())
.and(runtime_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(
|request, signer, validator_dir, validator_store, runtime, log| {
|request, signer, validator_dir, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
keystores::import(request, validator_dir, validator_store, runtime, log)
keystores::import(request, validator_dir, validator_store, task_executor, log)
})
},
);
@ -589,11 +589,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::body::json())
.and(signer.clone())
.and(validator_store_filter.clone())
.and(runtime_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(|request, signer, validator_store, runtime, log| {
.and_then(|request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
keystores::delete(request, validator_store, runtime, log)
keystores::delete(request, validator_store, task_executor, log)
})
});
@ -610,11 +610,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::body::json())
.and(signer.clone())
.and(validator_store_filter.clone())
.and(runtime_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(|request, signer, validator_store, runtime, log| {
.and_then(|request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
remotekeys::import(request, validator_store, runtime, log)
remotekeys::import(request, validator_store, task_executor, log)
})
});
@ -623,11 +623,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::body::json())
.and(signer)
.and(validator_store_filter)
.and(runtime_filter)
.and(task_executor_filter)
.and(log_filter.clone())
.and_then(|request, signer, validator_store, runtime, log| {
.and_then(|request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
remotekeys::delete(request, validator_store, runtime, log)
remotekeys::delete(request, validator_store, task_executor, log)
})
});

View File

@ -8,8 +8,9 @@ use eth2::lighthouse_vc::std_types::{
};
use slog::{info, warn, Logger};
use slot_clock::SlotClock;
use std::sync::{Arc, Weak};
use tokio::runtime::Runtime;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::runtime::Handle;
use types::{EthSpec, PublicKeyBytes};
use url::Url;
use warp::Rejection;
@ -45,7 +46,7 @@ pub fn list<T: SlotClock + 'static, E: EthSpec>(
pub fn import<T: SlotClock + 'static, E: EthSpec>(
request: ImportRemotekeysRequest,
validator_store: Arc<ValidatorStore<T, E>>,
runtime: Weak<Runtime>,
task_executor: TaskExecutor,
log: Logger,
) -> Result<ImportRemotekeysResponse, Rejection> {
info!(
@ -57,14 +58,10 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
let mut statuses = Vec::with_capacity(request.remote_keys.len());
for remotekey in request.remote_keys {
let status = if let Some(runtime) = runtime.upgrade() {
let status = if let Some(handle) = task_executor.handle() {
// Import the keystore.
match import_single_remotekey(
remotekey.pubkey,
remotekey.url,
&validator_store,
runtime,
) {
match import_single_remotekey(remotekey.pubkey, remotekey.url, &validator_store, handle)
{
Ok(status) => Status::ok(status),
Err(e) => {
warn!(
@ -91,7 +88,7 @@ fn import_single_remotekey<T: SlotClock + 'static, E: EthSpec>(
pubkey: PublicKeyBytes,
url: String,
validator_store: &ValidatorStore<T, E>,
runtime: Arc<Runtime>,
handle: Handle,
) -> Result<ImportRemotekeyStatus, String> {
if let Err(url_err) = Url::parse(&url) {
return Err(format!("failed to parse remotekey URL: {}", url_err));
@ -129,7 +126,7 @@ fn import_single_remotekey<T: SlotClock + 'static, E: EthSpec>(
request_timeout_ms: None,
},
};
runtime
handle
.block_on(validator_store.add_validator(web3signer_validator))
.map_err(|e| format!("failed to initialize validator: {:?}", e))?;
@ -139,7 +136,7 @@ fn import_single_remotekey<T: SlotClock + 'static, E: EthSpec>(
pub fn delete<T: SlotClock + 'static, E: EthSpec>(
request: DeleteRemotekeysRequest,
validator_store: Arc<ValidatorStore<T, E>>,
runtime: Weak<Runtime>,
task_executor: TaskExecutor,
log: Logger,
) -> Result<DeleteRemotekeysResponse, Rejection> {
info!(
@ -158,7 +155,7 @@ pub fn delete<T: SlotClock + 'static, E: EthSpec>(
match delete_single_remotekey(
pubkey_bytes,
&mut initialized_validators,
runtime.clone(),
task_executor.clone(),
) {
Ok(status) => Status::ok(status),
Err(error) => {
@ -177,8 +174,8 @@ pub fn delete<T: SlotClock + 'static, E: EthSpec>(
// Use `update_validators` to update the key cache. It is safe to let the key cache get a bit out
// of date as it resets when it can't be decrypted. We update it just a single time to avoid
// continually resetting it after each key deletion.
if let Some(runtime) = runtime.upgrade() {
runtime
if let Some(handle) = task_executor.handle() {
handle
.block_on(initialized_validators.update_validators())
.map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?;
}
@ -189,15 +186,14 @@ pub fn delete<T: SlotClock + 'static, E: EthSpec>(
fn delete_single_remotekey(
pubkey_bytes: &PublicKeyBytes,
initialized_validators: &mut InitializedValidators,
runtime: Weak<Runtime>,
task_executor: TaskExecutor,
) -> Result<DeleteRemotekeyStatus, String> {
if let Some(runtime) = runtime.upgrade() {
if let Some(handle) = task_executor.handle() {
let pubkey = pubkey_bytes
.decompress()
.map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?;
match runtime
.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, false))
match handle.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, false))
{
Ok(_) => Ok(DeleteRemotekeyStatus::Deleted),
Err(e) => match e {

View File

@ -102,7 +102,7 @@ impl ApiTester {
spec,
Some(Arc::new(DoppelgangerService::new(log.clone()))),
slot_clock,
executor,
executor.clone(),
log.clone(),
));
@ -113,7 +113,7 @@ impl ApiTester {
let initialized_validators = validator_store.initialized_validators();
let context = Arc::new(Context {
runtime,
task_executor: executor,
api_secret,
validator_dir: Some(validator_dir.path().into()),
validator_store: Some(validator_store.clone()),

View File

@ -498,7 +498,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
self.http_api_listen_addr = if self.config.http_api.enabled {
let ctx = Arc::new(http_api::Context {
runtime: self.context.executor.runtime(),
task_executor: self.context.executor.clone(),
api_secret,
validator_store: Some(self.validator_store.clone()),
validator_dir: Some(self.config.validator_dir.clone()),