Add BlockTimesCache to allow additional block delay metrics (#2546)

## Issue Addressed

Closes #2528

## Proposed Changes

- Add `BlockTimesCache` to provide block timing information to `BeaconChain`. This allows additional metrics to be calculated for blocks that are set as head too late.
- Thread the `seen_timestamp` of blocks received from RPC responses (except blocks from syncing) through to the sync manager, similar to what is done for blocks from gossip.

## Additional Info

This provides the following additional metrics:
- `BEACON_BLOCK_OBSERVED_SLOT_START_DELAY_TIME`
  - The delay between the start of the slot and when the block was first observed.
- `BEACON_BLOCK_IMPORTED_OBSERVED_DELAY_TIME`
   - The delay between when the block was first observed and when the block was imported.
- `BEACON_BLOCK_HEAD_IMPORTED_DELAY_TIME`
  - The delay between when the block was imported and when the block was set as head.

The metric `BEACON_BLOCK_IMPORTED_SLOT_START_DELAY_TIME` was removed.

A log is produced when a block is set as head too late, e.g.:
```
Aug 27 03:46:39.006 DEBG Delayed head block                      set_as_head_delay: Some(21.731066ms), imported_delay: Some(119.929934ms), observed_delay: Some(3.864596988s), block_delay: 4.006257988s, slot: 1931331, proposer_index: 24294, block_root: 0x937602c89d3143afa89088a44bdf4b4d0d760dad082abacb229495c048648a9e, service: beacon
```
This commit is contained in:
Mac L 2021-09-30 04:31:41 +00:00
parent 70441aa554
commit 4c510f8f6b
14 changed files with 389 additions and 43 deletions

View File

@ -5,6 +5,7 @@ use crate::attestation_verification::{
};
use crate::attester_cache::{AttesterCache, AttesterCacheKey};
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::{
check_block_is_finalized_descendant, check_block_relevancy, get_block_root,
signature_verify_chain_segment, BlockError, FullyVerifiedBlock, GossipVerifiedBlock,
@ -38,14 +39,16 @@ use crate::sync_committee_verification::{
};
use crate::timeout_rw_lock::TimeoutRwLock;
use crate::validator_monitor::{
get_block_delay_ms, get_slot_delay_ms, timestamp_now, ValidatorMonitor,
get_slot_delay_ms, timestamp_now, ValidatorMonitor,
HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS,
};
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::BeaconForkChoiceStore;
use crate::BeaconSnapshot;
use crate::{metrics, BeaconChainError};
use eth2::types::{EventKind, SseBlock, SseChainReorg, SseFinalizedCheckpoint, SseHead, SyncDuty};
use eth2::types::{
EventKind, SseBlock, SseChainReorg, SseFinalizedCheckpoint, SseHead, SseLateHead, SyncDuty,
};
use fork_choice::ForkChoice;
use futures::channel::mpsc::Sender;
use itertools::process_results;
@ -298,6 +301,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub(crate) validator_pubkey_cache: TimeoutRwLock<ValidatorPubkeyCache<T>>,
/// A cache used when producing attestations.
pub(crate) attester_cache: Arc<AttesterCache>,
/// A cache used to keep track of various block timings.
pub block_times_cache: Arc<RwLock<BlockTimesCache>>,
/// A list of any hard-coded forks that have been disabled.
pub disabled_forks: Vec<String>,
/// Sender given to tasks, so that if they encounter a state in which execution cannot
@ -2538,14 +2543,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// This prevents inconsistency between the two at the expense of concurrency.
drop(fork_choice);
// Log metrics to track the delay between when the block was made and when we imported it.
//
// We're declaring the block "imported" at this point, since fork choice and the DB know
// about it.
metrics::observe_duration(
&metrics::BEACON_BLOCK_IMPORTED_SLOT_START_DELAY_TIME,
get_block_delay_ms(timestamp_now(), block.to_ref(), &self.slot_clock),
);
let block_time_imported = timestamp_now();
let parent_root = block.parent_root();
let slot = block.slot();
@ -2590,6 +2590,38 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
let block_delay_total = get_slot_delay_ms(block_time_imported, slot, &self.slot_clock);
// Do not write to the cache for blocks older than 2 epochs, this helps reduce writes to
// the cache during sync.
if block_delay_total < self.slot_clock.slot_duration() * 64 {
// Store the timestamp of the block being imported into the cache.
self.block_times_cache.write().set_time_imported(
block_root,
current_slot,
block_time_imported,
);
}
// Do not store metrics if the block was > 4 slots old, this helps prevent noise during
// sync.
if block_delay_total < self.slot_clock.slot_duration() * 4 {
// Observe the delay between when we observed the block and when we imported it.
let block_delays = self.block_times_cache.read().get_block_delays(
block_root,
self.slot_clock
.start_of(current_slot)
.unwrap_or_else(|| Duration::from_secs(0)),
);
metrics::observe_duration(
&metrics::BEACON_BLOCK_IMPORTED_OBSERVED_DELAY_TIME,
block_delays
.imported
.unwrap_or_else(|| Duration::from_secs(0)),
);
}
Ok(block_root)
}
@ -2998,7 +3030,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let update_head_timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES);
// These fields are used for server-sent events
// These fields are used for server-sent events.
let state_root = new_head.beacon_state_root();
let head_slot = new_head.beacon_state.slot();
let target_epoch_start_slot = new_head
@ -3010,6 +3042,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.previous_epoch()
.start_slot(T::EthSpec::slots_per_epoch());
let head_proposer_index = new_head.beacon_block.message().proposer_index();
let proposer_graffiti = new_head
.beacon_block
.message()
.body()
.graffiti()
.as_utf8_lossy();
drop(lag_timer);
@ -3020,24 +3058,68 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.try_write_for(HEAD_LOCK_TIMEOUT)
.ok_or(Error::CanonicalHeadLockTimeout)? = new_head;
// The block has now been set as head so we can record times and delays.
metrics::stop_timer(update_head_timer);
let block_delay = get_slot_delay_ms(timestamp_now(), head_slot, &self.slot_clock);
let block_time_set_as_head = timestamp_now();
// Observe the delay between the start of the slot and when we set the block as head.
// Calculate the total delay between the start of the slot and when it was set as head.
let block_delay_total =
get_slot_delay_ms(block_time_set_as_head, head_slot, &self.slot_clock);
// Do not write to the cache for blocks older than 2 epochs, this helps reduce writes to
// the cache during sync.
if block_delay_total < self.slot_clock.slot_duration() * 64 {
self.block_times_cache.write().set_time_set_as_head(
beacon_block_root,
current_head.slot,
block_time_set_as_head,
);
}
// If a block comes in from over 4 slots ago, it is most likely a block from sync.
let block_from_sync = block_delay_total > self.slot_clock.slot_duration() * 4;
// Determine whether the block has been set as head too late for proper attestation
// production.
let late_head = block_delay_total >= self.slot_clock.unagg_attestation_production_delay();
// Do not store metrics if the block was > 4 slots old, this helps prevent noise during
// sync.
if !block_from_sync {
// Observe the total block delay. This is the delay between the time the slot started
// and when the block was set as head.
metrics::observe_duration(
&metrics::BEACON_BLOCK_HEAD_SLOT_START_DELAY_TIME,
block_delay,
block_delay_total,
);
// If the block was enshrined as head too late for attestations to be created for it, log a
// debug warning and increment a metric.
//
// Don't create this log if the block was > 4 slots old, this helps prevent noise during
// sync.
if block_delay >= self.slot_clock.unagg_attestation_production_delay()
&& block_delay < self.slot_clock.slot_duration() * 4
{
// Observe the delay between when we imported the block and when we set the block as
// head.
let block_delays = self.block_times_cache.read().get_block_delays(
beacon_block_root,
self.slot_clock
.start_of(head_slot)
.unwrap_or_else(|| Duration::from_secs(0)),
);
metrics::observe_duration(
&metrics::BEACON_BLOCK_OBSERVED_SLOT_START_DELAY_TIME,
block_delays
.observed
.unwrap_or_else(|| Duration::from_secs(0)),
);
metrics::observe_duration(
&metrics::BEACON_BLOCK_HEAD_IMPORTED_DELAY_TIME,
block_delays
.set_as_head
.unwrap_or_else(|| Duration::from_secs(0)),
);
// If the block was enshrined as head too late for attestations to be created for it,
// log a debug warning and increment a metric.
if late_head {
metrics::inc_counter(&metrics::BEACON_BLOCK_HEAD_SLOT_START_DELAY_EXCEEDED_TOTAL);
debug!(
self.log,
@ -3045,9 +3127,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"block_root" => ?beacon_block_root,
"proposer_index" => head_proposer_index,
"slot" => head_slot,
"block_delay" => ?block_delay,
"block_delay" => ?block_delay_total,
"observed_delay" => ?block_delays.observed,
"imported_delay" => ?block_delays.imported,
"set_as_head_delay" => ?block_delays.set_as_head,
);
}
}
self.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
@ -3146,6 +3232,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
epoch: head_slot.epoch(T::EthSpec::slots_per_epoch()),
}));
}
if !block_from_sync && late_head && event_handler.has_late_head_subscribers() {
let peer_info = self
.block_times_cache
.read()
.get_peer_info(beacon_block_root);
let block_delays = self.block_times_cache.read().get_block_delays(
beacon_block_root,
self.slot_clock
.start_of(head_slot)
.unwrap_or_else(|| Duration::from_secs(0)),
);
event_handler.register(EventKind::LateHead(SseLateHead {
slot: head_slot,
block: beacon_block_root,
peer_id: peer_info.id,
peer_client: peer_info.client,
proposer_index: head_proposer_index,
proposer_graffiti,
block_delay: block_delay_total,
observed_delay: block_delays.observed,
imported_delay: block_delays.imported,
set_as_head_delay: block_delays.set_as_head,
}));
}
}
Ok(())
@ -3212,6 +3323,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
trace!(self.log, "Running beacon chain per slot tasks");
if let Some(slot) = self.slot_clock.now() {
self.naive_aggregation_pool.write().prune(slot);
self.block_times_cache.write().prune(slot);
}
}

View File

@ -0,0 +1,143 @@
//! This module provides the `BlockTimesCache' which contains information regarding block timings.
//!
//! This provides `BeaconChain` and associated functions with access to the timestamps of when a
//! certain block was observed, imported and set as head.
//! This allows for better traceability and allows us to determine the root cause for why a block
//! was set as head late.
//! This allows us to distingush between the following scenarios:
//! - The block was observed late.
//! - We were too slow to import it.
//! - We were too slow to set it as head.
use eth2::types::{Hash256, Slot};
use std::collections::HashMap;
use std::time::Duration;
type BlockRoot = Hash256;
#[derive(Clone, Default)]
pub struct Timestamps {
pub observed: Option<Duration>,
pub imported: Option<Duration>,
pub set_as_head: Option<Duration>,
}
// Helps arrange delay data so it is more relevant to metrics.
#[derive(Default)]
pub struct BlockDelays {
pub observed: Option<Duration>,
pub imported: Option<Duration>,
pub set_as_head: Option<Duration>,
}
impl BlockDelays {
fn new(times: Timestamps, slot_start_time: Duration) -> BlockDelays {
let observed = times
.observed
.and_then(|observed_time| observed_time.checked_sub(slot_start_time));
let imported = times
.imported
.and_then(|imported_time| imported_time.checked_sub(times.observed?));
let set_as_head = times
.set_as_head
.and_then(|set_as_head_time| set_as_head_time.checked_sub(times.imported?));
BlockDelays {
observed,
imported,
set_as_head,
}
}
}
// If the block was received via gossip, we can record the client type of the peer which sent us
// the block.
#[derive(Clone, Default)]
pub struct BlockPeerInfo {
pub id: Option<String>,
pub client: Option<String>,
}
pub struct BlockTimesCacheValue {
pub slot: Slot,
pub timestamps: Timestamps,
pub peer_info: BlockPeerInfo,
}
impl BlockTimesCacheValue {
fn new(slot: Slot) -> Self {
BlockTimesCacheValue {
slot,
timestamps: Default::default(),
peer_info: Default::default(),
}
}
}
#[derive(Default)]
pub struct BlockTimesCache {
pub cache: HashMap<BlockRoot, BlockTimesCacheValue>,
}
/// Helper methods to read from and write to the cache.
impl BlockTimesCache {
pub fn set_time_observed(
&mut self,
block_root: BlockRoot,
slot: Slot,
timestamp: Duration,
peer_id: Option<String>,
peer_client: Option<String>,
) {
let block_times = self
.cache
.entry(block_root)
.or_insert_with(|| BlockTimesCacheValue::new(slot));
block_times.timestamps.observed = Some(timestamp);
block_times.peer_info = BlockPeerInfo {
id: peer_id,
client: peer_client,
};
}
pub fn set_time_imported(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
let block_times = self
.cache
.entry(block_root)
.or_insert_with(|| BlockTimesCacheValue::new(slot));
block_times.timestamps.imported = Some(timestamp);
}
pub fn set_time_set_as_head(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
let block_times = self
.cache
.entry(block_root)
.or_insert_with(|| BlockTimesCacheValue::new(slot));
block_times.timestamps.set_as_head = Some(timestamp);
}
pub fn get_block_delays(
&self,
block_root: BlockRoot,
slot_start_time: Duration,
) -> BlockDelays {
if let Some(block_times) = self.cache.get(&block_root) {
BlockDelays::new(block_times.timestamps.clone(), slot_start_time)
} else {
BlockDelays::default()
}
}
pub fn get_peer_info(&self, block_root: BlockRoot) -> BlockPeerInfo {
if let Some(block_info) = self.cache.get(&block_root) {
block_info.peer_info.clone()
} else {
BlockPeerInfo::default()
}
}
// Prune the cache to only store the most recent 2 epochs.
pub fn prune(&mut self, current_slot: Slot) {
self.cache
.retain(|_, cache| cache.slot > current_slot.saturating_sub(64_u64));
}
}

View File

@ -724,6 +724,7 @@ where
)),
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()),
beacon_proposer_cache: <_>::default(),
block_times_cache: <_>::default(),
validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache),
attester_cache: <_>::default(),
disabled_forks: self.disabled_forks,

View File

@ -14,6 +14,7 @@ pub struct ServerSentEventHandler<T: EthSpec> {
exit_tx: Sender<EventKind<T>>,
chain_reorg_tx: Sender<EventKind<T>>,
contribution_tx: Sender<EventKind<T>>,
late_head: Sender<EventKind<T>>,
log: Logger,
}
@ -30,6 +31,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
let (exit_tx, _) = broadcast::channel(capacity);
let (chain_reorg_tx, _) = broadcast::channel(capacity);
let (contribution_tx, _) = broadcast::channel(capacity);
let (late_head, _) = broadcast::channel(capacity);
Self {
attestation_tx,
@ -39,6 +41,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
exit_tx,
chain_reorg_tx,
contribution_tx,
late_head,
log,
}
}
@ -62,6 +65,8 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
.map(|count| trace!(self.log, "Registering server-sent chain reorg event"; "receiver_count" => count)),
EventKind::ContributionAndProof(contribution_and_proof) => self.contribution_tx.send(EventKind::ContributionAndProof(contribution_and_proof))
.map(|count| trace!(self.log, "Registering server-sent contribution and proof event"; "receiver_count" => count)),
EventKind::LateHead(late_head) => self.late_head.send(EventKind::LateHead(late_head))
.map(|count| trace!(self.log, "Registering server-sent late head event"; "receiver_count" => count)),
};
if let Err(SendError(event)) = result {
trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
@ -96,6 +101,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
self.contribution_tx.subscribe()
}
pub fn subscribe_late_head(&self) -> Receiver<EventKind<T>> {
self.late_head.subscribe()
}
pub fn has_attestation_subscribers(&self) -> bool {
self.attestation_tx.receiver_count() > 0
}
@ -123,4 +132,8 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
pub fn has_contribution_subscribers(&self) -> bool {
self.contribution_tx.receiver_count() > 0
}
pub fn has_late_head_subscribers(&self) -> bool {
self.late_head.receiver_count() > 0
}
}

View File

@ -5,6 +5,7 @@ mod beacon_chain;
mod beacon_fork_choice_store;
mod beacon_proposer_cache;
mod beacon_snapshot;
mod block_times_cache;
mod block_verification;
pub mod builder;
pub mod chain_config;

View File

@ -738,17 +738,25 @@ lazy_static! {
/*
* Block Delay Metrics
*/
pub static ref BEACON_BLOCK_IMPORTED_SLOT_START_DELAY_TIME: Result<Histogram> = try_create_histogram(
"beacon_block_imported_slot_start_delay_time",
"Duration between the start of the blocks slot and the current time when it was imported.",
pub static ref BEACON_BLOCK_OBSERVED_SLOT_START_DELAY_TIME: Result<Histogram> = try_create_histogram(
"beacon_block_observed_slot_start_delay_time",
"Duration between the start of the block's slot and the time the block was observed.",
);
pub static ref BEACON_BLOCK_IMPORTED_OBSERVED_DELAY_TIME: Result<Histogram> = try_create_histogram(
"beacon_block_imported_observed_delay_time",
"Duration between the time the block was observed and the time when it was imported.",
);
pub static ref BEACON_BLOCK_HEAD_IMPORTED_DELAY_TIME: Result<Histogram> = try_create_histogram(
"beacon_block_head_imported_delay_time",
"Duration between the time the block was imported and the time when it was set as head.",
);
pub static ref BEACON_BLOCK_HEAD_SLOT_START_DELAY_TIME: Result<Histogram> = try_create_histogram(
"beacon_block_head_slot_start_delay_time",
"Duration between the start of the blocks slot and the current time when it was as head.",
"Duration between the start of the block's slot and the time when it was set as head.",
);
pub static ref BEACON_BLOCK_HEAD_SLOT_START_DELAY_EXCEEDED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_block_head_slot_start_delay_exceeded_total",
"Triggered when the duration between the start of the blocks slot and the current time \
"Triggered when the duration between the start of the block's slot and the current time \
will result in failed attestations.",
);

View File

@ -2475,6 +2475,9 @@ pub fn serve<T: BeaconChainTypes>(
api_types::EventTopic::ContributionAndProof => {
event_handler.subscribe_contributions()
}
api_types::EventTopic::LateHead => {
event_handler.subscribe_late_head()
}
};
receivers.push(BroadcastStream::new(receiver).map(|msg| {

View File

@ -42,7 +42,7 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, GossipVerifiedBlock};
use eth2_libp2p::{
rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage},
MessageId, NetworkGlobals, PeerId, PeerRequestId,
Client, MessageId, NetworkGlobals, PeerId, PeerRequestId,
};
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
@ -341,6 +341,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
pub fn gossip_beacon_block(
message_id: MessageId,
peer_id: PeerId,
peer_client: Client,
block: Box<SignedBeaconBlock<T::EthSpec>>,
seen_timestamp: Duration,
) -> Self {
@ -349,6 +350,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
work: Work::GossipBlock {
message_id,
peer_id,
peer_client,
block,
seen_timestamp,
},
@ -602,6 +604,7 @@ pub enum Work<T: BeaconChainTypes> {
GossipBlock {
message_id: MessageId,
peer_id: PeerId,
peer_client: Client,
block: Box<SignedBeaconBlock<T::EthSpec>>,
seen_timestamp: Duration,
},
@ -1362,11 +1365,13 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::GossipBlock {
message_id,
peer_id,
peer_client,
block,
seen_timestamp,
} => worker.process_gossip_block(
message_id,
peer_id,
peer_client,
*block,
work_reprocessing_tx,
seen_timestamp,

View File

@ -231,6 +231,7 @@ impl TestRig {
.try_send(WorkEvent::gossip_beacon_block(
junk_message_id(),
junk_peer_id(),
Client::default(),
Box::new(self.next_block.clone()),
Duration::from_secs(0),
))

View File

@ -7,7 +7,7 @@ use beacon_chain::{
validator_monitor::get_block_delay_ms,
BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, GossipVerifiedBlock,
};
use eth2_libp2p::{MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
use eth2_libp2p::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
use slog::{crit, debug, error, info, trace, warn};
use slot_clock::SlotClock;
use ssz::Encode;
@ -622,6 +622,7 @@ impl<T: BeaconChainTypes> Worker<T> {
self,
message_id: MessageId,
peer_id: PeerId,
peer_client: Client,
block: SignedBeaconBlock<T::EthSpec>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
seen_duration: Duration,
@ -634,6 +635,15 @@ impl<T: BeaconChainTypes> Worker<T> {
block_delay,
);
// Write the time the block was observed into delay cache.
self.chain.block_times_cache.write().set_time_observed(
block.canonical_root(),
block.slot(),
seen_duration,
Some(peer_id.to_string()),
Some(peer_client.to_string()),
);
let verified_block = match self.chain.verify_block_for_gossip(block) {
Ok(verified_block) => {
if block_delay >= self.chain.slot_clock.unagg_attestation_production_delay() {

View File

@ -223,7 +223,12 @@ impl<T: BeaconChainTypes> Router<T> {
);
}
PubsubMessage::BeaconBlock(block) => {
self.processor.on_block_gossip(id, peer_id, block);
self.processor.on_block_gossip(
id,
peer_id,
self.network_globals.client(&peer_id),
block,
);
}
PubsubMessage::VoluntaryExit(exit) => {
debug!(self.log, "Received a voluntary exit"; "peer_id" => %peer_id);

View File

@ -5,7 +5,7 @@ use crate::service::NetworkMessage;
use crate::sync::SyncMessage;
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2_libp2p::rpc::*;
use eth2_libp2p::{MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response};
use eth2_libp2p::{Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response};
use slog::{debug, error, o, trace, warn};
use std::cmp;
use std::sync::Arc;
@ -211,6 +211,7 @@ impl<T: BeaconChainTypes> Processor<T> {
peer_id,
request_id: id,
beacon_block,
seen_timestamp: timestamp_now(),
});
} else {
debug!(
@ -229,11 +230,13 @@ impl<T: BeaconChainTypes> Processor<T> {
&mut self,
message_id: MessageId,
peer_id: PeerId,
peer_client: Client,
block: Box<SignedBeaconBlock<T::EthSpec>>,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_beacon_block(
message_id,
peer_id,
peer_client,
block,
timestamp_now(),
))

View File

@ -54,6 +54,7 @@ use ssz_types::VariableList;
use std::boxed::Box;
use std::ops::Sub;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
@ -90,6 +91,7 @@ pub enum SyncMessage<T: EthSpec> {
peer_id: PeerId,
request_id: RequestId,
beacon_block: Option<Box<SignedBeaconBlock<T>>>,
seen_timestamp: Duration,
},
/// A block with an unknown parent has been received.
@ -313,6 +315,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
peer_id: PeerId,
request_id: RequestId,
block: Option<SignedBeaconBlock<T::EthSpec>>,
seen_timestamp: Duration,
) {
match block {
Some(block) => {
@ -326,7 +329,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
single_block_hash = Some(block_request.hash);
}
if let Some(block_hash) = single_block_hash {
self.single_block_lookup_response(peer_id, block, block_hash)
self.single_block_lookup_response(peer_id, block, block_hash, seen_timestamp)
.await;
return;
}
@ -449,6 +452,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
peer_id: PeerId,
block: SignedBeaconBlock<T::EthSpec>,
expected_block_hash: Hash256,
seen_timestamp: Duration,
) {
// verify the hash is correct and try and process the block
if expected_block_hash != block.canonical_root() {
@ -467,6 +471,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// we have the correct block, try and process it
match block_result {
Ok(block_root) => {
// Block has been processed, so write the block time to the cache.
self.chain.block_times_cache.write().set_time_observed(
block_root,
block.slot(),
seen_timestamp,
None,
None,
);
info!(self.log, "Processed block"; "block" => %block_root);
match self.chain.fork_choice() {
@ -1007,8 +1019,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
peer_id,
request_id,
beacon_block,
seen_timestamp,
} => {
self.blocks_by_root_response(peer_id, request_id, beacon_block.map(|b| *b))
self.blocks_by_root_response(
peer_id,
request_id,
beacon_block.map(|b| *b),
seen_timestamp,
)
.await;
}
SyncMessage::UnknownBlock(peer_id, block) => {

View File

@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use std::fmt;
use std::str::{from_utf8, FromStr};
use std::time::Duration;
pub use types::*;
/// An API error serializable to JSON.
@ -761,6 +762,20 @@ pub struct SseChainReorg {
pub epoch: Epoch,
}
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
pub struct SseLateHead {
pub slot: Slot,
pub block: Hash256,
pub proposer_index: u64,
pub peer_id: Option<String>,
pub peer_client: Option<String>,
pub proposer_graffiti: String,
pub block_delay: Duration,
pub observed_delay: Option<Duration>,
pub imported_delay: Option<Duration>,
pub set_as_head_delay: Option<Duration>,
}
#[derive(PartialEq, Debug, Serialize, Clone)]
#[serde(bound = "T: EthSpec", untagged)]
pub enum EventKind<T: EthSpec> {
@ -771,6 +786,7 @@ pub enum EventKind<T: EthSpec> {
VoluntaryExit(SignedVoluntaryExit),
ChainReorg(SseChainReorg),
ContributionAndProof(Box<SignedContributionAndProof<T>>),
LateHead(SseLateHead),
}
impl<T: EthSpec> EventKind<T> {
@ -783,6 +799,7 @@ impl<T: EthSpec> EventKind<T> {
EventKind::FinalizedCheckpoint(_) => "finalized_checkpoint",
EventKind::ChainReorg(_) => "chain_reorg",
EventKind::ContributionAndProof(_) => "contribution_and_proof",
EventKind::LateHead(_) => "late_head",
}
}
@ -822,6 +839,9 @@ impl<T: EthSpec> EventKind<T> {
"head" => Ok(EventKind::Head(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Head: {:?}", e)),
)?)),
"late_head" => Ok(EventKind::LateHead(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Late Head: {:?}", e)),
)?)),
"voluntary_exit" => Ok(EventKind::VoluntaryExit(
serde_json::from_str(data).map_err(|e| {
ServerError::InvalidServerSentEvent(format!("Voluntary Exit: {:?}", e))
@ -854,6 +874,7 @@ pub enum EventTopic {
FinalizedCheckpoint,
ChainReorg,
ContributionAndProof,
LateHead,
}
impl FromStr for EventTopic {
@ -868,6 +889,7 @@ impl FromStr for EventTopic {
"finalized_checkpoint" => Ok(EventTopic::FinalizedCheckpoint),
"chain_reorg" => Ok(EventTopic::ChainReorg),
"contribution_and_proof" => Ok(EventTopic::ContributionAndProof),
"late_head" => Ok(EventTopic::LateHead),
_ => Err("event topic cannot be parsed.".to_string()),
}
}
@ -883,6 +905,7 @@ impl fmt::Display for EventTopic {
EventTopic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"),
EventTopic::ChainReorg => write!(f, "chain_reorg"),
EventTopic::ContributionAndProof => write!(f, "contribution_and_proof"),
EventTopic::LateHead => write!(f, "late_head"),
}
}
}