From 4d77784bb88a61e0e436b58578085c99b844bf50 Mon Sep 17 00:00:00 2001 From: divma Date: Fri, 31 Jul 2020 05:47:09 +0000 Subject: [PATCH] Rate limit RPC requests (#1402) ## Issue Addressed #1056 ## Proposed Changes - Add a rate limiter to the RPC behaviour. This also means the rate limiting occurs just before the door to the application level, so the number of connections a peer opens does not affect this (this would happen in the future if put on the handler) - The algorithm used is the leaky bucket as a meter / token bucket implemented the GCRA way - Each protocol has its own limit. Due to the way the algorithm works, the "small" protocols have a hard limit, while bbrange and bbroot allow [burstiness](https://www.wikiwand.com/en/Burstiness). This is so that a peer can't send hundreds of individual requests expecting only one block in a short period of time; it also allows a peer to send two half-size requests instead of one of maximum size without getting limited, and it also allows a peer to request a batch of the maximum size and then send _appropriately spaced_ requests of really small sizes. From what I've seen in sync this is plausible when reaching the target slot. 
## Additional Info Needs to be heavily tested --- .../eth2_libp2p/src/peer_manager/mod.rs | 9 + beacon_node/eth2_libp2p/src/rpc/methods.rs | 3 + beacon_node/eth2_libp2p/src/rpc/mod.rs | 77 +++- beacon_node/eth2_libp2p/src/rpc/protocol.rs | 4 + .../eth2_libp2p/src/rpc/rate_limiter.rs | 377 ++++++++++++++++++ 5 files changed, 461 insertions(+), 9 deletions(-) create mode 100644 beacon_node/eth2_libp2p/src/rpc/rate_limiter.rs diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 58bc2a097..d8cdfb4b2 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -331,6 +331,7 @@ impl PeerManager { RPCResponseErrorCode::Unknown => PeerAction::HighToleranceError, RPCResponseErrorCode::ServerError => PeerAction::MidToleranceError, RPCResponseErrorCode::InvalidRequest => PeerAction::LowToleranceError, + RPCResponseErrorCode::RateLimited => PeerAction::LowToleranceError, }, RPCError::SSZDecodeError(_) => PeerAction::Fatal, RPCError::UnsupportedProtocol => { @@ -359,6 +360,14 @@ impl PeerManager { Protocol::Status => return, }, RPCError::NegotiationTimeout => PeerAction::HighToleranceError, + RPCError::RateLimited => match protocol { + Protocol::Ping => PeerAction::MidToleranceError, + Protocol::BlocksByRange => PeerAction::HighToleranceError, + Protocol::BlocksByRoot => PeerAction::HighToleranceError, + Protocol::Goodbye => PeerAction::LowToleranceError, + Protocol::MetaData => PeerAction::LowToleranceError, + Protocol::Status => PeerAction::LowToleranceError, + }, }; self.report_peer(peer_id, peer_action); diff --git a/beacon_node/eth2_libp2p/src/rpc/methods.rs b/beacon_node/eth2_libp2p/src/rpc/methods.rs index 6ff4d3a99..184dfd84f 100644 --- a/beacon_node/eth2_libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2_libp2p/src/rpc/methods.rs @@ -258,6 +258,7 @@ pub enum RPCCodedResponse { /// The code assigned to an erroneous `RPCResponse`. 
#[derive(Debug, Clone, Copy)] pub enum RPCResponseErrorCode { + RateLimited, InvalidRequest, ServerError, Unknown, @@ -322,6 +323,7 @@ impl RPCResponseErrorCode { RPCResponseErrorCode::InvalidRequest => 1, RPCResponseErrorCode::ServerError => 2, RPCResponseErrorCode::Unknown => 255, + RPCResponseErrorCode::RateLimited => 128, } } } @@ -332,6 +334,7 @@ impl std::fmt::Display for RPCResponseErrorCode { RPCResponseErrorCode::InvalidRequest => "The request was invalid", RPCResponseErrorCode::ServerError => "Server error occurred", RPCResponseErrorCode::Unknown => "Unknown error occurred", + RPCResponseErrorCode::RateLimited => "Rate limited", }; f.write_str(repr) } diff --git a/beacon_node/eth2_libp2p/src/rpc/mod.rs b/beacon_node/eth2_libp2p/src/rpc/mod.rs index edd4555eb..e1b9ea438 100644 --- a/beacon_node/eth2_libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2_libp2p/src/rpc/mod.rs @@ -4,6 +4,7 @@ //! direct peer-to-peer communication primarily for sending/receiving chain information for //! syncing. +use futures::future::FutureExt; use handler::RPCHandler; use libp2p::core::{connection::ConnectionId, ConnectedPoint}; use libp2p::swarm::{ @@ -11,9 +12,11 @@ use libp2p::swarm::{ PollParameters, SubstreamProtocol, }; use libp2p::{Multiaddr, PeerId}; -use slog::{debug, o}; +use rate_limiter::{RPCRateLimiter as RateLimiter, RPCRateLimiterBuilder, RateLimitedErr}; +use slog::{crit, debug, o, warn}; use std::marker::PhantomData; use std::task::{Context, Poll}; +use std::time::Duration; use types::EthSpec; pub(crate) use handler::HandlerErr; @@ -31,6 +34,7 @@ pub(crate) mod codec; mod handler; pub mod methods; mod protocol; +mod rate_limiter; /// RPC events sent from Lighthouse. #[derive(Debug, Clone)] @@ -88,6 +92,8 @@ pub struct RPCMessage { /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level /// logic. pub struct RPC { + /// Rate limiter + limiter: RateLimiter, /// Queue of events to be processed. 
events: Vec, RPCMessage>>, /// Slog logger for RPC behaviour. @@ -97,7 +103,25 @@ pub struct RPC { impl RPC { pub fn new(log: slog::Logger) -> Self { let log = log.new(o!("service" => "libp2p_rpc")); + let limiter = RPCRateLimiterBuilder::new() + .n_every(Protocol::MetaData, 2, Duration::from_secs(5)) + .one_every(Protocol::Ping, Duration::from_secs(5)) + .n_every(Protocol::Status, 3, Duration::from_secs(15)) + .one_every(Protocol::Goodbye, Duration::from_secs(10)) + .n_every( + Protocol::BlocksByRange, + methods::MAX_REQUEST_BLOCKS, + Duration::from_secs(10), + ) + .n_every( + Protocol::BlocksByRoot, + methods::MAX_REQUEST_BLOCKS, + Duration::from_secs(10), + ) + .build() + .unwrap(); RPC { + limiter, events: Vec::new(), log, } @@ -193,18 +217,51 @@ where conn_id: ConnectionId, event: ::OutEvent, ) { - // send the event to the user - self.events - .push(NetworkBehaviourAction::GenerateEvent(RPCMessage { - peer_id, - conn_id, - event, - })); + if let Ok(RPCReceived::Request(ref id, ref req)) = event { + // check if the request is conformant to the quota + match self.limiter.allows(&peer_id, req) { + Ok(()) => { + // send the event to the user + self.events + .push(NetworkBehaviourAction::GenerateEvent(RPCMessage { + peer_id, + conn_id, + event, + })) + } + Err(RateLimitedErr::TooLarge) => { + // we set the batch sizes, so this is a coding/config err + crit!(self.log, "Batch too large to ever be processed"; + "protocol" => format!("{}", req.protocol())); + } + Err(RateLimitedErr::TooSoon(wait_time)) => { + warn!(self.log, "Request exceeds the rate limit"; + "request" => req.to_string(), "peer_id" => peer_id.to_string(), "wait_time_ms" => wait_time.as_millis()); + // send an error code to the peer. 
+ // the handler upon receiving the error code will send it back to the behaviour + self.send_response( + peer_id, + (conn_id, *id), + RPCCodedResponse::Error( + RPCResponseErrorCode::RateLimited, + format!("Rate limited: wait {:?}", wait_time).into(), + ), + ); + } + } + } else { + self.events + .push(NetworkBehaviourAction::GenerateEvent(RPCMessage { + peer_id, + conn_id, + event, + })); + } } fn poll( &mut self, - _cx: &mut Context, + cx: &mut Context, _: &mut impl PollParameters, ) -> Poll< NetworkBehaviourAction< @@ -212,6 +269,8 @@ where Self::OutEvent, >, > { + // let the rate limiter prune + let _ = self.limiter.poll_unpin(cx); if !self.events.is_empty() { return Poll::Ready(self.events.remove(0)); } diff --git a/beacon_node/eth2_libp2p/src/rpc/protocol.rs b/beacon_node/eth2_libp2p/src/rpc/protocol.rs index 6f95f6679..91bbdbd14 100644 --- a/beacon_node/eth2_libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2_libp2p/src/rpc/protocol.rs @@ -421,6 +421,8 @@ pub enum RPCError { NegotiationTimeout, /// Handler rejected this request. HandlerRejected, + /// The request exceeds the rate limit. 
+ RateLimited, } impl From for RPCError { @@ -459,6 +461,7 @@ impl std::fmt::Display for RPCError { RPCError::InternalError(ref err) => write!(f, "Internal error: {}", err), RPCError::NegotiationTimeout => write!(f, "Negotiation timeout"), RPCError::HandlerRejected => write!(f, "Handler rejected the request"), + RPCError::RateLimited => write!(f, "Request exceeds the rate limit"), } } } @@ -477,6 +480,7 @@ impl std::error::Error for RPCError { RPCError::ErrorResponse(_, _) => None, RPCError::NegotiationTimeout => None, RPCError::HandlerRejected => None, + RPCError::RateLimited => None, } } } diff --git a/beacon_node/eth2_libp2p/src/rpc/rate_limiter.rs b/beacon_node/eth2_libp2p/src/rpc/rate_limiter.rs new file mode 100644 index 000000000..bb41246b1 --- /dev/null +++ b/beacon_node/eth2_libp2p/src/rpc/rate_limiter.rs @@ -0,0 +1,377 @@ +use crate::rpc::{Protocol, RPCRequest}; +use fnv::FnvHashMap; +use futures::StreamExt; +use libp2p::PeerId; +use std::convert::TryInto; +use std::future::Future; +use std::hash::Hash; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; +use tokio::time::Interval; +use types::EthSpec; + +/// Nanoseconds since a given time. +// Maintained as u64 to reduce footprint +// NOTE: this also implies that the rate limiter will manage checking if a batch is allowed for at +// most + u64::MAX nanosecs, ~500 years. So it is realistic to assume this is fine. +type Nanosecs = u64; + +/// User-friendly rate limiting parameters of the GCRA. +/// +/// A quota of `max_tokens` tokens every `replenish_all_every` units of time means that: +/// 1. One token is replenished every `replenish_all_every`/`max_tokens` units of time. +/// 2. Instantaneous bursts (batches) of up to `max_tokens` tokens are allowed. +/// +/// The above implies that if `max_tokens` is greater than 1, the perceived rate may be higher (but +/// bounded) than the defined rate when instantaneous bursts occur. 
For instance, for a rate of +/// 4T/2s a first burst of 4T is allowed with subsequent requests of 1T every 0.5s forever, +/// producing a perceived rate over the window of the first 2s of 8T. However, subsequent sliding +/// windows of 2s keep the limit. +/// +/// In this scenario using the same rate as above, the sender is always maxing out their tokens, +/// except at seconds 1.5, 3, 3.5 and 4 +/// +/// ```ignore +/// x +/// used x +/// tokens x x x +/// at a x x x x x x +/// given +--+--+--o--+--+--o--o--o--> seconds +/// time | | | | | | | | | +/// 0 1 2 3 4 +/// +/// 4 1 1 1 2 1 1 2 3 <= available tokens when the batch is received +/// ``` +/// +/// For a sender to request a batch of `n`T, they would need to wait at least +/// n*`replenish_all_every`/`max_tokens` units of time since their last request. +/// +/// To produce hard limits, set `max_tokens` to 1. +pub struct Quota { + /// How often are `max_tokens` fully replenished. + replenish_all_every: Duration, + /// Token limit. This translates on how large can an instantaneous batch of + /// tokens be. + max_tokens: u64, +} + +/// Manages rate limiting of requests per peer, with differentiated rates per protocol. +pub struct RPCRateLimiter { + /// Interval to prune peers for which their timer ran out. + prune_interval: Interval, + /// Creation time of the rate limiter. + init_time: Instant, + /// Goodbye rate limiter. + goodbye_rl: Limiter, + /// Ping rate limiter. + ping_rl: Limiter, + /// MetaData rate limiter. + metadata_rl: Limiter, + /// Status rate limiter. + status_rl: Limiter, + /// BlocksByRange rate limiter. + bbrange_rl: Limiter, + /// BlocksByRoot rate limiter. + bbroots_rl: Limiter, +} + +/// Error type for non conformant requests +pub enum RateLimitedErr { + /// Required tokens for this request exceed the maximum + TooLarge, + /// Request does not fit in the quota. Gives the earliest time the request could be accepted. 
+ TooSoon(Duration), +} + +/// User-friendly builder of a `RPCRateLimiter` +#[derive(Default)] +pub struct RPCRateLimiterBuilder { + /// Quota for the Goodbye protocol. + goodbye_quota: Option, + /// Quota for the Ping protocol. + ping_quota: Option, + /// Quota for the MetaData protocol. + metadata_quota: Option, + /// Quota for the Status protocol. + status_quota: Option, + /// Quota for the BlocksByRange protocol. + bbrange_quota: Option, + /// Quota for the BlocksByRoot protocol. + bbroots_quota: Option, +} + +impl RPCRateLimiterBuilder { + /// Get an empty `RPCRateLimiterBuilder`. + pub fn new() -> Self { + Default::default() + } + + /// Set a quota for a protocol. + fn set_quota(mut self, protocol: Protocol, quota: Quota) -> Self { + let q = Some(quota); + match protocol { + Protocol::Ping => self.ping_quota = q, + Protocol::Status => self.status_quota = q, + Protocol::MetaData => self.metadata_quota = q, + Protocol::Goodbye => self.goodbye_quota = q, + Protocol::BlocksByRange => self.bbrange_quota = q, + Protocol::BlocksByRoot => self.bbroots_quota = q, + } + self + } + + /// Allow one token every `time_period` to be used for this `protocol`. + /// This produces a hard limit. + pub fn one_every(self, protocol: Protocol, time_period: Duration) -> Self { + self.set_quota( + protocol, + Quota { + replenish_all_every: time_period, + max_tokens: 1, + }, + ) + } + + /// Allow `n` tokens to be used every `time_period` for this `protocol`. 
+ pub fn n_every(self, protocol: Protocol, n: u64, time_period: Duration) -> Self { + self.set_quota( + protocol, + Quota { + max_tokens: n, + replenish_all_every: time_period, + }, + ) + } + + pub fn build(self) -> Result { + // get our quotas + let ping_quota = self.ping_quota.ok_or("Ping quota not specified")?; + let metadata_quota = self.metadata_quota.ok_or("MetaData quota not specified")?; + let status_quota = self.status_quota.ok_or("Status quota not specified")?; + let goodbye_quota = self.goodbye_quota.ok_or("Goodbye quota not specified")?; + let bbroots_quota = self + .bbroots_quota + .ok_or("BlocksByRoot quota not specified")?; + let bbrange_quota = self + .bbrange_quota + .ok_or("BlocksByRange quota not specified")?; + + // create the rate limiters + let ping_rl = Limiter::from_quota(ping_quota)?; + let metadata_rl = Limiter::from_quota(metadata_quota)?; + let status_rl = Limiter::from_quota(status_quota)?; + let goodbye_rl = Limiter::from_quota(goodbye_quota)?; + let bbroots_rl = Limiter::from_quota(bbroots_quota)?; + let bbrange_rl = Limiter::from_quota(bbrange_quota)?; + + // check for peers to prune every 30 seconds, starting in 30 seconds + let prune_every = tokio::time::Duration::from_secs(30); + let prune_start = tokio::time::Instant::now() + prune_every; + let prune_interval = tokio::time::interval_at(prune_start, prune_every); + Ok(RPCRateLimiter { + prune_interval, + ping_rl, + metadata_rl, + status_rl, + goodbye_rl, + bbroots_rl, + bbrange_rl, + init_time: Instant::now(), + }) + } +} + +impl RPCRateLimiter { + pub fn allows( + &mut self, + peer_id: &PeerId, + request: &RPCRequest, + ) -> Result<(), RateLimitedErr> { + let time_since_start = self.init_time.elapsed(); + let tokens = request.expected_responses().max(1); + let check = + |limiter: &mut Limiter| limiter.allows(time_since_start, peer_id, tokens); + let limiter = match request.protocol() { + Protocol::Ping => &mut self.ping_rl, + Protocol::Status => &mut self.status_rl, + 
Protocol::MetaData => &mut self.metadata_rl, + Protocol::Goodbye => &mut self.goodbye_rl, + Protocol::BlocksByRange => &mut self.bbrange_rl, + Protocol::BlocksByRoot => &mut self.bbroots_rl, + }; + check(limiter) + } + + pub fn prune(&mut self) { + let time_since_start = self.init_time.elapsed(); + self.ping_rl.prune(time_since_start); + self.status_rl.prune(time_since_start); + self.metadata_rl.prune(time_since_start); + self.goodbye_rl.prune(time_since_start); + self.bbrange_rl.prune(time_since_start); + self.bbroots_rl.prune(time_since_start); + } +} + +impl Future for RPCRateLimiter { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + while let Poll::Ready(Some(_)) = self.prune_interval.poll_next_unpin(cx) { + self.prune(); + } + + Poll::Pending + } +} + +/// Per key rate limiter using the token bucket / leaky bucket as a meter rate limiting algorithm, +/// with the GCRA implementation. +pub struct Limiter { + /// After how long is the bucket considered full via replenishing 1T every `t`. + tau: Nanosecs, + /// How often is 1T replenished. + t: Nanosecs, + /// Time when the bucket will be full for each peer. TAT (theoretical arrival time) from GCRA. 
+ tat_per_key: FnvHashMap, +} + +impl Limiter { + pub fn from_quota(quota: Quota) -> Result { + if quota.max_tokens == 0 { + return Err("Max number of tokens should be positive"); + } + let tau = quota.replenish_all_every.as_nanos(); + if tau == 0 { + return Err("Replenish time must be positive"); + } + let t = (tau / quota.max_tokens as u128) + .try_into() + .map_err(|_| "total replenish time is too long")?; + let tau = tau + .try_into() + .map_err(|_| "total replenish time is too long")?; + Ok(Limiter { + tau, + t, + tat_per_key: FnvHashMap::default(), + }) + } + + pub fn allows( + &mut self, + time_since_start: Duration, + key: &Key, + tokens: u64, + ) -> Result<(), RateLimitedErr> { + let time_since_start = time_since_start.as_nanos() as u64; + let tau = self.tau; + let t = self.t; + // how long does it take to replenish these tokens + let additional_time = t * tokens; + if additional_time > tau { + // the time required to process this amount of tokens is longer than the time that + // makes the bucket full. So, this batch can _never_ be processed + return Err(RateLimitedErr::TooLarge); + } + // If the key is new, we consider their bucket full (which means, their request will be + // allowed) + let tat = self + .tat_per_key + .entry(key.clone()) + .or_insert(time_since_start); + // check how soon could the request be made + let earliest_time = (*tat + additional_time).saturating_sub(tau); + // earliest_time is in the future + if time_since_start < earliest_time { + Err(RateLimitedErr::TooSoon(Duration::from_nanos( + /* time they need to wait, i.e. 
how soon were they */ + earliest_time - time_since_start, + ))) + } else { + // calculate the new TAT + *tat = time_since_start.max(*tat) + additional_time; + Ok(()) + } + } + + /// Removes keys for which their bucket is full by `time_limit` + pub fn prune(&mut self, time_limit: Duration) { + let lim = &mut (time_limit.as_nanos() as u64); + // remove those for which tat < lim + self.tat_per_key.retain(|_k, tat| tat >= lim) + } +} + +#[cfg(test)] +mod tests { + use crate::rpc::rate_limiter::{Limiter, Quota}; + use std::time::Duration; + + #[test] + fn it_works_a() { + let mut limiter = Limiter::from_quota(Quota { + replenish_all_every: Duration::from_secs(2), + max_tokens: 4, + }) + .unwrap(); + let key = 10; + // x + // used x + // tokens x x + // x x x x + // +--+--+--+--+----> seconds + // | | | | | + // 0 1 2 + + assert!(limiter + .allows(Duration::from_secs_f32(0.0), &key, 4) + .is_ok()); + limiter.prune(Duration::from_secs_f32(0.1)); + assert!(limiter + .allows(Duration::from_secs_f32(0.1), &key, 1) + .is_err()); + assert!(limiter + .allows(Duration::from_secs_f32(0.5), &key, 1) + .is_ok()); + assert!(limiter + .allows(Duration::from_secs_f32(1.0), &key, 1) + .is_ok()); + assert!(limiter + .allows(Duration::from_secs_f32(1.4), &key, 1) + .is_err()); + assert!(limiter + .allows(Duration::from_secs_f32(2.0), &key, 2) + .is_ok()); + } + + #[test] + fn it_works_b() { + let mut limiter = Limiter::from_quota(Quota { + replenish_all_every: Duration::from_secs(2), + max_tokens: 4, + }) + .unwrap(); + let key = 10; + // if we limit to 4T per 2s, check that 4 requests worth 1 token can be sent before the + // first half second, when one token will be available again. 
Check also that before + // regaining a token, another request is rejected + + assert!(limiter + .allows(Duration::from_secs_f32(0.0), &key, 1) + .is_ok()); + assert!(limiter + .allows(Duration::from_secs_f32(0.1), &key, 1) + .is_ok()); + assert!(limiter + .allows(Duration::from_secs_f32(0.2), &key, 1) + .is_ok()); + assert!(limiter + .allows(Duration::from_secs_f32(0.3), &key, 1) + .is_ok()); + assert!(limiter + .allows(Duration::from_secs_f32(0.4), &key, 1) + .is_err()); + } +}