Fix beacon-processor-max-workers (#4636)

## Issue Addressed

Fixes a bug in the handling of `--beacon-process-max-workers` which caused it to have no effect.

## Proposed Changes

For this PR I channeled @ethDreamer and saw deep into the faulty CLI config -- this bug is almost identical to the one Mark found and fixed in #4622.
This commit is contained in:
Michael Sproul 2023-08-21 05:02:34 +00:00
parent b3e1c297c8
commit 524d9af288
7 changed files with 10 additions and 17 deletions

2
Cargo.lock generated
View File

@ -1053,7 +1053,6 @@ dependencies = [
"logging", "logging",
"monitoring_api", "monitoring_api",
"network", "network",
"num_cpus",
"operation_pool", "operation_pool",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"sensitive_url", "sensitive_url",
@ -5037,7 +5036,6 @@ dependencies = [
"logging", "logging",
"lru_cache", "lru_cache",
"matches", "matches",
"num_cpus",
"operation_pool", "operation_pool",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"rand 0.8.5", "rand 0.8.5",

View File

@ -708,7 +708,6 @@ impl<E: EthSpec> Stream for InboundEvents<E> {
pub struct BeaconProcessor<E: EthSpec> { pub struct BeaconProcessor<E: EthSpec> {
pub network_globals: Arc<NetworkGlobals<E>>, pub network_globals: Arc<NetworkGlobals<E>>,
pub executor: TaskExecutor, pub executor: TaskExecutor,
pub max_workers: usize,
pub current_workers: usize, pub current_workers: usize,
pub config: BeaconProcessorConfig, pub config: BeaconProcessorConfig,
pub log: Logger, pub log: Logger,
@ -721,7 +720,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
/// - Performed immediately, if a worker is available. /// - Performed immediately, if a worker is available.
/// - Queued for later processing, if no worker is currently available. /// - Queued for later processing, if no worker is currently available.
/// ///
/// Only `self.max_workers` will ever be spawned at one time. Each worker is a `tokio` task /// Only `self.config.max_workers` will ever be spawned at one time. Each worker is a `tokio` task
/// started with `spawn_blocking`. /// started with `spawn_blocking`.
/// ///
/// The optional `work_journal_tx` allows for an outside process to receive a log of all work /// The optional `work_journal_tx` allows for an outside process to receive a log of all work
@ -896,7 +895,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
let _ = work_journal_tx.try_send(id); let _ = work_journal_tx.try_send(id);
} }
let can_spawn = self.current_workers < self.max_workers; let can_spawn = self.current_workers < self.config.max_workers;
let drop_during_sync = work_event let drop_during_sync = work_event
.as_ref() .as_ref()
.map_or(false, |event| event.drop_during_sync); .map_or(false, |event| event.drop_during_sync);

View File

@ -44,4 +44,3 @@ slasher_service = { path = "../../slasher/service" }
monitoring_api = {path = "../../common/monitoring_api"} monitoring_api = {path = "../../common/monitoring_api"}
execution_layer = { path = "../execution_layer" } execution_layer = { path = "../execution_layer" }
beacon_processor = { path = "../beacon_processor" } beacon_processor = { path = "../beacon_processor" }
num_cpus = "1.13.0"

View File

@ -29,7 +29,6 @@ use network::{NetworkConfig, NetworkSenders, NetworkService};
use slasher::Slasher; use slasher::Slasher;
use slasher_service::SlasherService; use slasher_service::SlasherService;
use slog::{debug, info, warn, Logger}; use slog::{debug, info, warn, Logger};
use std::cmp;
use std::net::TcpListener; use std::net::TcpListener;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
@ -757,7 +756,6 @@ where
BeaconProcessor { BeaconProcessor {
network_globals: network_globals.clone(), network_globals: network_globals.clone(),
executor: beacon_processor_context.executor.clone(), executor: beacon_processor_context.executor.clone(),
max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0, current_workers: 0,
config: beacon_processor_config, config: beacon_processor_config,
log: beacon_processor_context.log().clone(), log: beacon_processor_context.log().clone(),

View File

@ -184,7 +184,14 @@ pub async fn create_api_server_on_port<T: BeaconChainTypes>(
let eth1_service = let eth1_service =
eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()).unwrap(); eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()).unwrap();
let beacon_processor_config = BeaconProcessorConfig::default(); let beacon_processor_config = BeaconProcessorConfig {
// The number of workers must be greater than one. Tests which use the
// builder workflow sometimes require an internal HTTP request in order
// to fulfill an already in-flight HTTP request, therefore having only
// one worker will result in a deadlock.
max_workers: 2,
..BeaconProcessorConfig::default()
};
let BeaconProcessorChannels { let BeaconProcessorChannels {
beacon_processor_tx, beacon_processor_tx,
beacon_processor_rx, beacon_processor_rx,
@ -196,11 +203,6 @@ pub async fn create_api_server_on_port<T: BeaconChainTypes>(
BeaconProcessor { BeaconProcessor {
network_globals: network_globals.clone(), network_globals: network_globals.clone(),
executor: test_runtime.task_executor.clone(), executor: test_runtime.task_executor.clone(),
// The number of workers must be greater than one. Tests which use the
// builder workflow sometimes require an internal HTTP request in order
// to fulfill an already in-flight HTTP request, therefore having only
// one worker will result in a deadlock.
max_workers: 2,
current_workers: 0, current_workers: 0,
config: beacon_processor_config, config: beacon_processor_config,
log: log.clone(), log: log.clone(),

View File

@ -36,7 +36,6 @@ logging = { path = "../../common/logging" }
task_executor = { path = "../../common/task_executor" } task_executor = { path = "../../common/task_executor" }
igd = "0.12.1" igd = "0.12.1"
itertools = "0.10.0" itertools = "0.10.0"
num_cpus = "1.13.0"
lru_cache = { path = "../../common/lru_cache" } lru_cache = { path = "../../common/lru_cache" }
if-addrs = "0.6.4" if-addrs = "0.6.4"
strum = "0.24.0" strum = "0.24.0"

View File

@ -20,7 +20,6 @@ use lighthouse_network::{
Client, MessageId, NetworkGlobals, PeerId, Client, MessageId, NetworkGlobals, PeerId,
}; };
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::cmp;
use std::iter::Iterator; use std::iter::Iterator;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -228,7 +227,6 @@ impl TestRig {
let beacon_processor = BeaconProcessor { let beacon_processor = BeaconProcessor {
network_globals, network_globals,
executor, executor,
max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0, current_workers: 0,
config: beacon_processor_config, config: beacon_processor_config,
log: log.clone(), log: log.clone(),