diff --git a/beacon_node/beacon_chain/src/attestation_simulator.rs b/beacon_node/beacon_chain/src/attestation_simulator.rs new file mode 100644 index 000000000..b0b25aef1 --- /dev/null +++ b/beacon_node/beacon_chain/src/attestation_simulator.rs @@ -0,0 +1,93 @@ +use crate::{BeaconChain, BeaconChainTypes}; +use slog::{debug, error}; +use slot_clock::SlotClock; +use std::sync::Arc; +use task_executor::TaskExecutor; +use tokio::time::sleep; +use types::Slot; + +/// Spawns a routine which produces an unaggregated attestation at every slot. +/// +/// This routine will run once per slot +pub fn start_attestation_simulator_service( + executor: TaskExecutor, + chain: Arc>, +) { + executor.clone().spawn( + async move { attestation_simulator_service(executor, chain).await }, + "attestation_simulator_service", + ); +} + +/// Loop indefinitely, calling `BeaconChain::produce_unaggregated_attestation` every 4s into each slot. +async fn attestation_simulator_service( + executor: TaskExecutor, + chain: Arc>, +) { + let slot_duration = chain.slot_clock.slot_duration(); + let additional_delay = slot_duration / 3; + + loop { + match chain.slot_clock.duration_to_next_slot() { + Some(duration) => { + sleep(duration + additional_delay).await; + + debug!( + chain.log, + "Simulating unagg. attestation production"; + ); + + // Run the task in the executor + let inner_chain = chain.clone(); + executor.spawn( + async move { + if let Ok(current_slot) = inner_chain.slot() { + produce_unaggregated_attestation(inner_chain, current_slot); + } + }, + "attestation_simulator_service", + ); + } + None => { + error!(chain.log, "Failed to read slot clock"); + // If we can't read the slot clock, just wait another slot. + sleep(slot_duration).await; + } + }; + } +} + +pub fn produce_unaggregated_attestation( + inner_chain: Arc>, + current_slot: Slot, +) { + // Since attestations for different committees are practically identical (apart from the committee index field) + // Committee 0 is guaranteed to exist. That means there's no need to load the committee. + let beacon_committee_index = 0; + + // Store the unaggregated attestation in the validator monitor for later processing + match inner_chain.produce_unaggregated_attestation(current_slot, beacon_committee_index) { + Ok(unaggregated_attestation) => { + let data = &unaggregated_attestation.data; + + debug!( + inner_chain.log, + "Produce unagg. attestation"; + "attestation_source" => data.source.root.to_string(), + "attestation_target" => data.target.root.to_string(), + ); + + inner_chain + .validator_monitor + .write() + .set_unaggregated_attestation(unaggregated_attestation); + } + Err(e) => { + debug!( + inner_chain.log, + "Failed to simulate attestation"; + "error" => ?e + ); + } + } +} diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 71270c197..4a97e4cbf 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3621,9 +3621,11 @@ impl BeaconChain { } // Allow the validator monitor to learn about a new valid state. - self.validator_monitor - .write() - .process_valid_state(current_slot.epoch(T::EthSpec::slots_per_epoch()), state); + self.validator_monitor.write().process_valid_state( + current_slot.epoch(T::EthSpec::slots_per_epoch()), + state, + &self.spec, + ); let validator_monitor = self.validator_monitor.read(); diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index fbd255126..00f5b04d2 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -786,6 +786,7 @@ where validator_monitor.process_valid_state( slot.epoch(TEthSpec::slots_per_epoch()), &head_snapshot.beacon_state, + &self.spec, ); } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 8edb7b4fc..4ad98a50e 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -1,4 +1,5 @@ pub mod attestation_rewards; +pub mod attestation_simulator; pub mod attestation_verification; mod attester_cache; pub mod beacon_block_reward; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index b9a748b6d..ad095b37b 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -10,6 +10,20 @@ use types::{BeaconState, Epoch, EthSpec, Hash256, Slot}; /// The maximum time to wait for the snapshot cache lock during a metrics scrape. const SNAPSHOT_CACHE_TIMEOUT: Duration = Duration::from_millis(100); +// Attestation simulator metrics +pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_HIT_TOTAL: &str = + "validator_monitor_attestation_simulator_head_attester_hit_total"; +pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_MISS_TOTAL: &str = + "validator_monitor_attestation_simulator_head_attester_miss_total"; +pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_HIT_TOTAL: &str = + "validator_monitor_attestation_simulator_target_attester_hit_total"; +pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_MISS_TOTAL: &str = + "validator_monitor_attestation_simulator_target_attester_miss_total"; +pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_HIT_TOTAL: &str = + "validator_monitor_attestation_simulator_source_attester_hit_total"; +pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_MISS_TOTAL: &str = + "validator_monitor_attestation_simulator_source_attester_miss_total"; + lazy_static! { /* * Block Processing @@ -1045,6 +1059,48 @@ lazy_static! { "beacon_aggregated_attestation_subsets_total", "Count of new aggregated attestations that are subsets of already known aggregates" ); + /* + * Attestation simulator metrics + */ + pub static ref VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_HIT: Result = + try_create_int_counter( + VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_HIT_TOTAL, + "Incremented if a validator is flagged as a previous slot head attester \ + during per slot processing", + ); + pub static ref VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_MISS: Result = + try_create_int_counter( + VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_MISS_TOTAL, + "Incremented if a validator is not flagged as a previous slot head attester \ + during per slot processing", + ); + pub static ref VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_HIT: Result = + try_create_int_counter( + VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_HIT_TOTAL, + "Incremented if a validator is flagged as a previous slot target attester \ + during per slot processing", + ); + pub static ref VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_MISS: Result = + try_create_int_counter( + VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_MISS_TOTAL, + "Incremented if a validator is not flagged as a previous slot target attester \ + during per slot processing", + ); + pub static ref VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_HIT: Result = + try_create_int_counter( + VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_HIT_TOTAL, + "Incremented if a validator is flagged as a previous slot source attester \ + during per slot processing", + ); + pub static ref VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_MISS: Result = + try_create_int_counter( + VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_MISS_TOTAL, + "Incremented if a validator is not flagged as a previous slot source attester \ + during per slot processing", + ); + /* + * Missed block metrics + */ pub static ref VALIDATOR_MONITOR_MISSED_BLOCKS_TOTAL: Result = try_create_int_counter_vec( "validator_monitor_missed_blocks_total", "Number of non-finalized blocks missed", diff --git a/beacon_node/beacon_chain/src/validator_monitor.rs b/beacon_node/beacon_chain/src/validator_monitor.rs index 8cea9c076..c511a0ff8 100644 --- a/beacon_node/beacon_chain/src/validator_monitor.rs +++ b/beacon_node/beacon_chain/src/validator_monitor.rs @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize}; use slog::{crit, debug, error, info, warn, Logger}; use slot_clock::SlotClock; use smallvec::SmallVec; +use state_processing::common::get_attestation_participation_flag_indices; use state_processing::per_epoch_processing::{ errors::EpochProcessingError, EpochProcessingSummary, }; @@ -21,8 +22,11 @@ use std::str::Utf8Error; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::AbstractExecPayload; +use types::consts::altair::{ + TIMELY_HEAD_FLAG_INDEX, TIMELY_SOURCE_FLAG_INDEX, TIMELY_TARGET_FLAG_INDEX, +}; use types::{ - AttesterSlashing, BeaconBlockRef, BeaconState, ChainSpec, Epoch, EthSpec, Hash256, + Attestation, AttesterSlashing, BeaconBlockRef, BeaconState, ChainSpec, Epoch, EthSpec, Hash256, IndexedAttestation, ProposerSlashing, PublicKeyBytes, SignedAggregateAndProof, SignedContributionAndProof, Slot, SyncCommitteeMessage, VoluntaryExit, }; @@ -69,6 +73,15 @@ impl Default for ValidatorMonitorConfig { } } +/// The goal is to check the behaviour of the BN if it pretends to attest at each slot +/// Check the head/target/source once the state.slot is some slots beyond attestation.data.slot +/// to defend against re-orgs. 16 slots is the minimum to defend against re-orgs of up to 16 slots. +pub const UNAGGREGATED_ATTESTATION_LAG_SLOTS: usize = 16; + +/// Bound the storage size of simulated attestations. The head state can only verify attestations +/// from the current and previous epoch. +pub const MAX_UNAGGREGATED_ATTESTATION_HASHMAP_LENGTH: usize = 64; + #[derive(Debug)] pub enum Error { InvalidPubkey(String), @@ -370,7 +383,7 @@ struct MissedBlock { /// /// The intention of this struct is to provide users with more logging and Prometheus metrics around /// validators that they are interested in. -pub struct ValidatorMonitor { +pub struct ValidatorMonitor { /// The validators that require additional monitoring. validators: HashMap, /// A map of validator index (state.validators) to a validator public key. @@ -386,6 +399,8 @@ pub struct ValidatorMonitor { missed_blocks: HashSet, // A beacon proposer cache beacon_proposer_cache: Arc>, + // Unaggregated attestations generated by the committee index at each slot. + unaggregated_attestations: HashMap>, log: Logger, _phantom: PhantomData, } @@ -409,6 +424,7 @@ impl ValidatorMonitor { individual_tracking_threshold, missed_blocks: <_>::default(), beacon_proposer_cache, + unaggregated_attestations: <_>::default(), log, _phantom: PhantomData, }; @@ -444,9 +460,32 @@ impl ValidatorMonitor { }); } + /// Add an unaggregated attestation + pub fn set_unaggregated_attestation(&mut self, attestation: Attestation) { + let unaggregated_attestations = &mut self.unaggregated_attestations; + + // Pruning, this removes the oldest key/pair of the hashmap if it's greater than MAX_UNAGGREGATED_ATTESTATION_HASHMAP_LENGTH + if unaggregated_attestations.len() >= MAX_UNAGGREGATED_ATTESTATION_HASHMAP_LENGTH { + if let Some(oldest_slot) = unaggregated_attestations.keys().min().copied() { + unaggregated_attestations.remove(&oldest_slot); + } + } + let slot = attestation.data.slot; + self.unaggregated_attestations.insert(slot, attestation); + } + + pub fn get_unaggregated_attestation(&self, slot: Slot) -> Option<&Attestation> { + self.unaggregated_attestations.get(&slot) + } + /// Reads information from the given `state`. The `state` *must* be valid (i.e, able to be /// imported). - pub fn process_valid_state(&mut self, current_epoch: Epoch, state: &BeaconState) { + pub fn process_valid_state( + &mut self, + current_epoch: Epoch, + state: &BeaconState, + spec: &ChainSpec, + ) { // Add any new validator indices. state .validators() @@ -463,6 +502,7 @@ impl ValidatorMonitor { // Add missed non-finalized blocks for the monitored validators self.add_validators_missed_blocks(state); + self.process_unaggregated_attestations(state, spec); // Update metrics for individual validators. for monitored_validator in self.validators.values() { @@ -654,6 +694,107 @@ impl ValidatorMonitor { .cloned() } + /// Process the unaggregated attestations generated by the service `attestation_simulator_service` + /// and check if the attestation qualifies for a reward matching the flags source/target/head + fn process_unaggregated_attestations(&mut self, state: &BeaconState, spec: &ChainSpec) { + let current_slot = state.slot(); + + // Ensures that we process attestation when there have been skipped slots between blocks + let attested_slots: Vec<_> = self + .unaggregated_attestations + .keys() + .filter(|&&attestation_slot| { + attestation_slot + < current_slot - Slot::new(UNAGGREGATED_ATTESTATION_LAG_SLOTS as u64) + }) + .cloned() + .collect(); + + let unaggregated_attestations = &mut self.unaggregated_attestations; + for slot in attested_slots { + if let Some(unaggregated_attestation) = unaggregated_attestations.remove(&slot) { + // Don't process this attestation, it's too old to be processed by this state. + if slot.epoch(T::slots_per_epoch()) < state.previous_epoch() { + continue; + } + + // We are simulating that unaggregated attestation in a service that produces unaggregated attestations + // every slot, the inclusion_delay shouldn't matter here as long as the minimum value + // that qualifies the committee index for reward is included + let inclusion_delay = spec.min_attestation_inclusion_delay; + + // Get the reward indices for the unaggregated attestation or log an error + match get_attestation_participation_flag_indices( + state, + &unaggregated_attestation.data, + inclusion_delay, + spec, + ) { + Ok(flag_indices) => { + let head_hit = flag_indices.contains(&TIMELY_HEAD_FLAG_INDEX); + let target_hit = flag_indices.contains(&TIMELY_TARGET_FLAG_INDEX); + let source_hit = flag_indices.contains(&TIMELY_SOURCE_FLAG_INDEX); + + if head_hit { + metrics::inc_counter( + &metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_HIT, + ); + } else { + metrics::inc_counter( + &metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_MISS, + ); + } + if target_hit { + metrics::inc_counter( + &metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_HIT, + ); + } else { + metrics::inc_counter( + &metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_MISS, + ); + } + if source_hit { + metrics::inc_counter( + &metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_HIT, + ); + } else { + metrics::inc_counter( + &metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_MISS, + ); + } + + let data = &unaggregated_attestation.data; + debug!( + self.log, + "Simulated attestation evaluated"; + "attestation_source" => ?data.source.root, + "attestation_target" => ?data.target.root, + "attestation_head" => ?data.beacon_block_root, + "attestation_slot" => ?data.slot, + "source_hit" => source_hit, + "target_hit" => target_hit, + "head_hit" => head_hit, + ); + } + Err(err) => { + error!( + self.log, + "Failed to get attestation participation flag indices"; + "error" => ?err, + "unaggregated_attestation" => ?unaggregated_attestation, + ); + } + } + } else { + error!( + self.log, + "Failed to remove unaggregated attestation from the hashmap"; + "slot" => ?slot, + ); + } + } + } + /// Run `func` with the `TOTAL_LABEL` and optionally the /// `individual_id`. /// diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index fdc37b552..ff83b2532 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -1,8 +1,10 @@ #![cfg(not(debug_assertions))] +use beacon_chain::attestation_simulator::produce_unaggregated_attestation; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy}; -use beacon_chain::{StateSkipConfig, WhenSlotSkipped}; +use beacon_chain::validator_monitor::UNAGGREGATED_ATTESTATION_LAG_SLOTS; +use beacon_chain::{metrics, StateSkipConfig, WhenSlotSkipped}; use lazy_static::lazy_static; use std::sync::Arc; use tree_hash::TreeHash; @@ -15,6 +17,91 @@ lazy_static! { static ref KEYPAIRS: Vec = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); } +/// This test builds a chain that is testing the performance of the unaggregated attestations +/// produced by the attestation simulator service. +#[tokio::test] +async fn produces_attestations_from_attestation_simulator_service() { + // Produce 2 epochs, or 64 blocks + let num_blocks_produced = MainnetEthSpec::slots_per_epoch() * 2; + + let harness = BeaconChainHarness::builder(MainnetEthSpec) + .default_spec() + .keypairs(KEYPAIRS[..].to_vec()) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + let chain = &harness.chain; + + // Test all valid committee indices and their rewards for all slots in the chain + // using validator monitor + for slot in 0..=num_blocks_produced { + // We do not produce at slot=0, and there's no committe cache available anyway + if slot > 0 && slot <= num_blocks_produced { + harness.advance_slot(); + + harness + .extend_chain( + 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + } + // Set the state to the current slot + let slot = Slot::from(slot); + let mut state = chain + .state_at_slot(slot, StateSkipConfig::WithStateRoots) + .expect("should get state"); + + // Prebuild the committee cache for the current epoch + state + .build_committee_cache(RelativeEpoch::Current, &harness.chain.spec) + .unwrap(); + + // Produce an unaggragetated attestation + produce_unaggregated_attestation(chain.clone(), chain.slot().unwrap()); + + // Verify that the ua is stored in validator monitor + let validator_monitor = chain.validator_monitor.read(); + validator_monitor + .get_unaggregated_attestation(slot) + .expect("should get unaggregated attestation"); + } + + // Compare the prometheus metrics that evaluates the performance of the unaggregated attestations + let hit_prometheus_metrics = vec![ + metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_HIT_TOTAL, + metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_HIT_TOTAL, + metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_HIT_TOTAL, + ]; + let miss_prometheus_metrics = vec![ + metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_MISS_TOTAL, + metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_MISS_TOTAL, + metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_MISS_TOTAL, + ]; + + // Expected metrics count should only apply to hit metrics as miss metrics are never set, nor can be found + // when gathering prometheus metrics. If they are found, which should not, it will diff from 0 and fail the test + let expected_miss_metrics_count = 0; + let expected_hit_metrics_count = + num_blocks_produced - UNAGGREGATED_ATTESTATION_LAG_SLOTS as u64; + lighthouse_metrics::gather().iter().for_each(|mf| { + if hit_prometheus_metrics.contains(&mf.get_name()) { + assert_eq!( + mf.get_metric()[0].get_counter().get_value() as u64, + expected_hit_metrics_count + ); + } + if miss_prometheus_metrics.contains(&mf.get_name()) { + assert_eq!( + mf.get_metric()[0].get_counter().get_value() as u64, + expected_miss_metrics_count + ); + } + }); +} + /// This test builds a chain that is just long enough to finalize an epoch then it produces an /// attestation at each slot from genesis through to three epochs past the head. /// diff --git a/beacon_node/beacon_chain/tests/validator_monitor.rs b/beacon_node/beacon_chain/tests/validator_monitor.rs index 5bc6b758c..6fe074068 100644 --- a/beacon_node/beacon_chain/tests/validator_monitor.rs +++ b/beacon_node/beacon_chain/tests/validator_monitor.rs @@ -110,7 +110,7 @@ async fn produces_missed_blocks() { // Let's validate the state which will call the function responsible for // adding the missed blocks to the validator monitor let mut validator_monitor = harness1.chain.validator_monitor.write(); - validator_monitor.process_valid_state(nb_epoch_to_simulate, _state); + validator_monitor.process_valid_state(nb_epoch_to_simulate, _state, &harness1.chain.spec); // We should have one entry in the missed blocks map assert_eq!( @@ -193,7 +193,7 @@ async fn produces_missed_blocks() { // Let's validate the state which will call the function responsible for // adding the missed blocks to the validator monitor let mut validator_monitor2 = harness2.chain.validator_monitor.write(); - validator_monitor2.process_valid_state(epoch, _state2); + validator_monitor2.process_valid_state(epoch, _state2, &harness2.chain.spec); // We should have one entry in the missed blocks map assert_eq!( validator_monitor2.get_monitored_validator_missed_block_count(validator_index as u64), @@ -219,7 +219,7 @@ async fn produces_missed_blocks() { // Let's validate the state which will call the function responsible for // adding the missed blocks to the validator monitor - validator_monitor2.process_valid_state(epoch, _state2); + validator_monitor2.process_valid_state(epoch, _state2, &harness2.chain.spec); // We shouldn't have any entry in the missed blocks map assert_ne!(validator_index, not_monitored_validator_index); @@ -288,7 +288,7 @@ async fn produces_missed_blocks() { // Let's validate the state which will call the function responsible for // adding the missed blocks to the validator monitor let mut validator_monitor3 = harness3.chain.validator_monitor.write(); - validator_monitor3.process_valid_state(epoch, _state3); + validator_monitor3.process_valid_state(epoch, _state3, &harness3.chain.spec); // We shouldn't have one entry in the missed blocks map assert_eq!( diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index bfd55c3be..c6c238fd5 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -2,6 +2,7 @@ use crate::address_change_broadcast::broadcast_address_changes_at_capella; use crate::config::{ClientGenesis, Config as ClientConfig}; use crate::notifier::spawn_notifier; use crate::Client; +use beacon_chain::attestation_simulator::start_attestation_simulator_service; use beacon_chain::data_availability_checker::start_availability_cache_maintenance_service; use beacon_chain::otb_verification_service::start_otb_verification_service; use beacon_chain::proposer_prep_service::start_proposer_prep_service; @@ -839,6 +840,10 @@ where runtime_context.executor.clone(), beacon_chain.clone(), ); + start_attestation_simulator_service( + beacon_chain.task_executor.clone(), + beacon_chain.clone(), + ); } Ok(Client {