diff --git a/.editorconfig b/.editorconfig index a14dd7a51..149415d12 100644 --- a/.editorconfig +++ b/.editorconfig @@ -6,4 +6,4 @@ end_of_line=lf charset=utf-8 trim_trailing_whitespace=true max_line_length=100 -insert_final_newline=false +insert_final_newline=true \ No newline at end of file diff --git a/.github/workflows/book.yml b/.github/workflows/book.yml index 598754368..db458a3db 100644 --- a/.github/workflows/book.yml +++ b/.github/workflows/book.yml @@ -5,6 +5,10 @@ on: branches: - unstable +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: build-and-upload-to-s3: runs-on: ubuntu-20.04 diff --git a/.github/workflows/cancel-previous-runs.yml b/.github/workflows/cancel-previous-runs.yml deleted file mode 100644 index 2eaefa40c..000000000 --- a/.github/workflows/cancel-previous-runs.yml +++ /dev/null @@ -1,14 +0,0 @@ -name: cancel previous runs -on: [push] -jobs: - cancel: - name: 'Cancel Previous Runs' - runs-on: ubuntu-latest - timeout-minutes: 3 - steps: - # https://github.com/styfle/cancel-workflow-action/releases - - uses: styfle/cancel-workflow-action@514c783324374c6940d1b92bfb962d0763d22de3 # 0.7.0 - with: - # https://api.github.com/repos/sigp/lighthouse/actions/workflows - workflow_id: 697364,2434944,4462424,308241,2883401,316 - access_token: ${{ github.token }} diff --git a/.github/workflows/docker-antithesis.yml b/.github/workflows/docker-antithesis.yml index 84f5541a3..a96431faf 100644 --- a/.github/workflows/docker-antithesis.yml +++ b/.github/workflows/docker-antithesis.yml @@ -5,6 +5,10 @@ on: branches: - unstable +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + env: ANTITHESIS_PASSWORD: ${{ secrets.ANTITHESIS_PASSWORD }} ANTITHESIS_USERNAME: ${{ secrets.ANTITHESIS_USERNAME }} diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index d0a7ba4fd..fd5799a50 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -9,6 +9,10 @@ on: tags: - v* +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + env: DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }} DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }} diff --git a/.github/workflows/linkcheck.yml b/.github/workflows/linkcheck.yml index 8428c0a3b..19236691f 100644 --- a/.github/workflows/linkcheck.yml +++ b/.github/workflows/linkcheck.yml @@ -9,6 +9,10 @@ on: - 'book/**' merge_group: +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: linkcheck: name: Check broken links diff --git a/.github/workflows/local-testnet.yml b/.github/workflows/local-testnet.yml index ea4c1e248..1269aee62 100644 --- a/.github/workflows/local-testnet.yml +++ b/.github/workflows/local-testnet.yml @@ -8,6 +8,10 @@ on: pull_request: merge_group: +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: run-local-testnet: strategy: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 30e4211b8..e38b03daf 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -5,6 +5,10 @@ on: tags: - v* +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + env: DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }} DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }} diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index 2741f59e7..a7e7792fe 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -9,6 +9,11 @@ on: - 'pr/*' pull_request: merge_group: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + env: # Deny warnings in CI # Disable debug info (see https://github.com/sigp/lighthouse/issues/4005) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 88f1c5ddd..6f0e66112 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -179,7 +179,7 @@ pub enum WhenSlotSkipped { /// /// This is how the HTTP API behaves. None, - /// If the slot it a skip slot, return the previous non-skipped block. + /// If the slot is a skip slot, return the previous non-skipped block. /// /// This is generally how the specification behaves. Prev, diff --git a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs index e76a5a805..eae71bd63 100644 --- a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs +++ b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs @@ -135,7 +135,7 @@ impl BeaconProposerCache { /// Compute the proposer duties using the head state without cache. pub fn compute_proposer_duties_from_head( - current_epoch: Epoch, + request_epoch: Epoch, chain: &BeaconChain, ) -> Result<(Vec, Hash256, ExecutionStatus, Fork), BeaconChainError> { // Atomically collect information about the head whilst holding the canonical head `Arc` as @@ -159,7 +159,7 @@ pub fn compute_proposer_duties_from_head( .ok_or(BeaconChainError::HeadMissingFromForkChoice(head_block_root))?; // Advance the state into the requested epoch. - ensure_state_is_in_epoch(&mut state, head_state_root, current_epoch, &chain.spec)?; + ensure_state_is_in_epoch(&mut state, head_state_root, request_epoch, &chain.spec)?; let indices = state .get_beacon_proposer_indices(&chain.spec) diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 6675b7826..9850bd2d0 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -81,7 +81,7 @@ const DEFAULT_SUGGESTED_FEE_RECIPIENT: [u8; 20] = /// A payload alongside some information about where it came from. pub enum ProvenancedPayload

{ - /// A good ol' fashioned farm-to-table payload from your local EE. + /// A good old fashioned farm-to-table payload from your local EE. Local(P), /// A payload from a builder (e.g. mev-boost). Builder(P), diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 300ebfa49..4a58a0dea 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -516,7 +516,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let genesis_data = api_types::GenesisData { @@ -549,7 +549,7 @@ pub fn serve( .clone() .and(warp::path("root")) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>| { @@ -570,7 +570,7 @@ pub fn serve( .clone() .and(warp::path("fork")) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>| { @@ -591,7 +591,7 @@ pub fn serve( .clone() .and(warp::path("finality_checkpoints")) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>| { @@ -627,7 +627,7 @@ pub fn serve( .and(warp::path("validator_balances")) .and(warp::path::end()) .and(multi_key_query::()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -685,7 +685,7 @@ pub fn serve( .and(warp::path("validators")) .and(warp::path::end()) .and(multi_key_query::()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -769,7 +769,7 @@ pub fn serve( )) })) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -837,7 +837,7 @@ pub fn serve( .and(warp::path("committees")) .and(warp::query::()) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -1020,7 +1020,7 @@ pub fn serve( .and(warp::path("sync_committees")) .and(warp::query::()) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -1086,7 +1086,7 @@ pub fn serve( .and(warp::path("randao")) .and(warp::query::()) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -1128,7 +1128,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |query: api_types::HeadersQuery, task_spawner: TaskSpawner, chain: Arc>| { @@ -1228,7 +1228,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |block_id: BlockId, task_spawner: TaskSpawner, chain: Arc>| { @@ -1276,7 +1276,7 @@ pub fn serve( .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |block_contents: SignedBlockContents, task_spawner: TaskSpawner, chain: Arc>, @@ -1302,33 +1302,35 @@ pub fn serve( .and(warp::path("blocks")) .and(warp::path::end()) .and(warp::body::bytes()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |block_bytes: Bytes, + task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, - log: Logger| async move { - let block_contents = match SignedBlockContents::::from_ssz_bytes( - &block_bytes, - &chain.spec, - ) { - Ok(data) => data, - Err(e) => { - return Err(warp_utils::reject::custom_bad_request(format!("{:?}", e))) - } - }; - publish_blocks::publish_block( - None, - ProvenancedBlock::local(block_contents), - chain, - &network_tx, - log, - BroadcastValidation::default(), - ) - .await - .map(|()| warp::reply().into_response()) + log: Logger| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + let block_contents = SignedBlockContents::::from_ssz_bytes( + &block_bytes, + &chain.spec, + ) + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}")) + })?; + publish_blocks::publish_block( + None, + ProvenancedBlock::local(block_contents), + chain, + &network_tx, + log, + BroadcastValidation::default(), + ) + .await + .map(|()| warp::reply().into_response()) + }) }, ); @@ -1349,8 +1351,8 @@ pub fn serve( chain: Arc>, network_tx: UnboundedSender>, log: Logger| { - task_spawner.spawn_async(Priority::P1, async move { - match publish_blocks::publish_block( + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + publish_blocks::publish_block( None, ProvenancedBlock::local(block_contents), chain, @@ -1359,17 +1361,7 @@ pub fn serve( validation_level.broadcast_validation, ) .await - { - Ok(()) => warp::reply().into_response(), - Err(e) => match warp_utils::reject::handle_rejection(e).await { - Ok(reply) => reply.into_response(), - Err(_) => warp::reply::with_status( - StatusCode::INTERNAL_SERVER_ERROR, - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(), - }, - } + .map(|()| warp::reply().into_response()) }) }, ); @@ -1380,53 +1372,41 @@ pub fn serve( .and(warp::query::()) .and(warp::path::end()) .and(warp::body::bytes()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) .then( |validation_level: api_types::BroadcastValidationQuery, block_bytes: Bytes, + task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, - log: Logger| async move { - let block_contents = match SignedBlockContents::::from_ssz_bytes( - &block_bytes, - &chain.spec, - ) { - Ok(data) => data, - Err(_) => { - return warp::reply::with_status( - StatusCode::BAD_REQUEST, - eth2::StatusCode::BAD_REQUEST, - ) - .into_response(); - } - }; - match publish_blocks::publish_block( - None, - ProvenancedBlock::local(block_contents), - chain, - &network_tx, - log, - validation_level.broadcast_validation, - ) - .await - { - Ok(()) => warp::reply().into_response(), - Err(e) => match warp_utils::reject::handle_rejection(e).await { - Ok(reply) => reply.into_response(), - Err(_) => warp::reply::with_status( - StatusCode::INTERNAL_SERVER_ERROR, - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(), - }, - } + log: Logger| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + let block_contents = SignedBlockContents::::from_ssz_bytes( + &block_bytes, + &chain.spec, + ) + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}")) + })?; + publish_blocks::publish_block( + None, + ProvenancedBlock::local(block_contents), + chain, + &network_tx, + log, + validation_level.broadcast_validation, + ) + .await + .map(|()| warp::reply().into_response()) + }) }, ); /* - * beacon/blocks + * beacon/blinded_blocks */ // POST beacon/blinded_blocks @@ -1439,7 +1419,7 @@ pub fn serve( .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |block_contents: SignedBlockContents>, task_spawner: TaskSpawner, chain: Arc>, @@ -1465,33 +1445,35 @@ pub fn serve( .and(warp::path("blinded_blocks")) .and(warp::path::end()) .and(warp::body::bytes()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |block_bytes: Bytes, + task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, - log: Logger| async move { - let block = - match SignedBlockContents::>::from_ssz_bytes( - &block_bytes, - &chain.spec, - ) { - Ok(data) => data, - Err(e) => { - return Err(warp_utils::reject::custom_bad_request(format!("{:?}", e))) - } - }; - publish_blocks::publish_blinded_block( - block, - chain, - &network_tx, - log, - BroadcastValidation::default(), - ) - .await - .map(|()| warp::reply().into_response()) + log: Logger| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + let block = + SignedBlockContents::>::from_ssz_bytes( + &block_bytes, + &chain.spec, + ) + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}")) + })?; + publish_blocks::publish_blinded_block( + block, + chain, + &network_tx, + log, + BroadcastValidation::default(), + ) + .await + .map(|()| warp::reply().into_response()) + }) }, ); @@ -1612,7 +1594,7 @@ pub fn serve( .clone() .and(warp::path::end()) .and(warp::header::optional::("accept")) - .and_then( + .then( |endpoint_version: EndpointVersion, block_id: BlockId, task_spawner: TaskSpawner, @@ -1655,7 +1637,7 @@ pub fn serve( .clone() .and(warp::path("root")) .and(warp::path::end()) - .and_then( + .then( |block_id: BlockId, task_spawner: TaskSpawner, chain: Arc>| { @@ -1675,7 +1657,7 @@ pub fn serve( .clone() .and(warp::path("attestations")) .and(warp::path::end()) - .and_then( + .then( |block_id: BlockId, task_spawner: TaskSpawner, chain: Arc>| { @@ -1699,7 +1681,7 @@ pub fn serve( .and(chain_filter.clone()) .and(warp::path::end()) .and(warp::header::optional::("accept")) - .and_then( + .then( |block_id: BlockId, task_spawner: TaskSpawner, chain: Arc>, @@ -1798,7 +1780,7 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, attestations: Vec>, @@ -1940,7 +1922,7 @@ pub fn serve( .and(warp::path("attestations")) .and(warp::path::end()) .and(warp::query::()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, query: api_types::AttestationPoolQuery| { @@ -1973,7 +1955,7 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(network_tx_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, slashing: AttesterSlashing, @@ -2015,7 +1997,7 @@ pub fn serve( .clone() .and(warp::path("attester_slashings")) .and(warp::path::end()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let attestations = chain.op_pool.get_all_attester_slashings(); @@ -2031,7 +2013,7 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(network_tx_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, slashing: ProposerSlashing, @@ -2073,7 +2055,7 @@ pub fn serve( .clone() .and(warp::path("proposer_slashings")) .and(warp::path::end()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let attestations = chain.op_pool.get_all_proposer_slashings(); @@ -2089,7 +2071,7 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(network_tx_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, exit: SignedVoluntaryExit, @@ -2129,7 +2111,7 @@ pub fn serve( .clone() .and(warp::path("voluntary_exits")) .and(warp::path::end()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let attestations = chain.op_pool.get_all_voluntary_exits(); @@ -2146,7 +2128,7 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, signatures: Vec, @@ -2166,7 +2148,7 @@ pub fn serve( .clone() .and(warp::path("bls_to_execution_changes")) .and(warp::path::end()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let address_changes = chain.op_pool.get_all_bls_to_execution_changes(); @@ -2183,7 +2165,7 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, address_changes: Vec, @@ -2275,7 +2257,7 @@ pub fn serve( .and(warp::header::optional::("accept")) .and(task_spawner_filter.clone()) .and(eth1_service_filter.clone()) - .and_then( + .then( |accept_header: Option, task_spawner: TaskSpawner, eth1_service: eth1::Service| { @@ -2329,7 +2311,7 @@ pub fn serve( .and(warp::path("blocks")) .and(block_id_or_err) .and(warp::path::end()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, block_id: BlockId| { @@ -2362,7 +2344,7 @@ pub fn serve( .and(warp::path::param::()) .and(warp::path::end()) .and(warp::body::json()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, epoch: Epoch, @@ -2414,7 +2396,7 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, block_id: BlockId, @@ -2447,7 +2429,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let forks = ForkName::list_all() @@ -2466,7 +2448,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( move |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P0, move || { let config_and_preset = @@ -2482,7 +2464,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { Ok(api_types::GenericResponse::from( @@ -2513,7 +2495,7 @@ pub fn serve( .and(warp::header::optional::("accept")) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |endpoint_version: EndpointVersion, state_id: StateId, accept_header: Option, @@ -2573,7 +2555,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |endpoint_version: EndpointVersion, task_spawner: TaskSpawner, chain: Arc>| { @@ -2612,7 +2594,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let beacon_fork_choice = chain.canonical_head.fork_choice_read_lock(); @@ -2667,7 +2649,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { @@ -2723,7 +2705,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(network_globals.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>, chain: Arc>| { @@ -2774,7 +2756,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(network_globals.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>, chain: Arc>| { @@ -2822,7 +2804,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |requested_peer_id: String, task_spawner: TaskSpawner, network_globals: Arc>| { @@ -2882,7 +2864,7 @@ pub fn serve( .and(multi_key_query::()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |query_res: Result, task_spawner: TaskSpawner, network_globals: Arc>| { @@ -2952,7 +2934,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { @@ -3005,7 +2987,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |epoch: Epoch, task_spawner: TaskSpawner, chain: Arc>, @@ -3031,7 +3013,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |endpoint_version: EndpointVersion, slot: Slot, query: api_types::ValidatorBlocksQuery, @@ -3103,7 +3085,7 @@ pub fn serve( .and(warp::query::()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |slot: Slot, query: api_types::ValidatorBlocksQuery, task_spawner: TaskSpawner, @@ -3166,7 +3148,7 @@ pub fn serve( .and(not_while_syncing_filter.clone()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |query: api_types::ValidatorAttestationDataQuery, task_spawner: TaskSpawner, chain: Arc>| { @@ -3201,7 +3183,7 @@ pub fn serve( .and(not_while_syncing_filter.clone()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |query: api_types::ValidatorAggregateAttestationQuery, task_spawner: TaskSpawner, chain: Arc>| { @@ -3242,7 +3224,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |epoch: Epoch, indices: api_types::ValidatorIndexData, task_spawner: TaskSpawner, @@ -3253,7 +3235,7 @@ pub fn serve( }, ); - // POST validator/duties/sync + // POST validator/duties/sync/{epoch} let post_validator_duties_sync = eth_v1 .and(warp::path("validator")) .and(warp::path("duties")) @@ -3268,7 +3250,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |epoch: Epoch, indices: api_types::ValidatorIndexData, task_spawner: TaskSpawner, @@ -3288,7 +3270,7 @@ pub fn serve( .and(not_while_syncing_filter.clone()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |sync_committee_data: SyncContributionData, task_spawner: TaskSpawner, chain: Arc>| { @@ -3322,7 +3304,7 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, aggregates: Vec>, @@ -3435,7 +3417,7 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, contributions: Vec>, @@ -3463,7 +3445,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |subscriptions: Vec, validator_subscription_tx: Sender, task_spawner: TaskSpawner, @@ -3515,7 +3497,7 @@ pub fn serve( .and(chain_filter.clone()) .and(log_filter.clone()) .and(warp::body::json()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, log: Logger, @@ -3566,15 +3548,15 @@ pub fn serve( .and(chain_filter.clone()) .and(log_filter.clone()) .and(warp::body::json()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, log: Logger, register_val_data: Vec| async { let (tx, rx) = oneshot::channel(); - task_spawner - .spawn_async_with_rejection(Priority::P0, async move { + let initial_result = task_spawner + .spawn_async_with_rejection_no_conversion(Priority::P0, async move { let execution_layer = chain .execution_layer .as_ref() @@ -3716,17 +3698,22 @@ pub fn serve( // from what is sent back down the channel. Ok(warp::reply::reply().into_response()) }) - .await?; + .await; + + if initial_result.is_err() { + return task_spawner::convert_rejection(initial_result).await; + } // Await a response from the builder without blocking a // `BeaconProcessor` worker. - rx.await.unwrap_or_else(|_| { + task_spawner::convert_rejection(rx.await.unwrap_or_else(|_| { Ok(warp::reply::with_status( warp::reply::json(&"No response from channel"), eth2::StatusCode::INTERNAL_SERVER_ERROR, ) .into_response()) - }) + })) + .await }, ); // POST validator/sync_committee_subscriptions @@ -3739,7 +3726,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |subscriptions: Vec, validator_subscription_tx: Sender, task_spawner: TaskSpawner, @@ -3783,7 +3770,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |epoch: Epoch, indices: Vec, task_spawner: TaskSpawner, @@ -3824,7 +3811,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |request_data: api_types::LivenessRequestData, task_spawner: TaskSpawner, chain: Arc>| { @@ -3868,7 +3855,7 @@ pub fn serve( .and(warp::path("health")) .and(warp::path::end()) .and(task_spawner_filter.clone()) - .and_then(|task_spawner: TaskSpawner| { + .then(|task_spawner: TaskSpawner| { task_spawner.blocking_json_task(Priority::P0, move || { eth2::lighthouse::Health::observe() .map(api_types::GenericResponse::from) @@ -3886,7 +3873,7 @@ pub fn serve( .and(app_start_filter) .and(data_dir_filter) .and(network_globals.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, sysinfo, app_start: std::time::Instant, @@ -3911,7 +3898,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { ui::get_validator_count(chain).map(api_types::GenericResponse::from) @@ -3927,7 +3914,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |request_data: ui::ValidatorMetricsRequestData, task_spawner: TaskSpawner, chain: Arc>| { @@ -3946,7 +3933,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |request_data: ui::ValidatorInfoRequestData, task_spawner: TaskSpawner, chain: Arc>| { @@ -3963,7 +3950,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>| { task_spawner.blocking_json_task(Priority::P0, move || { @@ -3979,7 +3966,7 @@ pub fn serve( .and(warp::path("nat")) .and(task_spawner_filter.clone()) .and(warp::path::end()) - .and_then(|task_spawner: TaskSpawner| { + .then(|task_spawner: TaskSpawner| { task_spawner.blocking_json_task(Priority::P1, move || { Ok(api_types::GenericResponse::from( lighthouse_network::metrics::NAT_OPEN @@ -3997,7 +3984,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { @@ -4021,7 +4008,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { @@ -4044,7 +4031,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_response_task(Priority::P1, move || { Ok::<_, warp::Rejection>(warp::reply::json( @@ -4068,7 +4055,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |epoch: Epoch, validator_id: ValidatorId, task_spawner: TaskSpawner, @@ -4088,7 +4075,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |epoch: Epoch, task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { validator_inclusion::global_validator_inclusion_data(epoch, &chain) @@ -4104,7 +4091,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let current_slot_opt = chain.slot().ok(); @@ -4137,7 +4124,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(eth1_service_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, eth1_service: eth1::Service| { task_spawner.blocking_json_task(Priority::P1, move || { Ok(api_types::GenericResponse::from( @@ -4159,7 +4146,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(eth1_service_filter) - .and_then( + .then( |task_spawner: TaskSpawner, eth1_service: eth1::Service| { task_spawner.blocking_json_task(Priority::P1, move || { Ok(api_types::GenericResponse::from( @@ -4184,7 +4171,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>| { @@ -4211,7 +4198,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { if chain.eth1_chain.is_some() { @@ -4235,7 +4222,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || database::info(chain)) }, @@ -4248,7 +4235,7 @@ pub fn serve( .and(not_while_syncing_filter) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { chain.store_migrator.process_reconstruction(); @@ -4266,7 +4253,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then(|query, task_spawner: TaskSpawner, chain, log| { + .then(|query, task_spawner: TaskSpawner, chain, log| { task_spawner.blocking_json_task(Priority::P1, move || { block_rewards::get_block_rewards(query, chain, log) }) @@ -4281,7 +4268,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |blocks, task_spawner: TaskSpawner, chain, log| { task_spawner.blocking_json_task(Priority::P1, move || { block_rewards::compute_block_rewards(blocks, chain, log) @@ -4298,7 +4285,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |target, query, task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { attestation_performance::get_attestation_performance(target, query, chain) @@ -4314,7 +4301,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |query, task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { block_packing_efficiency::get_block_packing_efficiency(query, chain) @@ -4328,7 +4315,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.spawn_async_with_rejection(Priority::P1, async move { let merge_readiness = chain.check_merge_readiness().await; @@ -4346,7 +4333,7 @@ pub fn serve( .and(multi_key_query::()) .and(task_spawner_filter.clone()) .and(chain_filter) - .and_then( + .then( |topics_res: Result, task_spawner: TaskSpawner, chain: Arc>| { @@ -4423,7 +4410,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter) .and(sse_component_filter) - .and_then( + .then( |task_spawner: TaskSpawner, sse_component: Option| { task_spawner.blocking_response_task(Priority::P1, move || { if let Some(logging_components) = sse_component { diff --git a/beacon_node/http_api/src/task_spawner.rs b/beacon_node/http_api/src/task_spawner.rs index b4da67f77..503faff71 100644 --- a/beacon_node/http_api/src/task_spawner.rs +++ b/beacon_node/http_api/src/task_spawner.rs @@ -35,6 +35,24 @@ pub struct TaskSpawner { beacon_processor_send: Option>, } +/// Convert a warp `Rejection` into a `Response`. +/// +/// This function should *always* be used to convert rejections into responses. This prevents warp +/// from trying to backtrack in strange ways. See: https://github.com/sigp/lighthouse/issues/3404 +pub async fn convert_rejection(res: Result) -> Response { + match res { + Ok(response) => response.into_response(), + Err(e) => match warp_utils::reject::handle_rejection(e).await { + Ok(reply) => reply.into_response(), + Err(_) => warp::reply::with_status( + warp::reply::json(&"unhandled error"), + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response(), + }, + } +} + impl TaskSpawner { pub fn new(beacon_processor_send: Option>) -> Self { Self { @@ -43,11 +61,7 @@ impl TaskSpawner { } /// Executes a "blocking" (non-async) task which returns a `Response`. - pub async fn blocking_response_task( - self, - priority: Priority, - func: F, - ) -> Result + pub async fn blocking_response_task(self, priority: Priority, func: F) -> Response where F: FnOnce() -> Result + Send + Sync + 'static, T: Reply + Send + 'static, @@ -65,31 +79,25 @@ impl TaskSpawner { }; // Send the function to the beacon processor for execution at some arbitrary time. - match send_to_beacon_processor( + let result = send_to_beacon_processor( beacon_processor_send, priority, BlockingOrAsync::Blocking(Box::new(process_fn)), rx, ) .await - { - Ok(result) => result.map(Reply::into_response), - Err(error_response) => Ok(error_response), - } + .and_then(|x| x); + convert_rejection(result).await } else { // There is no beacon processor so spawn a task directly on the // tokio executor. - warp_utils::task::blocking_response_task(func).await + convert_rejection(warp_utils::task::blocking_response_task(func).await).await } } /// Executes a "blocking" (non-async) task which returns a JSON-serializable /// object. - pub async fn blocking_json_task( - self, - priority: Priority, - func: F, - ) -> Result + pub async fn blocking_json_task(self, priority: Priority, func: F) -> Response where F: FnOnce() -> Result + Send + Sync + 'static, T: Serialize + Send + 'static, @@ -98,11 +106,26 @@ impl TaskSpawner { self.blocking_response_task(priority, func).await } - /// Executes an async task which may return a `warp::Rejection`. + /// Executes an async task which may return a `Rejection`, which will be converted to a response. pub async fn spawn_async_with_rejection( self, priority: Priority, func: impl Future> + Send + Sync + 'static, + ) -> Response { + let result = self + .spawn_async_with_rejection_no_conversion(priority, func) + .await; + convert_rejection(result).await + } + + /// Same as `spawn_async_with_rejection` but returning a result with the unhandled rejection. + /// + /// If you call this function you MUST convert the rejection to a response and not let it + /// propagate into Warp's filters. See `convert_rejection`. + pub async fn spawn_async_with_rejection_no_conversion( + self, + priority: Priority, + func: impl Future> + Send + Sync + 'static, ) -> Result { if let Some(beacon_processor_send) = &self.beacon_processor_send { // Create a wrapper future that will execute `func` and send the @@ -124,18 +147,16 @@ impl TaskSpawner { rx, ) .await - .unwrap_or_else(Result::Ok) + .and_then(|x| x) } else { // There is no beacon processor so spawn a task directly on the // tokio executor. - tokio::task::spawn(func).await.unwrap_or_else(|e| { - let response = warp::reply::with_status( - warp::reply::json(&format!("Tokio did not execute task: {e:?}")), - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(); - Ok(response) - }) + tokio::task::spawn(func) + .await + .map_err(|_| { + warp_utils::reject::custom_server_error("Tokio failed to spawn task".into()) + }) + .and_then(|x| x) } } @@ -158,14 +179,14 @@ impl TaskSpawner { }; // Send the function to the beacon processor for execution at some arbitrary time. - send_to_beacon_processor( + let result = send_to_beacon_processor( beacon_processor_send, priority, BlockingOrAsync::Async(Box::pin(process_fn)), rx, ) - .await - .unwrap_or_else(|error_response| error_response) + .await; + convert_rejection(result).await } else { // There is no beacon processor so spawn a task directly on the // tokio executor. @@ -182,14 +203,14 @@ impl TaskSpawner { /// Send a task to the beacon processor and await execution. /// -/// If the task is not executed, return an `Err(response)` with an error message +/// If the task is not executed, return an `Err` with an error message /// for the API consumer. async fn send_to_beacon_processor( beacon_processor_send: &BeaconProcessorSend, priority: Priority, process_fn: BlockingOrAsync, rx: oneshot::Receiver, -) -> Result { +) -> Result { let error_message = match beacon_processor_send.try_send(priority.work_event(process_fn)) { Ok(()) => { match rx.await { @@ -205,10 +226,7 @@ async fn send_to_beacon_processor( Err(TrySendError::Closed(_)) => "The task was dropped. The server is shutting down.", }; - let error_response = warp::reply::with_status( - warp::reply::json(&error_message), - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(); - Err(error_response) + Err(warp_utils::reject::custom_server_error( + error_message.to_string(), + )) } diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 0f8ddc53c..82a371d8a 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -7,9 +7,9 @@ pub(crate) mod enr; pub mod enr_ext; // Allow external use of the lighthouse ENR builder -use crate::metrics; use crate::service::TARGET_SUBNET_PEERS; use crate::{error, Enr, NetworkConfig, NetworkGlobals, Subnet, SubnetDiscovery}; +use crate::{metrics, ClearDialError}; use discv5::{enr::NodeId, Discv5, Discv5Event}; pub use enr::{ build_enr, create_enr_builder_from_config, load_enr_from_disk, use_or_load_enr, CombinedKey, @@ -1111,7 +1111,7 @@ impl Discovery { | DialError::Transport(_) | DialError::WrongPeerId { .. } => { // set peer as disconnected in discovery DHT - debug!(self.log, "Marking peer disconnected in DHT"; "peer_id" => %peer_id); + debug!(self.log, "Marking peer disconnected in DHT"; "peer_id" => %peer_id, "error" => %ClearDialError(error)); self.disconnect_peer(&peer_id); } DialError::DialPeerConditionFalse(_) | DialError::Aborted => {} diff --git a/beacon_node/lighthouse_network/src/lib.rs b/beacon_node/lighthouse_network/src/lib.rs index 3d539af3b..7467fb7f0 100644 --- a/beacon_node/lighthouse_network/src/lib.rs +++ b/beacon_node/lighthouse_network/src/lib.rs @@ -17,6 +17,7 @@ pub mod rpc; pub mod types; pub use config::gossip_max_size; +use libp2p::swarm::DialError; pub use listen_addr::*; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; @@ -63,6 +64,46 @@ impl<'de> Deserialize<'de> for PeerIdSerialized { } } +// A wrapper struct that prints a dial error nicely. +struct ClearDialError<'a>(&'a DialError); + +impl<'a> ClearDialError<'a> { + fn most_inner_error(err: &(dyn std::error::Error)) -> &(dyn std::error::Error) { + let mut current = err; + while let Some(source) = current.source() { + current = source; + } + current + } +} + +impl<'a> std::fmt::Display for ClearDialError<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { + match &self.0 { + DialError::Transport(errors) => { + for (_, transport_error) in errors { + match transport_error { + libp2p::TransportError::MultiaddrNotSupported(multiaddr_error) => { + write!(f, "Multiaddr not supported: {multiaddr_error}")?; + } + libp2p::TransportError::Other(other_error) => { + let inner_error = ClearDialError::most_inner_error(other_error); + write!(f, "Transport error: {inner_error}")?; + } + } + } + Ok(()) + } + DialError::LocalPeerId { .. } => write!(f, "The peer being dialed is the local peer."), + DialError::NoAddresses => write!(f, "No addresses for the peer to dial."), + DialError::DialPeerConditionFalse(_) => write!(f, "PeerCondition evaluation failed."), + DialError::Aborted => write!(f, "Connection aborted."), + DialError::WrongPeerId { .. } => write!(f, "Wrong peer id."), + DialError::Denied { cause } => write!(f, "Connection denied: {:?}", cause), + } + } +} + pub use crate::types::{ error, Enr, EnrSyncCommitteeBitfield, GossipTopic, NetworkGlobals, PubsubMessage, Subnet, SubnetDiscovery, diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index ce374bb9a..70f421681 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -12,9 +12,9 @@ use libp2p::swarm::{ConnectionId, NetworkBehaviour, PollParameters, ToSwarm}; use slog::{debug, error}; use types::EthSpec; -use crate::metrics; use crate::rpc::GoodbyeReason; use crate::types::SyncState; +use crate::{metrics, ClearDialError}; use super::peerdb::BanResult; use super::{ConnectingType, PeerManager, PeerManagerEvent, ReportSource}; @@ -132,7 +132,7 @@ impl NetworkBehaviour for PeerManager { error, connection_id: _, }) => { - debug!(self.log, "Failed to dial peer"; "peer_id"=> ?peer_id, "error" => %error); + debug!(self.log, "Failed to dial peer"; "peer_id"=> ?peer_id, "error" => %ClearDialError(error)); self.on_dial_failure(peer_id); } FromSwarm::ExternalAddrConfirmed(_) => { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index d8286c29f..dda30a1bf 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -536,7 +536,7 @@ impl SyncManager { // If we would otherwise be synced, first check if we need to perform or // complete a backfill sync. - #[cfg(not(feature = "disable_backfill"))] + #[cfg(not(feature = "disable-backfill"))] if matches!(sync_state, SyncState::Synced) { // Determine if we need to start/resume/restart a backfill sync. match self.backfill_sync.start(&mut self.network) { @@ -561,7 +561,7 @@ impl SyncManager { } Some((RangeSyncType::Finalized, start_slot, target_slot)) => { // If there is a backfill sync in progress pause it. - #[cfg(not(feature = "disable_backfill"))] + #[cfg(not(feature = "disable-backfill"))] self.backfill_sync.pause(); SyncState::SyncingFinalized { @@ -571,7 +571,7 @@ impl SyncManager { } Some((RangeSyncType::Head, start_slot, target_slot)) => { // If there is a backfill sync in progress pause it. - #[cfg(not(feature = "disable_backfill"))] + #[cfg(not(feature = "disable-backfill"))] self.backfill_sync.pause(); SyncState::SyncingHead { diff --git a/beacon_node/operation_pool/src/reward_cache.rs b/beacon_node/operation_pool/src/reward_cache.rs index 5b9d4258e..9e4c424bd 100644 --- a/beacon_node/operation_pool/src/reward_cache.rs +++ b/beacon_node/operation_pool/src/reward_cache.rs @@ -12,7 +12,7 @@ struct Initialization { #[derive(Debug, Clone, Default)] pub struct RewardCache { initialization: Option, - /// `BitVec` of validator indices which don't have default participation flags for the prev. epoch. + /// `BitVec` of validator indices which don't have default participation flags for the prev epoch. /// /// We choose to only track whether validators have *any* participation flag set because /// it's impossible to include a new attestation which is better than the existing participation diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 536100a02..d20c7eadb 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -290,9 +290,10 @@ pub enum AttestationFromBlock { False, } -/// Parameters which are cached between calls to `Self::get_head`. +/// Parameters which are cached between calls to `ForkChoice::get_head`. #[derive(Clone, Copy)] pub struct ForkchoiceUpdateParameters { + /// The most recent result of running `ForkChoice::get_head`. pub head_root: Hash256, pub head_hash: Option, pub justified_hash: Option, @@ -325,8 +326,6 @@ pub struct ForkChoice { queued_attestations: Vec, /// Stores a cache of the values required to be sent to the execution layer. forkchoice_update_parameters: ForkchoiceUpdateParameters, - /// The most recent result of running `Self::get_head`. - head_block_root: Hash256, _phantom: PhantomData, } @@ -412,14 +411,13 @@ where head_hash: None, justified_hash: None, finalized_hash: None, + // This will be updated during the next call to `Self::get_head`. head_root: Hash256::zero(), }, - // This will be updated during the next call to `Self::get_head`. - head_block_root: Hash256::zero(), _phantom: PhantomData, }; - // Ensure that `fork_choice.head_block_root` is updated. + // Ensure that `fork_choice.forkchoice_update_parameters.head_root` is updated. fork_choice.get_head(current_slot, spec)?; Ok(fork_choice) @@ -468,13 +466,10 @@ where // for lower slots to account for skip slots. .find(|(_, slot)| *slot <= ancestor_slot) .map(|(root, _)| root)), - Ordering::Less => Ok(Some(block_root)), - Ordering::Equal => // Root is older than queried slot, thus a skip slot. Return most recent root prior // to slot. - { - Ok(Some(block_root)) - } + Ordering::Less => Ok(Some(block_root)), + Ordering::Equal => Ok(Some(block_root)), } } @@ -507,8 +502,6 @@ where spec, )?; - self.head_block_root = head_root; - // Cache some values for the next forkchoiceUpdate call to the execution layer. let head_hash = self .get_block(&head_root) @@ -612,7 +605,7 @@ where /// have *differing* finalized and justified information. pub fn cached_fork_choice_view(&self) -> ForkChoiceView { ForkChoiceView { - head_block_root: self.head_block_root, + head_block_root: self.forkchoice_update_parameters.head_root, justified_checkpoint: self.justified_checkpoint(), finalized_checkpoint: self.finalized_checkpoint(), } @@ -1524,10 +1517,9 @@ where head_hash: None, justified_hash: None, finalized_hash: None, + // Will be updated in the following call to `Self::get_head`. head_root: Hash256::zero(), }, - // Will be updated in the following call to `Self::get_head`. - head_block_root: Hash256::zero(), _phantom: PhantomData, }; diff --git a/consensus/proto_array/src/proto_array.rs b/consensus/proto_array/src/proto_array.rs index 88111b461..7b6afb94f 100644 --- a/consensus/proto_array/src/proto_array.rs +++ b/consensus/proto_array/src/proto_array.rs @@ -884,7 +884,7 @@ impl ProtoArray { } } else { // Choose the winner by weight. - if child.weight >= best_child.weight { + if child.weight > best_child.weight { change_to_child } else { no_change @@ -910,7 +910,7 @@ impl ProtoArray { Ok(()) } - /// Indicates if the node itself is viable for the head, or if it's best descendant is viable + /// Indicates if the node itself is viable for the head, or if its best descendant is viable /// for the head. fn node_leads_to_viable_head( &self, diff --git a/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs b/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs index a5caddd04..d67e7874c 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs @@ -111,8 +111,8 @@ impl SingleEpochParticipationCache { current_epoch: Epoch, relative_epoch: RelativeEpoch, ) -> Result<(), BeaconStateError> { - let val_balance = state.get_effective_balance(val_index)?; let validator = state.get_validator(val_index)?; + let val_balance = validator.effective_balance; // Sanity check to ensure the validator is active. let epoch = relative_epoch.into_epoch(current_epoch); diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index 53915b52d..fc7ab8d52 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -344,18 +344,6 @@ impl EnvironmentBuilder { Ok(self) } - /// Optionally adds a network configuration to the environment. - pub fn optional_eth2_network_config( - self, - optional_config: Option, - ) -> Result { - if let Some(config) = optional_config { - self.eth2_network_config(config) - } else { - Ok(self) - } - } - /// Consumes the builder, returning an `Environment`. pub fn build(self) -> Result, String> { let (signal, exit) = exit_future::signal(); diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index 73e042342..d8b522307 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -513,7 +513,7 @@ fn run( let mut environment = builder .multi_threaded_tokio_runtime()? - .optional_eth2_network_config(Some(eth2_network_config))? + .eth2_network_config(eth2_network_config)? .build()?; let log = environment.core_context().log().clone(); @@ -559,7 +559,7 @@ fn run( (Some(_), Some(_)) => panic!("CLI prevents both --network and --testnet-dir"), }; - if let Some(sub_matches) = matches.subcommand_matches("account_manager") { + if let Some(sub_matches) = matches.subcommand_matches(account_manager::CMD) { eprintln!("Running account manager for {} network", network_name); // Pass the entire `environment` to the account manager so it can run blocking operations. account_manager::run(sub_matches, environment)?; diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 535f6aeb0..a3b3cabcc 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -1021,7 +1021,7 @@ async fn fill_in_selection_proofs( /// 2. We won't miss a block if the duties for the current slot happen to change with this poll. /// /// This sounds great, but is it safe? Firstly, the additional notification will only contain block -/// producers that were not included in the first notification. This should be safety enough. +/// producers that were not included in the first notification. This should be safe enough. /// However, we also have the slashing protection as a second line of defence. These two factors /// provide an acceptable level of safety. /// diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index f7a80f0a8..6f071055a 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -524,7 +524,7 @@ impl ProductionValidatorClient { pub fn start_service(&mut self) -> Result<(), String> { // We use `SLOTS_PER_EPOCH` as the capacity of the block notification channel, because - // we don't except notifications to be delayed by more than a single slot, let alone a + // we don't expect notifications to be delayed by more than a single slot, let alone a // whole epoch! let channel_capacity = T::slots_per_epoch() as usize; let (block_service_tx, block_service_rx) = mpsc::channel(channel_capacity); @@ -627,8 +627,8 @@ async fn init_from_beacon_node( let num_available = beacon_nodes.num_available().await; let num_total = beacon_nodes.num_total(); - let proposer_available = beacon_nodes.num_available().await; - let proposer_total = beacon_nodes.num_total(); + let proposer_available = proposer_nodes.num_available().await; + let proposer_total = proposer_nodes.num_total(); if proposer_total > 0 && proposer_available == 0 { warn!(