network stuff

This commit is contained in:
Marius van der Wijden 2022-09-17 15:23:28 +02:00
parent 0518665949
commit 36a0add0cd
5 changed files with 44 additions and 0 deletions

View File

@ -497,6 +497,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
Protocol::Ping => PeerAction::MidToleranceError,
Protocol::BlocksByRange => PeerAction::MidToleranceError,
Protocol::BlocksByRoot => PeerAction::MidToleranceError,
Protocol::BlobsByRange => PeerAction::MidToleranceError,
Protocol::Goodbye => PeerAction::LowToleranceError,
Protocol::MetaData => PeerAction::LowToleranceError,
Protocol::Status => PeerAction::LowToleranceError,
@ -512,6 +513,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
Protocol::Ping => PeerAction::Fatal,
Protocol::BlocksByRange => return,
Protocol::BlocksByRoot => return,
Protocol::BlobsByRange => return,
Protocol::Goodbye => return,
Protocol::MetaData => PeerAction::LowToleranceError,
Protocol::Status => PeerAction::LowToleranceError,
@ -527,6 +529,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
Protocol::Ping => PeerAction::LowToleranceError,
Protocol::BlocksByRange => PeerAction::MidToleranceError,
Protocol::BlocksByRoot => PeerAction::MidToleranceError,
Protocol::BlobsByRange => PeerAction::MidToleranceError,
Protocol::Goodbye => return,
Protocol::MetaData => return,
Protocol::Status => return,

View File

@ -228,6 +228,7 @@ impl<TSpec: EthSpec> Encoder<OutboundRequest<TSpec>> for SSZSnappyOutboundCodec<
OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
OutboundRequest::BlocksByRange(req) => req.as_ssz_bytes(),
OutboundRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(),
OutboundRequest::BlobsByRange(req) => req.as_ssz_bytes(),
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode
};
@ -473,6 +474,9 @@ fn handle_v1_request<T: EthSpec>(
Protocol::BlocksByRoot => Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: VariableList::from_ssz_bytes(decoded_buffer)?,
}))),
Protocol::BlobsByRange => Ok(Some(InboundRequest::BlobsByRange(
BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?,
))),
Protocol::Ping => Ok(Some(InboundRequest::Ping(Ping {
data: u64::from_ssz_bytes(decoded_buffer)?,
}))),
@ -505,6 +509,9 @@ fn handle_v2_request<T: EthSpec>(
Protocol::BlocksByRoot => Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: VariableList::from_ssz_bytes(decoded_buffer)?,
}))),
Protocol::BlobsByRange => Ok(Some(InboundRequest::BlobsByRange(
BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?,
))),
// MetaData requests return early from InboundUpgrade and do not reach the decoder.
// Handle this case just for completeness.
Protocol::MetaData => {
@ -542,6 +549,9 @@ fn handle_v1_response<T: EthSpec>(
Protocol::BlocksByRoot => Ok(Some(RPCResponse::BlocksByRoot(Arc::new(
SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?),
)))),
Protocol::BlobsByRange => Err(RPCError::InvalidData(
"blobs by range via v1".to_string(),
)),
Protocol::Ping => Ok(Some(RPCResponse::Pong(Ping {
data: u64::from_ssz_bytes(decoded_buffer)?,
}))),
@ -616,6 +626,15 @@ fn handle_v2_response<T: EthSpec>(
)?),
)))),
},
Protocol::BlobsByRange => match fork_name {
ForkName::Eip4844 => Ok(Some(RPCResponse::BlobsByRange(Arc::new(
VariableList::from_ssz_bytes(decoded_buffer)?,
)))),
_ => Err(RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
"Invalid forkname for blobsbyrange".to_string(),
)),
}
_ => Err(RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
"Invalid v2 request".to_string(),
@ -672,6 +691,7 @@ mod tests {
ForkName::Base => Slot::new(0),
ForkName::Altair => altair_fork_epoch.start_slot(Spec::slots_per_epoch()),
ForkName::Merge => merge_fork_epoch.start_slot(Spec::slots_per_epoch()),
ForkName::Eip4844 => merge_fork_epoch.start_slot(Spec::slots_per_epoch()),
};
ForkContext::new::<Spec>(current_slot, Hash256::zero(), &chain_spec)
}
@ -875,6 +895,9 @@ mod tests {
OutboundRequest::BlocksByRoot(bbroot) => {
assert_eq!(decoded, InboundRequest::BlocksByRoot(bbroot))
}
OutboundRequest::BlobsByRange(blbrange) => {
assert_eq!(decoded, InboundRequest::BlobsByRange(blbrange))
}
OutboundRequest::Ping(ping) => {
assert_eq!(decoded, InboundRequest::Ping(ping))
}

View File

@ -292,6 +292,7 @@ where
match end {
ResponseTermination::BlocksByRange => Protocol::BlocksByRange,
ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot,
ResponseTermination::BlobsByRange => Protocol::BlobsByRange,
},
),
},

View File

@ -159,6 +159,8 @@ pub enum Protocol {
BlocksByRange,
/// The `BlocksByRoot` protocol name.
BlocksByRoot,
/// The `BlobsByRange` protocol name.
BlobsByRange,
/// The `Ping` protocol name.
Ping,
/// The `MetaData` protocol name.
@ -488,6 +490,7 @@ impl<TSpec: EthSpec> InboundRequest<TSpec> {
InboundRequest::Goodbye(_) => 0,
InboundRequest::BlocksByRange(req) => req.count,
InboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64,
InboundRequest::BlobsByRange(req) => req.count,
InboundRequest::Ping(_) => 1,
InboundRequest::MetaData(_) => 1,
}
@ -500,6 +503,7 @@ impl<TSpec: EthSpec> InboundRequest<TSpec> {
InboundRequest::Goodbye(_) => Protocol::Goodbye,
InboundRequest::BlocksByRange(_) => Protocol::BlocksByRange,
InboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot,
InboundRequest::BlobsByRange(_) => Protocol::BlobsByRange,
InboundRequest::Ping(_) => Protocol::Ping,
InboundRequest::MetaData(_) => Protocol::MetaData,
}
@ -513,6 +517,7 @@ impl<TSpec: EthSpec> InboundRequest<TSpec> {
// variants that have `multiple_responses()` can have values.
InboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange,
InboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot,
InboundRequest::BlobsByRange(_) => ResponseTermination::BlobsByRange,
InboundRequest::Status(_) => unreachable!(),
InboundRequest::Goodbye(_) => unreachable!(),
InboundRequest::Ping(_) => unreachable!(),
@ -618,6 +623,7 @@ impl<TSpec: EthSpec> std::fmt::Display for InboundRequest<TSpec> {
InboundRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason),
InboundRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req),
InboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req),
InboundRequest::BlobsByRange(req) => write!(f, "Blobs by range: {:?}", req),
InboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data),
InboundRequest::MetaData(_) => write!(f, "MetaData request"),
}

View File

@ -73,6 +73,8 @@ pub struct RPCRateLimiter {
bbrange_rl: Limiter<PeerId>,
/// BlocksByRoot rate limiter.
bbroots_rl: Limiter<PeerId>,
/// BlobsByRange rate limiter.
blbrange_rl: Limiter<PeerId>,
}
/// Error type for non conformant requests
@ -98,6 +100,8 @@ pub struct RPCRateLimiterBuilder {
bbrange_quota: Option<Quota>,
/// Quota for the BlocksByRoot protocol.
bbroots_quota: Option<Quota>,
/// Quota for the BlocksByRange protocol.
blbrange_quota: Option<Quota>,
}
impl RPCRateLimiterBuilder {
@ -116,6 +120,7 @@ impl RPCRateLimiterBuilder {
Protocol::Goodbye => self.goodbye_quota = q,
Protocol::BlocksByRange => self.bbrange_quota = q,
Protocol::BlocksByRoot => self.bbroots_quota = q,
Protocol::BlobsByRange => self.blbrange_quota = q,
}
self
}
@ -156,6 +161,8 @@ impl RPCRateLimiterBuilder {
.bbrange_quota
.ok_or("BlocksByRange quota not specified")?;
let blbrange_quota = self.blbrange_quota.ok_or("BlobsByRange quota not specified")?;
// create the rate limiters
let ping_rl = Limiter::from_quota(ping_quota)?;
let metadata_rl = Limiter::from_quota(metadata_quota)?;
@ -163,6 +170,7 @@ impl RPCRateLimiterBuilder {
let goodbye_rl = Limiter::from_quota(goodbye_quota)?;
let bbroots_rl = Limiter::from_quota(bbroots_quota)?;
let bbrange_rl = Limiter::from_quota(bbrange_quota)?;
let blbrange_rl = Limiter::from_quota(blbrange_quota)?;
// check for peers to prune every 30 seconds, starting in 30 seconds
let prune_every = tokio::time::Duration::from_secs(30);
@ -176,6 +184,7 @@ impl RPCRateLimiterBuilder {
goodbye_rl,
bbroots_rl,
bbrange_rl,
blbrange_rl,
init_time: Instant::now(),
})
}
@ -199,6 +208,7 @@ impl RPCRateLimiter {
Protocol::Goodbye => &mut self.goodbye_rl,
Protocol::BlocksByRange => &mut self.bbrange_rl,
Protocol::BlocksByRoot => &mut self.bbroots_rl,
Protocol::BlobsByRange => &mut self.blbrange_rl,
};
check(limiter)
}
@ -211,6 +221,7 @@ impl RPCRateLimiter {
self.goodbye_rl.prune(time_since_start);
self.bbrange_rl.prune(time_since_start);
self.bbroots_rl.prune(time_since_start);
self.blbrange_rl.prune(time_since_start);
}
}