diff --git a/Cargo.lock b/Cargo.lock index f412691e8..6959799be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -606,7 +606,9 @@ dependencies = [ "lighthouse_metrics", "lighthouse_network", "logging", + "num_cpus", "parking_lot 0.12.1", + "serde", "slog", "slot_clock", "strum", @@ -3334,6 +3336,7 @@ name = "http_api" version = "0.1.0" dependencies = [ "beacon_chain", + "beacon_processor", "bs58 0.4.0", "bytes", "directory", @@ -4407,6 +4410,7 @@ dependencies = [ "account_manager", "account_utils", "beacon_node", + "beacon_processor", "bls", "boot_node", "clap", @@ -8984,6 +8988,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "task_executor", "testcontainers", "tokio", "tokio-postgres", diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index efbc9905b..d1bddcf73 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -79,8 +79,6 @@ pub struct ChainConfig { /// /// This is useful for block builders and testing. pub always_prepare_payload: bool, - /// Whether backfill sync processing should be rate-limited. - pub enable_backfill_rate_limiting: bool, /// Whether to use `ProgressiveBalancesCache` in unrealized FFG progression calculation. pub progressive_balances_mode: ProgressiveBalancesMode, /// Number of epochs between each migration of data from the hot database to the freezer. @@ -114,7 +112,6 @@ impl Default for ChainConfig { shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE, genesis_backfill: false, always_prepare_payload: false, - enable_backfill_rate_limiting: true, progressive_balances_mode: ProgressiveBalancesMode::Checked, epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION, } diff --git a/beacon_node/beacon_processor/Cargo.toml b/beacon_node/beacon_processor/Cargo.toml index 5c5200e10..c626441bb 100644 --- a/beacon_node/beacon_processor/Cargo.toml +++ b/beacon_node/beacon_processor/Cargo.toml @@ -21,4 +21,6 @@ types = { path = "../../consensus/types" } ethereum_ssz = "0.5.0" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } -parking_lot = "0.12.0" \ No newline at end of file +parking_lot = "0.12.0" +num_cpus = "1.13.0" +serde = { version = "1.0.116", features = ["derive"] } \ No newline at end of file diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index afa70bcb6..87a3ec3b5 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -46,6 +46,7 @@ use futures::task::Poll; use lighthouse_network::{MessageId, NetworkGlobals, PeerId}; use logging::TimeLatch; use parking_lot::Mutex; +use serde::{Deserialize, Serialize}; use slog::{crit, debug, error, trace, warn, Logger}; use slot_clock::SlotClock; use std::cmp; @@ -73,7 +74,7 @@ pub mod work_reprocessing_queue; /// The maximum size of the channel for work events to the `BeaconProcessor`. /// /// Setting this too low will cause consensus messages to be dropped. -pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384; +const DEFAULT_MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384; /// The maximum size of the channel for idle events to the `BeaconProcessor`. /// @@ -82,7 +83,7 @@ pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384; const MAX_IDLE_QUEUE_LEN: usize = 16_384; /// The maximum size of the channel for re-processing work events. -pub const MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * MAX_WORK_EVENT_QUEUE_LEN / 4; +const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4; /// The maximum number of queued `Attestation` objects that will be stored before we start dropping /// them. @@ -179,6 +180,14 @@ const MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN: usize = 16_384; /// will be stored before we start dropping them. const MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN: usize = 1_024; +/// The maximum number of priority-0 (highest priority) messages that will be queued before +/// they begin to be dropped. +const MAX_API_REQUEST_P0_QUEUE_LEN: usize = 1_024; + +/// The maximum number of priority-1 (second-highest priority) messages that will be queued before +/// they begin to be dropped. +const MAX_API_REQUEST_P1_QUEUE_LEN: usize = 1_024; + /// The name of the manager tokio task. const MANAGER_TASK_NAME: &str = "beacon_processor_manager"; @@ -196,8 +205,8 @@ const WORKER_TASK_NAME: &str = "beacon_processor_worker"; /// Poisoning occurs when an invalid signature is included in a batch of attestations. A single /// invalid signature causes the entire batch to fail. When a batch fails, we fall-back to /// individually verifying each attestation signature. -const MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64; -const MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64; +const DEFAULT_MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64; +const DEFAULT_MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64; /// Unique IDs used for metrics and testing. pub const WORKER_FREED: &str = "worker_freed"; @@ -231,6 +240,61 @@ pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update"; pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change"; +pub const API_REQUEST_P0: &str = "api_request_p0"; +pub const API_REQUEST_P1: &str = "api_request_p1"; + +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +pub struct BeaconProcessorConfig { + pub max_workers: usize, + pub max_work_event_queue_len: usize, + pub max_scheduled_work_queue_len: usize, + pub max_gossip_attestation_batch_size: usize, + pub max_gossip_aggregate_batch_size: usize, + pub enable_backfill_rate_limiting: bool, +} + +impl Default for BeaconProcessorConfig { + fn default() -> Self { + Self { + max_workers: cmp::max(1, num_cpus::get()), + max_work_event_queue_len: DEFAULT_MAX_WORK_EVENT_QUEUE_LEN, + max_scheduled_work_queue_len: DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN, + max_gossip_attestation_batch_size: DEFAULT_MAX_GOSSIP_ATTESTATION_BATCH_SIZE, + max_gossip_aggregate_batch_size: DEFAULT_MAX_GOSSIP_AGGREGATE_BATCH_SIZE, + enable_backfill_rate_limiting: true, + } + } +} + +// The channels necessary to instantiate a `BeaconProcessor`. +pub struct BeaconProcessorChannels { + pub beacon_processor_tx: BeaconProcessorSend, + pub beacon_processor_rx: mpsc::Receiver>, + pub work_reprocessing_tx: mpsc::Sender, + pub work_reprocessing_rx: mpsc::Receiver, +} + +impl BeaconProcessorChannels { + pub fn new(config: &BeaconProcessorConfig) -> Self { + let (beacon_processor_tx, beacon_processor_rx) = + mpsc::channel(config.max_scheduled_work_queue_len); + let (work_reprocessing_tx, work_reprocessing_rx) = + mpsc::channel(config.max_scheduled_work_queue_len); + + Self { + beacon_processor_tx: BeaconProcessorSend(beacon_processor_tx), + beacon_processor_rx, + work_reprocessing_rx, + work_reprocessing_tx, + } + } +} + +impl Default for BeaconProcessorChannels { + fn default() -> Self { + Self::new(&BeaconProcessorConfig::default()) + } +} /// A simple first-in-first-out queue with a maximum length. struct FifoQueue { @@ -379,7 +443,7 @@ impl WorkEvent { } } -impl std::convert::From for WorkEvent { +impl From for WorkEvent { fn from(ready_work: ReadyWork) -> Self { match ready_work { ReadyWork::Block(QueuedGossipBlock { @@ -481,6 +545,10 @@ impl BeaconProcessorSend { pub type AsyncFn = Pin + Send + Sync>>; pub type BlockingFn = Box; pub type BlockingFnWithManualSendOnIdle = Box; +pub enum BlockingOrAsync { + Blocking(BlockingFn), + Async(AsyncFn), +} /// Indicates the type of work to be performed and therefore its priority and /// queuing specifics. @@ -545,6 +613,8 @@ pub enum Work { BlobsByRootsRequest(BlockingFnWithManualSendOnIdle), GossipBlsToExecutionChange(BlockingFn), LightClientBootstrapRequest(BlockingFn), + ApiRequestP0(BlockingOrAsync), + ApiRequestP1(BlockingOrAsync), } impl fmt::Debug for Work { @@ -586,6 +656,8 @@ impl Work { Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE, Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE, + Work::ApiRequestP0 { .. } => API_REQUEST_P0, + Work::ApiRequestP1 { .. } => API_REQUEST_P1, } } } @@ -664,7 +736,7 @@ pub struct BeaconProcessor { pub executor: TaskExecutor, pub max_workers: usize, pub current_workers: usize, - pub enable_backfill_rate_limiting: bool, + pub config: BeaconProcessorConfig, pub log: Logger, } @@ -744,11 +816,13 @@ impl BeaconProcessor { let mut lcbootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN); + let mut api_request_p0_queue = FifoQueue::new(MAX_API_REQUEST_P0_QUEUE_LEN); + let mut api_request_p1_queue = FifoQueue::new(MAX_API_REQUEST_P1_QUEUE_LEN); + // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to // receive them back once they are ready (`ready_work_rx`). let (ready_work_tx, ready_work_rx) = - mpsc::channel::(MAX_SCHEDULED_WORK_QUEUE_LEN); - + mpsc::channel::(self.config.max_scheduled_work_queue_len); spawn_reprocess_scheduler( ready_work_tx, work_reprocessing_rx, @@ -769,7 +843,7 @@ impl BeaconProcessor { reprocess_work_rx: ready_work_rx, }; - let enable_backfill_rate_limiting = self.enable_backfill_rate_limiting; + let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting; loop { let work_event = match inbound_events.next().await { @@ -884,12 +958,17 @@ impl BeaconProcessor { self.spawn_worker(item, idle_tx); } else if let Some(item) = gossip_blob_queue.pop() { self.spawn_worker(item, idle_tx); + // Check the priority 0 API requests after blocks and blobs, but before attestations. + } else if let Some(item) = api_request_p0_queue.pop() { + self.spawn_worker(item, idle_tx); // Check the aggregates, *then* the unaggregates since we assume that // aggregates are more valuable to local validators and effectively give us // more information with less signature verification time. } else if aggregate_queue.len() > 0 { - let batch_size = - cmp::min(aggregate_queue.len(), MAX_GOSSIP_AGGREGATE_BATCH_SIZE); + let batch_size = cmp::min( + aggregate_queue.len(), + self.config.max_gossip_aggregate_batch_size, + ); if batch_size < 2 { // One single aggregate is in the queue, process it individually. @@ -948,7 +1027,7 @@ impl BeaconProcessor { } else if attestation_queue.len() > 0 { let batch_size = cmp::min( attestation_queue.len(), - MAX_GOSSIP_ATTESTATION_BATCH_SIZE, + self.config.max_gossip_attestation_batch_size, ); if batch_size < 2 { @@ -1043,6 +1122,12 @@ impl BeaconProcessor { self.spawn_worker(item, idle_tx); } else if let Some(item) = gossip_bls_to_execution_change_queue.pop() { self.spawn_worker(item, idle_tx); + // Check the priority 1 API requests after we've + // processed all the interesting things from the network + // and things required for us to stay in good repute + // with our P2P peers. + } else if let Some(item) = api_request_p1_queue.pop() { + self.spawn_worker(item, idle_tx); // Handle backfill sync chain segments. } else if let Some(item) = backfill_chain_segment.pop() { self.spawn_worker(item, idle_tx); @@ -1175,6 +1260,12 @@ impl BeaconProcessor { Work::UnknownLightClientOptimisticUpdate { .. } => { unknown_light_client_update_queue.push(work, work_id, &self.log) } + Work::ApiRequestP0 { .. } => { + api_request_p0_queue.push(work, work_id, &self.log) + } + Work::ApiRequestP1 { .. } => { + api_request_p1_queue.push(work, work_id, &self.log) + } } } } @@ -1235,6 +1326,14 @@ impl BeaconProcessor { &metrics::BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_QUEUE_TOTAL, gossip_bls_to_execution_change_queue.len() as i64, ); + metrics::set_gauge( + &metrics::BEACON_PROCESSOR_API_REQUEST_P0_QUEUE_TOTAL, + api_request_p0_queue.len() as i64, + ); + metrics::set_gauge( + &metrics::BEACON_PROCESSOR_API_REQUEST_P1_QUEUE_TOTAL, + api_request_p1_queue.len() as i64, + ); if aggregate_queue.is_full() && aggregate_debounce.elapsed() { error!( @@ -1357,6 +1456,10 @@ impl BeaconProcessor { task_spawner.spawn_blocking_with_manual_send_idle(work) } Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn), + Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn { + BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn), + BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn), + }, Work::GossipVoluntaryExit(process_fn) | Work::GossipProposerSlashing(process_fn) | Work::GossipAttesterSlashing(process_fn) diff --git a/beacon_node/beacon_processor/src/metrics.rs b/beacon_node/beacon_processor/src/metrics.rs index dbe6d59ee..9082a7d47 100644 --- a/beacon_node/beacon_processor/src/metrics.rs +++ b/beacon_node/beacon_processor/src/metrics.rs @@ -110,6 +110,15 @@ lazy_static::lazy_static! { "beacon_processor_sync_contribution_queue_total", "Count of sync committee contributions waiting to be processed." ); + // HTTP API requests. + pub static ref BEACON_PROCESSOR_API_REQUEST_P0_QUEUE_TOTAL: Result = try_create_int_gauge( + "beacon_processor_api_request_p0_queue_total", + "Count of P0 HTTP requesets waiting to be processed." + ); + pub static ref BEACON_PROCESSOR_API_REQUEST_P1_QUEUE_TOTAL: Result = try_create_int_gauge( + "beacon_processor_api_request_p1_queue_total", + "Count of P1 HTTP requesets waiting to be processed." + ); /* * Attestation reprocessing queue metrics. diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 80c73de8a..e1dc56204 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -14,10 +14,8 @@ use beacon_chain::{ store::{HotColdDB, ItemStore, LevelDB, StoreConfig}, BeaconChain, BeaconChainTypes, Eth1ChainBackend, MigratorConfig, ServerSentEventHandler, }; -use beacon_processor::{ - work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessor, BeaconProcessorSend, - WorkEvent, MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN, -}; +use beacon_processor::BeaconProcessorConfig; +use beacon_processor::{BeaconProcessor, BeaconProcessorChannels}; use environment::RuntimeContext; use eth1::{Config as Eth1Config, Service as Eth1Service}; use eth2::{ @@ -39,7 +37,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use timer::spawn_timer; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot; use types::{ test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec, ExecutionBlockHash, Hash256, SignedBeaconBlock, @@ -79,11 +77,9 @@ pub struct ClientBuilder { http_api_config: http_api::Config, http_metrics_config: http_metrics::Config, slasher: Option>>, + beacon_processor_config: Option, + beacon_processor_channels: Option>, eth_spec_instance: T::EthSpec, - beacon_processor_send: BeaconProcessorSend, - beacon_processor_receive: mpsc::Receiver>, - work_reprocessing_tx: mpsc::Sender, - work_reprocessing_rx: mpsc::Receiver, } impl @@ -99,10 +95,6 @@ where /// /// The `eth_spec_instance` parameter is used to concretize `TEthSpec`. pub fn new(eth_spec_instance: TEthSpec) -> Self { - let (beacon_processor_send, beacon_processor_receive) = - mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN); - let (work_reprocessing_tx, work_reprocessing_rx) = - mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN); Self { slot_clock: None, store: None, @@ -121,10 +113,8 @@ where http_metrics_config: <_>::default(), slasher: None, eth_spec_instance, - beacon_processor_send: BeaconProcessorSend(beacon_processor_send), - beacon_processor_receive, - work_reprocessing_tx, - work_reprocessing_rx, + beacon_processor_config: None, + beacon_processor_channels: None, } } @@ -140,6 +130,12 @@ where self } + pub fn beacon_processor(mut self, config: BeaconProcessorConfig) -> Self { + self.beacon_processor_channels = Some(BeaconProcessorChannels::new(&config)); + self.beacon_processor_config = Some(config); + self + } + pub fn slasher(mut self, slasher: Arc>) -> Self { self.slasher = Some(slasher); self @@ -479,6 +475,7 @@ where chain: None, network_senders: None, network_globals: None, + beacon_processor_send: None, eth1_service: Some(genesis_service.eth1_service.clone()), log: context.log().clone(), sse_logging_components: runtime_context.sse_logging_components.clone(), @@ -556,6 +553,10 @@ where .as_ref() .ok_or("network requires a runtime_context")? .clone(); + let beacon_processor_channels = self + .beacon_processor_channels + .as_ref() + .ok_or("network requires beacon_processor_channels")?; // If gossipsub metrics are required we build a registry to record them let mut gossipsub_registry = if config.metrics_enabled { @@ -571,8 +572,8 @@ where gossipsub_registry .as_mut() .map(|registry| registry.sub_registry_with_prefix("gossipsub")), - self.beacon_processor_send.clone(), - self.work_reprocessing_tx.clone(), + beacon_processor_channels.beacon_processor_tx.clone(), + beacon_processor_channels.work_reprocessing_tx.clone(), ) .await .map_err(|e| format!("Failed to start network: {:?}", e))?; @@ -695,6 +696,14 @@ where .runtime_context .as_ref() .ok_or("build requires a runtime context")?; + let beacon_processor_channels = self + .beacon_processor_channels + .take() + .ok_or("build requires beacon_processor_channels")?; + let beacon_processor_config = self + .beacon_processor_config + .take() + .ok_or("build requires a beacon_processor_config")?; let log = runtime_context.log().clone(); let http_api_listen_addr = if self.http_api_config.enabled { @@ -704,6 +713,7 @@ where network_senders: self.network_senders.clone(), network_globals: self.network_globals.clone(), eth1_service: self.eth1_service.clone(), + beacon_processor_send: Some(beacon_processor_channels.beacon_processor_tx.clone()), sse_logging_components: runtime_context.sse_logging_components.clone(), log: log.clone(), }); @@ -767,15 +777,13 @@ where executor: beacon_processor_context.executor.clone(), max_workers: cmp::max(1, num_cpus::get()), current_workers: 0, - enable_backfill_rate_limiting: beacon_chain - .config - .enable_backfill_rate_limiting, + config: beacon_processor_config, log: beacon_processor_context.log().clone(), } .spawn_manager( - self.beacon_processor_receive, - self.work_reprocessing_tx, - self.work_reprocessing_rx, + beacon_processor_channels.beacon_processor_rx, + beacon_processor_channels.work_reprocessing_tx, + beacon_processor_channels.work_reprocessing_rx, None, beacon_chain.slot_clock.clone(), beacon_chain.spec.maximum_gossip_clock_disparity(), diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 0be6bddfc..a5d36e943 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -1,5 +1,6 @@ use beacon_chain::validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD; use beacon_chain::TrustedSetup; +use beacon_processor::BeaconProcessorConfig; use directory::DEFAULT_ROOT_DIR; use environment::LoggerConfig; use network::NetworkConfig; @@ -89,6 +90,7 @@ pub struct Config { pub slasher: Option, pub logger_config: LoggerConfig, pub always_prefer_builder_payload: bool, + pub beacon_processor: BeaconProcessorConfig, } impl Default for Config { @@ -118,6 +120,7 @@ impl Default for Config { validator_monitor_individual_tracking_threshold: DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, logger_config: LoggerConfig::default(), always_prefer_builder_payload: false, + beacon_processor: <_>::default(), } } } diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 4b6a83a01..3f845fc78 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -3,12 +3,12 @@ name = "http_api" version = "0.1.0" authors = ["Paul Hauner "] edition = "2021" -autotests = false # using a single test binary compiles faster +autotests = false # using a single test binary compiles faster [dependencies] warp = { version = "0.3.2", features = ["tls"] } serde = { version = "1.0.116", features = ["derive"] } -tokio = { version = "1.14.0", features = ["macros","sync"] } +tokio = { version = "1.14.0", features = ["macros", "sync"] } tokio-stream = { version = "0.1.3", features = ["sync"] } types = { path = "../../consensus/types" } hex = "0.4.2" @@ -27,9 +27,9 @@ slot_clock = { path = "../../common/slot_clock" } ethereum_ssz = "0.5.0" bs58 = "0.4.0" futures = "0.3.8" -execution_layer = {path = "../execution_layer"} +execution_layer = { path = "../execution_layer" } parking_lot = "0.12.0" -safe_arith = {path = "../../consensus/safe_arith"} +safe_arith = { path = "../../consensus/safe_arith" } task_executor = { path = "../../common/task_executor" } lru = "0.7.7" tree_hash = "0.5.2" @@ -40,9 +40,10 @@ logging = { path = "../../common/logging" } ethereum_serde_utils = "0.5.0" operation_pool = { path = "../operation_pool" } sensitive_url = { path = "../../common/sensitive_url" } -unused_port = {path = "../../common/unused_port"} +unused_port = { path = "../../common/unused_port" } store = { path = "../store" } bytes = "1.1.0" +beacon_processor = { path = "../beacon_processor" } [dev-dependencies] environment = { path = "../../lighthouse/environment" } @@ -52,4 +53,4 @@ genesis = { path = "../genesis" } [[test]] name = "bn_http_api_tests" -path = "tests/main.rs" \ No newline at end of file +path = "tests/main.rs" diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index b0a0816e3..4b95a04a4 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -19,8 +19,10 @@ mod standard_block_rewards; mod state_id; mod sync_committee_rewards; mod sync_committees; +mod task_spawner; pub mod test_utils; mod ui; +mod validator; mod validator_inclusion; mod version; @@ -29,6 +31,7 @@ use beacon_chain::{ validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, ProduceBlockVerification, WhenSlotSkipped, }; +use beacon_processor::BeaconProcessorSend; pub use block_id::BlockId; use bytes::Bytes; use directory::DEFAULT_ROOT_DIR; @@ -58,7 +61,11 @@ use std::pin::Pin; use std::sync::Arc; use sysinfo::{System, SystemExt}; use system_health::observe_system_health_bn; -use tokio::sync::mpsc::{Sender, UnboundedSender}; +use task_spawner::{Priority, TaskSpawner}; +use tokio::sync::{ + mpsc::{Sender, UnboundedSender}, + oneshot, +}; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use types::{ Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError, @@ -67,6 +74,7 @@ use types::{ SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, }; +use validator::pubkey_to_validator_index; use version::{ add_consensus_version_header, execution_optimistic_finalized_fork_versioned_response, fork_versioned_response, inconsistent_fork_rejection, unsupported_version_rejection, V1, V2, @@ -75,11 +83,7 @@ use warp::http::StatusCode; use warp::sse::Event; use warp::Reply; use warp::{http::Response, Filter}; -use warp_utils::{ - query::multi_key_query, - task::{blocking_json_task, blocking_response_task}, - uor::UnifyingOrFilter, -}; +use warp_utils::{query::multi_key_query, uor::UnifyingOrFilter}; const API_PREFIX: &str = "eth"; @@ -111,6 +115,7 @@ pub struct Context { pub chain: Option>>, pub network_senders: Option>, pub network_globals: Option>>, + pub beacon_processor_send: Option>, pub eth1_service: Option, pub sse_logging_components: Option, pub log: Logger, @@ -127,6 +132,7 @@ pub struct Config { pub allow_sync_stalled: bool, pub spec_fork_name: Option, pub data_dir: PathBuf, + pub enable_beacon_processor: bool, } impl Default for Config { @@ -140,6 +146,7 @@ impl Default for Config { allow_sync_stalled: false, spec_fork_name: None, data_dir: PathBuf::from(DEFAULT_ROOT_DIR), + enable_beacon_processor: true, } } } @@ -488,6 +495,14 @@ pub fn serve( let app_start = std::time::Instant::now(); let app_start_filter = warp::any().map(move || app_start); + // Create a `warp` filter that provides access to the `TaskSpawner`. + let beacon_processor_send = ctx + .beacon_processor_send + .clone() + .filter(|_| config.enable_beacon_processor); + let task_spawner_filter = + warp::any().map(move || TaskSpawner::new(beacon_processor_send.clone())); + /* * * Start of HTTP method definitions. @@ -499,17 +514,20 @@ pub fn serve( .and(warp::path("beacon")) .and(warp::path("genesis")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|chain: Arc>| { - blocking_json_task(move || { - let genesis_data = api_types::GenesisData { - genesis_time: chain.genesis_time, - genesis_validators_root: chain.genesis_validators_root, - genesis_fork_version: chain.spec.genesis_fork_version, - }; - Ok(api_types::GenericResponse::from(genesis_data)) - }) - }); + .and_then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let genesis_data = api_types::GenesisData { + genesis_time: chain.genesis_time, + genesis_validators_root: chain.genesis_validators_root, + genesis_fork_version: chain.spec.genesis_fork_version, + }; + Ok(api_types::GenericResponse::from(genesis_data)) + }) + }, + ); /* * beacon/states/{state_id} @@ -523,6 +541,7 @@ pub fn serve( "Invalid state ID".to_string(), )) })) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()); // GET beacon/states/{state_id}/root @@ -530,65 +549,77 @@ pub fn serve( .clone() .and(warp::path("root")) .and(warp::path::end()) - .and_then(|state_id: StateId, chain: Arc>| { - blocking_json_task(move || { - let (root, execution_optimistic, finalized) = state_id.root(&chain)?; - Ok(root) - .map(api_types::RootData::from) - .map(api_types::GenericResponse::from) - .map(|resp| { - resp.add_execution_optimistic_finalized(execution_optimistic, finalized) - }) - }) - }); + .and_then( + |state_id: StateId, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let (root, execution_optimistic, finalized) = state_id.root(&chain)?; + Ok(root) + .map(api_types::RootData::from) + .map(api_types::GenericResponse::from) + .map(|resp| { + resp.add_execution_optimistic_finalized(execution_optimistic, finalized) + }) + }) + }, + ); // GET beacon/states/{state_id}/fork let get_beacon_state_fork = beacon_states_path .clone() .and(warp::path("fork")) .and(warp::path::end()) - .and_then(|state_id: StateId, chain: Arc>| { - blocking_json_task(move || { - let (fork, execution_optimistic, finalized) = - state_id.fork_and_execution_optimistic_and_finalized(&chain)?; - Ok(api_types::ExecutionOptimisticFinalizedResponse { - data: fork, - execution_optimistic: Some(execution_optimistic), - finalized: Some(finalized), + .and_then( + |state_id: StateId, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let (fork, execution_optimistic, finalized) = + state_id.fork_and_execution_optimistic_and_finalized(&chain)?; + Ok(api_types::ExecutionOptimisticFinalizedResponse { + data: fork, + execution_optimistic: Some(execution_optimistic), + finalized: Some(finalized), + }) }) - }) - }); + }, + ); // GET beacon/states/{state_id}/finality_checkpoints let get_beacon_state_finality_checkpoints = beacon_states_path .clone() .and(warp::path("finality_checkpoints")) .and(warp::path::end()) - .and_then(|state_id: StateId, chain: Arc>| { - blocking_json_task(move || { - let (data, execution_optimistic, finalized) = state_id - .map_state_and_execution_optimistic_and_finalized( - &chain, - |state, execution_optimistic, finalized| { - Ok(( - api_types::FinalityCheckpointsData { - previous_justified: state.previous_justified_checkpoint(), - current_justified: state.current_justified_checkpoint(), - finalized: state.finalized_checkpoint(), - }, - execution_optimistic, - finalized, - )) - }, - )?; + .and_then( + |state_id: StateId, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let (data, execution_optimistic, finalized) = state_id + .map_state_and_execution_optimistic_and_finalized( + &chain, + |state, execution_optimistic, finalized| { + Ok(( + api_types::FinalityCheckpointsData { + previous_justified: state.previous_justified_checkpoint(), + current_justified: state.current_justified_checkpoint(), + finalized: state.finalized_checkpoint(), + }, + execution_optimistic, + finalized, + )) + }, + )?; - Ok(api_types::ExecutionOptimisticFinalizedResponse { - data, - execution_optimistic: Some(execution_optimistic), - finalized: Some(finalized), + Ok(api_types::ExecutionOptimisticFinalizedResponse { + data, + execution_optimistic: Some(execution_optimistic), + finalized: Some(finalized), + }) }) - }) - }); + }, + ); // GET beacon/states/{state_id}/validator_balances?id let get_beacon_state_validator_balances = beacon_states_path @@ -598,9 +629,10 @@ pub fn serve( .and(multi_key_query::()) .and_then( |state_id: StateId, + task_spawner: TaskSpawner, chain: Arc>, query_res: Result| { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P1, move || { let query = query_res?; let (data, execution_optimistic, finalized) = state_id .map_state_and_execution_optimistic_and_finalized( @@ -655,9 +687,10 @@ pub fn serve( .and(multi_key_query::()) .and_then( |state_id: StateId, + task_spawner: TaskSpawner, chain: Arc>, query_res: Result| { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P1, move || { let query = query_res?; let (data, execution_optimistic, finalized) = state_id .map_state_and_execution_optimistic_and_finalized( @@ -737,16 +770,24 @@ pub fn serve( })) .and(warp::path::end()) .and_then( - |state_id: StateId, chain: Arc>, validator_id: ValidatorId| { - blocking_json_task(move || { + |state_id: StateId, + task_spawner: TaskSpawner, + chain: Arc>, + validator_id: ValidatorId| { + task_spawner.blocking_json_task(Priority::P1, move || { let (data, execution_optimistic, finalized) = state_id .map_state_and_execution_optimistic_and_finalized( &chain, |state, execution_optimistic, finalized| { let index_opt = match &validator_id { - ValidatorId::PublicKey(pubkey) => { - state.validators().iter().position(|v| v.pubkey == *pubkey) - } + ValidatorId::PublicKey(pubkey) => pubkey_to_validator_index( + &chain, state, pubkey, + ) + .map_err(|e| { + warp_utils::reject::custom_not_found(format!( + "unable to access pubkey cache: {e:?}", + )) + })?, ValidatorId::Index(index) => Some(*index as usize), }; @@ -797,8 +838,11 @@ pub fn serve( .and(warp::query::()) .and(warp::path::end()) .and_then( - |state_id: StateId, chain: Arc>, query: api_types::CommitteesQuery| { - blocking_json_task(move || { + |state_id: StateId, + task_spawner: TaskSpawner, + chain: Arc>, + query: api_types::CommitteesQuery| { + task_spawner.blocking_json_task(Priority::P1, move || { let (data, execution_optimistic, finalized) = state_id .map_state_and_execution_optimistic_and_finalized( &chain, @@ -978,9 +1022,10 @@ pub fn serve( .and(warp::path::end()) .and_then( |state_id: StateId, + task_spawner: TaskSpawner, chain: Arc>, query: api_types::SyncCommitteesQuery| { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P1, move || { let (sync_committee, execution_optimistic, finalized) = state_id .map_state_and_execution_optimistic_and_finalized( &chain, @@ -1042,8 +1087,11 @@ pub fn serve( .and(warp::query::()) .and(warp::path::end()) .and_then( - |state_id: StateId, chain: Arc>, query: api_types::RandaoQuery| { - blocking_json_task(move || { + |state_id: StateId, + task_spawner: TaskSpawner, + chain: Arc>, + query: api_types::RandaoQuery| { + task_spawner.blocking_json_task(Priority::P1, move || { let (randao, execution_optimistic, finalized) = state_id .map_state_and_execution_optimistic_and_finalized( &chain, @@ -1078,10 +1126,13 @@ pub fn serve( .and(warp::path("headers")) .and(warp::query::()) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and_then( - |query: api_types::HeadersQuery, chain: Arc>| { - blocking_json_task(move || { + |query: api_types::HeadersQuery, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { let (root, block, execution_optimistic, finalized) = match (query.slot, query.parent_root) { // No query parameters, return the canonical head block. @@ -1175,36 +1226,41 @@ pub fn serve( )) })) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|block_id: BlockId, chain: Arc>| { - blocking_json_task(move || { - let (root, execution_optimistic, finalized) = block_id.root(&chain)?; - // Ignore the second `execution_optimistic` since the first one has more - // information about the original request. - let (block, _execution_optimistic, _finalized) = - BlockId::from_root(root).blinded_block(&chain)?; + .and_then( + |block_id: BlockId, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let (root, execution_optimistic, finalized) = block_id.root(&chain)?; + // Ignore the second `execution_optimistic` since the first one has more + // information about the original request. + let (block, _execution_optimistic, _finalized) = + BlockId::from_root(root).blinded_block(&chain)?; - let canonical = chain - .block_root_at_slot(block.slot(), WhenSlotSkipped::None) - .map_err(warp_utils::reject::beacon_chain_error)? - .map_or(false, |canonical| root == canonical); + let canonical = chain + .block_root_at_slot(block.slot(), WhenSlotSkipped::None) + .map_err(warp_utils::reject::beacon_chain_error)? + .map_or(false, |canonical| root == canonical); - let data = api_types::BlockHeaderData { - root, - canonical, - header: api_types::BlockHeaderAndSignature { - message: block.message().block_header(), - signature: block.signature().clone().into(), - }, - }; + let data = api_types::BlockHeaderData { + root, + canonical, + header: api_types::BlockHeaderAndSignature { + message: block.message().block_header(), + signature: block.signature().clone().into(), + }, + }; - Ok(api_types::ExecutionOptimisticFinalizedResponse { - execution_optimistic: Some(execution_optimistic), - finalized: Some(finalized), - data, + Ok(api_types::ExecutionOptimisticFinalizedResponse { + execution_optimistic: Some(execution_optimistic), + finalized: Some(finalized), + data, + }) }) - }) - }); + }, + ); /* * beacon/blocks @@ -1216,24 +1272,28 @@ pub fn serve( .and(warp::path("blocks")) .and(warp::path::end()) .and(warp::body::json()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) .and_then( |block_contents: SignedBlockContents, + task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, - log: Logger| async move { - publish_blocks::publish_block( - None, - ProvenancedBlock::local(block_contents), - chain, - &network_tx, - log, - BroadcastValidation::default(), - ) - .await - .map(|()| warp::reply().into_response()) + log: Logger| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + publish_blocks::publish_block( + None, + ProvenancedBlock::local(block_contents), + chain, + &network_tx, + log, + BroadcastValidation::default(), + ) + .await + .map(|()| warp::reply().into_response()) + }) }, ); @@ -1278,35 +1338,39 @@ pub fn serve( .and(warp::query::()) .and(warp::path::end()) .and(warp::body::json()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) .then( |validation_level: api_types::BroadcastValidationQuery, block_contents: SignedBlockContents, + task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, - log: Logger| async move { - match publish_blocks::publish_block( - None, - ProvenancedBlock::local(block_contents), - chain, - &network_tx, - log, - validation_level.broadcast_validation, - ) - .await - { - Ok(()) => warp::reply().into_response(), - Err(e) => match warp_utils::reject::handle_rejection(e).await { - Ok(reply) => reply.into_response(), - Err(_) => warp::reply::with_status( - StatusCode::INTERNAL_SERVER_ERROR, - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(), - }, - } + log: Logger| { + task_spawner.spawn_async(Priority::P1, async move { + match publish_blocks::publish_block( + None, + ProvenancedBlock::local(block_contents), + chain, + &network_tx, + log, + validation_level.broadcast_validation, + ) + .await + { + Ok(()) => warp::reply().into_response(), + Err(e) => match warp_utils::reject::handle_rejection(e).await { + Ok(reply) => reply.into_response(), + Err(_) => warp::reply::with_status( + StatusCode::INTERNAL_SERVER_ERROR, + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response(), + }, + } + }) }, ); @@ -1371,23 +1435,27 @@ pub fn serve( .and(warp::path("blinded_blocks")) .and(warp::path::end()) .and(warp::body::json()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) .and_then( |block: SignedBlockContents>, + task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, - log: Logger| async move { - publish_blocks::publish_blinded_block( - block, - chain, - &network_tx, - log, - BroadcastValidation::default(), - ) - .await - .map(|()| warp::reply().into_response()) + log: Logger| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + publish_blocks::publish_blinded_block( + block, + chain, + &network_tx, + log, + BroadcastValidation::default(), + ) + .await + .map(|()| warp::reply().into_response()) + }) }, ); @@ -1433,34 +1501,38 @@ pub fn serve( .and(warp::query::()) .and(warp::path::end()) .and(warp::body::json()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) .then( |validation_level: api_types::BroadcastValidationQuery, block_contents: SignedBlockContents>, + task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, - log: Logger| async move { - match publish_blocks::publish_blinded_block( - block_contents, - chain, - &network_tx, - log, - validation_level.broadcast_validation, - ) - .await - { - Ok(()) => warp::reply().into_response(), - Err(e) => match warp_utils::reject::handle_rejection(e).await { - Ok(reply) => reply.into_response(), - Err(_) => warp::reply::with_status( - StatusCode::INTERNAL_SERVER_ERROR, - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(), - }, - } + log: Logger| { + task_spawner.spawn_async(Priority::P0, async move { + match publish_blocks::publish_blinded_block( + block_contents, + chain, + &network_tx, + log, + validation_level.broadcast_validation, + ) + .await + { + Ok(()) => warp::reply().into_response(), + Err(e) => match warp_utils::reject::handle_rejection(e).await { + Ok(reply) => reply.into_response(), + Err(_) => warp::reply::with_status( + StatusCode::INTERNAL_SERVER_ERROR, + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response(), + }, + } + }) }, ); @@ -1525,12 +1597,14 @@ pub fn serve( .and(warp::path("beacon")) .and(warp::path("blocks")) .and(block_id_or_err) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()); let beacon_blocks_path_any = any_version .and(warp::path("beacon")) .and(warp::path("blocks")) .and(block_id_or_err) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()); // GET beacon/blocks/{block_id} @@ -1541,9 +1615,10 @@ pub fn serve( .and_then( |endpoint_version: EndpointVersion, block_id: BlockId, + task_spawner: TaskSpawner, chain: Arc>, accept_header: Option| { - async move { + task_spawner.spawn_async_with_rejection(Priority::P1, async move { let (block, execution_optimistic, finalized) = block_id.full_block(&chain).await?; let fork_name = block @@ -1571,7 +1646,7 @@ pub fn serve( .map(|res| warp::reply::json(&res).into_response()), } .map(|resp| add_consensus_version_header(resp, fork_name)) - } + }) }, ); @@ -1580,44 +1655,56 @@ pub fn serve( .clone() .and(warp::path("root")) .and(warp::path::end()) - .and_then(|block_id: BlockId, chain: Arc>| { - blocking_json_task(move || { - let (block, execution_optimistic, finalized) = block_id.blinded_block(&chain)?; - Ok(api_types::GenericResponse::from(api_types::RootData::from( - block.canonical_root(), - )) - .add_execution_optimistic_finalized(execution_optimistic, finalized)) - }) - }); + .and_then( + |block_id: BlockId, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let (block, execution_optimistic, finalized) = + block_id.blinded_block(&chain)?; + Ok(api_types::GenericResponse::from(api_types::RootData::from( + block.canonical_root(), + )) + .add_execution_optimistic_finalized(execution_optimistic, finalized)) + }) + }, + ); // GET beacon/blocks/{block_id}/attestations let get_beacon_block_attestations = beacon_blocks_path_v1 .clone() .and(warp::path("attestations")) .and(warp::path::end()) - .and_then(|block_id: BlockId, chain: Arc>| { - blocking_json_task(move || { - let (block, execution_optimistic, finalized) = block_id.blinded_block(&chain)?; - Ok( - api_types::GenericResponse::from(block.message().body().attestations().clone()) - .add_execution_optimistic_finalized(execution_optimistic, finalized), - ) - }) - }); + .and_then( + |block_id: BlockId, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let (block, execution_optimistic, finalized) = + block_id.blinded_block(&chain)?; + Ok(api_types::GenericResponse::from( + block.message().body().attestations().clone(), + ) + .add_execution_optimistic_finalized(execution_optimistic, finalized)) + }) + }, + ); // GET beacon/blinded_blocks/{block_id} let get_beacon_blinded_block = eth_v1 .and(warp::path("beacon")) .and(warp::path("blinded_blocks")) .and(block_id_or_err) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(warp::path::end()) .and(warp::header::optional::("accept")) .and_then( |block_id: BlockId, + task_spawner: TaskSpawner, chain: Arc>, accept_header: Option| { - blocking_response_task(move || { + task_spawner.blocking_response_task(Priority::P1, move || { let (block, execution_optimistic, finalized) = block_id.blinded_block(&chain)?; let fork_name = block @@ -1700,6 +1787,7 @@ pub fn serve( let beacon_pool_path = eth_v1 .and(warp::path("beacon")) .and(warp::path("pool")) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()); // POST beacon/pool/attestations @@ -1711,11 +1799,12 @@ pub fn serve( .and(network_tx_filter.clone()) .and(log_filter.clone()) .and_then( - |chain: Arc>, + |task_spawner: TaskSpawner, + chain: Arc>, attestations: Vec>, network_tx: UnboundedSender>, log: Logger| { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P0, move || { let seen_timestamp = timestamp_now(); let mut failures = Vec::new(); let mut num_already_known = 0; @@ -1852,8 +1941,10 @@ pub fn serve( .and(warp::path::end()) .and(warp::query::()) .and_then( - |chain: Arc>, query: api_types::AttestationPoolQuery| { - blocking_json_task(move || { + |task_spawner: TaskSpawner, + chain: Arc>, + query: api_types::AttestationPoolQuery| { + task_spawner.blocking_json_task(Priority::P1, move || { let query_filter = |data: &AttestationData| { query.slot.map_or(true, |slot| slot == data.slot) && query @@ -1883,10 +1974,11 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter.clone()) .and_then( - |chain: Arc>, + |task_spawner: TaskSpawner, + chain: Arc>, slashing: AttesterSlashing, network_tx: UnboundedSender>| { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P0, move || { let outcome = chain .verify_attester_slashing_for_gossip(slashing.clone()) .map_err(|e| { @@ -1923,12 +2015,14 @@ pub fn serve( .clone() .and(warp::path("attester_slashings")) .and(warp::path::end()) - .and_then(|chain: Arc>| { - blocking_json_task(move || { - let attestations = chain.op_pool.get_all_attester_slashings(); - Ok(api_types::GenericResponse::from(attestations)) - }) - }); + .and_then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let attestations = chain.op_pool.get_all_attester_slashings(); + Ok(api_types::GenericResponse::from(attestations)) + }) + }, + ); // POST beacon/pool/proposer_slashings let post_beacon_pool_proposer_slashings = beacon_pool_path @@ -1938,10 +2032,11 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter.clone()) .and_then( - |chain: Arc>, + |task_spawner: TaskSpawner, + chain: Arc>, slashing: ProposerSlashing, network_tx: UnboundedSender>| { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P0, move || { let outcome = chain .verify_proposer_slashing_for_gossip(slashing.clone()) .map_err(|e| { @@ -1978,12 +2073,14 @@ pub fn serve( .clone() .and(warp::path("proposer_slashings")) .and(warp::path::end()) - .and_then(|chain: Arc>| { - blocking_json_task(move || { - let attestations = chain.op_pool.get_all_proposer_slashings(); - Ok(api_types::GenericResponse::from(attestations)) - }) - }); + .and_then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let attestations = chain.op_pool.get_all_proposer_slashings(); + Ok(api_types::GenericResponse::from(attestations)) + }) + }, + ); // POST beacon/pool/voluntary_exits let post_beacon_pool_voluntary_exits = beacon_pool_path @@ -1993,10 +2090,11 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter.clone()) .and_then( - |chain: Arc>, + |task_spawner: TaskSpawner, + chain: Arc>, exit: SignedVoluntaryExit, network_tx: UnboundedSender>| { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P0, move || { let outcome = chain .verify_voluntary_exit_for_gossip(exit.clone()) .map_err(|e| { @@ -2031,12 +2129,14 @@ pub fn serve( .clone() .and(warp::path("voluntary_exits")) .and(warp::path::end()) - .and_then(|chain: Arc>| { - blocking_json_task(move || { - let attestations = chain.op_pool.get_all_voluntary_exits(); - Ok(api_types::GenericResponse::from(attestations)) - }) - }); + .and_then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let attestations = chain.op_pool.get_all_voluntary_exits(); + Ok(api_types::GenericResponse::from(attestations)) + }) + }, + ); // POST beacon/pool/sync_committees let post_beacon_pool_sync_committees = beacon_pool_path @@ -2047,11 +2147,12 @@ pub fn serve( .and(network_tx_filter.clone()) .and(log_filter.clone()) .and_then( - |chain: Arc>, + |task_spawner: TaskSpawner, + chain: Arc>, signatures: Vec, network_tx: UnboundedSender>, log: Logger| { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P0, move || { sync_committees::process_sync_committee_signatures( signatures, network_tx, &chain, log, )?; @@ -2065,12 +2166,14 @@ pub fn serve( .clone() .and(warp::path("bls_to_execution_changes")) .and(warp::path::end()) - .and_then(|chain: Arc>| { - blocking_json_task(move || { - let address_changes = chain.op_pool.get_all_bls_to_execution_changes(); - Ok(api_types::GenericResponse::from(address_changes)) - }) - }); + .and_then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let address_changes = chain.op_pool.get_all_bls_to_execution_changes(); + Ok(api_types::GenericResponse::from(address_changes)) + }) + }, + ); // POST beacon/pool/bls_to_execution_changes let post_beacon_pool_bls_to_execution_changes = beacon_pool_path @@ -2081,11 +2184,12 @@ pub fn serve( .and(network_tx_filter.clone()) .and(log_filter.clone()) .and_then( - |chain: Arc>, + |task_spawner: TaskSpawner, + chain: Arc>, address_changes: Vec, network_tx: UnboundedSender>, log: Logger| { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P0, move || { let mut failures = vec![]; for (index, address_change) in address_changes.into_iter().enumerate() { @@ -2169,10 +2273,13 @@ pub fn serve( .and(warp::path("deposit_snapshot")) .and(warp::path::end()) .and(warp::header::optional::("accept")) + .and(task_spawner_filter.clone()) .and(eth1_service_filter.clone()) .and_then( - |accept_header: Option, eth1_service: eth1::Service| { - blocking_response_task(move || match accept_header { + |accept_header: Option, + task_spawner: TaskSpawner, + eth1_service: eth1::Service| { + task_spawner.blocking_response_task(Priority::P1, move || match accept_header { Some(api_types::Accept::Json) | None => { let snapshot = eth1_service.get_deposit_snapshot(); Ok( @@ -2213,6 +2320,7 @@ pub fn serve( let beacon_rewards_path = eth_v1 .and(warp::path("beacon")) .and(warp::path("rewards")) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()); // GET beacon/rewards/blocks/{block_id} @@ -2221,17 +2329,21 @@ pub fn serve( .and(warp::path("blocks")) .and(block_id_or_err) .and(warp::path::end()) - .and_then(|chain: Arc>, block_id: BlockId| { - blocking_json_task(move || { - let (rewards, execution_optimistic, finalized) = - standard_block_rewards::compute_beacon_block_rewards(chain, block_id)?; - Ok(rewards) - .map(api_types::GenericResponse::from) - .map(|resp| { - resp.add_execution_optimistic_finalized(execution_optimistic, finalized) - }) - }) - }); + .and_then( + |task_spawner: TaskSpawner, + chain: Arc>, + block_id: BlockId| { + task_spawner.blocking_json_task(Priority::P1, move || { + let (rewards, execution_optimistic, finalized) = + standard_block_rewards::compute_beacon_block_rewards(chain, block_id)?; + Ok(rewards) + .map(api_types::GenericResponse::from) + .map(|resp| { + resp.add_execution_optimistic_finalized(execution_optimistic, finalized) + }) + }) + }, + ); /* * beacon/rewards @@ -2240,6 +2352,7 @@ pub fn serve( let beacon_rewards_path = eth_v1 .and(warp::path("beacon")) .and(warp::path("rewards")) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()); // POST beacon/rewards/attestations/{epoch} @@ -2250,8 +2363,11 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and_then( - |chain: Arc>, epoch: Epoch, validators: Vec| { - blocking_json_task(move || { + |task_spawner: TaskSpawner, + chain: Arc>, + epoch: Epoch, + validators: Vec| { + task_spawner.blocking_json_task(Priority::P1, move || { let attestation_rewards = chain .compute_attestation_rewards(epoch, validators) .map_err(|e| match e { @@ -2299,11 +2415,12 @@ pub fn serve( .and(warp::body::json()) .and(log_filter.clone()) .and_then( - |chain: Arc>, + |task_spawner: TaskSpawner, + chain: Arc>, block_id: BlockId, validators: Vec, log: Logger| { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P1, move || { let (rewards, execution_optimistic, finalized) = sync_committee_rewards::compute_sync_committee_rewards( chain, block_id, validators, log, @@ -2328,46 +2445,55 @@ pub fn serve( let get_config_fork_schedule = config_path .and(warp::path("fork_schedule")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|chain: Arc>| { - blocking_json_task(move || { - let forks = ForkName::list_all() - .into_iter() - .filter_map(|fork_name| chain.spec.fork_for_name(fork_name)) - .collect::>(); - Ok(api_types::GenericResponse::from(forks)) - }) - }); + .and_then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let forks = ForkName::list_all() + .into_iter() + .filter_map(|fork_name| chain.spec.fork_for_name(fork_name)) + .collect::>(); + Ok(api_types::GenericResponse::from(forks)) + }) + }, + ); // GET config/spec let spec_fork_name = ctx.config.spec_fork_name; let get_config_spec = config_path .and(warp::path("spec")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(move |chain: Arc>| { - blocking_json_task(move || { - let config_and_preset = - ConfigAndPreset::from_chain_spec::(&chain.spec, spec_fork_name); - Ok(api_types::GenericResponse::from(config_and_preset)) - }) - }); + .and_then( + move |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P0, move || { + let config_and_preset = + ConfigAndPreset::from_chain_spec::(&chain.spec, spec_fork_name); + Ok(api_types::GenericResponse::from(config_and_preset)) + }) + }, + ); // GET config/deposit_contract let get_config_deposit_contract = config_path .and(warp::path("deposit_contract")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|chain: Arc>| { - blocking_json_task(move || { - Ok(api_types::GenericResponse::from( - api_types::DepositContractData { - address: chain.spec.deposit_contract_address, - chain_id: chain.spec.deposit_chain_id, - }, - )) - }) - }); + .and_then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + Ok(api_types::GenericResponse::from( + api_types::DepositContractData { + address: chain.spec.deposit_contract_address, + chain_id: chain.spec.deposit_chain_id, + }, + )) + }) + }, + ); /* * debug @@ -2385,13 +2511,15 @@ pub fn serve( })) .and(warp::path::end()) .and(warp::header::optional::("accept")) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and_then( |endpoint_version: EndpointVersion, state_id: StateId, accept_header: Option, + task_spawner: TaskSpawner, chain: Arc>| { - blocking_response_task(move || match accept_header { + task_spawner.blocking_response_task(Priority::P1, move || match accept_header { Some(api_types::Accept::Ssz) => { // We can ignore the optimistic status for the "fork" since it's a // specification constant that doesn't change across competing heads of the @@ -2443,10 +2571,13 @@ pub fn serve( .and(warp::path("beacon")) .and(warp::path("heads")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and_then( - |endpoint_version: EndpointVersion, chain: Arc>| { - blocking_json_task(move || { + |endpoint_version: EndpointVersion, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { let heads = chain .heads() .into_iter() @@ -2479,48 +2610,51 @@ pub fn serve( .and(warp::path("debug")) .and(warp::path("fork_choice")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|chain: Arc>| { - blocking_json_task(move || { - let beacon_fork_choice = chain.canonical_head.fork_choice_read_lock(); + .and_then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let beacon_fork_choice = chain.canonical_head.fork_choice_read_lock(); - let proto_array = beacon_fork_choice.proto_array().core_proto_array(); + let proto_array = beacon_fork_choice.proto_array().core_proto_array(); - let fork_choice_nodes = proto_array - .nodes - .iter() - .map(|node| { - let execution_status = if node.execution_status.is_execution_enabled() { - Some(node.execution_status.to_string()) - } else { - None - }; + let fork_choice_nodes = proto_array + .nodes + .iter() + .map(|node| { + let execution_status = if node.execution_status.is_execution_enabled() { + Some(node.execution_status.to_string()) + } else { + None + }; - ForkChoiceNode { - slot: node.slot, - block_root: node.root, - parent_root: node - .parent - .and_then(|index| proto_array.nodes.get(index)) - .map(|parent| parent.root), - justified_epoch: node.justified_checkpoint.epoch, - finalized_epoch: node.finalized_checkpoint.epoch, - weight: node.weight, - validity: execution_status, - execution_block_hash: node - .execution_status - .block_hash() - .map(|block_hash| block_hash.into_root()), - } + ForkChoiceNode { + slot: node.slot, + block_root: node.root, + parent_root: node + .parent + .and_then(|index| proto_array.nodes.get(index)) + .map(|parent| parent.root), + justified_epoch: node.justified_checkpoint.epoch, + finalized_epoch: node.finalized_checkpoint.epoch, + weight: node.weight, + validity: execution_status, + execution_block_hash: node + .execution_status + .block_hash() + .map(|block_hash| block_hash.into_root()), + } + }) + .collect::>(); + Ok(ForkChoice { + justified_checkpoint: proto_array.justified_checkpoint, + finalized_checkpoint: proto_array.finalized_checkpoint, + fork_choice_nodes, }) - .collect::>(); - Ok(ForkChoice { - justified_checkpoint: proto_array.justified_checkpoint, - finalized_checkpoint: proto_array.finalized_checkpoint, - fork_choice_nodes, }) - }) - }); + }, + ); /* * node @@ -2531,50 +2665,54 @@ pub fn serve( .and(warp::path("node")) .and(warp::path("identity")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then(|network_globals: Arc>| { - blocking_json_task(move || { - let enr = network_globals.local_enr(); - let p2p_addresses = enr.multiaddr_p2p_tcp(); - let discovery_addresses = enr.multiaddr_p2p_udp(); - let meta_data = network_globals.local_metadata.read(); - Ok(api_types::GenericResponse::from(api_types::IdentityData { - peer_id: network_globals.local_peer_id().to_base58(), - enr, - p2p_addresses, - discovery_addresses, - metadata: api_types::MetaData { - seq_number: *meta_data.seq_number(), - attnets: format!( - "0x{}", - hex::encode(meta_data.attnets().clone().into_bytes()), - ), - syncnets: format!( - "0x{}", - hex::encode( - meta_data - .syncnets() - .map(|x| x.clone()) - .unwrap_or_default() - .into_bytes() - ) - ), - }, - })) - }) - }); + .and_then( + |task_spawner: TaskSpawner, + network_globals: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let enr = network_globals.local_enr(); + let p2p_addresses = enr.multiaddr_p2p_tcp(); + let discovery_addresses = enr.multiaddr_p2p_udp(); + let meta_data = network_globals.local_metadata.read(); + Ok(api_types::GenericResponse::from(api_types::IdentityData { + peer_id: network_globals.local_peer_id().to_base58(), + enr, + p2p_addresses, + discovery_addresses, + metadata: api_types::MetaData { + seq_number: *meta_data.seq_number(), + attnets: format!( + "0x{}", + hex::encode(meta_data.attnets().clone().into_bytes()), + ), + syncnets: format!( + "0x{}", + hex::encode( + meta_data + .syncnets() + .map(|x| x.clone()) + .unwrap_or_default() + .into_bytes() + ) + ), + }, + })) + }) + }, + ); // GET node/version let get_node_version = eth_v1 .and(warp::path("node")) .and(warp::path("version")) .and(warp::path::end()) - .and_then(|| { - blocking_json_task(move || { - Ok(api_types::GenericResponse::from(api_types::VersionData { - version: version_with_platform(), - })) - }) + // Bypass the `task_spawner` since this method returns a static string. + .then(|| async { + warp::reply::json(&api_types::GenericResponse::from(api_types::VersionData { + version: version_with_platform(), + })) + .into_response() }); // GET node/syncing @@ -2582,10 +2720,13 @@ pub fn serve( .and(warp::path("node")) .and(warp::path("syncing")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(network_globals.clone()) .and(chain_filter.clone()) .and_then( - |network_globals: Arc>, chain: Arc>| { + |task_spawner: TaskSpawner, + network_globals: Arc>, + chain: Arc>| { async move { let el_offline = if let Some(el) = &chain.execution_layer { el.is_offline_or_erroring().await @@ -2593,32 +2734,34 @@ pub fn serve( true }; - blocking_json_task(move || { - let head_slot = chain.canonical_head.cached_head().head_slot(); - let current_slot = chain.slot_clock.now_or_genesis().ok_or_else(|| { - warp_utils::reject::custom_server_error( - "Unable to read slot clock".into(), - ) - })?; + task_spawner + .blocking_json_task(Priority::P0, move || { + let head_slot = chain.canonical_head.cached_head().head_slot(); + let current_slot = + chain.slot_clock.now_or_genesis().ok_or_else(|| { + warp_utils::reject::custom_server_error( + "Unable to read slot clock".into(), + ) + })?; - // Taking advantage of saturating subtraction on slot. - let sync_distance = current_slot - head_slot; + // Taking advantage of saturating subtraction on slot. + let sync_distance = current_slot - head_slot; - let is_optimistic = chain - .is_optimistic_or_invalid_head() - .map_err(warp_utils::reject::beacon_chain_error)?; + let is_optimistic = chain + .is_optimistic_or_invalid_head() + .map_err(warp_utils::reject::beacon_chain_error)?; - let syncing_data = api_types::SyncingData { - is_syncing: network_globals.sync_state.read().is_syncing(), - is_optimistic: Some(is_optimistic), - el_offline: Some(el_offline), - head_slot, - sync_distance, - }; + let syncing_data = api_types::SyncingData { + is_syncing: network_globals.sync_state.read().is_syncing(), + is_optimistic: Some(is_optimistic), + el_offline: Some(el_offline), + head_slot, + sync_distance, + }; - Ok(api_types::GenericResponse::from(syncing_data)) - }) - .await + Ok(api_types::GenericResponse::from(syncing_data)) + }) + .await } }, ); @@ -2628,10 +2771,13 @@ pub fn serve( .and(warp::path("node")) .and(warp::path("health")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(network_globals.clone()) .and(chain_filter.clone()) .and_then( - |network_globals: Arc>, chain: Arc>| { + |task_spawner: TaskSpawner, + network_globals: Arc>, + chain: Arc>| { async move { let el_offline = if let Some(el) = &chain.execution_layer { el.is_offline_or_erroring().await @@ -2639,28 +2785,31 @@ pub fn serve( true }; - blocking_response_task(move || { - let is_optimistic = chain - .is_optimistic_or_invalid_head() - .map_err(warp_utils::reject::beacon_chain_error)?; + task_spawner + .blocking_response_task(Priority::P0, move || { + let is_optimistic = chain + .is_optimistic_or_invalid_head() + .map_err(warp_utils::reject::beacon_chain_error)?; - let is_syncing = !network_globals.sync_state.read().is_synced(); + let is_syncing = !network_globals.sync_state.read().is_synced(); - if el_offline { - Err(warp_utils::reject::not_synced("execution layer is offline".to_string())) - } else if is_syncing || is_optimistic { - Ok(warp::reply::with_status( - warp::reply(), - warp::http::StatusCode::PARTIAL_CONTENT, - )) - } else { - Ok(warp::reply::with_status( - warp::reply(), - warp::http::StatusCode::OK, - )) - } - }) - .await + if el_offline { + Err(warp_utils::reject::not_synced( + "execution layer is offline".to_string(), + )) + } else if is_syncing || is_optimistic { + Ok(warp::reply::with_status( + warp::reply(), + warp::http::StatusCode::PARTIAL_CONTENT, + )) + } else { + Ok(warp::reply::with_status( + warp::reply(), + warp::http::StatusCode::OK, + )) + } + }) + .await } }, ); @@ -2671,10 +2820,13 @@ pub fn serve( .and(warp::path("peers")) .and(warp::path::param::()) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(network_globals.clone()) .and_then( - |requested_peer_id: String, network_globals: Arc>| { - blocking_json_task(move || { + |requested_peer_id: String, + task_spawner: TaskSpawner, + network_globals: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { let peer_id = PeerId::from_bytes( &bs58::decode(requested_peer_id.as_str()) .into_vec() @@ -2728,11 +2880,13 @@ pub fn serve( .and(warp::path("peers")) .and(warp::path::end()) .and(multi_key_query::()) + .and(task_spawner_filter.clone()) .and(network_globals.clone()) .and_then( |query_res: Result, + task_spawner: TaskSpawner, network_globals: Arc>| { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P1, move || { let query = query_res?; let mut peers: Vec = Vec::new(); network_globals @@ -2796,38 +2950,42 @@ pub fn serve( .and(warp::path("node")) .and(warp::path("peer_count")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then(|network_globals: Arc>| { - blocking_json_task(move || { - let mut connected: u64 = 0; - let mut connecting: u64 = 0; - let mut disconnected: u64 = 0; - let mut disconnecting: u64 = 0; + .and_then( + |task_spawner: TaskSpawner, + network_globals: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let mut connected: u64 = 0; + let mut connecting: u64 = 0; + let mut disconnected: u64 = 0; + let mut disconnecting: u64 = 0; - network_globals - .peers - .read() - .peers() - .for_each(|(_, peer_info)| { - let state = api_types::PeerState::from_peer_connection_status( - peer_info.connection_status(), - ); - match state { - api_types::PeerState::Connected => connected += 1, - api_types::PeerState::Connecting => connecting += 1, - api_types::PeerState::Disconnected => disconnected += 1, - api_types::PeerState::Disconnecting => disconnecting += 1, - } - }); + network_globals + .peers + .read() + .peers() + .for_each(|(_, peer_info)| { + let state = api_types::PeerState::from_peer_connection_status( + peer_info.connection_status(), + ); + match state { + api_types::PeerState::Connected => connected += 1, + api_types::PeerState::Connecting => connecting += 1, + api_types::PeerState::Disconnected => disconnected += 1, + api_types::PeerState::Disconnecting => disconnecting += 1, + } + }); - Ok(api_types::GenericResponse::from(api_types::PeerCount { - connected, - connecting, - disconnected, - disconnecting, - })) - }) - }); + Ok(api_types::GenericResponse::from(api_types::PeerCount { + connected, + connecting, + disconnected, + disconnecting, + })) + }) + }, + ); /* * validator */ @@ -2844,11 +3002,19 @@ pub fn serve( })) .and(warp::path::end()) .and(not_while_syncing_filter.clone()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then(|epoch: Epoch, chain: Arc>, log: Logger| { - blocking_json_task(move || proposer_duties::proposer_duties(epoch, &chain, &log)) - }); + .and_then( + |epoch: Epoch, + task_spawner: TaskSpawner, + chain: Arc>, + log: Logger| { + task_spawner.blocking_json_task(Priority::P0, move || { + proposer_duties::proposer_duties(epoch, &chain, &log) + }) + }, + ); // GET validator/blocks/{slot} let get_validator_blocks = any_version @@ -2862,60 +3028,64 @@ pub fn serve( .and(warp::path::end()) .and(not_while_syncing_filter.clone()) .and(warp::query::()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) .and_then( |endpoint_version: EndpointVersion, slot: Slot, query: api_types::ValidatorBlocksQuery, + task_spawner: TaskSpawner, chain: Arc>, - log: Logger| async move { - debug!( - log, - "Block production request from HTTP API"; - "slot" => slot - ); + log: Logger| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + debug!( + log, + "Block production request from HTTP API"; + "slot" => slot + ); - let randao_reveal = query.randao_reveal.decompress().map_err(|e| { - warp_utils::reject::custom_bad_request(format!( - "randao reveal is not a valid BLS signature: {:?}", - e - )) - })?; + let randao_reveal = query.randao_reveal.decompress().map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "randao reveal is not a valid BLS signature: {:?}", + e + )) + })?; - let randao_verification = - if query.skip_randao_verification == SkipRandaoVerification::Yes { - if !randao_reveal.is_infinity() { - return Err(warp_utils::reject::custom_bad_request( - "randao_reveal must be point-at-infinity if verification is skipped" - .into(), - )); - } - ProduceBlockVerification::NoVerification - } else { - ProduceBlockVerification::VerifyRandao - }; + let randao_verification = + if query.skip_randao_verification == SkipRandaoVerification::Yes { + if !randao_reveal.is_infinity() { + return Err(warp_utils::reject::custom_bad_request( + "randao_reveal must be point-at-infinity if verification is skipped" + .into(), + )); + } + ProduceBlockVerification::NoVerification + } else { + ProduceBlockVerification::VerifyRandao + }; - let (block, _) = chain - .produce_block_with_verification::>( - randao_reveal, - slot, - query.graffiti.map(Into::into), - randao_verification, - ) - .await - .map_err(warp_utils::reject::block_production_error)?; - let fork_name = block - .to_ref() - .fork_name(&chain.spec) - .map_err(inconsistent_fork_rejection)?; + let (block, _) = chain + .produce_block_with_verification::>( + randao_reveal, + slot, + query.graffiti.map(Into::into), + randao_verification, + ) + .await + .map_err(warp_utils::reject::block_production_error)?; + let fork_name = block + .to_ref() + .fork_name(&chain.spec) + .map_err(inconsistent_fork_rejection)?; - let block_contents = - build_block_contents::build_block_contents(fork_name, chain, block); + let block_contents = + build_block_contents::build_block_contents(fork_name, chain, block); - fork_versioned_response(endpoint_version, fork_name, block_contents?) - .map(|response| warp::reply::json(&response).into_response()) - .map(|res| add_consensus_version_header(res, fork_name)) + fork_versioned_response(endpoint_version, fork_name, block_contents?) + .map(|response| warp::reply::json(&response).into_response()) + .map(|res| add_consensus_version_header(res, fork_name)) + }) }, ); @@ -2931,49 +3101,53 @@ pub fn serve( .and(warp::path::end()) .and(not_while_syncing_filter.clone()) .and(warp::query::()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and_then( |slot: Slot, query: api_types::ValidatorBlocksQuery, - chain: Arc>| async move { - let randao_reveal = query.randao_reveal.decompress().map_err(|e| { - warp_utils::reject::custom_bad_request(format!( - "randao reveal is not a valid BLS signature: {:?}", - e - )) - })?; + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + let randao_reveal = query.randao_reveal.decompress().map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "randao reveal is not a valid BLS signature: {:?}", + e + )) + })?; - let randao_verification = - if query.skip_randao_verification == SkipRandaoVerification::Yes { - if !randao_reveal.is_infinity() { - return Err(warp_utils::reject::custom_bad_request( + let randao_verification = + if query.skip_randao_verification == SkipRandaoVerification::Yes { + if !randao_reveal.is_infinity() { + return Err(warp_utils::reject::custom_bad_request( "randao_reveal must be point-at-infinity if verification is skipped" .into() )); - } - ProduceBlockVerification::NoVerification - } else { - ProduceBlockVerification::VerifyRandao - }; + } + ProduceBlockVerification::NoVerification + } else { + ProduceBlockVerification::VerifyRandao + }; - let (block, _) = chain - .produce_block_with_verification::>( - randao_reveal, - slot, - query.graffiti.map(Into::into), - randao_verification, - ) - .await - .map_err(warp_utils::reject::block_production_error)?; - let fork_name = block - .to_ref() - .fork_name(&chain.spec) - .map_err(inconsistent_fork_rejection)?; + let (block, _) = chain + .produce_block_with_verification::>( + randao_reveal, + slot, + query.graffiti.map(Into::into), + randao_verification, + ) + .await + .map_err(warp_utils::reject::block_production_error)?; + let fork_name = block + .to_ref() + .fork_name(&chain.spec) + .map_err(inconsistent_fork_rejection)?; - // Pose as a V2 endpoint so we return the fork `version`. - fork_versioned_response(V2, fork_name, block) - .map(|response| warp::reply::json(&response).into_response()) - .map(|res| add_consensus_version_header(res, fork_name)) + // Pose as a V2 endpoint so we return the fork `version`. + fork_versioned_response(V2, fork_name, block) + .map(|response| warp::reply::json(&response).into_response()) + .map(|res| add_consensus_version_header(res, fork_name)) + }) }, ); @@ -2984,10 +3158,13 @@ pub fn serve( .and(warp::path::end()) .and(warp::query::()) .and(not_while_syncing_filter.clone()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and_then( - |query: api_types::ValidatorAttestationDataQuery, chain: Arc>| { - blocking_json_task(move || { + |query: api_types::ValidatorAttestationDataQuery, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P0, move || { let current_slot = chain .slot() .map_err(warp_utils::reject::beacon_chain_error)?; @@ -3016,10 +3193,13 @@ pub fn serve( .and(warp::path::end()) .and(warp::query::()) .and(not_while_syncing_filter.clone()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and_then( - |query: api_types::ValidatorAggregateAttestationQuery, chain: Arc>| { - blocking_json_task(move || { + |query: api_types::ValidatorAggregateAttestationQuery, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P0, move || { chain .get_aggregated_attestation_by_slot_and_root( query.slot, @@ -3054,10 +3234,14 @@ pub fn serve( .and(warp::path::end()) .and(not_while_syncing_filter.clone()) .and(warp::body::json()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and_then( - |epoch: Epoch, indices: api_types::ValidatorIndexData, chain: Arc>| { - blocking_json_task(move || { + |epoch: Epoch, + indices: api_types::ValidatorIndexData, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P0, move || { attester_duties::attester_duties(epoch, &indices.0, &chain) }) }, @@ -3076,10 +3260,14 @@ pub fn serve( .and(warp::path::end()) .and(not_while_syncing_filter.clone()) .and(warp::body::json()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and_then( - |epoch: Epoch, indices: api_types::ValidatorIndexData, chain: Arc>| { - blocking_json_task(move || { + |epoch: Epoch, + indices: api_types::ValidatorIndexData, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P0, move || { sync_committees::sync_committee_duties(epoch, &indices.0, &chain) }) }, @@ -3092,10 +3280,13 @@ pub fn serve( .and(warp::path::end()) .and(warp::query::()) .and(not_while_syncing_filter.clone()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and_then( - |sync_committee_data: SyncContributionData, chain: Arc>| { - blocking_json_task(move || { + |sync_committee_data: SyncContributionData, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P0, move || { chain .get_aggregated_sync_committee_contribution(&sync_committee_data) .map_err(|e| { @@ -3120,15 +3311,17 @@ pub fn serve( .and(warp::path("aggregate_and_proofs")) .and(warp::path::end()) .and(not_while_syncing_filter.clone()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(warp::body::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) .and_then( - |chain: Arc>, + |task_spawner: TaskSpawner, + chain: Arc>, aggregates: Vec>, network_tx: UnboundedSender>, log: Logger| { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P0, move || { let seen_timestamp = timestamp_now(); let mut verified_aggregates = Vec::with_capacity(aggregates.len()); let mut messages = Vec::with_capacity(aggregates.len()); @@ -3231,16 +3424,18 @@ pub fn serve( .and(warp::path("contribution_and_proofs")) .and(warp::path::end()) .and(not_while_syncing_filter.clone()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(warp::body::json()) .and(network_tx_filter) .and(log_filter.clone()) .and_then( - |chain: Arc>, + |task_spawner: TaskSpawner, + chain: Arc>, contributions: Vec>, network_tx: UnboundedSender>, log: Logger| { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P0, move || { sync_committees::process_signed_contribution_and_proofs( contributions, network_tx, @@ -3259,14 +3454,16 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(validator_subscription_tx_filter.clone()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) .and_then( |subscriptions: Vec, validator_subscription_tx: Sender, + task_spawner: TaskSpawner, chain: Arc>, log: Logger| { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P0, move || { for subscription in &subscriptions { chain .validator_monitor @@ -3308,45 +3505,49 @@ pub fn serve( .and(warp::path("prepare_beacon_proposer")) .and(warp::path::end()) .and(not_while_syncing_filter.clone()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) .and(warp::body::json()) .and_then( - |chain: Arc>, + |task_spawner: TaskSpawner, + chain: Arc>, log: Logger, - preparation_data: Vec| async move { - let execution_layer = chain - .execution_layer - .as_ref() - .ok_or(BeaconChainError::ExecutionLayerMissing) - .map_err(warp_utils::reject::beacon_chain_error)?; + preparation_data: Vec| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + let execution_layer = chain + .execution_layer + .as_ref() + .ok_or(BeaconChainError::ExecutionLayerMissing) + .map_err(warp_utils::reject::beacon_chain_error)?; - let current_slot = chain - .slot() - .map_err(warp_utils::reject::beacon_chain_error)?; - let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); + let current_slot = chain + .slot() + .map_err(warp_utils::reject::beacon_chain_error)?; + let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); - debug!( - log, - "Received proposer preparation data"; - "count" => preparation_data.len(), - ); + debug!( + log, + "Received proposer preparation data"; + "count" => preparation_data.len(), + ); - execution_layer - .update_proposer_preparation(current_epoch, &preparation_data) - .await; + execution_layer + .update_proposer_preparation(current_epoch, &preparation_data) + .await; - chain - .prepare_beacon_proposer(current_slot) - .await - .map_err(|e| { - warp_utils::reject::custom_bad_request(format!( - "error updating proposer preparations: {:?}", - e - )) - })?; + chain + .prepare_beacon_proposer(current_slot) + .await + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "error updating proposer preparations: {:?}", + e + )) + })?; - Ok::<_, warp::reject::Rejection>(warp::reply::json(&()).into_response()) + Ok::<_, warp::reject::Rejection>(warp::reply::json(&()).into_response()) + }) }, ); @@ -3355,127 +3556,171 @@ pub fn serve( .and(warp::path("validator")) .and(warp::path("register_validator")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) .and(warp::body::json()) .and_then( - |chain: Arc>, + |task_spawner: TaskSpawner, + chain: Arc>, log: Logger, - register_val_data: Vec| async move { - let execution_layer = chain - .execution_layer - .as_ref() - .ok_or(BeaconChainError::ExecutionLayerMissing) - .map_err(warp_utils::reject::beacon_chain_error)?; - let current_slot = chain - .slot_clock - .now_or_genesis() - .ok_or(BeaconChainError::UnableToReadSlot) - .map_err(warp_utils::reject::beacon_chain_error)?; - let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); + register_val_data: Vec| async { + let (tx, rx) = oneshot::channel(); - debug!( - log, - "Received register validator request"; - "count" => register_val_data.len(), - ); + task_spawner + .spawn_async_with_rejection(Priority::P0, async move { + let execution_layer = chain + .execution_layer + .as_ref() + .ok_or(BeaconChainError::ExecutionLayerMissing) + .map_err(warp_utils::reject::beacon_chain_error)?; + let current_slot = chain + .slot_clock + .now_or_genesis() + .ok_or(BeaconChainError::UnableToReadSlot) + .map_err(warp_utils::reject::beacon_chain_error)?; + let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); - let head_snapshot = chain.head_snapshot(); - let spec = &chain.spec; - - let (preparation_data, filtered_registration_data): ( - Vec, - Vec, - ) = register_val_data - .into_iter() - .filter_map(|register_data| { - chain - .validator_index(®ister_data.message.pubkey) - .ok() - .flatten() - .and_then(|validator_index| { - let validator = head_snapshot - .beacon_state - .get_validator(validator_index) - .ok()?; - let validator_status = ValidatorStatus::from_validator( - validator, - current_epoch, - spec.far_future_epoch, - ) - .superstatus(); - let is_active_or_pending = - matches!(validator_status, ValidatorStatus::Pending) - || matches!(validator_status, ValidatorStatus::Active); - - // Filter out validators who are not 'active' or 'pending'. - is_active_or_pending.then_some({ - ( - ProposerPreparationData { - validator_index: validator_index as u64, - fee_recipient: register_data.message.fee_recipient, - }, - register_data, - ) - }) - }) - }) - .unzip(); - - // Update the prepare beacon proposer cache based on this request. - execution_layer - .update_proposer_preparation(current_epoch, &preparation_data) - .await; - - // Call prepare beacon proposer blocking with the latest update in order to make - // sure we have a local payload to fall back to in the event of the blinded block - // flow failing. - chain - .prepare_beacon_proposer(current_slot) - .await - .map_err(|e| { - warp_utils::reject::custom_bad_request(format!( - "error updating proposer preparations: {:?}", - e - )) - })?; - - let builder = execution_layer - .builder() - .as_ref() - .ok_or(BeaconChainError::BuilderMissing) - .map_err(warp_utils::reject::beacon_chain_error)?; - - info!( - log, - "Forwarding register validator request to connected builder"; - "count" => filtered_registration_data.len(), - ); - - builder - .post_builder_validators(&filtered_registration_data) - .await - .map(|resp| warp::reply::json(&resp).into_response()) - .map_err(|e| { - warn!( + debug!( log, - "Relay error when registering validator(s)"; - "num_registrations" => filtered_registration_data.len(), - "error" => ?e + "Received register validator request"; + "count" => register_val_data.len(), ); - // Forward the HTTP status code if we are able to, otherwise fall back - // to a server error. - if let eth2::Error::ServerMessage(message) = e { - if message.code == StatusCode::BAD_REQUEST.as_u16() { - return warp_utils::reject::custom_bad_request(message.message); - } else { - // According to the spec this response should only be a 400 or 500, - // so we fall back to a 500 here. - return warp_utils::reject::custom_server_error(message.message); - } - } - warp_utils::reject::custom_server_error(format!("{e:?}")) + + let head_snapshot = chain.head_snapshot(); + let spec = &chain.spec; + + let (preparation_data, filtered_registration_data): ( + Vec, + Vec, + ) = register_val_data + .into_iter() + .filter_map(|register_data| { + chain + .validator_index(®ister_data.message.pubkey) + .ok() + .flatten() + .and_then(|validator_index| { + let validator = head_snapshot + .beacon_state + .get_validator(validator_index) + .ok()?; + let validator_status = ValidatorStatus::from_validator( + validator, + current_epoch, + spec.far_future_epoch, + ) + .superstatus(); + let is_active_or_pending = + matches!(validator_status, ValidatorStatus::Pending) + || matches!( + validator_status, + ValidatorStatus::Active + ); + + // Filter out validators who are not 'active' or 'pending'. + is_active_or_pending.then_some({ + ( + ProposerPreparationData { + validator_index: validator_index as u64, + fee_recipient: register_data + .message + .fee_recipient, + }, + register_data, + ) + }) + }) + }) + .unzip(); + + // Update the prepare beacon proposer cache based on this request. + execution_layer + .update_proposer_preparation(current_epoch, &preparation_data) + .await; + + // Call prepare beacon proposer blocking with the latest update in order to make + // sure we have a local payload to fall back to in the event of the blinded block + // flow failing. + chain + .prepare_beacon_proposer(current_slot) + .await + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "error updating proposer preparations: {:?}", + e + )) + })?; + + info!( + log, + "Forwarding register validator request to connected builder"; + "count" => filtered_registration_data.len(), + ); + + // It's a waste of a `BeaconProcessor` worker to just + // wait on a response from the builder (especially since + // they have frequent timeouts). Spawn a new task and + // send the response back to our original HTTP request + // task via a channel. + let builder_future = async move { + let builder = chain + .execution_layer + .as_ref() + .ok_or(BeaconChainError::ExecutionLayerMissing) + .map_err(warp_utils::reject::beacon_chain_error)? + .builder() + .as_ref() + .ok_or(BeaconChainError::BuilderMissing) + .map_err(warp_utils::reject::beacon_chain_error)?; + + builder + .post_builder_validators(&filtered_registration_data) + .await + .map(|resp| warp::reply::json(&resp).into_response()) + .map_err(|e| { + warn!( + log, + "Relay error when registering validator(s)"; + "num_registrations" => filtered_registration_data.len(), + "error" => ?e + ); + // Forward the HTTP status code if we are able to, otherwise fall back + // to a server error. + if let eth2::Error::ServerMessage(message) = e { + if message.code == StatusCode::BAD_REQUEST.as_u16() { + return warp_utils::reject::custom_bad_request( + message.message, + ); + } else { + // According to the spec this response should only be a 400 or 500, + // so we fall back to a 500 here. + return warp_utils::reject::custom_server_error( + message.message, + ); + } + } + warp_utils::reject::custom_server_error(format!("{e:?}")) + }) + }; + tokio::task::spawn(async move { tx.send(builder_future.await) }); + + // Just send a generic 200 OK from this closure. We'll + // ignore the `Ok` variant and form a proper response + // from what is sent back down the channel. + Ok(warp::reply::reply().into_response()) }) + .await?; + + // Await a response from the builder without blocking a + // `BeaconProcessor` worker. + rx.await.unwrap_or_else(|_| { + Ok(warp::reply::with_status( + warp::reply::json(&"No response from channel"), + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response()) + }) }, ); // POST validator/sync_committee_subscriptions @@ -3485,15 +3730,17 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(validator_subscription_tx_filter) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) .and_then( |subscriptions: Vec, validator_subscription_tx: Sender, + task_spawner: TaskSpawner, chain: Arc>, log: Logger | { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P0, move || { for subscription in subscriptions { chain .validator_monitor @@ -3528,10 +3775,14 @@ pub fn serve( .and(warp::path::param::()) .and(warp::path::end()) .and(warp::body::json()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and_then( - |epoch: Epoch, indices: Vec, chain: Arc>| { - blocking_json_task(move || { + |epoch: Epoch, + indices: Vec, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P0, move || { // Ensure the request is for either the current, previous or next epoch. let current_epoch = chain .epoch() @@ -3565,10 +3816,13 @@ pub fn serve( .and(warp::path("liveness")) .and(warp::path::end()) .and(warp::body::json()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and_then( - |request_data: api_types::LivenessRequestData, chain: Arc>| { - blocking_json_task(move || { + |request_data: api_types::LivenessRequestData, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P0, move || { // Ensure the request is for either the current, previous or next epoch. let current_epoch = chain .epoch() @@ -3607,8 +3861,9 @@ pub fn serve( let get_lighthouse_health = warp::path("lighthouse") .and(warp::path("health")) .and(warp::path::end()) - .and_then(|| { - blocking_json_task(move || { + .and(task_spawner_filter.clone()) + .and_then(|task_spawner: TaskSpawner| { + task_spawner.blocking_json_task(Priority::P0, move || { eth2::lighthouse::Health::observe() .map(api_types::GenericResponse::from) .map_err(warp_utils::reject::custom_bad_request) @@ -3620,13 +3875,18 @@ pub fn serve( .and(warp::path("ui")) .and(warp::path("health")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(system_info_filter) .and(app_start_filter) .and(data_dir_filter) .and(network_globals.clone()) .and_then( - |sysinfo, app_start: std::time::Instant, data_dir, network_globals| { - blocking_json_task(move || { + |task_spawner: TaskSpawner, + sysinfo, + app_start: std::time::Instant, + data_dir, + network_globals| { + task_spawner.blocking_json_task(Priority::P0, move || { let app_uptime = app_start.elapsed().as_secs(); Ok(api_types::GenericResponse::from(observe_system_health_bn( sysinfo, @@ -3643,12 +3903,15 @@ pub fn serve( .and(warp::path("ui")) .and(warp::path("validator_count")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|chain: Arc>| { - blocking_json_task(move || { - ui::get_validator_count(chain).map(api_types::GenericResponse::from) - }) - }); + .and_then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + ui::get_validator_count(chain).map(api_types::GenericResponse::from) + }) + }, + ); // POST lighthouse/ui/validator_metrics let post_lighthouse_ui_validator_metrics = warp::path("lighthouse") @@ -3656,10 +3919,13 @@ pub fn serve( .and(warp::path("validator_metrics")) .and(warp::path::end()) .and(warp::body::json()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and_then( - |request_data: ui::ValidatorMetricsRequestData, chain: Arc>| { - blocking_json_task(move || { + |request_data: ui::ValidatorMetricsRequestData, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { ui::post_validator_monitor_metrics(request_data, chain) .map(api_types::GenericResponse::from) }) @@ -3672,10 +3938,13 @@ pub fn serve( .and(warp::path("validator_info")) .and(warp::path::end()) .and(warp::body::json()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and_then( - |request_data: ui::ValidatorInfoRequestData, chain: Arc>| { - blocking_json_task(move || { + |request_data: ui::ValidatorInfoRequestData, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { ui::get_validator_info(request_data, chain) .map(api_types::GenericResponse::from) }) @@ -3686,21 +3955,26 @@ pub fn serve( let get_lighthouse_syncing = warp::path("lighthouse") .and(warp::path("syncing")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then(|network_globals: Arc>| { - blocking_json_task(move || { - Ok(api_types::GenericResponse::from( - network_globals.sync_state(), - )) - }) - }); + .and_then( + |task_spawner: TaskSpawner, + network_globals: Arc>| { + task_spawner.blocking_json_task(Priority::P0, move || { + Ok(api_types::GenericResponse::from( + network_globals.sync_state(), + )) + }) + }, + ); // GET lighthouse/nat let get_lighthouse_nat = warp::path("lighthouse") .and(warp::path("nat")) + .and(task_spawner_filter.clone()) .and(warp::path::end()) - .and_then(|| { - blocking_json_task(move || { + .and_then(|task_spawner: TaskSpawner| { + task_spawner.blocking_json_task(Priority::P1, move || { Ok(api_types::GenericResponse::from( lighthouse_network::metrics::NAT_OPEN .as_ref() @@ -3715,57 +3989,70 @@ pub fn serve( let get_lighthouse_peers = warp::path("lighthouse") .and(warp::path("peers")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then(|network_globals: Arc>| { - blocking_json_task(move || { - Ok(network_globals - .peers - .read() - .peers() - .map(|(peer_id, peer_info)| eth2::lighthouse::Peer { - peer_id: peer_id.to_string(), - peer_info: peer_info.clone(), - }) - .collect::>()) - }) - }); + .and_then( + |task_spawner: TaskSpawner, + network_globals: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + Ok(network_globals + .peers + .read() + .peers() + .map(|(peer_id, peer_info)| eth2::lighthouse::Peer { + peer_id: peer_id.to_string(), + peer_info: peer_info.clone(), + }) + .collect::>()) + }) + }, + ); // GET lighthouse/peers/connected let get_lighthouse_peers_connected = warp::path("lighthouse") .and(warp::path("peers")) .and(warp::path("connected")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(network_globals) - .and_then(|network_globals: Arc>| { - blocking_json_task(move || { - Ok(network_globals - .peers - .read() - .connected_peers() - .map(|(peer_id, peer_info)| eth2::lighthouse::Peer { - peer_id: peer_id.to_string(), - peer_info: peer_info.clone(), - }) - .collect::>()) - }) - }); + .and_then( + |task_spawner: TaskSpawner, + network_globals: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + Ok(network_globals + .peers + .read() + .connected_peers() + .map(|(peer_id, peer_info)| eth2::lighthouse::Peer { + peer_id: peer_id.to_string(), + peer_info: peer_info.clone(), + }) + .collect::>()) + }) + }, + ); // GET lighthouse/proto_array let get_lighthouse_proto_array = warp::path("lighthouse") .and(warp::path("proto_array")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|chain: Arc>| { - blocking_response_task(move || { - Ok::<_, warp::Rejection>(warp::reply::json(&api_types::GenericResponseRef::from( - chain - .canonical_head - .fork_choice_read_lock() - .proto_array() - .core_proto_array(), - ))) - }) - }); + .and_then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_response_task(Priority::P1, move || { + Ok::<_, warp::Rejection>(warp::reply::json( + &api_types::GenericResponseRef::from( + chain + .canonical_head + .fork_choice_read_lock() + .proto_array() + .core_proto_array(), + ), + )) + }) + }, + ); // GET lighthouse/validator_inclusion/{epoch}/{validator_id} let get_lighthouse_validator_inclusion_global = warp::path("lighthouse") @@ -3773,10 +4060,14 @@ pub fn serve( .and(warp::path::param::()) .and(warp::path::param::()) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and_then( - |epoch: Epoch, validator_id: ValidatorId, chain: Arc>| { - blocking_json_task(move || { + |epoch: Epoch, + validator_id: ValidatorId, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { validator_inclusion::validator_inclusion_data(epoch, &validator_id, &chain) .map(api_types::GenericResponse::from) }) @@ -3789,82 +4080,94 @@ pub fn serve( .and(warp::path::param::()) .and(warp::path("global")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|epoch: Epoch, chain: Arc>| { - blocking_json_task(move || { - validator_inclusion::global_validator_inclusion_data(epoch, &chain) - .map(api_types::GenericResponse::from) - }) - }); + .and_then( + |epoch: Epoch, task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + validator_inclusion::global_validator_inclusion_data(epoch, &chain) + .map(api_types::GenericResponse::from) + }) + }, + ); // GET lighthouse/eth1/syncing let get_lighthouse_eth1_syncing = warp::path("lighthouse") .and(warp::path("eth1")) .and(warp::path("syncing")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|chain: Arc>| { - blocking_json_task(move || { - let current_slot_opt = chain.slot().ok(); + .and_then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let current_slot_opt = chain.slot().ok(); - chain - .eth1_chain - .as_ref() - .ok_or_else(|| { - warp_utils::reject::custom_not_found( - "Eth1 sync is disabled. See the --eth1 CLI flag.".to_string(), - ) - }) - .and_then(|eth1| { - eth1.sync_status(chain.genesis_time, current_slot_opt, &chain.spec) - .ok_or_else(|| { - warp_utils::reject::custom_server_error( - "Unable to determine Eth1 sync status".to_string(), - ) - }) - }) - .map(api_types::GenericResponse::from) - }) - }); + chain + .eth1_chain + .as_ref() + .ok_or_else(|| { + warp_utils::reject::custom_not_found( + "Eth1 sync is disabled. See the --eth1 CLI flag.".to_string(), + ) + }) + .and_then(|eth1| { + eth1.sync_status(chain.genesis_time, current_slot_opt, &chain.spec) + .ok_or_else(|| { + warp_utils::reject::custom_server_error( + "Unable to determine Eth1 sync status".to_string(), + ) + }) + }) + .map(api_types::GenericResponse::from) + }) + }, + ); // GET lighthouse/eth1/block_cache let get_lighthouse_eth1_block_cache = warp::path("lighthouse") .and(warp::path("eth1")) .and(warp::path("block_cache")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(eth1_service_filter.clone()) - .and_then(|eth1_service: eth1::Service| { - blocking_json_task(move || { - Ok(api_types::GenericResponse::from( - eth1_service - .blocks() - .read() - .iter() - .cloned() - .collect::>(), - )) - }) - }); + .and_then( + |task_spawner: TaskSpawner, eth1_service: eth1::Service| { + task_spawner.blocking_json_task(Priority::P1, move || { + Ok(api_types::GenericResponse::from( + eth1_service + .blocks() + .read() + .iter() + .cloned() + .collect::>(), + )) + }) + }, + ); // GET lighthouse/eth1/deposit_cache let get_lighthouse_eth1_deposit_cache = warp::path("lighthouse") .and(warp::path("eth1")) .and(warp::path("deposit_cache")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(eth1_service_filter) - .and_then(|eth1_service: eth1::Service| { - blocking_json_task(move || { - Ok(api_types::GenericResponse::from( - eth1_service - .deposits() - .read() - .cache - .iter() - .cloned() - .collect::>(), - )) - }) - }); + .and_then( + |task_spawner: TaskSpawner, eth1_service: eth1::Service| { + task_spawner.blocking_json_task(Priority::P1, move || { + Ok(api_types::GenericResponse::from( + eth1_service + .deposits() + .read() + .cache + .iter() + .cloned() + .collect::>(), + )) + }) + }, + ); // GET lighthouse/beacon/states/{state_id}/ssz let get_lighthouse_beacon_states_ssz = warp::path("lighthouse") @@ -3873,42 +4176,50 @@ pub fn serve( .and(warp::path::param::()) .and(warp::path("ssz")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|state_id: StateId, chain: Arc>| { - blocking_response_task(move || { - // This debug endpoint provides no indication of optimistic status. - let (state, _execution_optimistic, _finalized) = state_id.state(&chain)?; - Response::builder() - .status(200) - .header("Content-Type", "application/ssz") - .body(state.as_ssz_bytes()) - .map_err(|e| { - warp_utils::reject::custom_server_error(format!( - "failed to create response: {}", - e - )) - }) - }) - }); + .and_then( + |state_id: StateId, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_response_task(Priority::P1, move || { + // This debug endpoint provides no indication of optimistic status. + let (state, _execution_optimistic, _finalized) = state_id.state(&chain)?; + Response::builder() + .status(200) + .header("Content-Type", "application/ssz") + .body(state.as_ssz_bytes()) + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "failed to create response: {}", + e + )) + }) + }) + }, + ); // GET lighthouse/staking let get_lighthouse_staking = warp::path("lighthouse") .and(warp::path("staking")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|chain: Arc>| { - blocking_json_task(move || { - if chain.eth1_chain.is_some() { - Ok(()) - } else { - Err(warp_utils::reject::custom_not_found( - "staking is not enabled, \ + .and_then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + if chain.eth1_chain.is_some() { + Ok(()) + } else { + Err(warp_utils::reject::custom_not_found( + "staking is not enabled, \ see the --staking CLI flag" - .to_string(), - )) - } - }) - }); + .to_string(), + )) + } + }) + }, + ); let database_path = warp::path("lighthouse").and(warp::path("database")); @@ -3916,21 +4227,29 @@ pub fn serve( let get_lighthouse_database_info = database_path .and(warp::path("info")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|chain: Arc>| blocking_json_task(move || database::info(chain))); + .and_then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || database::info(chain)) + }, + ); // POST lighthouse/database/reconstruct let post_lighthouse_database_reconstruct = database_path .and(warp::path("reconstruct")) .and(warp::path::end()) .and(not_while_syncing_filter) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|chain: Arc>| { - blocking_json_task(move || { - chain.store_migrator.process_reconstruction(); - Ok("success") - }) - }); + .and_then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + chain.store_migrator.process_reconstruction(); + Ok("success") + }) + }, + ); // GET lighthouse/analysis/block_rewards let get_lighthouse_block_rewards = warp::path("lighthouse") @@ -3938,10 +4257,13 @@ pub fn serve( .and(warp::path("block_rewards")) .and(warp::query::()) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then(|query, chain, log| { - blocking_json_task(move || block_rewards::get_block_rewards(query, chain, log)) + .and_then(|query, task_spawner: TaskSpawner, chain, log| { + task_spawner.blocking_json_task(Priority::P1, move || { + block_rewards::get_block_rewards(query, chain, log) + }) }); // POST lighthouse/analysis/block_rewards @@ -3950,11 +4272,16 @@ pub fn serve( .and(warp::path("block_rewards")) .and(warp::body::json()) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then(|blocks, chain, log| { - blocking_json_task(move || block_rewards::compute_block_rewards(blocks, chain, log)) - }); + .and_then( + |blocks, task_spawner: TaskSpawner, chain, log| { + task_spawner.blocking_json_task(Priority::P1, move || { + block_rewards::compute_block_rewards(blocks, chain, log) + }) + }, + ); // GET lighthouse/analysis/attestation_performance/{index} let get_lighthouse_attestation_performance = warp::path("lighthouse") @@ -3963,12 +4290,15 @@ pub fn serve( .and(warp::path::param::()) .and(warp::query::()) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|target, query, chain: Arc>| { - blocking_json_task(move || { - attestation_performance::get_attestation_performance(target, query, chain) - }) - }); + .and_then( + |target, query, task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + attestation_performance::get_attestation_performance(target, query, chain) + }) + }, + ); // GET lighthouse/analysis/block_packing_efficiency let get_lighthouse_block_packing_efficiency = warp::path("lighthouse") @@ -3976,35 +4306,45 @@ pub fn serve( .and(warp::path("block_packing_efficiency")) .and(warp::query::()) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|query, chain: Arc>| { - blocking_json_task(move || { - block_packing_efficiency::get_block_packing_efficiency(query, chain) - }) - }); + .and_then( + |query, task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + block_packing_efficiency::get_block_packing_efficiency(query, chain) + }) + }, + ); // GET lighthouse/merge_readiness let get_lighthouse_merge_readiness = warp::path("lighthouse") .and(warp::path("merge_readiness")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|chain: Arc>| async move { - let merge_readiness = chain.check_merge_readiness().await; - Ok::<_, warp::reject::Rejection>( - warp::reply::json(&api_types::GenericResponse::from(merge_readiness)) - .into_response(), - ) - }); + .and_then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.spawn_async_with_rejection(Priority::P1, async move { + let merge_readiness = chain.check_merge_readiness().await; + Ok::<_, warp::reject::Rejection>( + warp::reply::json(&api_types::GenericResponse::from(merge_readiness)) + .into_response(), + ) + }) + }, + ); let get_events = eth_v1 .and(warp::path("events")) .and(warp::path::end()) .and(multi_key_query::()) + .and(task_spawner_filter.clone()) .and(chain_filter) .and_then( |topics_res: Result, + task_spawner: TaskSpawner, chain: Arc>| { - blocking_response_task(move || { + task_spawner.blocking_response_task(Priority::P0, move || { let topics = topics_res?; // for each topic subscribed spawn a new subscription let mut receivers = Vec::with_capacity(topics.topics.len()); @@ -4075,38 +4415,46 @@ pub fn serve( let lighthouse_log_events = warp::path("lighthouse") .and(warp::path("logs")) .and(warp::path::end()) + .and(task_spawner_filter) .and(sse_component_filter) - .and_then(|sse_component: Option| { - blocking_response_task(move || { - if let Some(logging_components) = sse_component { - // Build a JSON stream - let s = - BroadcastStream::new(logging_components.sender.subscribe()).map(|msg| { - match msg { - Ok(data) => { - // Serialize to json - match data.to_json_string() { - // Send the json as a Server Side Event - Ok(json) => Ok(Event::default().data(json)), - Err(e) => Err(warp_utils::reject::server_sent_event_error( - format!("Unable to serialize to JSON {}", e), - )), + .and_then( + |task_spawner: TaskSpawner, sse_component: Option| { + task_spawner.blocking_response_task(Priority::P1, move || { + if let Some(logging_components) = sse_component { + // Build a JSON stream + let s = BroadcastStream::new(logging_components.sender.subscribe()).map( + |msg| { + match msg { + Ok(data) => { + // Serialize to json + match data.to_json_string() { + // Send the json as a Server Side Event + Ok(json) => Ok(Event::default().data(json)), + Err(e) => { + Err(warp_utils::reject::server_sent_event_error( + format!("Unable to serialize to JSON {}", e), + )) + } + } } + Err(e) => Err(warp_utils::reject::server_sent_event_error( + format!("Unable to receive event {}", e), + )), } - Err(e) => Err(warp_utils::reject::server_sent_event_error( - format!("Unable to receive event {}", e), - )), - } - }); + }, + ); - Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s))) - } else { - Err(warp_utils::reject::custom_server_error( - "SSE Logging is not enabled".to_string(), - )) - } - }) - }); + Ok::<_, warp::Rejection>(warp::sse::reply( + warp::sse::keep_alive().stream(s), + )) + } else { + Err(warp_utils::reject::custom_server_error( + "SSE Logging is not enabled".to_string(), + )) + } + }) + }, + ); // Define the ultimate set of routes that will be provided to the server. // Use `uor` rather than `or` in order to simplify types (see `UnifyingOrFilter`). diff --git a/beacon_node/http_api/src/task_spawner.rs b/beacon_node/http_api/src/task_spawner.rs new file mode 100644 index 000000000..b4da67f77 --- /dev/null +++ b/beacon_node/http_api/src/task_spawner.rs @@ -0,0 +1,214 @@ +use beacon_processor::{BeaconProcessorSend, BlockingOrAsync, Work, WorkEvent}; +use serde::Serialize; +use std::future::Future; +use tokio::sync::{mpsc::error::TrySendError, oneshot}; +use types::EthSpec; +use warp::reply::{Reply, Response}; + +/// Maps a request to a queue in the `BeaconProcessor`. +#[derive(Clone, Copy)] +pub enum Priority { + /// The highest priority. + P0, + /// The lowest priority. + P1, +} + +impl Priority { + /// Wrap `self` in a `WorkEvent` with an appropriate priority. + fn work_event(&self, process_fn: BlockingOrAsync) -> WorkEvent { + let work = match self { + Priority::P0 => Work::ApiRequestP0(process_fn), + Priority::P1 => Work::ApiRequestP1(process_fn), + }; + WorkEvent { + drop_during_sync: false, + work, + } + } +} + +/// Spawns tasks on the `BeaconProcessor` or directly on the tokio executor. +pub struct TaskSpawner { + /// Used to send tasks to the `BeaconProcessor`. The tokio executor will be + /// used if this is `None`. + beacon_processor_send: Option>, +} + +impl TaskSpawner { + pub fn new(beacon_processor_send: Option>) -> Self { + Self { + beacon_processor_send, + } + } + + /// Executes a "blocking" (non-async) task which returns a `Response`. + pub async fn blocking_response_task( + self, + priority: Priority, + func: F, + ) -> Result + where + F: FnOnce() -> Result + Send + Sync + 'static, + T: Reply + Send + 'static, + { + if let Some(beacon_processor_send) = &self.beacon_processor_send { + // Create a closure that will execute `func` and send the result to + // a channel held by this thread. + let (tx, rx) = oneshot::channel(); + let process_fn = move || { + // Execute the function, collect the return value. + let func_result = func(); + // Send the result down the channel. Ignore any failures; the + // send can only fail if the receiver is dropped. + let _ = tx.send(func_result); + }; + + // Send the function to the beacon processor for execution at some arbitrary time. + match send_to_beacon_processor( + beacon_processor_send, + priority, + BlockingOrAsync::Blocking(Box::new(process_fn)), + rx, + ) + .await + { + Ok(result) => result.map(Reply::into_response), + Err(error_response) => Ok(error_response), + } + } else { + // There is no beacon processor so spawn a task directly on the + // tokio executor. + warp_utils::task::blocking_response_task(func).await + } + } + + /// Executes a "blocking" (non-async) task which returns a JSON-serializable + /// object. + pub async fn blocking_json_task( + self, + priority: Priority, + func: F, + ) -> Result + where + F: FnOnce() -> Result + Send + Sync + 'static, + T: Serialize + Send + 'static, + { + let func = || func().map(|t| warp::reply::json(&t).into_response()); + self.blocking_response_task(priority, func).await + } + + /// Executes an async task which may return a `warp::Rejection`. + pub async fn spawn_async_with_rejection( + self, + priority: Priority, + func: impl Future> + Send + Sync + 'static, + ) -> Result { + if let Some(beacon_processor_send) = &self.beacon_processor_send { + // Create a wrapper future that will execute `func` and send the + // result to a channel held by this thread. + let (tx, rx) = oneshot::channel(); + let process_fn = async move { + // Await the future, collect the return value. + let func_result = func.await; + // Send the result down the channel. Ignore any failures; the + // send can only fail if the receiver is dropped. + let _ = tx.send(func_result); + }; + + // Send the function to the beacon processor for execution at some arbitrary time. + send_to_beacon_processor( + beacon_processor_send, + priority, + BlockingOrAsync::Async(Box::pin(process_fn)), + rx, + ) + .await + .unwrap_or_else(Result::Ok) + } else { + // There is no beacon processor so spawn a task directly on the + // tokio executor. + tokio::task::spawn(func).await.unwrap_or_else(|e| { + let response = warp::reply::with_status( + warp::reply::json(&format!("Tokio did not execute task: {e:?}")), + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response(); + Ok(response) + }) + } + } + + /// Executes an async task which always returns a `Response`. + pub async fn spawn_async( + self, + priority: Priority, + func: impl Future + Send + Sync + 'static, + ) -> Response { + if let Some(beacon_processor_send) = &self.beacon_processor_send { + // Create a wrapper future that will execute `func` and send the + // result to a channel held by this thread. + let (tx, rx) = oneshot::channel(); + let process_fn = async move { + // Await the future, collect the return value. + let func_result = func.await; + // Send the result down the channel. Ignore any failures; the + // send can only fail if the receiver is dropped. + let _ = tx.send(func_result); + }; + + // Send the function to the beacon processor for execution at some arbitrary time. + send_to_beacon_processor( + beacon_processor_send, + priority, + BlockingOrAsync::Async(Box::pin(process_fn)), + rx, + ) + .await + .unwrap_or_else(|error_response| error_response) + } else { + // There is no beacon processor so spawn a task directly on the + // tokio executor. + tokio::task::spawn(func).await.unwrap_or_else(|e| { + warp::reply::with_status( + warp::reply::json(&format!("Tokio did not execute task: {e:?}")), + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response() + }) + } + } +} + +/// Send a task to the beacon processor and await execution. +/// +/// If the task is not executed, return an `Err(response)` with an error message +/// for the API consumer. +async fn send_to_beacon_processor( + beacon_processor_send: &BeaconProcessorSend, + priority: Priority, + process_fn: BlockingOrAsync, + rx: oneshot::Receiver, +) -> Result { + let error_message = match beacon_processor_send.try_send(priority.work_event(process_fn)) { + Ok(()) => { + match rx.await { + // The beacon processor executed the task and sent a result. + Ok(func_result) => return Ok(func_result), + // The beacon processor dropped the channel without sending a + // result. The beacon processor dropped this task because its + // queues are full or it's shutting down. + Err(_) => "The task did not execute. The server is overloaded or shutting down.", + } + } + Err(TrySendError::Full(_)) => "The task was dropped. The server is overloaded.", + Err(TrySendError::Closed(_)) => "The task was dropped. The server is shutting down.", + }; + + let error_response = warp::reply::with_status( + warp::reply::json(&error_message), + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response(); + Err(error_response) +} diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index 386de0081..18c2df31d 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -3,6 +3,7 @@ use beacon_chain::{ test_utils::{BeaconChainHarness, BoxedMutator, Builder, EphemeralHarnessType}, BeaconChain, BeaconChainTypes, }; +use beacon_processor::{BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig}; use directory::DEFAULT_ROOT_DIR; use eth2::{BeaconNodeHttpClient, Timeouts}; use lighthouse_network::{ @@ -24,7 +25,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; use std::time::Duration; use store::MemoryStore; -use tokio::sync::oneshot; +use task_executor::test_utils::TestRuntime; use types::{ChainSpec, EthSpec}; pub const TCP_PORT: u16 = 42; @@ -37,7 +38,6 @@ pub struct InteractiveTester { pub harness: BeaconChainHarness>, pub client: BeaconNodeHttpClient, pub network_rx: NetworkReceivers, - _server_shutdown: oneshot::Sender<()>, } /// The result of calling `create_api_server`. @@ -46,7 +46,6 @@ pub struct InteractiveTester { pub struct ApiServer> { pub server: SFut, pub listening_socket: SocketAddr, - pub shutdown_tx: oneshot::Sender<()>, pub network_rx: NetworkReceivers, pub local_enr: Enr, pub external_peer_id: PeerId, @@ -93,10 +92,14 @@ impl InteractiveTester { let ApiServer { server, listening_socket, - shutdown_tx: _server_shutdown, network_rx, .. - } = create_api_server(harness.chain.clone(), harness.logger().clone()).await; + } = create_api_server( + harness.chain.clone(), + &harness.runtime, + harness.logger().clone(), + ) + .await; tokio::spawn(server); @@ -114,22 +117,23 @@ impl InteractiveTester { harness, client, network_rx, - _server_shutdown, } } } pub async fn create_api_server( chain: Arc>, + test_runtime: &TestRuntime, log: Logger, ) -> ApiServer> { // Get a random unused port. let port = unused_port::unused_tcp4_port().unwrap(); - create_api_server_on_port(chain, log, port).await + create_api_server_on_port(chain, test_runtime, log, port).await } pub async fn create_api_server_on_port( chain: Arc>, + test_runtime: &TestRuntime, log: Logger, port: u16, ) -> ApiServer> { @@ -177,6 +181,37 @@ pub async fn create_api_server_on_port( let eth1_service = eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()).unwrap(); + let beacon_processor_config = BeaconProcessorConfig::default(); + let BeaconProcessorChannels { + beacon_processor_tx, + beacon_processor_rx, + work_reprocessing_tx, + work_reprocessing_rx, + } = BeaconProcessorChannels::new(&beacon_processor_config); + + let beacon_processor_send = beacon_processor_tx; + BeaconProcessor { + network_globals: network_globals.clone(), + executor: test_runtime.task_executor.clone(), + // The number of workers must be greater than one. Tests which use the + // builder workflow sometimes require an internal HTTP request in order + // to fulfill an already in-flight HTTP request, therefore having only + // one worker will result in a deadlock. + max_workers: 2, + current_workers: 0, + config: beacon_processor_config, + log: log.clone(), + } + .spawn_manager( + beacon_processor_rx, + work_reprocessing_tx, + work_reprocessing_rx, + None, + chain.slot_clock.clone(), + chain.spec.maximum_gossip_clock_disparity(), + ) + .unwrap(); + let ctx = Arc::new(Context { config: Config { enabled: true, @@ -187,26 +222,22 @@ pub async fn create_api_server_on_port( allow_sync_stalled: false, data_dir: std::path::PathBuf::from(DEFAULT_ROOT_DIR), spec_fork_name: None, + enable_beacon_processor: true, }, chain: Some(chain), network_senders: Some(network_senders), network_globals: Some(network_globals), + beacon_processor_send: Some(beacon_processor_send), eth1_service: Some(eth1_service), sse_logging_components: None, log, }); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let server_shutdown = async { - // It's not really interesting why this triggered, just that it happened. - let _ = shutdown_rx.await; - }; - let (listening_socket, server) = crate::serve(ctx, server_shutdown).unwrap(); + let (listening_socket, server) = crate::serve(ctx, test_runtime.task_executor.exit()).unwrap(); ApiServer { server, listening_socket, - shutdown_tx, network_rx: network_receivers, local_enr: enr, external_peer_id: peer_id, diff --git a/beacon_node/http_api/src/validator.rs b/beacon_node/http_api/src/validator.rs new file mode 100644 index 000000000..18e9dbf63 --- /dev/null +++ b/beacon_node/http_api/src/validator.rs @@ -0,0 +1,21 @@ +use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use types::*; + +/// Uses the `chain.validator_pubkey_cache` to resolve a pubkey to a validator +/// index and then ensures that the validator exists in the given `state`. +pub fn pubkey_to_validator_index( + chain: &BeaconChain, + state: &BeaconState, + pubkey: &PublicKeyBytes, +) -> Result, BeaconChainError> { + chain + .validator_index(pubkey)? + .filter(|&index| { + state + .validators() + .get(index) + .map_or(false, |v| v.pubkey == *pubkey) + }) + .map(Result::Ok) + .transpose() +} diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 992eb92b1..402f07072 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -30,7 +30,6 @@ use state_processing::per_block_processing::get_expected_withdrawals; use state_processing::per_slot_processing; use std::convert::TryInto; use std::sync::Arc; -use tokio::sync::oneshot; use tokio::time::Duration; use tree_hash::TreeHash; use types::application_domain::ApplicationDomain; @@ -70,7 +69,6 @@ struct ApiTester { attester_slashing: AttesterSlashing, proposer_slashing: ProposerSlashing, voluntary_exit: SignedVoluntaryExit, - _server_shutdown: oneshot::Sender<()>, network_rx: NetworkReceivers, local_enr: Enr, external_peer_id: PeerId, @@ -236,11 +234,10 @@ impl ApiTester { let ApiServer { server, listening_socket: _, - shutdown_tx, network_rx, local_enr, external_peer_id, - } = create_api_server_on_port(chain.clone(), log, port).await; + } = create_api_server_on_port(chain.clone(), &harness.runtime, log, port).await; harness.runtime.task_executor.spawn(server, "api_server"); @@ -268,7 +265,6 @@ impl ApiTester { attester_slashing, proposer_slashing, voluntary_exit, - _server_shutdown: shutdown_tx, network_rx, local_enr, external_peer_id, @@ -324,11 +320,10 @@ impl ApiTester { let ApiServer { server, listening_socket, - shutdown_tx, network_rx, local_enr, external_peer_id, - } = create_api_server(chain.clone(), log).await; + } = create_api_server(chain.clone(), &harness.runtime, log).await; harness.runtime.task_executor.spawn(server, "api_server"); @@ -353,7 +348,6 @@ impl ApiTester { attester_slashing, proposer_slashing, voluntary_exit, - _server_shutdown: shutdown_tx, network_rx, local_enr, external_peer_id, diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index bfc7b6b37..88d119405 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -8,9 +8,9 @@ use beacon_chain::{ }; use beacon_chain::{BeaconChainTypes, NotifyExecutionLayer}; use beacon_processor::{ - work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend, DuplicateCache, - GossipAggregatePackage, GossipAttestationPackage, Work, WorkEvent as BeaconWorkEvent, - MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN, + work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorChannels, BeaconProcessorSend, + DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work, + WorkEvent as BeaconWorkEvent, }; use environment::null_logger; use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; @@ -639,11 +639,15 @@ impl NetworkBeaconProcessor> { pub fn null_for_testing( network_globals: Arc>, ) -> (Self, mpsc::Receiver>) { - let (beacon_processor_send, beacon_processor_receive) = - mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN); + let BeaconProcessorChannels { + beacon_processor_tx, + beacon_processor_rx, + work_reprocessing_tx, + work_reprocessing_rx: _work_reprocessing_rx, + } = <_>::default(); + let (network_tx, _network_rx) = mpsc::unbounded_channel(); let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); - let (reprocess_tx, _reprocess_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN); let log = null_logger().unwrap(); let harness: BeaconChainHarness> = BeaconChainHarness::builder(E::default()) @@ -656,18 +660,18 @@ impl NetworkBeaconProcessor> { let runtime = TestRuntime::default(); let network_beacon_processor = Self { - beacon_processor_send: BeaconProcessorSend(beacon_processor_send), + beacon_processor_send: beacon_processor_tx, duplicate_cache: DuplicateCache::default(), chain: harness.chain, network_tx, sync_tx, - reprocess_tx, + reprocess_tx: work_reprocessing_tx, network_globals, invalid_block_storage: InvalidBlockStorage::Disabled, executor: runtime.task_executor.clone(), log, }; - (network_beacon_processor, beacon_processor_receive) + (network_beacon_processor, beacon_processor_rx) } } diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index ebe5bd123..577287750 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -11,7 +11,7 @@ use crate::{ use beacon_chain::test_utils::{ test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, }; -use beacon_chain::{BeaconChain, ChainConfig, WhenSlotSkipped}; +use beacon_chain::{BeaconChain, WhenSlotSkipped}; use beacon_processor::{work_reprocessing_queue::*, *}; use lighthouse_network::discovery::ConnectionId; use lighthouse_network::rpc::methods::BlobsByRangeRequest; @@ -74,18 +74,23 @@ struct TestRig { impl Drop for TestRig { fn drop(&mut self) { // Causes the beacon processor to shutdown. - self.beacon_processor_tx = BeaconProcessorSend(mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN).0); + let len = BeaconProcessorConfig::default().max_work_event_queue_len; + self.beacon_processor_tx = BeaconProcessorSend(mpsc::channel(len).0); } } impl TestRig { pub async fn new(chain_length: u64) -> Self { - Self::new_with_chain_config(chain_length, ChainConfig::default()).await + Self::new_parametric( + chain_length, + BeaconProcessorConfig::default().enable_backfill_rate_limiting, + ) + .await } - pub async fn new_with_chain_config(chain_length: u64, chain_config: ChainConfig) -> Self { - let mut spec = test_spec::(); + pub async fn new_parametric(chain_length: u64, enable_backfill_rate_limiting: bool) -> Self { // This allows for testing voluntary exits without building out a massive chain. + let mut spec = test_spec::(); spec.shard_committee_period = 2; let harness = BeaconChainHarness::builder(MainnetEthSpec) @@ -93,7 +98,7 @@ impl TestRig { .deterministic_keypairs(VALIDATOR_COUNT) .fresh_ephemeral_store() .mock_execution_layer() - .chain_config(chain_config) + .chain_config(<_>::default()) .build(); harness.advance_slot(); @@ -179,8 +184,15 @@ impl TestRig { let log = harness.logger().clone(); - let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN); - let beacon_processor_tx = BeaconProcessorSend(beacon_processor_tx); + let mut beacon_processor_config = BeaconProcessorConfig::default(); + beacon_processor_config.enable_backfill_rate_limiting = enable_backfill_rate_limiting; + let BeaconProcessorChannels { + beacon_processor_tx, + beacon_processor_rx, + work_reprocessing_tx, + work_reprocessing_rx, + } = BeaconProcessorChannels::new(&beacon_processor_config); + let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); // Default metadata @@ -203,8 +215,6 @@ impl TestRig { let executor = harness.runtime.task_executor.clone(); - let (work_reprocessing_tx, work_reprocessing_rx) = - mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN); let (work_journal_tx, work_journal_rx) = mpsc::channel(16_364); let duplicate_cache = DuplicateCache::default(); @@ -227,7 +237,7 @@ impl TestRig { executor, max_workers: cmp::max(1, num_cpus::get()), current_workers: 0, - enable_backfill_rate_limiting: harness.chain.config.enable_backfill_rate_limiting, + config: beacon_processor_config, log: log.clone(), } .spawn_manager( @@ -1053,11 +1063,8 @@ async fn test_backfill_sync_processing() { /// Ensure that backfill batches get processed as fast as they can when rate-limiting is disabled. #[tokio::test] async fn test_backfill_sync_processing_rate_limiting_disabled() { - let chain_config = ChainConfig { - enable_backfill_rate_limiting: false, - ..Default::default() - }; - let mut rig = TestRig::new_with_chain_config(SMALL_CHAIN, chain_config).await; + let enable_backfill_rate_limiting = false; + let mut rig = TestRig::new_parametric(SMALL_CHAIN, enable_backfill_rate_limiting).await; for _ in 0..3 { rig.enqueue_backfill_batch(); diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index 544c5dd9c..64ac3aaee 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -3,20 +3,16 @@ mod tests { use crate::persisted_dht::load_dht; use crate::{NetworkConfig, NetworkService}; - use beacon_chain::test_utils::EphemeralHarnessType; - use beacon_processor::{ - BeaconProcessorSend, MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN, - }; + use beacon_chain::test_utils::BeaconChainHarness; + use beacon_processor::BeaconProcessorChannels; use lighthouse_network::Enr; use slog::{o, Drain, Level, Logger}; use sloggers::{null::NullLoggerBuilder, Build}; use std::str::FromStr; use std::sync::Arc; - use tokio::{runtime::Runtime, sync::mpsc}; + use tokio::runtime::Runtime; use types::MinimalEthSpec as E; - type BeaconChainHarness = beacon_chain::test_utils::BeaconChainHarness>; - fn get_logger(actual_log: bool) -> Logger { if actual_log { let drain = { @@ -72,17 +68,20 @@ mod tests { // Create a new network service which implicitly gets dropped at the // end of the block. - let (beacon_processor_send, _beacon_processor_receive) = - mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN); - let (beacon_processor_reprocess_tx, _beacon_processor_reprocess_rx) = - mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN); + let BeaconProcessorChannels { + beacon_processor_tx, + beacon_processor_rx: _beacon_processor_rx, + work_reprocessing_tx, + work_reprocessing_rx: _work_reprocessing_rx, + } = <_>::default(); + let _network_service = NetworkService::start( beacon_chain.clone(), &config, executor, None, - BeaconProcessorSend(beacon_processor_send), - beacon_processor_reprocess_tx, + beacon_processor_tx, + work_reprocessing_tx, ) .await .unwrap(); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 44580e4a5..c6d7da4e6 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -389,6 +389,17 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { stalled. This is useful for very small testnets. TESTING ONLY. DO NOT USE ON \ MAINNET.") ) + .arg( + Arg::with_name("http-enable-beacon-processor") + .long("http-enable-beacon-processor") + .value_name("BOOLEAN") + .help("The beacon processor is a scheduler which provides quality-of-service and \ + DoS protection. When set to \"true\", HTTP API requests will be queued and scheduled \ + alongside other tasks. When set to \"false\", HTTP API responses will be executed \ + immediately.") + .takes_value(true) + .default_value("true") + ) /* Prometheus metrics HTTP server related arguments */ .arg( Arg::with_name("metrics") @@ -1201,4 +1212,55 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true) .possible_values(ProgressiveBalancesMode::VARIANTS) ) + .arg( + Arg::with_name("beacon-processor-max-workers") + .long("beacon-processor-max-workers") + .value_name("INTEGER") + .help("Specifies the maximum concurrent tasks for the task scheduler. Increasing \ + this value may increase resource consumption. Reducing the value \ + may result in decreased resource usage and diminished performance. The \ + default value is the number of logical CPU cores on the host.") + .takes_value(true) + ) + .arg( + Arg::with_name("beacon-processor-work-queue-len") + .long("beacon-processor-work-queue-len") + .value_name("INTEGER") + .help("Specifies the length of the inbound event queue. \ + Higher values may prevent messages from being dropped while lower values \ + may help protect the node from becoming overwhelmed.") + .default_value("16384") + .takes_value(true) + ) + .arg( + Arg::with_name("beacon-processor-reprocess-queue-len") + .long("beacon-processor-reprocess-queue-len") + .value_name("INTEGER") + .help("Specifies the length of the queue for messages requiring delayed processing. \ + Higher values may prevent messages from being dropped while lower values \ + may help protect the node from becoming overwhelmed.") + .default_value("12288") + .takes_value(true) + ) + .arg( + Arg::with_name("beacon-processor-attestation-batch-size") + .long("beacon-processor-attestation-batch-size") + .value_name("INTEGER") + .help("Specifies the number of gossip attestations in a signature verification batch. \ + Higher values may reduce CPU usage in a healthy network whilst lower values may \ + increase CPU usage in an unhealthy or hostile network.") + .default_value("64") + .takes_value(true) + ) + .arg( + Arg::with_name("beacon-processor-aggregate-batch-size") + .long("beacon-processor-aggregate-batch-size") + .value_name("INTEGER") + .help("Specifies the number of gossip aggregate attestations in a signature \ + verification batch. \ + Higher values may reduce CPU usage in a healthy network while lower values may \ + increase CPU usage in an unhealthy or hostile network.") + .default_value("64") + .takes_value(true) + ) } diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 3c5f62984..95fa44edc 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -5,6 +5,7 @@ use beacon_chain::chain_config::{ use beacon_chain::TrustedSetup; use clap::ArgMatches; use clap_utils::flags::DISABLE_MALLOC_TUNING_FLAG; +use clap_utils::parse_required; use client::{ClientConfig, ClientGenesis}; use directory::{DEFAULT_BEACON_NODE_DIR, DEFAULT_NETWORK_DIR, DEFAULT_ROOT_DIR}; use environment::RuntimeContext; @@ -149,6 +150,9 @@ pub fn get_config( client_config.http_api.allow_sync_stalled = true; } + client_config.http_api.enable_beacon_processor = + parse_required(cli_args, "http-enable-beacon-processor")?; + if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? { client_config.chain.shuffling_cache_size = cache_size; } @@ -839,7 +843,7 @@ pub fn get_config( } // Backfill sync rate-limiting - client_config.chain.enable_backfill_rate_limiting = + client_config.beacon_processor.enable_backfill_rate_limiting = !cli_args.is_present("disable-backfill-rate-limiting"); if let Some(path) = clap_utils::parse_optional(cli_args, "invalid-gossip-verified-blocks-path")? @@ -853,6 +857,28 @@ pub fn get_config( client_config.chain.progressive_balances_mode = progressive_balances_mode; } + if let Some(max_workers) = clap_utils::parse_optional(cli_args, "beacon-processor-max-workers")? + { + client_config.beacon_processor.max_workers = max_workers; + } + + if client_config.beacon_processor.max_workers == 0 { + return Err("--beacon-processor-max-workers must be a non-zero value".to_string()); + } + + client_config.beacon_processor.max_work_event_queue_len = + clap_utils::parse_required(cli_args, "beacon-processor-work-queue-len")?; + client_config.beacon_processor.max_scheduled_work_queue_len = + clap_utils::parse_required(cli_args, "beacon-processor-reprocess-queue-len")?; + client_config + .beacon_processor + .max_gossip_attestation_batch_size = + clap_utils::parse_required(cli_args, "beacon-processor-attestation-batch-size")?; + client_config + .beacon_processor + .max_gossip_aggregate_batch_size = + clap_utils::parse_required(cli_args, "beacon-processor-aggregate-batch-size")?; + Ok(client_config) } diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 6e1ab9aa4..085a2a782 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -84,6 +84,7 @@ impl ProductionBeaconNode { let builder = ClientBuilder::new(context.eth_spec_instance.clone()) .runtime_context(context) .chain_spec(spec) + .beacon_processor(client_config.beacon_processor.clone()) .http_api_config(client_config.http_api.clone()) .disk_store( &db_path, diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index 169aa67fd..8003236f2 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -66,6 +66,7 @@ lighthouse_network = { path = "../beacon_node/lighthouse_network" } sensitive_url = { path = "../common/sensitive_url" } eth1 = { path = "../beacon_node/eth1" } eth2 = { path = "../common/eth2" } +beacon_processor = { path = "../beacon_node/beacon_processor" } [[test]] name = "lighthouse_tests" diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 65e5cc7be..fe181a68a 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -5,6 +5,7 @@ use beacon_node::beacon_chain::chain_config::{ DisallowedReOrgOffsets, DEFAULT_RE_ORG_CUTOFF_DENOMINATOR, DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, DEFAULT_RE_ORG_THRESHOLD, }; +use beacon_processor::BeaconProcessorConfig; use eth1::Eth1Endpoint; use lighthouse_network::PeerId; use std::fs::File; @@ -1150,13 +1151,13 @@ fn disable_backfill_rate_limiting_flag() { CommandLineTest::new() .flag("disable-backfill-rate-limiting", None) .run_with_zero_port() - .with_config(|config| assert!(!config.chain.enable_backfill_rate_limiting)); + .with_config(|config| assert!(!config.beacon_processor.enable_backfill_rate_limiting)); } #[test] fn default_backfill_rate_limiting_flag() { CommandLineTest::new() .run_with_zero_port() - .with_config(|config| assert!(config.chain.enable_backfill_rate_limiting)); + .with_config(|config| assert!(config.beacon_processor.enable_backfill_rate_limiting)); } #[test] fn default_boot_nodes() { @@ -1495,6 +1496,22 @@ fn http_allow_sync_stalled_flag() { .with_config(|config| assert_eq!(config.http_api.allow_sync_stalled, true)); } #[test] +fn http_enable_beacon_processor() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert_eq!(config.http_api.enable_beacon_processor, true)); + + CommandLineTest::new() + .flag("http-enable-beacon-processor", Some("true")) + .run_with_zero_port() + .with_config(|config| assert_eq!(config.http_api.enable_beacon_processor, true)); + + CommandLineTest::new() + .flag("http-enable-beacon-processor", Some("false")) + .run_with_zero_port() + .with_config(|config| assert_eq!(config.http_api.enable_beacon_processor, false)); +} +#[test] fn http_tls_flags() { let dir = TempDir::new().expect("Unable to create temporary directory"); CommandLineTest::new() @@ -2366,3 +2383,40 @@ fn progressive_balances_fast() { ) }); } + +#[test] +fn beacon_processor() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert_eq!(config.beacon_processor, <_>::default())); + + CommandLineTest::new() + .flag("beacon-processor-max-workers", Some("1")) + .flag("beacon-processor-work-queue-len", Some("2")) + .flag("beacon-processor-reprocess-queue-len", Some("3")) + .flag("beacon-processor-attestation-batch-size", Some("4")) + .flag("beacon-processor-aggregate-batch-size", Some("5")) + .flag("disable-backfill-rate-limiting", None) + .run_with_zero_port() + .with_config(|config| { + assert_eq!( + config.beacon_processor, + BeaconProcessorConfig { + max_workers: 1, + max_work_event_queue_len: 2, + max_scheduled_work_queue_len: 3, + max_gossip_attestation_batch_size: 4, + max_gossip_aggregate_batch_size: 5, + enable_backfill_rate_limiting: false + } + ) + }); +} + +#[test] +#[should_panic] +fn beacon_processor_zero_workers() { + CommandLineTest::new() + .flag("beacon-processor-max-workers", Some("0")) + .run_with_zero_port(); +} diff --git a/testing/node_test_rig/src/lib.rs b/testing/node_test_rig/src/lib.rs index 1e3f14a5a..cad23800e 100644 --- a/testing/node_test_rig/src/lib.rs +++ b/testing/node_test_rig/src/lib.rs @@ -24,7 +24,7 @@ pub use execution_layer::test_utils::{ pub use validator_client::Config as ValidatorConfig; /// The global timeout for HTTP requests to the beacon node. -const HTTP_TIMEOUT: Duration = Duration::from_secs(4); +const HTTP_TIMEOUT: Duration = Duration::from_secs(8); /// The timeout for a beacon node to start up. const STARTUP_TIMEOUT: Duration = Duration::from_secs(60); @@ -115,6 +115,11 @@ pub fn testing_client_config() -> ClientConfig { genesis_time: now, }; + // Specify a constant count of beacon processor workers. Having this number + // too low can cause annoying HTTP timeouts, especially on Github runners + // with 2 logical CPUs. + client_config.beacon_processor.max_workers = 4; + client_config } diff --git a/watch/Cargo.toml b/watch/Cargo.toml index 1a003167d..23e2c566d 100644 --- a/watch/Cargo.toml +++ b/watch/Cargo.toml @@ -43,3 +43,4 @@ beacon_chain = { path = "../beacon_node/beacon_chain" } network = { path = "../beacon_node/network" } testcontainers = "0.14.0" unused_port = { path = "../common/unused_port" } +task_executor = { path = "../common/task_executor" } diff --git a/watch/tests/tests.rs b/watch/tests/tests.rs index 28700ccdc..af1cde26b 100644 --- a/watch/tests/tests.rs +++ b/watch/tests/tests.rs @@ -85,7 +85,6 @@ struct TesterBuilder { pub harness: BeaconChainHarness>, pub config: Config, _bn_network_rx: NetworkReceivers, - _bn_api_shutdown_tx: oneshot::Sender<()>, } impl TesterBuilder { @@ -102,10 +101,14 @@ impl TesterBuilder { let ApiServer { server, listening_socket: bn_api_listening_socket, - shutdown_tx: _bn_api_shutdown_tx, network_rx: _bn_network_rx, .. - } = create_api_server(harness.chain.clone(), harness.logger().clone()).await; + } = create_api_server( + harness.chain.clone(), + &harness.runtime, + harness.logger().clone(), + ) + .await; tokio::spawn(server); /* @@ -139,7 +142,6 @@ impl TesterBuilder { harness, config, _bn_network_rx, - _bn_api_shutdown_tx, } } pub async fn build(self, pool: PgPool) -> Tester { @@ -186,7 +188,6 @@ impl TesterBuilder { config: self.config, updater, _bn_network_rx: self._bn_network_rx, - _bn_api_shutdown_tx: self._bn_api_shutdown_tx, _watch_shutdown_tx, } } @@ -204,7 +205,6 @@ struct Tester { pub config: Config, pub updater: UpdateHandler, _bn_network_rx: NetworkReceivers, - _bn_api_shutdown_tx: oneshot::Sender<()>, _watch_shutdown_tx: oneshot::Sender<()>, }