From acace8ab311d23760a87d2bc6ac79d5b097906d9 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Sat, 17 Sep 2022 14:55:18 +0200 Subject: [PATCH] network: blobs by range message --- .../lighthouse_network/src/behaviour/mod.rs | 23 ++++++++++++++++++- .../lighthouse_network/src/rpc/methods.rs | 17 ++++++++++++++ .../lighthouse_network/src/rpc/outbound.rs | 1 + .../lighthouse_network/src/rpc/protocol.rs | 1 + .../lighthouse_network/src/types/topics.rs | 1 + consensus/types/src/eth_spec.rs | 6 ++++- 6 files changed, 47 insertions(+), 2 deletions(-) diff --git a/beacon_node/lighthouse_network/src/behaviour/mod.rs b/beacon_node/lighthouse_network/src/behaviour/mod.rs index 9c9e094db..898fea4a7 100644 --- a/beacon_node/lighthouse_network/src/behaviour/mod.rs +++ b/beacon_node/lighthouse_network/src/behaviour/mod.rs @@ -10,6 +10,7 @@ use crate::peer_manager::{ ConnectionDirection, PeerManager, PeerManagerEvent, }; use crate::rpc::*; +use crate::rpc::methods::BlobsByRangeRequest; use crate::service::{Context as ServiceContext, METADATA_FILENAME}; use crate::types::{ subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, @@ -39,6 +40,7 @@ use libp2p::{ }; use slog::{crit, debug, o, trace, warn}; use ssz::Encode; +use types::blobs_sidecar::BlobsSidecar; use std::collections::HashSet; use std::fs::File; use std::io::Write; @@ -51,7 +53,7 @@ use std::{ }; use types::{ consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, - SignedBeaconBlock, Slot, SubnetId, SyncSubnetId, + SignedBeaconBlock, Slot, SubnetId, SyncSubnetId, VariableList }; use self::gossip_cache::GossipCache; @@ -797,6 +799,9 @@ impl Behaviour { Request::BlocksByRoot { .. } => { metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blocks_by_root"]) } + Request::BlobsByRange { .. } => { + metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blobs_by_range"]) + } } self.add_event(BehaviourEvent::RequestReceived { peer_id, @@ -1095,6 +1100,9 @@ where InboundRequest::BlocksByRoot(req) => { self.propagate_request(peer_request_id, peer_id, Request::BlocksByRoot(req)) } + InboundRequest::BlobsByRange(req) => { + self.propagate_request(peer_request_id, peer_id, Request::BlobsByRange(req)) + } } } Ok(RPCReceived::Response(id, resp)) => { @@ -1117,12 +1125,16 @@ where RPCResponse::BlocksByRoot(resp) => { self.propagate_response(id, peer_id, Response::BlocksByRoot(Some(resp))) } + RPCResponse::BlobsByRange(resp) => { + self.propagate_response(id, peer_id, Response::BlobsByRange(Some(resp))) + } } } Ok(RPCReceived::EndOfStream(id, termination)) => { let response = match termination { ResponseTermination::BlocksByRange => Response::BlocksByRange(None), ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None), + ResponseTermination::BlobsByRange => Response::BlobsByRange(None), }; self.propagate_response(id, peer_id, response); } @@ -1329,6 +1341,8 @@ pub enum Request { BlocksByRange(BlocksByRangeRequest), /// A request blocks root request. BlocksByRoot(BlocksByRootRequest), + /// A blobs by range request. + BlobsByRange(BlobsByRangeRequest), } impl std::convert::From for OutboundRequest { @@ -1342,6 +1356,7 @@ impl std::convert::From for OutboundRequest { step: 1, }) } + Request::BlobsByRange(r) => OutboundRequest::BlobsByRange(r), Request::Status(s) => OutboundRequest::Status(s), } } @@ -1361,6 +1376,8 @@ pub enum Response { BlocksByRange(Option>>), /// A response to a get BLOCKS_BY_ROOT request. BlocksByRoot(Option>>), + /// A response to a get BLOBS_BY_RANGE request + BlobsByRange(Option, TSpec::MaxRequestBlobsSidecars>>>) } impl std::convert::From> for RPCCodedResponse { @@ -1374,6 +1391,10 @@ impl std::convert::From> for RPCCodedResponse RPCCodedResponse::Success(RPCResponse::BlocksByRange(b)), None => RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange), }, + Response::BlobsByRange(r) => match r { + Some(b) => RPCCodedResponse::Success(RPCResponse::BlobsByRange(b)), + None => RPCCodedResponse::StreamTermination(ResponseTermination::BlobsByRange), + } Response::Status(s) => RPCCodedResponse::Success(RPCResponse::Status(s)), } } diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 26d755a6e..a7bd51106 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -13,6 +13,7 @@ use std::sync::Arc; use strum::IntoStaticStr; use superstruct::superstruct; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::blobs_sidecar::BlobsSidecar; /// Maximum number of blocks in a single request. pub type MaxRequestBlocks = U1024; @@ -204,6 +205,16 @@ pub struct BlocksByRangeRequest { pub count: u64, } +/// Request a number of beacon blobs from a peer. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct BlobsByRangeRequest { + /// The starting slot to request blobs. + pub start_slot: u64, + + /// The number of blobs from the start slot. + pub count: u64, +} + /// Request a number of beacon block roots from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct OldBlocksByRangeRequest { @@ -243,6 +254,9 @@ pub enum RPCResponse { /// A response to a get BLOCKS_BY_ROOT request. BlocksByRoot(Arc>), + /// A response to a get BLOBS_BY_RANGE request + BlobsByRange(Arc, T::MaxRequestBlobsSidecars>>), + /// A PONG response to a PING request. Pong(Ping), @@ -258,6 +272,9 @@ pub enum ResponseTermination { /// Blocks by root stream termination. BlocksByRoot, + + // Blobs by range stream termination. + BlobsByRange } /// The structured response containing a result/code indicating success or failure diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index 7d5acc436..1c6920165 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -38,6 +38,7 @@ pub enum OutboundRequest { Goodbye(GoodbyeReason), BlocksByRange(OldBlocksByRangeRequest), BlocksByRoot(BlocksByRootRequest), + BlobsByRange(BlobsByRangeRequest), Ping(Ping), MetaData(PhantomData), } diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 352348f74..7a280ed5d 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -427,6 +427,7 @@ pub enum InboundRequest { Goodbye(GoodbyeReason), BlocksByRange(OldBlocksByRangeRequest), BlocksByRoot(BlocksByRootRequest), + BlobsByRange(BlobsByRangeRequest), Ping(Ping), MetaData(PhantomData), } diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 901a193e3..f119a5855 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -210,6 +210,7 @@ impl std::fmt::Display for GossipTopic { let kind = match self.kind { GossipKind::BeaconBlock => BEACON_BLOCK_TOPIC.into(), + GossipKind::BlobsSidecar => BLOBS_SIDECAR_TOPIC.into(), GossipKind::BeaconAggregateAndProof => BEACON_AGGREGATE_AND_PROOF_TOPIC.into(), GossipKind::VoluntaryExit => VOLUNTARY_EXIT_TOPIC.into(), GossipKind::ProposerSlashing => PROPOSER_SLASHING_TOPIC.into(), diff --git a/consensus/types/src/eth_spec.rs b/consensus/types/src/eth_spec.rs index 9d8a765c2..9de4777f5 100644 --- a/consensus/types/src/eth_spec.rs +++ b/consensus/types/src/eth_spec.rs @@ -100,6 +100,7 @@ pub trait EthSpec: 'static + Default + Sync + Send + Clone + Debug + PartialEq + */ type MaxBlobsPerBlock: Unsigned + Clone + Sync + Send + Debug + PartialEq; type FieldElementsPerBlob: Unsigned + Clone + Sync + Send + Debug + PartialEq; + type MaxRequestBlobsSidecars: Unsigned + Clone + Sync + Send + Debug + PartialEq; /* * Derived values (set these CAREFULLY) */ @@ -277,6 +278,7 @@ impl EthSpec for MainnetEthSpec { type SlotsPerEth1VotingPeriod = U2048; // 64 epochs * 32 slots per epoch type MaxBlobsPerBlock = U16; type FieldElementsPerBlob = U4096; + type MaxRequestBlobsSidecars = U128; fn default_spec() -> ChainSpec { ChainSpec::mainnet() @@ -323,7 +325,8 @@ impl EthSpec for MinimalEthSpec { MinGasLimit, MaxExtraDataBytes, MaxBlobsPerBlock, - FieldElementsPerBlob + FieldElementsPerBlob, + MaxRequestBlobsSidecars }); fn default_spec() -> ChainSpec { @@ -370,6 +373,7 @@ impl EthSpec for GnosisEthSpec { type SlotsPerEth1VotingPeriod = U1024; // 64 epochs * 16 slots per epoch type MaxBlobsPerBlock = U16; type FieldElementsPerBlob = U4096; + type MaxRequestBlobsSidecars = U128; fn default_spec() -> ChainSpec { ChainSpec::gnosis()