From 36a0add0cd8d2bc2b22f8b3603d3427b0d02d78e Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Sat, 17 Sep 2022 15:23:28 +0200 Subject: [PATCH] network stuff --- .../src/peer_manager/mod.rs | 3 +++ .../src/rpc/codec/ssz_snappy.rs | 23 +++++++++++++++++++ beacon_node/lighthouse_network/src/rpc/mod.rs | 1 + .../lighthouse_network/src/rpc/protocol.rs | 6 +++++ .../src/rpc/rate_limiter.rs | 11 +++++++++ 5 files changed, 44 insertions(+) diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 55b388445..05acef663 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -497,6 +497,7 @@ impl PeerManager { Protocol::Ping => PeerAction::MidToleranceError, Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, + Protocol::BlobsByRange => PeerAction::MidToleranceError, Protocol::Goodbye => PeerAction::LowToleranceError, Protocol::MetaData => PeerAction::LowToleranceError, Protocol::Status => PeerAction::LowToleranceError, @@ -512,6 +513,7 @@ impl PeerManager { Protocol::Ping => PeerAction::Fatal, Protocol::BlocksByRange => return, Protocol::BlocksByRoot => return, + Protocol::BlobsByRange => return, Protocol::Goodbye => return, Protocol::MetaData => PeerAction::LowToleranceError, Protocol::Status => PeerAction::LowToleranceError, @@ -527,6 +529,7 @@ impl PeerManager { Protocol::Ping => PeerAction::LowToleranceError, Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, + Protocol::BlobsByRange => PeerAction::MidToleranceError, Protocol::Goodbye => return, Protocol::MetaData => return, Protocol::Status => return, diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index e952eece3..f19c7b5a4 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -228,6 +228,7 @@ impl Encoder> for SSZSnappyOutboundCodec< OutboundRequest::Goodbye(req) => req.as_ssz_bytes(), OutboundRequest::BlocksByRange(req) => req.as_ssz_bytes(), OutboundRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(), + OutboundRequest::BlobsByRange(req) => req.as_ssz_bytes(), OutboundRequest::Ping(req) => req.as_ssz_bytes(), OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode }; @@ -473,6 +474,9 @@ fn handle_v1_request( Protocol::BlocksByRoot => Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest { block_roots: VariableList::from_ssz_bytes(decoded_buffer)?, }))), + Protocol::BlobsByRange => Ok(Some(InboundRequest::BlobsByRange( + BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?, + ))), Protocol::Ping => Ok(Some(InboundRequest::Ping(Ping { data: u64::from_ssz_bytes(decoded_buffer)?, }))), @@ -505,6 +509,9 @@ fn handle_v2_request( Protocol::BlocksByRoot => Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest { block_roots: VariableList::from_ssz_bytes(decoded_buffer)?, }))), + Protocol::BlobsByRange => Ok(Some(InboundRequest::BlobsByRange( + BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?, + ))), // MetaData requests return early from InboundUpgrade and do not reach the decoder. // Handle this case just for completeness. Protocol::MetaData => { @@ -542,6 +549,9 @@ fn handle_v1_response( Protocol::BlocksByRoot => Ok(Some(RPCResponse::BlocksByRoot(Arc::new( SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), )))), + Protocol::BlobsByRange => Err(RPCError::InvalidData( + "blobs by range via v1".to_string(), + )), Protocol::Ping => Ok(Some(RPCResponse::Pong(Ping { data: u64::from_ssz_bytes(decoded_buffer)?, }))), @@ -616,6 +626,15 @@ fn handle_v2_response( )?), )))), }, + Protocol::BlobsByRange => match fork_name { + ForkName::Eip4844 => Ok(Some(RPCResponse::BlobsByRange(Arc::new( + VariableList::from_ssz_bytes(decoded_buffer)?, + )))), + _ => Err(RPCError::ErrorResponse( + RPCResponseErrorCode::InvalidRequest, + "Invalid forkname for blobsbyrange".to_string(), + )), + } _ => Err(RPCError::ErrorResponse( RPCResponseErrorCode::InvalidRequest, "Invalid v2 request".to_string(), @@ -672,6 +691,7 @@ mod tests { ForkName::Base => Slot::new(0), ForkName::Altair => altair_fork_epoch.start_slot(Spec::slots_per_epoch()), ForkName::Merge => merge_fork_epoch.start_slot(Spec::slots_per_epoch()), + ForkName::Eip4844 => merge_fork_epoch.start_slot(Spec::slots_per_epoch()), }; ForkContext::new::(current_slot, Hash256::zero(), &chain_spec) } @@ -875,6 +895,9 @@ mod tests { OutboundRequest::BlocksByRoot(bbroot) => { assert_eq!(decoded, InboundRequest::BlocksByRoot(bbroot)) } + OutboundRequest::BlobsByRange(blbrange) => { + assert_eq!(decoded, InboundRequest::BlobsByRange(blbrange)) + } OutboundRequest::Ping(ping) => { assert_eq!(decoded, InboundRequest::Ping(ping)) } diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 0bedd423b..6197d57f1 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -292,6 +292,7 @@ where match end { ResponseTermination::BlocksByRange => Protocol::BlocksByRange, ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot, + ResponseTermination::BlobsByRange => Protocol::BlobsByRange, }, ), }, diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 7a280ed5d..4de2948ff 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -159,6 +159,8 @@ pub enum Protocol { BlocksByRange, /// The `BlocksByRoot` protocol name. BlocksByRoot, + /// The `BlobsByRange` protocol name. + BlobsByRange, /// The `Ping` protocol name. Ping, /// The `MetaData` protocol name. @@ -488,6 +490,7 @@ impl InboundRequest { InboundRequest::Goodbye(_) => 0, InboundRequest::BlocksByRange(req) => req.count, InboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64, + InboundRequest::BlobsByRange(req) => req.count, InboundRequest::Ping(_) => 1, InboundRequest::MetaData(_) => 1, } @@ -500,6 +503,7 @@ impl InboundRequest { InboundRequest::Goodbye(_) => Protocol::Goodbye, InboundRequest::BlocksByRange(_) => Protocol::BlocksByRange, InboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot, + InboundRequest::BlobsByRange(_) => Protocol::BlobsByRange, InboundRequest::Ping(_) => Protocol::Ping, InboundRequest::MetaData(_) => Protocol::MetaData, } @@ -513,6 +517,7 @@ impl InboundRequest { // variants that have `multiple_responses()` can have values. InboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange, InboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, + InboundRequest::BlobsByRange(_) => ResponseTermination::BlobsByRange, InboundRequest::Status(_) => unreachable!(), InboundRequest::Goodbye(_) => unreachable!(), InboundRequest::Ping(_) => unreachable!(), @@ -618,6 +623,7 @@ impl std::fmt::Display for InboundRequest { InboundRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason), InboundRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), InboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), + InboundRequest::BlobsByRange(req) => write!(f, "Blobs by range: {:?}", req), InboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), InboundRequest::MetaData(_) => write!(f, "MetaData request"), } diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index 70b14c33d..8cd1e749e 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -73,6 +73,8 @@ pub struct RPCRateLimiter { bbrange_rl: Limiter, /// BlocksByRoot rate limiter. bbroots_rl: Limiter, + /// BlobsByRange rate limiter. + blbrange_rl: Limiter, } /// Error type for non conformant requests @@ -98,6 +100,8 @@ pub struct RPCRateLimiterBuilder { bbrange_quota: Option, /// Quota for the BlocksByRoot protocol. bbroots_quota: Option, + /// Quota for the BlocksByRange protocol. + blbrange_quota: Option, } impl RPCRateLimiterBuilder { @@ -116,6 +120,7 @@ impl RPCRateLimiterBuilder { Protocol::Goodbye => self.goodbye_quota = q, Protocol::BlocksByRange => self.bbrange_quota = q, Protocol::BlocksByRoot => self.bbroots_quota = q, + Protocol::BlobsByRange => self.blbrange_quota = q, } self } @@ -156,6 +161,8 @@ impl RPCRateLimiterBuilder { .bbrange_quota .ok_or("BlocksByRange quota not specified")?; + let blbrange_quota = self.blbrange_quota.ok_or("BlobsByRange quota not specified")?; + // create the rate limiters let ping_rl = Limiter::from_quota(ping_quota)?; let metadata_rl = Limiter::from_quota(metadata_quota)?; @@ -163,6 +170,7 @@ impl RPCRateLimiterBuilder { let goodbye_rl = Limiter::from_quota(goodbye_quota)?; let bbroots_rl = Limiter::from_quota(bbroots_quota)?; let bbrange_rl = Limiter::from_quota(bbrange_quota)?; + let blbrange_rl = Limiter::from_quota(blbrange_quota)?; // check for peers to prune every 30 seconds, starting in 30 seconds let prune_every = tokio::time::Duration::from_secs(30); @@ -176,6 +184,7 @@ impl RPCRateLimiterBuilder { goodbye_rl, bbroots_rl, bbrange_rl, + blbrange_rl, init_time: Instant::now(), }) } @@ -199,6 +208,7 @@ impl RPCRateLimiter { Protocol::Goodbye => &mut self.goodbye_rl, Protocol::BlocksByRange => &mut self.bbrange_rl, Protocol::BlocksByRoot => &mut self.bbroots_rl, + Protocol::BlobsByRange => &mut self.blbrange_rl, }; check(limiter) } @@ -211,6 +221,7 @@ impl RPCRateLimiter { self.goodbye_rl.prune(time_since_start); self.bbrange_rl.prune(time_since_start); self.bbroots_rl.prune(time_since_start); + self.blbrange_rl.prune(time_since_start); } }