Merge branch 'unstable' of https://github.com/sigp/lighthouse into merge-unstable-deneb-aug-9

This commit is contained in:
realbigsean 2023-08-09 10:42:51 -04:00
commit 12b5e9ad3d
No known key found for this signature in database
GPG Key ID: BE1B3DB104F6C788
24 changed files with 1870 additions and 948 deletions

5
Cargo.lock generated
View File

@ -606,7 +606,9 @@ dependencies = [
"lighthouse_metrics", "lighthouse_metrics",
"lighthouse_network", "lighthouse_network",
"logging", "logging",
"num_cpus",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"serde",
"slog", "slog",
"slot_clock", "slot_clock",
"strum", "strum",
@ -3334,6 +3336,7 @@ name = "http_api"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"beacon_chain", "beacon_chain",
"beacon_processor",
"bs58 0.4.0", "bs58 0.4.0",
"bytes", "bytes",
"directory", "directory",
@ -4407,6 +4410,7 @@ dependencies = [
"account_manager", "account_manager",
"account_utils", "account_utils",
"beacon_node", "beacon_node",
"beacon_processor",
"bls", "bls",
"boot_node", "boot_node",
"clap", "clap",
@ -8984,6 +8988,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"serde_yaml", "serde_yaml",
"task_executor",
"testcontainers", "testcontainers",
"tokio", "tokio",
"tokio-postgres", "tokio-postgres",

View File

@ -79,8 +79,6 @@ pub struct ChainConfig {
/// ///
/// This is useful for block builders and testing. /// This is useful for block builders and testing.
pub always_prepare_payload: bool, pub always_prepare_payload: bool,
/// Whether backfill sync processing should be rate-limited.
pub enable_backfill_rate_limiting: bool,
/// Whether to use `ProgressiveBalancesCache` in unrealized FFG progression calculation. /// Whether to use `ProgressiveBalancesCache` in unrealized FFG progression calculation.
pub progressive_balances_mode: ProgressiveBalancesMode, pub progressive_balances_mode: ProgressiveBalancesMode,
/// Number of epochs between each migration of data from the hot database to the freezer. /// Number of epochs between each migration of data from the hot database to the freezer.
@ -114,7 +112,6 @@ impl Default for ChainConfig {
shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE, shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE,
genesis_backfill: false, genesis_backfill: false,
always_prepare_payload: false, always_prepare_payload: false,
enable_backfill_rate_limiting: true,
progressive_balances_mode: ProgressiveBalancesMode::Checked, progressive_balances_mode: ProgressiveBalancesMode::Checked,
epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION, epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION,
} }

View File

@ -21,4 +21,6 @@ types = { path = "../../consensus/types" }
ethereum_ssz = "0.5.0" ethereum_ssz = "0.5.0"
lazy_static = "1.4.0" lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" } lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
parking_lot = "0.12.0" parking_lot = "0.12.0"
num_cpus = "1.13.0"
serde = { version = "1.0.116", features = ["derive"] }

View File

@ -46,6 +46,7 @@ use futures::task::Poll;
use lighthouse_network::{MessageId, NetworkGlobals, PeerId}; use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
use logging::TimeLatch; use logging::TimeLatch;
use parking_lot::Mutex; use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use slog::{crit, debug, error, trace, warn, Logger}; use slog::{crit, debug, error, trace, warn, Logger};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::cmp; use std::cmp;
@ -73,7 +74,7 @@ pub mod work_reprocessing_queue;
/// The maximum size of the channel for work events to the `BeaconProcessor`. /// The maximum size of the channel for work events to the `BeaconProcessor`.
/// ///
/// Setting this too low will cause consensus messages to be dropped. /// Setting this too low will cause consensus messages to be dropped.
pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384; const DEFAULT_MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;
/// The maximum size of the channel for idle events to the `BeaconProcessor`. /// The maximum size of the channel for idle events to the `BeaconProcessor`.
/// ///
@ -82,7 +83,7 @@ pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;
const MAX_IDLE_QUEUE_LEN: usize = 16_384; const MAX_IDLE_QUEUE_LEN: usize = 16_384;
/// The maximum size of the channel for re-processing work events. /// The maximum size of the channel for re-processing work events.
pub const MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * MAX_WORK_EVENT_QUEUE_LEN / 4; const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4;
/// The maximum number of queued `Attestation` objects that will be stored before we start dropping /// The maximum number of queued `Attestation` objects that will be stored before we start dropping
/// them. /// them.
@ -179,6 +180,14 @@ const MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN: usize = 16_384;
/// will be stored before we start dropping them. /// will be stored before we start dropping them.
const MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN: usize = 1_024; const MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN: usize = 1_024;
/// The maximum number of priority-0 (highest priority) messages that will be queued before
/// they begin to be dropped.
const MAX_API_REQUEST_P0_QUEUE_LEN: usize = 1_024;
/// The maximum number of priority-1 (second-highest priority) messages that will be queued before
/// they begin to be dropped.
const MAX_API_REQUEST_P1_QUEUE_LEN: usize = 1_024;
/// The name of the manager tokio task. /// The name of the manager tokio task.
const MANAGER_TASK_NAME: &str = "beacon_processor_manager"; const MANAGER_TASK_NAME: &str = "beacon_processor_manager";
@ -196,8 +205,8 @@ const WORKER_TASK_NAME: &str = "beacon_processor_worker";
/// Poisoning occurs when an invalid signature is included in a batch of attestations. A single /// Poisoning occurs when an invalid signature is included in a batch of attestations. A single
/// invalid signature causes the entire batch to fail. When a batch fails, we fall-back to /// invalid signature causes the entire batch to fail. When a batch fails, we fall-back to
/// individually verifying each attestation signature. /// individually verifying each attestation signature.
const MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64; const DEFAULT_MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64;
const MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64; const DEFAULT_MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64;
/// Unique IDs used for metrics and testing. /// Unique IDs used for metrics and testing.
pub const WORKER_FREED: &str = "worker_freed"; pub const WORKER_FREED: &str = "worker_freed";
@ -231,6 +240,61 @@ pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update"; pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update";
pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change"; pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change";
pub const API_REQUEST_P0: &str = "api_request_p0";
pub const API_REQUEST_P1: &str = "api_request_p1";
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct BeaconProcessorConfig {
pub max_workers: usize,
pub max_work_event_queue_len: usize,
pub max_scheduled_work_queue_len: usize,
pub max_gossip_attestation_batch_size: usize,
pub max_gossip_aggregate_batch_size: usize,
pub enable_backfill_rate_limiting: bool,
}
impl Default for BeaconProcessorConfig {
fn default() -> Self {
Self {
max_workers: cmp::max(1, num_cpus::get()),
max_work_event_queue_len: DEFAULT_MAX_WORK_EVENT_QUEUE_LEN,
max_scheduled_work_queue_len: DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN,
max_gossip_attestation_batch_size: DEFAULT_MAX_GOSSIP_ATTESTATION_BATCH_SIZE,
max_gossip_aggregate_batch_size: DEFAULT_MAX_GOSSIP_AGGREGATE_BATCH_SIZE,
enable_backfill_rate_limiting: true,
}
}
}
// The channels necessary to instantiate a `BeaconProcessor`.
pub struct BeaconProcessorChannels<E: EthSpec> {
pub beacon_processor_tx: BeaconProcessorSend<E>,
pub beacon_processor_rx: mpsc::Receiver<WorkEvent<E>>,
pub work_reprocessing_tx: mpsc::Sender<ReprocessQueueMessage>,
pub work_reprocessing_rx: mpsc::Receiver<ReprocessQueueMessage>,
}
impl<E: EthSpec> BeaconProcessorChannels<E> {
pub fn new(config: &BeaconProcessorConfig) -> Self {
let (beacon_processor_tx, beacon_processor_rx) =
mpsc::channel(config.max_scheduled_work_queue_len);
let (work_reprocessing_tx, work_reprocessing_rx) =
mpsc::channel(config.max_scheduled_work_queue_len);
Self {
beacon_processor_tx: BeaconProcessorSend(beacon_processor_tx),
beacon_processor_rx,
work_reprocessing_rx,
work_reprocessing_tx,
}
}
}
impl<E: EthSpec> Default for BeaconProcessorChannels<E> {
fn default() -> Self {
Self::new(&BeaconProcessorConfig::default())
}
}
/// A simple first-in-first-out queue with a maximum length. /// A simple first-in-first-out queue with a maximum length.
struct FifoQueue<T> { struct FifoQueue<T> {
@ -379,7 +443,7 @@ impl<E: EthSpec> WorkEvent<E> {
} }
} }
impl<E: EthSpec> std::convert::From<ReadyWork> for WorkEvent<E> { impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
fn from(ready_work: ReadyWork) -> Self { fn from(ready_work: ReadyWork) -> Self {
match ready_work { match ready_work {
ReadyWork::Block(QueuedGossipBlock { ReadyWork::Block(QueuedGossipBlock {
@ -481,6 +545,10 @@ impl<E: EthSpec> BeaconProcessorSend<E> {
pub type AsyncFn = Pin<Box<dyn Future<Output = ()> + Send + Sync>>; pub type AsyncFn = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
pub type BlockingFn = Box<dyn FnOnce() + Send + Sync>; pub type BlockingFn = Box<dyn FnOnce() + Send + Sync>;
pub type BlockingFnWithManualSendOnIdle = Box<dyn FnOnce(SendOnDrop) + Send + Sync>; pub type BlockingFnWithManualSendOnIdle = Box<dyn FnOnce(SendOnDrop) + Send + Sync>;
pub enum BlockingOrAsync {
Blocking(BlockingFn),
Async(AsyncFn),
}
/// Indicates the type of work to be performed and therefore its priority and /// Indicates the type of work to be performed and therefore its priority and
/// queuing specifics. /// queuing specifics.
@ -545,6 +613,8 @@ pub enum Work<E: EthSpec> {
BlobsByRootsRequest(BlockingFnWithManualSendOnIdle), BlobsByRootsRequest(BlockingFnWithManualSendOnIdle),
GossipBlsToExecutionChange(BlockingFn), GossipBlsToExecutionChange(BlockingFn),
LightClientBootstrapRequest(BlockingFn), LightClientBootstrapRequest(BlockingFn),
ApiRequestP0(BlockingOrAsync),
ApiRequestP1(BlockingOrAsync),
} }
impl<E: EthSpec> fmt::Debug for Work<E> { impl<E: EthSpec> fmt::Debug for Work<E> {
@ -586,6 +656,8 @@ impl<E: EthSpec> Work<E> {
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE, Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE,
Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE, Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE,
Work::ApiRequestP0 { .. } => API_REQUEST_P0,
Work::ApiRequestP1 { .. } => API_REQUEST_P1,
} }
} }
} }
@ -664,7 +736,7 @@ pub struct BeaconProcessor<E: EthSpec> {
pub executor: TaskExecutor, pub executor: TaskExecutor,
pub max_workers: usize, pub max_workers: usize,
pub current_workers: usize, pub current_workers: usize,
pub enable_backfill_rate_limiting: bool, pub config: BeaconProcessorConfig,
pub log: Logger, pub log: Logger,
} }
@ -744,11 +816,13 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut lcbootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN); let mut lcbootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN);
let mut api_request_p0_queue = FifoQueue::new(MAX_API_REQUEST_P0_QUEUE_LEN);
let mut api_request_p1_queue = FifoQueue::new(MAX_API_REQUEST_P1_QUEUE_LEN);
// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
// receive them back once they are ready (`ready_work_rx`). // receive them back once they are ready (`ready_work_rx`).
let (ready_work_tx, ready_work_rx) = let (ready_work_tx, ready_work_rx) =
mpsc::channel::<ReadyWork>(MAX_SCHEDULED_WORK_QUEUE_LEN); mpsc::channel::<ReadyWork>(self.config.max_scheduled_work_queue_len);
spawn_reprocess_scheduler( spawn_reprocess_scheduler(
ready_work_tx, ready_work_tx,
work_reprocessing_rx, work_reprocessing_rx,
@ -769,7 +843,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
reprocess_work_rx: ready_work_rx, reprocess_work_rx: ready_work_rx,
}; };
let enable_backfill_rate_limiting = self.enable_backfill_rate_limiting; let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting;
loop { loop {
let work_event = match inbound_events.next().await { let work_event = match inbound_events.next().await {
@ -884,12 +958,17 @@ impl<E: EthSpec> BeaconProcessor<E> {
self.spawn_worker(item, idle_tx); self.spawn_worker(item, idle_tx);
} else if let Some(item) = gossip_blob_queue.pop() { } else if let Some(item) = gossip_blob_queue.pop() {
self.spawn_worker(item, idle_tx); self.spawn_worker(item, idle_tx);
// Check the priority 0 API requests after blocks and blobs, but before attestations.
} else if let Some(item) = api_request_p0_queue.pop() {
self.spawn_worker(item, idle_tx);
// Check the aggregates, *then* the unaggregates since we assume that // Check the aggregates, *then* the unaggregates since we assume that
// aggregates are more valuable to local validators and effectively give us // aggregates are more valuable to local validators and effectively give us
// more information with less signature verification time. // more information with less signature verification time.
} else if aggregate_queue.len() > 0 { } else if aggregate_queue.len() > 0 {
let batch_size = let batch_size = cmp::min(
cmp::min(aggregate_queue.len(), MAX_GOSSIP_AGGREGATE_BATCH_SIZE); aggregate_queue.len(),
self.config.max_gossip_aggregate_batch_size,
);
if batch_size < 2 { if batch_size < 2 {
// One single aggregate is in the queue, process it individually. // One single aggregate is in the queue, process it individually.
@ -948,7 +1027,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
} else if attestation_queue.len() > 0 { } else if attestation_queue.len() > 0 {
let batch_size = cmp::min( let batch_size = cmp::min(
attestation_queue.len(), attestation_queue.len(),
MAX_GOSSIP_ATTESTATION_BATCH_SIZE, self.config.max_gossip_attestation_batch_size,
); );
if batch_size < 2 { if batch_size < 2 {
@ -1043,6 +1122,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
self.spawn_worker(item, idle_tx); self.spawn_worker(item, idle_tx);
} else if let Some(item) = gossip_bls_to_execution_change_queue.pop() { } else if let Some(item) = gossip_bls_to_execution_change_queue.pop() {
self.spawn_worker(item, idle_tx); self.spawn_worker(item, idle_tx);
// Check the priority 1 API requests after we've
// processed all the interesting things from the network
// and things required for us to stay in good repute
// with our P2P peers.
} else if let Some(item) = api_request_p1_queue.pop() {
self.spawn_worker(item, idle_tx);
// Handle backfill sync chain segments. // Handle backfill sync chain segments.
} else if let Some(item) = backfill_chain_segment.pop() { } else if let Some(item) = backfill_chain_segment.pop() {
self.spawn_worker(item, idle_tx); self.spawn_worker(item, idle_tx);
@ -1175,6 +1260,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::UnknownLightClientOptimisticUpdate { .. } => { Work::UnknownLightClientOptimisticUpdate { .. } => {
unknown_light_client_update_queue.push(work, work_id, &self.log) unknown_light_client_update_queue.push(work, work_id, &self.log)
} }
Work::ApiRequestP0 { .. } => {
api_request_p0_queue.push(work, work_id, &self.log)
}
Work::ApiRequestP1 { .. } => {
api_request_p1_queue.push(work, work_id, &self.log)
}
} }
} }
} }
@ -1235,6 +1326,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
&metrics::BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_QUEUE_TOTAL, &metrics::BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_QUEUE_TOTAL,
gossip_bls_to_execution_change_queue.len() as i64, gossip_bls_to_execution_change_queue.len() as i64,
); );
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_API_REQUEST_P0_QUEUE_TOTAL,
api_request_p0_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_API_REQUEST_P1_QUEUE_TOTAL,
api_request_p1_queue.len() as i64,
);
if aggregate_queue.is_full() && aggregate_debounce.elapsed() { if aggregate_queue.is_full() && aggregate_debounce.elapsed() {
error!( error!(
@ -1357,6 +1456,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
task_spawner.spawn_blocking_with_manual_send_idle(work) task_spawner.spawn_blocking_with_manual_send_idle(work)
} }
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn), Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn),
},
Work::GossipVoluntaryExit(process_fn) Work::GossipVoluntaryExit(process_fn)
| Work::GossipProposerSlashing(process_fn) | Work::GossipProposerSlashing(process_fn)
| Work::GossipAttesterSlashing(process_fn) | Work::GossipAttesterSlashing(process_fn)

View File

@ -110,6 +110,15 @@ lazy_static::lazy_static! {
"beacon_processor_sync_contribution_queue_total", "beacon_processor_sync_contribution_queue_total",
"Count of sync committee contributions waiting to be processed." "Count of sync committee contributions waiting to be processed."
); );
// HTTP API requests.
pub static ref BEACON_PROCESSOR_API_REQUEST_P0_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_api_request_p0_queue_total",
"Count of P0 HTTP requesets waiting to be processed."
);
pub static ref BEACON_PROCESSOR_API_REQUEST_P1_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_api_request_p1_queue_total",
"Count of P1 HTTP requesets waiting to be processed."
);
/* /*
* Attestation reprocessing queue metrics. * Attestation reprocessing queue metrics.

View File

@ -14,10 +14,8 @@ use beacon_chain::{
store::{HotColdDB, ItemStore, LevelDB, StoreConfig}, store::{HotColdDB, ItemStore, LevelDB, StoreConfig},
BeaconChain, BeaconChainTypes, Eth1ChainBackend, MigratorConfig, ServerSentEventHandler, BeaconChain, BeaconChainTypes, Eth1ChainBackend, MigratorConfig, ServerSentEventHandler,
}; };
use beacon_processor::{ use beacon_processor::BeaconProcessorConfig;
work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessor, BeaconProcessorSend, use beacon_processor::{BeaconProcessor, BeaconProcessorChannels};
WorkEvent, MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN,
};
use environment::RuntimeContext; use environment::RuntimeContext;
use eth1::{Config as Eth1Config, Service as Eth1Service}; use eth1::{Config as Eth1Config, Service as Eth1Service};
use eth2::{ use eth2::{
@ -39,7 +37,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use timer::spawn_timer; use timer::spawn_timer;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::oneshot;
use types::{ use types::{
test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec, test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec,
ExecutionBlockHash, Hash256, SignedBeaconBlock, ExecutionBlockHash, Hash256, SignedBeaconBlock,
@ -79,11 +77,9 @@ pub struct ClientBuilder<T: BeaconChainTypes> {
http_api_config: http_api::Config, http_api_config: http_api::Config,
http_metrics_config: http_metrics::Config, http_metrics_config: http_metrics::Config,
slasher: Option<Arc<Slasher<T::EthSpec>>>, slasher: Option<Arc<Slasher<T::EthSpec>>>,
beacon_processor_config: Option<BeaconProcessorConfig>,
beacon_processor_channels: Option<BeaconProcessorChannels<T::EthSpec>>,
eth_spec_instance: T::EthSpec, eth_spec_instance: T::EthSpec,
beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
beacon_processor_receive: mpsc::Receiver<WorkEvent<T::EthSpec>>,
work_reprocessing_tx: mpsc::Sender<ReprocessQueueMessage>,
work_reprocessing_rx: mpsc::Receiver<ReprocessQueueMessage>,
} }
impl<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore> impl<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>
@ -99,10 +95,6 @@ where
/// ///
/// The `eth_spec_instance` parameter is used to concretize `TEthSpec`. /// The `eth_spec_instance` parameter is used to concretize `TEthSpec`.
pub fn new(eth_spec_instance: TEthSpec) -> Self { pub fn new(eth_spec_instance: TEthSpec) -> Self {
let (beacon_processor_send, beacon_processor_receive) =
mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN);
let (work_reprocessing_tx, work_reprocessing_rx) =
mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN);
Self { Self {
slot_clock: None, slot_clock: None,
store: None, store: None,
@ -121,10 +113,8 @@ where
http_metrics_config: <_>::default(), http_metrics_config: <_>::default(),
slasher: None, slasher: None,
eth_spec_instance, eth_spec_instance,
beacon_processor_send: BeaconProcessorSend(beacon_processor_send), beacon_processor_config: None,
beacon_processor_receive, beacon_processor_channels: None,
work_reprocessing_tx,
work_reprocessing_rx,
} }
} }
@ -140,6 +130,12 @@ where
self self
} }
pub fn beacon_processor(mut self, config: BeaconProcessorConfig) -> Self {
self.beacon_processor_channels = Some(BeaconProcessorChannels::new(&config));
self.beacon_processor_config = Some(config);
self
}
pub fn slasher(mut self, slasher: Arc<Slasher<TEthSpec>>) -> Self { pub fn slasher(mut self, slasher: Arc<Slasher<TEthSpec>>) -> Self {
self.slasher = Some(slasher); self.slasher = Some(slasher);
self self
@ -479,6 +475,7 @@ where
chain: None, chain: None,
network_senders: None, network_senders: None,
network_globals: None, network_globals: None,
beacon_processor_send: None,
eth1_service: Some(genesis_service.eth1_service.clone()), eth1_service: Some(genesis_service.eth1_service.clone()),
log: context.log().clone(), log: context.log().clone(),
sse_logging_components: runtime_context.sse_logging_components.clone(), sse_logging_components: runtime_context.sse_logging_components.clone(),
@ -556,6 +553,10 @@ where
.as_ref() .as_ref()
.ok_or("network requires a runtime_context")? .ok_or("network requires a runtime_context")?
.clone(); .clone();
let beacon_processor_channels = self
.beacon_processor_channels
.as_ref()
.ok_or("network requires beacon_processor_channels")?;
// If gossipsub metrics are required we build a registry to record them // If gossipsub metrics are required we build a registry to record them
let mut gossipsub_registry = if config.metrics_enabled { let mut gossipsub_registry = if config.metrics_enabled {
@ -571,8 +572,8 @@ where
gossipsub_registry gossipsub_registry
.as_mut() .as_mut()
.map(|registry| registry.sub_registry_with_prefix("gossipsub")), .map(|registry| registry.sub_registry_with_prefix("gossipsub")),
self.beacon_processor_send.clone(), beacon_processor_channels.beacon_processor_tx.clone(),
self.work_reprocessing_tx.clone(), beacon_processor_channels.work_reprocessing_tx.clone(),
) )
.await .await
.map_err(|e| format!("Failed to start network: {:?}", e))?; .map_err(|e| format!("Failed to start network: {:?}", e))?;
@ -695,6 +696,14 @@ where
.runtime_context .runtime_context
.as_ref() .as_ref()
.ok_or("build requires a runtime context")?; .ok_or("build requires a runtime context")?;
let beacon_processor_channels = self
.beacon_processor_channels
.take()
.ok_or("build requires beacon_processor_channels")?;
let beacon_processor_config = self
.beacon_processor_config
.take()
.ok_or("build requires a beacon_processor_config")?;
let log = runtime_context.log().clone(); let log = runtime_context.log().clone();
let http_api_listen_addr = if self.http_api_config.enabled { let http_api_listen_addr = if self.http_api_config.enabled {
@ -704,6 +713,7 @@ where
network_senders: self.network_senders.clone(), network_senders: self.network_senders.clone(),
network_globals: self.network_globals.clone(), network_globals: self.network_globals.clone(),
eth1_service: self.eth1_service.clone(), eth1_service: self.eth1_service.clone(),
beacon_processor_send: Some(beacon_processor_channels.beacon_processor_tx.clone()),
sse_logging_components: runtime_context.sse_logging_components.clone(), sse_logging_components: runtime_context.sse_logging_components.clone(),
log: log.clone(), log: log.clone(),
}); });
@ -767,15 +777,13 @@ where
executor: beacon_processor_context.executor.clone(), executor: beacon_processor_context.executor.clone(),
max_workers: cmp::max(1, num_cpus::get()), max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0, current_workers: 0,
enable_backfill_rate_limiting: beacon_chain config: beacon_processor_config,
.config
.enable_backfill_rate_limiting,
log: beacon_processor_context.log().clone(), log: beacon_processor_context.log().clone(),
} }
.spawn_manager( .spawn_manager(
self.beacon_processor_receive, beacon_processor_channels.beacon_processor_rx,
self.work_reprocessing_tx, beacon_processor_channels.work_reprocessing_tx,
self.work_reprocessing_rx, beacon_processor_channels.work_reprocessing_rx,
None, None,
beacon_chain.slot_clock.clone(), beacon_chain.slot_clock.clone(),
beacon_chain.spec.maximum_gossip_clock_disparity(), beacon_chain.spec.maximum_gossip_clock_disparity(),

View File

@ -1,5 +1,6 @@
use beacon_chain::validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD; use beacon_chain::validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD;
use beacon_chain::TrustedSetup; use beacon_chain::TrustedSetup;
use beacon_processor::BeaconProcessorConfig;
use directory::DEFAULT_ROOT_DIR; use directory::DEFAULT_ROOT_DIR;
use environment::LoggerConfig; use environment::LoggerConfig;
use network::NetworkConfig; use network::NetworkConfig;
@ -89,6 +90,7 @@ pub struct Config {
pub slasher: Option<slasher::Config>, pub slasher: Option<slasher::Config>,
pub logger_config: LoggerConfig, pub logger_config: LoggerConfig,
pub always_prefer_builder_payload: bool, pub always_prefer_builder_payload: bool,
pub beacon_processor: BeaconProcessorConfig,
} }
impl Default for Config { impl Default for Config {
@ -118,6 +120,7 @@ impl Default for Config {
validator_monitor_individual_tracking_threshold: DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, validator_monitor_individual_tracking_threshold: DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD,
logger_config: LoggerConfig::default(), logger_config: LoggerConfig::default(),
always_prefer_builder_payload: false, always_prefer_builder_payload: false,
beacon_processor: <_>::default(),
} }
} }
} }

View File

@ -3,12 +3,12 @@ name = "http_api"
version = "0.1.0" version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"] authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2021" edition = "2021"
autotests = false # using a single test binary compiles faster autotests = false # using a single test binary compiles faster
[dependencies] [dependencies]
warp = { version = "0.3.2", features = ["tls"] } warp = { version = "0.3.2", features = ["tls"] }
serde = { version = "1.0.116", features = ["derive"] } serde = { version = "1.0.116", features = ["derive"] }
tokio = { version = "1.14.0", features = ["macros","sync"] } tokio = { version = "1.14.0", features = ["macros", "sync"] }
tokio-stream = { version = "0.1.3", features = ["sync"] } tokio-stream = { version = "0.1.3", features = ["sync"] }
types = { path = "../../consensus/types" } types = { path = "../../consensus/types" }
hex = "0.4.2" hex = "0.4.2"
@ -27,9 +27,9 @@ slot_clock = { path = "../../common/slot_clock" }
ethereum_ssz = "0.5.0" ethereum_ssz = "0.5.0"
bs58 = "0.4.0" bs58 = "0.4.0"
futures = "0.3.8" futures = "0.3.8"
execution_layer = {path = "../execution_layer"} execution_layer = { path = "../execution_layer" }
parking_lot = "0.12.0" parking_lot = "0.12.0"
safe_arith = {path = "../../consensus/safe_arith"} safe_arith = { path = "../../consensus/safe_arith" }
task_executor = { path = "../../common/task_executor" } task_executor = { path = "../../common/task_executor" }
lru = "0.7.7" lru = "0.7.7"
tree_hash = "0.5.2" tree_hash = "0.5.2"
@ -40,9 +40,10 @@ logging = { path = "../../common/logging" }
ethereum_serde_utils = "0.5.0" ethereum_serde_utils = "0.5.0"
operation_pool = { path = "../operation_pool" } operation_pool = { path = "../operation_pool" }
sensitive_url = { path = "../../common/sensitive_url" } sensitive_url = { path = "../../common/sensitive_url" }
unused_port = {path = "../../common/unused_port"} unused_port = { path = "../../common/unused_port" }
store = { path = "../store" } store = { path = "../store" }
bytes = "1.1.0" bytes = "1.1.0"
beacon_processor = { path = "../beacon_processor" }
[dev-dependencies] [dev-dependencies]
environment = { path = "../../lighthouse/environment" } environment = { path = "../../lighthouse/environment" }
@ -52,4 +53,4 @@ genesis = { path = "../genesis" }
[[test]] [[test]]
name = "bn_http_api_tests" name = "bn_http_api_tests"
path = "tests/main.rs" path = "tests/main.rs"

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,214 @@
use beacon_processor::{BeaconProcessorSend, BlockingOrAsync, Work, WorkEvent};
use serde::Serialize;
use std::future::Future;
use tokio::sync::{mpsc::error::TrySendError, oneshot};
use types::EthSpec;
use warp::reply::{Reply, Response};
/// Maps a request to a queue in the `BeaconProcessor`.
#[derive(Clone, Copy)]
pub enum Priority {
/// The highest priority.
P0,
/// The lowest priority.
P1,
}
impl Priority {
/// Wrap `self` in a `WorkEvent` with an appropriate priority.
fn work_event<E: EthSpec>(&self, process_fn: BlockingOrAsync) -> WorkEvent<E> {
let work = match self {
Priority::P0 => Work::ApiRequestP0(process_fn),
Priority::P1 => Work::ApiRequestP1(process_fn),
};
WorkEvent {
drop_during_sync: false,
work,
}
}
}
/// Spawns tasks on the `BeaconProcessor` or directly on the tokio executor.
pub struct TaskSpawner<E: EthSpec> {
/// Used to send tasks to the `BeaconProcessor`. The tokio executor will be
/// used if this is `None`.
beacon_processor_send: Option<BeaconProcessorSend<E>>,
}
impl<E: EthSpec> TaskSpawner<E> {
pub fn new(beacon_processor_send: Option<BeaconProcessorSend<E>>) -> Self {
Self {
beacon_processor_send,
}
}
/// Executes a "blocking" (non-async) task which returns a `Response`.
pub async fn blocking_response_task<F, T>(
self,
priority: Priority,
func: F,
) -> Result<Response, warp::Rejection>
where
F: FnOnce() -> Result<T, warp::Rejection> + Send + Sync + 'static,
T: Reply + Send + 'static,
{
if let Some(beacon_processor_send) = &self.beacon_processor_send {
// Create a closure that will execute `func` and send the result to
// a channel held by this thread.
let (tx, rx) = oneshot::channel();
let process_fn = move || {
// Execute the function, collect the return value.
let func_result = func();
// Send the result down the channel. Ignore any failures; the
// send can only fail if the receiver is dropped.
let _ = tx.send(func_result);
};
// Send the function to the beacon processor for execution at some arbitrary time.
match send_to_beacon_processor(
beacon_processor_send,
priority,
BlockingOrAsync::Blocking(Box::new(process_fn)),
rx,
)
.await
{
Ok(result) => result.map(Reply::into_response),
Err(error_response) => Ok(error_response),
}
} else {
// There is no beacon processor so spawn a task directly on the
// tokio executor.
warp_utils::task::blocking_response_task(func).await
}
}
/// Executes a "blocking" (non-async) task which returns a JSON-serializable
/// object.
pub async fn blocking_json_task<F, T>(
self,
priority: Priority,
func: F,
) -> Result<Response, warp::Rejection>
where
F: FnOnce() -> Result<T, warp::Rejection> + Send + Sync + 'static,
T: Serialize + Send + 'static,
{
let func = || func().map(|t| warp::reply::json(&t).into_response());
self.blocking_response_task(priority, func).await
}
/// Executes an async task which may return a `warp::Rejection`.
pub async fn spawn_async_with_rejection(
self,
priority: Priority,
func: impl Future<Output = Result<Response, warp::Rejection>> + Send + Sync + 'static,
) -> Result<Response, warp::Rejection> {
if let Some(beacon_processor_send) = &self.beacon_processor_send {
// Create a wrapper future that will execute `func` and send the
// result to a channel held by this thread.
let (tx, rx) = oneshot::channel();
let process_fn = async move {
// Await the future, collect the return value.
let func_result = func.await;
// Send the result down the channel. Ignore any failures; the
// send can only fail if the receiver is dropped.
let _ = tx.send(func_result);
};
// Send the function to the beacon processor for execution at some arbitrary time.
send_to_beacon_processor(
beacon_processor_send,
priority,
BlockingOrAsync::Async(Box::pin(process_fn)),
rx,
)
.await
.unwrap_or_else(Result::Ok)
} else {
// There is no beacon processor so spawn a task directly on the
// tokio executor.
tokio::task::spawn(func).await.unwrap_or_else(|e| {
let response = warp::reply::with_status(
warp::reply::json(&format!("Tokio did not execute task: {e:?}")),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response();
Ok(response)
})
}
}
/// Executes an async task which always returns a `Response`.
pub async fn spawn_async(
self,
priority: Priority,
func: impl Future<Output = Response> + Send + Sync + 'static,
) -> Response {
if let Some(beacon_processor_send) = &self.beacon_processor_send {
// Create a wrapper future that will execute `func` and send the
// result to a channel held by this thread.
let (tx, rx) = oneshot::channel();
let process_fn = async move {
// Await the future, collect the return value.
let func_result = func.await;
// Send the result down the channel. Ignore any failures; the
// send can only fail if the receiver is dropped.
let _ = tx.send(func_result);
};
// Send the function to the beacon processor for execution at some arbitrary time.
send_to_beacon_processor(
beacon_processor_send,
priority,
BlockingOrAsync::Async(Box::pin(process_fn)),
rx,
)
.await
.unwrap_or_else(|error_response| error_response)
} else {
// There is no beacon processor so spawn a task directly on the
// tokio executor.
tokio::task::spawn(func).await.unwrap_or_else(|e| {
warp::reply::with_status(
warp::reply::json(&format!("Tokio did not execute task: {e:?}")),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response()
})
}
}
}
/// Send a task to the beacon processor and await execution.
///
/// If the task is not executed, return an `Err(response)` with an error message
/// for the API consumer.
async fn send_to_beacon_processor<E: EthSpec, T>(
beacon_processor_send: &BeaconProcessorSend<E>,
priority: Priority,
process_fn: BlockingOrAsync,
rx: oneshot::Receiver<T>,
) -> Result<T, Response> {
let error_message = match beacon_processor_send.try_send(priority.work_event(process_fn)) {
Ok(()) => {
match rx.await {
// The beacon processor executed the task and sent a result.
Ok(func_result) => return Ok(func_result),
// The beacon processor dropped the channel without sending a
// result. The beacon processor dropped this task because its
// queues are full or it's shutting down.
Err(_) => "The task did not execute. The server is overloaded or shutting down.",
}
}
Err(TrySendError::Full(_)) => "The task was dropped. The server is overloaded.",
Err(TrySendError::Closed(_)) => "The task was dropped. The server is shutting down.",
};
let error_response = warp::reply::with_status(
warp::reply::json(&error_message),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response();
Err(error_response)
}

View File

@ -3,6 +3,7 @@ use beacon_chain::{
test_utils::{BeaconChainHarness, BoxedMutator, Builder, EphemeralHarnessType}, test_utils::{BeaconChainHarness, BoxedMutator, Builder, EphemeralHarnessType},
BeaconChain, BeaconChainTypes, BeaconChain, BeaconChainTypes,
}; };
use beacon_processor::{BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig};
use directory::DEFAULT_ROOT_DIR; use directory::DEFAULT_ROOT_DIR;
use eth2::{BeaconNodeHttpClient, Timeouts}; use eth2::{BeaconNodeHttpClient, Timeouts};
use lighthouse_network::{ use lighthouse_network::{
@ -24,7 +25,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use store::MemoryStore; use store::MemoryStore;
use tokio::sync::oneshot; use task_executor::test_utils::TestRuntime;
use types::{ChainSpec, EthSpec}; use types::{ChainSpec, EthSpec};
pub const TCP_PORT: u16 = 42; pub const TCP_PORT: u16 = 42;
@ -37,7 +38,6 @@ pub struct InteractiveTester<E: EthSpec> {
pub harness: BeaconChainHarness<EphemeralHarnessType<E>>, pub harness: BeaconChainHarness<EphemeralHarnessType<E>>,
pub client: BeaconNodeHttpClient, pub client: BeaconNodeHttpClient,
pub network_rx: NetworkReceivers<E>, pub network_rx: NetworkReceivers<E>,
_server_shutdown: oneshot::Sender<()>,
} }
/// The result of calling `create_api_server`. /// The result of calling `create_api_server`.
@ -46,7 +46,6 @@ pub struct InteractiveTester<E: EthSpec> {
pub struct ApiServer<E: EthSpec, SFut: Future<Output = ()>> { pub struct ApiServer<E: EthSpec, SFut: Future<Output = ()>> {
pub server: SFut, pub server: SFut,
pub listening_socket: SocketAddr, pub listening_socket: SocketAddr,
pub shutdown_tx: oneshot::Sender<()>,
pub network_rx: NetworkReceivers<E>, pub network_rx: NetworkReceivers<E>,
pub local_enr: Enr, pub local_enr: Enr,
pub external_peer_id: PeerId, pub external_peer_id: PeerId,
@ -93,10 +92,14 @@ impl<E: EthSpec> InteractiveTester<E> {
let ApiServer { let ApiServer {
server, server,
listening_socket, listening_socket,
shutdown_tx: _server_shutdown,
network_rx, network_rx,
.. ..
} = create_api_server(harness.chain.clone(), harness.logger().clone()).await; } = create_api_server(
harness.chain.clone(),
&harness.runtime,
harness.logger().clone(),
)
.await;
tokio::spawn(server); tokio::spawn(server);
@ -114,22 +117,23 @@ impl<E: EthSpec> InteractiveTester<E> {
harness, harness,
client, client,
network_rx, network_rx,
_server_shutdown,
} }
} }
} }
pub async fn create_api_server<T: BeaconChainTypes>( pub async fn create_api_server<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>, chain: Arc<BeaconChain<T>>,
test_runtime: &TestRuntime,
log: Logger, log: Logger,
) -> ApiServer<T::EthSpec, impl Future<Output = ()>> { ) -> ApiServer<T::EthSpec, impl Future<Output = ()>> {
// Get a random unused port. // Get a random unused port.
let port = unused_port::unused_tcp4_port().unwrap(); let port = unused_port::unused_tcp4_port().unwrap();
create_api_server_on_port(chain, log, port).await create_api_server_on_port(chain, test_runtime, log, port).await
} }
pub async fn create_api_server_on_port<T: BeaconChainTypes>( pub async fn create_api_server_on_port<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>, chain: Arc<BeaconChain<T>>,
test_runtime: &TestRuntime,
log: Logger, log: Logger,
port: u16, port: u16,
) -> ApiServer<T::EthSpec, impl Future<Output = ()>> { ) -> ApiServer<T::EthSpec, impl Future<Output = ()>> {
@ -177,6 +181,37 @@ pub async fn create_api_server_on_port<T: BeaconChainTypes>(
let eth1_service = let eth1_service =
eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()).unwrap(); eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()).unwrap();
let beacon_processor_config = BeaconProcessorConfig::default();
let BeaconProcessorChannels {
beacon_processor_tx,
beacon_processor_rx,
work_reprocessing_tx,
work_reprocessing_rx,
} = BeaconProcessorChannels::new(&beacon_processor_config);
let beacon_processor_send = beacon_processor_tx;
BeaconProcessor {
network_globals: network_globals.clone(),
executor: test_runtime.task_executor.clone(),
// The number of workers must be greater than one. Tests which use the
// builder workflow sometimes require an internal HTTP request in order
// to fulfill an already in-flight HTTP request, therefore having only
// one worker will result in a deadlock.
max_workers: 2,
current_workers: 0,
config: beacon_processor_config,
log: log.clone(),
}
.spawn_manager(
beacon_processor_rx,
work_reprocessing_tx,
work_reprocessing_rx,
None,
chain.slot_clock.clone(),
chain.spec.maximum_gossip_clock_disparity(),
)
.unwrap();
let ctx = Arc::new(Context { let ctx = Arc::new(Context {
config: Config { config: Config {
enabled: true, enabled: true,
@ -187,26 +222,22 @@ pub async fn create_api_server_on_port<T: BeaconChainTypes>(
allow_sync_stalled: false, allow_sync_stalled: false,
data_dir: std::path::PathBuf::from(DEFAULT_ROOT_DIR), data_dir: std::path::PathBuf::from(DEFAULT_ROOT_DIR),
spec_fork_name: None, spec_fork_name: None,
enable_beacon_processor: true,
}, },
chain: Some(chain), chain: Some(chain),
network_senders: Some(network_senders), network_senders: Some(network_senders),
network_globals: Some(network_globals), network_globals: Some(network_globals),
beacon_processor_send: Some(beacon_processor_send),
eth1_service: Some(eth1_service), eth1_service: Some(eth1_service),
sse_logging_components: None, sse_logging_components: None,
log, log,
}); });
let (shutdown_tx, shutdown_rx) = oneshot::channel(); let (listening_socket, server) = crate::serve(ctx, test_runtime.task_executor.exit()).unwrap();
let server_shutdown = async {
// It's not really interesting why this triggered, just that it happened.
let _ = shutdown_rx.await;
};
let (listening_socket, server) = crate::serve(ctx, server_shutdown).unwrap();
ApiServer { ApiServer {
server, server,
listening_socket, listening_socket,
shutdown_tx,
network_rx: network_receivers, network_rx: network_receivers,
local_enr: enr, local_enr: enr,
external_peer_id: peer_id, external_peer_id: peer_id,

View File

@ -0,0 +1,21 @@
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
use types::*;
/// Uses the `chain.validator_pubkey_cache` to resolve a pubkey to a validator
/// index and then ensures that the validator exists in the given `state`.
pub fn pubkey_to_validator_index<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
state: &BeaconState<T::EthSpec>,
pubkey: &PublicKeyBytes,
) -> Result<Option<usize>, BeaconChainError> {
chain
.validator_index(pubkey)?
.filter(|&index| {
state
.validators()
.get(index)
.map_or(false, |v| v.pubkey == *pubkey)
})
.map(Result::Ok)
.transpose()
}

View File

@ -30,7 +30,6 @@ use state_processing::per_block_processing::get_expected_withdrawals;
use state_processing::per_slot_processing; use state_processing::per_slot_processing;
use std::convert::TryInto; use std::convert::TryInto;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::time::Duration; use tokio::time::Duration;
use tree_hash::TreeHash; use tree_hash::TreeHash;
use types::application_domain::ApplicationDomain; use types::application_domain::ApplicationDomain;
@ -70,7 +69,6 @@ struct ApiTester {
attester_slashing: AttesterSlashing<E>, attester_slashing: AttesterSlashing<E>,
proposer_slashing: ProposerSlashing, proposer_slashing: ProposerSlashing,
voluntary_exit: SignedVoluntaryExit, voluntary_exit: SignedVoluntaryExit,
_server_shutdown: oneshot::Sender<()>,
network_rx: NetworkReceivers<E>, network_rx: NetworkReceivers<E>,
local_enr: Enr, local_enr: Enr,
external_peer_id: PeerId, external_peer_id: PeerId,
@ -236,11 +234,10 @@ impl ApiTester {
let ApiServer { let ApiServer {
server, server,
listening_socket: _, listening_socket: _,
shutdown_tx,
network_rx, network_rx,
local_enr, local_enr,
external_peer_id, external_peer_id,
} = create_api_server_on_port(chain.clone(), log, port).await; } = create_api_server_on_port(chain.clone(), &harness.runtime, log, port).await;
harness.runtime.task_executor.spawn(server, "api_server"); harness.runtime.task_executor.spawn(server, "api_server");
@ -268,7 +265,6 @@ impl ApiTester {
attester_slashing, attester_slashing,
proposer_slashing, proposer_slashing,
voluntary_exit, voluntary_exit,
_server_shutdown: shutdown_tx,
network_rx, network_rx,
local_enr, local_enr,
external_peer_id, external_peer_id,
@ -324,11 +320,10 @@ impl ApiTester {
let ApiServer { let ApiServer {
server, server,
listening_socket, listening_socket,
shutdown_tx,
network_rx, network_rx,
local_enr, local_enr,
external_peer_id, external_peer_id,
} = create_api_server(chain.clone(), log).await; } = create_api_server(chain.clone(), &harness.runtime, log).await;
harness.runtime.task_executor.spawn(server, "api_server"); harness.runtime.task_executor.spawn(server, "api_server");
@ -353,7 +348,6 @@ impl ApiTester {
attester_slashing, attester_slashing,
proposer_slashing, proposer_slashing,
voluntary_exit, voluntary_exit,
_server_shutdown: shutdown_tx,
network_rx, network_rx,
local_enr, local_enr,
external_peer_id, external_peer_id,

View File

@ -8,9 +8,9 @@ use beacon_chain::{
}; };
use beacon_chain::{BeaconChainTypes, NotifyExecutionLayer}; use beacon_chain::{BeaconChainTypes, NotifyExecutionLayer};
use beacon_processor::{ use beacon_processor::{
work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend, DuplicateCache, work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorChannels, BeaconProcessorSend,
GossipAggregatePackage, GossipAttestationPackage, Work, WorkEvent as BeaconWorkEvent, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work,
MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN, WorkEvent as BeaconWorkEvent,
}; };
use environment::null_logger; use environment::null_logger;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
@ -639,11 +639,15 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
pub fn null_for_testing( pub fn null_for_testing(
network_globals: Arc<NetworkGlobals<E>>, network_globals: Arc<NetworkGlobals<E>>,
) -> (Self, mpsc::Receiver<BeaconWorkEvent<E>>) { ) -> (Self, mpsc::Receiver<BeaconWorkEvent<E>>) {
let (beacon_processor_send, beacon_processor_receive) = let BeaconProcessorChannels {
mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN); beacon_processor_tx,
beacon_processor_rx,
work_reprocessing_tx,
work_reprocessing_rx: _work_reprocessing_rx,
} = <_>::default();
let (network_tx, _network_rx) = mpsc::unbounded_channel(); let (network_tx, _network_rx) = mpsc::unbounded_channel();
let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); let (sync_tx, _sync_rx) = mpsc::unbounded_channel();
let (reprocess_tx, _reprocess_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN);
let log = null_logger().unwrap(); let log = null_logger().unwrap();
let harness: BeaconChainHarness<TestBeaconChainType<E>> = let harness: BeaconChainHarness<TestBeaconChainType<E>> =
BeaconChainHarness::builder(E::default()) BeaconChainHarness::builder(E::default())
@ -656,18 +660,18 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
let runtime = TestRuntime::default(); let runtime = TestRuntime::default();
let network_beacon_processor = Self { let network_beacon_processor = Self {
beacon_processor_send: BeaconProcessorSend(beacon_processor_send), beacon_processor_send: beacon_processor_tx,
duplicate_cache: DuplicateCache::default(), duplicate_cache: DuplicateCache::default(),
chain: harness.chain, chain: harness.chain,
network_tx, network_tx,
sync_tx, sync_tx,
reprocess_tx, reprocess_tx: work_reprocessing_tx,
network_globals, network_globals,
invalid_block_storage: InvalidBlockStorage::Disabled, invalid_block_storage: InvalidBlockStorage::Disabled,
executor: runtime.task_executor.clone(), executor: runtime.task_executor.clone(),
log, log,
}; };
(network_beacon_processor, beacon_processor_receive) (network_beacon_processor, beacon_processor_rx)
} }
} }

View File

@ -11,7 +11,7 @@ use crate::{
use beacon_chain::test_utils::{ use beacon_chain::test_utils::{
test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
}; };
use beacon_chain::{BeaconChain, ChainConfig, WhenSlotSkipped}; use beacon_chain::{BeaconChain, WhenSlotSkipped};
use beacon_processor::{work_reprocessing_queue::*, *}; use beacon_processor::{work_reprocessing_queue::*, *};
use lighthouse_network::discovery::ConnectionId; use lighthouse_network::discovery::ConnectionId;
use lighthouse_network::rpc::methods::BlobsByRangeRequest; use lighthouse_network::rpc::methods::BlobsByRangeRequest;
@ -74,17 +74,21 @@ struct TestRig {
impl Drop for TestRig { impl Drop for TestRig {
fn drop(&mut self) { fn drop(&mut self) {
// Causes the beacon processor to shutdown. // Causes the beacon processor to shutdown.
self.beacon_processor_tx = BeaconProcessorSend(mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN).0); let len = BeaconProcessorConfig::default().max_work_event_queue_len;
self.beacon_processor_tx = BeaconProcessorSend(mpsc::channel(len).0);
} }
} }
impl TestRig { impl TestRig {
pub async fn new(chain_length: u64) -> Self { pub async fn new(chain_length: u64) -> Self {
Self::new_with_chain_config(chain_length, ChainConfig::default()).await Self::new_parametric(
chain_length,
BeaconProcessorConfig::default().enable_backfill_rate_limiting,
)
.await
} }
pub async fn new_with_chain_config(chain_length: u64, chain_config: ChainConfig) -> Self { pub async fn new_parametric(chain_length: u64, enable_backfill_rate_limiting: bool) -> Self {
let mut spec = test_spec::<MainnetEthSpec>();
// This allows for testing voluntary exits without building out a massive chain. // This allows for testing voluntary exits without building out a massive chain.
spec.shard_committee_period = 2; spec.shard_committee_period = 2;
@ -93,7 +97,7 @@ impl TestRig {
.deterministic_keypairs(VALIDATOR_COUNT) .deterministic_keypairs(VALIDATOR_COUNT)
.fresh_ephemeral_store() .fresh_ephemeral_store()
.mock_execution_layer() .mock_execution_layer()
.chain_config(chain_config) .chain_config(<_>::default())
.build(); .build();
harness.advance_slot(); harness.advance_slot();
@ -179,8 +183,15 @@ impl TestRig {
let log = harness.logger().clone(); let log = harness.logger().clone();
let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN); let mut beacon_processor_config = BeaconProcessorConfig::default();
let beacon_processor_tx = BeaconProcessorSend(beacon_processor_tx); beacon_processor_config.enable_backfill_rate_limiting = enable_backfill_rate_limiting;
let BeaconProcessorChannels {
beacon_processor_tx,
beacon_processor_rx,
work_reprocessing_tx,
work_reprocessing_rx,
} = BeaconProcessorChannels::new(&beacon_processor_config);
let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); let (sync_tx, _sync_rx) = mpsc::unbounded_channel();
// Default metadata // Default metadata
@ -203,8 +214,6 @@ impl TestRig {
let executor = harness.runtime.task_executor.clone(); let executor = harness.runtime.task_executor.clone();
let (work_reprocessing_tx, work_reprocessing_rx) =
mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN);
let (work_journal_tx, work_journal_rx) = mpsc::channel(16_364); let (work_journal_tx, work_journal_rx) = mpsc::channel(16_364);
let duplicate_cache = DuplicateCache::default(); let duplicate_cache = DuplicateCache::default();
@ -227,7 +236,7 @@ impl TestRig {
executor, executor,
max_workers: cmp::max(1, num_cpus::get()), max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0, current_workers: 0,
enable_backfill_rate_limiting: harness.chain.config.enable_backfill_rate_limiting, config: beacon_processor_config,
log: log.clone(), log: log.clone(),
} }
.spawn_manager( .spawn_manager(
@ -1053,11 +1062,8 @@ async fn test_backfill_sync_processing() {
/// Ensure that backfill batches get processed as fast as they can when rate-limiting is disabled. /// Ensure that backfill batches get processed as fast as they can when rate-limiting is disabled.
#[tokio::test] #[tokio::test]
async fn test_backfill_sync_processing_rate_limiting_disabled() { async fn test_backfill_sync_processing_rate_limiting_disabled() {
let chain_config = ChainConfig { let enable_backfill_rate_limiting = false;
enable_backfill_rate_limiting: false, let mut rig = TestRig::new_parametric(SMALL_CHAIN, enable_backfill_rate_limiting).await;
..Default::default()
};
let mut rig = TestRig::new_with_chain_config(SMALL_CHAIN, chain_config).await;
for _ in 0..3 { for _ in 0..3 {
rig.enqueue_backfill_batch(); rig.enqueue_backfill_batch();

View File

@ -3,9 +3,9 @@
mod tests { mod tests {
use crate::persisted_dht::load_dht; use crate::persisted_dht::load_dht;
use crate::{NetworkConfig, NetworkService}; use crate::{NetworkConfig, NetworkService};
use beacon_chain::test_utils::EphemeralHarnessType; use beacon_chain::test_utils::BeaconChainHarness;
use beacon_processor::{ use beacon_processor::{
BeaconProcessorSend, MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN, BeaconProcessorChannels, BeaconProcessorSend, MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN,
}; };
use lighthouse_network::Enr; use lighthouse_network::Enr;
use slog::{o, Drain, Level, Logger}; use slog::{o, Drain, Level, Logger};
@ -15,8 +15,6 @@ mod tests {
use tokio::{runtime::Runtime, sync::mpsc}; use tokio::{runtime::Runtime, sync::mpsc};
use types::MinimalEthSpec as E; use types::MinimalEthSpec as E;
type BeaconChainHarness = beacon_chain::test_utils::BeaconChainHarness<EphemeralHarnessType<E>>;
fn get_logger(actual_log: bool) -> Logger { fn get_logger(actual_log: bool) -> Logger {
if actual_log { if actual_log {
let drain = { let drain = {
@ -72,17 +70,20 @@ mod tests {
// Create a new network service which implicitly gets dropped at the // Create a new network service which implicitly gets dropped at the
// end of the block. // end of the block.
let (beacon_processor_send, _beacon_processor_receive) = let BeaconProcessorChannels {
mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN); beacon_processor_tx,
let (beacon_processor_reprocess_tx, _beacon_processor_reprocess_rx) = beacon_processor_rx: _beacon_processor_rx,
mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN); work_reprocessing_tx,
work_reprocessing_rx: _work_reprocessing_rx,
} = <_>::default();
let _network_service = NetworkService::start( let _network_service = NetworkService::start(
beacon_chain.clone(), beacon_chain.clone(),
&config, &config,
executor, executor,
None, None,
BeaconProcessorSend(beacon_processor_send), beacon_processor_tx,
beacon_processor_reprocess_tx, work_reprocessing_tx,
) )
.await .await
.unwrap(); .unwrap();

View File

@ -389,6 +389,17 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
stalled. This is useful for very small testnets. TESTING ONLY. DO NOT USE ON \ stalled. This is useful for very small testnets. TESTING ONLY. DO NOT USE ON \
MAINNET.") MAINNET.")
) )
.arg(
Arg::with_name("http-enable-beacon-processor")
.long("http-enable-beacon-processor")
.value_name("BOOLEAN")
.help("The beacon processor is a scheduler which provides quality-of-service and \
DoS protection. When set to \"true\", HTTP API requests will be queued and scheduled \
alongside other tasks. When set to \"false\", HTTP API responses will be executed \
immediately.")
.takes_value(true)
.default_value("true")
)
/* Prometheus metrics HTTP server related arguments */ /* Prometheus metrics HTTP server related arguments */
.arg( .arg(
Arg::with_name("metrics") Arg::with_name("metrics")
@ -1183,4 +1194,55 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.takes_value(true) .takes_value(true)
.possible_values(ProgressiveBalancesMode::VARIANTS) .possible_values(ProgressiveBalancesMode::VARIANTS)
) )
.arg(
Arg::with_name("beacon-processor-max-workers")
.long("beacon-processor-max-workers")
.value_name("INTEGER")
.help("Specifies the maximum concurrent tasks for the task scheduler. Increasing \
this value may increase resource consumption. Reducing the value \
may result in decreased resource usage and diminished performance. The \
default value is the number of logical CPU cores on the host.")
.takes_value(true)
)
.arg(
Arg::with_name("beacon-processor-work-queue-len")
.long("beacon-processor-work-queue-len")
.value_name("INTEGER")
.help("Specifies the length of the inbound event queue. \
Higher values may prevent messages from being dropped while lower values \
may help protect the node from becoming overwhelmed.")
.default_value("16384")
.takes_value(true)
)
.arg(
Arg::with_name("beacon-processor-reprocess-queue-len")
.long("beacon-processor-reprocess-queue-len")
.value_name("INTEGER")
.help("Specifies the length of the queue for messages requiring delayed processing. \
Higher values may prevent messages from being dropped while lower values \
may help protect the node from becoming overwhelmed.")
.default_value("12288")
.takes_value(true)
)
.arg(
Arg::with_name("beacon-processor-attestation-batch-size")
.long("beacon-processor-attestation-batch-size")
.value_name("INTEGER")
.help("Specifies the number of gossip attestations in a signature verification batch. \
Higher values may reduce CPU usage in a healthy network whilst lower values may \
increase CPU usage in an unhealthy or hostile network.")
.default_value("64")
.takes_value(true)
)
.arg(
Arg::with_name("beacon-processor-aggregate-batch-size")
.long("beacon-processor-aggregate-batch-size")
.value_name("INTEGER")
.help("Specifies the number of gossip aggregate attestations in a signature \
verification batch. \
Higher values may reduce CPU usage in a healthy network while lower values may \
increase CPU usage in an unhealthy or hostile network.")
.default_value("64")
.takes_value(true)
)
} }

View File

@ -5,6 +5,7 @@ use beacon_chain::chain_config::{
use beacon_chain::TrustedSetup; use beacon_chain::TrustedSetup;
use clap::ArgMatches; use clap::ArgMatches;
use clap_utils::flags::DISABLE_MALLOC_TUNING_FLAG; use clap_utils::flags::DISABLE_MALLOC_TUNING_FLAG;
use clap_utils::parse_required;
use client::{ClientConfig, ClientGenesis}; use client::{ClientConfig, ClientGenesis};
use directory::{DEFAULT_BEACON_NODE_DIR, DEFAULT_NETWORK_DIR, DEFAULT_ROOT_DIR}; use directory::{DEFAULT_BEACON_NODE_DIR, DEFAULT_NETWORK_DIR, DEFAULT_ROOT_DIR};
use environment::RuntimeContext; use environment::RuntimeContext;
@ -149,6 +150,9 @@ pub fn get_config<E: EthSpec>(
client_config.http_api.allow_sync_stalled = true; client_config.http_api.allow_sync_stalled = true;
} }
client_config.http_api.enable_beacon_processor =
parse_required(cli_args, "http-enable-beacon-processor")?;
if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? { if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? {
client_config.chain.shuffling_cache_size = cache_size; client_config.chain.shuffling_cache_size = cache_size;
} }
@ -837,7 +841,7 @@ pub fn get_config<E: EthSpec>(
} }
// Backfill sync rate-limiting // Backfill sync rate-limiting
client_config.chain.enable_backfill_rate_limiting = client_config.beacon_processor.enable_backfill_rate_limiting =
!cli_args.is_present("disable-backfill-rate-limiting"); !cli_args.is_present("disable-backfill-rate-limiting");
if let Some(path) = clap_utils::parse_optional(cli_args, "invalid-gossip-verified-blocks-path")? if let Some(path) = clap_utils::parse_optional(cli_args, "invalid-gossip-verified-blocks-path")?
@ -851,6 +855,28 @@ pub fn get_config<E: EthSpec>(
client_config.chain.progressive_balances_mode = progressive_balances_mode; client_config.chain.progressive_balances_mode = progressive_balances_mode;
} }
if let Some(max_workers) = clap_utils::parse_optional(cli_args, "beacon-processor-max-workers")?
{
client_config.beacon_processor.max_workers = max_workers;
}
if client_config.beacon_processor.max_workers == 0 {
return Err("--beacon-processor-max-workers must be a non-zero value".to_string());
}
client_config.beacon_processor.max_work_event_queue_len =
clap_utils::parse_required(cli_args, "beacon-processor-work-queue-len")?;
client_config.beacon_processor.max_scheduled_work_queue_len =
clap_utils::parse_required(cli_args, "beacon-processor-reprocess-queue-len")?;
client_config
.beacon_processor
.max_gossip_attestation_batch_size =
clap_utils::parse_required(cli_args, "beacon-processor-attestation-batch-size")?;
client_config
.beacon_processor
.max_gossip_aggregate_batch_size =
clap_utils::parse_required(cli_args, "beacon-processor-aggregate-batch-size")?;
Ok(client_config) Ok(client_config)
} }

View File

@ -84,6 +84,7 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
let builder = ClientBuilder::new(context.eth_spec_instance.clone()) let builder = ClientBuilder::new(context.eth_spec_instance.clone())
.runtime_context(context) .runtime_context(context)
.chain_spec(spec) .chain_spec(spec)
.beacon_processor(client_config.beacon_processor.clone())
.http_api_config(client_config.http_api.clone()) .http_api_config(client_config.http_api.clone())
.disk_store( .disk_store(
&db_path, &db_path,

View File

@ -66,6 +66,7 @@ lighthouse_network = { path = "../beacon_node/lighthouse_network" }
sensitive_url = { path = "../common/sensitive_url" } sensitive_url = { path = "../common/sensitive_url" }
eth1 = { path = "../beacon_node/eth1" } eth1 = { path = "../beacon_node/eth1" }
eth2 = { path = "../common/eth2" } eth2 = { path = "../common/eth2" }
beacon_processor = { path = "../beacon_node/beacon_processor" }
[[test]] [[test]]
name = "lighthouse_tests" name = "lighthouse_tests"

View File

@ -5,6 +5,7 @@ use beacon_node::beacon_chain::chain_config::{
DisallowedReOrgOffsets, DEFAULT_RE_ORG_CUTOFF_DENOMINATOR, DisallowedReOrgOffsets, DEFAULT_RE_ORG_CUTOFF_DENOMINATOR,
DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, DEFAULT_RE_ORG_THRESHOLD, DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, DEFAULT_RE_ORG_THRESHOLD,
}; };
use beacon_processor::BeaconProcessorConfig;
use eth1::Eth1Endpoint; use eth1::Eth1Endpoint;
use lighthouse_network::PeerId; use lighthouse_network::PeerId;
use std::fs::File; use std::fs::File;
@ -1118,13 +1119,13 @@ fn disable_backfill_rate_limiting_flag() {
CommandLineTest::new() CommandLineTest::new()
.flag("disable-backfill-rate-limiting", None) .flag("disable-backfill-rate-limiting", None)
.run_with_zero_port() .run_with_zero_port()
.with_config(|config| assert!(!config.chain.enable_backfill_rate_limiting)); .with_config(|config| assert!(!config.beacon_processor.enable_backfill_rate_limiting));
} }
#[test] #[test]
fn default_backfill_rate_limiting_flag() { fn default_backfill_rate_limiting_flag() {
CommandLineTest::new() CommandLineTest::new()
.run_with_zero_port() .run_with_zero_port()
.with_config(|config| assert!(config.chain.enable_backfill_rate_limiting)); .with_config(|config| assert!(config.beacon_processor.enable_backfill_rate_limiting));
} }
#[test] #[test]
fn default_boot_nodes() { fn default_boot_nodes() {
@ -1463,6 +1464,22 @@ fn http_allow_sync_stalled_flag() {
.with_config(|config| assert_eq!(config.http_api.allow_sync_stalled, true)); .with_config(|config| assert_eq!(config.http_api.allow_sync_stalled, true));
} }
#[test] #[test]
fn http_enable_beacon_processor() {
CommandLineTest::new()
.run_with_zero_port()
.with_config(|config| assert_eq!(config.http_api.enable_beacon_processor, true));
CommandLineTest::new()
.flag("http-enable-beacon-processor", Some("true"))
.run_with_zero_port()
.with_config(|config| assert_eq!(config.http_api.enable_beacon_processor, true));
CommandLineTest::new()
.flag("http-enable-beacon-processor", Some("false"))
.run_with_zero_port()
.with_config(|config| assert_eq!(config.http_api.enable_beacon_processor, false));
}
#[test]
fn http_tls_flags() { fn http_tls_flags() {
let dir = TempDir::new().expect("Unable to create temporary directory"); let dir = TempDir::new().expect("Unable to create temporary directory");
CommandLineTest::new() CommandLineTest::new()
@ -2334,3 +2351,40 @@ fn progressive_balances_fast() {
) )
}); });
} }
#[test]
fn beacon_processor() {
CommandLineTest::new()
.run_with_zero_port()
.with_config(|config| assert_eq!(config.beacon_processor, <_>::default()));
CommandLineTest::new()
.flag("beacon-processor-max-workers", Some("1"))
.flag("beacon-processor-work-queue-len", Some("2"))
.flag("beacon-processor-reprocess-queue-len", Some("3"))
.flag("beacon-processor-attestation-batch-size", Some("4"))
.flag("beacon-processor-aggregate-batch-size", Some("5"))
.flag("disable-backfill-rate-limiting", None)
.run_with_zero_port()
.with_config(|config| {
assert_eq!(
config.beacon_processor,
BeaconProcessorConfig {
max_workers: 1,
max_work_event_queue_len: 2,
max_scheduled_work_queue_len: 3,
max_gossip_attestation_batch_size: 4,
max_gossip_aggregate_batch_size: 5,
enable_backfill_rate_limiting: false
}
)
});
}
#[test]
#[should_panic]
fn beacon_processor_zero_workers() {
CommandLineTest::new()
.flag("beacon-processor-max-workers", Some("0"))
.run_with_zero_port();
}

View File

@ -24,7 +24,7 @@ pub use execution_layer::test_utils::{
pub use validator_client::Config as ValidatorConfig; pub use validator_client::Config as ValidatorConfig;
/// The global timeout for HTTP requests to the beacon node. /// The global timeout for HTTP requests to the beacon node.
const HTTP_TIMEOUT: Duration = Duration::from_secs(4); const HTTP_TIMEOUT: Duration = Duration::from_secs(8);
/// The timeout for a beacon node to start up. /// The timeout for a beacon node to start up.
const STARTUP_TIMEOUT: Duration = Duration::from_secs(60); const STARTUP_TIMEOUT: Duration = Duration::from_secs(60);
@ -115,6 +115,11 @@ pub fn testing_client_config() -> ClientConfig {
genesis_time: now, genesis_time: now,
}; };
// Specify a constant count of beacon processor workers. Having this number
// too low can cause annoying HTTP timeouts, especially on Github runners
// with 2 logical CPUs.
client_config.beacon_processor.max_workers = 4;
client_config client_config
} }

View File

@ -43,3 +43,4 @@ beacon_chain = { path = "../beacon_node/beacon_chain" }
network = { path = "../beacon_node/network" } network = { path = "../beacon_node/network" }
testcontainers = "0.14.0" testcontainers = "0.14.0"
unused_port = { path = "../common/unused_port" } unused_port = { path = "../common/unused_port" }
task_executor = { path = "../common/task_executor" }

View File

@ -85,7 +85,6 @@ struct TesterBuilder {
pub harness: BeaconChainHarness<EphemeralHarnessType<E>>, pub harness: BeaconChainHarness<EphemeralHarnessType<E>>,
pub config: Config, pub config: Config,
_bn_network_rx: NetworkReceivers<E>, _bn_network_rx: NetworkReceivers<E>,
_bn_api_shutdown_tx: oneshot::Sender<()>,
} }
impl TesterBuilder { impl TesterBuilder {
@ -102,10 +101,14 @@ impl TesterBuilder {
let ApiServer { let ApiServer {
server, server,
listening_socket: bn_api_listening_socket, listening_socket: bn_api_listening_socket,
shutdown_tx: _bn_api_shutdown_tx,
network_rx: _bn_network_rx, network_rx: _bn_network_rx,
.. ..
} = create_api_server(harness.chain.clone(), harness.logger().clone()).await; } = create_api_server(
harness.chain.clone(),
&harness.runtime,
harness.logger().clone(),
)
.await;
tokio::spawn(server); tokio::spawn(server);
/* /*
@ -139,7 +142,6 @@ impl TesterBuilder {
harness, harness,
config, config,
_bn_network_rx, _bn_network_rx,
_bn_api_shutdown_tx,
} }
} }
pub async fn build(self, pool: PgPool) -> Tester { pub async fn build(self, pool: PgPool) -> Tester {
@ -186,7 +188,6 @@ impl TesterBuilder {
config: self.config, config: self.config,
updater, updater,
_bn_network_rx: self._bn_network_rx, _bn_network_rx: self._bn_network_rx,
_bn_api_shutdown_tx: self._bn_api_shutdown_tx,
_watch_shutdown_tx, _watch_shutdown_tx,
} }
} }
@ -204,7 +205,6 @@ struct Tester {
pub config: Config, pub config: Config,
pub updater: UpdateHandler<E>, pub updater: UpdateHandler<E>,
_bn_network_rx: NetworkReceivers<E>, _bn_network_rx: NetworkReceivers<E>,
_bn_api_shutdown_tx: oneshot::Sender<()>,
_watch_shutdown_tx: oneshot::Sender<()>, _watch_shutdown_tx: oneshot::Sender<()>,
} }