Merge branch 'unstable' into eip4844
This commit is contained in:
commit
607242c127
17
Cargo.lock
generated
17
Cargo.lock
generated
@ -1628,6 +1628,16 @@ dependencies = [
|
||||
"tokio-util 0.6.10",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "delay_map"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e4355c25cbf99edcb6b4a0e906f6bdc6956eda149e84455bea49696429b2f8e8"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"tokio-util 0.7.7",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "deposit_contract"
|
||||
version = "0.2.0"
|
||||
@ -1826,7 +1836,7 @@ dependencies = [
|
||||
"aes 0.7.5",
|
||||
"aes-gcm 0.9.4",
|
||||
"arrayvec",
|
||||
"delay_map",
|
||||
"delay_map 0.1.2",
|
||||
"enr",
|
||||
"fnv",
|
||||
"futures",
|
||||
@ -4439,7 +4449,7 @@ dependencies = [
|
||||
name = "lighthouse_network"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"delay_map",
|
||||
"delay_map 0.3.0",
|
||||
"directory",
|
||||
"dirs",
|
||||
"discv5",
|
||||
@ -5056,7 +5066,7 @@ name = "network"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"beacon_chain",
|
||||
"delay_map",
|
||||
"delay_map 0.3.0",
|
||||
"derivative",
|
||||
"environment",
|
||||
"error-chain",
|
||||
@ -8020,6 +8030,7 @@ dependencies = [
|
||||
"futures-io",
|
||||
"futures-sink",
|
||||
"pin-project-lite 0.2.9",
|
||||
"slab",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
@ -1,4 +1,4 @@
|
||||
FROM rust:1.65.0-bullseye AS builder
|
||||
FROM rust:1.66.0-bullseye AS builder
|
||||
RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake clang libclang-dev protobuf-compiler
|
||||
COPY . lighthouse
|
||||
ARG FEATURES
|
||||
|
@ -5291,7 +5291,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
latest_valid_hash,
|
||||
ref validation_error,
|
||||
} => {
|
||||
debug!(
|
||||
warn!(
|
||||
self.log,
|
||||
"Invalid execution payload";
|
||||
"validation_error" => ?validation_error,
|
||||
@ -5300,11 +5300,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
"head_block_root" => ?head_block_root,
|
||||
"method" => "fcU",
|
||||
);
|
||||
warn!(
|
||||
self.log,
|
||||
"Fork choice update invalidated payload";
|
||||
"status" => ?status
|
||||
);
|
||||
|
||||
match latest_valid_hash {
|
||||
// The `latest_valid_hash` is set to `None` when the EE
|
||||
@ -5350,7 +5345,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
PayloadStatus::InvalidBlockHash {
|
||||
ref validation_error,
|
||||
} => {
|
||||
debug!(
|
||||
warn!(
|
||||
self.log,
|
||||
"Invalid execution payload block hash";
|
||||
"validation_error" => ?validation_error,
|
||||
@ -5358,11 +5353,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
"head_block_root" => ?head_block_root,
|
||||
"method" => "fcU",
|
||||
);
|
||||
warn!(
|
||||
self.log,
|
||||
"Fork choice update invalidated payload";
|
||||
"status" => ?status
|
||||
);
|
||||
// The execution engine has stated that the head block is invalid, however it
|
||||
// hasn't returned a latest valid ancestor.
|
||||
//
|
||||
|
@ -306,10 +306,10 @@ pub enum BlockError<T: EthSpec> {
|
||||
///
|
||||
/// ## Peer scoring
|
||||
///
|
||||
/// TODO(merge): reconsider how we score peers for this.
|
||||
///
|
||||
/// The peer sent us an invalid block, but I'm not really sure how to score this in an
|
||||
/// "optimistic" sync world.
|
||||
/// The peer sent us an invalid block, we must penalise harshly.
|
||||
/// If it's actually our fault (e.g. our execution node database is corrupt) we have bigger
|
||||
/// problems to worry about than losing peers, and we're doing the network a favour by
|
||||
/// disconnecting.
|
||||
ParentExecutionPayloadInvalid {
|
||||
parent_root: Hash256,
|
||||
},
|
||||
|
@ -159,7 +159,7 @@ async fn notify_new_payload<'a, T: BeaconChainTypes>(
|
||||
latest_valid_hash,
|
||||
ref validation_error,
|
||||
} => {
|
||||
debug!(
|
||||
warn!(
|
||||
chain.log,
|
||||
"Invalid execution payload";
|
||||
"validation_error" => ?validation_error,
|
||||
@ -206,7 +206,7 @@ async fn notify_new_payload<'a, T: BeaconChainTypes>(
|
||||
PayloadStatus::InvalidBlockHash {
|
||||
ref validation_error,
|
||||
} => {
|
||||
debug!(
|
||||
warn!(
|
||||
chain.log,
|
||||
"Invalid execution payload block hash";
|
||||
"validation_error" => ?validation_error,
|
||||
|
@ -15,6 +15,7 @@ use std::io;
|
||||
use std::marker::PhantomData;
|
||||
use std::str::Utf8Error;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use store::AbstractExecPayload;
|
||||
use types::{
|
||||
AttesterSlashing, BeaconBlockRef, BeaconState, ChainSpec, Epoch, EthSpec, Hash256,
|
||||
IndexedAttestation, ProposerSlashing, PublicKeyBytes, SignedAggregateAndProof,
|
||||
@ -1736,9 +1737,9 @@ fn u64_to_i64(n: impl Into<u64>) -> i64 {
|
||||
}
|
||||
|
||||
/// Returns the delay between the start of `block.slot` and `seen_timestamp`.
|
||||
pub fn get_block_delay_ms<T: EthSpec, S: SlotClock>(
|
||||
pub fn get_block_delay_ms<T: EthSpec, S: SlotClock, P: AbstractExecPayload<T>>(
|
||||
seen_timestamp: Duration,
|
||||
block: BeaconBlockRef<'_, T>,
|
||||
block: BeaconBlockRef<'_, T, P>,
|
||||
slot_clock: &S,
|
||||
) -> Duration {
|
||||
get_slot_delay_ms::<S>(seen_timestamp, block.slot(), slot_clock)
|
||||
|
@ -460,7 +460,11 @@ async fn capella_readiness_logging<T: BeaconChainTypes>(
|
||||
|
||||
match beacon_chain.check_capella_readiness().await {
|
||||
CapellaReadiness::Ready => {
|
||||
info!(log, "Ready for Capella")
|
||||
info!(
|
||||
log,
|
||||
"Ready for Capella";
|
||||
"info" => "ensure the execution endpoint is updated to the latest Capella/Shanghai release"
|
||||
)
|
||||
}
|
||||
readiness @ CapellaReadiness::ExchangeCapabilitiesFailed { error: _ } => {
|
||||
error!(
|
||||
|
@ -87,7 +87,7 @@ const DEFAULT_SUGGESTED_FEE_RECIPIENT: [u8; 20] =
|
||||
const CONFIG_POLL_INTERVAL: Duration = Duration::from_secs(60);
|
||||
|
||||
/// A payload alongside some information about where it came from.
|
||||
enum ProvenancedPayload<P> {
|
||||
pub enum ProvenancedPayload<P> {
|
||||
/// A good ol' fashioned farm-to-table payload from your local EE.
|
||||
Local(P),
|
||||
/// A payload from a builder (e.g. mev-boost).
|
||||
|
@ -37,6 +37,7 @@ use lighthouse_version::version_with_platform;
|
||||
use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage};
|
||||
use operation_pool::ReceivedPreCapella;
|
||||
use parking_lot::RwLock;
|
||||
use publish_blocks::ProvenancedBlock;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use slog::{crit, debug, error, info, warn, Logger};
|
||||
use slot_clock::SlotClock;
|
||||
@ -1123,9 +1124,15 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||
log: Logger| async move {
|
||||
publish_blocks::publish_block(None, block, chain, &network_tx, log)
|
||||
.await
|
||||
.map(|()| warp::reply().into_response())
|
||||
publish_blocks::publish_block(
|
||||
None,
|
||||
ProvenancedBlock::Local(block),
|
||||
chain,
|
||||
&network_tx,
|
||||
log,
|
||||
)
|
||||
.await
|
||||
.map(|()| warp::reply().into_response())
|
||||
},
|
||||
);
|
||||
|
||||
|
@ -29,9 +29,10 @@ lazy_static::lazy_static! {
|
||||
"http_api_beacon_proposer_cache_misses_total",
|
||||
"Count of times the proposer cache has been missed",
|
||||
);
|
||||
pub static ref HTTP_API_BLOCK_BROADCAST_DELAY_TIMES: Result<Histogram> = try_create_histogram(
|
||||
pub static ref HTTP_API_BLOCK_BROADCAST_DELAY_TIMES: Result<HistogramVec> = try_create_histogram_vec(
|
||||
"http_api_block_broadcast_delay_times",
|
||||
"Time between start of the slot and when the block was broadcast"
|
||||
"Time between start of the slot and when the block was broadcast",
|
||||
&["provenance"]
|
||||
);
|
||||
pub static ref HTTP_API_BLOCK_PUBLISHED_LATE_TOTAL: Result<IntCounter> = try_create_int_counter(
|
||||
"http_api_block_published_late_total",
|
||||
|
@ -1,30 +1,46 @@
|
||||
use crate::metrics;
|
||||
use beacon_chain::blob_verification::{AsBlock, BlockWrapper, IntoAvailableBlock};
|
||||
use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now};
|
||||
use beacon_chain::NotifyExecutionLayer;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, CountUnrealized};
|
||||
use beacon_chain::{
|
||||
BeaconChain, BeaconChainTypes, BlockError, CountUnrealized, NotifyExecutionLayer,
|
||||
};
|
||||
use execution_layer::ProvenancedPayload;
|
||||
use lighthouse_network::PubsubMessage;
|
||||
use network::NetworkMessage;
|
||||
use slog::{debug, error, info, warn, Logger};
|
||||
use slot_clock::SlotClock;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tree_hash::TreeHash;
|
||||
use types::{
|
||||
AbstractExecPayload, BlindedPayload, EthSpec, ExecPayload, ExecutionBlockHash, FullPayload,
|
||||
Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar,
|
||||
AbstractExecPayload, BeaconBlockRef, BlindedPayload, EthSpec, ExecPayload, ExecutionBlockHash,
|
||||
FullPayload, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar,
|
||||
};
|
||||
use warp::Rejection;
|
||||
|
||||
pub enum ProvenancedBlock<T: EthSpec> {
|
||||
/// The payload was built using a local EE.
|
||||
Local(Arc<SignedBeaconBlock<T, FullPayload<T>>>),
|
||||
/// The payload was build using a remote builder (e.g., via a mev-boost
|
||||
/// compatible relay).
|
||||
Builder(Arc<SignedBeaconBlock<T, FullPayload<T>>>),
|
||||
}
|
||||
|
||||
/// Handles a request from the HTTP API for full blocks.
|
||||
pub async fn publish_block<T: BeaconChainTypes>(
|
||||
block_root: Option<Hash256>,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
provenanced_block: ProvenancedBlock<T::EthSpec>,
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||
log: Logger,
|
||||
) -> Result<(), Rejection> {
|
||||
let seen_timestamp = timestamp_now();
|
||||
let (block, is_locally_built_block) = match provenanced_block {
|
||||
ProvenancedBlock::Local(block) => (block, true),
|
||||
ProvenancedBlock::Builder(block) => (block, false),
|
||||
};
|
||||
let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock);
|
||||
|
||||
//FIXME(sean) have to move this to prior to publishing because it's included in the blobs sidecar message.
|
||||
//this may skew metrics
|
||||
@ -57,14 +73,9 @@ pub async fn publish_block<T: BeaconChainTypes>(
|
||||
}
|
||||
} else {
|
||||
crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?;
|
||||
block.into()
|
||||
BlockWrapper::Block(block)
|
||||
};
|
||||
|
||||
// Determine the delay after the start of the slot, register it with metrics.
|
||||
let block = wrapped_block.as_block();
|
||||
let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock);
|
||||
metrics::observe_duration(&metrics::HTTP_API_BLOCK_BROADCAST_DELAY_TIMES, delay);
|
||||
|
||||
let available_block = match wrapped_block.into_available_block(block_root, &chain) {
|
||||
Ok(available_block) => available_block,
|
||||
Err(e) => {
|
||||
@ -109,30 +120,17 @@ pub async fn publish_block<T: BeaconChainTypes>(
|
||||
// head.
|
||||
chain.recompute_head_at_current_slot().await;
|
||||
|
||||
// Perform some logging to inform users if their blocks are being produced
|
||||
// late.
|
||||
//
|
||||
// Check to see the thresholds are non-zero to avoid logging errors with small
|
||||
// slot times (e.g., during testing)
|
||||
let too_late_threshold = chain.slot_clock.unagg_attestation_production_delay();
|
||||
let delayed_threshold = too_late_threshold / 2;
|
||||
if delay >= too_late_threshold {
|
||||
error!(
|
||||
log,
|
||||
"Block was broadcast too late";
|
||||
"msg" => "system may be overloaded, block likely to be orphaned",
|
||||
"delay_ms" => delay.as_millis(),
|
||||
"slot" => available_block.slot(),
|
||||
"root" => ?root,
|
||||
)
|
||||
} else if delay >= delayed_threshold {
|
||||
error!(
|
||||
log,
|
||||
"Block broadcast was delayed";
|
||||
"msg" => "system may be overloaded, block may be orphaned",
|
||||
"delay_ms" => delay.as_millis(),
|
||||
"slot" => available_block.slot(),
|
||||
"root" => ?root,
|
||||
// Only perform late-block logging here if the block is local. For
|
||||
// blocks built with builders we consider the broadcast time to be
|
||||
// when the blinded block is published to the builder.
|
||||
if is_locally_built_block {
|
||||
late_block_logging(
|
||||
&chain,
|
||||
seen_timestamp,
|
||||
available_block.message(),
|
||||
root,
|
||||
"local",
|
||||
&log,
|
||||
)
|
||||
}
|
||||
|
||||
@ -181,14 +179,7 @@ pub async fn publish_blinded_block<T: BeaconChainTypes>(
|
||||
) -> Result<(), Rejection> {
|
||||
let block_root = block.canonical_root();
|
||||
let full_block = reconstruct_block(chain.clone(), block_root, block, log.clone()).await?;
|
||||
publish_block::<T>(
|
||||
Some(block_root),
|
||||
Arc::new(full_block),
|
||||
chain,
|
||||
network_tx,
|
||||
log,
|
||||
)
|
||||
.await
|
||||
publish_block::<T>(Some(block_root), full_block, chain, network_tx, log).await
|
||||
}
|
||||
|
||||
/// Deconstruct the given blinded block, and construct a full block. This attempts to use the
|
||||
@ -199,15 +190,15 @@ async fn reconstruct_block<T: BeaconChainTypes>(
|
||||
block_root: Hash256,
|
||||
block: SignedBeaconBlock<T::EthSpec, BlindedPayload<T::EthSpec>>,
|
||||
log: Logger,
|
||||
) -> Result<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>, Rejection> {
|
||||
let full_payload = if let Ok(payload_header) = block.message().body().execution_payload() {
|
||||
) -> Result<ProvenancedBlock<T::EthSpec>, Rejection> {
|
||||
let full_payload_opt = if let Ok(payload_header) = block.message().body().execution_payload() {
|
||||
let el = chain.execution_layer.as_ref().ok_or_else(|| {
|
||||
warp_utils::reject::custom_server_error("Missing execution layer".to_string())
|
||||
})?;
|
||||
|
||||
// If the execution block hash is zero, use an empty payload.
|
||||
let full_payload = if payload_header.block_hash() == ExecutionBlockHash::zero() {
|
||||
FullPayload::default_at_fork(
|
||||
let payload = FullPayload::default_at_fork(
|
||||
chain
|
||||
.spec
|
||||
.fork_name_at_epoch(block.slot().epoch(T::EthSpec::slots_per_epoch())),
|
||||
@ -217,15 +208,30 @@ async fn reconstruct_block<T: BeaconChainTypes>(
|
||||
"Default payload construction error: {e:?}"
|
||||
))
|
||||
})?
|
||||
.into()
|
||||
// If we already have an execution payload with this transactions root cached, use it.
|
||||
.into();
|
||||
ProvenancedPayload::Local(payload)
|
||||
// If we already have an execution payload with this transactions root cached, use it.
|
||||
} else if let Some(cached_payload) =
|
||||
el.get_payload_by_root(&payload_header.tree_hash_root())
|
||||
{
|
||||
info!(log, "Reconstructing a full block using a local payload"; "block_hash" => ?cached_payload.block_hash());
|
||||
cached_payload
|
||||
// Otherwise, this means we are attempting a blind block proposal.
|
||||
ProvenancedPayload::Local(cached_payload)
|
||||
// Otherwise, this means we are attempting a blind block proposal.
|
||||
} else {
|
||||
// Perform the logging for late blocks when we publish to the
|
||||
// builder, rather than when we publish to the network. This helps
|
||||
// prevent false positive logs when the builder publishes to the P2P
|
||||
// network significantly earlier than when they return the block to
|
||||
// us.
|
||||
late_block_logging(
|
||||
&chain,
|
||||
timestamp_now(),
|
||||
block.message(),
|
||||
block_root,
|
||||
"builder",
|
||||
&log,
|
||||
);
|
||||
|
||||
let full_payload = el
|
||||
.propose_blinded_beacon_block(block_root, &block)
|
||||
.await
|
||||
@ -236,7 +242,7 @@ async fn reconstruct_block<T: BeaconChainTypes>(
|
||||
))
|
||||
})?;
|
||||
info!(log, "Successfully published a block to the builder network"; "block_hash" => ?full_payload.block_hash());
|
||||
full_payload
|
||||
ProvenancedPayload::Builder(full_payload)
|
||||
};
|
||||
|
||||
Some(full_payload)
|
||||
@ -244,7 +250,71 @@ async fn reconstruct_block<T: BeaconChainTypes>(
|
||||
None
|
||||
};
|
||||
|
||||
block.try_into_full_block(full_payload).ok_or_else(|| {
|
||||
match full_payload_opt {
|
||||
// A block without a payload is pre-merge and we consider it locally
|
||||
// built.
|
||||
None => block
|
||||
.try_into_full_block(None)
|
||||
.map(Arc::new)
|
||||
.map(ProvenancedBlock::Local),
|
||||
Some(ProvenancedPayload::Local(full_payload)) => block
|
||||
.try_into_full_block(Some(full_payload))
|
||||
.map(Arc::new)
|
||||
.map(ProvenancedBlock::Local),
|
||||
Some(ProvenancedPayload::Builder(full_payload)) => block
|
||||
.try_into_full_block(Some(full_payload))
|
||||
.map(Arc::new)
|
||||
.map(ProvenancedBlock::Builder),
|
||||
}
|
||||
.ok_or_else(|| {
|
||||
warp_utils::reject::custom_server_error("Unable to add payload to block".to_string())
|
||||
})
|
||||
}
|
||||
|
||||
/// If the `seen_timestamp` is some time after the start of the slot for
|
||||
/// `block`, create some logs to indicate that the block was published late.
|
||||
fn late_block_logging<T: BeaconChainTypes, P: AbstractExecPayload<T::EthSpec>>(
|
||||
chain: &BeaconChain<T>,
|
||||
seen_timestamp: Duration,
|
||||
block: BeaconBlockRef<T::EthSpec, P>,
|
||||
root: Hash256,
|
||||
provenance: &str,
|
||||
log: &Logger,
|
||||
) {
|
||||
let delay = get_block_delay_ms(seen_timestamp, block, &chain.slot_clock);
|
||||
|
||||
metrics::observe_timer_vec(
|
||||
&metrics::HTTP_API_BLOCK_BROADCAST_DELAY_TIMES,
|
||||
&[provenance],
|
||||
delay,
|
||||
);
|
||||
|
||||
// Perform some logging to inform users if their blocks are being produced
|
||||
// late.
|
||||
//
|
||||
// Check to see the thresholds are non-zero to avoid logging errors with small
|
||||
// slot times (e.g., during testing)
|
||||
let too_late_threshold = chain.slot_clock.unagg_attestation_production_delay();
|
||||
let delayed_threshold = too_late_threshold / 2;
|
||||
if delay >= too_late_threshold {
|
||||
error!(
|
||||
log,
|
||||
"Block was broadcast too late";
|
||||
"msg" => "system may be overloaded, block likely to be orphaned",
|
||||
"provenance" => provenance,
|
||||
"delay_ms" => delay.as_millis(),
|
||||
"slot" => block.slot(),
|
||||
"root" => ?root,
|
||||
)
|
||||
} else if delay >= delayed_threshold {
|
||||
error!(
|
||||
log,
|
||||
"Block broadcast was delayed";
|
||||
"msg" => "system may be overloaded, block may be orphaned",
|
||||
"provenance" => provenance,
|
||||
"delay_ms" => delay.as_millis(),
|
||||
"slot" => block.slot(),
|
||||
"root" => ?root,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -42,7 +42,7 @@ strum = { version = "0.24.0", features = ["derive"] }
|
||||
superstruct = "0.5.0"
|
||||
prometheus-client = "0.18.0"
|
||||
unused_port = { path = "../../common/unused_port" }
|
||||
delay_map = "0.1.1"
|
||||
delay_map = "0.3.0"
|
||||
void = "1"
|
||||
|
||||
[dependencies.libp2p]
|
||||
|
@ -13,7 +13,7 @@ use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult};
|
||||
use rand::seq::SliceRandom;
|
||||
use slog::{debug, error, trace, warn};
|
||||
use smallvec::SmallVec;
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::BTreeMap;
|
||||
use std::{
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
@ -77,7 +77,7 @@ pub struct PeerManager<TSpec: EthSpec> {
|
||||
/// The target number of peers we would like to connect to.
|
||||
target_peers: usize,
|
||||
/// Peers queued to be dialed.
|
||||
peers_to_dial: VecDeque<(PeerId, Option<Enr>)>,
|
||||
peers_to_dial: BTreeMap<PeerId, Option<Enr>>,
|
||||
/// The number of temporarily banned peers. This is used to prevent instantaneous
|
||||
/// reconnection.
|
||||
// NOTE: This just prevents re-connections. The state of the peer is otherwise unaffected. A
|
||||
@ -308,7 +308,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
/// proves resource constraining, we should switch to multiaddr dialling here.
|
||||
#[allow(clippy::mutable_key_type)]
|
||||
pub fn peers_discovered(&mut self, results: HashMap<PeerId, Option<Instant>>) -> Vec<PeerId> {
|
||||
let mut to_dial_peers = Vec::new();
|
||||
let mut to_dial_peers = Vec::with_capacity(4);
|
||||
|
||||
let connected_or_dialing = self.network_globals.connected_or_dialing_peers();
|
||||
for (peer_id, min_ttl) in results {
|
||||
@ -398,7 +398,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
|
||||
// A peer is being dialed.
|
||||
pub fn dial_peer(&mut self, peer_id: &PeerId, enr: Option<Enr>) {
|
||||
self.peers_to_dial.push_back((*peer_id, enr));
|
||||
self.peers_to_dial.insert(*peer_id, enr);
|
||||
}
|
||||
|
||||
/// Reports if a peer is banned or not.
|
||||
@ -1197,6 +1197,18 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
|
||||
// Unban any peers that have served their temporary ban timeout
|
||||
self.unban_temporary_banned_peers();
|
||||
|
||||
// Maintains memory by shrinking mappings
|
||||
self.shrink_mappings();
|
||||
}
|
||||
|
||||
// Reduce memory footprint by routinely shrinking associating mappings.
|
||||
fn shrink_mappings(&mut self) {
|
||||
self.inbound_ping_peers.shrink_to(5);
|
||||
self.outbound_ping_peers.shrink_to(5);
|
||||
self.status_peers.shrink_to(5);
|
||||
self.temporary_banned_peers.shrink_to_fit();
|
||||
self.sync_committee_subnets.shrink_to_fit();
|
||||
}
|
||||
|
||||
// Update metrics related to peer scoring.
|
||||
|
@ -89,7 +89,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
|
||||
self.events.shrink_to_fit();
|
||||
}
|
||||
|
||||
if let Some((peer_id, maybe_enr)) = self.peers_to_dial.pop_front() {
|
||||
if let Some((peer_id, maybe_enr)) = self.peers_to_dial.pop_first() {
|
||||
self.inject_peer_connection(&peer_id, ConnectingType::Dialing, maybe_enr);
|
||||
let handler = self.new_handler();
|
||||
return Poll::Ready(NetworkBehaviourAction::Dial {
|
||||
|
@ -407,10 +407,14 @@ impl ProtocolId {
|
||||
/// beginning of the stream, else returns `false`.
|
||||
pub fn has_context_bytes(&self) -> bool {
|
||||
match self.message_name {
|
||||
Protocol::BlocksByRange | Protocol::BlocksByRoot => {
|
||||
!matches!(self.version, Version::V1)
|
||||
}
|
||||
Protocol::BlobsByRange | Protocol::BlobsByRoot | Protocol::LightClientBootstrap => true,
|
||||
Protocol::BlocksByRange | Protocol::BlocksByRoot => match self.version {
|
||||
Version::V2 => true,
|
||||
Version::V1 => false,
|
||||
},
|
||||
Protocol::LightClientBootstrap => match self.version {
|
||||
Version::V2 | Version::V1 => true,
|
||||
},
|
||||
Protocol::BlobsByRoot | Protocol::BlobsByRange => true,
|
||||
Protocol::Goodbye | Protocol::Ping | Protocol::Status | Protocol::MetaData => false,
|
||||
}
|
||||
}
|
||||
|
@ -43,7 +43,7 @@ if-addrs = "0.6.4"
|
||||
strum = "0.24.0"
|
||||
tokio-util = { version = "0.6.3", features = ["time"] }
|
||||
derivative = "2.2.0"
|
||||
delay_map = "0.1.1"
|
||||
delay_map = "0.3.0"
|
||||
ethereum-types = { version = "0.14.1", optional = true }
|
||||
operation_pool = { path = "../operation_pool" }
|
||||
execution_layer = { path = "../execution_layer" }
|
||||
|
@ -834,7 +834,6 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
| Err(e @ BlockError::WeakSubjectivityConflict)
|
||||
| Err(e @ BlockError::InconsistentFork(_))
|
||||
| Err(e @ BlockError::ExecutionPayloadError(_))
|
||||
// TODO(merge): reconsider peer scoring for this event.
|
||||
| Err(e @ BlockError::ParentExecutionPayloadInvalid { .. })
|
||||
| Err(e @ BlockError::GenesisBlock) => {
|
||||
warn!(self.log, "Could not verify block for gossip. Rejecting the block";
|
||||
@ -847,7 +846,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
);
|
||||
return None;
|
||||
}
|
||||
Err(e@ BlockError::BlobValidation(_)) => {
|
||||
Err(e @ BlockError::BlobValidation(_)) => {
|
||||
warn!(self.log, "Could not verify blob for gossip. Rejecting the block and blob";
|
||||
"error" => %e);
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
|
||||
|
@ -528,6 +528,21 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
})
|
||||
}
|
||||
}
|
||||
ref err @ BlockError::ParentExecutionPayloadInvalid { ref parent_root } => {
|
||||
warn!(
|
||||
self.log,
|
||||
"Failed to sync chain built on invalid parent";
|
||||
"parent_root" => ?parent_root,
|
||||
"advice" => "check execution node for corruption then restart it and Lighthouse",
|
||||
);
|
||||
Err(ChainSegmentFailed {
|
||||
message: format!("Peer sent invalid block. Reason: {err:?}"),
|
||||
// We need to penalise harshly in case this represents an actual attack. In case
|
||||
// of a faulty EL it will usually require manual intervention to fix anyway, so
|
||||
// it's not too bad if we drop most of our peers.
|
||||
peer_action: Some(PeerAction::LowToleranceError),
|
||||
})
|
||||
}
|
||||
other => {
|
||||
debug!(
|
||||
self.log, "Invalid block received";
|
||||
|
624
beacon_node/network/src/router.rs
Normal file
624
beacon_node/network/src/router.rs
Normal file
@ -0,0 +1,624 @@
|
||||
//! This module handles incoming network messages.
|
||||
//!
|
||||
//! It routes the messages to appropriate services.
|
||||
//! It handles requests at the application layer in its associated processor and directs
|
||||
//! syncing-related responses to the Sync manager.
|
||||
#![allow(clippy::unit_arg)]
|
||||
|
||||
use crate::beacon_processor::{
|
||||
BeaconProcessor, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN,
|
||||
};
|
||||
use crate::error;
|
||||
use crate::service::{NetworkMessage, RequestId};
|
||||
use crate::status::status_message;
|
||||
use crate::sync::manager::RequestId as SyncId;
|
||||
use crate::sync::SyncMessage;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use futures::{future, StreamExt};
|
||||
|
||||
use lighthouse_network::{rpc::*, PubsubMessage};
|
||||
use lighthouse_network::{MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response};
|
||||
use slog::{debug, error, o, trace, warn};
|
||||
use std::cmp;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
use types::{BlobsSidecar, EthSpec, SignedBeaconBlock};
|
||||
|
||||
/// Handles messages from the network and routes them to the appropriate service to be handled.
|
||||
pub struct Router<T: BeaconChainTypes> {
|
||||
/// Access to the peer db and network information.
|
||||
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
|
||||
/// A reference to the underlying beacon chain.
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
/// A channel to the syncing thread.
|
||||
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
|
||||
/// A network context to return and handle RPC requests.
|
||||
network: HandlerNetworkContext<T::EthSpec>,
|
||||
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
|
||||
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
|
||||
/// The `Router` logger.
|
||||
log: slog::Logger,
|
||||
}
|
||||
|
||||
/// Types of messages the router can receive.
|
||||
#[derive(Debug)]
|
||||
pub enum RouterMessage<T: EthSpec> {
|
||||
/// Peer has disconnected.
|
||||
PeerDisconnected(PeerId),
|
||||
/// An RPC request has been received.
|
||||
RPCRequestReceived {
|
||||
peer_id: PeerId,
|
||||
id: PeerRequestId,
|
||||
request: Request,
|
||||
},
|
||||
/// An RPC response has been received.
|
||||
RPCResponseReceived {
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
response: Response<T>,
|
||||
},
|
||||
/// An RPC request failed
|
||||
RPCFailed {
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
error: RPCError,
|
||||
},
|
||||
/// A gossip message has been received. The fields are: message id, the peer that sent us this
|
||||
/// message, the message itself and a bool which indicates if the message should be processed
|
||||
/// by the beacon chain after successful verification.
|
||||
PubsubMessage(MessageId, PeerId, PubsubMessage<T>, bool),
|
||||
/// The peer manager has requested we re-status a peer.
|
||||
StatusPeer(PeerId),
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> Router<T> {
|
||||
/// Initializes and runs the Router.
|
||||
pub fn spawn(
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||
executor: task_executor::TaskExecutor,
|
||||
log: slog::Logger,
|
||||
) -> error::Result<mpsc::UnboundedSender<RouterMessage<T::EthSpec>>> {
|
||||
let message_handler_log = log.new(o!("service"=> "router"));
|
||||
trace!(message_handler_log, "Service starting");
|
||||
|
||||
let (handler_send, handler_recv) = mpsc::unbounded_channel();
|
||||
|
||||
let (beacon_processor_send, beacon_processor_receive) =
|
||||
mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN);
|
||||
|
||||
let sync_logger = log.new(o!("service"=> "sync"));
|
||||
|
||||
// spawn the sync thread
|
||||
let sync_send = crate::sync::manager::spawn(
|
||||
executor.clone(),
|
||||
beacon_chain.clone(),
|
||||
network_globals.clone(),
|
||||
network_send.clone(),
|
||||
beacon_processor_send.clone(),
|
||||
sync_logger,
|
||||
);
|
||||
|
||||
BeaconProcessor {
|
||||
beacon_chain: Arc::downgrade(&beacon_chain),
|
||||
network_tx: network_send.clone(),
|
||||
sync_tx: sync_send.clone(),
|
||||
network_globals: network_globals.clone(),
|
||||
executor: executor.clone(),
|
||||
max_workers: cmp::max(1, num_cpus::get()),
|
||||
current_workers: 0,
|
||||
importing_blocks: Default::default(),
|
||||
log: log.clone(),
|
||||
}
|
||||
.spawn_manager(beacon_processor_receive, None);
|
||||
|
||||
// generate the Message handler
|
||||
let mut handler = Router {
|
||||
network_globals,
|
||||
chain: beacon_chain,
|
||||
sync_send,
|
||||
network: HandlerNetworkContext::new(network_send, log.clone()),
|
||||
beacon_processor_send,
|
||||
log: message_handler_log,
|
||||
};
|
||||
|
||||
// spawn handler task and move the message handler instance into the spawned thread
|
||||
executor.spawn(
|
||||
async move {
|
||||
debug!(log, "Network message router started");
|
||||
UnboundedReceiverStream::new(handler_recv)
|
||||
.for_each(move |msg| future::ready(handler.handle_message(msg)))
|
||||
.await;
|
||||
},
|
||||
"router",
|
||||
);
|
||||
|
||||
Ok(handler_send)
|
||||
}
|
||||
|
||||
/// Handle all messages incoming from the network service.
|
||||
fn handle_message(&mut self, message: RouterMessage<T::EthSpec>) {
|
||||
match message {
|
||||
// we have initiated a connection to a peer or the peer manager has requested a
|
||||
// re-status
|
||||
RouterMessage::StatusPeer(peer_id) => {
|
||||
self.send_status(peer_id);
|
||||
}
|
||||
// A peer has disconnected
|
||||
RouterMessage::PeerDisconnected(peer_id) => {
|
||||
self.send_to_sync(SyncMessage::Disconnect(peer_id));
|
||||
}
|
||||
RouterMessage::RPCRequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
request,
|
||||
} => {
|
||||
self.handle_rpc_request(peer_id, id, request);
|
||||
}
|
||||
RouterMessage::RPCResponseReceived {
|
||||
peer_id,
|
||||
request_id,
|
||||
response,
|
||||
} => {
|
||||
self.handle_rpc_response(peer_id, request_id, response);
|
||||
}
|
||||
RouterMessage::RPCFailed {
|
||||
peer_id,
|
||||
request_id,
|
||||
error,
|
||||
} => {
|
||||
self.on_rpc_error(peer_id, request_id, error);
|
||||
}
|
||||
RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => {
|
||||
self.handle_gossip(id, peer_id, gossip, should_process);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* RPC - Related functionality */
|
||||
|
||||
/// A new RPC request has been received from the network.
|
||||
fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: PeerRequestId, request: Request) {
|
||||
if !self.network_globals.peers.read().is_connected(&peer_id) {
|
||||
debug!(self.log, "Dropping request of disconnected peer"; "peer_id" => %peer_id, "request" => ?request);
|
||||
return;
|
||||
}
|
||||
match request {
|
||||
Request::Status(status_message) => {
|
||||
self.on_status_request(peer_id, request_id, status_message)
|
||||
}
|
||||
Request::BlocksByRange(request) => self.send_beacon_processor_work(
|
||||
BeaconWorkEvent::blocks_by_range_request(peer_id, request_id, request),
|
||||
),
|
||||
Request::BlocksByRoot(request) => self.send_beacon_processor_work(
|
||||
BeaconWorkEvent::blocks_by_roots_request(peer_id, request_id, request),
|
||||
),
|
||||
Request::BlobsByRange(request) => self.send_beacon_processor_work(
|
||||
BeaconWorkEvent::blobs_by_range_request(peer_id, request_id, request),
|
||||
),
|
||||
Request::BlobsByRoot(request) => self.send_beacon_processor_work(
|
||||
BeaconWorkEvent::blobs_by_root_request(peer_id, request_id, request),
|
||||
),
|
||||
Request::LightClientBootstrap(request) => self.send_beacon_processor_work(
|
||||
BeaconWorkEvent::lightclient_bootstrap_request(peer_id, request_id, request),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// An RPC response has been received from the network.
|
||||
fn handle_rpc_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
response: Response<T::EthSpec>,
|
||||
) {
|
||||
match response {
|
||||
Response::Status(status_message) => {
|
||||
debug!(self.log, "Received Status Response"; "peer_id" => %peer_id, &status_message);
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::status_message(
|
||||
peer_id,
|
||||
status_message,
|
||||
))
|
||||
}
|
||||
Response::BlocksByRange(beacon_block) => {
|
||||
self.on_blocks_by_range_response(peer_id, request_id, beacon_block);
|
||||
}
|
||||
Response::BlocksByRoot(beacon_block) => {
|
||||
self.on_blocks_by_root_response(peer_id, request_id, beacon_block);
|
||||
}
|
||||
Response::BlobsByRange(blob) => {
|
||||
self.on_blobs_by_range_response(peer_id, request_id, blob);
|
||||
}
|
||||
Response::BlobsByRoot(blob) => {
|
||||
self.on_blobs_by_root_response(peer_id, request_id, blob);
|
||||
}
|
||||
Response::LightClientBootstrap(_) => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle RPC messages.
|
||||
/// Note: `should_process` is currently only useful for the `Attestation` variant.
|
||||
/// if `should_process` is `false`, we only propagate the message on successful verification,
|
||||
/// else, we propagate **and** import into the beacon chain.
|
||||
fn handle_gossip(
|
||||
&mut self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
gossip_message: PubsubMessage<T::EthSpec>,
|
||||
should_process: bool,
|
||||
) {
|
||||
match gossip_message {
|
||||
PubsubMessage::AggregateAndProofAttestation(aggregate_and_proof) => self
|
||||
.send_beacon_processor_work(BeaconWorkEvent::aggregated_attestation(
|
||||
message_id,
|
||||
peer_id,
|
||||
*aggregate_and_proof,
|
||||
timestamp_now(),
|
||||
)),
|
||||
PubsubMessage::Attestation(subnet_attestation) => {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::unaggregated_attestation(
|
||||
message_id,
|
||||
peer_id,
|
||||
subnet_attestation.1,
|
||||
subnet_attestation.0,
|
||||
should_process,
|
||||
timestamp_now(),
|
||||
))
|
||||
}
|
||||
PubsubMessage::BeaconBlock(block) => {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_beacon_block(
|
||||
message_id,
|
||||
peer_id,
|
||||
self.network_globals.client(&peer_id),
|
||||
block,
|
||||
timestamp_now(),
|
||||
))
|
||||
}
|
||||
PubsubMessage::BeaconBlockAndBlobsSidecars(data) => {
|
||||
let peer_client = self.network_globals.client(&peer_id);
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_block_and_blobs_sidecar(
|
||||
message_id,
|
||||
peer_id,
|
||||
peer_client,
|
||||
data,
|
||||
timestamp_now(),
|
||||
))
|
||||
}
|
||||
PubsubMessage::VoluntaryExit(exit) => {
|
||||
debug!(self.log, "Received a voluntary exit"; "peer_id" => %peer_id);
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_voluntary_exit(
|
||||
message_id, peer_id, exit,
|
||||
))
|
||||
}
|
||||
PubsubMessage::ProposerSlashing(proposer_slashing) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Received a proposer slashing";
|
||||
"peer_id" => %peer_id
|
||||
);
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_proposer_slashing(
|
||||
message_id,
|
||||
peer_id,
|
||||
proposer_slashing,
|
||||
))
|
||||
}
|
||||
PubsubMessage::AttesterSlashing(attester_slashing) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Received a attester slashing";
|
||||
"peer_id" => %peer_id
|
||||
);
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_attester_slashing(
|
||||
message_id,
|
||||
peer_id,
|
||||
attester_slashing,
|
||||
))
|
||||
}
|
||||
PubsubMessage::SignedContributionAndProof(contribution_and_proof) => {
|
||||
trace!(
|
||||
self.log,
|
||||
"Received sync committee aggregate";
|
||||
"peer_id" => %peer_id
|
||||
);
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_sync_contribution(
|
||||
message_id,
|
||||
peer_id,
|
||||
*contribution_and_proof,
|
||||
timestamp_now(),
|
||||
))
|
||||
}
|
||||
PubsubMessage::SyncCommitteeMessage(sync_committtee_msg) => {
|
||||
trace!(
|
||||
self.log,
|
||||
"Received sync committee signature";
|
||||
"peer_id" => %peer_id
|
||||
);
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_sync_signature(
|
||||
message_id,
|
||||
peer_id,
|
||||
sync_committtee_msg.1,
|
||||
sync_committtee_msg.0,
|
||||
timestamp_now(),
|
||||
))
|
||||
}
|
||||
PubsubMessage::LightClientFinalityUpdate(light_client_finality_update) => {
|
||||
trace!(
|
||||
self.log,
|
||||
"Received light client finality update";
|
||||
"peer_id" => %peer_id
|
||||
);
|
||||
self.send_beacon_processor_work(
|
||||
BeaconWorkEvent::gossip_light_client_finality_update(
|
||||
message_id,
|
||||
peer_id,
|
||||
light_client_finality_update,
|
||||
timestamp_now(),
|
||||
),
|
||||
)
|
||||
}
|
||||
PubsubMessage::LightClientOptimisticUpdate(light_client_optimistic_update) => {
|
||||
trace!(
|
||||
self.log,
|
||||
"Received light client optimistic update";
|
||||
"peer_id" => %peer_id
|
||||
);
|
||||
self.send_beacon_processor_work(
|
||||
BeaconWorkEvent::gossip_light_client_optimistic_update(
|
||||
message_id,
|
||||
peer_id,
|
||||
light_client_optimistic_update,
|
||||
timestamp_now(),
|
||||
),
|
||||
)
|
||||
}
|
||||
PubsubMessage::BlsToExecutionChange(bls_to_execution_change) => self
|
||||
.send_beacon_processor_work(BeaconWorkEvent::gossip_bls_to_execution_change(
|
||||
message_id,
|
||||
peer_id,
|
||||
bls_to_execution_change,
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn send_status(&mut self, peer_id: PeerId) {
|
||||
let status_message = status_message(&self.chain);
|
||||
debug!(self.log, "Sending Status Request"; "peer" => %peer_id, &status_message);
|
||||
self.network
|
||||
.send_processor_request(peer_id, Request::Status(status_message));
|
||||
}
|
||||
|
||||
fn send_to_sync(&mut self, message: SyncMessage<T::EthSpec>) {
|
||||
self.sync_send.send(message).unwrap_or_else(|e| {
|
||||
warn!(
|
||||
self.log,
|
||||
"Could not send message to the sync service";
|
||||
"error" => %e,
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
/// An error occurred during an RPC request. The state is maintained by the sync manager, so
|
||||
/// this function notifies the sync manager of the error.
|
||||
pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
|
||||
// Check if the failed RPC belongs to sync
|
||||
if let RequestId::Sync(request_id) = request_id {
|
||||
self.send_to_sync(SyncMessage::RpcError {
|
||||
peer_id,
|
||||
request_id,
|
||||
error,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a `Status` request.
|
||||
///
|
||||
/// Processes the `Status` from the remote peer and sends back our `Status`.
|
||||
pub fn on_status_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: PeerRequestId,
|
||||
status: StatusMessage,
|
||||
) {
|
||||
debug!(self.log, "Received Status Request"; "peer_id" => %peer_id, &status);
|
||||
|
||||
// Say status back.
|
||||
self.network.send_response(
|
||||
peer_id,
|
||||
Response::Status(status_message(&self.chain)),
|
||||
request_id,
|
||||
);
|
||||
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::status_message(peer_id, status))
|
||||
}
|
||||
|
||||
/// Handle a `BlocksByRange` response from the peer.
|
||||
/// A `beacon_block` behaves as a stream which is terminated on a `None` response.
|
||||
pub fn on_blocks_by_range_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
) {
|
||||
let request_id = match request_id {
|
||||
RequestId::Sync(sync_id) => match sync_id {
|
||||
SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => {
|
||||
unreachable!("Block lookups do not request BBRange requests")
|
||||
}
|
||||
id @ (SyncId::BackFillBlocks { .. }
|
||||
| SyncId::RangeBlocks { .. }
|
||||
| SyncId::BackFillBlobs { .. }
|
||||
| SyncId::RangeBlobs { .. }) => id,
|
||||
},
|
||||
RequestId::Router => unreachable!("All BBRange requests belong to sync"),
|
||||
};
|
||||
|
||||
trace!(
|
||||
self.log,
|
||||
"Received BlocksByRange Response";
|
||||
"peer" => %peer_id,
|
||||
);
|
||||
|
||||
self.send_to_sync(SyncMessage::RpcBlock {
|
||||
peer_id,
|
||||
request_id,
|
||||
beacon_block,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn on_blobs_by_range_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
blob_sidecar: Option<Arc<BlobsSidecar<T::EthSpec>>>,
|
||||
) {
|
||||
trace!(
|
||||
self.log,
|
||||
"Received BlobsByRange Response";
|
||||
"peer" => %peer_id,
|
||||
);
|
||||
|
||||
if let RequestId::Sync(id) = request_id {
|
||||
self.send_to_sync(SyncMessage::RpcBlobs {
|
||||
peer_id,
|
||||
request_id: id,
|
||||
blob_sidecar,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
} else {
|
||||
debug!(
|
||||
self.log,
|
||||
"All blobs by range responses should belong to sync"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a `BlocksByRoot` response from the peer.
|
||||
pub fn on_blocks_by_root_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
) {
|
||||
let request_id = match request_id {
|
||||
RequestId::Sync(sync_id) => match sync_id {
|
||||
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
|
||||
SyncId::BackFillBlocks { .. }
|
||||
| SyncId::RangeBlocks { .. }
|
||||
| SyncId::RangeBlobs { .. }
|
||||
| SyncId::BackFillBlobs { .. } => {
|
||||
unreachable!("Batch syncing do not request BBRoot requests")
|
||||
}
|
||||
},
|
||||
RequestId::Router => unreachable!("All BBRoot requests belong to sync"),
|
||||
};
|
||||
|
||||
trace!(
|
||||
self.log,
|
||||
"Received BlocksByRoot Response";
|
||||
"peer" => %peer_id,
|
||||
);
|
||||
self.send_to_sync(SyncMessage::RpcBlock {
|
||||
peer_id,
|
||||
request_id,
|
||||
beacon_block,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
}
|
||||
|
||||
/// Handle a `BlobsByRoot` response from the peer.
|
||||
pub fn on_blobs_by_root_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
block_and_blobs: Option<types::SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
|
||||
) {
|
||||
let request_id = match request_id {
|
||||
RequestId::Sync(sync_id) => match sync_id {
|
||||
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
|
||||
SyncId::BackFillBlocks { .. }
|
||||
| SyncId::RangeBlocks { .. }
|
||||
| SyncId::RangeBlobs { .. }
|
||||
| SyncId::BackFillBlobs { .. } => {
|
||||
unreachable!("Batch syncing does not request BBRoot requests")
|
||||
}
|
||||
},
|
||||
RequestId::Router => unreachable!("All BlobsByRoot requests belong to sync"),
|
||||
};
|
||||
|
||||
trace!(
|
||||
self.log,
|
||||
"Received BlobsByRoot Response";
|
||||
"peer" => %peer_id,
|
||||
);
|
||||
self.send_to_sync(SyncMessage::RpcBlockAndBlobs {
|
||||
request_id,
|
||||
peer_id,
|
||||
block_and_blobs,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
}
|
||||
|
||||
fn send_beacon_processor_work(&mut self, work: BeaconWorkEvent<T>) {
|
||||
self.beacon_processor_send
|
||||
.try_send(work)
|
||||
.unwrap_or_else(|e| {
|
||||
let work_type = match &e {
|
||||
mpsc::error::TrySendError::Closed(work)
|
||||
| mpsc::error::TrySendError::Full(work) => work.work_type(),
|
||||
};
|
||||
error!(&self.log, "Unable to send message to the beacon processor";
|
||||
"error" => %e, "type" => work_type)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Wraps a Network Channel to employ various RPC related network functionality for the
|
||||
/// processor.
|
||||
#[derive(Clone)]
|
||||
pub struct HandlerNetworkContext<T: EthSpec> {
|
||||
/// The network channel to relay messages to the Network service.
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage<T>>,
|
||||
/// Logger for the `NetworkContext`.
|
||||
log: slog::Logger,
|
||||
}
|
||||
|
||||
impl<T: EthSpec> HandlerNetworkContext<T> {
|
||||
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage<T>>, log: slog::Logger) -> Self {
|
||||
Self { network_send, log }
|
||||
}
|
||||
|
||||
/// Sends a message to the network task.
|
||||
fn inform_network(&mut self, msg: NetworkMessage<T>) {
|
||||
self.network_send.send(msg).unwrap_or_else(
|
||||
|e| warn!(self.log, "Could not send message to the network service"; "error" => %e),
|
||||
)
|
||||
}
|
||||
|
||||
/// Sends a request to the network task.
|
||||
pub fn send_processor_request(&mut self, peer_id: PeerId, request: Request) {
|
||||
self.inform_network(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request_id: RequestId::Router,
|
||||
request,
|
||||
})
|
||||
}
|
||||
|
||||
/// Sends a response to the network task.
|
||||
pub fn send_response(&mut self, peer_id: PeerId, response: Response<T>, id: PeerRequestId) {
|
||||
self.inform_network(NetworkMessage::SendResponse {
|
||||
peer_id,
|
||||
id,
|
||||
response,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn timestamp_now() -> Duration {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_else(|_| Duration::from_secs(0))
|
||||
}
|
@ -1,346 +0,0 @@
|
||||
//! This module handles incoming network messages.
|
||||
//!
|
||||
//! It routes the messages to appropriate services.
|
||||
//! It handles requests at the application layer in its associated processor and directs
|
||||
//! syncing-related responses to the Sync manager.
|
||||
#![allow(clippy::unit_arg)]
|
||||
|
||||
mod processor;
|
||||
|
||||
use crate::error;
|
||||
use crate::service::{NetworkMessage, RequestId};
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use futures::prelude::*;
|
||||
use lighthouse_network::rpc::RPCError;
|
||||
use lighthouse_network::{
|
||||
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
|
||||
};
|
||||
use processor::Processor;
|
||||
use slog::{debug, o, trace};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
use types::EthSpec;
|
||||
|
||||
/// Handles messages received from the network and client and organises syncing. This
|
||||
/// functionality of this struct is to validate an decode messages from the network before
|
||||
/// passing them to the internal message processor. The message processor spawns a syncing thread
|
||||
/// which manages which blocks need to be requested and processed.
|
||||
pub struct Router<T: BeaconChainTypes> {
|
||||
/// Access to the peer db.
|
||||
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
|
||||
/// Processes validated and decoded messages from the network. Has direct access to the
|
||||
/// sync manager.
|
||||
processor: Processor<T>,
|
||||
/// The `Router` logger.
|
||||
log: slog::Logger,
|
||||
}
|
||||
|
||||
/// Types of messages the handler can receive.
|
||||
#[derive(Debug)]
|
||||
pub enum RouterMessage<T: EthSpec> {
|
||||
/// We have initiated a connection to a new peer.
|
||||
PeerDialed(PeerId),
|
||||
/// Peer has disconnected,
|
||||
PeerDisconnected(PeerId),
|
||||
/// An RPC request has been received.
|
||||
RPCRequestReceived {
|
||||
peer_id: PeerId,
|
||||
id: PeerRequestId,
|
||||
request: Request,
|
||||
},
|
||||
/// An RPC response has been received.
|
||||
RPCResponseReceived {
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
response: Response<T>,
|
||||
},
|
||||
/// An RPC request failed
|
||||
RPCFailed {
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
error: RPCError,
|
||||
},
|
||||
/// A gossip message has been received. The fields are: message id, the peer that sent us this
|
||||
/// message, the message itself and a bool which indicates if the message should be processed
|
||||
/// by the beacon chain after successful verification.
|
||||
PubsubMessage(MessageId, PeerId, PubsubMessage<T>, bool),
|
||||
/// The peer manager has requested we re-status a peer.
|
||||
StatusPeer(PeerId),
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> Router<T> {
|
||||
/// Initializes and runs the Router.
|
||||
pub fn spawn(
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||
executor: task_executor::TaskExecutor,
|
||||
log: slog::Logger,
|
||||
) -> error::Result<mpsc::UnboundedSender<RouterMessage<T::EthSpec>>> {
|
||||
let message_handler_log = log.new(o!("service"=> "router"));
|
||||
trace!(message_handler_log, "Service starting");
|
||||
|
||||
let (handler_send, handler_recv) = mpsc::unbounded_channel();
|
||||
|
||||
// Initialise a message instance, which itself spawns the syncing thread.
|
||||
let processor = Processor::new(
|
||||
executor.clone(),
|
||||
beacon_chain,
|
||||
network_globals.clone(),
|
||||
network_send,
|
||||
&log,
|
||||
);
|
||||
|
||||
// generate the Message handler
|
||||
let mut handler = Router {
|
||||
network_globals,
|
||||
processor,
|
||||
log: message_handler_log,
|
||||
};
|
||||
|
||||
// spawn handler task and move the message handler instance into the spawned thread
|
||||
executor.spawn(
|
||||
async move {
|
||||
debug!(log, "Network message router started");
|
||||
UnboundedReceiverStream::new(handler_recv)
|
||||
.for_each(move |msg| future::ready(handler.handle_message(msg)))
|
||||
.await;
|
||||
},
|
||||
"router",
|
||||
);
|
||||
|
||||
Ok(handler_send)
|
||||
}
|
||||
|
||||
/// Handle all messages incoming from the network service.
|
||||
fn handle_message(&mut self, message: RouterMessage<T::EthSpec>) {
|
||||
match message {
|
||||
// we have initiated a connection to a peer or the peer manager has requested a
|
||||
// re-status
|
||||
RouterMessage::PeerDialed(peer_id) | RouterMessage::StatusPeer(peer_id) => {
|
||||
self.processor.send_status(peer_id);
|
||||
}
|
||||
// A peer has disconnected
|
||||
RouterMessage::PeerDisconnected(peer_id) => {
|
||||
self.processor.on_disconnect(peer_id);
|
||||
}
|
||||
RouterMessage::RPCRequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
request,
|
||||
} => {
|
||||
self.handle_rpc_request(peer_id, id, request);
|
||||
}
|
||||
RouterMessage::RPCResponseReceived {
|
||||
peer_id,
|
||||
request_id,
|
||||
response,
|
||||
} => {
|
||||
self.handle_rpc_response(peer_id, request_id, response);
|
||||
}
|
||||
RouterMessage::RPCFailed {
|
||||
peer_id,
|
||||
request_id,
|
||||
error,
|
||||
} => {
|
||||
self.processor.on_rpc_error(peer_id, request_id, error);
|
||||
}
|
||||
RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => {
|
||||
self.handle_gossip(id, peer_id, gossip, should_process);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* RPC - Related functionality */
|
||||
|
||||
/// A new RPC request has been received from the network.
|
||||
fn handle_rpc_request(&mut self, peer_id: PeerId, id: PeerRequestId, request: Request) {
|
||||
if !self.network_globals.peers.read().is_connected(&peer_id) {
|
||||
debug!(self.log, "Dropping request of disconnected peer"; "peer_id" => %peer_id, "request" => ?request);
|
||||
return;
|
||||
}
|
||||
match request {
|
||||
Request::Status(status_message) => {
|
||||
self.processor
|
||||
.on_status_request(peer_id, id, status_message)
|
||||
}
|
||||
Request::BlocksByRange(request) => self
|
||||
.processor
|
||||
.on_blocks_by_range_request(peer_id, id, request),
|
||||
Request::BlocksByRoot(request) => self
|
||||
.processor
|
||||
.on_blocks_by_root_request(peer_id, id, request),
|
||||
Request::BlobsByRange(request) => self
|
||||
.processor
|
||||
.on_blobs_by_range_request(peer_id, id, request),
|
||||
Request::BlobsByRoot(request) => self
|
||||
.processor
|
||||
.on_blobs_by_root_request(peer_id, id, request),
|
||||
Request::LightClientBootstrap(request) => self
|
||||
.processor
|
||||
.on_lightclient_bootstrap(peer_id, id, request),
|
||||
}
|
||||
}
|
||||
|
||||
/// An RPC response has been received from the network.
|
||||
// we match on id and ignore responses past the timeout.
|
||||
fn handle_rpc_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
response: Response<T::EthSpec>,
|
||||
) {
|
||||
// an error could have occurred.
|
||||
match response {
|
||||
Response::Status(status_message) => {
|
||||
self.processor.on_status_response(peer_id, status_message);
|
||||
}
|
||||
Response::BlocksByRange(beacon_block) => {
|
||||
self.processor
|
||||
.on_blocks_by_range_response(peer_id, request_id, beacon_block);
|
||||
}
|
||||
Response::BlocksByRoot(beacon_block) => {
|
||||
self.processor
|
||||
.on_blocks_by_root_response(peer_id, request_id, beacon_block);
|
||||
}
|
||||
Response::BlobsByRange(beacon_blob) => {
|
||||
self.processor
|
||||
.on_blobs_by_range_response(peer_id, request_id, beacon_blob);
|
||||
}
|
||||
Response::BlobsByRoot(beacon_blob) => {
|
||||
self.processor
|
||||
.on_blobs_by_root_response(peer_id, request_id, beacon_blob);
|
||||
}
|
||||
Response::LightClientBootstrap(_) => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle RPC messages.
|
||||
/// Note: `should_process` is currently only useful for the `Attestation` variant.
|
||||
/// if `should_process` is `false`, we only propagate the message on successful verification,
|
||||
/// else, we propagate **and** import into the beacon chain.
|
||||
fn handle_gossip(
|
||||
&mut self,
|
||||
id: MessageId,
|
||||
peer_id: PeerId,
|
||||
gossip_message: PubsubMessage<T::EthSpec>,
|
||||
should_process: bool,
|
||||
) {
|
||||
match gossip_message {
|
||||
// Attestations should never reach the router.
|
||||
PubsubMessage::AggregateAndProofAttestation(aggregate_and_proof) => {
|
||||
self.processor
|
||||
.on_aggregated_attestation_gossip(id, peer_id, *aggregate_and_proof);
|
||||
}
|
||||
PubsubMessage::Attestation(subnet_attestation) => {
|
||||
self.processor.on_unaggregated_attestation_gossip(
|
||||
id,
|
||||
peer_id,
|
||||
subnet_attestation.1.clone(),
|
||||
subnet_attestation.0,
|
||||
should_process,
|
||||
);
|
||||
}
|
||||
PubsubMessage::BeaconBlock(block) => {
|
||||
self.processor.on_block_gossip(
|
||||
id,
|
||||
peer_id,
|
||||
self.network_globals.client(&peer_id),
|
||||
block,
|
||||
);
|
||||
}
|
||||
PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blobs) => {
|
||||
self.processor.on_block_and_blobs_sidecar_gossip(
|
||||
id,
|
||||
peer_id,
|
||||
self.network_globals.client(&peer_id),
|
||||
block_and_blobs,
|
||||
);
|
||||
}
|
||||
PubsubMessage::VoluntaryExit(exit) => {
|
||||
debug!(self.log, "Received a voluntary exit"; "peer_id" => %peer_id);
|
||||
self.processor.on_voluntary_exit_gossip(id, peer_id, exit);
|
||||
}
|
||||
PubsubMessage::ProposerSlashing(proposer_slashing) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Received a proposer slashing";
|
||||
"peer_id" => %peer_id
|
||||
);
|
||||
self.processor
|
||||
.on_proposer_slashing_gossip(id, peer_id, proposer_slashing);
|
||||
}
|
||||
PubsubMessage::AttesterSlashing(attester_slashing) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Received a attester slashing";
|
||||
"peer_id" => %peer_id
|
||||
);
|
||||
self.processor
|
||||
.on_attester_slashing_gossip(id, peer_id, attester_slashing);
|
||||
}
|
||||
PubsubMessage::SignedContributionAndProof(contribution_and_proof) => {
|
||||
trace!(
|
||||
self.log,
|
||||
"Received sync committee aggregate";
|
||||
"peer_id" => %peer_id
|
||||
);
|
||||
self.processor.on_sync_committee_contribution_gossip(
|
||||
id,
|
||||
peer_id,
|
||||
*contribution_and_proof,
|
||||
);
|
||||
}
|
||||
PubsubMessage::SyncCommitteeMessage(sync_committtee_msg) => {
|
||||
trace!(
|
||||
self.log,
|
||||
"Received sync committee signature";
|
||||
"peer_id" => %peer_id
|
||||
);
|
||||
self.processor.on_sync_committee_signature_gossip(
|
||||
id,
|
||||
peer_id,
|
||||
sync_committtee_msg.1,
|
||||
sync_committtee_msg.0,
|
||||
);
|
||||
}
|
||||
PubsubMessage::BlsToExecutionChange(bls_to_execution_change) => {
|
||||
trace!(
|
||||
self.log,
|
||||
"Received BLS to execution change";
|
||||
"peer_id" => %peer_id
|
||||
);
|
||||
self.processor.on_bls_to_execution_change_gossip(
|
||||
id,
|
||||
peer_id,
|
||||
bls_to_execution_change,
|
||||
);
|
||||
}
|
||||
PubsubMessage::LightClientFinalityUpdate(light_client_finality_update) => {
|
||||
trace!(
|
||||
self.log,
|
||||
"Received light client finality update";
|
||||
"peer_id" => %peer_id
|
||||
);
|
||||
self.processor.on_light_client_finality_update_gossip(
|
||||
id,
|
||||
peer_id,
|
||||
light_client_finality_update,
|
||||
);
|
||||
}
|
||||
PubsubMessage::LightClientOptimisticUpdate(light_client_optimistic_update) => {
|
||||
trace!(
|
||||
self.log,
|
||||
"Received light client optimistic update";
|
||||
"peer_id" => %peer_id
|
||||
);
|
||||
self.processor.on_light_client_optimistic_update_gossip(
|
||||
id,
|
||||
peer_id,
|
||||
light_client_optimistic_update,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,579 +0,0 @@
|
||||
use crate::beacon_processor::{
|
||||
BeaconProcessor, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN,
|
||||
};
|
||||
use crate::service::{NetworkMessage, RequestId};
|
||||
use crate::status::status_message;
|
||||
use crate::sync::manager::RequestId as SyncId;
|
||||
use crate::sync::SyncMessage;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
|
||||
use lighthouse_network::rpc::*;
|
||||
use lighthouse_network::{
|
||||
Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response,
|
||||
};
|
||||
use slog::{debug, error, o, trace, warn};
|
||||
use std::cmp;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use store::SyncCommitteeMessage;
|
||||
use tokio::sync::mpsc;
|
||||
use types::{
|
||||
Attestation, AttesterSlashing, BlobsSidecar, EthSpec, LightClientFinalityUpdate,
|
||||
LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock,
|
||||
SignedBeaconBlockAndBlobsSidecar, SignedBlsToExecutionChange, SignedContributionAndProof,
|
||||
SignedVoluntaryExit, SubnetId, SyncSubnetId,
|
||||
};
|
||||
|
||||
/// Processes validated messages from the network. It relays necessary data to the syncing thread
|
||||
/// and processes blocks from the pubsub network.
|
||||
pub struct Processor<T: BeaconChainTypes> {
|
||||
/// A reference to the underlying beacon chain.
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
/// A channel to the syncing thread.
|
||||
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
|
||||
/// A network context to return and handle RPC requests.
|
||||
network: HandlerNetworkContext<T::EthSpec>,
|
||||
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
|
||||
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
|
||||
/// The `RPCHandler` logger.
|
||||
log: slog::Logger,
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> Processor<T> {
|
||||
/// Instantiate a `Processor` instance
|
||||
pub fn new(
|
||||
executor: task_executor::TaskExecutor,
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||
log: &slog::Logger,
|
||||
) -> Self {
|
||||
let sync_logger = log.new(o!("service"=> "sync"));
|
||||
let (beacon_processor_send, beacon_processor_receive) =
|
||||
mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN);
|
||||
|
||||
// spawn the sync thread
|
||||
let sync_send = crate::sync::manager::spawn(
|
||||
executor.clone(),
|
||||
beacon_chain.clone(),
|
||||
network_globals.clone(),
|
||||
network_send.clone(),
|
||||
beacon_processor_send.clone(),
|
||||
sync_logger,
|
||||
);
|
||||
|
||||
BeaconProcessor {
|
||||
beacon_chain: Arc::downgrade(&beacon_chain),
|
||||
network_tx: network_send.clone(),
|
||||
sync_tx: sync_send.clone(),
|
||||
network_globals,
|
||||
executor,
|
||||
max_workers: cmp::max(1, num_cpus::get()),
|
||||
current_workers: 0,
|
||||
importing_blocks: Default::default(),
|
||||
log: log.clone(),
|
||||
}
|
||||
.spawn_manager(beacon_processor_receive, None);
|
||||
|
||||
Processor {
|
||||
chain: beacon_chain,
|
||||
sync_send,
|
||||
network: HandlerNetworkContext::new(network_send, log.clone()),
|
||||
beacon_processor_send,
|
||||
log: log.new(o!("service" => "router")),
|
||||
}
|
||||
}
|
||||
|
||||
fn send_to_sync(&mut self, message: SyncMessage<T::EthSpec>) {
|
||||
self.sync_send.send(message).unwrap_or_else(|e| {
|
||||
warn!(
|
||||
self.log,
|
||||
"Could not send message to the sync service";
|
||||
"error" => %e,
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
/// Handle a peer disconnect.
|
||||
///
|
||||
/// Removes the peer from the manager.
|
||||
pub fn on_disconnect(&mut self, peer_id: PeerId) {
|
||||
self.send_to_sync(SyncMessage::Disconnect(peer_id));
|
||||
}
|
||||
|
||||
/// An error occurred during an RPC request. The state is maintained by the sync manager, so
|
||||
/// this function notifies the sync manager of the error.
|
||||
pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
|
||||
// Check if the failed RPC belongs to sync
|
||||
if let RequestId::Sync(request_id) = request_id {
|
||||
self.send_to_sync(SyncMessage::RpcError {
|
||||
peer_id,
|
||||
request_id,
|
||||
error,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends a `Status` message to the peer.
|
||||
///
|
||||
/// Called when we first connect to a peer, or when the PeerManager determines we need to
|
||||
/// re-status.
|
||||
pub fn send_status(&mut self, peer_id: PeerId) {
|
||||
let status_message = status_message(&self.chain);
|
||||
debug!(self.log, "Sending Status Request"; "peer" => %peer_id, &status_message);
|
||||
self.network
|
||||
.send_processor_request(peer_id, Request::Status(status_message));
|
||||
}
|
||||
|
||||
/// Handle a `Status` request.
|
||||
///
|
||||
/// Processes the `Status` from the remote peer and sends back our `Status`.
|
||||
pub fn on_status_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: PeerRequestId,
|
||||
status: StatusMessage,
|
||||
) {
|
||||
debug!(self.log, "Received Status Request"; "peer_id" => %peer_id, &status);
|
||||
|
||||
// Say status back.
|
||||
self.network.send_response(
|
||||
peer_id,
|
||||
Response::Status(status_message(&self.chain)),
|
||||
request_id,
|
||||
);
|
||||
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::status_message(peer_id, status))
|
||||
}
|
||||
|
||||
/// Process a `Status` response from a peer.
|
||||
pub fn on_status_response(&mut self, peer_id: PeerId, status: StatusMessage) {
|
||||
debug!(self.log, "Received Status Response"; "peer_id" => %peer_id, &status);
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::status_message(peer_id, status))
|
||||
}
|
||||
|
||||
/// Handle a `BlocksByRoot` request from the peer.
|
||||
pub fn on_blocks_by_root_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: PeerRequestId,
|
||||
request: BlocksByRootRequest,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::blocks_by_roots_request(
|
||||
peer_id, request_id, request,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn on_blobs_by_range_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: PeerRequestId,
|
||||
request: BlobsByRangeRequest,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::blobs_by_range_request(
|
||||
peer_id, request_id, request,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn on_blobs_by_root_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: PeerRequestId,
|
||||
request: BlobsByRootRequest,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::blobs_by_root_request(
|
||||
peer_id, request_id, request,
|
||||
))
|
||||
}
|
||||
|
||||
/// Handle a `LightClientBootstrap` request from the peer.
|
||||
pub fn on_lightclient_bootstrap(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: PeerRequestId,
|
||||
request: LightClientBootstrapRequest,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::lightclient_bootstrap_request(
|
||||
peer_id, request_id, request,
|
||||
))
|
||||
}
|
||||
|
||||
/// Handle a `BlocksByRange` request from the peer.
|
||||
pub fn on_blocks_by_range_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: PeerRequestId,
|
||||
req: BlocksByRangeRequest,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::blocks_by_range_request(
|
||||
peer_id, request_id, req,
|
||||
))
|
||||
}
|
||||
|
||||
/// Handle a `BlocksByRange` response from the peer.
|
||||
/// A `beacon_block` behaves as a stream which is terminated on a `None` response.
|
||||
pub fn on_blocks_by_range_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
) {
|
||||
let request_id = match request_id {
|
||||
RequestId::Sync(sync_id) => match sync_id {
|
||||
SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => {
|
||||
unreachable!("Block lookups do not request BBRange requests")
|
||||
}
|
||||
id @ (SyncId::BackFillBlocks { .. }
|
||||
| SyncId::RangeBlocks { .. }
|
||||
| SyncId::BackFillBlobs { .. }
|
||||
| SyncId::RangeBlobs { .. }) => id,
|
||||
},
|
||||
RequestId::Router => unreachable!("All BBRange requests belong to sync"),
|
||||
};
|
||||
|
||||
trace!(
|
||||
self.log,
|
||||
"Received BlocksByRange Response";
|
||||
"peer" => %peer_id,
|
||||
);
|
||||
|
||||
self.send_to_sync(SyncMessage::RpcBlock {
|
||||
peer_id,
|
||||
request_id,
|
||||
beacon_block,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn on_blobs_by_range_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
blob_sidecar: Option<Arc<BlobsSidecar<T::EthSpec>>>,
|
||||
) {
|
||||
trace!(
|
||||
self.log,
|
||||
"Received BlobsByRange Response";
|
||||
"peer" => %peer_id,
|
||||
);
|
||||
|
||||
if let RequestId::Sync(id) = request_id {
|
||||
self.send_to_sync(SyncMessage::RpcBlobs {
|
||||
peer_id,
|
||||
request_id: id,
|
||||
blob_sidecar,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
} else {
|
||||
debug!(
|
||||
self.log,
|
||||
"All blobs by range responses should belong to sync"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a `BlocksByRoot` response from the peer.
|
||||
pub fn on_blocks_by_root_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
) {
|
||||
let request_id = match request_id {
|
||||
RequestId::Sync(sync_id) => match sync_id {
|
||||
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
|
||||
SyncId::BackFillBlocks { .. }
|
||||
| SyncId::RangeBlocks { .. }
|
||||
| SyncId::RangeBlobs { .. }
|
||||
| SyncId::BackFillBlobs { .. } => {
|
||||
unreachable!("Batch syncing do not request BBRoot requests")
|
||||
}
|
||||
},
|
||||
RequestId::Router => unreachable!("All BBRoot requests belong to sync"),
|
||||
};
|
||||
|
||||
trace!(
|
||||
self.log,
|
||||
"Received BlocksByRoot Response";
|
||||
"peer" => %peer_id,
|
||||
);
|
||||
self.send_to_sync(SyncMessage::RpcBlock {
|
||||
peer_id,
|
||||
request_id,
|
||||
beacon_block,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
}
|
||||
|
||||
/// Handle a `BlobsByRoot` response from the peer.
|
||||
pub fn on_blobs_by_root_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
block_and_blobs: Option<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
|
||||
) {
|
||||
let request_id = match request_id {
|
||||
RequestId::Sync(sync_id) => match sync_id {
|
||||
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
|
||||
SyncId::BackFillBlocks { .. }
|
||||
| SyncId::RangeBlocks { .. }
|
||||
| SyncId::RangeBlobs { .. }
|
||||
| SyncId::BackFillBlobs { .. } => {
|
||||
unreachable!("Batch syncing does not request BBRoot requests")
|
||||
}
|
||||
},
|
||||
RequestId::Router => unreachable!("All BBRoot requests belong to sync"),
|
||||
};
|
||||
|
||||
trace!(
|
||||
self.log,
|
||||
"Received BlockAndBlobssByRoot Response";
|
||||
"peer" => %peer_id,
|
||||
);
|
||||
self.send_to_sync(SyncMessage::RpcBlockAndBlobs {
|
||||
peer_id,
|
||||
request_id,
|
||||
block_and_blobs,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
}
|
||||
|
||||
/// Process a gossip message declaring a new block.
|
||||
///
|
||||
/// Attempts to apply to block to the beacon chain. May queue the block for later processing.
|
||||
///
|
||||
/// Returns a `bool` which, if `true`, indicates we should forward the block to our peers.
|
||||
pub fn on_block_gossip(
|
||||
&mut self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
peer_client: Client,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_beacon_block(
|
||||
message_id,
|
||||
peer_id,
|
||||
peer_client,
|
||||
block,
|
||||
timestamp_now(),
|
||||
))
|
||||
}
|
||||
|
||||
pub fn on_block_and_blobs_sidecar_gossip(
|
||||
&mut self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
peer_client: Client,
|
||||
block_and_blobs: SignedBeaconBlockAndBlobsSidecar<T::EthSpec>,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_block_and_blobs_sidecar(
|
||||
message_id,
|
||||
peer_id,
|
||||
peer_client,
|
||||
block_and_blobs,
|
||||
timestamp_now(),
|
||||
))
|
||||
}
|
||||
|
||||
pub fn on_unaggregated_attestation_gossip(
|
||||
&mut self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
unaggregated_attestation: Attestation<T::EthSpec>,
|
||||
subnet_id: SubnetId,
|
||||
should_process: bool,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::unaggregated_attestation(
|
||||
message_id,
|
||||
peer_id,
|
||||
unaggregated_attestation,
|
||||
subnet_id,
|
||||
should_process,
|
||||
timestamp_now(),
|
||||
))
|
||||
}
|
||||
|
||||
pub fn on_aggregated_attestation_gossip(
|
||||
&mut self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
aggregate: SignedAggregateAndProof<T::EthSpec>,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::aggregated_attestation(
|
||||
message_id,
|
||||
peer_id,
|
||||
aggregate,
|
||||
timestamp_now(),
|
||||
))
|
||||
}
|
||||
|
||||
pub fn on_voluntary_exit_gossip(
|
||||
&mut self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
voluntary_exit: Box<SignedVoluntaryExit>,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_voluntary_exit(
|
||||
message_id,
|
||||
peer_id,
|
||||
voluntary_exit,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn on_proposer_slashing_gossip(
|
||||
&mut self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
proposer_slashing: Box<ProposerSlashing>,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_proposer_slashing(
|
||||
message_id,
|
||||
peer_id,
|
||||
proposer_slashing,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn on_attester_slashing_gossip(
|
||||
&mut self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
attester_slashing: Box<AttesterSlashing<T::EthSpec>>,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_attester_slashing(
|
||||
message_id,
|
||||
peer_id,
|
||||
attester_slashing,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn on_sync_committee_signature_gossip(
|
||||
&mut self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
sync_signature: SyncCommitteeMessage,
|
||||
subnet_id: SyncSubnetId,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_sync_signature(
|
||||
message_id,
|
||||
peer_id,
|
||||
sync_signature,
|
||||
subnet_id,
|
||||
timestamp_now(),
|
||||
))
|
||||
}
|
||||
|
||||
pub fn on_sync_committee_contribution_gossip(
|
||||
&mut self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
sync_contribution: SignedContributionAndProof<T::EthSpec>,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_sync_contribution(
|
||||
message_id,
|
||||
peer_id,
|
||||
sync_contribution,
|
||||
timestamp_now(),
|
||||
))
|
||||
}
|
||||
|
||||
pub fn on_bls_to_execution_change_gossip(
|
||||
&mut self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
bls_to_execution_change: Box<SignedBlsToExecutionChange>,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_bls_to_execution_change(
|
||||
message_id,
|
||||
peer_id,
|
||||
bls_to_execution_change,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn on_light_client_finality_update_gossip(
|
||||
&mut self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
light_client_finality_update: Box<LightClientFinalityUpdate<T::EthSpec>>,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_light_client_finality_update(
|
||||
message_id,
|
||||
peer_id,
|
||||
light_client_finality_update,
|
||||
timestamp_now(),
|
||||
))
|
||||
}
|
||||
|
||||
pub fn on_light_client_optimistic_update_gossip(
|
||||
&mut self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
light_client_optimistic_update: Box<LightClientOptimisticUpdate<T::EthSpec>>,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_light_client_optimistic_update(
|
||||
message_id,
|
||||
peer_id,
|
||||
light_client_optimistic_update,
|
||||
timestamp_now(),
|
||||
))
|
||||
}
|
||||
|
||||
fn send_beacon_processor_work(&mut self, work: BeaconWorkEvent<T>) {
|
||||
self.beacon_processor_send
|
||||
.try_send(work)
|
||||
.unwrap_or_else(|e| {
|
||||
let work_type = match &e {
|
||||
mpsc::error::TrySendError::Closed(work)
|
||||
| mpsc::error::TrySendError::Full(work) => work.work_type(),
|
||||
};
|
||||
error!(&self.log, "Unable to send message to the beacon processor";
|
||||
"error" => %e, "type" => work_type)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Wraps a Network Channel to employ various RPC related network functionality for the
|
||||
/// processor.
|
||||
#[derive(Clone)]
|
||||
pub struct HandlerNetworkContext<T: EthSpec> {
|
||||
/// The network channel to relay messages to the Network service.
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage<T>>,
|
||||
/// Logger for the `NetworkContext`.
|
||||
log: slog::Logger,
|
||||
}
|
||||
|
||||
impl<T: EthSpec> HandlerNetworkContext<T> {
|
||||
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage<T>>, log: slog::Logger) -> Self {
|
||||
Self { network_send, log }
|
||||
}
|
||||
|
||||
/// Sends a message to the network task.
|
||||
fn inform_network(&mut self, msg: NetworkMessage<T>) {
|
||||
self.network_send.send(msg).unwrap_or_else(
|
||||
|e| warn!(self.log, "Could not send message to the network service"; "error" => %e),
|
||||
)
|
||||
}
|
||||
|
||||
/// Sends a request to the network task.
|
||||
pub fn send_processor_request(&mut self, peer_id: PeerId, request: Request) {
|
||||
self.inform_network(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request_id: RequestId::Router,
|
||||
request,
|
||||
})
|
||||
}
|
||||
|
||||
/// Sends a response to the network task.
|
||||
pub fn send_response(&mut self, peer_id: PeerId, response: Response<T>, id: PeerRequestId) {
|
||||
self.inform_network(NetworkMessage::SendResponse {
|
||||
peer_id,
|
||||
id,
|
||||
response,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn timestamp_now() -> Duration {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_else(|_| Duration::from_secs(0))
|
||||
}
|
@ -472,7 +472,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
) {
|
||||
match ev {
|
||||
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
||||
self.send_to_router(RouterMessage::PeerDialed(peer_id));
|
||||
self.send_to_router(RouterMessage::StatusPeer(peer_id));
|
||||
}
|
||||
NetworkEvent::PeerConnectedIncoming(_)
|
||||
| NetworkEvent::PeerBanned(_)
|
||||
|
@ -1,13 +1,16 @@
|
||||
# Frequently Asked Questions
|
||||
|
||||
## 1. Where can I find my API token?
|
||||
## 1. Are there any requirements to run Siren?
|
||||
Yes, Siren requires Lighthouse v3.5.1 or higher to function properly. These releases can be found on the [releases](https://github.com/sigp/lighthouse/releases) page of the Lighthouse repository.
|
||||
|
||||
## 2. Where can I find my API token?
|
||||
The required Api token may be found in the default data directory of the validator client. For more information please refer to the lighthouse ui configuration [`api token section`](./ui-configuration.md#api-token).
|
||||
|
||||
## 2. How do I fix the Node Network Errors?
|
||||
## 3. How do I fix the Node Network Errors?
|
||||
If you recieve a red notification with a BEACON or VALIDATOR NODE NETWORK ERROR you can refer to the lighthouse ui configuration and [`connecting to clients section`](./ui-configuration.md#connecting-to-the-clients).
|
||||
|
||||
## 3. How do I change my Beacon or Validator address after logging in?
|
||||
## 4. How do I change my Beacon or Validator address after logging in?
|
||||
Once you have successfully arrived to the main dashboard, use the sidebar to access the settings view. In the top right hand corner there is a `Configurtion` action button that will redirect you back to the configuration screen where you can make appropriate changes.
|
||||
|
||||
## 4. Why doesn't my validator balance graph show any data?
|
||||
## 5. Why doesn't my validator balance graph show any data?
|
||||
If your graph is not showing data, it usually means your validator node is still caching data. The application must wait at least 3 epochs before it can render any graphical visualizations. This could take up to 20min.
|
||||
|
@ -2,6 +2,8 @@
|
||||
|
||||
Siren runs on Linux, MacOS and Windows.
|
||||
|
||||
## Version Requirement
|
||||
The Siren app requires Lighthouse v3.5.1 or higher to function properly. These versions can be found on the [releases](https://github.com/sigp/lighthouse/releases) page of the Lighthouse repository.
|
||||
|
||||
## Pre-Built Electron Packages
|
||||
|
||||
|
@ -160,6 +160,12 @@ where
|
||||
self.map.contains(key)
|
||||
}
|
||||
|
||||
/// Shrink the mappings to fit the current size.
|
||||
pub fn shrink_to_fit(&mut self) {
|
||||
self.map.shrink_to_fit();
|
||||
self.list.shrink_to_fit();
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[track_caller]
|
||||
fn check_invariant(&self) {
|
||||
|
@ -12,7 +12,7 @@ use test_random_derive::TestRandom;
|
||||
use tree_hash::TreeHash;
|
||||
use tree_hash_derive::TreeHash;
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum BlockType {
|
||||
Blinded,
|
||||
Full,
|
||||
|
@ -4,7 +4,7 @@ version = "3.5.1"
|
||||
authors = ["Sigma Prime <contact@sigmaprime.io>"]
|
||||
edition = "2021"
|
||||
autotests = false
|
||||
rust-version = "1.65"
|
||||
rust-version = "1.66"
|
||||
|
||||
[features]
|
||||
default = ["slasher-mdbx"]
|
||||
|
Loading…
Reference in New Issue
Block a user