diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 6b514d569..074321885 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2389,6 +2389,25 @@ impl BeaconChain { } } + // Register sync aggregate with validator monitor + if let Some(sync_aggregate) = block.body().sync_aggregate() { + // `SyncCommittee` for the sync_aggregate should correspond to the duty slot + let duty_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); + let sync_committee = self.sync_committee_at_epoch(duty_epoch)?; + let participant_pubkeys = sync_committee + .pubkeys + .iter() + .zip(sync_aggregate.sync_committee_bits.iter()) + .filter_map(|(pubkey, bit)| bit.then(|| pubkey)) + .collect::>(); + + validator_monitor.register_sync_aggregate_in_block( + block.slot(), + block.parent_root(), + participant_pubkeys, + ); + } + for exit in block.body().voluntary_exits() { validator_monitor.register_block_voluntary_exit(&exit.message) } diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index ebedef992..0486220b8 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -553,6 +553,48 @@ lazy_static! { "The number of attester slashings seen in the previous epoch.", &["validator"] ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_SYNC_COMMITTEE_MESSAGES_TOTAL: Result = + try_create_int_gauge_vec( + "validator_monitor_prev_epoch_sync_committee_messages_total", + "The number of sync committee messages seen in the previous epoch.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_SYNC_COMMITTEE_MESSAGES_MIN_DELAY_SECONDS: Result = + try_create_histogram_vec( + "validator_monitor_prev_epoch_sync_committee_messages_min_delay_seconds", + "The min delay between when the validator should send the sync committee message and when it was received.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_SYNC_CONTRIBUTION_INCLUSIONS: Result = + try_create_int_gauge_vec( + "validator_monitor_prev_epoch_sync_contribution_inclusions", + "The count of times a sync signature was seen inside a sync contribution.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_SYNC_SIGNATURE_BLOCK_INCLUSIONS: Result = + try_create_int_gauge_vec( + "validator_monitor_prev_epoch_sync_signature_block_inclusions", + "The count of times a sync signature was seen inside a block.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_SYNC_CONTRIBUTIONS_TOTAL: Result = + try_create_int_gauge_vec( + "validator_monitor_prev_epoch_sync_contributions_total", + "The number of sync contributions seen in the previous epoch.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_SYNC_CONTRIBUTION_MIN_DELAY_SECONDS: Result = + try_create_histogram_vec( + "validator_monitor_prev_epoch_sync_contribution_min_delay_seconds", + "The min delay between when the validator should send the sync contribution and when it was received.", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_VALIDATOR_IN_CURRENT_SYNC_COMMITTEE: Result = + try_create_int_gauge_vec( + "validator_monitor_validator_in_current_sync_committee", + "Is the validator in the current sync committee (1 for true and 0 for false)", + &["validator"] + ); /* * Validator Monitor Metrics (real-time) @@ -571,6 +613,26 @@ lazy_static! { "The delay between when the validator should send the attestation and when it was received.", &["src", "validator"] ); + pub static ref VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGES_TOTAL: Result = try_create_int_counter_vec( + "validator_monitor_sync_committee_messages_total", + "Number of sync committee messages seen", + &["src", "validator"] + ); + pub static ref VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGES_DELAY_SECONDS: Result = try_create_histogram_vec( + "validator_monitor_sync_committee_messages_delay_seconds", + "The delay between when the validator should send the sync committee message and when it was received.", + &["src", "validator"] + ); + pub static ref VALIDATOR_MONITOR_SYNC_CONTRIBUTIONS_TOTAL: Result = try_create_int_counter_vec( + "validator_monitor_sync_contributions_total", + "Number of sync contributions seen", + &["src", "validator"] + ); + pub static ref VALIDATOR_MONITOR_SYNC_COONTRIBUTIONS_DELAY_SECONDS: Result = try_create_histogram_vec( + "validator_monitor_sync_contribtions_delay_seconds", + "The delay between when the aggregator should send the sync contribution and when it was received.", + &["src", "validator"] + ); pub static ref VALIDATOR_MONITOR_AGGREGATED_ATTESTATION_TOTAL: Result = try_create_int_counter_vec( "validator_monitor_aggregated_attestation_total", "Number of aggregated attestations seen", @@ -586,6 +648,11 @@ lazy_static! { "Number of times an attestation has been seen in an aggregate", &["src", "validator"] ); + pub static ref VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGE_IN_CONTRIBUTION_TOTAL: Result = try_create_int_counter_vec( + "validator_monitor_sync_committee_message_in_contribution_total", + "Number of times a sync committee message has been seen in a sync contribution", + &["src", "validator"] + ); pub static ref VALIDATOR_MONITOR_ATTESTATION_IN_AGGREGATE_DELAY_SECONDS: Result = try_create_histogram_vec( "validator_monitor_attestation_in_aggregate_delay_seconds", "The delay between when the validator should send the aggregate and when it was received.", @@ -596,6 +663,11 @@ lazy_static! { "Number of times an attestation has been seen in a block", &["src", "validator"] ); + pub static ref VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGE_IN_BLOCK_TOTAL: Result = try_create_int_counter_vec( + "validator_monitor_sync_committee_message_in_block_total", + "Number of times a validator's sync committee message has been seen in a sync aggregate", + &["src", "validator"] + ); pub static ref VALIDATOR_MONITOR_ATTESTATION_IN_BLOCK_DELAY_SLOTS: Result = try_create_int_gauge_vec( "validator_monitor_attestation_in_block_delay_slots", "The excess slots (beyond the minimum delay) between the attestation slot and the block slot.", diff --git a/beacon_node/beacon_chain/src/sync_committee_verification.rs b/beacon_node/beacon_chain/src/sync_committee_verification.rs index 403ef683a..4bc5b439e 100644 --- a/beacon_node/beacon_chain/src/sync_committee_verification.rs +++ b/beacon_node/beacon_chain/src/sync_committee_verification.rs @@ -251,6 +251,7 @@ impl From for Error { #[derivative(Clone(bound = "T: BeaconChainTypes"))] pub struct VerifiedSyncContribution { signed_aggregate: SignedContributionAndProof, + participant_pubkeys: Vec, } /// Wraps a `SyncCommitteeMessage` that has been verified for propagation on the gossip network. @@ -385,7 +386,10 @@ impl VerifiedSyncContribution { slot: contribution.slot, }); } - Ok(VerifiedSyncContribution { signed_aggregate }) + Ok(VerifiedSyncContribution { + signed_aggregate, + participant_pubkeys, + }) } /// A helper function to add this aggregate to `beacon_chain.op_pool`. @@ -402,6 +406,11 @@ impl VerifiedSyncContribution { pub fn aggregate(&self) -> &SignedContributionAndProof { &self.signed_aggregate } + + /// Returns the pubkeys of all validators that are included in the aggregate. + pub fn participant_pubkeys(&self) -> &[PublicKeyBytes] { + &self.participant_pubkeys + } } impl VerifiedSyncCommitteeMessage { diff --git a/beacon_node/beacon_chain/src/validator_monitor.rs b/beacon_node/beacon_chain/src/validator_monitor.rs index a1879060a..84bb5d8b9 100644 --- a/beacon_node/beacon_chain/src/validator_monitor.rs +++ b/beacon_node/beacon_chain/src/validator_monitor.rs @@ -4,7 +4,7 @@ use crate::metrics; use parking_lot::RwLock; -use slog::{crit, error, info, warn, Logger}; +use slog::{crit, debug, error, info, warn, Logger}; use slot_clock::SlotClock; use state_processing::per_epoch_processing::{ errors::EpochProcessingError, EpochProcessingSummary, @@ -16,9 +16,9 @@ use std::marker::PhantomData; use std::str::Utf8Error; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use types::{ - AttestationData, AttesterSlashing, BeaconBlockRef, BeaconState, ChainSpec, Epoch, EthSpec, - Hash256, IndexedAttestation, ProposerSlashing, PublicKeyBytes, SignedAggregateAndProof, Slot, - VoluntaryExit, + AttesterSlashing, BeaconBlockRef, BeaconState, ChainSpec, Epoch, EthSpec, Hash256, + IndexedAttestation, ProposerSlashing, PublicKeyBytes, SignedAggregateAndProof, + SignedContributionAndProof, Slot, SyncCommitteeMessage, VoluntaryExit, }; /// The validator monitor collects per-epoch data about each monitored validator. Historical data @@ -43,7 +43,7 @@ struct EpochSummary { /// The delay between when the attestation should have been produced and when it was observed. pub attestation_min_delay: Option, /// The number of times a validators attestation was seen in an aggregate. - pub attestation_aggregate_incusions: usize, + pub attestation_aggregate_inclusions: usize, /// The number of times a validators attestation was seen in a block. pub attestation_block_inclusions: usize, /// The minimum observed inclusion distance for an attestation for this epoch.. @@ -62,6 +62,27 @@ struct EpochSummary { pub aggregates: usize, /// The delay between when the aggregate should have been produced and when it was observed. pub aggregate_min_delay: Option, + + /* + * SyncCommitteeMessages in the current epoch + */ + /// The number of sync committee messages seen. + sync_committee_messages: usize, + /// The delay between when the sync committee message should have been produced and when it was observed. + sync_committee_message_min_delay: Option, + /// The number of times a validator's sync signature was included in the sync aggregate. + sync_signature_block_inclusions: usize, + /// The number of times a validator's sync signature was aggregated into a sync contribution. + sync_signature_contribution_inclusions: usize, + + /* + * SyncContributions in the current epoch + */ + /// The number of SyncContributions observed in the current epoch. + sync_contributions: usize, + /// The delay between when the sync committee contribution should have been produced and when it was observed. + sync_contribution_min_delay: Option, + /* * Others pertaining to this epoch. */ @@ -93,13 +114,27 @@ impl EpochSummary { Self::update_if_lt(&mut self.attestation_min_delay, delay); } + pub fn register_sync_committee_message(&mut self, delay: Duration) { + self.sync_committee_messages += 1; + Self::update_if_lt(&mut self.sync_committee_message_min_delay, delay); + } + pub fn register_aggregated_attestation(&mut self, delay: Duration) { self.aggregates += 1; Self::update_if_lt(&mut self.aggregate_min_delay, delay); } + pub fn register_sync_committee_contribution(&mut self, delay: Duration) { + self.sync_contributions += 1; + Self::update_if_lt(&mut self.sync_contribution_min_delay, delay); + } + pub fn register_aggregate_attestation_inclusion(&mut self) { - self.attestation_aggregate_incusions += 1; + self.attestation_aggregate_inclusions += 1; + } + + pub fn register_sync_signature_contribution_inclusion(&mut self) { + self.sync_signature_contribution_inclusions += 1; } pub fn register_attestation_block_inclusion(&mut self, delay: Slot) { @@ -107,6 +142,10 @@ impl EpochSummary { Self::update_if_lt(&mut self.attestation_min_block_inclusion_distance, delay); } + pub fn register_sync_signature_block_inclusions(&mut self) { + self.sync_signature_block_inclusions += 1; + } + pub fn register_exit(&mut self) { self.exits += 1; } @@ -328,10 +367,10 @@ impl ValidatorMonitor { pub fn process_validator_statuses( &self, epoch: Epoch, - summary: &EpochProcessingSummary, + summary: &EpochProcessingSummary, spec: &ChainSpec, ) -> Result<(), EpochProcessingError> { - for monitored_validator in self.validators.values() { + for (pubkey, monitored_validator) in self.validators.iter() { // We subtract two from the state of the epoch that generated these summaries. // // - One to account for it being the previous epoch. @@ -455,6 +494,45 @@ impl ValidatorMonitor { inclusion_info.delay as i64, ); } + + // Indicates the number of sync committee signatures that made it into + // a sync aggregate in the current_epoch (state.epoch - 1). + // Note: Unlike attestations, sync committee signatures must be included in the + // immediate next slot. Hence, num included sync aggregates for `state.epoch - 1` + // is available right after state transition to state.epoch. + let current_epoch = epoch - 1; + if let Some(sync_committee) = summary.sync_committee() { + if sync_committee.contains(pubkey) { + metrics::set_int_gauge( + &metrics::VALIDATOR_MONITOR_VALIDATOR_IN_CURRENT_SYNC_COMMITTEE, + &[id], + 1, + ); + let epoch_summary = monitored_validator.summaries.read(); + if let Some(summary) = epoch_summary.get(¤t_epoch) { + info!( + self.log, + "Current epoch sync signatures"; + "included" => summary.sync_signature_block_inclusions, + "expected" => T::slots_per_epoch(), + "epoch" => current_epoch, + "validator" => id, + ); + } + } else { + metrics::set_int_gauge( + &metrics::VALIDATOR_MONITOR_VALIDATOR_IN_CURRENT_SYNC_COMMITTEE, + &[id], + 0, + ); + debug!( + self.log, + "Validator isn't part of the current sync committee"; + "epoch" => current_epoch, + "validator" => id, + ); + } + } } } @@ -555,22 +633,6 @@ impl ValidatorMonitor { } } - /// Returns the duration between when the attestation `data` could be produced (1/3rd through - /// the slot) and `seen_timestamp`. - fn get_unaggregated_attestation_delay_ms( - seen_timestamp: Duration, - data: &AttestationData, - slot_clock: &S, - ) -> Duration { - slot_clock - .start_of(data.slot) - .and_then(|slot_start| seen_timestamp.checked_sub(slot_start)) - .and_then(|gross_delay| { - gross_delay.checked_sub(slot_clock.unagg_attestation_production_delay()) - }) - .unwrap_or_else(|| Duration::from_secs(0)) - } - /// Register an attestation seen on the gossip network. pub fn register_gossip_unaggregated_attestation( &self, @@ -610,7 +672,12 @@ impl ValidatorMonitor { ) { let data = &indexed_attestation.data; let epoch = data.slot.epoch(T::slots_per_epoch()); - let delay = Self::get_unaggregated_attestation_delay_ms(seen_timestamp, data, slot_clock); + let delay = get_message_delay_ms( + seen_timestamp, + data.slot, + slot_clock.unagg_attestation_production_delay(), + slot_clock, + ); indexed_attestation.attesting_indices.iter().for_each(|i| { if let Some(validator) = self.get_validator(*i) { @@ -645,22 +712,6 @@ impl ValidatorMonitor { }) } - /// Returns the duration between when a `AggregateAndproof` with `data` could be produced (2/3rd - /// through the slot) and `seen_timestamp`. - fn get_aggregated_attestation_delay_ms( - seen_timestamp: Duration, - data: &AttestationData, - slot_clock: &S, - ) -> Duration { - slot_clock - .start_of(data.slot) - .and_then(|slot_start| seen_timestamp.checked_sub(slot_start)) - .and_then(|gross_delay| { - gross_delay.checked_sub(slot_clock.agg_attestation_production_delay()) - }) - .unwrap_or_else(|| Duration::from_secs(0)) - } - /// Register a `signed_aggregate_and_proof` seen on the gossip network. pub fn register_gossip_aggregated_attestation( &self, @@ -705,7 +756,12 @@ impl ValidatorMonitor { ) { let data = &indexed_attestation.data; let epoch = data.slot.epoch(T::slots_per_epoch()); - let delay = Self::get_aggregated_attestation_delay_ms(seen_timestamp, data, slot_clock); + let delay = get_message_delay_ms( + seen_timestamp, + data.slot, + slot_clock.agg_attestation_production_delay(), + slot_clock, + ); let aggregator_index = signed_aggregate_and_proof.message.aggregator_index; if let Some(validator) = self.get_validator(aggregator_index) { @@ -814,6 +870,226 @@ impl ValidatorMonitor { }) } + /// Register a sync committee message received over gossip. + pub fn register_gossip_sync_committee_message( + &self, + seen_timestamp: Duration, + sync_committee_message: &SyncCommitteeMessage, + slot_clock: &S, + ) { + self.register_sync_committee_message( + "gossip", + seen_timestamp, + sync_committee_message, + slot_clock, + ) + } + + /// Register a sync committee message received over the http api. + pub fn register_api_sync_committee_message( + &self, + seen_timestamp: Duration, + sync_committee_message: &SyncCommitteeMessage, + slot_clock: &S, + ) { + self.register_sync_committee_message( + "api", + seen_timestamp, + sync_committee_message, + slot_clock, + ) + } + + /// Register a sync committee message. + fn register_sync_committee_message( + &self, + src: &str, + seen_timestamp: Duration, + sync_committee_message: &SyncCommitteeMessage, + slot_clock: &S, + ) { + if let Some(validator) = self.get_validator(sync_committee_message.validator_index) { + let id = &validator.id; + + let epoch = sync_committee_message.slot.epoch(T::slots_per_epoch()); + let delay = get_message_delay_ms( + seen_timestamp, + sync_committee_message.slot, + slot_clock.sync_committee_message_production_delay(), + slot_clock, + ); + + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGES_TOTAL, + &[src, id], + ); + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGES_DELAY_SECONDS, + &[src, id], + delay, + ); + + info!( + self.log, + "Sync committee message"; + "head" => %sync_committee_message.beacon_block_root, + "delay_ms" => %delay.as_millis(), + "epoch" => %epoch, + "slot" => %sync_committee_message.slot, + "src" => src, + "validator" => %id, + ); + + validator.with_epoch_summary(epoch, |summary| { + summary.register_sync_committee_message(delay) + }); + } + } + + /// Register a sync committee contribution received over gossip. + pub fn register_gossip_sync_committee_contribution( + &self, + seen_timestamp: Duration, + sync_contribution: &SignedContributionAndProof, + participant_pubkeys: &[PublicKeyBytes], + slot_clock: &S, + ) { + self.register_sync_committee_contribution( + "gossip", + seen_timestamp, + sync_contribution, + participant_pubkeys, + slot_clock, + ) + } + + /// Register a sync committee contribution received over the http api. + pub fn register_api_sync_committee_contribution( + &self, + seen_timestamp: Duration, + sync_contribution: &SignedContributionAndProof, + participant_pubkeys: &[PublicKeyBytes], + slot_clock: &S, + ) { + self.register_sync_committee_contribution( + "api", + seen_timestamp, + sync_contribution, + participant_pubkeys, + slot_clock, + ) + } + + /// Register a sync committee contribution. + fn register_sync_committee_contribution( + &self, + src: &str, + seen_timestamp: Duration, + sync_contribution: &SignedContributionAndProof, + participant_pubkeys: &[PublicKeyBytes], + slot_clock: &S, + ) { + let slot = sync_contribution.message.contribution.slot; + let epoch = slot.epoch(T::slots_per_epoch()); + let beacon_block_root = sync_contribution.message.contribution.beacon_block_root; + let delay = get_message_delay_ms( + seen_timestamp, + slot, + slot_clock.sync_committee_contribution_production_delay(), + slot_clock, + ); + + let aggregator_index = sync_contribution.message.aggregator_index; + if let Some(validator) = self.get_validator(aggregator_index) { + let id = &validator.id; + + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_SYNC_CONTRIBUTIONS_TOTAL, + &[src, id], + ); + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_SYNC_COONTRIBUTIONS_DELAY_SECONDS, + &[src, id], + delay, + ); + + info!( + self.log, + "Sync contribution"; + "head" => %beacon_block_root, + "delay_ms" => %delay.as_millis(), + "epoch" => %epoch, + "slot" => %slot, + "src" => src, + "validator" => %id, + ); + + validator.with_epoch_summary(epoch, |summary| { + summary.register_sync_committee_contribution(delay) + }); + } + + for validator_pubkey in participant_pubkeys.iter() { + if let Some(validator) = self.validators.get(validator_pubkey) { + let id = &validator.id; + + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGE_IN_CONTRIBUTION_TOTAL, + &[src, id], + ); + + info!( + self.log, + "Sync signature included in contribution"; + "head" => %beacon_block_root, + "delay_ms" => %delay.as_millis(), + "epoch" => %epoch, + "slot" => %slot, + "src" => src, + "validator" => %id, + ); + + validator.with_epoch_summary(epoch, |summary| { + summary.register_sync_signature_contribution_inclusion() + }); + } + } + } + + /// Register that the `sync_aggregate` was included in a *valid* `BeaconBlock`. + pub fn register_sync_aggregate_in_block( + &self, + slot: Slot, + beacon_block_root: Hash256, + participant_pubkeys: Vec<&PublicKeyBytes>, + ) { + let epoch = slot.epoch(T::slots_per_epoch()); + + for validator_pubkey in participant_pubkeys { + if let Some(validator) = self.validators.get(validator_pubkey) { + let id = &validator.id; + + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGE_IN_BLOCK_TOTAL, + &["block", id], + ); + + info!( + self.log, + "Sync signature included in block"; + "head" => %beacon_block_root, + "epoch" => %epoch, + "slot" => %slot, + "validator" => %id, + ); + + validator.with_epoch_summary(epoch, |summary| { + summary.register_sync_signature_block_inclusions(); + }); + } + } + } + /// Register an exit from the gossip network. pub fn register_gossip_voluntary_exit(&self, exit: &VoluntaryExit) { self.register_voluntary_exit("gossip", exit) @@ -995,7 +1271,7 @@ impl ValidatorMonitor { metrics::set_gauge_vec( &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_AGGREGATE_INCLUSIONS, &[id], - summary.attestation_aggregate_incusions as i64, + summary.attestation_aggregate_inclusions as i64, ); metrics::set_gauge_vec( &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_BLOCK_INCLUSIONS, @@ -1009,6 +1285,48 @@ impl ValidatorMonitor { distance.as_u64() as i64, ); } + /* + * Sync committee messages + */ + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_COMMITTEE_MESSAGES_TOTAL, + &[id], + summary.sync_committee_messages as i64, + ); + if let Some(delay) = summary.sync_committee_message_min_delay { + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_COMMITTEE_MESSAGES_MIN_DELAY_SECONDS, + &[id], + delay, + ); + } + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_CONTRIBUTION_INCLUSIONS, + &[id], + summary.sync_signature_contribution_inclusions as i64, + ); + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_SIGNATURE_BLOCK_INCLUSIONS, + &[id], + summary.sync_signature_block_inclusions as i64, + ); + + /* + * Sync contributions + */ + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_CONTRIBUTIONS_TOTAL, + &[id], + summary.sync_contributions as i64, + ); + if let Some(delay) = summary.sync_contribution_min_delay { + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_CONTRIBUTION_MIN_DELAY_SECONDS, + &[id], + delay, + ); + } + /* * Blocks */ @@ -1094,3 +1412,23 @@ pub fn get_slot_delay_ms( .and_then(|slot_start| seen_timestamp.checked_sub(slot_start)) .unwrap_or_else(|| Duration::from_secs(0)) } + +/// Returns the duration between when any message could be produced and the `seen_timestamp`. +/// +/// `message_production_delay` is the duration from the beginning of the slot when the message +/// should be produced. +/// e.g. for unagg attestations, `message_production_delay = slot_duration / 3`. +/// +/// `slot` is the slot for which the message was produced. +fn get_message_delay_ms( + seen_timestamp: Duration, + slot: Slot, + message_production_delay: Duration, + slot_clock: &S, +) -> Duration { + slot_clock + .start_of(slot) + .and_then(|slot_start| seen_timestamp.checked_sub(slot_start)) + .and_then(|gross_delay| gross_delay.checked_sub(message_production_delay)) + .unwrap_or_else(|| Duration::from_secs(0)) +} diff --git a/beacon_node/http_api/src/sync_committees.rs b/beacon_node/http_api/src/sync_committees.rs index 68f54a8d4..75e3633fb 100644 --- a/beacon_node/http_api/src/sync_committees.rs +++ b/beacon_node/http_api/src/sync_committees.rs @@ -5,8 +5,8 @@ use beacon_chain::sync_committee_verification::{ Error as SyncVerificationError, VerifiedSyncCommitteeMessage, }; use beacon_chain::{ - BeaconChain, BeaconChainError, BeaconChainTypes, StateSkipConfig, - MAXIMUM_GOSSIP_CLOCK_DISPARITY, + validator_monitor::timestamp_now, BeaconChain, BeaconChainError, BeaconChainTypes, + StateSkipConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY, }; use eth2::types::{self as api_types}; use eth2_libp2p::PubsubMessage; @@ -130,6 +130,8 @@ pub fn process_sync_committee_signatures( ) -> Result<(), warp::reject::Rejection> { let mut failures = vec![]; + let seen_timestamp = timestamp_now(); + for (i, sync_committee_signature) in sync_committee_signatures.iter().enumerate() { let subnet_positions = match get_subnet_positions_for_sync_committee_message( sync_committee_signature, @@ -168,6 +170,16 @@ pub fn process_sync_committee_signatures( ))), )?; + // Register with validator monitor + chain + .validator_monitor + .read() + .register_api_sync_committee_message( + seen_timestamp, + verified.sync_message(), + &chain.slot_clock, + ); + verified_for_pool = Some(verified); } Err(e) => { @@ -231,6 +243,8 @@ pub fn process_signed_contribution_and_proofs( let mut verified_contributions = Vec::with_capacity(signed_contribution_and_proofs.len()); let mut failures = vec![]; + let seen_timestamp = timestamp_now(); + // Verify contributions & broadcast to the network. for (index, contribution) in signed_contribution_and_proofs.into_iter().enumerate() { let aggregator_index = contribution.message.aggregator_index; @@ -246,7 +260,17 @@ pub fn process_signed_contribution_and_proofs( )), )?; - // FIXME(altair): notify validator monitor + // Register with validator monitor + chain + .validator_monitor + .read() + .register_api_sync_committee_contribution( + seen_timestamp, + verified_contribution.aggregate(), + verified_contribution.participant_pubkeys(), + &chain.slot_clock, + ); + verified_contributions.push((index, verified_contribution)); } // If we already know the contribution, don't broadcast it or attempt to diff --git a/beacon_node/http_api/src/validator_inclusion.rs b/beacon_node/http_api/src/validator_inclusion.rs index 9131d698f..48dfc17ff 100644 --- a/beacon_node/http_api/src/validator_inclusion.rs +++ b/beacon_node/http_api/src/validator_inclusion.rs @@ -27,7 +27,7 @@ fn end_of_epoch_state( fn get_epoch_processing_summary( state: &mut BeaconState, spec: &ChainSpec, -) -> Result { +) -> Result, warp::reject::Rejection> { process_epoch(state, spec) .map_err(|e| warp_utils::reject::custom_server_error(format!("{:?}", e))) } diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 099516082..e581b681e 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -716,7 +716,7 @@ impl Worker { peer_id: PeerId, sync_signature: SyncCommitteeMessage, subnet_id: SyncSubnetId, - _seen_timestamp: Duration, + seen_timestamp: Duration, ) { let sync_signature = match self .chain @@ -734,21 +734,19 @@ impl Worker { } }; - /*TODO: + // Indicate to the `Network` service that this message is valid and can be + // propagated on the gossip network. + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + // Register the sync signature with any monitored validators. self.chain .validator_monitor .read() - .register_gossip_unaggregated_attestation( + .register_gossip_sync_committee_message( seen_timestamp, - attestation.indexed_attestation(), + sync_signature.sync_message(), &self.chain.slot_clock, ); - */ - - // Indicate to the `Network` service that this message is valid and can be - // propagated on the gossip network. - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); metrics::inc_counter(&metrics::BEACON_PROCESSOR_SYNC_MESSAGE_VERIFIED_TOTAL); @@ -778,7 +776,7 @@ impl Worker { message_id: MessageId, peer_id: PeerId, sync_contribution: SignedContributionAndProof, - _seen_timestamp: Duration, + seen_timestamp: Duration, ) { let sync_contribution = match self .chain @@ -801,19 +799,16 @@ impl Worker { // propagated on the gossip network. self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); - /* TODO - // Register the attestation with any monitored validators. self.chain .validator_monitor .read() - .register_gossip_aggregated_attestation( + .register_gossip_sync_committee_contribution( seen_timestamp, - aggregate.aggregate(), - aggregate.indexed_attestation(), + sync_contribution.aggregate(), + sync_contribution.participant_pubkeys(), &self.chain.slot_clock, ); - metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL); - */ + metrics::inc_counter(&metrics::BEACON_PROCESSOR_SYNC_CONTRIBUTION_VERIFIED_TOTAL); if let Err(e) = self .chain diff --git a/common/slot_clock/src/lib.rs b/common/slot_clock/src/lib.rs index 94538141c..2d14abb55 100644 --- a/common/slot_clock/src/lib.rs +++ b/common/slot_clock/src/lib.rs @@ -82,9 +82,21 @@ pub trait SlotClock: Send + Sync + Sized + Clone { self.slot_duration() / 3 } + /// Returns the delay between the start of the slot and when sync committee messages should be + /// produced. + fn sync_committee_message_production_delay(&self) -> Duration { + self.slot_duration() / 3 + } + /// Returns the delay between the start of the slot and when aggregated attestations should be /// produced. fn agg_attestation_production_delay(&self) -> Duration { self.slot_duration() * 2 / 3 } + + /// Returns the delay between the start of the slot and when partially aggregated `SyncCommitteeContribution` should be + /// produced. + fn sync_committee_contribution_production_delay(&self) -> Duration { + self.slot_duration() * 2 / 3 + } } diff --git a/consensus/state_processing/src/per_epoch_processing.rs b/consensus/state_processing/src/per_epoch_processing.rs index 894da7cee..245876b86 100644 --- a/consensus/state_processing/src/per_epoch_processing.rs +++ b/consensus/state_processing/src/per_epoch_processing.rs @@ -27,7 +27,7 @@ pub mod weigh_justification_and_finalization; pub fn process_epoch( state: &mut BeaconState, spec: &ChainSpec, -) -> Result { +) -> Result, Error> { // Verify that the `BeaconState` instantiation matches the fork at `state.slot()`. state .fork_name(spec) diff --git a/consensus/state_processing/src/per_epoch_processing/altair.rs b/consensus/state_processing/src/per_epoch_processing/altair.rs index dd93ccab2..a915db63c 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair.rs @@ -22,7 +22,7 @@ pub mod sync_committee_updates; pub fn process_epoch( state: &mut BeaconState, spec: &ChainSpec, -) -> Result { +) -> Result, Error> { // Ensure the committee caches are built. state.build_committee_cache(RelativeEpoch::Previous, spec)?; state.build_committee_cache(RelativeEpoch::Current, spec)?; @@ -30,6 +30,7 @@ pub fn process_epoch( // Pre-compute participating indices and total balances. let participation_cache = ParticipationCache::new(state, spec)?; + let sync_committee = state.current_sync_committee()?.clone(); // Justification and finalization. process_justification_and_finalization(state, &participation_cache)?; @@ -75,5 +76,6 @@ pub fn process_epoch( Ok(EpochProcessingSummary::Altair { participation_cache, + sync_committee, }) } diff --git a/consensus/state_processing/src/per_epoch_processing/base.rs b/consensus/state_processing/src/per_epoch_processing/base.rs index fd530a2ea..43d96bb93 100644 --- a/consensus/state_processing/src/per_epoch_processing/base.rs +++ b/consensus/state_processing/src/per_epoch_processing/base.rs @@ -18,7 +18,7 @@ pub mod validator_statuses; pub fn process_epoch( state: &mut BeaconState, spec: &ChainSpec, -) -> Result { +) -> Result, Error> { // Ensure the committee caches are built. state.build_committee_cache(RelativeEpoch::Previous, spec)?; state.build_committee_cache(RelativeEpoch::Current, spec)?; diff --git a/consensus/state_processing/src/per_epoch_processing/epoch_processing_summary.rs b/consensus/state_processing/src/per_epoch_processing/epoch_processing_summary.rs index 6c3fb1518..814874742 100644 --- a/consensus/state_processing/src/per_epoch_processing/epoch_processing_summary.rs +++ b/consensus/state_processing/src/per_epoch_processing/epoch_processing_summary.rs @@ -3,20 +3,23 @@ use super::{ base::{validator_statuses::InclusionInfo, TotalBalances, ValidatorStatus}, }; use crate::metrics; +use std::sync::Arc; +use types::{EthSpec, SyncCommittee}; /// Provides a summary of validator participation during the epoch. #[derive(PartialEq, Debug)] -pub enum EpochProcessingSummary { +pub enum EpochProcessingSummary { Base { total_balances: TotalBalances, statuses: Vec, }, Altair { participation_cache: ParticipationCache, + sync_committee: Arc>, }, } -impl EpochProcessingSummary { +impl EpochProcessingSummary { /// Updates some Prometheus metrics with some values in `self`. #[cfg(feature = "metrics")] pub fn observe_metrics(&self) -> Result<(), ParticipationCacheError> { @@ -40,12 +43,21 @@ impl EpochProcessingSummary { Ok(()) } + /// Returns the sync committee indices for the current epoch for altair. + pub fn sync_committee(&self) -> Option<&SyncCommittee> { + match self { + EpochProcessingSummary::Altair { sync_committee, .. } => Some(sync_committee), + EpochProcessingSummary::Base { .. } => None, + } + } + /// Returns the sum of the effective balance of all validators in the current epoch. pub fn current_epoch_total_active_balance(&self) -> u64 { match self { EpochProcessingSummary::Base { total_balances, .. } => total_balances.current_epoch(), EpochProcessingSummary::Altair { participation_cache, + .. } => participation_cache.current_epoch_total_active_balance(), } } @@ -59,6 +71,7 @@ impl EpochProcessingSummary { } EpochProcessingSummary::Altair { participation_cache, + .. } => participation_cache.current_epoch_target_attesting_balance(), } } @@ -69,6 +82,7 @@ impl EpochProcessingSummary { EpochProcessingSummary::Base { total_balances, .. } => total_balances.previous_epoch(), EpochProcessingSummary::Altair { participation_cache, + .. } => participation_cache.previous_epoch_total_active_balance(), } } @@ -126,6 +140,7 @@ impl EpochProcessingSummary { } EpochProcessingSummary::Altair { participation_cache, + .. } => participation_cache.previous_epoch_target_attesting_balance(), } } @@ -144,6 +159,7 @@ impl EpochProcessingSummary { } EpochProcessingSummary::Altair { participation_cache, + .. } => participation_cache.previous_epoch_head_attesting_balance(), } } @@ -162,6 +178,7 @@ impl EpochProcessingSummary { } EpochProcessingSummary::Altair { participation_cache, + .. } => participation_cache.previous_epoch_source_attesting_balance(), } } diff --git a/consensus/state_processing/src/per_slot_processing.rs b/consensus/state_processing/src/per_slot_processing.rs index 6bb38fa39..43eaa89c1 100644 --- a/consensus/state_processing/src/per_slot_processing.rs +++ b/consensus/state_processing/src/per_slot_processing.rs @@ -26,7 +26,7 @@ pub fn per_slot_processing( state: &mut BeaconState, state_root: Option, spec: &ChainSpec, -) -> Result, Error> { +) -> Result>, Error> { // Verify that the `BeaconState` instantiation matches the fork at `state.slot()`. state .fork_name(spec) diff --git a/consensus/types/src/sync_committee.rs b/consensus/types/src/sync_committee.rs index 784fb0ce1..598d5fc16 100644 --- a/consensus/types/src/sync_committee.rs +++ b/consensus/types/src/sync_committee.rs @@ -84,4 +84,9 @@ impl SyncCommittee { } Ok(subnet_positions) } + + /// Returns `true` if the pubkey exists in the `SyncCommittee`. + pub fn contains(&self, pubkey: &PublicKeyBytes) -> bool { + self.pubkeys.contains(pubkey) + } }