diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index f64168527..eb9e179aa 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -5,6 +5,7 @@ use crate::attestation_verification::{ }; use crate::attester_cache::{AttesterCache, AttesterCacheKey}; use crate::beacon_proposer_cache::BeaconProposerCache; +use crate::block_times_cache::BlockTimesCache; use crate::block_verification::{ check_block_is_finalized_descendant, check_block_relevancy, get_block_root, signature_verify_chain_segment, BlockError, FullyVerifiedBlock, GossipVerifiedBlock, @@ -38,14 +39,16 @@ use crate::sync_committee_verification::{ }; use crate::timeout_rw_lock::TimeoutRwLock; use crate::validator_monitor::{ - get_block_delay_ms, get_slot_delay_ms, timestamp_now, ValidatorMonitor, + get_slot_delay_ms, timestamp_now, ValidatorMonitor, HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS, }; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::BeaconForkChoiceStore; use crate::BeaconSnapshot; use crate::{metrics, BeaconChainError}; -use eth2::types::{EventKind, SseBlock, SseChainReorg, SseFinalizedCheckpoint, SseHead, SyncDuty}; +use eth2::types::{ + EventKind, SseBlock, SseChainReorg, SseFinalizedCheckpoint, SseHead, SseLateHead, SyncDuty, +}; use fork_choice::ForkChoice; use futures::channel::mpsc::Sender; use itertools::process_results; @@ -298,6 +301,8 @@ pub struct BeaconChain { pub(crate) validator_pubkey_cache: TimeoutRwLock>, /// A cache used when producing attestations. pub(crate) attester_cache: Arc, + /// A cache used to keep track of various block timings. + pub block_times_cache: Arc>, /// A list of any hard-coded forks that have been disabled. pub disabled_forks: Vec, /// Sender given to tasks, so that if they encounter a state in which execution cannot @@ -2538,14 +2543,9 @@ impl BeaconChain { // This prevents inconsistency between the two at the expense of concurrency. drop(fork_choice); - // Log metrics to track the delay between when the block was made and when we imported it. - // // We're declaring the block "imported" at this point, since fork choice and the DB know // about it. - metrics::observe_duration( - &metrics::BEACON_BLOCK_IMPORTED_SLOT_START_DELAY_TIME, - get_block_delay_ms(timestamp_now(), block.to_ref(), &self.slot_clock), - ); + let block_time_imported = timestamp_now(); let parent_root = block.parent_root(); let slot = block.slot(); @@ -2590,6 +2590,38 @@ impl BeaconChain { metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); + let block_delay_total = get_slot_delay_ms(block_time_imported, slot, &self.slot_clock); + + // Do not write to the cache for blocks older than 2 epochs, this helps reduce writes to + // the cache during sync. + if block_delay_total < self.slot_clock.slot_duration() * 64 { + // Store the timestamp of the block being imported into the cache. + self.block_times_cache.write().set_time_imported( + block_root, + current_slot, + block_time_imported, + ); + } + + // Do not store metrics if the block was > 4 slots old, this helps prevent noise during + // sync. + if block_delay_total < self.slot_clock.slot_duration() * 4 { + // Observe the delay between when we observed the block and when we imported it. 
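Note on the guards above: the import path applies two cut-offs to block_delay_total. Cache writes are skipped once a block is more than 64 slots behind (two mainnet epochs, i.e. 12m48s with 12-second slots), and delay metrics are skipped past 4 slots, so a syncing node neither floods the cache nor skews the histograms. A minimal, self-contained sketch of that threshold arithmetic, using hypothetical helper names that are not part of this patch:

use std::time::Duration;

// Hypothetical helpers illustrating the two cut-offs applied to `block_delay_total`
// during block import. On mainnet a slot is 12s and an epoch is 32 slots, so
// 64 slots is two epochs.
fn should_write_to_block_times_cache(block_delay_total: Duration, slot_duration: Duration) -> bool {
    // Skip cache writes for blocks more than two epochs old; limits writes during sync.
    block_delay_total < slot_duration * 64
}

fn should_record_delay_metrics(block_delay_total: Duration, slot_duration: Duration) -> bool {
    // Skip delay metrics for blocks more than four slots old; limits noise during sync.
    block_delay_total < slot_duration * 4
}

fn main() {
    let slot_duration = Duration::from_secs(12);
    // A block 30s into its slot is both cached and measured.
    assert!(should_write_to_block_times_cache(Duration::from_secs(30), slot_duration));
    assert!(should_record_delay_metrics(Duration::from_secs(30), slot_duration));
    // A block five slots (60s) late is still cached but no longer measured.
    assert!(should_write_to_block_times_cache(Duration::from_secs(60), slot_duration));
    assert!(!should_record_delay_metrics(Duration::from_secs(60), slot_duration));
}
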
+ let block_delays = self.block_times_cache.read().get_block_delays( + block_root, + self.slot_clock + .start_of(current_slot) + .unwrap_or_else(|| Duration::from_secs(0)), + ); + + metrics::observe_duration( + &metrics::BEACON_BLOCK_IMPORTED_OBSERVED_DELAY_TIME, + block_delays + .imported + .unwrap_or_else(|| Duration::from_secs(0)), + ); + } + Ok(block_root) } @@ -2998,7 +3030,7 @@ impl BeaconChain { let update_head_timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES); - // These fields are used for server-sent events + // These fields are used for server-sent events. let state_root = new_head.beacon_state_root(); let head_slot = new_head.beacon_state.slot(); let target_epoch_start_slot = new_head @@ -3010,6 +3042,12 @@ impl BeaconChain { .previous_epoch() .start_slot(T::EthSpec::slots_per_epoch()); let head_proposer_index = new_head.beacon_block.message().proposer_index(); + let proposer_graffiti = new_head + .beacon_block + .message() + .body() + .graffiti() + .as_utf8_lossy(); drop(lag_timer); @@ -3020,35 +3058,83 @@ impl BeaconChain { .try_write_for(HEAD_LOCK_TIMEOUT) .ok_or(Error::CanonicalHeadLockTimeout)? = new_head; + // The block has now been set as head so we can record times and delays. metrics::stop_timer(update_head_timer); - let block_delay = get_slot_delay_ms(timestamp_now(), head_slot, &self.slot_clock); + let block_time_set_as_head = timestamp_now(); - // Observe the delay between the start of the slot and when we set the block as head. - metrics::observe_duration( - &metrics::BEACON_BLOCK_HEAD_SLOT_START_DELAY_TIME, - block_delay, - ); + // Calculate the total delay between the start of the slot and when it was set as head. + let block_delay_total = + get_slot_delay_ms(block_time_set_as_head, head_slot, &self.slot_clock); - // If the block was enshrined as head too late for attestations to be created for it, log a - // debug warning and increment a metric. - // - // Don't create this log if the block was > 4 slots old, this helps prevent noise during - // sync. - if block_delay >= self.slot_clock.unagg_attestation_production_delay() - && block_delay < self.slot_clock.slot_duration() * 4 - { - metrics::inc_counter(&metrics::BEACON_BLOCK_HEAD_SLOT_START_DELAY_EXCEEDED_TOTAL); - debug!( - self.log, - "Delayed head block"; - "block_root" => ?beacon_block_root, - "proposer_index" => head_proposer_index, - "slot" => head_slot, - "block_delay" => ?block_delay, + // Do not write to the cache for blocks older than 2 epochs, this helps reduce writes to + // the cache during sync. + if block_delay_total < self.slot_clock.slot_duration() * 64 { + self.block_times_cache.write().set_time_set_as_head( + beacon_block_root, + current_head.slot, + block_time_set_as_head, ); } + // If a block comes in from over 4 slots ago, it is most likely a block from sync. + let block_from_sync = block_delay_total > self.slot_clock.slot_duration() * 4; + + // Determine whether the block has been set as head too late for proper attestation + // production. + let late_head = block_delay_total >= self.slot_clock.unagg_attestation_production_delay(); + + // Do not store metrics if the block was > 4 slots old, this helps prevent noise during + // sync. + if !block_from_sync { + // Observe the total block delay. This is the delay between the time the slot started + // and when the block was set as head. 
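The late_head flag computed above drives both the debug log and the new server-sent event that follow. A minimal sketch of the test it performs; the exact threshold comes from SlotClock::unagg_attestation_production_delay, and a third of a 12-second mainnet slot (4s) is assumed here purely for illustration:

use std::time::Duration;

// Sketch (not part of the patch) of the "late head" check: a block is a late head
// if it was set as head at or after the point in the slot where unaggregated
// attestations are produced. One third of the slot is assumed below.
fn is_late_head(block_delay_total: Duration, slot_duration: Duration) -> bool {
    let unagg_attestation_production_delay = slot_duration / 3;
    block_delay_total >= unagg_attestation_production_delay
}

fn main() {
    let slot_duration = Duration::from_secs(12);
    assert!(!is_late_head(Duration::from_secs(3), slot_duration)); // head set before attestations are due
    assert!(is_late_head(Duration::from_secs(5), slot_duration));  // head set after the 4s mark
}
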
+ metrics::observe_duration( + &metrics::BEACON_BLOCK_HEAD_SLOT_START_DELAY_TIME, + block_delay_total, + ); + + // Observe the delay between when we imported the block and when we set the block as + // head. + let block_delays = self.block_times_cache.read().get_block_delays( + beacon_block_root, + self.slot_clock + .start_of(head_slot) + .unwrap_or_else(|| Duration::from_secs(0)), + ); + + metrics::observe_duration( + &metrics::BEACON_BLOCK_OBSERVED_SLOT_START_DELAY_TIME, + block_delays + .observed + .unwrap_or_else(|| Duration::from_secs(0)), + ); + + metrics::observe_duration( + &metrics::BEACON_BLOCK_HEAD_IMPORTED_DELAY_TIME, + block_delays + .set_as_head + .unwrap_or_else(|| Duration::from_secs(0)), + ); + + // If the block was enshrined as head too late for attestations to be created for it, + // log a debug warning and increment a metric. + if late_head { + metrics::inc_counter(&metrics::BEACON_BLOCK_HEAD_SLOT_START_DELAY_EXCEEDED_TOTAL); + debug!( + self.log, + "Delayed head block"; + "block_root" => ?beacon_block_root, + "proposer_index" => head_proposer_index, + "slot" => head_slot, + "block_delay" => ?block_delay_total, + "observed_delay" => ?block_delays.observed, + "imported_delay" => ?block_delays.imported, + "set_as_head_delay" => ?block_delays.set_as_head, + ); + } + } + self.snapshot_cache .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) .map(|mut snapshot_cache| { @@ -3146,6 +3232,31 @@ impl BeaconChain { epoch: head_slot.epoch(T::EthSpec::slots_per_epoch()), })); } + + if !block_from_sync && late_head && event_handler.has_late_head_subscribers() { + let peer_info = self + .block_times_cache + .read() + .get_peer_info(beacon_block_root); + let block_delays = self.block_times_cache.read().get_block_delays( + beacon_block_root, + self.slot_clock + .start_of(head_slot) + .unwrap_or_else(|| Duration::from_secs(0)), + ); + event_handler.register(EventKind::LateHead(SseLateHead { + slot: head_slot, + block: beacon_block_root, + peer_id: peer_info.id, + peer_client: peer_info.client, + proposer_index: head_proposer_index, + proposer_graffiti, + block_delay: block_delay_total, + observed_delay: block_delays.observed, + imported_delay: block_delays.imported, + set_as_head_delay: block_delays.set_as_head, + })); + } } Ok(()) @@ -3212,6 +3323,7 @@ impl BeaconChain { trace!(self.log, "Running beacon chain per slot tasks"); if let Some(slot) = self.slot_clock.now() { self.naive_aggregation_pool.write().prune(slot); + self.block_times_cache.write().prune(slot); } } diff --git a/beacon_node/beacon_chain/src/block_times_cache.rs b/beacon_node/beacon_chain/src/block_times_cache.rs new file mode 100644 index 000000000..484de841d --- /dev/null +++ b/beacon_node/beacon_chain/src/block_times_cache.rs @@ -0,0 +1,143 @@ +//! This module provides the `BlockTimesCache' which contains information regarding block timings. +//! +//! This provides `BeaconChain` and associated functions with access to the timestamps of when a +//! certain block was observed, imported and set as head. +//! This allows for better traceability and allows us to determine the root cause for why a block +//! was set as head late. +//! This allows us to distingush between the following scenarios: +//! - The block was observed late. +//! - We were too slow to import it. +//! - We were too slow to set it as head. 
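The per-stage delays described here are measured relative to the previous stage rather than to the slot start (see BlockDelays::new below). A worked example with invented timings:

use std::time::Duration;

// Worked example (invented values), all timestamps measured from the start of
// the block's slot:
//   observed at 2.1s, imported at 2.6s, set as head at 2.9s
//   => observed_delay = 2.1s, imported_delay = 0.5s, set_as_head_delay = 0.3s,
//      and the total head delay is their sum, 2.9s.
fn main() {
    let observed = Duration::from_millis(2100);
    let imported = Duration::from_millis(2600);
    let set_as_head = Duration::from_millis(2900);

    let observed_delay = observed;
    let imported_delay = imported - observed;
    let set_as_head_delay = set_as_head - imported;

    assert_eq!(observed_delay + imported_delay + set_as_head_delay, set_as_head);
}
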
+ +use eth2::types::{Hash256, Slot}; +use std::collections::HashMap; +use std::time::Duration; + +type BlockRoot = Hash256; + +#[derive(Clone, Default)] +pub struct Timestamps { + pub observed: Option, + pub imported: Option, + pub set_as_head: Option, +} + +// Helps arrange delay data so it is more relevant to metrics. +#[derive(Default)] +pub struct BlockDelays { + pub observed: Option, + pub imported: Option, + pub set_as_head: Option, +} + +impl BlockDelays { + fn new(times: Timestamps, slot_start_time: Duration) -> BlockDelays { + let observed = times + .observed + .and_then(|observed_time| observed_time.checked_sub(slot_start_time)); + let imported = times + .imported + .and_then(|imported_time| imported_time.checked_sub(times.observed?)); + let set_as_head = times + .set_as_head + .and_then(|set_as_head_time| set_as_head_time.checked_sub(times.imported?)); + BlockDelays { + observed, + imported, + set_as_head, + } + } +} + +// If the block was received via gossip, we can record the client type of the peer which sent us +// the block. +#[derive(Clone, Default)] +pub struct BlockPeerInfo { + pub id: Option, + pub client: Option, +} + +pub struct BlockTimesCacheValue { + pub slot: Slot, + pub timestamps: Timestamps, + pub peer_info: BlockPeerInfo, +} + +impl BlockTimesCacheValue { + fn new(slot: Slot) -> Self { + BlockTimesCacheValue { + slot, + timestamps: Default::default(), + peer_info: Default::default(), + } + } +} + +#[derive(Default)] +pub struct BlockTimesCache { + pub cache: HashMap, +} + +/// Helper methods to read from and write to the cache. +impl BlockTimesCache { + pub fn set_time_observed( + &mut self, + block_root: BlockRoot, + slot: Slot, + timestamp: Duration, + peer_id: Option, + peer_client: Option, + ) { + let block_times = self + .cache + .entry(block_root) + .or_insert_with(|| BlockTimesCacheValue::new(slot)); + block_times.timestamps.observed = Some(timestamp); + block_times.peer_info = BlockPeerInfo { + id: peer_id, + client: peer_client, + }; + } + + pub fn set_time_imported(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) { + let block_times = self + .cache + .entry(block_root) + .or_insert_with(|| BlockTimesCacheValue::new(slot)); + block_times.timestamps.imported = Some(timestamp); + } + + pub fn set_time_set_as_head(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) { + let block_times = self + .cache + .entry(block_root) + .or_insert_with(|| BlockTimesCacheValue::new(slot)); + block_times.timestamps.set_as_head = Some(timestamp); + } + + pub fn get_block_delays( + &self, + block_root: BlockRoot, + slot_start_time: Duration, + ) -> BlockDelays { + if let Some(block_times) = self.cache.get(&block_root) { + BlockDelays::new(block_times.timestamps.clone(), slot_start_time) + } else { + BlockDelays::default() + } + } + + pub fn get_peer_info(&self, block_root: BlockRoot) -> BlockPeerInfo { + if let Some(block_info) = self.cache.get(&block_root) { + block_info.peer_info.clone() + } else { + BlockPeerInfo::default() + } + } + + // Prune the cache to only store the most recent 2 epochs. 
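For illustration, a hypothetical unit test for this module; it is not part of the patch and assumes it would sit at the end of block_times_cache.rs, after the prune method that follows. It exercises the setters above and checks that get_block_delays reports each delay relative to the previous stage:

#[cfg(test)]
mod tests {
    use super::*;

    // Hypothetical test: observe, then import, then set as head, and read the
    // per-stage delays back.
    #[test]
    fn delays_are_relative_to_the_previous_stage() {
        let mut cache = BlockTimesCache::default();
        let block_root = Hash256::zero();
        let slot = Slot::new(100);
        let slot_start = Duration::from_secs(1_200);

        cache.set_time_observed(block_root, slot, slot_start + Duration::from_secs(2), None, None);
        cache.set_time_imported(block_root, slot, slot_start + Duration::from_secs(3));
        cache.set_time_set_as_head(block_root, slot, slot_start + Duration::from_secs(4));

        let delays = cache.get_block_delays(block_root, slot_start);
        assert_eq!(delays.observed, Some(Duration::from_secs(2)));
        assert_eq!(delays.imported, Some(Duration::from_secs(1)));
        assert_eq!(delays.set_as_head, Some(Duration::from_secs(1)));
    }
}
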
+ pub fn prune(&mut self, current_slot: Slot) { + self.cache + .retain(|_, cache| cache.slot > current_slot.saturating_sub(64_u64)); + } +} diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 6d4b9225d..3d718df23 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -724,6 +724,7 @@ where )), shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()), beacon_proposer_cache: <_>::default(), + block_times_cache: <_>::default(), validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache), attester_cache: <_>::default(), disabled_forks: self.disabled_forks, diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 1db745727..459ccb457 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -14,6 +14,7 @@ pub struct ServerSentEventHandler { exit_tx: Sender>, chain_reorg_tx: Sender>, contribution_tx: Sender>, + late_head: Sender>, log: Logger, } @@ -30,6 +31,7 @@ impl ServerSentEventHandler { let (exit_tx, _) = broadcast::channel(capacity); let (chain_reorg_tx, _) = broadcast::channel(capacity); let (contribution_tx, _) = broadcast::channel(capacity); + let (late_head, _) = broadcast::channel(capacity); Self { attestation_tx, @@ -39,6 +41,7 @@ impl ServerSentEventHandler { exit_tx, chain_reorg_tx, contribution_tx, + late_head, log, } } @@ -62,6 +65,8 @@ impl ServerSentEventHandler { .map(|count| trace!(self.log, "Registering server-sent chain reorg event"; "receiver_count" => count)), EventKind::ContributionAndProof(contribution_and_proof) => self.contribution_tx.send(EventKind::ContributionAndProof(contribution_and_proof)) .map(|count| trace!(self.log, "Registering server-sent contribution and proof event"; "receiver_count" => count)), + EventKind::LateHead(late_head) => self.late_head.send(EventKind::LateHead(late_head)) + .map(|count| trace!(self.log, "Registering server-sent late head event"; "receiver_count" => count)), }; if let Err(SendError(event)) = result { trace!(self.log, "No receivers registered to listen for event"; "event" => ?event); @@ -96,6 +101,10 @@ impl ServerSentEventHandler { self.contribution_tx.subscribe() } + pub fn subscribe_late_head(&self) -> Receiver> { + self.late_head.subscribe() + } + pub fn has_attestation_subscribers(&self) -> bool { self.attestation_tx.receiver_count() > 0 } @@ -123,4 +132,8 @@ impl ServerSentEventHandler { pub fn has_contribution_subscribers(&self) -> bool { self.contribution_tx.receiver_count() > 0 } + + pub fn has_late_head_subscribers(&self) -> bool { + self.late_head.receiver_count() > 0 + } } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 9796c65d1..fd4cb1495 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -5,6 +5,7 @@ mod beacon_chain; mod beacon_fork_choice_store; mod beacon_proposer_cache; mod beacon_snapshot; +mod block_times_cache; mod block_verification; pub mod builder; pub mod chain_config; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 203df0134..136002cb8 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -738,17 +738,25 @@ lazy_static! 
{ /* * Block Delay Metrics */ - pub static ref BEACON_BLOCK_IMPORTED_SLOT_START_DELAY_TIME: Result = try_create_histogram( - "beacon_block_imported_slot_start_delay_time", - "Duration between the start of the blocks slot and the current time when it was imported.", + pub static ref BEACON_BLOCK_OBSERVED_SLOT_START_DELAY_TIME: Result = try_create_histogram( + "beacon_block_observed_slot_start_delay_time", + "Duration between the start of the block's slot and the time the block was observed.", + ); + pub static ref BEACON_BLOCK_IMPORTED_OBSERVED_DELAY_TIME: Result = try_create_histogram( + "beacon_block_imported_observed_delay_time", + "Duration between the time the block was observed and the time when it was imported.", + ); + pub static ref BEACON_BLOCK_HEAD_IMPORTED_DELAY_TIME: Result = try_create_histogram( + "beacon_block_head_imported_delay_time", + "Duration between the time the block was imported and the time when it was set as head.", ); pub static ref BEACON_BLOCK_HEAD_SLOT_START_DELAY_TIME: Result = try_create_histogram( "beacon_block_head_slot_start_delay_time", - "Duration between the start of the blocks slot and the current time when it was as head.", + "Duration between the start of the block's slot and the time when it was set as head.", ); pub static ref BEACON_BLOCK_HEAD_SLOT_START_DELAY_EXCEEDED_TOTAL: Result = try_create_int_counter( "beacon_block_head_slot_start_delay_exceeded_total", - "Triggered when the duration between the start of the blocks slot and the current time \ + "Triggered when the duration between the start of the block's slot and the current time \ will result in failed attestations.", ); diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 61a9cf137..3e3a51c6a 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -2475,6 +2475,9 @@ pub fn serve( api_types::EventTopic::ContributionAndProof => { event_handler.subscribe_contributions() } + api_types::EventTopic::LateHead => { + event_handler.subscribe_late_head() + } }; receivers.push(BroadcastStream::new(receiver).map(|msg| { diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 63868b2df..cb07f572b 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -42,7 +42,7 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, GossipVerifiedBlock}; use eth2_libp2p::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage}, - MessageId, NetworkGlobals, PeerId, PeerRequestId, + Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, }; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; @@ -341,6 +341,7 @@ impl WorkEvent { pub fn gossip_beacon_block( message_id: MessageId, peer_id: PeerId, + peer_client: Client, block: Box>, seen_timestamp: Duration, ) -> Self { @@ -349,6 +350,7 @@ impl WorkEvent { work: Work::GossipBlock { message_id, peer_id, + peer_client, block, seen_timestamp, }, @@ -602,6 +604,7 @@ pub enum Work { GossipBlock { message_id: MessageId, peer_id: PeerId, + peer_client: Client, block: Box>, seen_timestamp: Duration, }, @@ -1362,11 +1365,13 @@ impl BeaconProcessor { Work::GossipBlock { message_id, peer_id, + peer_client, block, seen_timestamp, } => worker.process_gossip_block( message_id, peer_id, + peer_client, *block, work_reprocessing_tx, seen_timestamp, diff --git 
a/beacon_node/network/src/beacon_processor/tests.rs b/beacon_node/network/src/beacon_processor/tests.rs index 0f491527b..5b67d3555 100644 --- a/beacon_node/network/src/beacon_processor/tests.rs +++ b/beacon_node/network/src/beacon_processor/tests.rs @@ -231,6 +231,7 @@ impl TestRig { .try_send(WorkEvent::gossip_beacon_block( junk_message_id(), junk_peer_id(), + Client::default(), Box::new(self.next_block.clone()), Duration::from_secs(0), )) diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 81028d476..675544beb 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -7,7 +7,7 @@ use beacon_chain::{ validator_monitor::get_block_delay_ms, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, GossipVerifiedBlock, }; -use eth2_libp2p::{MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; +use eth2_libp2p::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use slog::{crit, debug, error, info, trace, warn}; use slot_clock::SlotClock; use ssz::Encode; @@ -622,6 +622,7 @@ impl Worker { self, message_id: MessageId, peer_id: PeerId, + peer_client: Client, block: SignedBeaconBlock, reprocess_tx: mpsc::Sender>, seen_duration: Duration, @@ -634,6 +635,15 @@ impl Worker { block_delay, ); + // Write the time the block was observed into delay cache. + self.chain.block_times_cache.write().set_time_observed( + block.canonical_root(), + block.slot(), + seen_duration, + Some(peer_id.to_string()), + Some(peer_client.to_string()), + ); + let verified_block = match self.chain.verify_block_for_gossip(block) { Ok(verified_block) => { if block_delay >= self.chain.slot_clock.unagg_attestation_production_delay() { diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 5096a4bdc..178a16319 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -223,7 +223,12 @@ impl Router { ); } PubsubMessage::BeaconBlock(block) => { - self.processor.on_block_gossip(id, peer_id, block); + self.processor.on_block_gossip( + id, + peer_id, + self.network_globals.client(&peer_id), + block, + ); } PubsubMessage::VoluntaryExit(exit) => { debug!(self.log, "Received a voluntary exit"; "peer_id" => %peer_id); diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 01ea98948..1df83173b 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -5,7 +5,7 @@ use crate::service::NetworkMessage; use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2_libp2p::rpc::*; -use eth2_libp2p::{MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response}; +use eth2_libp2p::{Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response}; use slog::{debug, error, o, trace, warn}; use std::cmp; use std::sync::Arc; @@ -211,6 +211,7 @@ impl Processor { peer_id, request_id: id, beacon_block, + seen_timestamp: timestamp_now(), }); } else { debug!( @@ -229,11 +230,13 @@ impl Processor { &mut self, message_id: MessageId, peer_id: PeerId, + peer_client: Client, block: Box>, ) { self.send_beacon_processor_work(BeaconWorkEvent::gossip_beacon_block( message_id, peer_id, + peer_client, block, timestamp_now(), )) diff --git a/beacon_node/network/src/sync/manager.rs 
b/beacon_node/network/src/sync/manager.rs index 90ff61b41..f6d5a954d 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -54,6 +54,7 @@ use ssz_types::VariableList; use std::boxed::Box; use std::ops::Sub; use std::sync::Arc; +use std::time::Duration; use tokio::sync::mpsc; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; @@ -90,6 +91,7 @@ pub enum SyncMessage { peer_id: PeerId, request_id: RequestId, beacon_block: Option>>, + seen_timestamp: Duration, }, /// A block with an unknown parent has been received. @@ -313,6 +315,7 @@ impl SyncManager { peer_id: PeerId, request_id: RequestId, block: Option>, + seen_timestamp: Duration, ) { match block { Some(block) => { @@ -326,7 +329,7 @@ impl SyncManager { single_block_hash = Some(block_request.hash); } if let Some(block_hash) = single_block_hash { - self.single_block_lookup_response(peer_id, block, block_hash) + self.single_block_lookup_response(peer_id, block, block_hash, seen_timestamp) .await; return; } @@ -449,6 +452,7 @@ impl SyncManager { peer_id: PeerId, block: SignedBeaconBlock, expected_block_hash: Hash256, + seen_timestamp: Duration, ) { // verify the hash is correct and try and process the block if expected_block_hash != block.canonical_root() { @@ -467,6 +471,14 @@ impl SyncManager { // we have the correct block, try and process it match block_result { Ok(block_root) => { + // Block has been processed, so write the block time to the cache. + self.chain.block_times_cache.write().set_time_observed( + block_root, + block.slot(), + seen_timestamp, + None, + None, + ); info!(self.log, "Processed block"; "block" => %block_root); match self.chain.fork_choice() { @@ -1007,9 +1019,15 @@ impl SyncManager { peer_id, request_id, beacon_block, + seen_timestamp, } => { - self.blocks_by_root_response(peer_id, request_id, beacon_block.map(|b| *b)) - .await; + self.blocks_by_root_response( + peer_id, + request_id, + beacon_block.map(|b| *b), + seen_timestamp, + ) + .await; } SyncMessage::UnknownBlock(peer_id, block) => { self.add_unknown_block(peer_id, *block); diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index eeb379d36..1e4bfca2d 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize}; use std::convert::TryFrom; use std::fmt; use std::str::{from_utf8, FromStr}; +use std::time::Duration; pub use types::*; /// An API error serializable to JSON. 
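Both block entry points now stamp the block with the time it was first seen: the gossip worker records it together with the sending peer's id and client, while the blocks-by-root sync response above records None for both, since the block was requested rather than received over gossip. The value threaded through as seen_timestamp is a wall-clock duration since the Unix epoch, roughly as sketched below (hypothetical helper name; Lighthouse's timestamp_now() is along these lines):

use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Sketch of the kind of value carried as `seen_timestamp`: a duration since the
// Unix epoch, captured when the message is first handled, falling back to zero
// if the system clock predates the epoch.
fn seen_timestamp_now() -> Duration {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_else(|_| Duration::from_secs(0))
}

fn main() {
    println!("seen at {:?} since the Unix epoch", seen_timestamp_now());
}
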
@@ -761,6 +762,20 @@ pub struct SseChainReorg { pub epoch: Epoch, } +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +pub struct SseLateHead { + pub slot: Slot, + pub block: Hash256, + pub proposer_index: u64, + pub peer_id: Option, + pub peer_client: Option, + pub proposer_graffiti: String, + pub block_delay: Duration, + pub observed_delay: Option, + pub imported_delay: Option, + pub set_as_head_delay: Option, +} + #[derive(PartialEq, Debug, Serialize, Clone)] #[serde(bound = "T: EthSpec", untagged)] pub enum EventKind { @@ -771,6 +786,7 @@ pub enum EventKind { VoluntaryExit(SignedVoluntaryExit), ChainReorg(SseChainReorg), ContributionAndProof(Box>), + LateHead(SseLateHead), } impl EventKind { @@ -783,6 +799,7 @@ impl EventKind { EventKind::FinalizedCheckpoint(_) => "finalized_checkpoint", EventKind::ChainReorg(_) => "chain_reorg", EventKind::ContributionAndProof(_) => "contribution_and_proof", + EventKind::LateHead(_) => "late_head", } } @@ -822,6 +839,9 @@ impl EventKind { "head" => Ok(EventKind::Head(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Head: {:?}", e)), )?)), + "late_head" => Ok(EventKind::LateHead(serde_json::from_str(data).map_err( + |e| ServerError::InvalidServerSentEvent(format!("Late Head: {:?}", e)), + )?)), "voluntary_exit" => Ok(EventKind::VoluntaryExit( serde_json::from_str(data).map_err(|e| { ServerError::InvalidServerSentEvent(format!("Voluntary Exit: {:?}", e)) @@ -854,6 +874,7 @@ pub enum EventTopic { FinalizedCheckpoint, ChainReorg, ContributionAndProof, + LateHead, } impl FromStr for EventTopic { @@ -868,6 +889,7 @@ impl FromStr for EventTopic { "finalized_checkpoint" => Ok(EventTopic::FinalizedCheckpoint), "chain_reorg" => Ok(EventTopic::ChainReorg), "contribution_and_proof" => Ok(EventTopic::ContributionAndProof), + "late_head" => Ok(EventTopic::LateHead), _ => Err("event topic cannot be parsed.".to_string()), } } @@ -883,6 +905,7 @@ impl fmt::Display for EventTopic { EventTopic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"), EventTopic::ChainReorg => write!(f, "chain_reorg"), EventTopic::ContributionAndProof => write!(f, "contribution_and_proof"), + EventTopic::LateHead => write!(f, "late_head"), } } }
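Finally, a hypothetical test, not part of this patch, confirming the new topic wiring: EventTopic::from_str is what the HTTP API uses to parse ?topics=, and Display produces the name that late_head events are emitted under.

#[cfg(test)]
mod late_head_topic_tests {
    use super::*;
    use std::str::FromStr;

    // Hypothetical test: the topic string accepted by the events endpoint
    // round-trips through the FromStr and Display impls added above.
    #[test]
    fn late_head_topic_round_trips() {
        let topic = EventTopic::from_str("late_head").expect("late_head should parse");
        assert!(matches!(topic, EventTopic::LateHead));
        assert_eq!(topic.to_string(), "late_head");
    }
}

Clients can then subscribe via the standard events endpoint (e.g. /eth/v1/events?topics=late_head); the event is only constructed when the block is neither from sync nor on time and at least one late_head subscriber is registered.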