From 493784366f3dc30fe97da62a29940b92431dd756 Mon Sep 17 00:00:00 2001 From: Diva M Date: Tue, 7 Feb 2023 13:00:35 -0500 Subject: [PATCH] self rate limiting --- Cargo.lock | 3 +- Cargo.toml | 1 - beacon_node/lighthouse_network/src/config.rs | 5 + .../lighthouse_network/src/rpc/config.rs | 189 ++++++++++++++++ beacon_node/lighthouse_network/src/rpc/mod.rs | 58 ++++- .../lighthouse_network/src/rpc/protocol.rs | 28 +-- .../src/rpc/rate_limiter.rs | 69 +++++- .../src/rpc/self_limiter.rs | 206 ++++++++++++++++++ .../lighthouse_network/src/service/mod.rs | 1 + beacon_node/src/cli.rs | 15 ++ beacon_node/src/config.rs | 7 + 11 files changed, 539 insertions(+), 43 deletions(-) create mode 100644 beacon_node/lighthouse_network/src/rpc/config.rs create mode 100644 beacon_node/lighthouse_network/src/rpc/self_limiter.rs diff --git a/Cargo.lock b/Cargo.lock index bc235ad2f..b2e6e4ca2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2620,7 +2620,8 @@ dependencies = [ [[package]] name = "fixed-hash" version = "0.7.0" -source = "git+https://github.com/paritytech/parity-common?rev=df638ab0885293d21d656dc300d39236b69ce57d#df638ab0885293d21d656dc300d39236b69ce57d" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcf0ed7fe52a17a03854ec54a9f76d6d84508d1c0e66bc1793301c73fc8493c" dependencies = [ "byteorder", "rand 0.8.5", diff --git a/Cargo.toml b/Cargo.toml index 76edfbfe8..581c056a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,7 +94,6 @@ resolver = "2" [patch] [patch.crates-io] -fixed-hash = { git = "https://github.com/paritytech/parity-common", rev="df638ab0885293d21d656dc300d39236b69ce57d" } warp = { git = "https://github.com/macladson/warp", rev="7e75acc368229a46a236a8c991bf251fe7fe50ef" } eth2_ssz = { path = "consensus/ssz" } eth2_ssz_derive = { path = "consensus/ssz_derive" } diff --git a/beacon_node/lighthouse_network/src/config.rs b/beacon_node/lighthouse_network/src/config.rs index 32712e32a..e40384332 100644 --- a/beacon_node/lighthouse_network/src/config.rs +++ b/beacon_node/lighthouse_network/src/config.rs @@ -1,3 +1,4 @@ +use crate::rpc::config::OutboundRateLimiterConfig; use crate::types::GossipKind; use crate::{Enr, PeerIdSerialized}; use directory::{ @@ -133,6 +134,9 @@ pub struct Config { /// Whether light client protocols should be enabled. pub enable_light_client_server: bool, + + /// Configuration for the outbound rate limiter (requests made by this node). + pub outbound_rate_limiter_config: Option, } impl Default for Config { @@ -211,6 +215,7 @@ impl Default for Config { topics: Vec::new(), metrics_enabled: false, enable_light_client_server: false, + outbound_rate_limiter_config: None, } } } diff --git a/beacon_node/lighthouse_network/src/rpc/config.rs b/beacon_node/lighthouse_network/src/rpc/config.rs new file mode 100644 index 000000000..7d88dbaff --- /dev/null +++ b/beacon_node/lighthouse_network/src/rpc/config.rs @@ -0,0 +1,189 @@ +use std::{ + fmt::{Debug, Display}, + str::FromStr, + time::Duration, +}; + +use super::{methods, rate_limiter::Quota, Protocol}; + +use serde_derive::{Deserialize, Serialize}; + +/// Auxiliary struct to aid on configuration parsing. +/// +/// A protocol's quota is specified as `protocol_name:tokens/time_in_seconds`. +#[derive(Debug, PartialEq, Eq)] +struct ProtocolQuota { + protocol: Protocol, + quota: Quota, +} + +impl Display for ProtocolQuota { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}:{}/{}", + self.protocol.as_ref(), + self.quota.max_tokens, + self.quota.replenish_all_every.as_secs() + ) + } +} + +impl FromStr for ProtocolQuota { + type Err = &'static str; + + fn from_str(s: &str) -> Result { + let (protocol_str, quota_str) = s + .split_once(':') + .ok_or("Missing ':' from quota definition.")?; + let protocol = protocol_str + .parse() + .map_err(|_parse_err| "Wrong protocol representation in quota")?; + let (tokens_str, time_str) = quota_str + .split_once('/') + .ok_or("Quota should be defined as \"n/t\" (t in seconds). Missing '/' from quota.")?; + let tokens = tokens_str + .parse() + .map_err(|_| "Failed to parse tokens from quota.")?; + let seconds = time_str + .parse::() + .map_err(|_| "Failed to parse time in seconds from quota.")?; + Ok(ProtocolQuota { + protocol, + quota: Quota { + replenish_all_every: Duration::from_secs(seconds), + max_tokens: tokens, + }, + }) + } +} + +/// Configurations for the rate limiter applied to outbound requests (made by the node itself). +#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct OutboundRateLimiterConfig { + pub(super) ping_quota: Quota, + pub(super) meta_data_quota: Quota, + pub(super) status_quota: Quota, + pub(super) goodbye_quota: Quota, + pub(super) blocks_by_range_quota: Quota, + pub(super) blocks_by_root_quota: Quota, + pub(super) blobs_by_range_quota: Quota, + pub(super) blobs_by_root_quota: Quota, +} + +impl OutboundRateLimiterConfig { + pub const DEFAULT_PING_QUOTA: Quota = Quota::n_every(2, 10); + pub const DEFAULT_META_DATA_QUOTA: Quota = Quota::n_every(2, 5); + pub const DEFAULT_STATUS_QUOTA: Quota = Quota::n_every(5, 15); + pub const DEFAULT_GOODBYE_QUOTA: Quota = Quota::one_every(10); + pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota = + Quota::n_every(methods::MAX_REQUEST_BLOCKS, 10); + pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10); + pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = + Quota::n_every(methods::MAX_REQUEST_BLOBS_SIDECARS, 10); + pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10); +} + +impl Default for OutboundRateLimiterConfig { + fn default() -> Self { + OutboundRateLimiterConfig { + ping_quota: Self::DEFAULT_PING_QUOTA, + meta_data_quota: Self::DEFAULT_META_DATA_QUOTA, + status_quota: Self::DEFAULT_STATUS_QUOTA, + goodbye_quota: Self::DEFAULT_GOODBYE_QUOTA, + blocks_by_range_quota: Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA, + blocks_by_root_quota: Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA, + blobs_by_range_quota: Self::DEFAULT_BLOBS_BY_RANGE_QUOTA, + blobs_by_root_quota: Self::DEFAULT_BLOBS_BY_ROOT_QUOTA, + } + } +} + +impl Debug for OutboundRateLimiterConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + macro_rules! fmt_q { + ($quota:expr) => { + &format_args!( + "{}/{}s", + $quota.max_tokens, + $quota.replenish_all_every.as_secs() + ) + }; + } + + f.debug_struct("OutboundRateLimiterConfig") + .field("ping", fmt_q!(&self.ping_quota)) + .field("metadata", fmt_q!(&self.meta_data_quota)) + .field("status", fmt_q!(&self.status_quota)) + .field("goodbye", fmt_q!(&self.goodbye_quota)) + .field("blocks_by_range", fmt_q!(&self.blocks_by_range_quota)) + .field("blocks_by_root", fmt_q!(&self.blocks_by_root_quota)) + .field("blobs_by_range", fmt_q!(&self.blobs_by_range_quota)) + .field("blobs_by_root", fmt_q!(&self.blobs_by_root_quota)) + .finish() + } +} + +/// Parse configurations for the outbound rate limiter. Protocols that are not specified use +/// the default values. Protocol specified more than once use only the first given Quota. +/// +/// The expected format is a ';' separated list of [`ProtocolQuota`]. +impl FromStr for OutboundRateLimiterConfig { + type Err = &'static str; + + fn from_str(s: &str) -> Result { + let mut ping_quota = None; + let mut meta_data_quota = None; + let mut status_quota = None; + let mut goodbye_quota = None; + let mut blocks_by_range_quota = None; + let mut blocks_by_root_quota = None; + let mut blobs_by_range_quota = None; + let mut blobs_by_root_quota = None; + for proto_def in s.split(';') { + let ProtocolQuota { protocol, quota } = proto_def.parse()?; + let quota = Some(quota); + match protocol { + Protocol::Status => status_quota = status_quota.or(quota), + Protocol::Goodbye => goodbye_quota = goodbye_quota.or(quota), + Protocol::BlocksByRange => blocks_by_range_quota = blocks_by_range_quota.or(quota), + Protocol::BlocksByRoot => blocks_by_root_quota = blocks_by_root_quota.or(quota), + Protocol::BlobsByRange => blobs_by_range_quota = blobs_by_range_quota.or(quota), + Protocol::BlobsByRoot => blobs_by_root_quota = blobs_by_root_quota.or(quota), + Protocol::Ping => ping_quota = ping_quota.or(quota), + Protocol::MetaData => meta_data_quota = meta_data_quota.or(quota), + Protocol::LightClientBootstrap => return Err("Lighthouse does not send LightClientBootstrap requests. Quota should not be set."), + } + } + Ok(OutboundRateLimiterConfig { + ping_quota: ping_quota.unwrap_or(Self::DEFAULT_PING_QUOTA), + meta_data_quota: meta_data_quota.unwrap_or(Self::DEFAULT_META_DATA_QUOTA), + status_quota: status_quota.unwrap_or(Self::DEFAULT_STATUS_QUOTA), + goodbye_quota: goodbye_quota.unwrap_or(Self::DEFAULT_GOODBYE_QUOTA), + blocks_by_range_quota: blocks_by_range_quota + .unwrap_or(Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA), + blocks_by_root_quota: blocks_by_root_quota + .unwrap_or(Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA), + blobs_by_range_quota: blobs_by_range_quota + .unwrap_or(Self::DEFAULT_BLOBS_BY_RANGE_QUOTA), + blobs_by_root_quota: blobs_by_root_quota.unwrap_or(Self::DEFAULT_BLOBS_BY_ROOT_QUOTA), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_quota_inverse() { + let quota = ProtocolQuota { + protocol: Protocol::Goodbye, + quota: Quota { + replenish_all_every: Duration::from_secs(10), + max_tokens: 8, + }, + }; + assert_eq!(quota.to_string().parse(), Ok(quota)) + } +} diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 9d2aeb0eb..8727f7df7 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -12,7 +12,7 @@ use libp2p::swarm::{ PollParameters, SubstreamProtocol, }; use libp2p::PeerId; -use rate_limiter::{RPCRateLimiter as RateLimiter, RPCRateLimiterBuilder, RateLimitedErr}; +use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr}; use slog::{crit, debug, o}; use std::marker::PhantomData; use std::sync::Arc; @@ -33,12 +33,17 @@ pub use methods::{ pub(crate) use outbound::OutboundRequest; pub use protocol::{max_rpc_size, Protocol, RPCError}; +use self::config::OutboundRateLimiterConfig; +use self::self_limiter::SelfRateLimiter; + pub(crate) mod codec; +pub mod config; mod handler; pub mod methods; mod outbound; mod protocol; mod rate_limiter; +mod self_limiter; /// Composite trait for a request id. pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {} @@ -101,13 +106,18 @@ pub struct RPCMessage { pub event: HandlerEvent, } +type BehaviourAction = + NetworkBehaviourAction, RPCHandler>; + /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level /// logic. pub struct RPC { /// Rate limiter limiter: RateLimiter, + /// Rate limiter for our own requests. + self_limiter: Option>, /// Queue of events to be processed. - events: Vec, RPCHandler>>, + events: Vec>, fork_context: Arc, enable_light_client_server: bool, /// Slog logger for RPC behaviour. @@ -118,10 +128,12 @@ impl RPC { pub fn new( fork_context: Arc, enable_light_client_server: bool, + outbound_rate_limiter_config: Option, log: slog::Logger, ) -> Self { let log = log.new(o!("service" => "libp2p_rpc")); - let limiter = RPCRateLimiterBuilder::new() + + let limiter = RateLimiter::builder() .n_every(Protocol::MetaData, 2, Duration::from_secs(5)) .n_every(Protocol::Ping, 2, Duration::from_secs(10)) .n_every(Protocol::Status, 5, Duration::from_secs(15)) @@ -141,8 +153,14 @@ impl RPC { ) .build() .expect("Configuration parameters are valid"); + + let self_limiter = outbound_rate_limiter_config.map(|config| { + SelfRateLimiter::new(config, log.clone()).expect("Configuration parameters are valid") + }); + RPC { limiter, + self_limiter, events: Vec::new(), fork_context, enable_light_client_server, @@ -169,12 +187,24 @@ impl RPC { /// Submits an RPC request. /// /// The peer must be connected for this to succeed. - pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, event: OutboundRequest) { - self.events.push(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::Any, - event: RPCSend::Request(request_id, event), - }); + pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: OutboundRequest) { + let event = if let Some(self_limiter) = self.self_limiter.as_mut() { + match self_limiter.allows(peer_id, request_id, req) { + Ok(event) => event, + Err(_e) => { + // Request is logged and queued internally in the self rate limiter. + return; + } + } + } else { + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler: NotifyHandler::Any, + event: RPCSend::Request(request_id, req), + } + }; + + self.events.push(event); } /// Lighthouse wishes to disconnect from this peer by sending a Goodbye message. This @@ -279,11 +309,19 @@ where cx: &mut Context, _: &mut impl PollParameters, ) -> Poll> { - // let the rate limiter prune + // let the rate limiter prune. let _ = self.limiter.poll_unpin(cx); + + if let Some(self_limiter) = self.self_limiter.as_mut() { + if let Poll::Ready(event) = self_limiter.poll_ready(cx) { + self.events.push(event) + } + } + if !self.events.is_empty() { return Poll::Ready(self.events.remove(0)); } + Poll::Pending } } diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 7f5419d16..e5a931580 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -14,7 +14,7 @@ use std::io; use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; -use strum::IntoStaticStr; +use strum::{AsRefStr, Display, EnumString, IntoStaticStr}; use tokio_io_timeout::TimeoutStream; use tokio_util::{ codec::Framed, @@ -169,25 +169,32 @@ pub fn rpc_block_limits_by_fork(current_fork: ForkName) -> RpcLimits { } /// Protocol names to be used. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumString, AsRefStr, Display)] +#[strum(serialize_all = "snake_case")] pub enum Protocol { /// The Status protocol name. Status, /// The Goodbye protocol name. Goodbye, /// The `BlocksByRange` protocol name. + #[strum(serialize = "beacon_blocks_by_range")] BlocksByRange, /// The `BlocksByRoot` protocol name. + #[strum(serialize = "beacon_blocks_by_root")] BlocksByRoot, /// The `BlobsByRange` protocol name. + #[strum(serialize = "blobs_sidecars_by_range")] BlobsByRange, /// The `BlobsByRoot` protocol name. + #[strum(serialize = "beacon_block_and_blobs_sidecar_by_root")] BlobsByRoot, /// The `Ping` protocol name. Ping, /// The `MetaData` protocol name. + #[strum(serialize = "metadata")] MetaData, /// The `LightClientBootstrap` protocol name. + #[strum(serialize = "light_client_bootstrap")] LightClientBootstrap, } @@ -222,23 +229,6 @@ impl Protocol { } } -impl std::fmt::Display for Protocol { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let repr = match self { - Protocol::Status => "status", - Protocol::Goodbye => "goodbye", - Protocol::BlocksByRange => "beacon_blocks_by_range", - Protocol::BlocksByRoot => "beacon_blocks_by_root", - Protocol::BlobsByRange => "blobs_sidecars_by_range", - Protocol::BlobsByRoot => "beacon_block_and_blobs_sidecar_by_root", - Protocol::Ping => "ping", - Protocol::MetaData => "metadata", - Protocol::LightClientBootstrap => "light_client_bootstrap", - }; - f.write_str(repr) - } -} - impl std::fmt::Display for Encoding { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let repr = match self { diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index d021afcea..83fc56247 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -1,6 +1,7 @@ -use crate::rpc::{InboundRequest, Protocol}; +use crate::rpc::Protocol; use fnv::FnvHashMap; use libp2p::PeerId; +use serde_derive::{Deserialize, Serialize}; use std::convert::TryInto; use std::future::Future; use std::hash::Hash; @@ -47,12 +48,31 @@ type Nanosecs = u64; /// n*`replenish_all_every`/`max_tokens` units of time since their last request. /// /// To produce hard limits, set `max_tokens` to 1. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Quota { /// How often are `max_tokens` fully replenished. - replenish_all_every: Duration, + pub(super) replenish_all_every: Duration, /// Token limit. This translates on how large can an instantaneous batch of /// tokens be. - max_tokens: u64, + pub(super) max_tokens: u64, +} + +impl Quota { + /// A hard limit of one token every `seconds`. + pub const fn one_every(seconds: u64) -> Self { + Quota { + replenish_all_every: Duration::from_secs(seconds), + max_tokens: 1, + } + } + + /// Allow `n` tokens to be use used every `seconds`. + pub const fn n_every(n: u64, seconds: u64) -> Self { + Quota { + replenish_all_every: Duration::from_secs(seconds), + max_tokens: n, + } + } } /// Manages rate limiting of requests per peer, with differentiated rates per protocol. @@ -82,6 +102,7 @@ pub struct RPCRateLimiter { } /// Error type for non conformant requests +#[derive(Debug)] pub enum RateLimitedErr { /// Required tokens for this request exceed the maximum TooLarge, @@ -90,7 +111,7 @@ pub enum RateLimitedErr { } /// User-friendly builder of a `RPCRateLimiter` -#[derive(Default)] +#[derive(Default, Clone)] pub struct RPCRateLimiterBuilder { /// Quota for the Goodbye protocol. goodbye_quota: Option, @@ -113,13 +134,8 @@ pub struct RPCRateLimiterBuilder { } impl RPCRateLimiterBuilder { - /// Get an empty `RPCRateLimiterBuilder`. - pub fn new() -> Self { - Default::default() - } - /// Set a quota for a protocol. - fn set_quota(mut self, protocol: Protocol, quota: Quota) -> Self { + pub fn set_quota(mut self, protocol: Protocol, quota: Quota) -> Self { let q = Some(quota); match protocol { Protocol::Ping => self.ping_quota = q, @@ -213,11 +229,40 @@ impl RPCRateLimiterBuilder { } } +pub trait RateLimiterItem { + fn protocol(&self) -> Protocol; + fn expected_responses(&self) -> u64; +} + +impl RateLimiterItem for super::InboundRequest { + fn protocol(&self) -> Protocol { + self.protocol() + } + + fn expected_responses(&self) -> u64 { + self.expected_responses() + } +} + +impl RateLimiterItem for super::OutboundRequest { + fn protocol(&self) -> Protocol { + self.protocol() + } + + fn expected_responses(&self) -> u64 { + self.expected_responses() + } +} impl RPCRateLimiter { - pub fn allows( + /// Get a builder instance. + pub fn builder() -> RPCRateLimiterBuilder { + RPCRateLimiterBuilder::default() + } + + pub fn allows( &mut self, peer_id: &PeerId, - request: &InboundRequest, + request: &Item, ) -> Result<(), RateLimitedErr> { let time_since_start = self.init_time.elapsed(); let tokens = request.expected_responses().max(1); diff --git a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs new file mode 100644 index 000000000..8363e39a5 --- /dev/null +++ b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs @@ -0,0 +1,206 @@ +use std::{ + collections::{hash_map::Entry, HashMap, VecDeque}, + task::{Context, Poll}, + time::Duration, +}; + +use futures::FutureExt; +use libp2p::{swarm::NotifyHandler, PeerId}; +use slog::{crit, debug, Logger}; +use smallvec::SmallVec; +use tokio_util::time::DelayQueue; +use types::EthSpec; + +use super::{ + config::OutboundRateLimiterConfig, + rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr}, + BehaviourAction, OutboundRequest, Protocol, RPCSend, ReqId, +}; + +/// A request that was rate limited or waiting on rate limited requests for the same peer and +/// protocol. +struct QueuedRequest { + req: OutboundRequest, + request_id: Id, +} + +pub(crate) struct SelfRateLimiter { + /// Requests queued for sending per peer. This requests are stored when the self rate + /// limiter rejects them. Rate limiting is based on a Peer and Protocol basis, therefore + /// are stored in the same way. + delayed_requests: HashMap<(PeerId, Protocol), VecDeque>>, + /// The delay required to allow a peer's outbound request per protocol. + next_peer_request: DelayQueue<(PeerId, Protocol)>, + /// Rate limiter for our own requests. + limiter: RateLimiter, + /// Requests that are ready to be sent. + ready_requests: SmallVec<[BehaviourAction; 3]>, + /// Slog logger. + log: Logger, +} + +/// Error returned when the rate limiter does not accept a request. +// NOTE: this is currently not used, but might be useful for debugging. +pub enum Error { + /// There are queued requests for this same peer and protocol. + PendingRequests, + /// Request was tried but rate limited. + RateLimited, +} + +impl SelfRateLimiter { + /// Creates a new [`SelfRateLimiter`] based on configration values. + pub fn new(config: OutboundRateLimiterConfig, log: Logger) -> Result { + debug!(log, "Using self rate limiting params"; "config" => ?config); + // Destructure to make sure every configuration value is used. + let OutboundRateLimiterConfig { + ping_quota, + meta_data_quota, + status_quota, + goodbye_quota, + blocks_by_range_quota, + blocks_by_root_quota, + blobs_by_range_quota, + blobs_by_root_quota, + } = config; + + let limiter = RateLimiter::builder() + .set_quota(Protocol::Ping, ping_quota) + .set_quota(Protocol::MetaData, meta_data_quota) + .set_quota(Protocol::Status, status_quota) + .set_quota(Protocol::Goodbye, goodbye_quota) + .set_quota(Protocol::BlocksByRange, blocks_by_range_quota) + .set_quota(Protocol::BlocksByRoot, blocks_by_root_quota) + .set_quota(Protocol::BlobsByRange, blobs_by_range_quota) + .set_quota(Protocol::BlobsByRoot, blobs_by_root_quota) + // Manually set the LightClientBootstrap quota, since we use the same rate limiter for + // inbound and outbound requests, and the LightClientBootstrap is an only inbound + // protocol. + .one_every(Protocol::LightClientBootstrap, Duration::from_secs(10)) + .build()?; + + Ok(SelfRateLimiter { + delayed_requests: Default::default(), + next_peer_request: Default::default(), + limiter, + ready_requests: Default::default(), + log, + }) + } + + /// Checks if the rate limiter allows the request. If it's allowed, returns the + /// [`NetworkBehaviourAction`] that should be emitted. When not allowed, the request is delayed + /// until it can be sent. + pub fn allows( + &mut self, + peer_id: PeerId, + request_id: Id, + req: OutboundRequest, + ) -> Result, Error> { + let protocol = req.protocol(); + // First check that there are not already other requests waiting to be sent. + if let Some(queued_requests) = self.delayed_requests.get_mut(&(peer_id, protocol)) { + queued_requests.push_back(QueuedRequest { req, request_id }); + + return Err(Error::PendingRequests); + } + match Self::try_send_request(&mut self.limiter, peer_id, request_id, req, &self.log) { + Err((rate_limited_req, wait_time)) => { + let key = (peer_id, protocol); + self.next_peer_request.insert(key, wait_time); + self.delayed_requests + .entry(key) + .or_default() + .push_back(rate_limited_req); + + Err(Error::RateLimited) + } + Ok(event) => Ok(event), + } + } + + /// Auxiliary function to deal with self rate limiting outcomes. If the rate limiter allows the + /// request, the [`NetworkBehaviourAction`] that should be emitted is returned. If the request + /// should be delayed, it's returned with the duration to wait. + fn try_send_request( + limiter: &mut RateLimiter, + peer_id: PeerId, + request_id: Id, + req: OutboundRequest, + log: &Logger, + ) -> Result, (QueuedRequest, Duration)> { + match limiter.allows(&peer_id, &req) { + Ok(()) => Ok(BehaviourAction::NotifyHandler { + peer_id, + handler: NotifyHandler::Any, + event: RPCSend::Request(request_id, req), + }), + Err(e) => { + let protocol = req.protocol(); + match e { + RateLimitedErr::TooLarge => { + // this should never happen with default parameters. Let's just send the request. + // Log a crit since this is a config issue. + crit!( + log, + "Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters."; + "protocol" => %req.protocol() + ); + Ok(BehaviourAction::NotifyHandler { + peer_id, + handler: NotifyHandler::Any, + event: RPCSend::Request(request_id, req), + }) + } + RateLimitedErr::TooSoon(wait_time) => { + debug!(log, "Self rate limiting"; "protocol" => %protocol, "wait_time_ms" => wait_time.as_millis(), "peer_id" => %peer_id); + Err((QueuedRequest { req, request_id }, wait_time)) + } + } + } + } + } + + /// When a peer and protocol are allowed to send a next request, this function checks the + /// queued requests and attempts marking as ready as many as the limiter allows. + fn next_peer_request_ready(&mut self, peer_id: PeerId, protocol: Protocol) { + if let Entry::Occupied(mut entry) = self.delayed_requests.entry((peer_id, protocol)) { + let queued_requests = entry.get_mut(); + while let Some(QueuedRequest { req, request_id }) = queued_requests.pop_front() { + match Self::try_send_request(&mut self.limiter, peer_id, request_id, req, &self.log) + { + Err((rate_limited_req, wait_time)) => { + let key = (peer_id, protocol); + self.next_peer_request.insert(key, wait_time); + queued_requests.push_back(rate_limited_req); + // If one fails just wait for the next window that allows sending requests. + return; + } + Ok(event) => self.ready_requests.push(event), + } + } + if queued_requests.is_empty() { + entry.remove(); + } + } + } + + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // First check the requests that were self rate limited, since those might add events to + // the queue. Also do this this before rate limiter prunning to avoid removing and + // immediately adding rate limiting keys. + if let Poll::Ready(Some(Ok(expired))) = self.next_peer_request.poll_expired(cx) { + let (peer_id, protocol) = expired.into_inner(); + self.next_peer_request_ready(peer_id, protocol); + } + // Prune the rate limiter. + let _ = self.limiter.poll_unpin(cx); + + // Finally return any queued events. + if !self.ready_requests.is_empty() { + return Poll::Ready(self.ready_requests.remove(0)); + } + + Poll::Pending + } +} diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index e7f8d8945..9b0728d8d 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -266,6 +266,7 @@ impl Network { let eth2_rpc = RPC::new( ctx.fork_context.clone(), config.enable_light_client_server, + config.outbound_rate_limiter_config.clone(), log.clone(), ); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index c5aef78aa..c182e876a 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -194,6 +194,21 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .help("Lighthouse by default does not discover private IP addresses. Set this flag to enable connection attempts to local addresses.") .takes_value(false), ) + .arg( + Arg::with_name("self-limiter") + .long("self-limiter") + .help( + "Enables the outbound rate limiter (requests made by this node).\ + \ + Rate limit quotas per protocol can be set in the form of \ + :/. To set quotas for multiple protocols, \ + separate them by ';'. If the self rate limiter is enabled and a protocol is not \ + present in the configuration, the quotas used for the inbound rate limiter will be \ + used." + ) + .min_values(0) + .hidden(true) + ) /* REST API related arguments */ .arg( Arg::with_name("http") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 1ce0f5f77..3dedcf702 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -984,6 +984,13 @@ pub fn set_network_config( // Light client server config. config.enable_light_client_server = cli_args.is_present("light-client-server"); + // This flag can be used both with or without a value. Try to parse it first with a value, if + // no value is defined but the flag is present, use the default params. + config.outbound_rate_limiter_config = clap_utils::parse_optional(cli_args, "self-limiter")?; + if cli_args.is_present("self-limiter") && config.outbound_rate_limiter_config.is_none() { + config.outbound_rate_limiter_config = Some(Default::default()); + } + Ok(()) }