self rate limiting
This commit is contained in:
parent
a42d07592c
commit
493784366f
3
Cargo.lock
generated
3
Cargo.lock
generated
@ -2620,7 +2620,8 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "fixed-hash"
|
||||
version = "0.7.0"
|
||||
source = "git+https://github.com/paritytech/parity-common?rev=df638ab0885293d21d656dc300d39236b69ce57d#df638ab0885293d21d656dc300d39236b69ce57d"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cfcf0ed7fe52a17a03854ec54a9f76d6d84508d1c0e66bc1793301c73fc8493c"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"rand 0.8.5",
|
||||
|
@ -94,7 +94,6 @@ resolver = "2"
|
||||
|
||||
[patch]
|
||||
[patch.crates-io]
|
||||
fixed-hash = { git = "https://github.com/paritytech/parity-common", rev="df638ab0885293d21d656dc300d39236b69ce57d" }
|
||||
warp = { git = "https://github.com/macladson/warp", rev="7e75acc368229a46a236a8c991bf251fe7fe50ef" }
|
||||
eth2_ssz = { path = "consensus/ssz" }
|
||||
eth2_ssz_derive = { path = "consensus/ssz_derive" }
|
||||
|
@ -1,3 +1,4 @@
|
||||
use crate::rpc::config::OutboundRateLimiterConfig;
|
||||
use crate::types::GossipKind;
|
||||
use crate::{Enr, PeerIdSerialized};
|
||||
use directory::{
|
||||
@ -133,6 +134,9 @@ pub struct Config {
|
||||
|
||||
/// Whether light client protocols should be enabled.
|
||||
pub enable_light_client_server: bool,
|
||||
|
||||
/// Configuration for the outbound rate limiter (requests made by this node).
|
||||
pub outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
@ -211,6 +215,7 @@ impl Default for Config {
|
||||
topics: Vec::new(),
|
||||
metrics_enabled: false,
|
||||
enable_light_client_server: false,
|
||||
outbound_rate_limiter_config: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
189
beacon_node/lighthouse_network/src/rpc/config.rs
Normal file
189
beacon_node/lighthouse_network/src/rpc/config.rs
Normal file
@ -0,0 +1,189 @@
|
||||
use std::{
|
||||
fmt::{Debug, Display},
|
||||
str::FromStr,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use super::{methods, rate_limiter::Quota, Protocol};
|
||||
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
|
||||
/// Auxiliary struct to aid on configuration parsing.
|
||||
///
|
||||
/// A protocol's quota is specified as `protocol_name:tokens/time_in_seconds`.
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
struct ProtocolQuota {
|
||||
protocol: Protocol,
|
||||
quota: Quota,
|
||||
}
|
||||
|
||||
impl Display for ProtocolQuota {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{}:{}/{}",
|
||||
self.protocol.as_ref(),
|
||||
self.quota.max_tokens,
|
||||
self.quota.replenish_all_every.as_secs()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for ProtocolQuota {
|
||||
type Err = &'static str;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let (protocol_str, quota_str) = s
|
||||
.split_once(':')
|
||||
.ok_or("Missing ':' from quota definition.")?;
|
||||
let protocol = protocol_str
|
||||
.parse()
|
||||
.map_err(|_parse_err| "Wrong protocol representation in quota")?;
|
||||
let (tokens_str, time_str) = quota_str
|
||||
.split_once('/')
|
||||
.ok_or("Quota should be defined as \"n/t\" (t in seconds). Missing '/' from quota.")?;
|
||||
let tokens = tokens_str
|
||||
.parse()
|
||||
.map_err(|_| "Failed to parse tokens from quota.")?;
|
||||
let seconds = time_str
|
||||
.parse::<u64>()
|
||||
.map_err(|_| "Failed to parse time in seconds from quota.")?;
|
||||
Ok(ProtocolQuota {
|
||||
protocol,
|
||||
quota: Quota {
|
||||
replenish_all_every: Duration::from_secs(seconds),
|
||||
max_tokens: tokens,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Configurations for the rate limiter applied to outbound requests (made by the node itself).
|
||||
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct OutboundRateLimiterConfig {
|
||||
pub(super) ping_quota: Quota,
|
||||
pub(super) meta_data_quota: Quota,
|
||||
pub(super) status_quota: Quota,
|
||||
pub(super) goodbye_quota: Quota,
|
||||
pub(super) blocks_by_range_quota: Quota,
|
||||
pub(super) blocks_by_root_quota: Quota,
|
||||
pub(super) blobs_by_range_quota: Quota,
|
||||
pub(super) blobs_by_root_quota: Quota,
|
||||
}
|
||||
|
||||
impl OutboundRateLimiterConfig {
|
||||
pub const DEFAULT_PING_QUOTA: Quota = Quota::n_every(2, 10);
|
||||
pub const DEFAULT_META_DATA_QUOTA: Quota = Quota::n_every(2, 5);
|
||||
pub const DEFAULT_STATUS_QUOTA: Quota = Quota::n_every(5, 15);
|
||||
pub const DEFAULT_GOODBYE_QUOTA: Quota = Quota::one_every(10);
|
||||
pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota =
|
||||
Quota::n_every(methods::MAX_REQUEST_BLOCKS, 10);
|
||||
pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
|
||||
pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota =
|
||||
Quota::n_every(methods::MAX_REQUEST_BLOBS_SIDECARS, 10);
|
||||
pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
|
||||
}
|
||||
|
||||
impl Default for OutboundRateLimiterConfig {
|
||||
fn default() -> Self {
|
||||
OutboundRateLimiterConfig {
|
||||
ping_quota: Self::DEFAULT_PING_QUOTA,
|
||||
meta_data_quota: Self::DEFAULT_META_DATA_QUOTA,
|
||||
status_quota: Self::DEFAULT_STATUS_QUOTA,
|
||||
goodbye_quota: Self::DEFAULT_GOODBYE_QUOTA,
|
||||
blocks_by_range_quota: Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA,
|
||||
blocks_by_root_quota: Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA,
|
||||
blobs_by_range_quota: Self::DEFAULT_BLOBS_BY_RANGE_QUOTA,
|
||||
blobs_by_root_quota: Self::DEFAULT_BLOBS_BY_ROOT_QUOTA,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for OutboundRateLimiterConfig {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
macro_rules! fmt_q {
|
||||
($quota:expr) => {
|
||||
&format_args!(
|
||||
"{}/{}s",
|
||||
$quota.max_tokens,
|
||||
$quota.replenish_all_every.as_secs()
|
||||
)
|
||||
};
|
||||
}
|
||||
|
||||
f.debug_struct("OutboundRateLimiterConfig")
|
||||
.field("ping", fmt_q!(&self.ping_quota))
|
||||
.field("metadata", fmt_q!(&self.meta_data_quota))
|
||||
.field("status", fmt_q!(&self.status_quota))
|
||||
.field("goodbye", fmt_q!(&self.goodbye_quota))
|
||||
.field("blocks_by_range", fmt_q!(&self.blocks_by_range_quota))
|
||||
.field("blocks_by_root", fmt_q!(&self.blocks_by_root_quota))
|
||||
.field("blobs_by_range", fmt_q!(&self.blobs_by_range_quota))
|
||||
.field("blobs_by_root", fmt_q!(&self.blobs_by_root_quota))
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse configurations for the outbound rate limiter. Protocols that are not specified use
|
||||
/// the default values. Protocol specified more than once use only the first given Quota.
|
||||
///
|
||||
/// The expected format is a ';' separated list of [`ProtocolQuota`].
|
||||
impl FromStr for OutboundRateLimiterConfig {
|
||||
type Err = &'static str;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let mut ping_quota = None;
|
||||
let mut meta_data_quota = None;
|
||||
let mut status_quota = None;
|
||||
let mut goodbye_quota = None;
|
||||
let mut blocks_by_range_quota = None;
|
||||
let mut blocks_by_root_quota = None;
|
||||
let mut blobs_by_range_quota = None;
|
||||
let mut blobs_by_root_quota = None;
|
||||
for proto_def in s.split(';') {
|
||||
let ProtocolQuota { protocol, quota } = proto_def.parse()?;
|
||||
let quota = Some(quota);
|
||||
match protocol {
|
||||
Protocol::Status => status_quota = status_quota.or(quota),
|
||||
Protocol::Goodbye => goodbye_quota = goodbye_quota.or(quota),
|
||||
Protocol::BlocksByRange => blocks_by_range_quota = blocks_by_range_quota.or(quota),
|
||||
Protocol::BlocksByRoot => blocks_by_root_quota = blocks_by_root_quota.or(quota),
|
||||
Protocol::BlobsByRange => blobs_by_range_quota = blobs_by_range_quota.or(quota),
|
||||
Protocol::BlobsByRoot => blobs_by_root_quota = blobs_by_root_quota.or(quota),
|
||||
Protocol::Ping => ping_quota = ping_quota.or(quota),
|
||||
Protocol::MetaData => meta_data_quota = meta_data_quota.or(quota),
|
||||
Protocol::LightClientBootstrap => return Err("Lighthouse does not send LightClientBootstrap requests. Quota should not be set."),
|
||||
}
|
||||
}
|
||||
Ok(OutboundRateLimiterConfig {
|
||||
ping_quota: ping_quota.unwrap_or(Self::DEFAULT_PING_QUOTA),
|
||||
meta_data_quota: meta_data_quota.unwrap_or(Self::DEFAULT_META_DATA_QUOTA),
|
||||
status_quota: status_quota.unwrap_or(Self::DEFAULT_STATUS_QUOTA),
|
||||
goodbye_quota: goodbye_quota.unwrap_or(Self::DEFAULT_GOODBYE_QUOTA),
|
||||
blocks_by_range_quota: blocks_by_range_quota
|
||||
.unwrap_or(Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA),
|
||||
blocks_by_root_quota: blocks_by_root_quota
|
||||
.unwrap_or(Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA),
|
||||
blobs_by_range_quota: blobs_by_range_quota
|
||||
.unwrap_or(Self::DEFAULT_BLOBS_BY_RANGE_QUOTA),
|
||||
blobs_by_root_quota: blobs_by_root_quota.unwrap_or(Self::DEFAULT_BLOBS_BY_ROOT_QUOTA),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_quota_inverse() {
|
||||
let quota = ProtocolQuota {
|
||||
protocol: Protocol::Goodbye,
|
||||
quota: Quota {
|
||||
replenish_all_every: Duration::from_secs(10),
|
||||
max_tokens: 8,
|
||||
},
|
||||
};
|
||||
assert_eq!(quota.to_string().parse(), Ok(quota))
|
||||
}
|
||||
}
|
@ -12,7 +12,7 @@ use libp2p::swarm::{
|
||||
PollParameters, SubstreamProtocol,
|
||||
};
|
||||
use libp2p::PeerId;
|
||||
use rate_limiter::{RPCRateLimiter as RateLimiter, RPCRateLimiterBuilder, RateLimitedErr};
|
||||
use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr};
|
||||
use slog::{crit, debug, o};
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
@ -33,12 +33,17 @@ pub use methods::{
|
||||
pub(crate) use outbound::OutboundRequest;
|
||||
pub use protocol::{max_rpc_size, Protocol, RPCError};
|
||||
|
||||
use self::config::OutboundRateLimiterConfig;
|
||||
use self::self_limiter::SelfRateLimiter;
|
||||
|
||||
pub(crate) mod codec;
|
||||
pub mod config;
|
||||
mod handler;
|
||||
pub mod methods;
|
||||
mod outbound;
|
||||
mod protocol;
|
||||
mod rate_limiter;
|
||||
mod self_limiter;
|
||||
|
||||
/// Composite trait for a request id.
|
||||
pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {}
|
||||
@ -101,13 +106,18 @@ pub struct RPCMessage<Id, TSpec: EthSpec> {
|
||||
pub event: HandlerEvent<Id, TSpec>,
|
||||
}
|
||||
|
||||
type BehaviourAction<Id, TSpec> =
|
||||
NetworkBehaviourAction<RPCMessage<Id, TSpec>, RPCHandler<Id, TSpec>>;
|
||||
|
||||
/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
|
||||
/// logic.
|
||||
pub struct RPC<Id: ReqId, TSpec: EthSpec> {
|
||||
/// Rate limiter
|
||||
limiter: RateLimiter,
|
||||
/// Rate limiter for our own requests.
|
||||
self_limiter: Option<SelfRateLimiter<Id, TSpec>>,
|
||||
/// Queue of events to be processed.
|
||||
events: Vec<NetworkBehaviourAction<RPCMessage<Id, TSpec>, RPCHandler<Id, TSpec>>>,
|
||||
events: Vec<BehaviourAction<Id, TSpec>>,
|
||||
fork_context: Arc<ForkContext>,
|
||||
enable_light_client_server: bool,
|
||||
/// Slog logger for RPC behaviour.
|
||||
@ -118,10 +128,12 @@ impl<Id: ReqId, TSpec: EthSpec> RPC<Id, TSpec> {
|
||||
pub fn new(
|
||||
fork_context: Arc<ForkContext>,
|
||||
enable_light_client_server: bool,
|
||||
outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>,
|
||||
log: slog::Logger,
|
||||
) -> Self {
|
||||
let log = log.new(o!("service" => "libp2p_rpc"));
|
||||
let limiter = RPCRateLimiterBuilder::new()
|
||||
|
||||
let limiter = RateLimiter::builder()
|
||||
.n_every(Protocol::MetaData, 2, Duration::from_secs(5))
|
||||
.n_every(Protocol::Ping, 2, Duration::from_secs(10))
|
||||
.n_every(Protocol::Status, 5, Duration::from_secs(15))
|
||||
@ -141,8 +153,14 @@ impl<Id: ReqId, TSpec: EthSpec> RPC<Id, TSpec> {
|
||||
)
|
||||
.build()
|
||||
.expect("Configuration parameters are valid");
|
||||
|
||||
let self_limiter = outbound_rate_limiter_config.map(|config| {
|
||||
SelfRateLimiter::new(config, log.clone()).expect("Configuration parameters are valid")
|
||||
});
|
||||
|
||||
RPC {
|
||||
limiter,
|
||||
self_limiter,
|
||||
events: Vec::new(),
|
||||
fork_context,
|
||||
enable_light_client_server,
|
||||
@ -169,12 +187,24 @@ impl<Id: ReqId, TSpec: EthSpec> RPC<Id, TSpec> {
|
||||
/// Submits an RPC request.
|
||||
///
|
||||
/// The peer must be connected for this to succeed.
|
||||
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, event: OutboundRequest<TSpec>) {
|
||||
self.events.push(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
handler: NotifyHandler::Any,
|
||||
event: RPCSend::Request(request_id, event),
|
||||
});
|
||||
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: OutboundRequest<TSpec>) {
|
||||
let event = if let Some(self_limiter) = self.self_limiter.as_mut() {
|
||||
match self_limiter.allows(peer_id, request_id, req) {
|
||||
Ok(event) => event,
|
||||
Err(_e) => {
|
||||
// Request is logged and queued internally in the self rate limiter.
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
handler: NotifyHandler::Any,
|
||||
event: RPCSend::Request(request_id, req),
|
||||
}
|
||||
};
|
||||
|
||||
self.events.push(event);
|
||||
}
|
||||
|
||||
/// Lighthouse wishes to disconnect from this peer by sending a Goodbye message. This
|
||||
@ -279,11 +309,19 @@ where
|
||||
cx: &mut Context,
|
||||
_: &mut impl PollParameters,
|
||||
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
|
||||
// let the rate limiter prune
|
||||
// let the rate limiter prune.
|
||||
let _ = self.limiter.poll_unpin(cx);
|
||||
|
||||
if let Some(self_limiter) = self.self_limiter.as_mut() {
|
||||
if let Poll::Ready(event) = self_limiter.poll_ready(cx) {
|
||||
self.events.push(event)
|
||||
}
|
||||
}
|
||||
|
||||
if !self.events.is_empty() {
|
||||
return Poll::Ready(self.events.remove(0));
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ use std::io;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use strum::IntoStaticStr;
|
||||
use strum::{AsRefStr, Display, EnumString, IntoStaticStr};
|
||||
use tokio_io_timeout::TimeoutStream;
|
||||
use tokio_util::{
|
||||
codec::Framed,
|
||||
@ -169,25 +169,32 @@ pub fn rpc_block_limits_by_fork(current_fork: ForkName) -> RpcLimits {
|
||||
}
|
||||
|
||||
/// Protocol names to be used.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumString, AsRefStr, Display)]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum Protocol {
|
||||
/// The Status protocol name.
|
||||
Status,
|
||||
/// The Goodbye protocol name.
|
||||
Goodbye,
|
||||
/// The `BlocksByRange` protocol name.
|
||||
#[strum(serialize = "beacon_blocks_by_range")]
|
||||
BlocksByRange,
|
||||
/// The `BlocksByRoot` protocol name.
|
||||
#[strum(serialize = "beacon_blocks_by_root")]
|
||||
BlocksByRoot,
|
||||
/// The `BlobsByRange` protocol name.
|
||||
#[strum(serialize = "blobs_sidecars_by_range")]
|
||||
BlobsByRange,
|
||||
/// The `BlobsByRoot` protocol name.
|
||||
#[strum(serialize = "beacon_block_and_blobs_sidecar_by_root")]
|
||||
BlobsByRoot,
|
||||
/// The `Ping` protocol name.
|
||||
Ping,
|
||||
/// The `MetaData` protocol name.
|
||||
#[strum(serialize = "metadata")]
|
||||
MetaData,
|
||||
/// The `LightClientBootstrap` protocol name.
|
||||
#[strum(serialize = "light_client_bootstrap")]
|
||||
LightClientBootstrap,
|
||||
}
|
||||
|
||||
@ -222,23 +229,6 @@ impl Protocol {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Protocol {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let repr = match self {
|
||||
Protocol::Status => "status",
|
||||
Protocol::Goodbye => "goodbye",
|
||||
Protocol::BlocksByRange => "beacon_blocks_by_range",
|
||||
Protocol::BlocksByRoot => "beacon_blocks_by_root",
|
||||
Protocol::BlobsByRange => "blobs_sidecars_by_range",
|
||||
Protocol::BlobsByRoot => "beacon_block_and_blobs_sidecar_by_root",
|
||||
Protocol::Ping => "ping",
|
||||
Protocol::MetaData => "metadata",
|
||||
Protocol::LightClientBootstrap => "light_client_bootstrap",
|
||||
};
|
||||
f.write_str(repr)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Encoding {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let repr = match self {
|
||||
|
@ -1,6 +1,7 @@
|
||||
use crate::rpc::{InboundRequest, Protocol};
|
||||
use crate::rpc::Protocol;
|
||||
use fnv::FnvHashMap;
|
||||
use libp2p::PeerId;
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
use std::convert::TryInto;
|
||||
use std::future::Future;
|
||||
use std::hash::Hash;
|
||||
@ -47,12 +48,31 @@ type Nanosecs = u64;
|
||||
/// n*`replenish_all_every`/`max_tokens` units of time since their last request.
|
||||
///
|
||||
/// To produce hard limits, set `max_tokens` to 1.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct Quota {
|
||||
/// How often are `max_tokens` fully replenished.
|
||||
replenish_all_every: Duration,
|
||||
pub(super) replenish_all_every: Duration,
|
||||
/// Token limit. This translates on how large can an instantaneous batch of
|
||||
/// tokens be.
|
||||
max_tokens: u64,
|
||||
pub(super) max_tokens: u64,
|
||||
}
|
||||
|
||||
impl Quota {
|
||||
/// A hard limit of one token every `seconds`.
|
||||
pub const fn one_every(seconds: u64) -> Self {
|
||||
Quota {
|
||||
replenish_all_every: Duration::from_secs(seconds),
|
||||
max_tokens: 1,
|
||||
}
|
||||
}
|
||||
|
||||
/// Allow `n` tokens to be use used every `seconds`.
|
||||
pub const fn n_every(n: u64, seconds: u64) -> Self {
|
||||
Quota {
|
||||
replenish_all_every: Duration::from_secs(seconds),
|
||||
max_tokens: n,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Manages rate limiting of requests per peer, with differentiated rates per protocol.
|
||||
@ -82,6 +102,7 @@ pub struct RPCRateLimiter {
|
||||
}
|
||||
|
||||
/// Error type for non conformant requests
|
||||
#[derive(Debug)]
|
||||
pub enum RateLimitedErr {
|
||||
/// Required tokens for this request exceed the maximum
|
||||
TooLarge,
|
||||
@ -90,7 +111,7 @@ pub enum RateLimitedErr {
|
||||
}
|
||||
|
||||
/// User-friendly builder of a `RPCRateLimiter`
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Clone)]
|
||||
pub struct RPCRateLimiterBuilder {
|
||||
/// Quota for the Goodbye protocol.
|
||||
goodbye_quota: Option<Quota>,
|
||||
@ -113,13 +134,8 @@ pub struct RPCRateLimiterBuilder {
|
||||
}
|
||||
|
||||
impl RPCRateLimiterBuilder {
|
||||
/// Get an empty `RPCRateLimiterBuilder`.
|
||||
pub fn new() -> Self {
|
||||
Default::default()
|
||||
}
|
||||
|
||||
/// Set a quota for a protocol.
|
||||
fn set_quota(mut self, protocol: Protocol, quota: Quota) -> Self {
|
||||
pub fn set_quota(mut self, protocol: Protocol, quota: Quota) -> Self {
|
||||
let q = Some(quota);
|
||||
match protocol {
|
||||
Protocol::Ping => self.ping_quota = q,
|
||||
@ -213,11 +229,40 @@ impl RPCRateLimiterBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
pub trait RateLimiterItem {
|
||||
fn protocol(&self) -> Protocol;
|
||||
fn expected_responses(&self) -> u64;
|
||||
}
|
||||
|
||||
impl<T: EthSpec> RateLimiterItem for super::InboundRequest<T> {
|
||||
fn protocol(&self) -> Protocol {
|
||||
self.protocol()
|
||||
}
|
||||
|
||||
fn expected_responses(&self) -> u64 {
|
||||
self.expected_responses()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: EthSpec> RateLimiterItem for super::OutboundRequest<T> {
|
||||
fn protocol(&self) -> Protocol {
|
||||
self.protocol()
|
||||
}
|
||||
|
||||
fn expected_responses(&self) -> u64 {
|
||||
self.expected_responses()
|
||||
}
|
||||
}
|
||||
impl RPCRateLimiter {
|
||||
pub fn allows<T: EthSpec>(
|
||||
/// Get a builder instance.
|
||||
pub fn builder() -> RPCRateLimiterBuilder {
|
||||
RPCRateLimiterBuilder::default()
|
||||
}
|
||||
|
||||
pub fn allows<Item: RateLimiterItem>(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
request: &InboundRequest<T>,
|
||||
request: &Item,
|
||||
) -> Result<(), RateLimitedErr> {
|
||||
let time_since_start = self.init_time.elapsed();
|
||||
let tokens = request.expected_responses().max(1);
|
||||
|
206
beacon_node/lighthouse_network/src/rpc/self_limiter.rs
Normal file
206
beacon_node/lighthouse_network/src/rpc/self_limiter.rs
Normal file
@ -0,0 +1,206 @@
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap, VecDeque},
|
||||
task::{Context, Poll},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use futures::FutureExt;
|
||||
use libp2p::{swarm::NotifyHandler, PeerId};
|
||||
use slog::{crit, debug, Logger};
|
||||
use smallvec::SmallVec;
|
||||
use tokio_util::time::DelayQueue;
|
||||
use types::EthSpec;
|
||||
|
||||
use super::{
|
||||
config::OutboundRateLimiterConfig,
|
||||
rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr},
|
||||
BehaviourAction, OutboundRequest, Protocol, RPCSend, ReqId,
|
||||
};
|
||||
|
||||
/// A request that was rate limited or waiting on rate limited requests for the same peer and
|
||||
/// protocol.
|
||||
struct QueuedRequest<Id: ReqId, TSpec: EthSpec> {
|
||||
req: OutboundRequest<TSpec>,
|
||||
request_id: Id,
|
||||
}
|
||||
|
||||
pub(crate) struct SelfRateLimiter<Id: ReqId, TSpec: EthSpec> {
|
||||
/// Requests queued for sending per peer. This requests are stored when the self rate
|
||||
/// limiter rejects them. Rate limiting is based on a Peer and Protocol basis, therefore
|
||||
/// are stored in the same way.
|
||||
delayed_requests: HashMap<(PeerId, Protocol), VecDeque<QueuedRequest<Id, TSpec>>>,
|
||||
/// The delay required to allow a peer's outbound request per protocol.
|
||||
next_peer_request: DelayQueue<(PeerId, Protocol)>,
|
||||
/// Rate limiter for our own requests.
|
||||
limiter: RateLimiter,
|
||||
/// Requests that are ready to be sent.
|
||||
ready_requests: SmallVec<[BehaviourAction<Id, TSpec>; 3]>,
|
||||
/// Slog logger.
|
||||
log: Logger,
|
||||
}
|
||||
|
||||
/// Error returned when the rate limiter does not accept a request.
|
||||
// NOTE: this is currently not used, but might be useful for debugging.
|
||||
pub enum Error {
|
||||
/// There are queued requests for this same peer and protocol.
|
||||
PendingRequests,
|
||||
/// Request was tried but rate limited.
|
||||
RateLimited,
|
||||
}
|
||||
|
||||
impl<Id: ReqId, TSpec: EthSpec> SelfRateLimiter<Id, TSpec> {
|
||||
/// Creates a new [`SelfRateLimiter`] based on configration values.
|
||||
pub fn new(config: OutboundRateLimiterConfig, log: Logger) -> Result<Self, &'static str> {
|
||||
debug!(log, "Using self rate limiting params"; "config" => ?config);
|
||||
// Destructure to make sure every configuration value is used.
|
||||
let OutboundRateLimiterConfig {
|
||||
ping_quota,
|
||||
meta_data_quota,
|
||||
status_quota,
|
||||
goodbye_quota,
|
||||
blocks_by_range_quota,
|
||||
blocks_by_root_quota,
|
||||
blobs_by_range_quota,
|
||||
blobs_by_root_quota,
|
||||
} = config;
|
||||
|
||||
let limiter = RateLimiter::builder()
|
||||
.set_quota(Protocol::Ping, ping_quota)
|
||||
.set_quota(Protocol::MetaData, meta_data_quota)
|
||||
.set_quota(Protocol::Status, status_quota)
|
||||
.set_quota(Protocol::Goodbye, goodbye_quota)
|
||||
.set_quota(Protocol::BlocksByRange, blocks_by_range_quota)
|
||||
.set_quota(Protocol::BlocksByRoot, blocks_by_root_quota)
|
||||
.set_quota(Protocol::BlobsByRange, blobs_by_range_quota)
|
||||
.set_quota(Protocol::BlobsByRoot, blobs_by_root_quota)
|
||||
// Manually set the LightClientBootstrap quota, since we use the same rate limiter for
|
||||
// inbound and outbound requests, and the LightClientBootstrap is an only inbound
|
||||
// protocol.
|
||||
.one_every(Protocol::LightClientBootstrap, Duration::from_secs(10))
|
||||
.build()?;
|
||||
|
||||
Ok(SelfRateLimiter {
|
||||
delayed_requests: Default::default(),
|
||||
next_peer_request: Default::default(),
|
||||
limiter,
|
||||
ready_requests: Default::default(),
|
||||
log,
|
||||
})
|
||||
}
|
||||
|
||||
/// Checks if the rate limiter allows the request. If it's allowed, returns the
|
||||
/// [`NetworkBehaviourAction`] that should be emitted. When not allowed, the request is delayed
|
||||
/// until it can be sent.
|
||||
pub fn allows(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: Id,
|
||||
req: OutboundRequest<TSpec>,
|
||||
) -> Result<BehaviourAction<Id, TSpec>, Error> {
|
||||
let protocol = req.protocol();
|
||||
// First check that there are not already other requests waiting to be sent.
|
||||
if let Some(queued_requests) = self.delayed_requests.get_mut(&(peer_id, protocol)) {
|
||||
queued_requests.push_back(QueuedRequest { req, request_id });
|
||||
|
||||
return Err(Error::PendingRequests);
|
||||
}
|
||||
match Self::try_send_request(&mut self.limiter, peer_id, request_id, req, &self.log) {
|
||||
Err((rate_limited_req, wait_time)) => {
|
||||
let key = (peer_id, protocol);
|
||||
self.next_peer_request.insert(key, wait_time);
|
||||
self.delayed_requests
|
||||
.entry(key)
|
||||
.or_default()
|
||||
.push_back(rate_limited_req);
|
||||
|
||||
Err(Error::RateLimited)
|
||||
}
|
||||
Ok(event) => Ok(event),
|
||||
}
|
||||
}
|
||||
|
||||
/// Auxiliary function to deal with self rate limiting outcomes. If the rate limiter allows the
|
||||
/// request, the [`NetworkBehaviourAction`] that should be emitted is returned. If the request
|
||||
/// should be delayed, it's returned with the duration to wait.
|
||||
fn try_send_request(
|
||||
limiter: &mut RateLimiter,
|
||||
peer_id: PeerId,
|
||||
request_id: Id,
|
||||
req: OutboundRequest<TSpec>,
|
||||
log: &Logger,
|
||||
) -> Result<BehaviourAction<Id, TSpec>, (QueuedRequest<Id, TSpec>, Duration)> {
|
||||
match limiter.allows(&peer_id, &req) {
|
||||
Ok(()) => Ok(BehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
handler: NotifyHandler::Any,
|
||||
event: RPCSend::Request(request_id, req),
|
||||
}),
|
||||
Err(e) => {
|
||||
let protocol = req.protocol();
|
||||
match e {
|
||||
RateLimitedErr::TooLarge => {
|
||||
// this should never happen with default parameters. Let's just send the request.
|
||||
// Log a crit since this is a config issue.
|
||||
crit!(
|
||||
log,
|
||||
"Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters.";
|
||||
"protocol" => %req.protocol()
|
||||
);
|
||||
Ok(BehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
handler: NotifyHandler::Any,
|
||||
event: RPCSend::Request(request_id, req),
|
||||
})
|
||||
}
|
||||
RateLimitedErr::TooSoon(wait_time) => {
|
||||
debug!(log, "Self rate limiting"; "protocol" => %protocol, "wait_time_ms" => wait_time.as_millis(), "peer_id" => %peer_id);
|
||||
Err((QueuedRequest { req, request_id }, wait_time))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// When a peer and protocol are allowed to send a next request, this function checks the
|
||||
/// queued requests and attempts marking as ready as many as the limiter allows.
|
||||
fn next_peer_request_ready(&mut self, peer_id: PeerId, protocol: Protocol) {
|
||||
if let Entry::Occupied(mut entry) = self.delayed_requests.entry((peer_id, protocol)) {
|
||||
let queued_requests = entry.get_mut();
|
||||
while let Some(QueuedRequest { req, request_id }) = queued_requests.pop_front() {
|
||||
match Self::try_send_request(&mut self.limiter, peer_id, request_id, req, &self.log)
|
||||
{
|
||||
Err((rate_limited_req, wait_time)) => {
|
||||
let key = (peer_id, protocol);
|
||||
self.next_peer_request.insert(key, wait_time);
|
||||
queued_requests.push_back(rate_limited_req);
|
||||
// If one fails just wait for the next window that allows sending requests.
|
||||
return;
|
||||
}
|
||||
Ok(event) => self.ready_requests.push(event),
|
||||
}
|
||||
}
|
||||
if queued_requests.is_empty() {
|
||||
entry.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<BehaviourAction<Id, TSpec>> {
|
||||
// First check the requests that were self rate limited, since those might add events to
|
||||
// the queue. Also do this this before rate limiter prunning to avoid removing and
|
||||
// immediately adding rate limiting keys.
|
||||
if let Poll::Ready(Some(Ok(expired))) = self.next_peer_request.poll_expired(cx) {
|
||||
let (peer_id, protocol) = expired.into_inner();
|
||||
self.next_peer_request_ready(peer_id, protocol);
|
||||
}
|
||||
// Prune the rate limiter.
|
||||
let _ = self.limiter.poll_unpin(cx);
|
||||
|
||||
// Finally return any queued events.
|
||||
if !self.ready_requests.is_empty() {
|
||||
return Poll::Ready(self.ready_requests.remove(0));
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
@ -266,6 +266,7 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
|
||||
let eth2_rpc = RPC::new(
|
||||
ctx.fork_context.clone(),
|
||||
config.enable_light_client_server,
|
||||
config.outbound_rate_limiter_config.clone(),
|
||||
log.clone(),
|
||||
);
|
||||
|
||||
|
@ -194,6 +194,21 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
|
||||
.help("Lighthouse by default does not discover private IP addresses. Set this flag to enable connection attempts to local addresses.")
|
||||
.takes_value(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("self-limiter")
|
||||
.long("self-limiter")
|
||||
.help(
|
||||
"Enables the outbound rate limiter (requests made by this node).\
|
||||
\
|
||||
Rate limit quotas per protocol can be set in the form of \
|
||||
<protocol_name>:<tokens>/<time_in_seconds>. To set quotas for multiple protocols, \
|
||||
separate them by ';'. If the self rate limiter is enabled and a protocol is not \
|
||||
present in the configuration, the quotas used for the inbound rate limiter will be \
|
||||
used."
|
||||
)
|
||||
.min_values(0)
|
||||
.hidden(true)
|
||||
)
|
||||
/* REST API related arguments */
|
||||
.arg(
|
||||
Arg::with_name("http")
|
||||
|
@ -984,6 +984,13 @@ pub fn set_network_config(
|
||||
// Light client server config.
|
||||
config.enable_light_client_server = cli_args.is_present("light-client-server");
|
||||
|
||||
// This flag can be used both with or without a value. Try to parse it first with a value, if
|
||||
// no value is defined but the flag is present, use the default params.
|
||||
config.outbound_rate_limiter_config = clap_utils::parse_optional(cli_args, "self-limiter")?;
|
||||
if cli_args.is_present("self-limiter") && config.outbound_rate_limiter_config.is_none() {
|
||||
config.outbound_rate_limiter_config = Some(Default::default());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user