split outbound and inbound codecs encoded types (#2410)

Splits the inbound and outbound requests, for maintainability.
This commit is contained in:
divma 2021-06-17 00:40:16 +00:00
parent a526145b4a
commit 3261eff0bf
10 changed files with 304 additions and 154 deletions

View File

@ -38,7 +38,7 @@ impl<TSpec: EthSpec> BehaviourHandler<TSpec> {
pub enum BehaviourHandlerIn<TSpec: EthSpec> { pub enum BehaviourHandlerIn<TSpec: EthSpec> {
Delegate(DelegateIn<TSpec>), Delegate(DelegateIn<TSpec>),
/// Start the shutdown process. /// Start the shutdown process.
Shutdown(Option<(RequestId, RPCRequest<TSpec>)>), Shutdown(Option<(RequestId, OutboundRequest<TSpec>)>),
} }
impl<TSpec: EthSpec> ProtocolsHandler for BehaviourHandler<TSpec> { impl<TSpec: EthSpec> ProtocolsHandler for BehaviourHandler<TSpec> {

View File

@ -595,7 +595,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
trace!(self.log, "Sending Ping"; "request_id" => id, "peer_id" => %peer_id); trace!(self.log, "Sending Ping"; "request_id" => id, "peer_id" => %peer_id);
self.eth2_rpc self.eth2_rpc
.send_request(peer_id, id, RPCRequest::Ping(ping)); .send_request(peer_id, id, OutboundRequest::Ping(ping));
} }
/// Sends a Pong response to the peer. /// Sends a Pong response to the peer.
@ -610,7 +610,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
/// Sends a METADATA request to a peer. /// Sends a METADATA request to a peer.
fn send_meta_data_request(&mut self, peer_id: PeerId) { fn send_meta_data_request(&mut self, peer_id: PeerId) {
let event = RPCRequest::MetaData(PhantomData); let event = OutboundRequest::MetaData(PhantomData);
self.eth2_rpc self.eth2_rpc
.send_request(peer_id, RequestId::Behaviour, event); .send_request(peer_id, RequestId::Behaviour, event);
} }
@ -749,17 +749,17 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
let peer_request_id = (handler_id, id); let peer_request_id = (handler_id, id);
match request { match request {
/* Behaviour managed protocols: Ping and Metadata */ /* Behaviour managed protocols: Ping and Metadata */
RPCRequest::Ping(ping) => { InboundRequest::Ping(ping) => {
// inform the peer manager and send the response // inform the peer manager and send the response
self.peer_manager.ping_request(&peer_id, ping.data); self.peer_manager.ping_request(&peer_id, ping.data);
// send a ping response // send a ping response
self.pong(peer_request_id, peer_id); self.pong(peer_request_id, peer_id);
} }
RPCRequest::MetaData(_) => { InboundRequest::MetaData(_) => {
// send the requested meta-data // send the requested meta-data
self.send_meta_data_response((handler_id, id), peer_id); self.send_meta_data_response((handler_id, id), peer_id);
} }
RPCRequest::Goodbye(reason) => { InboundRequest::Goodbye(reason) => {
// queue for disconnection without a goodbye message // queue for disconnection without a goodbye message
debug!( debug!(
self.log, "Peer sent Goodbye"; self.log, "Peer sent Goodbye";
@ -775,18 +775,18 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
// inform the application layer early. // inform the application layer early.
} }
/* Protocols propagated to the Network */ /* Protocols propagated to the Network */
RPCRequest::Status(msg) => { InboundRequest::Status(msg) => {
// inform the peer manager that we have received a status from a peer // inform the peer manager that we have received a status from a peer
self.peer_manager.peer_statusd(&peer_id); self.peer_manager.peer_statusd(&peer_id);
// propagate the STATUS message upwards // propagate the STATUS message upwards
self.propagate_request(peer_request_id, peer_id, Request::Status(msg)) self.propagate_request(peer_request_id, peer_id, Request::Status(msg))
} }
RPCRequest::BlocksByRange(req) => self.propagate_request( InboundRequest::BlocksByRange(req) => self.propagate_request(
peer_request_id, peer_request_id,
peer_id, peer_id,
Request::BlocksByRange(req), Request::BlocksByRange(req),
), ),
RPCRequest::BlocksByRoot(req) => { InboundRequest::BlocksByRoot(req) => {
self.propagate_request(peer_request_id, peer_id, Request::BlocksByRoot(req)) self.propagate_request(peer_request_id, peer_id, Request::BlocksByRoot(req))
} }
} }
@ -834,7 +834,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
peer_id, peer_id,
handler: NotifyHandler::Any, handler: NotifyHandler::Any,
event: BehaviourHandlerIn::Shutdown( event: BehaviourHandlerIn::Shutdown(
reason.map(|reason| (RequestId::Behaviour, RPCRequest::Goodbye(reason))), reason.map(|reason| (RequestId::Behaviour, OutboundRequest::Goodbye(reason))),
), ),
}); });
} }
@ -878,7 +878,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
handler: NotifyHandler::Any, handler: NotifyHandler::Any,
event: BehaviourHandlerIn::Shutdown(Some(( event: BehaviourHandlerIn::Shutdown(Some((
RequestId::Behaviour, RequestId::Behaviour,
RPCRequest::Goodbye(reason), OutboundRequest::Goodbye(reason),
))), ))),
}); });
} }
@ -1293,12 +1293,12 @@ pub enum Request {
BlocksByRoot(BlocksByRootRequest), BlocksByRoot(BlocksByRootRequest),
} }
impl<TSpec: EthSpec> std::convert::From<Request> for RPCRequest<TSpec> { impl<TSpec: EthSpec> std::convert::From<Request> for OutboundRequest<TSpec> {
fn from(req: Request) -> RPCRequest<TSpec> { fn from(req: Request) -> OutboundRequest<TSpec> {
match req { match req {
Request::BlocksByRoot(r) => RPCRequest::BlocksByRoot(r), Request::BlocksByRoot(r) => OutboundRequest::BlocksByRoot(r),
Request::BlocksByRange(r) => RPCRequest::BlocksByRange(r), Request::BlocksByRange(r) => OutboundRequest::BlocksByRange(r),
Request::Status(s) => RPCRequest::Status(s), Request::Status(s) => OutboundRequest::Status(s),
} }
} }
} }

View File

@ -1,7 +1,7 @@
//! This handles the various supported encoding mechanism for the Eth 2.0 RPC. //! This handles the various supported encoding mechanism for the Eth 2.0 RPC.
use crate::rpc::methods::ErrorType; use crate::rpc::methods::ErrorType;
use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse}; use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse, RPCResponse};
use libp2p::bytes::BufMut; use libp2p::bytes::BufMut;
use libp2p::bytes::BytesMut; use libp2p::bytes::BytesMut;
use std::marker::PhantomData; use std::marker::PhantomData;
@ -47,7 +47,7 @@ where
// This deals with Decoding RPC Responses from other peers and encoding our requests // This deals with Decoding RPC Responses from other peers and encoding our requests
pub struct BaseOutboundCodec<TOutboundCodec, TSpec> pub struct BaseOutboundCodec<TOutboundCodec, TSpec>
where where
TOutboundCodec: OutboundCodec<RPCRequest<TSpec>>, TOutboundCodec: OutboundCodec<OutboundRequest<TSpec>>,
TSpec: EthSpec, TSpec: EthSpec,
{ {
/// Inner codec for handling various encodings. /// Inner codec for handling various encodings.
@ -60,7 +60,7 @@ where
impl<TOutboundCodec, TSpec> BaseOutboundCodec<TOutboundCodec, TSpec> impl<TOutboundCodec, TSpec> BaseOutboundCodec<TOutboundCodec, TSpec>
where where
TSpec: EthSpec, TSpec: EthSpec,
TOutboundCodec: OutboundCodec<RPCRequest<TSpec>>, TOutboundCodec: OutboundCodec<OutboundRequest<TSpec>>,
{ {
pub fn new(codec: TOutboundCodec) -> Self { pub fn new(codec: TOutboundCodec) -> Self {
BaseOutboundCodec { BaseOutboundCodec {
@ -102,9 +102,9 @@ where
impl<TCodec, TSpec> Decoder for BaseInboundCodec<TCodec, TSpec> impl<TCodec, TSpec> Decoder for BaseInboundCodec<TCodec, TSpec>
where where
TSpec: EthSpec, TSpec: EthSpec,
TCodec: Encoder<RPCCodedResponse<TSpec>> + Decoder<Item = RPCRequest<TSpec>>, TCodec: Encoder<RPCCodedResponse<TSpec>> + Decoder<Item = InboundRequest<TSpec>>,
{ {
type Item = RPCRequest<TSpec>; type Item = InboundRequest<TSpec>;
type Error = <TCodec as Decoder>::Error; type Error = <TCodec as Decoder>::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
@ -115,14 +115,18 @@ where
/* Base Outbound Codec */ /* Base Outbound Codec */
// This Encodes RPC Requests sent to external peers // This Encodes RPC Requests sent to external peers
impl<TCodec, TSpec> Encoder<RPCRequest<TSpec>> for BaseOutboundCodec<TCodec, TSpec> impl<TCodec, TSpec> Encoder<OutboundRequest<TSpec>> for BaseOutboundCodec<TCodec, TSpec>
where where
TSpec: EthSpec, TSpec: EthSpec,
TCodec: OutboundCodec<RPCRequest<TSpec>> + Encoder<RPCRequest<TSpec>>, TCodec: OutboundCodec<OutboundRequest<TSpec>> + Encoder<OutboundRequest<TSpec>>,
{ {
type Error = <TCodec as Encoder<RPCRequest<TSpec>>>::Error; type Error = <TCodec as Encoder<OutboundRequest<TSpec>>>::Error;
fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(
&mut self,
item: OutboundRequest<TSpec>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
self.inner.encode(item, dst) self.inner.encode(item, dst)
} }
} }
@ -131,7 +135,7 @@ where
impl<TCodec, TSpec> Decoder for BaseOutboundCodec<TCodec, TSpec> impl<TCodec, TSpec> Decoder for BaseOutboundCodec<TCodec, TSpec>
where where
TSpec: EthSpec, TSpec: EthSpec,
TCodec: OutboundCodec<RPCRequest<TSpec>, CodecErrorType = ErrorType> TCodec: OutboundCodec<OutboundRequest<TSpec>, CodecErrorType = ErrorType>
+ Decoder<Item = RPCResponse<TSpec>>, + Decoder<Item = RPCResponse<TSpec>>,
{ {
type Item = RPCCodedResponse<TSpec>; type Item = RPCCodedResponse<TSpec>;

View File

@ -4,7 +4,7 @@ pub(crate) mod ssz_snappy;
use self::base::{BaseInboundCodec, BaseOutboundCodec}; use self::base::{BaseInboundCodec, BaseOutboundCodec};
use self::ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec}; use self::ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec};
use crate::rpc::protocol::RPCError; use crate::rpc::protocol::RPCError;
use crate::rpc::{RPCCodedResponse, RPCRequest}; use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse};
use libp2p::bytes::BytesMut; use libp2p::bytes::BytesMut;
use tokio_util::codec::{Decoder, Encoder}; use tokio_util::codec::{Decoder, Encoder};
use types::EthSpec; use types::EthSpec;
@ -29,7 +29,7 @@ impl<T: EthSpec> Encoder<RPCCodedResponse<T>> for InboundCodec<T> {
} }
impl<TSpec: EthSpec> Decoder for InboundCodec<TSpec> { impl<TSpec: EthSpec> Decoder for InboundCodec<TSpec> {
type Item = RPCRequest<TSpec>; type Item = InboundRequest<TSpec>;
type Error = RPCError; type Error = RPCError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
@ -39,10 +39,14 @@ impl<TSpec: EthSpec> Decoder for InboundCodec<TSpec> {
} }
} }
impl<TSpec: EthSpec> Encoder<RPCRequest<TSpec>> for OutboundCodec<TSpec> { impl<TSpec: EthSpec> Encoder<OutboundRequest<TSpec>> for OutboundCodec<TSpec> {
type Error = RPCError; type Error = RPCError;
fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(
&mut self,
item: OutboundRequest<TSpec>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
match self { match self {
OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst), OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst),
} }

View File

@ -1,9 +1,8 @@
use crate::rpc::methods::*;
use crate::rpc::{ use crate::rpc::{
codec::base::OutboundCodec, codec::base::OutboundCodec,
protocol::{Encoding, Protocol, ProtocolId, RPCError, Version, ERROR_TYPE_MAX, ERROR_TYPE_MIN}, protocol::{Encoding, Protocol, ProtocolId, RPCError, Version, ERROR_TYPE_MAX, ERROR_TYPE_MIN},
}; };
use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse}; use crate::rpc::{methods::*, InboundRequest, OutboundRequest, RPCCodedResponse, RPCResponse};
use libp2p::bytes::BytesMut; use libp2p::bytes::BytesMut;
use snap::read::FrameDecoder; use snap::read::FrameDecoder;
use snap::write::FrameEncoder; use snap::write::FrameEncoder;
@ -90,7 +89,7 @@ impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZSnappyInboundCodec<
// Decoder for inbound streams: Decodes RPC requests from peers // Decoder for inbound streams: Decodes RPC requests from peers
impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> { impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
type Item = RPCRequest<TSpec>; type Item = InboundRequest<TSpec>;
type Error = RPCError; type Error = RPCError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
@ -133,27 +132,29 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
// since we have already checked `length` above. // since we have already checked `length` above.
match self.protocol.message_name { match self.protocol.message_name {
Protocol::Status => match self.protocol.version { Protocol::Status => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( Version::V1 => Ok(Some(InboundRequest::Status(
&decoded_buffer, StatusMessage::from_ssz_bytes(&decoded_buffer)?,
)?))), ))),
}, },
Protocol::Goodbye => match self.protocol.version { Protocol::Goodbye => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::Goodbye( Version::V1 => Ok(Some(InboundRequest::Goodbye(
GoodbyeReason::from_ssz_bytes(&decoded_buffer)?, GoodbyeReason::from_ssz_bytes(&decoded_buffer)?,
))), ))),
}, },
Protocol::BlocksByRange => match self.protocol.version { Protocol::BlocksByRange => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::BlocksByRange( Version::V1 => Ok(Some(InboundRequest::BlocksByRange(
BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?, BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?,
))), ))),
}, },
Protocol::BlocksByRoot => match self.protocol.version { Protocol::BlocksByRoot => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { Version::V1 => {
Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?, block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?,
}))), })))
}
}, },
Protocol::Ping => match self.protocol.version { Protocol::Ping => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::Ping(Ping { Version::V1 => Ok(Some(InboundRequest::Ping(Ping {
data: u64::from_ssz_bytes(&decoded_buffer)?, data: u64::from_ssz_bytes(&decoded_buffer)?,
}))), }))),
}, },
@ -163,7 +164,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
if !decoded_buffer.is_empty() { if !decoded_buffer.is_empty() {
Err(RPCError::InvalidData) Err(RPCError::InvalidData)
} else { } else {
Ok(Some(RPCRequest::MetaData(PhantomData))) Ok(Some(InboundRequest::MetaData(PhantomData)))
} }
} }
}, },
@ -201,17 +202,21 @@ impl<TSpec: EthSpec> SSZSnappyOutboundCodec<TSpec> {
} }
// Encoder for outbound streams: Encodes RPC Requests to peers // Encoder for outbound streams: Encodes RPC Requests to peers
impl<TSpec: EthSpec> Encoder<RPCRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> { impl<TSpec: EthSpec> Encoder<OutboundRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
type Error = RPCError; type Error = RPCError;
fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(
&mut self,
item: OutboundRequest<TSpec>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
let bytes = match item { let bytes = match item {
RPCRequest::Status(req) => req.as_ssz_bytes(), OutboundRequest::Status(req) => req.as_ssz_bytes(),
RPCRequest::Goodbye(req) => req.as_ssz_bytes(), OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
RPCRequest::BlocksByRange(req) => req.as_ssz_bytes(), OutboundRequest::BlocksByRange(req) => req.as_ssz_bytes(),
RPCRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(), OutboundRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(),
RPCRequest::Ping(req) => req.as_ssz_bytes(), OutboundRequest::Ping(req) => req.as_ssz_bytes(),
RPCRequest::MetaData(_) => return Ok(()), // no metadata to encode OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode
}; };
// SSZ encoded bytes should be within `max_packet_size` // SSZ encoded bytes should be within `max_packet_size`
if bytes.len() > self.max_packet_size { if bytes.len() > self.max_packet_size {
@ -318,7 +323,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyOutboundCodec<TSpec> {
} }
} }
impl<TSpec: EthSpec> OutboundCodec<RPCRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> { impl<TSpec: EthSpec> OutboundCodec<OutboundRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
type CodecErrorType = ErrorType; type CodecErrorType = ErrorType;
fn decode_error( fn decode_error(

View File

@ -2,9 +2,10 @@
#![allow(clippy::cognitive_complexity)] #![allow(clippy::cognitive_complexity)]
use super::methods::{RPCCodedResponse, RPCResponseErrorCode, RequestId, ResponseTermination}; use super::methods::{RPCCodedResponse, RPCResponseErrorCode, RequestId, ResponseTermination};
use super::protocol::{Protocol, RPCError, RPCProtocol, RPCRequest}; use super::protocol::{Protocol, RPCError, RPCProtocol};
use super::{RPCReceived, RPCSend}; use super::{RPCReceived, RPCSend};
use crate::rpc::protocol::{InboundFramed, OutboundFramed}; use crate::rpc::outbound::{OutboundFramed, OutboundRequest};
use crate::rpc::protocol::InboundFramed;
use fnv::FnvHashMap; use fnv::FnvHashMap;
use futures::prelude::*; use futures::prelude::*;
use futures::{Sink, SinkExt}; use futures::{Sink, SinkExt};
@ -90,7 +91,7 @@ where
events_out: SmallVec<[HandlerEvent<TSpec>; 4]>, events_out: SmallVec<[HandlerEvent<TSpec>; 4]>,
/// Queue of outbound substreams to open. /// Queue of outbound substreams to open.
dial_queue: SmallVec<[(RequestId, RPCRequest<TSpec>); 4]>, dial_queue: SmallVec<[(RequestId, OutboundRequest<TSpec>); 4]>,
/// Current number of concurrent outbound substreams being opened. /// Current number of concurrent outbound substreams being opened.
dial_negotiated: u32, dial_negotiated: u32,
@ -186,7 +187,7 @@ pub enum OutboundSubstreamState<TSpec: EthSpec> {
/// The framed negotiated substream. /// The framed negotiated substream.
substream: Box<OutboundFramed<NegotiatedSubstream, TSpec>>, substream: Box<OutboundFramed<NegotiatedSubstream, TSpec>>,
/// Keeps track of the actual request sent. /// Keeps track of the actual request sent.
request: RPCRequest<TSpec>, request: OutboundRequest<TSpec>,
}, },
/// Closing an outbound substream> /// Closing an outbound substream>
Closing(Box<OutboundFramed<NegotiatedSubstream, TSpec>>), Closing(Box<OutboundFramed<NegotiatedSubstream, TSpec>>),
@ -221,7 +222,7 @@ where
} }
/// Initiates the handler's shutdown process, sending an optional last message to the peer. /// Initiates the handler's shutdown process, sending an optional last message to the peer.
pub fn shutdown(&mut self, final_msg: Option<(RequestId, RPCRequest<TSpec>)>) { pub fn shutdown(&mut self, final_msg: Option<(RequestId, OutboundRequest<TSpec>)>) {
if matches!(self.state, HandlerState::Active) { if matches!(self.state, HandlerState::Active) {
if !self.dial_queue.is_empty() { if !self.dial_queue.is_empty() {
debug!(self.log, "Starting handler shutdown"; "unsent_queued_requests" => self.dial_queue.len()); debug!(self.log, "Starting handler shutdown"; "unsent_queued_requests" => self.dial_queue.len());
@ -247,7 +248,7 @@ where
} }
/// Opens an outbound substream with a request. /// Opens an outbound substream with a request.
fn send_request(&mut self, id: RequestId, req: RPCRequest<TSpec>) { fn send_request(&mut self, id: RequestId, req: OutboundRequest<TSpec>) {
match self.state { match self.state {
HandlerState::Active => { HandlerState::Active => {
self.dial_queue.push((id, req)); self.dial_queue.push((id, req));
@ -303,8 +304,8 @@ where
type OutEvent = HandlerEvent<TSpec>; type OutEvent = HandlerEvent<TSpec>;
type Error = RPCError; type Error = RPCError;
type InboundProtocol = RPCProtocol<TSpec>; type InboundProtocol = RPCProtocol<TSpec>;
type OutboundProtocol = RPCRequest<TSpec>; type OutboundProtocol = OutboundRequest<TSpec>;
type OutboundOpenInfo = (RequestId, RPCRequest<TSpec>); // Keep track of the id and the request type OutboundOpenInfo = (RequestId, OutboundRequest<TSpec>); // Keep track of the id and the request
type InboundOpenInfo = (); type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> { fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {

View File

@ -21,35 +21,37 @@ use types::EthSpec;
pub(crate) use handler::HandlerErr; pub(crate) use handler::HandlerErr;
pub(crate) use methods::{MetaData, Ping, RPCCodedResponse, RPCResponse}; pub(crate) use methods::{MetaData, Ping, RPCCodedResponse, RPCResponse};
pub(crate) use protocol::{RPCProtocol, RPCRequest}; pub(crate) use protocol::{InboundRequest, RPCProtocol};
pub use handler::SubstreamId; pub use handler::SubstreamId;
pub use methods::{ pub use methods::{
BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, MaxRequestBlocks, BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, MaxRequestBlocks,
RPCResponseErrorCode, RequestId, ResponseTermination, StatusMessage, MAX_REQUEST_BLOCKS, RPCResponseErrorCode, RequestId, ResponseTermination, StatusMessage, MAX_REQUEST_BLOCKS,
}; };
pub(crate) use outbound::OutboundRequest;
pub use protocol::{Protocol, RPCError}; pub use protocol::{Protocol, RPCError};
pub(crate) mod codec; pub(crate) mod codec;
mod handler; mod handler;
pub mod methods; pub mod methods;
mod outbound;
mod protocol; mod protocol;
mod rate_limiter; mod rate_limiter;
/// RPC events sent from Lighthouse. /// RPC events sent from Lighthouse.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum RPCSend<T: EthSpec> { pub enum RPCSend<TSpec: EthSpec> {
/// A request sent from Lighthouse. /// A request sent from Lighthouse.
/// ///
/// The `RequestId` is given by the application making the request. These /// The `RequestId` is given by the application making the request. These
/// go over *outbound* connections. /// go over *outbound* connections.
Request(RequestId, RPCRequest<T>), Request(RequestId, OutboundRequest<TSpec>),
/// A response sent from Lighthouse. /// A response sent from Lighthouse.
/// ///
/// The `SubstreamId` must correspond to the RPC-given ID of the original request received from the /// The `SubstreamId` must correspond to the RPC-given ID of the original request received from the
/// peer. The second parameter is a single chunk of a response. These go over *inbound* /// peer. The second parameter is a single chunk of a response. These go over *inbound*
/// connections. /// connections.
Response(SubstreamId, RPCCodedResponse<T>), Response(SubstreamId, RPCCodedResponse<TSpec>),
} }
/// RPC events received from outside Lighthouse. /// RPC events received from outside Lighthouse.
@ -59,7 +61,7 @@ pub enum RPCReceived<T: EthSpec> {
/// ///
/// The `SubstreamId` is given by the `RPCHandler` as it identifies this request with the /// The `SubstreamId` is given by the `RPCHandler` as it identifies this request with the
/// *inbound* substream over which it is managed. /// *inbound* substream over which it is managed.
Request(SubstreamId, RPCRequest<T>), Request(SubstreamId, InboundRequest<T>),
/// A response received from the outside. /// A response received from the outside.
/// ///
/// The `RequestId` corresponds to the application given ID of the original request sent to the /// The `RequestId` corresponds to the application given ID of the original request sent to the
@ -150,7 +152,7 @@ impl<TSpec: EthSpec> RPC<TSpec> {
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
request_id: RequestId, request_id: RequestId,
event: RPCRequest<TSpec>, event: OutboundRequest<TSpec>,
) { ) {
self.events.push(NetworkBehaviourAction::NotifyHandler { self.events.push(NetworkBehaviourAction::NotifyHandler {
peer_id, peer_id,
@ -188,7 +190,8 @@ where
fn inject_connected(&mut self, peer_id: &PeerId) { fn inject_connected(&mut self, peer_id: &PeerId) {
// find the peer's meta-data // find the peer's meta-data
debug!(self.log, "Requesting new peer's metadata"; "peer_id" => %peer_id); debug!(self.log, "Requesting new peer's metadata"; "peer_id" => %peer_id);
let rpc_event = RPCSend::Request(RequestId::Behaviour, RPCRequest::MetaData(PhantomData)); let rpc_event =
RPCSend::Request(RequestId::Behaviour, OutboundRequest::MetaData(PhantomData));
self.events.push(NetworkBehaviourAction::NotifyHandler { self.events.push(NetworkBehaviourAction::NotifyHandler {
peer_id: *peer_id, peer_id: *peer_id,
handler: NotifyHandler::Any, handler: NotifyHandler::Any,

View File

@ -0,0 +1,177 @@
use std::marker::PhantomData;
use super::methods::*;
use super::protocol::Protocol;
use super::protocol::ProtocolId;
use super::RPCError;
use crate::rpc::protocol::Encoding;
use crate::rpc::protocol::Version;
use crate::rpc::{
codec::{base::BaseOutboundCodec, ssz_snappy::SSZSnappyOutboundCodec, OutboundCodec},
methods::ResponseTermination,
};
use futures::future::BoxFuture;
use futures::prelude::{AsyncRead, AsyncWrite};
use futures::{FutureExt, SinkExt};
use libp2p::core::{OutboundUpgrade, UpgradeInfo};
use tokio_util::{
codec::Framed,
compat::{Compat, FuturesAsyncReadCompatExt},
};
use types::EthSpec;
/* Outbound request */
// Combines all the RPC requests into a single enum to implement `UpgradeInfo` and
// `OutboundUpgrade`
#[derive(Debug, Clone, PartialEq)]
pub enum OutboundRequest<TSpec: EthSpec> {
Status(StatusMessage),
Goodbye(GoodbyeReason),
BlocksByRange(BlocksByRangeRequest),
BlocksByRoot(BlocksByRootRequest),
Ping(Ping),
MetaData(PhantomData<TSpec>),
}
impl<TSpec: EthSpec> UpgradeInfo for OutboundRequest<TSpec> {
type Info = ProtocolId;
type InfoIter = Vec<Self::Info>;
// add further protocols as we support more encodings/versions
fn protocol_info(&self) -> Self::InfoIter {
self.supported_protocols()
}
}
/// Implements the encoding per supported protocol for `RPCRequest`.
impl<TSpec: EthSpec> OutboundRequest<TSpec> {
pub fn supported_protocols(&self) -> Vec<ProtocolId> {
match self {
// add more protocols when versions/encodings are supported
OutboundRequest::Status(_) => vec![ProtocolId::new(
Protocol::Status,
Version::V1,
Encoding::SSZSnappy,
)],
OutboundRequest::Goodbye(_) => vec![ProtocolId::new(
Protocol::Goodbye,
Version::V1,
Encoding::SSZSnappy,
)],
OutboundRequest::BlocksByRange(_) => vec![ProtocolId::new(
Protocol::BlocksByRange,
Version::V1,
Encoding::SSZSnappy,
)],
OutboundRequest::BlocksByRoot(_) => vec![ProtocolId::new(
Protocol::BlocksByRoot,
Version::V1,
Encoding::SSZSnappy,
)],
OutboundRequest::Ping(_) => vec![ProtocolId::new(
Protocol::Ping,
Version::V1,
Encoding::SSZSnappy,
)],
OutboundRequest::MetaData(_) => vec![ProtocolId::new(
Protocol::MetaData,
Version::V1,
Encoding::SSZSnappy,
)],
}
}
/* These functions are used in the handler for stream management */
/// Number of responses expected for this request.
pub fn expected_responses(&self) -> u64 {
match self {
OutboundRequest::Status(_) => 1,
OutboundRequest::Goodbye(_) => 0,
OutboundRequest::BlocksByRange(req) => req.count,
OutboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64,
OutboundRequest::Ping(_) => 1,
OutboundRequest::MetaData(_) => 1,
}
}
/// Gives the corresponding `Protocol` to this request.
pub fn protocol(&self) -> Protocol {
match self {
OutboundRequest::Status(_) => Protocol::Status,
OutboundRequest::Goodbye(_) => Protocol::Goodbye,
OutboundRequest::BlocksByRange(_) => Protocol::BlocksByRange,
OutboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot,
OutboundRequest::Ping(_) => Protocol::Ping,
OutboundRequest::MetaData(_) => Protocol::MetaData,
}
}
/// Returns the `ResponseTermination` type associated with the request if a stream gets
/// terminated.
pub fn stream_termination(&self) -> ResponseTermination {
match self {
// this only gets called after `multiple_responses()` returns true. Therefore, only
// variants that have `multiple_responses()` can have values.
OutboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange,
OutboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot,
OutboundRequest::Status(_) => unreachable!(),
OutboundRequest::Goodbye(_) => unreachable!(),
OutboundRequest::Ping(_) => unreachable!(),
OutboundRequest::MetaData(_) => unreachable!(),
}
}
}
/* RPC Response type - used for outbound upgrades */
/* Outbound upgrades */
pub type OutboundFramed<TSocket, TSpec> = Framed<Compat<TSocket>, OutboundCodec<TSpec>>;
impl<TSocket, TSpec> OutboundUpgrade<TSocket> for OutboundRequest<TSpec>
where
TSpec: EthSpec + Send + 'static,
TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = OutboundFramed<TSocket, TSpec>;
type Error = RPCError;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn upgrade_outbound(self, socket: TSocket, protocol: Self::Info) -> Self::Future {
// convert to a tokio compatible socket
let socket = socket.compat();
let codec = match protocol.encoding {
Encoding::SSZSnappy => {
let ssz_snappy_codec = BaseOutboundCodec::new(SSZSnappyOutboundCodec::new(
protocol,
usize::max_value(),
));
OutboundCodec::SSZSnappy(ssz_snappy_codec)
}
};
let mut socket = Framed::new(socket, codec);
async {
socket.send(self).await?;
socket.close().await?;
Ok(socket)
}
.boxed()
}
}
impl<TSpec: EthSpec> std::fmt::Display for OutboundRequest<TSpec> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OutboundRequest::Status(status) => write!(f, "Status Message: {}", status),
OutboundRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason),
OutboundRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req),
OutboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req),
OutboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data),
OutboundRequest::MetaData(_) => write!(f, "MetaData request"),
}
}
}

View File

@ -1,17 +1,13 @@
use super::methods::*; use super::methods::*;
use crate::rpc::{ use crate::rpc::{
codec::{ codec::{base::BaseInboundCodec, ssz_snappy::SSZSnappyInboundCodec, InboundCodec},
base::{BaseInboundCodec, BaseOutboundCodec},
ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec},
InboundCodec, OutboundCodec,
},
methods::{MaxErrorLen, ResponseTermination, MAX_ERROR_LEN}, methods::{MaxErrorLen, ResponseTermination, MAX_ERROR_LEN},
MaxRequestBlocks, MAX_REQUEST_BLOCKS, MaxRequestBlocks, MAX_REQUEST_BLOCKS,
}; };
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::prelude::{AsyncRead, AsyncWrite}; use futures::prelude::{AsyncRead, AsyncWrite};
use futures::{FutureExt, SinkExt, StreamExt}; use futures::{FutureExt, StreamExt};
use libp2p::core::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; use libp2p::core::{InboundUpgrade, ProtocolName, UpgradeInfo};
use ssz::Encode; use ssz::Encode;
use ssz_types::VariableList; use ssz_types::VariableList;
use std::io; use std::io;
@ -78,7 +74,7 @@ const TTFB_TIMEOUT: u64 = 5;
const REQUEST_TIMEOUT: u64 = 15; const REQUEST_TIMEOUT: u64 = 15;
/// Protocol names to be used. /// Protocol names to be used.
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Protocol { pub enum Protocol {
/// The Status protocol name. /// The Status protocol name.
Status, Status,
@ -276,7 +272,7 @@ impl ProtocolName for ProtocolId {
// The inbound protocol reads the request, decodes it and returns the stream to the protocol // The inbound protocol reads the request, decodes it and returns the stream to the protocol
// handler to respond to once ready. // handler to respond to once ready.
pub type InboundOutput<TSocket, TSpec> = (RPCRequest<TSpec>, InboundFramed<TSocket, TSpec>); pub type InboundOutput<TSocket, TSpec> = (InboundRequest<TSpec>, InboundFramed<TSocket, TSpec>);
pub type InboundFramed<TSocket, TSpec> = pub type InboundFramed<TSocket, TSpec> =
Framed<std::pin::Pin<Box<TimeoutStream<Compat<TSocket>>>>, InboundCodec<TSpec>>; Framed<std::pin::Pin<Box<TimeoutStream<Compat<TSocket>>>>, InboundCodec<TSpec>>;
@ -308,7 +304,7 @@ where
// MetaData requests should be empty, return the stream // MetaData requests should be empty, return the stream
match protocol_name { match protocol_name {
Protocol::MetaData => Ok((RPCRequest::MetaData(PhantomData), socket)), Protocol::MetaData => Ok((InboundRequest::MetaData(PhantomData), socket)),
_ => { _ => {
match tokio::time::timeout( match tokio::time::timeout(
Duration::from_secs(REQUEST_TIMEOUT), Duration::from_secs(REQUEST_TIMEOUT),
@ -328,13 +324,8 @@ where
} }
} }
/* Outbound request */
// Combines all the RPC requests into a single enum to implement `UpgradeInfo` and
// `OutboundUpgrade`
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
pub enum RPCRequest<TSpec: EthSpec> { pub enum InboundRequest<TSpec: EthSpec> {
Status(StatusMessage), Status(StatusMessage),
Goodbye(GoodbyeReason), Goodbye(GoodbyeReason),
BlocksByRange(BlocksByRangeRequest), BlocksByRange(BlocksByRangeRequest),
@ -343,7 +334,7 @@ pub enum RPCRequest<TSpec: EthSpec> {
MetaData(PhantomData<TSpec>), MetaData(PhantomData<TSpec>),
} }
impl<TSpec: EthSpec> UpgradeInfo for RPCRequest<TSpec> { impl<TSpec: EthSpec> UpgradeInfo for InboundRequest<TSpec> {
type Info = ProtocolId; type Info = ProtocolId;
type InfoIter = Vec<Self::Info>; type InfoIter = Vec<Self::Info>;
@ -354,36 +345,36 @@ impl<TSpec: EthSpec> UpgradeInfo for RPCRequest<TSpec> {
} }
/// Implements the encoding per supported protocol for `RPCRequest`. /// Implements the encoding per supported protocol for `RPCRequest`.
impl<TSpec: EthSpec> RPCRequest<TSpec> { impl<TSpec: EthSpec> InboundRequest<TSpec> {
pub fn supported_protocols(&self) -> Vec<ProtocolId> { pub fn supported_protocols(&self) -> Vec<ProtocolId> {
match self { match self {
// add more protocols when versions/encodings are supported // add more protocols when versions/encodings are supported
RPCRequest::Status(_) => vec![ProtocolId::new( InboundRequest::Status(_) => vec![ProtocolId::new(
Protocol::Status, Protocol::Status,
Version::V1, Version::V1,
Encoding::SSZSnappy, Encoding::SSZSnappy,
)], )],
RPCRequest::Goodbye(_) => vec![ProtocolId::new( InboundRequest::Goodbye(_) => vec![ProtocolId::new(
Protocol::Goodbye, Protocol::Goodbye,
Version::V1, Version::V1,
Encoding::SSZSnappy, Encoding::SSZSnappy,
)], )],
RPCRequest::BlocksByRange(_) => vec![ProtocolId::new( InboundRequest::BlocksByRange(_) => vec![ProtocolId::new(
Protocol::BlocksByRange, Protocol::BlocksByRange,
Version::V1, Version::V1,
Encoding::SSZSnappy, Encoding::SSZSnappy,
)], )],
RPCRequest::BlocksByRoot(_) => vec![ProtocolId::new( InboundRequest::BlocksByRoot(_) => vec![ProtocolId::new(
Protocol::BlocksByRoot, Protocol::BlocksByRoot,
Version::V1, Version::V1,
Encoding::SSZSnappy, Encoding::SSZSnappy,
)], )],
RPCRequest::Ping(_) => vec![ProtocolId::new( InboundRequest::Ping(_) => vec![ProtocolId::new(
Protocol::Ping, Protocol::Ping,
Version::V1, Version::V1,
Encoding::SSZSnappy, Encoding::SSZSnappy,
)], )],
RPCRequest::MetaData(_) => vec![ProtocolId::new( InboundRequest::MetaData(_) => vec![ProtocolId::new(
Protocol::MetaData, Protocol::MetaData,
Version::V1, Version::V1,
Encoding::SSZSnappy, Encoding::SSZSnappy,
@ -396,24 +387,24 @@ impl<TSpec: EthSpec> RPCRequest<TSpec> {
/// Number of responses expected for this request. /// Number of responses expected for this request.
pub fn expected_responses(&self) -> u64 { pub fn expected_responses(&self) -> u64 {
match self { match self {
RPCRequest::Status(_) => 1, InboundRequest::Status(_) => 1,
RPCRequest::Goodbye(_) => 0, InboundRequest::Goodbye(_) => 0,
RPCRequest::BlocksByRange(req) => req.count, InboundRequest::BlocksByRange(req) => req.count,
RPCRequest::BlocksByRoot(req) => req.block_roots.len() as u64, InboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64,
RPCRequest::Ping(_) => 1, InboundRequest::Ping(_) => 1,
RPCRequest::MetaData(_) => 1, InboundRequest::MetaData(_) => 1,
} }
} }
/// Gives the corresponding `Protocol` to this request. /// Gives the corresponding `Protocol` to this request.
pub fn protocol(&self) -> Protocol { pub fn protocol(&self) -> Protocol {
match self { match self {
RPCRequest::Status(_) => Protocol::Status, InboundRequest::Status(_) => Protocol::Status,
RPCRequest::Goodbye(_) => Protocol::Goodbye, InboundRequest::Goodbye(_) => Protocol::Goodbye,
RPCRequest::BlocksByRange(_) => Protocol::BlocksByRange, InboundRequest::BlocksByRange(_) => Protocol::BlocksByRange,
RPCRequest::BlocksByRoot(_) => Protocol::BlocksByRoot, InboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot,
RPCRequest::Ping(_) => Protocol::Ping, InboundRequest::Ping(_) => Protocol::Ping,
RPCRequest::MetaData(_) => Protocol::MetaData, InboundRequest::MetaData(_) => Protocol::MetaData,
} }
} }
@ -423,53 +414,18 @@ impl<TSpec: EthSpec> RPCRequest<TSpec> {
match self { match self {
// this only gets called after `multiple_responses()` returns true. Therefore, only // this only gets called after `multiple_responses()` returns true. Therefore, only
// variants that have `multiple_responses()` can have values. // variants that have `multiple_responses()` can have values.
RPCRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange, InboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange,
RPCRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, InboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot,
RPCRequest::Status(_) => unreachable!(), InboundRequest::Status(_) => unreachable!(),
RPCRequest::Goodbye(_) => unreachable!(), InboundRequest::Goodbye(_) => unreachable!(),
RPCRequest::Ping(_) => unreachable!(), InboundRequest::Ping(_) => unreachable!(),
RPCRequest::MetaData(_) => unreachable!(), InboundRequest::MetaData(_) => unreachable!(),
} }
} }
} }
/* RPC Response type - used for outbound upgrades */ /* RPC Response type - used for outbound upgrades */
/* Outbound upgrades */
pub type OutboundFramed<TSocket, TSpec> = Framed<Compat<TSocket>, OutboundCodec<TSpec>>;
impl<TSocket, TSpec> OutboundUpgrade<TSocket> for RPCRequest<TSpec>
where
TSpec: EthSpec + Send + 'static,
TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = OutboundFramed<TSocket, TSpec>;
type Error = RPCError;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn upgrade_outbound(self, socket: TSocket, protocol: Self::Info) -> Self::Future {
// convert to a tokio compatible socket
let socket = socket.compat();
let codec = match protocol.encoding {
Encoding::SSZSnappy => {
let ssz_snappy_codec =
BaseOutboundCodec::new(SSZSnappyOutboundCodec::new(protocol, MAX_RPC_SIZE));
OutboundCodec::SSZSnappy(ssz_snappy_codec)
}
};
let mut socket = Framed::new(socket, codec);
async {
socket.send(self).await?;
socket.close().await?;
Ok(socket)
}
.boxed()
}
}
/// Error in RPC Encoding/Decoding. /// Error in RPC Encoding/Decoding.
#[derive(Debug, Clone, PartialEq, AsStaticStr)] #[derive(Debug, Clone, PartialEq, AsStaticStr)]
#[strum(serialize_all = "snake_case")] #[strum(serialize_all = "snake_case")]
@ -556,15 +512,15 @@ impl std::error::Error for RPCError {
} }
} }
impl<TSpec: EthSpec> std::fmt::Display for RPCRequest<TSpec> { impl<TSpec: EthSpec> std::fmt::Display for InboundRequest<TSpec> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { match self {
RPCRequest::Status(status) => write!(f, "Status Message: {}", status), InboundRequest::Status(status) => write!(f, "Status Message: {}", status),
RPCRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason), InboundRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason),
RPCRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), InboundRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req),
RPCRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), InboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req),
RPCRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), InboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data),
RPCRequest::MetaData(_) => write!(f, "MetaData request"), InboundRequest::MetaData(_) => write!(f, "MetaData request"),
} }
} }
} }

View File

@ -1,4 +1,4 @@
use crate::rpc::{Protocol, RPCRequest}; use crate::rpc::{InboundRequest, Protocol};
use fnv::FnvHashMap; use fnv::FnvHashMap;
use libp2p::PeerId; use libp2p::PeerId;
use std::convert::TryInto; use std::convert::TryInto;
@ -185,7 +185,7 @@ impl RPCRateLimiter {
pub fn allows<T: EthSpec>( pub fn allows<T: EthSpec>(
&mut self, &mut self,
peer_id: &PeerId, peer_id: &PeerId,
request: &RPCRequest<T>, request: &InboundRequest<T>,
) -> Result<(), RateLimitedErr> { ) -> Result<(), RateLimitedErr> {
let time_since_start = self.init_time.elapsed(); let time_since_start = self.init_time.elapsed();
let mut tokens = request.expected_responses().max(1); let mut tokens = request.expected_responses().max(1);
@ -207,7 +207,7 @@ impl RPCRateLimiter {
// 9 | 4 // 9 | 4
// 10 | 5 // 10 | 5
if let RPCRequest::BlocksByRange(bbr_req) = request { if let InboundRequest::BlocksByRange(bbr_req) = request {
let penalty_factor = (bbr_req.step as f64 / 5.0).powi(2) as u64 + 1; let penalty_factor = (bbr_req.step as f64 / 5.0).powi(2) as u64 + 1;
tokens *= penalty_factor; tokens *= penalty_factor;
} }