From aeb52ff18639b1c8980ac1fc3be997d2b3dd7b7d Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Sat, 17 Sep 2022 16:10:09 +0200 Subject: [PATCH] network stuff --- .../src/rpc/codec/ssz_snappy.rs | 1 + .../lighthouse_network/src/rpc/methods.rs | 5 ++++ .../lighthouse_network/src/rpc/outbound.rs | 7 ++++++ .../lighthouse_network/src/rpc/protocol.rs | 8 +++++++ beacon_node/network/src/router/mod.rs | 7 ++++++ beacon_node/network/src/router/processor.rs | 24 ++++++++++++++++++- 6 files changed, 51 insertions(+), 1 deletion(-) diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index f19c7b5a4..9bcca2e6b 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -70,6 +70,7 @@ impl Encoder> for SSZSnappyInboundCodec< RPCResponse::Status(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), + RPCResponse::BlobsByRange(res) => res.as_ssz_bytes(), RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::MetaData(res) => // Encode the correct version of the MetaData response based on the negotiated version. diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index a7bd51106..ad67c29ba 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -336,6 +336,7 @@ impl RPCCodedResponse { RPCResponse::Status(_) => false, RPCResponse::BlocksByRange(_) => true, RPCResponse::BlocksByRoot(_) => true, + RPCResponse::BlobsByRange(_) => true, RPCResponse::Pong(_) => false, RPCResponse::MetaData(_) => false, }, @@ -370,6 +371,7 @@ impl RPCResponse { RPCResponse::Status(_) => Protocol::Status, RPCResponse::BlocksByRange(_) => Protocol::BlocksByRange, RPCResponse::BlocksByRoot(_) => Protocol::BlocksByRoot, + RPCResponse::BlobsByRange(_) => Protocol::BlobsByRange, RPCResponse::Pong(_) => Protocol::Ping, RPCResponse::MetaData(_) => Protocol::MetaData, } @@ -404,6 +406,9 @@ impl std::fmt::Display for RPCResponse { } RPCResponse::BlocksByRoot(block) => { write!(f, "BlocksByRoot: Block slot: {}", block.slot()) + } + RPCResponse::BlobsByRange(blob) => { + write!(f, "BlobsByRange: Blob slot: {}", blob.len()) } RPCResponse::Pong(ping) => write!(f, "Pong: {}", ping.data), RPCResponse::MetaData(metadata) => write!(f, "Metadata: {}", metadata.seq_number()), diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index 1c6920165..4f5b2a578 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -76,6 +76,9 @@ impl OutboundRequest { ProtocolId::new(Protocol::BlocksByRoot, Version::V2, Encoding::SSZSnappy), ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy), ], + OutboundRequest::BlobsByRange(_) => vec![ + ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy), + ], OutboundRequest::Ping(_) => vec![ProtocolId::new( Protocol::Ping, Version::V1, @@ -97,6 +100,7 @@ impl OutboundRequest { OutboundRequest::Goodbye(_) => 0, OutboundRequest::BlocksByRange(req) => req.count, OutboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64, + OutboundRequest::BlobsByRange(req) => req.count, OutboundRequest::Ping(_) => 1, OutboundRequest::MetaData(_) => 1, } @@ -109,6 +113,7 @@ impl OutboundRequest { OutboundRequest::Goodbye(_) => Protocol::Goodbye, OutboundRequest::BlocksByRange(_) => Protocol::BlocksByRange, OutboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot, + OutboundRequest::BlobsByRange(_) => Protocol::BlobsByRange, OutboundRequest::Ping(_) => Protocol::Ping, OutboundRequest::MetaData(_) => Protocol::MetaData, } @@ -122,6 +127,7 @@ impl OutboundRequest { // variants that have `multiple_responses()` can have values. OutboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange, OutboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, + OutboundRequest::BlobsByRange(_) => ResponseTermination::BlobsByRange, OutboundRequest::Status(_) => unreachable!(), OutboundRequest::Goodbye(_) => unreachable!(), OutboundRequest::Ping(_) => unreachable!(), @@ -177,6 +183,7 @@ impl std::fmt::Display for OutboundRequest { OutboundRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason), OutboundRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), OutboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), + OutboundRequest::BlobsByRange(req) => write!(f, "Blobs by range: {:?}", req), OutboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), OutboundRequest::MetaData(_) => write!(f, "MetaData request"), } diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 4de2948ff..203f5d0b5 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -189,6 +189,7 @@ impl std::fmt::Display for Protocol { Protocol::Goodbye => "goodbye", Protocol::BlocksByRange => "beacon_blocks_by_range", Protocol::BlocksByRoot => "beacon_blocks_by_root", + Protocol::BlobsByRange => "blobs_sidecars_by_range", Protocol::Ping => "ping", Protocol::MetaData => "metadata", }; @@ -297,6 +298,9 @@ impl ProtocolId { Protocol::BlocksByRoot => { RpcLimits::new(*BLOCKS_BY_ROOT_REQUEST_MIN, *BLOCKS_BY_ROOT_REQUEST_MAX) } + Protocol::BlobsByRange => { + RpcLimits::new(*BLOCKS_BY_ROOT_REQUEST_MIN, *BLOCKS_BY_ROOT_REQUEST_MAX) + } Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -315,6 +319,7 @@ impl ProtocolId { Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork()), Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()), + Protocol::BlobsByRange => rpc_block_limits_by_fork(fork_context.current_fork()), Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), @@ -469,6 +474,9 @@ impl InboundRequest { ProtocolId::new(Protocol::BlocksByRoot, Version::V2, Encoding::SSZSnappy), ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy), ], + InboundRequest::BlobsByRange(_) => vec![ + ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy), + ], InboundRequest::Ping(_) => vec![ProtocolId::new( Protocol::Ping, Version::V1, diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 0c9b41779..24a202c49 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -168,6 +168,9 @@ impl Router { Request::BlocksByRoot(request) => self .processor .on_blocks_by_root_request(peer_id, id, request), + Request::BlobsByRange(request) => self + .processor + .on_blobs_by_range_request(peer_id, id, request), } } @@ -192,6 +195,10 @@ impl Router { self.processor .on_blocks_by_root_response(peer_id, request_id, beacon_block); } + Response::BlobsByRange(beacon_blob) => { + self.processor + .on_blobs_by_range_response(peer_id, request_id, beacon_blob); + } } } diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index c716707f3..2d12cf540 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -7,6 +7,7 @@ use crate::sync::manager::RequestId as SyncId; use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::rpc::*; +use lighthouse_network::rpc::methods::BlobsByRangeRequest; use lighthouse_network::{ Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response, }; @@ -18,7 +19,7 @@ use store::SyncCommitteeMessage; use tokio::sync::mpsc; use types::{ Attestation, AttesterSlashing, EthSpec, ProposerSlashing, SignedAggregateAndProof, - SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncSubnetId, + SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncSubnetId, VariableList, blobs_sidecar::BlobsSidecar, }; use types::signed_blobs_sidecar::SignedBlobsSidecar; @@ -161,6 +162,18 @@ impl Processor { )) } + pub fn on_blobs_by_range_request( + &mut self, + peer_id: PeerId, + request_id: PeerRequestId, + request: BlobsByRangeRequest, + ) { + /* + self.send_beacon_processor_work(BeaconWorkEvent::blocks_by_roots_request( + peer_id, request_id, request, + )) + */ + } /// Handle a `BlocksByRange` request from the peer. pub fn on_blocks_by_range_request( &mut self, @@ -235,6 +248,15 @@ impl Processor { }); } + pub fn on_blobs_by_range_response( + &mut self, + peer_id: PeerId, + request_id: RequestId, + beacon_blob: Option, <::EthSpec as EthSpec>::MaxRequestBlobsSidecars>>>, + ) { + + } + /// Process a gossip message declaring a new block. /// /// Attempts to apply to block to the beacon chain. May queue the block for later processing.