Optimize finalized chain sync by skipping newPayload messages (#3738)

## Issue Addressed

#3704 

## Proposed Changes
Adds is_syncing_finalized: bool parameter for block verification functions. Sets the payload_verification_status to Optimistic if is_syncing_finalized is true. Uses SyncState in NetworkGlobals in BeaconProcessor to retrieve the syncing status.

## Additional Info
I could implement FinalizedSignatureVerifiedBlock if you think it would be nicer.
This commit is contained in:
GeemoCandama 2022-11-29 08:19:27 +00:00
parent a2969ba7de
commit 3534c85e30
15 changed files with 200 additions and 62 deletions

View File

@ -18,7 +18,7 @@ use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
use crate::eth1_finalization_cache::{Eth1FinalizationCache, Eth1FinalizationData};
use crate::events::ServerSentEventHandler;
use crate::execution_payload::{get_execution_payload, PreparePayloadHandle};
use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle};
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
use crate::head_tracker::HeadTracker;
use crate::historical_blocks::HistoricalBlockError;
@ -2341,6 +2341,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
chain_segment: Vec<Arc<SignedBeaconBlock<T::EthSpec>>>,
count_unrealized: CountUnrealized,
notify_execution_layer: NotifyExecutionLayer,
) -> ChainSegmentResult<T::EthSpec> {
let mut imported_blocks = 0;
@ -2409,6 +2410,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
signature_verified_block.block_root(),
signature_verified_block,
count_unrealized,
notify_execution_layer,
)
.await
{
@ -2497,6 +2499,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256,
unverified_block: B,
count_unrealized: CountUnrealized,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<Hash256, BlockError<T::EthSpec>> {
// Start the Prometheus timer.
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
@ -2510,8 +2513,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// A small closure to group the verification and import errors.
let chain = self.clone();
let import_block = async move {
let execution_pending =
unverified_block.into_execution_pending_block(block_root, &chain)?;
let execution_pending = unverified_block.into_execution_pending_block(
block_root,
&chain,
notify_execution_layer,
)?;
chain
.import_execution_pending_block(execution_pending, count_unrealized)
.await

View File

@ -45,7 +45,7 @@
use crate::eth1_finalization_cache::Eth1FinalizationData;
use crate::execution_payload::{
is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block,
AllowOptimisticImport, PayloadNotifier,
AllowOptimisticImport, NotifyExecutionLayer, PayloadNotifier,
};
use crate::snapshot_cache::PreProcessingSnapshot;
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
@ -636,8 +636,9 @@ pub trait IntoExecutionPendingBlock<T: BeaconChainTypes>: Sized {
self,
block_root: Hash256,
chain: &Arc<BeaconChain<T>>,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<ExecutionPendingBlock<T>, BlockError<T::EthSpec>> {
self.into_execution_pending_block_slashable(block_root, chain)
self.into_execution_pending_block_slashable(block_root, chain, notify_execution_layer)
.map(|execution_pending| {
// Supply valid block to slasher.
if let Some(slasher) = chain.slasher.as_ref() {
@ -653,6 +654,7 @@ pub trait IntoExecutionPendingBlock<T: BeaconChainTypes>: Sized {
self,
block_root: Hash256,
chain: &Arc<BeaconChain<T>>,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<ExecutionPendingBlock<T>, BlockSlashInfo<BlockError<T::EthSpec>>>;
fn block(&self) -> &SignedBeaconBlock<T::EthSpec>;
@ -899,10 +901,15 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for GossipVerifiedBlock<T
self,
block_root: Hash256,
chain: &Arc<BeaconChain<T>>,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<ExecutionPendingBlock<T>, BlockSlashInfo<BlockError<T::EthSpec>>> {
let execution_pending =
SignatureVerifiedBlock::from_gossip_verified_block_check_slashable(self, chain)?;
execution_pending.into_execution_pending_block_slashable(block_root, chain)
execution_pending.into_execution_pending_block_slashable(
block_root,
chain,
notify_execution_layer,
)
}
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
@ -1032,6 +1039,7 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for SignatureVerifiedBloc
self,
block_root: Hash256,
chain: &Arc<BeaconChain<T>>,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<ExecutionPendingBlock<T>, BlockSlashInfo<BlockError<T::EthSpec>>> {
let header = self.block.signed_block_header();
let (parent, block) = if let Some(parent) = self.parent {
@ -1047,6 +1055,7 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for SignatureVerifiedBloc
parent,
self.consensus_context,
chain,
notify_execution_layer,
)
.map_err(|e| BlockSlashInfo::SignatureValid(header, e))
}
@ -1063,13 +1072,14 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for Arc<SignedBeaconBlock
self,
block_root: Hash256,
chain: &Arc<BeaconChain<T>>,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<ExecutionPendingBlock<T>, BlockSlashInfo<BlockError<T::EthSpec>>> {
// Perform an early check to prevent wasting time on irrelevant blocks.
let block_root = check_block_relevancy(&self, block_root, chain)
.map_err(|e| BlockSlashInfo::SignatureNotChecked(self.signed_block_header(), e))?;
SignatureVerifiedBlock::check_slashable(self, block_root, chain)?
.into_execution_pending_block_slashable(block_root, chain)
.into_execution_pending_block_slashable(block_root, chain, notify_execution_layer)
}
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
@ -1091,6 +1101,7 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
parent: PreProcessingSnapshot<T::EthSpec>,
mut consensus_context: ConsensusContext<T::EthSpec>,
chain: &Arc<BeaconChain<T>>,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<Self, BlockError<T::EthSpec>> {
if let Some(parent) = chain
.canonical_head
@ -1237,7 +1248,8 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
// Define a future that will verify the execution payload with an execution engine (but
// don't execute it yet).
let payload_notifier = PayloadNotifier::new(chain.clone(), block.clone(), &state)?;
let payload_notifier =
PayloadNotifier::new(chain.clone(), block.clone(), &state, notify_execution_layer)?;
let is_valid_merge_transition_block =
is_merge_transition_block(&state, block.message().body());
let payload_verification_future = async move {

View File

@ -35,6 +35,16 @@ pub enum AllowOptimisticImport {
No,
}
/// Signal whether the execution payloads of new blocks should be
/// immediately verified with the EL or imported optimistically without
/// any EL communication.
#[derive(Default, Clone, Copy)]
pub enum NotifyExecutionLayer {
#[default]
Yes,
No,
}
/// Used to await the result of executing payload with a remote EE.
pub struct PayloadNotifier<T: BeaconChainTypes> {
pub chain: Arc<BeaconChain<T>>,
@ -47,8 +57,12 @@ impl<T: BeaconChainTypes> PayloadNotifier<T> {
chain: Arc<BeaconChain<T>>,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
state: &BeaconState<T::EthSpec>,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<Self, BlockError<T::EthSpec>> {
let payload_verification_status = if is_execution_enabled(state, block.message().body()) {
let payload_verification_status = match notify_execution_layer {
NotifyExecutionLayer::No => Some(PayloadVerificationStatus::Optimistic),
NotifyExecutionLayer::Yes => {
if is_execution_enabled(state, block.message().body()) {
// Perform the initial stages of payload verification.
//
// We will duplicate these checks again during `per_block_processing`, however these checks
@ -62,6 +76,8 @@ impl<T: BeaconChainTypes> PayloadNotifier<T> {
None
} else {
Some(PayloadVerificationStatus::Irrelevant)
}
}
};
Ok(Self {

View File

@ -63,6 +63,7 @@ pub use canonical_head::{CachedHead, CanonicalHead, CanonicalHeadRwLock};
pub use eth1_chain::{Eth1Chain, Eth1ChainBackend};
pub use events::ServerSentEventHandler;
pub use execution_layer::EngineState;
pub use execution_payload::NotifyExecutionLayer;
pub use fork_choice::{ExecutionStatus, ForkchoiceUpdateParameters};
pub use metrics::scrape_for_metrics;
pub use parking_lot;

View File

@ -2,7 +2,7 @@ pub use crate::persisted_beacon_chain::PersistedBeaconChain;
pub use crate::{
beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY},
migrate::MigratorConfig,
BeaconChainError, ProduceBlockVerification,
BeaconChainError, NotifyExecutionLayer, ProduceBlockVerification,
};
use crate::{
builder::{BeaconChainBuilder, Witness},
@ -1460,7 +1460,12 @@ where
self.set_current_slot(slot);
let block_hash: SignedBeaconBlockHash = self
.chain
.process_block(block_root, Arc::new(block), CountUnrealized::True)
.process_block(
block_root,
Arc::new(block),
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await?
.into();
self.chain.recompute_head_at_current_slot().await;
@ -1477,6 +1482,7 @@ where
block.canonical_root(),
Arc::new(block),
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await?
.into();

View File

@ -3,7 +3,7 @@
use beacon_chain::test_utils::{
AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
};
use beacon_chain::{BeaconSnapshot, BlockError, ChainSegmentResult};
use beacon_chain::{BeaconSnapshot, BlockError, ChainSegmentResult, NotifyExecutionLayer};
use fork_choice::CountUnrealized;
use lazy_static::lazy_static;
use logging::test_logger;
@ -147,14 +147,18 @@ async fn chain_segment_full_segment() {
// Sneak in a little check to ensure we can process empty chain segments.
harness
.chain
.process_chain_segment(vec![], CountUnrealized::True)
.process_chain_segment(vec![], CountUnrealized::True, NotifyExecutionLayer::Yes)
.await
.into_block_error()
.expect("should import empty chain segment");
harness
.chain
.process_chain_segment(blocks.clone(), CountUnrealized::True)
.process_chain_segment(
blocks.clone(),
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
.into_block_error()
.expect("should import chain segment");
@ -183,7 +187,11 @@ async fn chain_segment_varying_chunk_size() {
for chunk in blocks.chunks(*chunk_size) {
harness
.chain
.process_chain_segment(chunk.to_vec(), CountUnrealized::True)
.process_chain_segment(
chunk.to_vec(),
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
.into_block_error()
.unwrap_or_else(|_| panic!("should import chain segment of len {}", chunk_size));
@ -219,7 +227,7 @@ async fn chain_segment_non_linear_parent_roots() {
matches!(
harness
.chain
.process_chain_segment(blocks, CountUnrealized::True)
.process_chain_segment(blocks, CountUnrealized::True, NotifyExecutionLayer::Yes)
.await
.into_block_error(),
Err(BlockError::NonLinearParentRoots)
@ -239,7 +247,7 @@ async fn chain_segment_non_linear_parent_roots() {
matches!(
harness
.chain
.process_chain_segment(blocks, CountUnrealized::True)
.process_chain_segment(blocks, CountUnrealized::True, NotifyExecutionLayer::Yes)
.await
.into_block_error(),
Err(BlockError::NonLinearParentRoots)
@ -270,7 +278,7 @@ async fn chain_segment_non_linear_slots() {
matches!(
harness
.chain
.process_chain_segment(blocks, CountUnrealized::True)
.process_chain_segment(blocks, CountUnrealized::True, NotifyExecutionLayer::Yes)
.await
.into_block_error(),
Err(BlockError::NonLinearSlots)
@ -291,7 +299,7 @@ async fn chain_segment_non_linear_slots() {
matches!(
harness
.chain
.process_chain_segment(blocks, CountUnrealized::True)
.process_chain_segment(blocks, CountUnrealized::True, NotifyExecutionLayer::Yes)
.await
.into_block_error(),
Err(BlockError::NonLinearSlots)
@ -317,7 +325,7 @@ async fn assert_invalid_signature(
matches!(
harness
.chain
.process_chain_segment(blocks, CountUnrealized::True)
.process_chain_segment(blocks, CountUnrealized::True, NotifyExecutionLayer::Yes)
.await
.into_block_error(),
Err(BlockError::InvalidSignature)
@ -339,7 +347,11 @@ async fn assert_invalid_signature(
// imported prior to this test.
let _ = harness
.chain
.process_chain_segment(ancestor_blocks, CountUnrealized::True)
.process_chain_segment(
ancestor_blocks,
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await;
harness.chain.recompute_head_at_current_slot().await;
@ -349,6 +361,7 @@ async fn assert_invalid_signature(
snapshots[block_index].beacon_block.canonical_root(),
snapshots[block_index].beacon_block.clone(),
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await;
assert!(
@ -400,7 +413,11 @@ async fn invalid_signature_gossip_block() {
.collect();
harness
.chain
.process_chain_segment(ancestor_blocks, CountUnrealized::True)
.process_chain_segment(
ancestor_blocks,
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
.into_block_error()
.expect("should import all blocks prior to the one being tested");
@ -412,7 +429,8 @@ async fn invalid_signature_gossip_block() {
.process_block(
signed_block.canonical_root(),
Arc::new(signed_block),
CountUnrealized::True
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await,
Err(BlockError::InvalidSignature)
@ -446,7 +464,7 @@ async fn invalid_signature_block_proposal() {
matches!(
harness
.chain
.process_chain_segment(blocks, CountUnrealized::True)
.process_chain_segment(blocks, CountUnrealized::True, NotifyExecutionLayer::Yes)
.await
.into_block_error(),
Err(BlockError::InvalidSignature)
@ -644,7 +662,7 @@ async fn invalid_signature_deposit() {
!matches!(
harness
.chain
.process_chain_segment(blocks, CountUnrealized::True)
.process_chain_segment(blocks, CountUnrealized::True, NotifyExecutionLayer::Yes)
.await
.into_block_error(),
Err(BlockError::InvalidSignature)
@ -725,6 +743,7 @@ async fn block_gossip_verification() {
gossip_verified.block_root,
gossip_verified,
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
.expect("should import valid gossip verified block");
@ -996,6 +1015,7 @@ async fn verify_block_for_gossip_slashing_detection() {
verified_block.block_root,
verified_block,
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
.unwrap();
@ -1035,6 +1055,7 @@ async fn verify_block_for_gossip_doppelganger_detection() {
verified_block.block_root,
verified_block,
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
.unwrap();
@ -1180,7 +1201,8 @@ async fn add_base_block_to_altair_chain() {
.process_block(
base_block.canonical_root(),
Arc::new(base_block.clone()),
CountUnrealized::True
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
.err()
@ -1195,7 +1217,11 @@ async fn add_base_block_to_altair_chain() {
assert!(matches!(
harness
.chain
.process_chain_segment(vec![Arc::new(base_block)], CountUnrealized::True)
.process_chain_segment(
vec![Arc::new(base_block)],
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await,
ChainSegmentResult::Failed {
imported_blocks: 0,
@ -1313,7 +1339,8 @@ async fn add_altair_block_to_base_chain() {
.process_block(
altair_block.canonical_root(),
Arc::new(altair_block.clone()),
CountUnrealized::True
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
.err()
@ -1328,7 +1355,11 @@ async fn add_altair_block_to_base_chain() {
assert!(matches!(
harness
.chain
.process_chain_segment(vec![Arc::new(altair_block)], CountUnrealized::True)
.process_chain_segment(
vec![Arc::new(altair_block)],
CountUnrealized::True,
NotifyExecutionLayer::Yes
)
.await,
ChainSegmentResult::Failed {
imported_blocks: 0,

View File

@ -7,8 +7,8 @@ use beacon_chain::otb_verification_service::{
use beacon_chain::{
canonical_head::{CachedHead, CanonicalHead},
test_utils::{BeaconChainHarness, EphemeralHarnessType},
BeaconChainError, BlockError, ExecutionPayloadError, StateSkipConfig, WhenSlotSkipped,
INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
BeaconChainError, BlockError, ExecutionPayloadError, NotifyExecutionLayer, StateSkipConfig,
WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON,
};
use execution_layer::{
@ -693,6 +693,7 @@ async fn invalidates_all_descendants() {
fork_block.canonical_root(),
Arc::new(fork_block),
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
.unwrap();
@ -789,6 +790,7 @@ async fn switches_heads() {
fork_block.canonical_root(),
Arc::new(fork_block),
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
.unwrap();
@ -1035,7 +1037,7 @@ async fn invalid_parent() {
// Ensure the block built atop an invalid payload is invalid for import.
assert!(matches!(
rig.harness.chain.process_block(block.canonical_root(), block.clone(), CountUnrealized::True).await,
rig.harness.chain.process_block(block.canonical_root(), block.clone(), CountUnrealized::True, NotifyExecutionLayer::Yes).await,
Err(BlockError::ParentExecutionPayloadInvalid { parent_root: invalid_root })
if invalid_root == parent_root
));
@ -1317,7 +1319,12 @@ async fn build_optimistic_chain(
for block in blocks {
rig.harness
.chain
.process_block(block.canonical_root(), block, CountUnrealized::True)
.process_block(
block.canonical_root(),
block,
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
.unwrap();
}
@ -1879,6 +1886,7 @@ async fn recover_from_invalid_head_by_importing_blocks() {
fork_block.canonical_root(),
fork_block.clone(),
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
.unwrap();

View File

@ -7,8 +7,8 @@ use beacon_chain::test_utils::{
};
use beacon_chain::{
historical_blocks::HistoricalBlockError, migrate::MigratorConfig, BeaconChain,
BeaconChainError, BeaconChainTypes, BeaconSnapshot, ChainConfig, ServerSentEventHandler,
WhenSlotSkipped,
BeaconChainError, BeaconChainTypes, BeaconSnapshot, ChainConfig, NotifyExecutionLayer,
ServerSentEventHandler, WhenSlotSkipped,
};
use fork_choice::CountUnrealized;
use lazy_static::lazy_static;
@ -2148,6 +2148,7 @@ async fn weak_subjectivity_sync() {
full_block.canonical_root(),
Arc::new(full_block),
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
.unwrap();

View File

@ -6,7 +6,7 @@ use beacon_chain::{
AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
OP_POOL_DB_KEY,
},
BeaconChain, StateSkipConfig, WhenSlotSkipped,
BeaconChain, NotifyExecutionLayer, StateSkipConfig, WhenSlotSkipped,
};
use fork_choice::CountUnrealized;
use lazy_static::lazy_static;
@ -687,7 +687,8 @@ async fn run_skip_slot_test(skip_slots: u64) {
.process_block(
harness_a.chain.head_snapshot().beacon_block_root,
harness_a.chain.head_snapshot().beacon_block.clone(),
CountUnrealized::True
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
.unwrap(),

View File

@ -1,6 +1,8 @@
use crate::metrics;
use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, CountUnrealized};
use beacon_chain::{
BeaconChain, BeaconChainTypes, BlockError, CountUnrealized, NotifyExecutionLayer,
};
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use slog::{crit, error, info, warn, Logger};
@ -35,7 +37,12 @@ pub async fn publish_block<T: BeaconChainTypes>(
let block_root = block_root.unwrap_or_else(|| block.canonical_root());
match chain
.process_block(block_root, block.clone(), CountUnrealized::True)
.process_block(
block_root,
block.clone(),
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
{
Ok(root) => {

View File

@ -74,6 +74,17 @@ impl SyncState {
}
}
pub fn is_syncing_finalized(&self) -> bool {
match self {
SyncState::SyncingFinalized { .. } => true,
SyncState::SyncingHead { .. } => false,
SyncState::SyncTransition => false,
SyncState::BackFillSyncing { .. } => false,
SyncState::Synced => false,
SyncState::Stalled => false,
}
}
/// Returns true if the node is synced.
///
/// NOTE: We consider the node synced if it is fetching old historical blocks.

View File

@ -41,7 +41,7 @@
use crate::sync::manager::BlockProcessType;
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::parking_lot::Mutex;
use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock};
use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock, NotifyExecutionLayer};
use derivative::Derivative;
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
@ -1587,8 +1587,24 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
/*
* Verification for a chain segment (multiple blocks).
*/
Work::ChainSegment { process_id, blocks } => task_spawner
.spawn_async(async move { worker.process_chain_segment(process_id, blocks).await }),
Work::ChainSegment { process_id, blocks } => {
let notify_execution_layer = if self
.network_globals
.sync_state
.read()
.is_syncing_finalized()
{
NotifyExecutionLayer::No
} else {
NotifyExecutionLayer::Yes
};
task_spawner.spawn_async(async move {
worker
.process_chain_segment(process_id, blocks, notify_execution_layer)
.await
})
}
/*
* Processing of Status Messages.
*/

View File

@ -7,7 +7,7 @@ use beacon_chain::{
sync_committee_verification::{self, Error as SyncCommitteeError},
validator_monitor::get_block_delay_ms,
BeaconChainError, BeaconChainTypes, BlockError, CountUnrealized, ForkChoiceError,
GossipVerifiedBlock,
GossipVerifiedBlock, NotifyExecutionLayer,
};
use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
use slog::{crit, debug, error, info, trace, warn};
@ -934,7 +934,12 @@ impl<T: BeaconChainTypes> Worker<T> {
match self
.chain
.process_block(block_root, verified_block, CountUnrealized::True)
.process_block(
block_root,
verified_block,
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
{
Ok(block_root) => {

View File

@ -10,6 +10,7 @@ use crate::sync::{BatchProcessResult, ChainId};
use beacon_chain::CountUnrealized;
use beacon_chain::{
BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError,
NotifyExecutionLayer,
};
use lighthouse_network::PeerAction;
use slog::{debug, error, info, warn};
@ -85,7 +86,12 @@ impl<T: BeaconChainTypes> Worker<T> {
let slot = block.slot();
let result = self
.chain
.process_block(block_root, block, CountUnrealized::True)
.process_block(
block_root,
block,
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await;
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
@ -127,6 +133,7 @@ impl<T: BeaconChainTypes> Worker<T> {
&self,
sync_type: ChainSegmentProcessId,
downloaded_blocks: Vec<Arc<SignedBeaconBlock<T::EthSpec>>>,
notify_execution_layer: NotifyExecutionLayer,
) {
let result = match sync_type {
// this a request from the range sync
@ -136,7 +143,11 @@ impl<T: BeaconChainTypes> Worker<T> {
let sent_blocks = downloaded_blocks.len();
match self
.process_blocks(downloaded_blocks.iter(), count_unrealized)
.process_blocks(
downloaded_blocks.iter(),
count_unrealized,
notify_execution_layer,
)
.await
{
(_, Ok(_)) => {
@ -215,7 +226,11 @@ impl<T: BeaconChainTypes> Worker<T> {
// parent blocks are ordered from highest slot to lowest, so we need to process in
// reverse
match self
.process_blocks(downloaded_blocks.iter().rev(), CountUnrealized::True)
.process_blocks(
downloaded_blocks.iter().rev(),
CountUnrealized::True,
notify_execution_layer,
)
.await
{
(imported_blocks, Err(e)) => {
@ -246,11 +261,12 @@ impl<T: BeaconChainTypes> Worker<T> {
&self,
downloaded_blocks: impl Iterator<Item = &'a Arc<SignedBeaconBlock<T::EthSpec>>>,
count_unrealized: CountUnrealized,
notify_execution_layer: NotifyExecutionLayer,
) -> (usize, Result<(), ChainSegmentFailed>) {
let blocks: Vec<Arc<_>> = downloaded_blocks.cloned().collect();
match self
.chain
.process_chain_segment(blocks, count_unrealized)
.process_chain_segment(blocks, count_unrealized, notify_execution_layer)
.await
{
ChainSegmentResult::Successful { imported_blocks } => {

View File

@ -7,7 +7,7 @@ use beacon_chain::{
obtain_indexed_attestation_and_committees_per_slot, VerifiedAttestation,
},
test_utils::{BeaconChainHarness, EphemeralHarnessType},
BeaconChainTypes, CachedHead, CountUnrealized,
BeaconChainTypes, CachedHead, CountUnrealized, NotifyExecutionLayer,
};
use execution_layer::{json_structures::JsonPayloadStatusV1Status, PayloadStatusV1};
use serde::Deserialize;
@ -388,6 +388,7 @@ impl<E: EthSpec> Tester<E> {
block_root,
block.clone(),
CountUnrealized::False,
NotifyExecutionLayer::Yes,
))?;
if result.is_ok() != valid {
return Err(Error::DidntFail(format!(