network stuff

This commit is contained in:
Marius van der Wijden 2022-09-17 16:10:09 +02:00
parent d4d40be870
commit aeb52ff186
6 changed files with 51 additions and 1 deletions

View File

@ -70,6 +70,7 @@ impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZSnappyInboundCodec<
RPCResponse::Status(res) => res.as_ssz_bytes(), RPCResponse::Status(res) => res.as_ssz_bytes(),
RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(),
RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(),
RPCResponse::BlobsByRange(res) => res.as_ssz_bytes(),
RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::Pong(res) => res.data.as_ssz_bytes(),
RPCResponse::MetaData(res) => RPCResponse::MetaData(res) =>
// Encode the correct version of the MetaData response based on the negotiated version. // Encode the correct version of the MetaData response based on the negotiated version.

View File

@ -336,6 +336,7 @@ impl<T: EthSpec> RPCCodedResponse<T> {
RPCResponse::Status(_) => false, RPCResponse::Status(_) => false,
RPCResponse::BlocksByRange(_) => true, RPCResponse::BlocksByRange(_) => true,
RPCResponse::BlocksByRoot(_) => true, RPCResponse::BlocksByRoot(_) => true,
RPCResponse::BlobsByRange(_) => true,
RPCResponse::Pong(_) => false, RPCResponse::Pong(_) => false,
RPCResponse::MetaData(_) => false, RPCResponse::MetaData(_) => false,
}, },
@ -370,6 +371,7 @@ impl<T: EthSpec> RPCResponse<T> {
RPCResponse::Status(_) => Protocol::Status, RPCResponse::Status(_) => Protocol::Status,
RPCResponse::BlocksByRange(_) => Protocol::BlocksByRange, RPCResponse::BlocksByRange(_) => Protocol::BlocksByRange,
RPCResponse::BlocksByRoot(_) => Protocol::BlocksByRoot, RPCResponse::BlocksByRoot(_) => Protocol::BlocksByRoot,
RPCResponse::BlobsByRange(_) => Protocol::BlobsByRange,
RPCResponse::Pong(_) => Protocol::Ping, RPCResponse::Pong(_) => Protocol::Ping,
RPCResponse::MetaData(_) => Protocol::MetaData, RPCResponse::MetaData(_) => Protocol::MetaData,
} }
@ -404,6 +406,9 @@ impl<T: EthSpec> std::fmt::Display for RPCResponse<T> {
} }
RPCResponse::BlocksByRoot(block) => { RPCResponse::BlocksByRoot(block) => {
write!(f, "BlocksByRoot: Block slot: {}", block.slot()) write!(f, "BlocksByRoot: Block slot: {}", block.slot())
}
RPCResponse::BlobsByRange(blob) => {
write!(f, "BlobsByRange: Blob slot: {}", blob.len())
} }
RPCResponse::Pong(ping) => write!(f, "Pong: {}", ping.data), RPCResponse::Pong(ping) => write!(f, "Pong: {}", ping.data),
RPCResponse::MetaData(metadata) => write!(f, "Metadata: {}", metadata.seq_number()), RPCResponse::MetaData(metadata) => write!(f, "Metadata: {}", metadata.seq_number()),

View File

@ -76,6 +76,9 @@ impl<TSpec: EthSpec> OutboundRequest<TSpec> {
ProtocolId::new(Protocol::BlocksByRoot, Version::V2, Encoding::SSZSnappy), ProtocolId::new(Protocol::BlocksByRoot, Version::V2, Encoding::SSZSnappy),
ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy), ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy),
], ],
OutboundRequest::BlobsByRange(_) => vec![
ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy),
],
OutboundRequest::Ping(_) => vec![ProtocolId::new( OutboundRequest::Ping(_) => vec![ProtocolId::new(
Protocol::Ping, Protocol::Ping,
Version::V1, Version::V1,
@ -97,6 +100,7 @@ impl<TSpec: EthSpec> OutboundRequest<TSpec> {
OutboundRequest::Goodbye(_) => 0, OutboundRequest::Goodbye(_) => 0,
OutboundRequest::BlocksByRange(req) => req.count, OutboundRequest::BlocksByRange(req) => req.count,
OutboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64, OutboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64,
OutboundRequest::BlobsByRange(req) => req.count,
OutboundRequest::Ping(_) => 1, OutboundRequest::Ping(_) => 1,
OutboundRequest::MetaData(_) => 1, OutboundRequest::MetaData(_) => 1,
} }
@ -109,6 +113,7 @@ impl<TSpec: EthSpec> OutboundRequest<TSpec> {
OutboundRequest::Goodbye(_) => Protocol::Goodbye, OutboundRequest::Goodbye(_) => Protocol::Goodbye,
OutboundRequest::BlocksByRange(_) => Protocol::BlocksByRange, OutboundRequest::BlocksByRange(_) => Protocol::BlocksByRange,
OutboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot, OutboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot,
OutboundRequest::BlobsByRange(_) => Protocol::BlobsByRange,
OutboundRequest::Ping(_) => Protocol::Ping, OutboundRequest::Ping(_) => Protocol::Ping,
OutboundRequest::MetaData(_) => Protocol::MetaData, OutboundRequest::MetaData(_) => Protocol::MetaData,
} }
@ -122,6 +127,7 @@ impl<TSpec: EthSpec> OutboundRequest<TSpec> {
// variants that have `multiple_responses()` can have values. // variants that have `multiple_responses()` can have values.
OutboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange, OutboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange,
OutboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, OutboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot,
OutboundRequest::BlobsByRange(_) => ResponseTermination::BlobsByRange,
OutboundRequest::Status(_) => unreachable!(), OutboundRequest::Status(_) => unreachable!(),
OutboundRequest::Goodbye(_) => unreachable!(), OutboundRequest::Goodbye(_) => unreachable!(),
OutboundRequest::Ping(_) => unreachable!(), OutboundRequest::Ping(_) => unreachable!(),
@ -177,6 +183,7 @@ impl<TSpec: EthSpec> std::fmt::Display for OutboundRequest<TSpec> {
OutboundRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason), OutboundRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason),
OutboundRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), OutboundRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req),
OutboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), OutboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req),
OutboundRequest::BlobsByRange(req) => write!(f, "Blobs by range: {:?}", req),
OutboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), OutboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data),
OutboundRequest::MetaData(_) => write!(f, "MetaData request"), OutboundRequest::MetaData(_) => write!(f, "MetaData request"),
} }

View File

@ -189,6 +189,7 @@ impl std::fmt::Display for Protocol {
Protocol::Goodbye => "goodbye", Protocol::Goodbye => "goodbye",
Protocol::BlocksByRange => "beacon_blocks_by_range", Protocol::BlocksByRange => "beacon_blocks_by_range",
Protocol::BlocksByRoot => "beacon_blocks_by_root", Protocol::BlocksByRoot => "beacon_blocks_by_root",
Protocol::BlobsByRange => "blobs_sidecars_by_range",
Protocol::Ping => "ping", Protocol::Ping => "ping",
Protocol::MetaData => "metadata", Protocol::MetaData => "metadata",
}; };
@ -297,6 +298,9 @@ impl ProtocolId {
Protocol::BlocksByRoot => { Protocol::BlocksByRoot => {
RpcLimits::new(*BLOCKS_BY_ROOT_REQUEST_MIN, *BLOCKS_BY_ROOT_REQUEST_MAX) RpcLimits::new(*BLOCKS_BY_ROOT_REQUEST_MIN, *BLOCKS_BY_ROOT_REQUEST_MAX)
} }
Protocol::BlobsByRange => {
RpcLimits::new(*BLOCKS_BY_ROOT_REQUEST_MIN, *BLOCKS_BY_ROOT_REQUEST_MAX)
}
Protocol::Ping => RpcLimits::new( Protocol::Ping => RpcLimits::new(
<Ping as Encode>::ssz_fixed_len(), <Ping as Encode>::ssz_fixed_len(),
<Ping as Encode>::ssz_fixed_len(), <Ping as Encode>::ssz_fixed_len(),
@ -315,6 +319,7 @@ impl ProtocolId {
Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response
Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork()), Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork()),
Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()), Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()),
Protocol::BlobsByRange => rpc_block_limits_by_fork(fork_context.current_fork()),
Protocol::Ping => RpcLimits::new( Protocol::Ping => RpcLimits::new(
<Ping as Encode>::ssz_fixed_len(), <Ping as Encode>::ssz_fixed_len(),
@ -469,6 +474,9 @@ impl<TSpec: EthSpec> InboundRequest<TSpec> {
ProtocolId::new(Protocol::BlocksByRoot, Version::V2, Encoding::SSZSnappy), ProtocolId::new(Protocol::BlocksByRoot, Version::V2, Encoding::SSZSnappy),
ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy), ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy),
], ],
InboundRequest::BlobsByRange(_) => vec![
ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy),
],
InboundRequest::Ping(_) => vec![ProtocolId::new( InboundRequest::Ping(_) => vec![ProtocolId::new(
Protocol::Ping, Protocol::Ping,
Version::V1, Version::V1,

View File

@ -168,6 +168,9 @@ impl<T: BeaconChainTypes> Router<T> {
Request::BlocksByRoot(request) => self Request::BlocksByRoot(request) => self
.processor .processor
.on_blocks_by_root_request(peer_id, id, request), .on_blocks_by_root_request(peer_id, id, request),
Request::BlobsByRange(request) => self
.processor
.on_blobs_by_range_request(peer_id, id, request),
} }
} }
@ -192,6 +195,10 @@ impl<T: BeaconChainTypes> Router<T> {
self.processor self.processor
.on_blocks_by_root_response(peer_id, request_id, beacon_block); .on_blocks_by_root_response(peer_id, request_id, beacon_block);
} }
Response::BlobsByRange(beacon_blob) => {
self.processor
.on_blobs_by_range_response(peer_id, request_id, beacon_blob);
}
} }
} }

View File

@ -7,6 +7,7 @@ use crate::sync::manager::RequestId as SyncId;
use crate::sync::SyncMessage; use crate::sync::SyncMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use lighthouse_network::rpc::*; use lighthouse_network::rpc::*;
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
use lighthouse_network::{ use lighthouse_network::{
Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response, Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response,
}; };
@ -18,7 +19,7 @@ use store::SyncCommitteeMessage;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use types::{ use types::{
Attestation, AttesterSlashing, EthSpec, ProposerSlashing, SignedAggregateAndProof, Attestation, AttesterSlashing, EthSpec, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncSubnetId, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncSubnetId, VariableList, blobs_sidecar::BlobsSidecar,
}; };
use types::signed_blobs_sidecar::SignedBlobsSidecar; use types::signed_blobs_sidecar::SignedBlobsSidecar;
@ -161,6 +162,18 @@ impl<T: BeaconChainTypes> Processor<T> {
)) ))
} }
pub fn on_blobs_by_range_request(
&mut self,
peer_id: PeerId,
request_id: PeerRequestId,
request: BlobsByRangeRequest,
) {
/*
self.send_beacon_processor_work(BeaconWorkEvent::blocks_by_roots_request(
peer_id, request_id, request,
))
*/
}
/// Handle a `BlocksByRange` request from the peer. /// Handle a `BlocksByRange` request from the peer.
pub fn on_blocks_by_range_request( pub fn on_blocks_by_range_request(
&mut self, &mut self,
@ -235,6 +248,15 @@ impl<T: BeaconChainTypes> Processor<T> {
}); });
} }
pub fn on_blobs_by_range_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
beacon_blob: Option<Arc<VariableList<BlobsSidecar<T::EthSpec>, <<T as BeaconChainTypes>::EthSpec as EthSpec>::MaxRequestBlobsSidecars>>>,
) {
}
/// Process a gossip message declaring a new block. /// Process a gossip message declaring a new block.
/// ///
/// Attempts to apply to block to the beacon chain. May queue the block for later processing. /// Attempts to apply to block to the beacon chain. May queue the block for later processing.