diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 4d28326d1..4d5b98a82 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -516,7 +516,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let genesis_data = api_types::GenesisData { @@ -549,7 +549,7 @@ pub fn serve( .clone() .and(warp::path("root")) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>| { @@ -570,7 +570,7 @@ pub fn serve( .clone() .and(warp::path("fork")) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>| { @@ -591,7 +591,7 @@ pub fn serve( .clone() .and(warp::path("finality_checkpoints")) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>| { @@ -627,7 +627,7 @@ pub fn serve( .and(warp::path("validator_balances")) .and(warp::path::end()) .and(multi_key_query::()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -685,7 +685,7 @@ pub fn serve( .and(warp::path("validators")) .and(warp::path::end()) .and(multi_key_query::()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -769,7 +769,7 @@ pub fn serve( )) })) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -837,7 +837,7 @@ pub fn serve( .and(warp::path("committees")) .and(warp::query::()) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -1020,7 +1020,7 @@ pub fn serve( .and(warp::path("sync_committees")) .and(warp::query::()) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -1086,7 +1086,7 @@ pub fn serve( .and(warp::path("randao")) .and(warp::query::()) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -1128,7 +1128,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |query: api_types::HeadersQuery, task_spawner: TaskSpawner, chain: Arc>| { @@ -1228,7 +1228,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |block_id: BlockId, task_spawner: TaskSpawner, chain: Arc>| { @@ -1276,7 +1276,7 @@ pub fn serve( .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |block: Arc>, task_spawner: TaskSpawner, chain: Arc>, @@ -1302,33 +1302,35 @@ pub fn serve( .and(warp::path("blocks")) .and(warp::path::end()) .and(warp::body::bytes()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |block_bytes: Bytes, + task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, - log: Logger| async move { - let block = match SignedBeaconBlock::::from_ssz_bytes( - &block_bytes, - &chain.spec, - ) { - Ok(data) => data, - Err(e) => { - return Err(warp_utils::reject::custom_bad_request(format!("{:?}", e))) - } - }; - publish_blocks::publish_block( - None, - ProvenancedBlock::local(Arc::new(block)), - chain, - &network_tx, - log, - BroadcastValidation::default(), - ) - .await - .map(|()| warp::reply().into_response()) + log: Logger| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + let block = + SignedBeaconBlock::::from_ssz_bytes(&block_bytes, &chain.spec) + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "invalid SSZ: {e:?}" + )) + })?; + publish_blocks::publish_block( + None, + ProvenancedBlock::local(Arc::new(block)), + chain, + &network_tx, + log, + BroadcastValidation::default(), + ) + .await + .map(|()| warp::reply().into_response()) + }) }, ); @@ -1349,8 +1351,8 @@ pub fn serve( chain: Arc>, network_tx: UnboundedSender>, log: Logger| { - task_spawner.spawn_async(Priority::P1, async move { - match publish_blocks::publish_block( + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + publish_blocks::publish_block( None, ProvenancedBlock::local(block), chain, @@ -1359,17 +1361,7 @@ pub fn serve( validation_level.broadcast_validation, ) .await - { - Ok(()) => warp::reply().into_response(), - Err(e) => match warp_utils::reject::handle_rejection(e).await { - Ok(reply) => reply.into_response(), - Err(_) => warp::reply::with_status( - StatusCode::INTERNAL_SERVER_ERROR, - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(), - }, - } + .map(|()| warp::reply().into_response()) }) }, ); @@ -1380,48 +1372,36 @@ pub fn serve( .and(warp::query::()) .and(warp::path::end()) .and(warp::body::bytes()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) .then( |validation_level: api_types::BroadcastValidationQuery, block_bytes: Bytes, + task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, - log: Logger| async move { - let block = match SignedBeaconBlock::::from_ssz_bytes( - &block_bytes, - &chain.spec, - ) { - Ok(data) => data, - Err(_) => { - return warp::reply::with_status( - StatusCode::BAD_REQUEST, - eth2::StatusCode::BAD_REQUEST, - ) - .into_response(); - } - }; - match publish_blocks::publish_block( - None, - ProvenancedBlock::local(Arc::new(block)), - chain, - &network_tx, - log, - validation_level.broadcast_validation, - ) - .await - { - Ok(()) => warp::reply().into_response(), - Err(e) => match warp_utils::reject::handle_rejection(e).await { - Ok(reply) => reply.into_response(), - Err(_) => warp::reply::with_status( - StatusCode::INTERNAL_SERVER_ERROR, - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(), - }, - } + log: Logger| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + let block = + SignedBeaconBlock::::from_ssz_bytes(&block_bytes, &chain.spec) + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "invalid SSZ: {e:?}" + )) + })?; + publish_blocks::publish_block( + None, + ProvenancedBlock::local(Arc::new(block)), + chain, + &network_tx, + log, + validation_level.broadcast_validation, + ) + .await + .map(|()| warp::reply().into_response()) + }) }, ); @@ -1439,7 +1419,7 @@ pub fn serve( .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |block: SignedBeaconBlock>, task_spawner: TaskSpawner, chain: Arc>, @@ -1460,33 +1440,29 @@ pub fn serve( ); // POST beacon/blocks - let post_beacon_blinded_blocks_ssz = - eth_v1 - .and(warp::path("beacon")) - .and(warp::path("blinded_blocks")) - .and(warp::path::end()) - .and(warp::body::bytes()) - .and(chain_filter.clone()) - .and(network_tx_filter.clone()) - .and(log_filter.clone()) - .and_then( - |block_bytes: Bytes, - chain: Arc>, - network_tx: UnboundedSender>, - log: Logger| async move { - let block = - match SignedBeaconBlock::>::from_ssz_bytes( - &block_bytes, - &chain.spec, - ) { - Ok(data) => data, - Err(e) => { - return Err(warp_utils::reject::custom_bad_request(format!( - "{:?}", - e - ))) - } - }; + let post_beacon_blinded_blocks_ssz = eth_v1 + .and(warp::path("beacon")) + .and(warp::path("blinded_blocks")) + .and(warp::path::end()) + .and(warp::body::bytes()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .and(network_tx_filter.clone()) + .and(log_filter.clone()) + .then( + |block_bytes: Bytes, + task_spawner: TaskSpawner, + chain: Arc>, + network_tx: UnboundedSender>, + log: Logger| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + let block = SignedBeaconBlock::>::from_ssz_bytes( + &block_bytes, + &chain.spec, + ) + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}")) + })?; publish_blocks::publish_blinded_block( block, chain, @@ -1496,8 +1472,9 @@ pub fn serve( ) .await .map(|()| warp::reply().into_response()) - }, - ); + }) + }, + ); let post_beacon_blinded_blocks_v2 = eth_v2 .and(warp::path("beacon")) @@ -1617,7 +1594,7 @@ pub fn serve( .clone() .and(warp::path::end()) .and(warp::header::optional::("accept")) - .and_then( + .then( |endpoint_version: EndpointVersion, block_id: BlockId, task_spawner: TaskSpawner, @@ -1660,7 +1637,7 @@ pub fn serve( .clone() .and(warp::path("root")) .and(warp::path::end()) - .and_then( + .then( |block_id: BlockId, task_spawner: TaskSpawner, chain: Arc>| { @@ -1680,7 +1657,7 @@ pub fn serve( .clone() .and(warp::path("attestations")) .and(warp::path::end()) - .and_then( + .then( |block_id: BlockId, task_spawner: TaskSpawner, chain: Arc>| { @@ -1704,7 +1681,7 @@ pub fn serve( .and(chain_filter.clone()) .and(warp::path::end()) .and(warp::header::optional::("accept")) - .and_then( + .then( |block_id: BlockId, task_spawner: TaskSpawner, chain: Arc>, @@ -1762,7 +1739,7 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, attestations: Vec>, @@ -1904,7 +1881,7 @@ pub fn serve( .and(warp::path("attestations")) .and(warp::path::end()) .and(warp::query::()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, query: api_types::AttestationPoolQuery| { @@ -1937,7 +1914,7 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(network_tx_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, slashing: AttesterSlashing, @@ -1979,7 +1956,7 @@ pub fn serve( .clone() .and(warp::path("attester_slashings")) .and(warp::path::end()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let attestations = chain.op_pool.get_all_attester_slashings(); @@ -1995,7 +1972,7 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(network_tx_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, slashing: ProposerSlashing, @@ -2037,7 +2014,7 @@ pub fn serve( .clone() .and(warp::path("proposer_slashings")) .and(warp::path::end()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let attestations = chain.op_pool.get_all_proposer_slashings(); @@ -2053,7 +2030,7 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(network_tx_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, exit: SignedVoluntaryExit, @@ -2093,7 +2070,7 @@ pub fn serve( .clone() .and(warp::path("voluntary_exits")) .and(warp::path::end()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let attestations = chain.op_pool.get_all_voluntary_exits(); @@ -2110,7 +2087,7 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, signatures: Vec, @@ -2130,7 +2107,7 @@ pub fn serve( .clone() .and(warp::path("bls_to_execution_changes")) .and(warp::path::end()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let address_changes = chain.op_pool.get_all_bls_to_execution_changes(); @@ -2147,7 +2124,7 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, address_changes: Vec, @@ -2239,7 +2216,7 @@ pub fn serve( .and(warp::header::optional::("accept")) .and(task_spawner_filter.clone()) .and(eth1_service_filter.clone()) - .and_then( + .then( |accept_header: Option, task_spawner: TaskSpawner, eth1_service: eth1::Service| { @@ -2293,7 +2270,7 @@ pub fn serve( .and(warp::path("blocks")) .and(block_id_or_err) .and(warp::path::end()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, block_id: BlockId| { @@ -2326,7 +2303,7 @@ pub fn serve( .and(warp::path::param::()) .and(warp::path::end()) .and(warp::body::json()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, epoch: Epoch, @@ -2378,7 +2355,7 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, block_id: BlockId, @@ -2411,7 +2388,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let forks = ForkName::list_all() @@ -2430,7 +2407,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( move |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P0, move || { let config_and_preset = @@ -2446,7 +2423,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { Ok(api_types::GenericResponse::from( @@ -2477,7 +2454,7 @@ pub fn serve( .and(warp::header::optional::("accept")) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |endpoint_version: EndpointVersion, state_id: StateId, accept_header: Option, @@ -2537,7 +2514,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |endpoint_version: EndpointVersion, task_spawner: TaskSpawner, chain: Arc>| { @@ -2576,7 +2553,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let beacon_fork_choice = chain.canonical_head.fork_choice_read_lock(); @@ -2631,7 +2608,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { @@ -2687,7 +2664,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(network_globals.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>, chain: Arc>| { @@ -2738,7 +2715,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(network_globals.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>, chain: Arc>| { @@ -2786,7 +2763,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |requested_peer_id: String, task_spawner: TaskSpawner, network_globals: Arc>| { @@ -2846,7 +2823,7 @@ pub fn serve( .and(multi_key_query::()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |query_res: Result, task_spawner: TaskSpawner, network_globals: Arc>| { @@ -2916,7 +2893,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { @@ -2969,7 +2946,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |epoch: Epoch, task_spawner: TaskSpawner, chain: Arc>, @@ -2995,7 +2972,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |endpoint_version: EndpointVersion, slot: Slot, query: api_types::ValidatorBlocksQuery, @@ -3064,7 +3041,7 @@ pub fn serve( .and(warp::query::()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |slot: Slot, query: api_types::ValidatorBlocksQuery, task_spawner: TaskSpawner, @@ -3121,7 +3098,7 @@ pub fn serve( .and(not_while_syncing_filter.clone()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |query: api_types::ValidatorAttestationDataQuery, task_spawner: TaskSpawner, chain: Arc>| { @@ -3156,7 +3133,7 @@ pub fn serve( .and(not_while_syncing_filter.clone()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |query: api_types::ValidatorAggregateAttestationQuery, task_spawner: TaskSpawner, chain: Arc>| { @@ -3197,7 +3174,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |epoch: Epoch, indices: api_types::ValidatorIndexData, task_spawner: TaskSpawner, @@ -3223,7 +3200,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |epoch: Epoch, indices: api_types::ValidatorIndexData, task_spawner: TaskSpawner, @@ -3243,7 +3220,7 @@ pub fn serve( .and(not_while_syncing_filter.clone()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |sync_committee_data: SyncContributionData, task_spawner: TaskSpawner, chain: Arc>| { @@ -3277,7 +3254,7 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, aggregates: Vec>, @@ -3390,7 +3367,7 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, contributions: Vec>, @@ -3418,7 +3395,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |subscriptions: Vec, validator_subscription_tx: Sender, task_spawner: TaskSpawner, @@ -3470,7 +3447,7 @@ pub fn serve( .and(chain_filter.clone()) .and(log_filter.clone()) .and(warp::body::json()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, log: Logger, @@ -3521,15 +3498,15 @@ pub fn serve( .and(chain_filter.clone()) .and(log_filter.clone()) .and(warp::body::json()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, log: Logger, register_val_data: Vec| async { let (tx, rx) = oneshot::channel(); - task_spawner - .spawn_async_with_rejection(Priority::P0, async move { + let initial_result = task_spawner + .spawn_async_with_rejection_no_conversion(Priority::P0, async move { let execution_layer = chain .execution_layer .as_ref() @@ -3671,17 +3648,22 @@ pub fn serve( // from what is sent back down the channel. Ok(warp::reply::reply().into_response()) }) - .await?; + .await; + + if initial_result.is_err() { + return task_spawner::convert_rejection(initial_result).await; + } // Await a response from the builder without blocking a // `BeaconProcessor` worker. - rx.await.unwrap_or_else(|_| { + task_spawner::convert_rejection(rx.await.unwrap_or_else(|_| { Ok(warp::reply::with_status( warp::reply::json(&"No response from channel"), eth2::StatusCode::INTERNAL_SERVER_ERROR, ) .into_response()) - }) + })) + .await }, ); // POST validator/sync_committee_subscriptions @@ -3694,7 +3676,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |subscriptions: Vec, validator_subscription_tx: Sender, task_spawner: TaskSpawner, @@ -3738,7 +3720,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |epoch: Epoch, indices: Vec, task_spawner: TaskSpawner, @@ -3779,7 +3761,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |request_data: api_types::LivenessRequestData, task_spawner: TaskSpawner, chain: Arc>| { @@ -3823,7 +3805,7 @@ pub fn serve( .and(warp::path("health")) .and(warp::path::end()) .and(task_spawner_filter.clone()) - .and_then(|task_spawner: TaskSpawner| { + .then(|task_spawner: TaskSpawner| { task_spawner.blocking_json_task(Priority::P0, move || { eth2::lighthouse::Health::observe() .map(api_types::GenericResponse::from) @@ -3841,7 +3823,7 @@ pub fn serve( .and(app_start_filter) .and(data_dir_filter) .and(network_globals.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, sysinfo, app_start: std::time::Instant, @@ -3866,7 +3848,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { ui::get_validator_count(chain).map(api_types::GenericResponse::from) @@ -3882,7 +3864,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |request_data: ui::ValidatorMetricsRequestData, task_spawner: TaskSpawner, chain: Arc>| { @@ -3901,7 +3883,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |request_data: ui::ValidatorInfoRequestData, task_spawner: TaskSpawner, chain: Arc>| { @@ -3918,7 +3900,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>| { task_spawner.blocking_json_task(Priority::P0, move || { @@ -3934,7 +3916,7 @@ pub fn serve( .and(warp::path("nat")) .and(task_spawner_filter.clone()) .and(warp::path::end()) - .and_then(|task_spawner: TaskSpawner| { + .then(|task_spawner: TaskSpawner| { task_spawner.blocking_json_task(Priority::P1, move || { Ok(api_types::GenericResponse::from( lighthouse_network::metrics::NAT_OPEN @@ -3952,7 +3934,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { @@ -3976,7 +3958,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { @@ -3999,7 +3981,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_response_task(Priority::P1, move || { Ok::<_, warp::Rejection>(warp::reply::json( @@ -4023,7 +4005,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |epoch: Epoch, validator_id: ValidatorId, task_spawner: TaskSpawner, @@ -4043,7 +4025,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |epoch: Epoch, task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { validator_inclusion::global_validator_inclusion_data(epoch, &chain) @@ -4059,7 +4041,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let current_slot_opt = chain.slot().ok(); @@ -4092,7 +4074,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(eth1_service_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, eth1_service: eth1::Service| { task_spawner.blocking_json_task(Priority::P1, move || { Ok(api_types::GenericResponse::from( @@ -4114,7 +4096,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(eth1_service_filter) - .and_then( + .then( |task_spawner: TaskSpawner, eth1_service: eth1::Service| { task_spawner.blocking_json_task(Priority::P1, move || { Ok(api_types::GenericResponse::from( @@ -4139,7 +4121,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>| { @@ -4166,7 +4148,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { if chain.eth1_chain.is_some() { @@ -4190,7 +4172,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || database::info(chain)) }, @@ -4203,7 +4185,7 @@ pub fn serve( .and(not_while_syncing_filter) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { chain.store_migrator.process_reconstruction(); @@ -4220,7 +4202,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |blocks: Vec>>, task_spawner: TaskSpawner, chain: Arc>, @@ -4246,7 +4228,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then(|query, task_spawner: TaskSpawner, chain, log| { + .then(|query, task_spawner: TaskSpawner, chain, log| { task_spawner.blocking_json_task(Priority::P1, move || { block_rewards::get_block_rewards(query, chain, log) }) @@ -4261,7 +4243,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |blocks, task_spawner: TaskSpawner, chain, log| { task_spawner.blocking_json_task(Priority::P1, move || { block_rewards::compute_block_rewards(blocks, chain, log) @@ -4278,7 +4260,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |target, query, task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { attestation_performance::get_attestation_performance(target, query, chain) @@ -4294,7 +4276,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |query, task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { block_packing_efficiency::get_block_packing_efficiency(query, chain) @@ -4308,7 +4290,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.spawn_async_with_rejection(Priority::P1, async move { let merge_readiness = chain.check_merge_readiness().await; @@ -4326,7 +4308,7 @@ pub fn serve( .and(multi_key_query::()) .and(task_spawner_filter.clone()) .and(chain_filter) - .and_then( + .then( |topics_res: Result, task_spawner: TaskSpawner, chain: Arc>| { @@ -4403,7 +4385,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter) .and(sse_component_filter) - .and_then( + .then( |task_spawner: TaskSpawner, sse_component: Option| { task_spawner.blocking_response_task(Priority::P1, move || { if let Some(logging_components) = sse_component { diff --git a/beacon_node/http_api/src/task_spawner.rs b/beacon_node/http_api/src/task_spawner.rs index b4da67f77..503faff71 100644 --- a/beacon_node/http_api/src/task_spawner.rs +++ b/beacon_node/http_api/src/task_spawner.rs @@ -35,6 +35,24 @@ pub struct TaskSpawner { beacon_processor_send: Option>, } +/// Convert a warp `Rejection` into a `Response`. +/// +/// This function should *always* be used to convert rejections into responses. This prevents warp +/// from trying to backtrack in strange ways. See: https://github.com/sigp/lighthouse/issues/3404 +pub async fn convert_rejection(res: Result) -> Response { + match res { + Ok(response) => response.into_response(), + Err(e) => match warp_utils::reject::handle_rejection(e).await { + Ok(reply) => reply.into_response(), + Err(_) => warp::reply::with_status( + warp::reply::json(&"unhandled error"), + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response(), + }, + } +} + impl TaskSpawner { pub fn new(beacon_processor_send: Option>) -> Self { Self { @@ -43,11 +61,7 @@ impl TaskSpawner { } /// Executes a "blocking" (non-async) task which returns a `Response`. - pub async fn blocking_response_task( - self, - priority: Priority, - func: F, - ) -> Result + pub async fn blocking_response_task(self, priority: Priority, func: F) -> Response where F: FnOnce() -> Result + Send + Sync + 'static, T: Reply + Send + 'static, @@ -65,31 +79,25 @@ impl TaskSpawner { }; // Send the function to the beacon processor for execution at some arbitrary time. - match send_to_beacon_processor( + let result = send_to_beacon_processor( beacon_processor_send, priority, BlockingOrAsync::Blocking(Box::new(process_fn)), rx, ) .await - { - Ok(result) => result.map(Reply::into_response), - Err(error_response) => Ok(error_response), - } + .and_then(|x| x); + convert_rejection(result).await } else { // There is no beacon processor so spawn a task directly on the // tokio executor. - warp_utils::task::blocking_response_task(func).await + convert_rejection(warp_utils::task::blocking_response_task(func).await).await } } /// Executes a "blocking" (non-async) task which returns a JSON-serializable /// object. - pub async fn blocking_json_task( - self, - priority: Priority, - func: F, - ) -> Result + pub async fn blocking_json_task(self, priority: Priority, func: F) -> Response where F: FnOnce() -> Result + Send + Sync + 'static, T: Serialize + Send + 'static, @@ -98,11 +106,26 @@ impl TaskSpawner { self.blocking_response_task(priority, func).await } - /// Executes an async task which may return a `warp::Rejection`. + /// Executes an async task which may return a `Rejection`, which will be converted to a response. pub async fn spawn_async_with_rejection( self, priority: Priority, func: impl Future> + Send + Sync + 'static, + ) -> Response { + let result = self + .spawn_async_with_rejection_no_conversion(priority, func) + .await; + convert_rejection(result).await + } + + /// Same as `spawn_async_with_rejection` but returning a result with the unhandled rejection. + /// + /// If you call this function you MUST convert the rejection to a response and not let it + /// propagate into Warp's filters. See `convert_rejection`. + pub async fn spawn_async_with_rejection_no_conversion( + self, + priority: Priority, + func: impl Future> + Send + Sync + 'static, ) -> Result { if let Some(beacon_processor_send) = &self.beacon_processor_send { // Create a wrapper future that will execute `func` and send the @@ -124,18 +147,16 @@ impl TaskSpawner { rx, ) .await - .unwrap_or_else(Result::Ok) + .and_then(|x| x) } else { // There is no beacon processor so spawn a task directly on the // tokio executor. - tokio::task::spawn(func).await.unwrap_or_else(|e| { - let response = warp::reply::with_status( - warp::reply::json(&format!("Tokio did not execute task: {e:?}")), - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(); - Ok(response) - }) + tokio::task::spawn(func) + .await + .map_err(|_| { + warp_utils::reject::custom_server_error("Tokio failed to spawn task".into()) + }) + .and_then(|x| x) } } @@ -158,14 +179,14 @@ impl TaskSpawner { }; // Send the function to the beacon processor for execution at some arbitrary time. - send_to_beacon_processor( + let result = send_to_beacon_processor( beacon_processor_send, priority, BlockingOrAsync::Async(Box::pin(process_fn)), rx, ) - .await - .unwrap_or_else(|error_response| error_response) + .await; + convert_rejection(result).await } else { // There is no beacon processor so spawn a task directly on the // tokio executor. @@ -182,14 +203,14 @@ impl TaskSpawner { /// Send a task to the beacon processor and await execution. /// -/// If the task is not executed, return an `Err(response)` with an error message +/// If the task is not executed, return an `Err` with an error message /// for the API consumer. async fn send_to_beacon_processor( beacon_processor_send: &BeaconProcessorSend, priority: Priority, process_fn: BlockingOrAsync, rx: oneshot::Receiver, -) -> Result { +) -> Result { let error_message = match beacon_processor_send.try_send(priority.work_event(process_fn)) { Ok(()) => { match rx.await { @@ -205,10 +226,7 @@ async fn send_to_beacon_processor( Err(TrySendError::Closed(_)) => "The task was dropped. The server is shutting down.", }; - let error_response = warp::reply::with_status( - warp::reply::json(&error_message), - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(); - Err(error_response) + Err(warp_utils::reject::custom_server_error( + error_message.to_string(), + )) }