Merge branch 'eip4844' into deneb-free-blobs

This commit is contained in:
Diva M 2023-03-17 16:39:17 -05:00
commit 78414333a2
No known key found for this signature in database
GPG Key ID: 1BAE5E01126680FE
27 changed files with 861 additions and 1034 deletions

17
Cargo.lock generated
View File

@ -1629,6 +1629,16 @@ dependencies = [
"tokio-util 0.6.10",
]
[[package]]
name = "delay_map"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4355c25cbf99edcb6b4a0e906f6bdc6956eda149e84455bea49696429b2f8e8"
dependencies = [
"futures",
"tokio-util 0.7.7",
]
[[package]]
name = "deposit_contract"
version = "0.2.0"
@ -1827,7 +1837,7 @@ dependencies = [
"aes 0.7.5",
"aes-gcm 0.9.4",
"arrayvec",
"delay_map",
"delay_map 0.1.2",
"enr",
"fnv",
"futures",
@ -4440,7 +4450,7 @@ dependencies = [
name = "lighthouse_network"
version = "0.2.0"
dependencies = [
"delay_map",
"delay_map 0.3.0",
"directory",
"dirs",
"discv5",
@ -5057,7 +5067,7 @@ name = "network"
version = "0.2.0"
dependencies = [
"beacon_chain",
"delay_map",
"delay_map 0.3.0",
"derivative",
"environment",
"error-chain",
@ -8021,6 +8031,7 @@ dependencies = [
"futures-io",
"futures-sink",
"pin-project-lite 0.2.9",
"slab",
"tokio",
"tracing",
]

View File

@ -1,4 +1,4 @@
FROM rust:1.65.0-bullseye AS builder
FROM rust:1.66.0-bullseye AS builder
RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake clang libclang-dev protobuf-compiler
COPY . lighthouse
ARG FEATURES

View File

@ -5348,7 +5348,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
latest_valid_hash,
ref validation_error,
} => {
debug!(
warn!(
self.log,
"Invalid execution payload";
"validation_error" => ?validation_error,
@ -5357,11 +5357,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"head_block_root" => ?head_block_root,
"method" => "fcU",
);
warn!(
self.log,
"Fork choice update invalidated payload";
"status" => ?status
);
match latest_valid_hash {
// The `latest_valid_hash` is set to `None` when the EE
@ -5407,7 +5402,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
PayloadStatus::InvalidBlockHash {
ref validation_error,
} => {
debug!(
warn!(
self.log,
"Invalid execution payload block hash";
"validation_error" => ?validation_error,
@ -5415,11 +5410,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"head_block_root" => ?head_block_root,
"method" => "fcU",
);
warn!(
self.log,
"Fork choice update invalidated payload";
"status" => ?status
);
// The execution engine has stated that the head block is invalid, however it
// hasn't returned a latest valid ancestor.
//

View File

@ -306,10 +306,10 @@ pub enum BlockError<T: EthSpec> {
///
/// ## Peer scoring
///
/// TODO(merge): reconsider how we score peers for this.
///
/// The peer sent us an invalid block, but I'm not really sure how to score this in an
/// "optimistic" sync world.
/// The peer sent us an invalid block, we must penalise harshly.
/// If it's actually our fault (e.g. our execution node database is corrupt) we have bigger
/// problems to worry about than losing peers, and we're doing the network a favour by
/// disconnecting.
ParentExecutionPayloadInvalid {
parent_root: Hash256,
},

View File

@ -159,7 +159,7 @@ async fn notify_new_payload<'a, T: BeaconChainTypes>(
latest_valid_hash,
ref validation_error,
} => {
debug!(
warn!(
chain.log,
"Invalid execution payload";
"validation_error" => ?validation_error,
@ -206,7 +206,7 @@ async fn notify_new_payload<'a, T: BeaconChainTypes>(
PayloadStatus::InvalidBlockHash {
ref validation_error,
} => {
debug!(
warn!(
chain.log,
"Invalid execution payload block hash";
"validation_error" => ?validation_error,

View File

@ -15,6 +15,7 @@ use std::io;
use std::marker::PhantomData;
use std::str::Utf8Error;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::AbstractExecPayload;
use types::{
AttesterSlashing, BeaconBlockRef, BeaconState, ChainSpec, Epoch, EthSpec, Hash256,
IndexedAttestation, ProposerSlashing, PublicKeyBytes, SignedAggregateAndProof,
@ -1736,9 +1737,9 @@ fn u64_to_i64(n: impl Into<u64>) -> i64 {
}
/// Returns the delay between the start of `block.slot` and `seen_timestamp`.
pub fn get_block_delay_ms<T: EthSpec, S: SlotClock>(
pub fn get_block_delay_ms<T: EthSpec, S: SlotClock, P: AbstractExecPayload<T>>(
seen_timestamp: Duration,
block: BeaconBlockRef<'_, T>,
block: BeaconBlockRef<'_, T, P>,
slot_clock: &S,
) -> Duration {
get_slot_delay_ms::<S>(seen_timestamp, block.slot(), slot_clock)

View File

@ -460,7 +460,11 @@ async fn capella_readiness_logging<T: BeaconChainTypes>(
match beacon_chain.check_capella_readiness().await {
CapellaReadiness::Ready => {
info!(log, "Ready for Capella")
info!(
log,
"Ready for Capella";
"info" => "ensure the execution endpoint is updated to the latest Capella/Shanghai release"
)
}
readiness @ CapellaReadiness::ExchangeCapabilitiesFailed { error: _ } => {
error!(

View File

@ -87,7 +87,7 @@ const DEFAULT_SUGGESTED_FEE_RECIPIENT: [u8; 20] =
const CONFIG_POLL_INTERVAL: Duration = Duration::from_secs(60);
/// A payload alongside some information about where it came from.
enum ProvenancedPayload<P> {
pub enum ProvenancedPayload<P> {
/// A good ol' fashioned farm-to-table payload from your local EE.
Local(P),
/// A payload from a builder (e.g. mev-boost).

View File

@ -39,6 +39,7 @@ use lighthouse_version::version_with_platform;
use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage};
use operation_pool::ReceivedPreCapella;
use parking_lot::RwLock;
use publish_blocks::ProvenancedBlock;
use serde::{Deserialize, Serialize};
use slog::{crit, debug, error, info, warn, Logger};
use slot_clock::SlotClock;
@ -1125,9 +1126,15 @@ pub fn serve<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| async move {
publish_blocks::publish_block(None, block_contents, chain, &network_tx, log)
.await
.map(|()| warp::reply().into_response())
publish_blocks::publish_block(
None,
ProvenancedBlock::Local(block_contents),
chain,
&network_tx,
log,
)
.await
.map(|()| warp::reply().into_response())
},
);

View File

@ -29,9 +29,10 @@ lazy_static::lazy_static! {
"http_api_beacon_proposer_cache_misses_total",
"Count of times the proposer cache has been missed",
);
pub static ref HTTP_API_BLOCK_BROADCAST_DELAY_TIMES: Result<Histogram> = try_create_histogram(
pub static ref HTTP_API_BLOCK_BROADCAST_DELAY_TIMES: Result<HistogramVec> = try_create_histogram_vec(
"http_api_block_broadcast_delay_times",
"Time between start of the slot and when the block was broadcast"
"Time between start of the slot and when the block was broadcast",
&["provenance"]
);
pub static ref HTTP_API_BLOCK_PUBLISHED_LATE_TOTAL: Result<IntCounter> = try_create_int_counter(
"http_api_block_published_late_total",

View File

@ -1,33 +1,53 @@
use crate::metrics;
use beacon_chain::blob_verification::{AsBlock, BlockWrapper, IntoAvailableBlock};
use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now};
use beacon_chain::NotifyExecutionLayer;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, CountUnrealized};
use beacon_chain::{
BeaconChain, BeaconChainTypes, BlockError, CountUnrealized, NotifyExecutionLayer,
};
use eth2::types::SignedBlockContents;
use execution_layer::ProvenancedPayload;
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tree_hash::TreeHash;
use types::{
AbstractExecPayload, BlindedPayload, EthSpec, ExecPayload, ExecutionBlockHash, FullPayload,
Hash256, SignedBeaconBlock,
AbstractExecPayload, BeaconBlockRef, BlindedPayload, EthSpec, ExecPayload, ExecutionBlockHash,
FullPayload, Hash256, SignedBeaconBlock,
};
use warp::Rejection;
pub enum ProvenancedBlock<T: EthSpec> {
/// The payload was built using a local EE.
Local(SignedBlockContents<T, FullPayload<T>>),
/// The payload was build using a remote builder (e.g., via a mev-boost
/// compatible relay).
Builder(SignedBlockContents<T, FullPayload<T>>),
}
/// Handles a request from the HTTP API for full blocks.
pub async fn publish_block<T: BeaconChainTypes>(
block_root: Option<Hash256>,
block_contents: SignedBlockContents<T::EthSpec>,
provenanced_block: ProvenancedBlock<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger,
) -> Result<(), Rejection> {
let seen_timestamp = timestamp_now();
let (block, maybe_blobs) = block_contents.deconstruct();
let block = Arc::new(block);
let (block, maybe_blobs, is_locally_built_block) = match provenanced_block {
ProvenancedBlock::Local(block_contents) => {
let (block, maybe_blobs) = block_contents.deconstruct();
(Arc::new(block), maybe_blobs, true)
}
ProvenancedBlock::Builder(block_contents) => {
let (block, maybe_blobs) = block_contents.deconstruct();
(Arc::new(block), maybe_blobs, false)
}
};
let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock);
//FIXME(sean) have to move this to prior to publishing because it's included in the blobs sidecar message.
//this may skew metrics
@ -62,11 +82,6 @@ pub async fn publish_block<T: BeaconChainTypes>(
}
};
// Determine the delay after the start of the slot, register it with metrics.
let block = wrapped_block.as_block();
let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock);
metrics::observe_duration(&metrics::HTTP_API_BLOCK_BROADCAST_DELAY_TIMES, delay);
let available_block = match wrapped_block.into_available_block(block_root, &chain) {
Ok(available_block) => available_block,
Err(e) => {
@ -111,30 +126,17 @@ pub async fn publish_block<T: BeaconChainTypes>(
// head.
chain.recompute_head_at_current_slot().await;
// Perform some logging to inform users if their blocks are being produced
// late.
//
// Check to see the thresholds are non-zero to avoid logging errors with small
// slot times (e.g., during testing)
let too_late_threshold = chain.slot_clock.unagg_attestation_production_delay();
let delayed_threshold = too_late_threshold / 2;
if delay >= too_late_threshold {
error!(
log,
"Block was broadcast too late";
"msg" => "system may be overloaded, block likely to be orphaned",
"delay_ms" => delay.as_millis(),
"slot" => available_block.slot(),
"root" => ?root,
)
} else if delay >= delayed_threshold {
error!(
log,
"Block broadcast was delayed";
"msg" => "system may be overloaded, block may be orphaned",
"delay_ms" => delay.as_millis(),
"slot" => available_block.slot(),
"root" => ?root,
// Only perform late-block logging here if the block is local. For
// blocks built with builders we consider the broadcast time to be
// when the blinded block is published to the builder.
if is_locally_built_block {
late_block_logging(
&chain,
seen_timestamp,
available_block.message(),
root,
"local",
&log,
)
}
@ -183,14 +185,7 @@ pub async fn publish_blinded_block<T: BeaconChainTypes>(
) -> Result<(), Rejection> {
let block_root = block.canonical_root();
let full_block = reconstruct_block(chain.clone(), block_root, block, log.clone()).await?;
publish_block::<T>(
Some(block_root),
SignedBlockContents::Block(full_block),
chain,
network_tx,
log,
)
.await
publish_block::<T>(Some(block_root), full_block, chain, network_tx, log).await
}
/// Deconstruct the given blinded block, and construct a full block. This attempts to use the
@ -201,15 +196,15 @@ async fn reconstruct_block<T: BeaconChainTypes>(
block_root: Hash256,
block: SignedBeaconBlock<T::EthSpec, BlindedPayload<T::EthSpec>>,
log: Logger,
) -> Result<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>, Rejection> {
let full_payload = if let Ok(payload_header) = block.message().body().execution_payload() {
) -> Result<ProvenancedBlock<T::EthSpec>, Rejection> {
let full_payload_opt = if let Ok(payload_header) = block.message().body().execution_payload() {
let el = chain.execution_layer.as_ref().ok_or_else(|| {
warp_utils::reject::custom_server_error("Missing execution layer".to_string())
})?;
// If the execution block hash is zero, use an empty payload.
let full_payload = if payload_header.block_hash() == ExecutionBlockHash::zero() {
FullPayload::default_at_fork(
let payload = FullPayload::default_at_fork(
chain
.spec
.fork_name_at_epoch(block.slot().epoch(T::EthSpec::slots_per_epoch())),
@ -219,15 +214,30 @@ async fn reconstruct_block<T: BeaconChainTypes>(
"Default payload construction error: {e:?}"
))
})?
.into()
// If we already have an execution payload with this transactions root cached, use it.
.into();
ProvenancedPayload::Local(payload)
// If we already have an execution payload with this transactions root cached, use it.
} else if let Some(cached_payload) =
el.get_payload_by_root(&payload_header.tree_hash_root())
{
info!(log, "Reconstructing a full block using a local payload"; "block_hash" => ?cached_payload.block_hash());
cached_payload
// Otherwise, this means we are attempting a blind block proposal.
ProvenancedPayload::Local(cached_payload)
// Otherwise, this means we are attempting a blind block proposal.
} else {
// Perform the logging for late blocks when we publish to the
// builder, rather than when we publish to the network. This helps
// prevent false positive logs when the builder publishes to the P2P
// network significantly earlier than when they return the block to
// us.
late_block_logging(
&chain,
timestamp_now(),
block.message(),
block_root,
"builder",
&log,
);
let full_payload = el
.propose_blinded_beacon_block(block_root, &block)
.await
@ -238,7 +248,7 @@ async fn reconstruct_block<T: BeaconChainTypes>(
))
})?;
info!(log, "Successfully published a block to the builder network"; "block_hash" => ?full_payload.block_hash());
full_payload
ProvenancedPayload::Builder(full_payload)
};
Some(full_payload)
@ -246,7 +256,71 @@ async fn reconstruct_block<T: BeaconChainTypes>(
None
};
block.try_into_full_block(full_payload).ok_or_else(|| {
match full_payload_opt {
// A block without a payload is pre-merge and we consider it locally
// built.
None => block
.try_into_full_block(None)
.map(SignedBlockContents::Block)
.map(ProvenancedBlock::Local),
Some(ProvenancedPayload::Local(full_payload)) => block
.try_into_full_block(Some(full_payload))
.map(SignedBlockContents::Block)
.map(ProvenancedBlock::Local),
Some(ProvenancedPayload::Builder(full_payload)) => block
.try_into_full_block(Some(full_payload))
.map(SignedBlockContents::Block)
.map(ProvenancedBlock::Builder),
}
.ok_or_else(|| {
warp_utils::reject::custom_server_error("Unable to add payload to block".to_string())
})
}
/// If the `seen_timestamp` is some time after the start of the slot for
/// `block`, create some logs to indicate that the block was published late.
fn late_block_logging<T: BeaconChainTypes, P: AbstractExecPayload<T::EthSpec>>(
chain: &BeaconChain<T>,
seen_timestamp: Duration,
block: BeaconBlockRef<T::EthSpec, P>,
root: Hash256,
provenance: &str,
log: &Logger,
) {
let delay = get_block_delay_ms(seen_timestamp, block, &chain.slot_clock);
metrics::observe_timer_vec(
&metrics::HTTP_API_BLOCK_BROADCAST_DELAY_TIMES,
&[provenance],
delay,
);
// Perform some logging to inform users if their blocks are being produced
// late.
//
// Check to see the thresholds are non-zero to avoid logging errors with small
// slot times (e.g., during testing)
let too_late_threshold = chain.slot_clock.unagg_attestation_production_delay();
let delayed_threshold = too_late_threshold / 2;
if delay >= too_late_threshold {
error!(
log,
"Block was broadcast too late";
"msg" => "system may be overloaded, block likely to be orphaned",
"provenance" => provenance,
"delay_ms" => delay.as_millis(),
"slot" => block.slot(),
"root" => ?root,
)
} else if delay >= delayed_threshold {
error!(
log,
"Block broadcast was delayed";
"msg" => "system may be overloaded, block may be orphaned",
"provenance" => provenance,
"delay_ms" => delay.as_millis(),
"slot" => block.slot(),
"root" => ?root,
)
}
}

View File

@ -42,7 +42,7 @@ strum = { version = "0.24.0", features = ["derive"] }
superstruct = "0.5.0"
prometheus-client = "0.18.0"
unused_port = { path = "../../common/unused_port" }
delay_map = "0.1.1"
delay_map = "0.3.0"
void = "1"
[dependencies.libp2p]

View File

@ -13,7 +13,7 @@ use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult};
use rand::seq::SliceRandom;
use slog::{debug, error, trace, warn};
use smallvec::SmallVec;
use std::collections::VecDeque;
use std::collections::BTreeMap;
use std::{
sync::Arc,
time::{Duration, Instant},
@ -77,7 +77,7 @@ pub struct PeerManager<TSpec: EthSpec> {
/// The target number of peers we would like to connect to.
target_peers: usize,
/// Peers queued to be dialed.
peers_to_dial: VecDeque<(PeerId, Option<Enr>)>,
peers_to_dial: BTreeMap<PeerId, Option<Enr>>,
/// The number of temporarily banned peers. This is used to prevent instantaneous
/// reconnection.
// NOTE: This just prevents re-connections. The state of the peer is otherwise unaffected. A
@ -308,7 +308,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// proves resource constraining, we should switch to multiaddr dialling here.
#[allow(clippy::mutable_key_type)]
pub fn peers_discovered(&mut self, results: HashMap<PeerId, Option<Instant>>) -> Vec<PeerId> {
let mut to_dial_peers = Vec::new();
let mut to_dial_peers = Vec::with_capacity(4);
let connected_or_dialing = self.network_globals.connected_or_dialing_peers();
for (peer_id, min_ttl) in results {
@ -398,7 +398,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// A peer is being dialed.
pub fn dial_peer(&mut self, peer_id: &PeerId, enr: Option<Enr>) {
self.peers_to_dial.push_back((*peer_id, enr));
self.peers_to_dial.insert(*peer_id, enr);
}
/// Reports if a peer is banned or not.
@ -1197,6 +1197,18 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// Unban any peers that have served their temporary ban timeout
self.unban_temporary_banned_peers();
// Maintains memory by shrinking mappings
self.shrink_mappings();
}
// Reduce memory footprint by routinely shrinking associating mappings.
fn shrink_mappings(&mut self) {
self.inbound_ping_peers.shrink_to(5);
self.outbound_ping_peers.shrink_to(5);
self.status_peers.shrink_to(5);
self.temporary_banned_peers.shrink_to_fit();
self.sync_committee_subnets.shrink_to_fit();
}
// Update metrics related to peer scoring.

View File

@ -89,7 +89,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
self.events.shrink_to_fit();
}
if let Some((peer_id, maybe_enr)) = self.peers_to_dial.pop_front() {
if let Some((peer_id, maybe_enr)) = self.peers_to_dial.pop_first() {
self.inject_peer_connection(&peer_id, ConnectingType::Dialing, maybe_enr);
let handler = self.new_handler();
return Poll::Ready(NetworkBehaviourAction::Dial {

View File

@ -409,10 +409,14 @@ impl ProtocolId {
/// beginning of the stream, else returns `false`.
pub fn has_context_bytes(&self) -> bool {
match self.message_name {
Protocol::BlocksByRange | Protocol::BlocksByRoot => {
!matches!(self.version, Version::V1)
}
Protocol::BlobsByRange | Protocol::BlobsByRoot | Protocol::LightClientBootstrap => true,
Protocol::BlocksByRange | Protocol::BlocksByRoot => match self.version {
Version::V2 => true,
Version::V1 => false,
},
Protocol::LightClientBootstrap => match self.version {
Version::V2 | Version::V1 => true,
},
Protocol::BlobsByRoot | Protocol::BlobsByRange => true,
Protocol::Goodbye | Protocol::Ping | Protocol::Status | Protocol::MetaData => false,
}
}

View File

@ -43,7 +43,7 @@ if-addrs = "0.6.4"
strum = "0.24.0"
tokio-util = { version = "0.6.3", features = ["time"] }
derivative = "2.2.0"
delay_map = "0.1.1"
delay_map = "0.3.0"
ethereum-types = { version = "0.14.1", optional = true }
operation_pool = { path = "../operation_pool" }
execution_layer = { path = "../execution_layer" }

View File

@ -856,7 +856,6 @@ impl<T: BeaconChainTypes> Worker<T> {
| Err(e @ BlockError::WeakSubjectivityConflict)
| Err(e @ BlockError::InconsistentFork(_))
| Err(e @ BlockError::ExecutionPayloadError(_))
// TODO(merge): reconsider peer scoring for this event.
| Err(e @ BlockError::ParentExecutionPayloadInvalid { .. })
| Err(e @ BlockError::GenesisBlock) => {
warn!(self.log, "Could not verify block for gossip. Rejecting the block";
@ -869,7 +868,7 @@ impl<T: BeaconChainTypes> Worker<T> {
);
return None;
}
Err(e@ BlockError::BlobValidation(_)) => {
Err(e @ BlockError::BlobValidation(_)) => {
warn!(self.log, "Could not verify blob for gossip. Rejecting the block and blob";
"error" => %e);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);

View File

@ -528,6 +528,21 @@ impl<T: BeaconChainTypes> Worker<T> {
})
}
}
ref err @ BlockError::ParentExecutionPayloadInvalid { ref parent_root } => {
warn!(
self.log,
"Failed to sync chain built on invalid parent";
"parent_root" => ?parent_root,
"advice" => "check execution node for corruption then restart it and Lighthouse",
);
Err(ChainSegmentFailed {
message: format!("Peer sent invalid block. Reason: {err:?}"),
// We need to penalise harshly in case this represents an actual attack. In case
// of a faulty EL it will usually require manual intervention to fix anyway, so
// it's not too bad if we drop most of our peers.
peer_action: Some(PeerAction::LowToleranceError),
})
}
other => {
debug!(
self.log, "Invalid block received";

View File

@ -0,0 +1,627 @@
//! This module handles incoming network messages.
//!
//! It routes the messages to appropriate services.
//! It handles requests at the application layer in its associated processor and directs
//! syncing-related responses to the Sync manager.
#![allow(clippy::unit_arg)]
use crate::beacon_processor::{
BeaconProcessor, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN,
};
use crate::error;
use crate::service::{NetworkMessage, RequestId};
use crate::status::status_message;
use crate::sync::manager::RequestId as SyncId;
use crate::sync::SyncMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use futures::{future, StreamExt};
use lighthouse_network::{rpc::*, PubsubMessage};
use lighthouse_network::{MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response};
use slog::{debug, error, o, trace, warn};
use std::cmp;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
/// Handles messages from the network and routes them to the appropriate service to be handled.
pub struct Router<T: BeaconChainTypes> {
/// Access to the peer db and network information.
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
/// A reference to the underlying beacon chain.
chain: Arc<BeaconChain<T>>,
/// A channel to the syncing thread.
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
/// A network context to return and handle RPC requests.
network: HandlerNetworkContext<T::EthSpec>,
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
/// The `Router` logger.
log: slog::Logger,
}
/// Types of messages the router can receive.
#[derive(Debug)]
pub enum RouterMessage<T: EthSpec> {
/// Peer has disconnected.
PeerDisconnected(PeerId),
/// An RPC request has been received.
RPCRequestReceived {
peer_id: PeerId,
id: PeerRequestId,
request: Request,
},
/// An RPC response has been received.
RPCResponseReceived {
peer_id: PeerId,
request_id: RequestId,
response: Response<T>,
},
/// An RPC request failed
RPCFailed {
peer_id: PeerId,
request_id: RequestId,
error: RPCError,
},
/// A gossip message has been received. The fields are: message id, the peer that sent us this
/// message, the message itself and a bool which indicates if the message should be processed
/// by the beacon chain after successful verification.
PubsubMessage(MessageId, PeerId, PubsubMessage<T>, bool),
/// The peer manager has requested we re-status a peer.
StatusPeer(PeerId),
}
impl<T: BeaconChainTypes> Router<T> {
/// Initializes and runs the Router.
pub fn spawn(
beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
executor: task_executor::TaskExecutor,
log: slog::Logger,
) -> error::Result<mpsc::UnboundedSender<RouterMessage<T::EthSpec>>> {
let message_handler_log = log.new(o!("service"=> "router"));
trace!(message_handler_log, "Service starting");
let (handler_send, handler_recv) = mpsc::unbounded_channel();
let (beacon_processor_send, beacon_processor_receive) =
mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN);
let sync_logger = log.new(o!("service"=> "sync"));
// spawn the sync thread
let sync_send = crate::sync::manager::spawn(
executor.clone(),
beacon_chain.clone(),
network_globals.clone(),
network_send.clone(),
beacon_processor_send.clone(),
sync_logger,
);
BeaconProcessor {
beacon_chain: Arc::downgrade(&beacon_chain),
network_tx: network_send.clone(),
sync_tx: sync_send.clone(),
network_globals: network_globals.clone(),
executor: executor.clone(),
max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0,
importing_blocks: Default::default(),
log: log.clone(),
}
.spawn_manager(beacon_processor_receive, None);
// generate the Message handler
let mut handler = Router {
network_globals,
chain: beacon_chain,
sync_send,
network: HandlerNetworkContext::new(network_send, log.clone()),
beacon_processor_send,
log: message_handler_log,
};
// spawn handler task and move the message handler instance into the spawned thread
executor.spawn(
async move {
debug!(log, "Network message router started");
UnboundedReceiverStream::new(handler_recv)
.for_each(move |msg| future::ready(handler.handle_message(msg)))
.await;
},
"router",
);
Ok(handler_send)
}
/// Handle all messages incoming from the network service.
fn handle_message(&mut self, message: RouterMessage<T::EthSpec>) {
match message {
// we have initiated a connection to a peer or the peer manager has requested a
// re-status
RouterMessage::StatusPeer(peer_id) => {
self.send_status(peer_id);
}
// A peer has disconnected
RouterMessage::PeerDisconnected(peer_id) => {
self.send_to_sync(SyncMessage::Disconnect(peer_id));
}
RouterMessage::RPCRequestReceived {
peer_id,
id,
request,
} => {
self.handle_rpc_request(peer_id, id, request);
}
RouterMessage::RPCResponseReceived {
peer_id,
request_id,
response,
} => {
self.handle_rpc_response(peer_id, request_id, response);
}
RouterMessage::RPCFailed {
peer_id,
request_id,
error,
} => {
self.on_rpc_error(peer_id, request_id, error);
}
RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => {
self.handle_gossip(id, peer_id, gossip, should_process);
}
}
}
/* RPC - Related functionality */
/// A new RPC request has been received from the network.
fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: PeerRequestId, request: Request) {
if !self.network_globals.peers.read().is_connected(&peer_id) {
debug!(self.log, "Dropping request of disconnected peer"; "peer_id" => %peer_id, "request" => ?request);
return;
}
match request {
Request::Status(status_message) => {
self.on_status_request(peer_id, request_id, status_message)
}
Request::BlocksByRange(request) => self.send_beacon_processor_work(
BeaconWorkEvent::blocks_by_range_request(peer_id, request_id, request),
),
Request::BlocksByRoot(request) => self.send_beacon_processor_work(
BeaconWorkEvent::blocks_by_roots_request(peer_id, request_id, request),
),
Request::BlobsByRange(request) => self.send_beacon_processor_work(
BeaconWorkEvent::blobs_by_range_request(peer_id, request_id, request),
),
Request::BlobsByRoot(request) => self.send_beacon_processor_work(
BeaconWorkEvent::blobs_by_root_request(peer_id, request_id, request),
),
Request::LightClientBootstrap(request) => self.send_beacon_processor_work(
BeaconWorkEvent::lightclient_bootstrap_request(peer_id, request_id, request),
),
}
}
/// An RPC response has been received from the network.
fn handle_rpc_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
response: Response<T::EthSpec>,
) {
match response {
Response::Status(status_message) => {
debug!(self.log, "Received Status Response"; "peer_id" => %peer_id, &status_message);
self.send_beacon_processor_work(BeaconWorkEvent::status_message(
peer_id,
status_message,
))
}
Response::BlocksByRange(beacon_block) => {
self.on_blocks_by_range_response(peer_id, request_id, beacon_block);
}
Response::BlocksByRoot(beacon_block) => {
self.on_blocks_by_root_response(peer_id, request_id, beacon_block);
}
Response::BlobsByRange(blob) => {
self.on_blobs_by_range_response(peer_id, request_id, blob);
}
Response::BlobsByRoot(blob) => {
self.on_blobs_by_root_response(peer_id, request_id, blob);
}
Response::LightClientBootstrap(_) => unreachable!(),
}
}
/// Handle RPC messages.
/// Note: `should_process` is currently only useful for the `Attestation` variant.
/// if `should_process` is `false`, we only propagate the message on successful verification,
/// else, we propagate **and** import into the beacon chain.
fn handle_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
gossip_message: PubsubMessage<T::EthSpec>,
should_process: bool,
) {
match gossip_message {
PubsubMessage::AggregateAndProofAttestation(aggregate_and_proof) => self
.send_beacon_processor_work(BeaconWorkEvent::aggregated_attestation(
message_id,
peer_id,
*aggregate_and_proof,
timestamp_now(),
)),
PubsubMessage::Attestation(subnet_attestation) => {
self.send_beacon_processor_work(BeaconWorkEvent::unaggregated_attestation(
message_id,
peer_id,
subnet_attestation.1,
subnet_attestation.0,
should_process,
timestamp_now(),
))
}
PubsubMessage::BeaconBlock(block) => {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_beacon_block(
message_id,
peer_id,
self.network_globals.client(&peer_id),
block,
timestamp_now(),
))
}
PubsubMessage::BlobSidecar(data) => {
let (blob_index, signed_blob) = *data;
let peer_client = self.network_globals.client(&peer_id);
let signed_blob = Arc::new(signed_blob);
self.send_beacon_processor_work(BeaconWorkEvent::gossip_signed_blob_sidecar(
message_id,
peer_id,
peer_client,
blob_index,
signed_blob,
timestamp_now(),
))
}
PubsubMessage::VoluntaryExit(exit) => {
debug!(self.log, "Received a voluntary exit"; "peer_id" => %peer_id);
self.send_beacon_processor_work(BeaconWorkEvent::gossip_voluntary_exit(
message_id, peer_id, exit,
))
}
PubsubMessage::ProposerSlashing(proposer_slashing) => {
debug!(
self.log,
"Received a proposer slashing";
"peer_id" => %peer_id
);
self.send_beacon_processor_work(BeaconWorkEvent::gossip_proposer_slashing(
message_id,
peer_id,
proposer_slashing,
))
}
PubsubMessage::AttesterSlashing(attester_slashing) => {
debug!(
self.log,
"Received a attester slashing";
"peer_id" => %peer_id
);
self.send_beacon_processor_work(BeaconWorkEvent::gossip_attester_slashing(
message_id,
peer_id,
attester_slashing,
))
}
PubsubMessage::SignedContributionAndProof(contribution_and_proof) => {
trace!(
self.log,
"Received sync committee aggregate";
"peer_id" => %peer_id
);
self.send_beacon_processor_work(BeaconWorkEvent::gossip_sync_contribution(
message_id,
peer_id,
*contribution_and_proof,
timestamp_now(),
))
}
PubsubMessage::SyncCommitteeMessage(sync_committtee_msg) => {
trace!(
self.log,
"Received sync committee signature";
"peer_id" => %peer_id
);
self.send_beacon_processor_work(BeaconWorkEvent::gossip_sync_signature(
message_id,
peer_id,
sync_committtee_msg.1,
sync_committtee_msg.0,
timestamp_now(),
))
}
PubsubMessage::LightClientFinalityUpdate(light_client_finality_update) => {
trace!(
self.log,
"Received light client finality update";
"peer_id" => %peer_id
);
self.send_beacon_processor_work(
BeaconWorkEvent::gossip_light_client_finality_update(
message_id,
peer_id,
light_client_finality_update,
timestamp_now(),
),
)
}
PubsubMessage::LightClientOptimisticUpdate(light_client_optimistic_update) => {
trace!(
self.log,
"Received light client optimistic update";
"peer_id" => %peer_id
);
self.send_beacon_processor_work(
BeaconWorkEvent::gossip_light_client_optimistic_update(
message_id,
peer_id,
light_client_optimistic_update,
timestamp_now(),
),
)
}
PubsubMessage::BlsToExecutionChange(bls_to_execution_change) => self
.send_beacon_processor_work(BeaconWorkEvent::gossip_bls_to_execution_change(
message_id,
peer_id,
bls_to_execution_change,
)),
}
}
fn send_status(&mut self, peer_id: PeerId) {
let status_message = status_message(&self.chain);
debug!(self.log, "Sending Status Request"; "peer" => %peer_id, &status_message);
self.network
.send_processor_request(peer_id, Request::Status(status_message));
}
fn send_to_sync(&mut self, message: SyncMessage<T::EthSpec>) {
self.sync_send.send(message).unwrap_or_else(|e| {
warn!(
self.log,
"Could not send message to the sync service";
"error" => %e,
)
});
}
/// An error occurred during an RPC request. The state is maintained by the sync manager, so
/// this function notifies the sync manager of the error.
pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
// Check if the failed RPC belongs to sync
if let RequestId::Sync(request_id) = request_id {
self.send_to_sync(SyncMessage::RpcError {
peer_id,
request_id,
error,
});
}
}
/// Handle a `Status` request.
///
/// Processes the `Status` from the remote peer and sends back our `Status`.
pub fn on_status_request(
&mut self,
peer_id: PeerId,
request_id: PeerRequestId,
status: StatusMessage,
) {
debug!(self.log, "Received Status Request"; "peer_id" => %peer_id, &status);
// Say status back.
self.network.send_response(
peer_id,
Response::Status(status_message(&self.chain)),
request_id,
);
self.send_beacon_processor_work(BeaconWorkEvent::status_message(peer_id, status))
}
/// Handle a `BlocksByRange` response from the peer.
/// A `beacon_block` behaves as a stream which is terminated on a `None` response.
pub fn on_blocks_by_range_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => {
unreachable!("Block lookups do not request BBRange requests")
}
id @ (SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::BackFillBlobs { .. }
| SyncId::RangeBlobs { .. }) => id,
},
RequestId::Router => unreachable!("All BBRange requests belong to sync"),
};
trace!(
self.log,
"Received BlocksByRange Response";
"peer" => %peer_id,
);
self.send_to_sync(SyncMessage::RpcBlock {
peer_id,
request_id,
beacon_block,
seen_timestamp: timestamp_now(),
});
}
pub fn on_blobs_by_range_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
blob_sidecar: Option<Arc<BlobSidecar<T::EthSpec>>>,
) {
trace!(
self.log,
"Received BlobsByRange Response";
"peer" => %peer_id,
);
if let RequestId::Sync(id) = request_id {
self.send_to_sync(SyncMessage::RpcBlob {
peer_id,
request_id: id,
blob_sidecar,
seen_timestamp: timestamp_now(),
});
} else {
debug!(
self.log,
"All blobs by range responses should belong to sync"
);
}
}
/// Handle a `BlocksByRoot` response from the peer.
pub fn on_blocks_by_root_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::RangeBlobs { .. }
| SyncId::BackFillBlobs { .. } => {
unreachable!("Batch syncing do not request BBRoot requests")
}
},
RequestId::Router => unreachable!("All BBRoot requests belong to sync"),
};
trace!(
self.log,
"Received BlocksByRoot Response";
"peer" => %peer_id,
);
self.send_to_sync(SyncMessage::RpcBlock {
peer_id,
request_id,
beacon_block,
seen_timestamp: timestamp_now(),
});
}
/// Handle a `BlobsByRoot` response from the peer.
pub fn on_blobs_by_root_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
blob_sidecar: Option<Arc<BlobSidecar<T::EthSpec>>>,
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::RangeBlobs { .. }
| SyncId::BackFillBlobs { .. } => {
unreachable!("Batch syncing does not request BBRoot requests")
}
},
RequestId::Router => unreachable!("All BlobsByRoot requests belong to sync"),
};
trace!(
self.log,
"Received BlobsByRoot Response";
"peer" => %peer_id,
);
self.send_to_sync(SyncMessage::RpcBlob {
request_id,
peer_id,
blob_sidecar,
seen_timestamp: timestamp_now(),
});
}
fn send_beacon_processor_work(&mut self, work: BeaconWorkEvent<T>) {
self.beacon_processor_send
.try_send(work)
.unwrap_or_else(|e| {
let work_type = match &e {
mpsc::error::TrySendError::Closed(work)
| mpsc::error::TrySendError::Full(work) => work.work_type(),
};
error!(&self.log, "Unable to send message to the beacon processor";
"error" => %e, "type" => work_type)
})
}
}
/// Wraps a Network Channel to employ various RPC related network functionality for the
/// processor.
#[derive(Clone)]
pub struct HandlerNetworkContext<T: EthSpec> {
/// The network channel to relay messages to the Network service.
network_send: mpsc::UnboundedSender<NetworkMessage<T>>,
/// Logger for the `NetworkContext`.
log: slog::Logger,
}
impl<T: EthSpec> HandlerNetworkContext<T> {
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage<T>>, log: slog::Logger) -> Self {
Self { network_send, log }
}
/// Sends a message to the network task.
fn inform_network(&mut self, msg: NetworkMessage<T>) {
self.network_send.send(msg).unwrap_or_else(
|e| warn!(self.log, "Could not send message to the network service"; "error" => %e),
)
}
/// Sends a request to the network task.
pub fn send_processor_request(&mut self, peer_id: PeerId, request: Request) {
self.inform_network(NetworkMessage::SendRequest {
peer_id,
request_id: RequestId::Router,
request,
})
}
/// Sends a response to the network task.
pub fn send_response(&mut self, peer_id: PeerId, response: Response<T>, id: PeerRequestId) {
self.inform_network(NetworkMessage::SendResponse {
peer_id,
id,
response,
})
}
}
fn timestamp_now() -> Duration {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_secs(0))
}

View File

@ -1,348 +0,0 @@
//! This module handles incoming network messages.
//!
//! It routes the messages to appropriate services.
//! It handles requests at the application layer in its associated processor and directs
//! syncing-related responses to the Sync manager.
#![allow(clippy::unit_arg)]
mod processor;
use crate::error;
use crate::service::{NetworkMessage, RequestId};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use futures::prelude::*;
use lighthouse_network::rpc::RPCError;
use lighthouse_network::{
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
};
use processor::Processor;
use slog::{debug, o, trace};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use types::EthSpec;
/// Handles messages received from the network and client and organises syncing. This
/// functionality of this struct is to validate an decode messages from the network before
/// passing them to the internal message processor. The message processor spawns a syncing thread
/// which manages which blocks need to be requested and processed.
pub struct Router<T: BeaconChainTypes> {
/// Access to the peer db.
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
/// Processes validated and decoded messages from the network. Has direct access to the
/// sync manager.
processor: Processor<T>,
/// The `Router` logger.
log: slog::Logger,
}
/// Types of messages the handler can receive.
#[derive(Debug)]
pub enum RouterMessage<T: EthSpec> {
/// We have initiated a connection to a new peer.
PeerDialed(PeerId),
/// Peer has disconnected,
PeerDisconnected(PeerId),
/// An RPC request has been received.
RPCRequestReceived {
peer_id: PeerId,
id: PeerRequestId,
request: Request,
},
/// An RPC response has been received.
RPCResponseReceived {
peer_id: PeerId,
request_id: RequestId,
response: Response<T>,
},
/// An RPC request failed
RPCFailed {
peer_id: PeerId,
request_id: RequestId,
error: RPCError,
},
/// A gossip message has been received. The fields are: message id, the peer that sent us this
/// message, the message itself and a bool which indicates if the message should be processed
/// by the beacon chain after successful verification.
PubsubMessage(MessageId, PeerId, PubsubMessage<T>, bool),
/// The peer manager has requested we re-status a peer.
StatusPeer(PeerId),
}
impl<T: BeaconChainTypes> Router<T> {
/// Initializes and runs the Router.
pub fn spawn(
beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
executor: task_executor::TaskExecutor,
log: slog::Logger,
) -> error::Result<mpsc::UnboundedSender<RouterMessage<T::EthSpec>>> {
let message_handler_log = log.new(o!("service"=> "router"));
trace!(message_handler_log, "Service starting");
let (handler_send, handler_recv) = mpsc::unbounded_channel();
// Initialise a message instance, which itself spawns the syncing thread.
let processor = Processor::new(
executor.clone(),
beacon_chain,
network_globals.clone(),
network_send,
&log,
);
// generate the Message handler
let mut handler = Router {
network_globals,
processor,
log: message_handler_log,
};
// spawn handler task and move the message handler instance into the spawned thread
executor.spawn(
async move {
debug!(log, "Network message router started");
UnboundedReceiverStream::new(handler_recv)
.for_each(move |msg| future::ready(handler.handle_message(msg)))
.await;
},
"router",
);
Ok(handler_send)
}
/// Handle all messages incoming from the network service.
fn handle_message(&mut self, message: RouterMessage<T::EthSpec>) {
match message {
// we have initiated a connection to a peer or the peer manager has requested a
// re-status
RouterMessage::PeerDialed(peer_id) | RouterMessage::StatusPeer(peer_id) => {
self.processor.send_status(peer_id);
}
// A peer has disconnected
RouterMessage::PeerDisconnected(peer_id) => {
self.processor.on_disconnect(peer_id);
}
RouterMessage::RPCRequestReceived {
peer_id,
id,
request,
} => {
self.handle_rpc_request(peer_id, id, request);
}
RouterMessage::RPCResponseReceived {
peer_id,
request_id,
response,
} => {
self.handle_rpc_response(peer_id, request_id, response);
}
RouterMessage::RPCFailed {
peer_id,
request_id,
error,
} => {
self.processor.on_rpc_error(peer_id, request_id, error);
}
RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => {
self.handle_gossip(id, peer_id, gossip, should_process);
}
}
}
/* RPC - Related functionality */
/// A new RPC request has been received from the network.
fn handle_rpc_request(&mut self, peer_id: PeerId, id: PeerRequestId, request: Request) {
if !self.network_globals.peers.read().is_connected(&peer_id) {
debug!(self.log, "Dropping request of disconnected peer"; "peer_id" => %peer_id, "request" => ?request);
return;
}
match request {
Request::Status(status_message) => {
self.processor
.on_status_request(peer_id, id, status_message)
}
Request::BlocksByRange(request) => self
.processor
.on_blocks_by_range_request(peer_id, id, request),
Request::BlocksByRoot(request) => self
.processor
.on_blocks_by_root_request(peer_id, id, request),
Request::BlobsByRange(request) => self
.processor
.on_blobs_by_range_request(peer_id, id, request),
Request::BlobsByRoot(request) => self
.processor
.on_blobs_by_root_request(peer_id, id, request),
Request::LightClientBootstrap(request) => self
.processor
.on_lightclient_bootstrap(peer_id, id, request),
}
}
/// An RPC response has been received from the network.
// we match on id and ignore responses past the timeout.
fn handle_rpc_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
response: Response<T::EthSpec>,
) {
// an error could have occurred.
match response {
Response::Status(status_message) => {
self.processor.on_status_response(peer_id, status_message);
}
Response::BlocksByRange(beacon_block) => {
self.processor
.on_blocks_by_range_response(peer_id, request_id, beacon_block);
}
Response::BlocksByRoot(beacon_block) => {
self.processor
.on_blocks_by_root_response(peer_id, request_id, beacon_block);
}
Response::BlobsByRange(blob) => {
self.processor
.on_blobs_by_range_response(peer_id, request_id, blob);
}
Response::BlobsByRoot(blob) => {
self.processor
.on_blobs_by_root_response(peer_id, request_id, blob);
}
Response::LightClientBootstrap(_) => unreachable!(),
}
}
/// Handle RPC messages.
/// Note: `should_process` is currently only useful for the `Attestation` variant.
/// if `should_process` is `false`, we only propagate the message on successful verification,
/// else, we propagate **and** import into the beacon chain.
fn handle_gossip(
&mut self,
id: MessageId,
peer_id: PeerId,
gossip_message: PubsubMessage<T::EthSpec>,
should_process: bool,
) {
match gossip_message {
// Attestations should never reach the router.
PubsubMessage::AggregateAndProofAttestation(aggregate_and_proof) => {
self.processor
.on_aggregated_attestation_gossip(id, peer_id, *aggregate_and_proof);
}
PubsubMessage::Attestation(subnet_attestation) => {
self.processor.on_unaggregated_attestation_gossip(
id,
peer_id,
subnet_attestation.1.clone(),
subnet_attestation.0,
should_process,
);
}
PubsubMessage::BeaconBlock(block) => {
self.processor.on_block_gossip(
id,
peer_id,
self.network_globals.client(&peer_id),
block,
);
}
PubsubMessage::BlobSidecar(data) => {
let (blob_index, signed_blob) = *data;
self.processor.on_blob_sidecar_gossip(
id,
peer_id,
self.network_globals.client(&peer_id),
blob_index,
Arc::new(signed_blob),
);
}
PubsubMessage::VoluntaryExit(exit) => {
debug!(self.log, "Received a voluntary exit"; "peer_id" => %peer_id);
self.processor.on_voluntary_exit_gossip(id, peer_id, exit);
}
PubsubMessage::ProposerSlashing(proposer_slashing) => {
debug!(
self.log,
"Received a proposer slashing";
"peer_id" => %peer_id
);
self.processor
.on_proposer_slashing_gossip(id, peer_id, proposer_slashing);
}
PubsubMessage::AttesterSlashing(attester_slashing) => {
debug!(
self.log,
"Received a attester slashing";
"peer_id" => %peer_id
);
self.processor
.on_attester_slashing_gossip(id, peer_id, attester_slashing);
}
PubsubMessage::SignedContributionAndProof(contribution_and_proof) => {
trace!(
self.log,
"Received sync committee aggregate";
"peer_id" => %peer_id
);
self.processor.on_sync_committee_contribution_gossip(
id,
peer_id,
*contribution_and_proof,
);
}
PubsubMessage::SyncCommitteeMessage(sync_committtee_msg) => {
trace!(
self.log,
"Received sync committee signature";
"peer_id" => %peer_id
);
self.processor.on_sync_committee_signature_gossip(
id,
peer_id,
sync_committtee_msg.1,
sync_committtee_msg.0,
);
}
PubsubMessage::BlsToExecutionChange(bls_to_execution_change) => {
trace!(
self.log,
"Received BLS to execution change";
"peer_id" => %peer_id
);
self.processor.on_bls_to_execution_change_gossip(
id,
peer_id,
bls_to_execution_change,
);
}
PubsubMessage::LightClientFinalityUpdate(light_client_finality_update) => {
trace!(
self.log,
"Received light client finality update";
"peer_id" => %peer_id
);
self.processor.on_light_client_finality_update_gossip(
id,
peer_id,
light_client_finality_update,
);
}
PubsubMessage::LightClientOptimisticUpdate(light_client_optimistic_update) => {
trace!(
self.log,
"Received light client optimistic update";
"peer_id" => %peer_id
);
self.processor.on_light_client_optimistic_update_gossip(
id,
peer_id,
light_client_optimistic_update,
);
}
}
}
}

View File

@ -1,581 +0,0 @@
use crate::beacon_processor::{
BeaconProcessor, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN,
};
use crate::service::{NetworkMessage, RequestId};
use crate::status::status_message;
use crate::sync::manager::RequestId as SyncId;
use crate::sync::SyncMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
use lighthouse_network::rpc::*;
use lighthouse_network::{
Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response,
};
use slog::{debug, error, o, trace, warn};
use std::cmp;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::SyncCommitteeMessage;
use tokio::sync::mpsc;
use types::{
Attestation, AttesterSlashing, BlobSidecar, EthSpec, LightClientFinalityUpdate,
LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock,
SignedBlobSidecar, SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit,
SubnetId, SyncSubnetId,
};
/// Processes validated messages from the network. It relays necessary data to the syncing thread
/// and processes blocks from the pubsub network.
pub struct Processor<T: BeaconChainTypes> {
/// A reference to the underlying beacon chain.
chain: Arc<BeaconChain<T>>,
/// A channel to the syncing thread.
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
/// A network context to return and handle RPC requests.
network: HandlerNetworkContext<T::EthSpec>,
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
/// The `RPCHandler` logger.
log: slog::Logger,
}
impl<T: BeaconChainTypes> Processor<T> {
/// Instantiate a `Processor` instance
pub fn new(
executor: task_executor::TaskExecutor,
beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
log: &slog::Logger,
) -> Self {
let sync_logger = log.new(o!("service"=> "sync"));
let (beacon_processor_send, beacon_processor_receive) =
mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN);
// spawn the sync thread
let sync_send = crate::sync::manager::spawn(
executor.clone(),
beacon_chain.clone(),
network_globals.clone(),
network_send.clone(),
beacon_processor_send.clone(),
sync_logger,
);
BeaconProcessor {
beacon_chain: Arc::downgrade(&beacon_chain),
network_tx: network_send.clone(),
sync_tx: sync_send.clone(),
network_globals,
executor,
max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0,
importing_blocks: Default::default(),
log: log.clone(),
}
.spawn_manager(beacon_processor_receive, None);
Processor {
chain: beacon_chain,
sync_send,
network: HandlerNetworkContext::new(network_send, log.clone()),
beacon_processor_send,
log: log.new(o!("service" => "router")),
}
}
fn send_to_sync(&mut self, message: SyncMessage<T::EthSpec>) {
self.sync_send.send(message).unwrap_or_else(|e| {
warn!(
self.log,
"Could not send message to the sync service";
"error" => %e,
)
});
}
/// Handle a peer disconnect.
///
/// Removes the peer from the manager.
pub fn on_disconnect(&mut self, peer_id: PeerId) {
self.send_to_sync(SyncMessage::Disconnect(peer_id));
}
/// An error occurred during an RPC request. The state is maintained by the sync manager, so
/// this function notifies the sync manager of the error.
pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
// Check if the failed RPC belongs to sync
if let RequestId::Sync(request_id) = request_id {
self.send_to_sync(SyncMessage::RpcError {
peer_id,
request_id,
error,
});
}
}
/// Sends a `Status` message to the peer.
///
/// Called when we first connect to a peer, or when the PeerManager determines we need to
/// re-status.
pub fn send_status(&mut self, peer_id: PeerId) {
let status_message = status_message(&self.chain);
debug!(self.log, "Sending Status Request"; "peer" => %peer_id, &status_message);
self.network
.send_processor_request(peer_id, Request::Status(status_message));
}
/// Handle a `Status` request.
///
/// Processes the `Status` from the remote peer and sends back our `Status`.
pub fn on_status_request(
&mut self,
peer_id: PeerId,
request_id: PeerRequestId,
status: StatusMessage,
) {
debug!(self.log, "Received Status Request"; "peer_id" => %peer_id, &status);
// Say status back.
self.network.send_response(
peer_id,
Response::Status(status_message(&self.chain)),
request_id,
);
self.send_beacon_processor_work(BeaconWorkEvent::status_message(peer_id, status))
}
/// Process a `Status` response from a peer.
pub fn on_status_response(&mut self, peer_id: PeerId, status: StatusMessage) {
debug!(self.log, "Received Status Response"; "peer_id" => %peer_id, &status);
self.send_beacon_processor_work(BeaconWorkEvent::status_message(peer_id, status))
}
/// Handle a `BlocksByRoot` request from the peer.
pub fn on_blocks_by_root_request(
&mut self,
peer_id: PeerId,
request_id: PeerRequestId,
request: BlocksByRootRequest,
) {
self.send_beacon_processor_work(BeaconWorkEvent::blocks_by_roots_request(
peer_id, request_id, request,
))
}
pub fn on_blobs_by_range_request(
&mut self,
peer_id: PeerId,
request_id: PeerRequestId,
request: BlobsByRangeRequest,
) {
self.send_beacon_processor_work(BeaconWorkEvent::blobs_by_range_request(
peer_id, request_id, request,
))
}
pub fn on_blobs_by_root_request(
&mut self,
peer_id: PeerId,
request_id: PeerRequestId,
request: BlobsByRootRequest,
) {
self.send_beacon_processor_work(BeaconWorkEvent::blobs_by_root_request(
peer_id, request_id, request,
))
}
/// Handle a `LightClientBootstrap` request from the peer.
pub fn on_lightclient_bootstrap(
&mut self,
peer_id: PeerId,
request_id: PeerRequestId,
request: LightClientBootstrapRequest,
) {
self.send_beacon_processor_work(BeaconWorkEvent::lightclient_bootstrap_request(
peer_id, request_id, request,
))
}
/// Handle a `BlocksByRange` request from the peer.
pub fn on_blocks_by_range_request(
&mut self,
peer_id: PeerId,
request_id: PeerRequestId,
req: BlocksByRangeRequest,
) {
self.send_beacon_processor_work(BeaconWorkEvent::blocks_by_range_request(
peer_id, request_id, req,
))
}
/// Handle a `BlocksByRange` response from the peer.
/// A `beacon_block` behaves as a stream which is terminated on a `None` response.
pub fn on_blocks_by_range_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => {
unreachable!("Block lookups do not request BBRange requests")
}
id @ (SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::BackFillBlobs { .. }
| SyncId::RangeBlobs { .. }) => id,
},
RequestId::Router => unreachable!("All BBRange requests belong to sync"),
};
trace!(
self.log,
"Received BlocksByRange Response";
"peer" => %peer_id,
);
self.send_to_sync(SyncMessage::RpcBlock {
peer_id,
request_id,
beacon_block,
seen_timestamp: timestamp_now(),
});
}
pub fn on_blobs_by_range_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
blob_sidecar: Option<Arc<BlobSidecar<T::EthSpec>>>,
) {
trace!(
self.log,
"Received BlobsByRange Response";
"peer" => %peer_id,
);
if let RequestId::Sync(id) = request_id {
self.send_to_sync(SyncMessage::RpcBlob {
peer_id,
request_id: id,
blob_sidecar,
seen_timestamp: timestamp_now(),
});
} else {
debug!(
self.log,
"All blobs by range responses should belong to sync"
);
}
}
/// Handle a `BlocksByRoot` response from the peer.
pub fn on_blocks_by_root_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::RangeBlobs { .. }
| SyncId::BackFillBlobs { .. } => {
unreachable!("Batch syncing do not request BBRoot requests")
}
},
RequestId::Router => unreachable!("All BBRoot requests belong to sync"),
};
trace!(
self.log,
"Received BlocksByRoot Response";
"peer" => %peer_id,
);
self.send_to_sync(SyncMessage::RpcBlock {
peer_id,
request_id,
beacon_block,
seen_timestamp: timestamp_now(),
});
}
/// Handle a `BlobsByRoot` response from the peer.
pub fn on_blobs_by_root_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
blob_sidecar: Option<Arc<BlobSidecar<T::EthSpec>>>,
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::RangeBlobs { .. }
| SyncId::BackFillBlobs { .. } => {
unreachable!("Batch syncing does not request BBRoot requests")
}
},
RequestId::Router => unreachable!("All BlobsByRoot requests belong to sync"),
};
trace!(
self.log,
"Received BlobsByRoot Response";
"peer" => %peer_id,
);
self.send_to_sync(SyncMessage::RpcBlob {
request_id,
peer_id,
blob_sidecar,
seen_timestamp: timestamp_now(),
});
}
/// Process a gossip message declaring a new block.
///
/// Attempts to apply to block to the beacon chain. May queue the block for later processing.
///
/// Returns a `bool` which, if `true`, indicates we should forward the block to our peers.
pub fn on_block_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
peer_client: Client,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_beacon_block(
message_id,
peer_id,
peer_client,
block,
timestamp_now(),
))
}
pub fn on_blob_sidecar_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
peer_client: Client,
blob_index: u64, // TODO: add a type for the blob index
signed_blob: Arc<SignedBlobSidecar<T::EthSpec>>,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_signed_blob_sidecar(
message_id,
peer_id,
peer_client,
blob_index,
signed_blob,
timestamp_now(),
))
}
pub fn on_unaggregated_attestation_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
unaggregated_attestation: Attestation<T::EthSpec>,
subnet_id: SubnetId,
should_process: bool,
) {
self.send_beacon_processor_work(BeaconWorkEvent::unaggregated_attestation(
message_id,
peer_id,
unaggregated_attestation,
subnet_id,
should_process,
timestamp_now(),
))
}
pub fn on_aggregated_attestation_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
aggregate: SignedAggregateAndProof<T::EthSpec>,
) {
self.send_beacon_processor_work(BeaconWorkEvent::aggregated_attestation(
message_id,
peer_id,
aggregate,
timestamp_now(),
))
}
pub fn on_voluntary_exit_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
voluntary_exit: Box<SignedVoluntaryExit>,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_voluntary_exit(
message_id,
peer_id,
voluntary_exit,
))
}
pub fn on_proposer_slashing_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
proposer_slashing: Box<ProposerSlashing>,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_proposer_slashing(
message_id,
peer_id,
proposer_slashing,
))
}
pub fn on_attester_slashing_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
attester_slashing: Box<AttesterSlashing<T::EthSpec>>,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_attester_slashing(
message_id,
peer_id,
attester_slashing,
))
}
pub fn on_sync_committee_signature_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
sync_signature: SyncCommitteeMessage,
subnet_id: SyncSubnetId,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_sync_signature(
message_id,
peer_id,
sync_signature,
subnet_id,
timestamp_now(),
))
}
pub fn on_sync_committee_contribution_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
sync_contribution: SignedContributionAndProof<T::EthSpec>,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_sync_contribution(
message_id,
peer_id,
sync_contribution,
timestamp_now(),
))
}
pub fn on_bls_to_execution_change_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
bls_to_execution_change: Box<SignedBlsToExecutionChange>,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_bls_to_execution_change(
message_id,
peer_id,
bls_to_execution_change,
))
}
pub fn on_light_client_finality_update_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
light_client_finality_update: Box<LightClientFinalityUpdate<T::EthSpec>>,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_light_client_finality_update(
message_id,
peer_id,
light_client_finality_update,
timestamp_now(),
))
}
pub fn on_light_client_optimistic_update_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
light_client_optimistic_update: Box<LightClientOptimisticUpdate<T::EthSpec>>,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_light_client_optimistic_update(
message_id,
peer_id,
light_client_optimistic_update,
timestamp_now(),
))
}
fn send_beacon_processor_work(&mut self, work: BeaconWorkEvent<T>) {
self.beacon_processor_send
.try_send(work)
.unwrap_or_else(|e| {
let work_type = match &e {
mpsc::error::TrySendError::Closed(work)
| mpsc::error::TrySendError::Full(work) => work.work_type(),
};
error!(&self.log, "Unable to send message to the beacon processor";
"error" => %e, "type" => work_type)
})
}
}
/// Wraps a Network Channel to employ various RPC related network functionality for the
/// processor.
#[derive(Clone)]
pub struct HandlerNetworkContext<T: EthSpec> {
/// The network channel to relay messages to the Network service.
network_send: mpsc::UnboundedSender<NetworkMessage<T>>,
/// Logger for the `NetworkContext`.
log: slog::Logger,
}
impl<T: EthSpec> HandlerNetworkContext<T> {
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage<T>>, log: slog::Logger) -> Self {
Self { network_send, log }
}
/// Sends a message to the network task.
fn inform_network(&mut self, msg: NetworkMessage<T>) {
self.network_send.send(msg).unwrap_or_else(
|e| warn!(self.log, "Could not send message to the network service"; "error" => %e),
)
}
/// Sends a request to the network task.
pub fn send_processor_request(&mut self, peer_id: PeerId, request: Request) {
self.inform_network(NetworkMessage::SendRequest {
peer_id,
request_id: RequestId::Router,
request,
})
}
/// Sends a response to the network task.
pub fn send_response(&mut self, peer_id: PeerId, response: Response<T>, id: PeerRequestId) {
self.inform_network(NetworkMessage::SendResponse {
peer_id,
id,
response,
})
}
}
fn timestamp_now() -> Duration {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_secs(0))
}

View File

@ -472,7 +472,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
) {
match ev {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
self.send_to_router(RouterMessage::PeerDialed(peer_id));
self.send_to_router(RouterMessage::StatusPeer(peer_id));
}
NetworkEvent::PeerConnectedIncoming(_)
| NetworkEvent::PeerBanned(_)

View File

@ -1,13 +1,16 @@
# Frequently Asked Questions
## 1. Where can I find my API token?
## 1. Are there any requirements to run Siren?
Yes, Siren requires Lighthouse v3.5.1 or higher to function properly. These releases can be found on the [releases](https://github.com/sigp/lighthouse/releases) page of the Lighthouse repository.
## 2. Where can I find my API token?
The required Api token may be found in the default data directory of the validator client. For more information please refer to the lighthouse ui configuration [`api token section`](./ui-configuration.md#api-token).
## 2. How do I fix the Node Network Errors?
## 3. How do I fix the Node Network Errors?
If you recieve a red notification with a BEACON or VALIDATOR NODE NETWORK ERROR you can refer to the lighthouse ui configuration and [`connecting to clients section`](./ui-configuration.md#connecting-to-the-clients).
## 3. How do I change my Beacon or Validator address after logging in?
## 4. How do I change my Beacon or Validator address after logging in?
Once you have successfully arrived to the main dashboard, use the sidebar to access the settings view. In the top right hand corner there is a `Configurtion` action button that will redirect you back to the configuration screen where you can make appropriate changes.
## 4. Why doesn't my validator balance graph show any data?
## 5. Why doesn't my validator balance graph show any data?
If your graph is not showing data, it usually means your validator node is still caching data. The application must wait at least 3 epochs before it can render any graphical visualizations. This could take up to 20min.

View File

@ -2,6 +2,8 @@
Siren runs on Linux, MacOS and Windows.
## Version Requirement
The Siren app requires Lighthouse v3.5.1 or higher to function properly. These versions can be found on the [releases](https://github.com/sigp/lighthouse/releases) page of the Lighthouse repository.
## Pre-Built Electron Packages
@ -16,7 +18,7 @@ Simply download the package specific to your operating system and run it.
### Requirements
Building from source requires `Node v18` and `yarn`.
Building from source requires `Node v18` and `yarn`.
### Building From Source

View File

@ -160,6 +160,12 @@ where
self.map.contains(key)
}
/// Shrink the mappings to fit the current size.
pub fn shrink_to_fit(&mut self) {
self.map.shrink_to_fit();
self.list.shrink_to_fit();
}
#[cfg(test)]
#[track_caller]
fn check_invariant(&self) {

View File

@ -12,7 +12,7 @@ use test_random_derive::TestRandom;
use tree_hash::TreeHash;
use tree_hash_derive::TreeHash;
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum BlockType {
Blinded,
Full,

View File

@ -4,7 +4,7 @@ version = "3.5.1"
authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = "2021"
autotests = false
rust-version = "1.65"
rust-version = "1.66"
[features]
default = ["slasher-mdbx"]