diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 0f2913595..905ce998f 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -501,6 +501,7 @@ impl PeerManager { Protocol::Ping => PeerAction::MidToleranceError, Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, + Protocol::TxBlobsByRange => PeerAction::MidToleranceError, Protocol::Goodbye => PeerAction::LowToleranceError, Protocol::MetaData => PeerAction::LowToleranceError, Protocol::Status => PeerAction::LowToleranceError, @@ -515,6 +516,7 @@ impl PeerManager { match protocol { Protocol::Ping => PeerAction::Fatal, Protocol::BlocksByRange => return, + Protocol::TxBlobsByRange => return, Protocol::BlocksByRoot => return, Protocol::Goodbye => return, Protocol::MetaData => PeerAction::LowToleranceError, @@ -530,6 +532,7 @@ impl PeerManager { ConnectionDirection::Outgoing => match protocol { Protocol::Ping => PeerAction::LowToleranceError, Protocol::BlocksByRange => PeerAction::MidToleranceError, + Protocol::TxBlobsByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, Protocol::Goodbye => return, Protocol::MetaData => return, diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index 9c84305e4..7b83eb1b6 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -15,10 +15,7 @@ use std::io::{Read, Write}; use std::marker::PhantomData; use std::sync::Arc; use tokio_util::codec::{Decoder, Encoder}; -use types::{ - EthSpec, ForkContext, ForkName, SignedBeaconBlock, SignedBeaconBlockAltair, - SignedBeaconBlockBase, SignedBeaconBlockMerge, SignedBeaconBlockShanghai, -}; +use types::{BlobWrapper, EthSpec, ForkContext, ForkName, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockMerge, SignedBeaconBlockShanghai}; use unsigned_varint::codec::Uvi; const CONTEXT_BYTES_LEN: usize = 4; @@ -69,6 +66,7 @@ impl Encoder> for SSZSnappyInboundCodec< RPCCodedResponse::Success(resp) => match &resp { RPCResponse::Status(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(), + RPCResponse::TxBlobsByRange(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::MetaData(res) => @@ -227,6 +225,7 @@ impl Encoder> for SSZSnappyOutboundCodec< OutboundRequest::Status(req) => req.as_ssz_bytes(), OutboundRequest::Goodbye(req) => req.as_ssz_bytes(), OutboundRequest::BlocksByRange(req) => req.as_ssz_bytes(), + OutboundRequest::TxBlobsByRange(req) => req.as_ssz_bytes(), OutboundRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(), OutboundRequest::Ping(req) => req.as_ssz_bytes(), OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode @@ -469,6 +468,9 @@ fn handle_v1_request( Protocol::BlocksByRange => Ok(Some(InboundRequest::BlocksByRange( OldBlocksByRangeRequest::from_ssz_bytes(decoded_buffer)?, ))), + Protocol::TxBlobsByRange => Ok(Some(InboundRequest::TxBlobsByRange( + TxBlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?, + ))), Protocol::BlocksByRoot => Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest { block_roots: VariableList::from_ssz_bytes(decoded_buffer)?, }))), @@ -501,6 +503,9 @@ fn handle_v2_request( Protocol::BlocksByRange => Ok(Some(InboundRequest::BlocksByRange( OldBlocksByRangeRequest::from_ssz_bytes(decoded_buffer)?, ))), + Protocol::TxBlobsByRange => Ok(Some(InboundRequest::TxBlobsByRange( + TxBlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?, + ))), Protocol::BlocksByRoot => Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest { block_roots: VariableList::from_ssz_bytes(decoded_buffer)?, }))), @@ -538,6 +543,9 @@ fn handle_v1_response( Protocol::BlocksByRange => Ok(Some(RPCResponse::BlocksByRange(Arc::new( SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), )))), + Protocol::TxBlobsByRange => Ok(Some(RPCResponse::TxBlobsByRange(Arc::new( + BlobWrapper::from_ssz_bytes(decoded_buffer)?), + ))), Protocol::BlocksByRoot => Ok(Some(RPCResponse::BlocksByRoot(Arc::new( SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), )))), @@ -595,6 +603,13 @@ fn handle_v2_response( )?), )))), }, + Protocol::TxBlobsByRange => { + Ok(Some(RPCResponse::TxBlobsByRange(Box::new( + BlobWrapper::from_ssz_bytes( + decoded_buffer, + )? + )))) + }, Protocol::BlocksByRoot => match fork_name { ForkName::Altair => Ok(Some(RPCResponse::BlocksByRoot(Arc::new( SignedBeaconBlock::Altair(SignedBeaconBlockAltair::from_ssz_bytes( diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 26d755a6e..8aa04866c 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -12,7 +12,7 @@ use std::ops::Deref; use std::sync::Arc; use strum::IntoStaticStr; use superstruct::superstruct; -use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{BlobWrapper, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// Maximum number of blocks in a single request. pub type MaxRequestBlocks = U1024; @@ -221,6 +221,12 @@ pub struct OldBlocksByRangeRequest { pub step: u64, } +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct TxBlobsByRangeRequest { + pub execution_block_number: u64, + pub count: u64, +} + /// Request a number of beacon block bodies from a peer. #[derive(Clone, Debug, PartialEq)] pub struct BlocksByRootRequest { @@ -240,6 +246,8 @@ pub enum RPCResponse { /// batch. BlocksByRange(Arc>), + TxBlobsByRange(Box>), + /// A response to a get BLOCKS_BY_ROOT request. BlocksByRoot(Arc>), @@ -256,6 +264,8 @@ pub enum ResponseTermination { /// Blocks by range stream termination. BlocksByRange, + TxBlobsByRange, + /// Blocks by root stream termination. BlocksByRoot, } @@ -318,6 +328,7 @@ impl RPCCodedResponse { RPCCodedResponse::Success(resp) => match resp { RPCResponse::Status(_) => false, RPCResponse::BlocksByRange(_) => true, + RPCResponse::TxBlobsByRange(_) => true, RPCResponse::BlocksByRoot(_) => true, RPCResponse::Pong(_) => false, RPCResponse::MetaData(_) => false, @@ -385,6 +396,9 @@ impl std::fmt::Display for RPCResponse { RPCResponse::BlocksByRange(block) => { write!(f, "BlocksByRange: Block slot: {}", block.slot()) } + RPCResponse::TxBlobsByRange(blob) => { + write!(f, "TxBlobsByRange: Block slot: {}", blob.beacon_block_slot) + } RPCResponse::BlocksByRoot(block) => { write!(f, "BlocksByRoot: Block slot: {}", block.slot()) } @@ -436,6 +450,16 @@ impl std::fmt::Display for OldBlocksByRangeRequest { } } +impl std::fmt::Display for TxBlobsByRangeRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Execution block number: {}, Count: {}", + self.execution_block_number, self.count + ) + } +} + impl slog::KV for StatusMessage { fn serialize( &self, diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 7b0092ef7..2d374c770 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -125,6 +125,12 @@ impl RPC { methods::MAX_REQUEST_BLOCKS, Duration::from_secs(10), ) + //FIXME(sean) + .n_every( + Protocol::TxBlobsByRange, + methods::MAX_REQUEST_BLOCKS, + Duration::from_secs(10), + ) .n_every(Protocol::BlocksByRoot, 128, Duration::from_secs(10)) .build() .expect("Configuration parameters are valid"); diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index 7d5acc436..7664a5752 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -37,6 +37,7 @@ pub enum OutboundRequest { Status(StatusMessage), Goodbye(GoodbyeReason), BlocksByRange(OldBlocksByRangeRequest), + TxBlobsByRange(TxBlobsByRangeRequest), BlocksByRoot(BlocksByRootRequest), Ping(Ping), MetaData(PhantomData), @@ -71,6 +72,10 @@ impl OutboundRequest { ProtocolId::new(Protocol::BlocksByRange, Version::V2, Encoding::SSZSnappy), ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy), ], + //FIXME(sean) what should the protocol version be? + OutboundRequest::TxBlobsByRange(_) => vec![ + ProtocolId::new(Protocol::TxBlobsByRange, Version::V2, Encoding::SSZSnappy), + ], OutboundRequest::BlocksByRoot(_) => vec![ ProtocolId::new(Protocol::BlocksByRoot, Version::V2, Encoding::SSZSnappy), ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy), @@ -95,6 +100,7 @@ impl OutboundRequest { OutboundRequest::Status(_) => 1, OutboundRequest::Goodbye(_) => 0, OutboundRequest::BlocksByRange(req) => req.count, + OutboundRequest::TxBlobsByRange(req) => req.count, OutboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64, OutboundRequest::Ping(_) => 1, OutboundRequest::MetaData(_) => 1, @@ -107,6 +113,7 @@ impl OutboundRequest { OutboundRequest::Status(_) => Protocol::Status, OutboundRequest::Goodbye(_) => Protocol::Goodbye, OutboundRequest::BlocksByRange(_) => Protocol::BlocksByRange, + OutboundRequest::TxBlobsByRange(_) => Protocol::TxBlobsByRange, OutboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot, OutboundRequest::Ping(_) => Protocol::Ping, OutboundRequest::MetaData(_) => Protocol::MetaData, @@ -120,6 +127,7 @@ impl OutboundRequest { // this only gets called after `multiple_responses()` returns true. Therefore, only // variants that have `multiple_responses()` can have values. OutboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange, + OutboundRequest::TxBlobsByRange(_) => ResponseTermination::TxBlobsByRange, OutboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, OutboundRequest::Status(_) => unreachable!(), OutboundRequest::Goodbye(_) => unreachable!(), @@ -175,6 +183,7 @@ impl std::fmt::Display for OutboundRequest { OutboundRequest::Status(status) => write!(f, "Status Message: {}", status), OutboundRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason), OutboundRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), + OutboundRequest::TxBlobsByRange(req) => write!(f, "Blobs by range: {}", req), OutboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), OutboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), OutboundRequest::MetaData(_) => write!(f, "MetaData request"), diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 81960214b..579a31d95 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -20,10 +20,7 @@ use tokio_util::{ codec::Framed, compat::{Compat, FuturesAsyncReadCompatExt}, }; -use types::{ - BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, EthSpec, ForkContext, - ForkName, Hash256, MainnetEthSpec, Signature, SignedBeaconBlock, -}; +use types::{BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, BlobWrapper, EthSpec, ForkContext, ForkName, Hash256, MainnetEthSpec, Signature, SignedBeaconBlock}; lazy_static! { // Note: Hardcoding the `EthSpec` type for `SignedBeaconBlock` as min/max values is @@ -71,6 +68,12 @@ lazy_static! { + types::ExecutionPayload::::max_execution_payload_size() // adding max size of execution payload (~16gb) + ssz::BYTES_PER_LENGTH_OFFSET; // Adding the additional ssz offset for the `ExecutionPayload` field + pub static ref BLOB_MIN: usize = BlobWrapper::::empty() + .as_ssz_bytes() + .len(); + + pub static ref BLOB_MAX: usize = BlobWrapper::::max_size(); + pub static ref BLOCKS_BY_ROOT_REQUEST_MIN: usize = VariableList::::from(Vec::::new()) .as_ssz_bytes() @@ -147,6 +150,7 @@ pub enum Protocol { Goodbye, /// The `BlocksByRange` protocol name. BlocksByRange, + TxBlobsByRange, /// The `BlocksByRoot` protocol name. BlocksByRoot, /// The `Ping` protocol name. @@ -176,6 +180,8 @@ impl std::fmt::Display for Protocol { Protocol::Status => "status", Protocol::Goodbye => "goodbye", Protocol::BlocksByRange => "beacon_blocks_by_range", + //FIXME(sean) verify + Protocol::TxBlobsByRange => "tx_blobs_by_range", Protocol::BlocksByRoot => "beacon_blocks_by_root", Protocol::Ping => "ping", Protocol::MetaData => "metadata", @@ -282,6 +288,12 @@ impl ProtocolId { ::ssz_fixed_len(), ::ssz_fixed_len(), ), + Protocol::TxBlobsByRange => { + RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ) + } Protocol::BlocksByRoot => { RpcLimits::new(*BLOCKS_BY_ROOT_REQUEST_MIN, *BLOCKS_BY_ROOT_REQUEST_MAX) } @@ -451,6 +463,11 @@ impl InboundRequest { ProtocolId::new(Protocol::BlocksByRange, Version::V2, Encoding::SSZSnappy), ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy), ], + //FIXME(sean) do I need v1 + InboundRequest::TxBlobsByRange(_) => vec![ + // V2 has higher preference when negotiating a stream + ProtocolId::new(Protocol::TxBlobsByRange, Version::V2, Encoding::SSZSnappy), + ], InboundRequest::BlocksByRoot(_) => vec![ // V2 has higher preference when negotiating a stream ProtocolId::new(Protocol::BlocksByRoot, Version::V2, Encoding::SSZSnappy), @@ -476,6 +493,7 @@ impl InboundRequest { InboundRequest::Status(_) => 1, InboundRequest::Goodbye(_) => 0, InboundRequest::BlocksByRange(req) => req.count, + InboundRequest::TxBlobsByRange(req) => req.count, InboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64, InboundRequest::Ping(_) => 1, InboundRequest::MetaData(_) => 1, @@ -488,6 +506,7 @@ impl InboundRequest { InboundRequest::Status(_) => Protocol::Status, InboundRequest::Goodbye(_) => Protocol::Goodbye, InboundRequest::BlocksByRange(_) => Protocol::BlocksByRange, + InboundRequest::TxBlobsByRange(_) => Protocol::TxBlobsByRange, InboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot, InboundRequest::Ping(_) => Protocol::Ping, InboundRequest::MetaData(_) => Protocol::MetaData, @@ -501,6 +520,7 @@ impl InboundRequest { // this only gets called after `multiple_responses()` returns true. Therefore, only // variants that have `multiple_responses()` can have values. InboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange, + InboundRequest::TxBlobsByRange(_) => ResponseTermination::TxBlobsByRange, InboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, InboundRequest::Status(_) => unreachable!(), InboundRequest::Goodbye(_) => unreachable!(), @@ -606,6 +626,7 @@ impl std::fmt::Display for InboundRequest { InboundRequest::Status(status) => write!(f, "Status Message: {}", status), InboundRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason), InboundRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), + InboundRequest::TxBlobsByRange(req) => write!(f, "Blobs by range: {}", req), InboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), InboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), InboundRequest::MetaData(_) => write!(f, "MetaData request"), diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index 70b14c33d..6d6d34462 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -71,6 +71,7 @@ pub struct RPCRateLimiter { status_rl: Limiter, /// BlocksByRange rate limiter. bbrange_rl: Limiter, + txbbrange_rl: Limiter, /// BlocksByRoot rate limiter. bbroots_rl: Limiter, } @@ -96,6 +97,7 @@ pub struct RPCRateLimiterBuilder { status_quota: Option, /// Quota for the BlocksByRange protocol. bbrange_quota: Option, + txbbrange_quota: Option, /// Quota for the BlocksByRoot protocol. bbroots_quota: Option, } @@ -115,6 +117,7 @@ impl RPCRateLimiterBuilder { Protocol::MetaData => self.metadata_quota = q, Protocol::Goodbye => self.goodbye_quota = q, Protocol::BlocksByRange => self.bbrange_quota = q, + Protocol::TxBlobsByRange => self.txbbrange_quota = q, Protocol::BlocksByRoot => self.bbroots_quota = q, } self @@ -155,6 +158,9 @@ impl RPCRateLimiterBuilder { let bbrange_quota = self .bbrange_quota .ok_or("BlocksByRange quota not specified")?; + let txbbrange_quota = self + .txbbrange_quota + .ok_or("TxBlobsByRange quota not specified")?; // create the rate limiters let ping_rl = Limiter::from_quota(ping_quota)?; @@ -163,6 +169,7 @@ impl RPCRateLimiterBuilder { let goodbye_rl = Limiter::from_quota(goodbye_quota)?; let bbroots_rl = Limiter::from_quota(bbroots_quota)?; let bbrange_rl = Limiter::from_quota(bbrange_quota)?; + let txbbrange_rl = Limiter::from_quota(txbbrange_quota)?; // check for peers to prune every 30 seconds, starting in 30 seconds let prune_every = tokio::time::Duration::from_secs(30); @@ -176,6 +183,7 @@ impl RPCRateLimiterBuilder { goodbye_rl, bbroots_rl, bbrange_rl, + txbbrange_rl, init_time: Instant::now(), }) } @@ -198,6 +206,7 @@ impl RPCRateLimiter { Protocol::MetaData => &mut self.metadata_rl, Protocol::Goodbye => &mut self.goodbye_rl, Protocol::BlocksByRange => &mut self.bbrange_rl, + Protocol::TxBlobsByRange => &mut self.txbbrange_rl, Protocol::BlocksByRoot => &mut self.bbroots_rl, }; check(limiter) diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 53d29ccb2..02b0f60e9 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -39,7 +39,9 @@ use std::sync::Arc; use std::task::{Context, Poll}; use types::{ consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId, + BlobWrapper, SignedBeaconBlock, SyncSubnetId }; +use crate::rpc::methods::TxBlobsByRangeRequest; use utils::{build_transport, strip_peer_id, Context as ServiceContext, MAX_CONNECTIONS_PER_PEER}; use self::behaviour::Behaviour; @@ -981,6 +983,9 @@ impl Network { Request::BlocksByRange { .. } => { metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blocks_by_range"]) } + Request::TxBlobsByRange { .. } => { + metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["tx_blobs_by_range"]) + } Request::BlocksByRoot { .. } => { metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blocks_by_root"]) } @@ -1271,6 +1276,9 @@ impl Network { RPCResponse::BlocksByRange(resp) => { self.build_response(id, peer_id, Response::BlocksByRange(Some(resp))) } + RPCResponse::TxBlobsByRange(resp) => { + self.propagate_response(id, peer_id, Response::TxBlobsByRange(Some(resp))) + } RPCResponse::BlocksByRoot(resp) => { self.build_response(id, peer_id, Response::BlocksByRoot(Some(resp))) } @@ -1279,6 +1287,7 @@ impl Network { Ok(RPCReceived::EndOfStream(id, termination)) => { let response = match termination { ResponseTermination::BlocksByRange => Response::BlocksByRange(None), + ResponseTermination::TxBlobsByRange => Response::TxBlobsByRange(None), ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None), }; self.build_response(id, peer_id, response) diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 03b877506..e17f10858 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -165,6 +165,9 @@ impl Router { Request::BlocksByRange(request) => self .processor .on_blocks_by_range_request(peer_id, id, request), + Request::TxBlobsByRange(request) => self + .processor + .on_tx_blobs_by_range_request(peer_id, id, request), Request::BlocksByRoot(request) => self .processor .on_blocks_by_root_request(peer_id, id, request), @@ -188,6 +191,10 @@ impl Router { self.processor .on_blocks_by_range_response(peer_id, request_id, beacon_block); } + Response::TxBlobsByRange(blob_wrapper) => { + self.processor + .on_tx_blobs_by_range_response(peer_id, request_id, blob_wrapper); + } Response::BlocksByRoot(beacon_block) => { self.processor .on_blocks_by_root_response(peer_id, request_id, beacon_block); diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index ce11cbdce..6bc642206 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -16,10 +16,8 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::SyncCommitteeMessage; use tokio::sync::mpsc; -use types::{ - Attestation, AttesterSlashing, EthSpec, ProposerSlashing, SignedAggregateAndProof, - SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncSubnetId, -}; +use lighthouse_network::rpc::methods::TxBlobsByRangeRequest; +use types::{Attestation, AttesterSlashing, BlobWrapper, EthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncSubnetId}; /// Processes validated messages from the network. It relays necessary data to the syncing thread /// and processes blocks from the pubsub network. @@ -204,6 +202,25 @@ impl Processor { }); } + /// Handle a `BlocksByRange` request from the peer. + pub fn on_tx_blobs_by_range_request( + &mut self, + peer_id: PeerId, + request_id: PeerRequestId, + req: TxBlobsByRangeRequest, + ) { + //FIXME(sean) + } + + pub fn on_tx_blobs_by_range_response( + &mut self, + peer_id: PeerId, + request_id: RequestId, + blob_wrapper: Option>>, + ) { + //FIXME(sean) + } + /// Handle a `BlocksByRoot` response from the peer. pub fn on_blocks_by_root_response( &mut self, diff --git a/consensus/types/src/blob_wrapper.rs b/consensus/types/src/blob_wrapper.rs index 23f685e1e..4368be0bd 100644 --- a/consensus/types/src/blob_wrapper.rs +++ b/consensus/types/src/blob_wrapper.rs @@ -1,14 +1,27 @@ use crate::{Blob, EthSpec, Hash256, SignedBeaconBlock, Slot}; use serde_derive::{Deserialize, Serialize}; -use ssz_derive::Encode; +use ssz_derive::{Encode, Decode}; use ssz_types::VariableList; use tree_hash::TreeHash; use tree_hash_derive::TreeHash; +use ssz::{Decode, Encode}; #[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] -#[derive(Debug, Clone, Serialize, Deserialize, Encode, TreeHash)] +#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, PartialEq, Default)] pub struct BlobWrapper { pub beacon_block_root: Hash256, pub beacon_block_slot: Slot, pub blobs: VariableList, E::MaxObjectListSize>, } + +impl BlobWrapper { + pub fn empty() -> Self { + Self::default() + } + pub fn max_size() -> usize { + // Fixed part + Self::empty().as_ssz_bytes().len() + // Max size of variable length `blobs` field + + (E::max_object_list_size() * as Encode>::ssz_fixed_len()) + } +} \ No newline at end of file diff --git a/consensus/types/src/eth_spec.rs b/consensus/types/src/eth_spec.rs index 3089380b5..6bf462b02 100644 --- a/consensus/types/src/eth_spec.rs +++ b/consensus/types/src/eth_spec.rs @@ -227,6 +227,14 @@ pub trait EthSpec: 'static + Default + Sync + Send + Clone + Debug + PartialEq + fn bytes_per_logs_bloom() -> usize { Self::BytesPerLogsBloom::to_usize() } + + fn max_object_list_size() -> usize { + Self::MaxObjectListSize::to_usize() + } + + fn chunks_per_blob() -> usize { + Self::ChunksPerBlob::to_usize() + } } /// Macro to inherit some type values from another EthSpec. diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index e11bab770..c75766cb7 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -169,6 +169,7 @@ pub use crate::validator_registration_data::*; pub use crate::validator_subscription::ValidatorSubscription; pub use crate::voluntary_exit::VoluntaryExit; use serde_big_array::BigArray; +pub use crate::blob_wrapper::BlobWrapper; pub type CommitteeIndex = u64; pub type Hash256 = H256;