network: blobs by range message

This commit is contained in:
Marius van der Wijden 2022-09-17 14:55:18 +02:00
parent bcc738cb9d
commit acace8ab31
6 changed files with 47 additions and 2 deletions

View File

@ -10,6 +10,7 @@ use crate::peer_manager::{
ConnectionDirection, PeerManager, PeerManagerEvent, ConnectionDirection, PeerManager, PeerManagerEvent,
}; };
use crate::rpc::*; use crate::rpc::*;
use crate::rpc::methods::BlobsByRangeRequest;
use crate::service::{Context as ServiceContext, METADATA_FILENAME}; use crate::service::{Context as ServiceContext, METADATA_FILENAME};
use crate::types::{ use crate::types::{
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet,
@ -39,6 +40,7 @@ use libp2p::{
}; };
use slog::{crit, debug, o, trace, warn}; use slog::{crit, debug, o, trace, warn};
use ssz::Encode; use ssz::Encode;
use types::blobs_sidecar::BlobsSidecar;
use std::collections::HashSet; use std::collections::HashSet;
use std::fs::File; use std::fs::File;
use std::io::Write; use std::io::Write;
@ -51,7 +53,7 @@ use std::{
}; };
use types::{ use types::{
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext,
SignedBeaconBlock, Slot, SubnetId, SyncSubnetId, SignedBeaconBlock, Slot, SubnetId, SyncSubnetId, VariableList
}; };
use self::gossip_cache::GossipCache; use self::gossip_cache::GossipCache;
@ -797,6 +799,9 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Behaviour<AppReqId, TSpec> {
Request::BlocksByRoot { .. } => { Request::BlocksByRoot { .. } => {
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blocks_by_root"]) metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blocks_by_root"])
} }
Request::BlobsByRange { .. } => {
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blobs_by_range"])
}
} }
self.add_event(BehaviourEvent::RequestReceived { self.add_event(BehaviourEvent::RequestReceived {
peer_id, peer_id,
@ -1095,6 +1100,9 @@ where
InboundRequest::BlocksByRoot(req) => { InboundRequest::BlocksByRoot(req) => {
self.propagate_request(peer_request_id, peer_id, Request::BlocksByRoot(req)) self.propagate_request(peer_request_id, peer_id, Request::BlocksByRoot(req))
} }
InboundRequest::BlobsByRange(req) => {
self.propagate_request(peer_request_id, peer_id, Request::BlobsByRange(req))
}
} }
} }
Ok(RPCReceived::Response(id, resp)) => { Ok(RPCReceived::Response(id, resp)) => {
@ -1117,12 +1125,16 @@ where
RPCResponse::BlocksByRoot(resp) => { RPCResponse::BlocksByRoot(resp) => {
self.propagate_response(id, peer_id, Response::BlocksByRoot(Some(resp))) self.propagate_response(id, peer_id, Response::BlocksByRoot(Some(resp)))
} }
RPCResponse::BlobsByRange(resp) => {
self.propagate_response(id, peer_id, Response::BlobsByRange(Some(resp)))
}
} }
} }
Ok(RPCReceived::EndOfStream(id, termination)) => { Ok(RPCReceived::EndOfStream(id, termination)) => {
let response = match termination { let response = match termination {
ResponseTermination::BlocksByRange => Response::BlocksByRange(None), ResponseTermination::BlocksByRange => Response::BlocksByRange(None),
ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None), ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None),
ResponseTermination::BlobsByRange => Response::BlobsByRange(None),
}; };
self.propagate_response(id, peer_id, response); self.propagate_response(id, peer_id, response);
} }
@ -1329,6 +1341,8 @@ pub enum Request {
BlocksByRange(BlocksByRangeRequest), BlocksByRange(BlocksByRangeRequest),
/// A request blocks root request. /// A request blocks root request.
BlocksByRoot(BlocksByRootRequest), BlocksByRoot(BlocksByRootRequest),
/// A blobs by range request.
BlobsByRange(BlobsByRangeRequest),
} }
impl<TSpec: EthSpec> std::convert::From<Request> for OutboundRequest<TSpec> { impl<TSpec: EthSpec> std::convert::From<Request> for OutboundRequest<TSpec> {
@ -1342,6 +1356,7 @@ impl<TSpec: EthSpec> std::convert::From<Request> for OutboundRequest<TSpec> {
step: 1, step: 1,
}) })
} }
Request::BlobsByRange(r) => OutboundRequest::BlobsByRange(r),
Request::Status(s) => OutboundRequest::Status(s), Request::Status(s) => OutboundRequest::Status(s),
} }
} }
@ -1361,6 +1376,8 @@ pub enum Response<TSpec: EthSpec> {
BlocksByRange(Option<Arc<SignedBeaconBlock<TSpec>>>), BlocksByRange(Option<Arc<SignedBeaconBlock<TSpec>>>),
/// A response to a get BLOCKS_BY_ROOT request. /// A response to a get BLOCKS_BY_ROOT request.
BlocksByRoot(Option<Arc<SignedBeaconBlock<TSpec>>>), BlocksByRoot(Option<Arc<SignedBeaconBlock<TSpec>>>),
/// A response to a get BLOBS_BY_RANGE request
BlobsByRange(Option<Arc<VariableList<BlobsSidecar<TSpec>, TSpec::MaxRequestBlobsSidecars>>>)
} }
impl<TSpec: EthSpec> std::convert::From<Response<TSpec>> for RPCCodedResponse<TSpec> { impl<TSpec: EthSpec> std::convert::From<Response<TSpec>> for RPCCodedResponse<TSpec> {
@ -1374,6 +1391,10 @@ impl<TSpec: EthSpec> std::convert::From<Response<TSpec>> for RPCCodedResponse<TS
Some(b) => RPCCodedResponse::Success(RPCResponse::BlocksByRange(b)), Some(b) => RPCCodedResponse::Success(RPCResponse::BlocksByRange(b)),
None => RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange), None => RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange),
}, },
Response::BlobsByRange(r) => match r {
Some(b) => RPCCodedResponse::Success(RPCResponse::BlobsByRange(b)),
None => RPCCodedResponse::StreamTermination(ResponseTermination::BlobsByRange),
}
Response::Status(s) => RPCCodedResponse::Success(RPCResponse::Status(s)), Response::Status(s) => RPCCodedResponse::Success(RPCResponse::Status(s)),
} }
} }

View File

@ -13,6 +13,7 @@ use std::sync::Arc;
use strum::IntoStaticStr; use strum::IntoStaticStr;
use superstruct::superstruct; use superstruct::superstruct;
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
use types::blobs_sidecar::BlobsSidecar;
/// Maximum number of blocks in a single request. /// Maximum number of blocks in a single request.
pub type MaxRequestBlocks = U1024; pub type MaxRequestBlocks = U1024;
@ -204,6 +205,16 @@ pub struct BlocksByRangeRequest {
pub count: u64, pub count: u64,
} }
/// Request a number of beacon blobs from a peer.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BlobsByRangeRequest {
/// The starting slot to request blobs.
pub start_slot: u64,
/// The number of blobs from the start slot.
pub count: u64,
}
/// Request a number of beacon block roots from a peer. /// Request a number of beacon block roots from a peer.
#[derive(Encode, Decode, Clone, Debug, PartialEq)] #[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct OldBlocksByRangeRequest { pub struct OldBlocksByRangeRequest {
@ -243,6 +254,9 @@ pub enum RPCResponse<T: EthSpec> {
/// A response to a get BLOCKS_BY_ROOT request. /// A response to a get BLOCKS_BY_ROOT request.
BlocksByRoot(Arc<SignedBeaconBlock<T>>), BlocksByRoot(Arc<SignedBeaconBlock<T>>),
/// A response to a get BLOBS_BY_RANGE request
BlobsByRange(Arc<VariableList<BlobsSidecar<T>, T::MaxRequestBlobsSidecars>>),
/// A PONG response to a PING request. /// A PONG response to a PING request.
Pong(Ping), Pong(Ping),
@ -258,6 +272,9 @@ pub enum ResponseTermination {
/// Blocks by root stream termination. /// Blocks by root stream termination.
BlocksByRoot, BlocksByRoot,
// Blobs by range stream termination.
BlobsByRange
} }
/// The structured response containing a result/code indicating success or failure /// The structured response containing a result/code indicating success or failure

View File

@ -38,6 +38,7 @@ pub enum OutboundRequest<TSpec: EthSpec> {
Goodbye(GoodbyeReason), Goodbye(GoodbyeReason),
BlocksByRange(OldBlocksByRangeRequest), BlocksByRange(OldBlocksByRangeRequest),
BlocksByRoot(BlocksByRootRequest), BlocksByRoot(BlocksByRootRequest),
BlobsByRange(BlobsByRangeRequest),
Ping(Ping), Ping(Ping),
MetaData(PhantomData<TSpec>), MetaData(PhantomData<TSpec>),
} }

View File

@ -427,6 +427,7 @@ pub enum InboundRequest<TSpec: EthSpec> {
Goodbye(GoodbyeReason), Goodbye(GoodbyeReason),
BlocksByRange(OldBlocksByRangeRequest), BlocksByRange(OldBlocksByRangeRequest),
BlocksByRoot(BlocksByRootRequest), BlocksByRoot(BlocksByRootRequest),
BlobsByRange(BlobsByRangeRequest),
Ping(Ping), Ping(Ping),
MetaData(PhantomData<TSpec>), MetaData(PhantomData<TSpec>),
} }

View File

@ -210,6 +210,7 @@ impl std::fmt::Display for GossipTopic {
let kind = match self.kind { let kind = match self.kind {
GossipKind::BeaconBlock => BEACON_BLOCK_TOPIC.into(), GossipKind::BeaconBlock => BEACON_BLOCK_TOPIC.into(),
GossipKind::BlobsSidecar => BLOBS_SIDECAR_TOPIC.into(),
GossipKind::BeaconAggregateAndProof => BEACON_AGGREGATE_AND_PROOF_TOPIC.into(), GossipKind::BeaconAggregateAndProof => BEACON_AGGREGATE_AND_PROOF_TOPIC.into(),
GossipKind::VoluntaryExit => VOLUNTARY_EXIT_TOPIC.into(), GossipKind::VoluntaryExit => VOLUNTARY_EXIT_TOPIC.into(),
GossipKind::ProposerSlashing => PROPOSER_SLASHING_TOPIC.into(), GossipKind::ProposerSlashing => PROPOSER_SLASHING_TOPIC.into(),

View File

@ -100,6 +100,7 @@ pub trait EthSpec: 'static + Default + Sync + Send + Clone + Debug + PartialEq +
*/ */
type MaxBlobsPerBlock: Unsigned + Clone + Sync + Send + Debug + PartialEq; type MaxBlobsPerBlock: Unsigned + Clone + Sync + Send + Debug + PartialEq;
type FieldElementsPerBlob: Unsigned + Clone + Sync + Send + Debug + PartialEq; type FieldElementsPerBlob: Unsigned + Clone + Sync + Send + Debug + PartialEq;
type MaxRequestBlobsSidecars: Unsigned + Clone + Sync + Send + Debug + PartialEq;
/* /*
* Derived values (set these CAREFULLY) * Derived values (set these CAREFULLY)
*/ */
@ -277,6 +278,7 @@ impl EthSpec for MainnetEthSpec {
type SlotsPerEth1VotingPeriod = U2048; // 64 epochs * 32 slots per epoch type SlotsPerEth1VotingPeriod = U2048; // 64 epochs * 32 slots per epoch
type MaxBlobsPerBlock = U16; type MaxBlobsPerBlock = U16;
type FieldElementsPerBlob = U4096; type FieldElementsPerBlob = U4096;
type MaxRequestBlobsSidecars = U128;
fn default_spec() -> ChainSpec { fn default_spec() -> ChainSpec {
ChainSpec::mainnet() ChainSpec::mainnet()
@ -323,7 +325,8 @@ impl EthSpec for MinimalEthSpec {
MinGasLimit, MinGasLimit,
MaxExtraDataBytes, MaxExtraDataBytes,
MaxBlobsPerBlock, MaxBlobsPerBlock,
FieldElementsPerBlob FieldElementsPerBlob,
MaxRequestBlobsSidecars
}); });
fn default_spec() -> ChainSpec { fn default_spec() -> ChainSpec {
@ -370,6 +373,7 @@ impl EthSpec for GnosisEthSpec {
type SlotsPerEth1VotingPeriod = U1024; // 64 epochs * 16 slots per epoch type SlotsPerEth1VotingPeriod = U1024; // 64 epochs * 16 slots per epoch
type MaxBlobsPerBlock = U16; type MaxBlobsPerBlock = U16;
type FieldElementsPerBlob = U4096; type FieldElementsPerBlob = U4096;
type MaxRequestBlobsSidecars = U128;
fn default_spec() -> ChainSpec { fn default_spec() -> ChainSpec {
ChainSpec::gnosis() ChainSpec::gnosis()