diff --git a/beacon_node/lighthouse_network/src/config.rs b/beacon_node/lighthouse_network/src/config.rs index 5a11890a2..01bb8569d 100644 --- a/beacon_node/lighthouse_network/src/config.rs +++ b/beacon_node/lighthouse_network/src/config.rs @@ -1,5 +1,5 @@ use crate::listen_addr::{ListenAddr, ListenAddress}; -use crate::rpc::config::OutboundRateLimiterConfig; +use crate::rpc::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig}; use crate::types::GossipKind; use crate::{Enr, PeerIdSerialized}; use directory::{ @@ -148,6 +148,9 @@ pub struct Config { /// Configures if/where invalid blocks should be stored. pub invalid_block_storage: Option, + + /// Configuration for the inbound rate limiter (requests received by this node). + pub inbound_rate_limiter_config: Option, } impl Config { @@ -333,6 +336,7 @@ impl Default for Config { enable_light_client_server: false, outbound_rate_limiter_config: None, invalid_block_storage: None, + inbound_rate_limiter_config: None, } } } diff --git a/beacon_node/lighthouse_network/src/rpc/config.rs b/beacon_node/lighthouse_network/src/rpc/config.rs index bea0929fb..a0f3acaf7 100644 --- a/beacon_node/lighthouse_network/src/rpc/config.rs +++ b/beacon_node/lighthouse_network/src/rpc/config.rs @@ -58,18 +58,41 @@ impl FromStr for ProtocolQuota { } } -/// Configurations for the rate limiter applied to outbound requests (made by the node itself). +#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)] +pub struct OutboundRateLimiterConfig(pub RateLimiterConfig); + +#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)] +pub struct InboundRateLimiterConfig(pub RateLimiterConfig); + +impl FromStr for OutboundRateLimiterConfig { + type Err = &'static str; + + fn from_str(s: &str) -> Result { + RateLimiterConfig::from_str(s).map(Self) + } +} + +impl FromStr for InboundRateLimiterConfig { + type Err = &'static str; + + fn from_str(s: &str) -> Result { + RateLimiterConfig::from_str(s).map(Self) + } +} + +/// Configurations for the rate limiter. #[derive(Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct OutboundRateLimiterConfig { +pub struct RateLimiterConfig { pub(super) ping_quota: Quota, pub(super) meta_data_quota: Quota, pub(super) status_quota: Quota, pub(super) goodbye_quota: Quota, pub(super) blocks_by_range_quota: Quota, pub(super) blocks_by_root_quota: Quota, + pub(super) light_client_bootstrap_quota: Quota, } -impl OutboundRateLimiterConfig { +impl RateLimiterConfig { pub const DEFAULT_PING_QUOTA: Quota = Quota::n_every(2, 10); pub const DEFAULT_META_DATA_QUOTA: Quota = Quota::n_every(2, 5); pub const DEFAULT_STATUS_QUOTA: Quota = Quota::n_every(5, 15); @@ -77,22 +100,24 @@ impl OutboundRateLimiterConfig { pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota = Quota::n_every(methods::MAX_REQUEST_BLOCKS, 10); pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10); + pub const DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA: Quota = Quota::one_every(10); } -impl Default for OutboundRateLimiterConfig { +impl Default for RateLimiterConfig { fn default() -> Self { - OutboundRateLimiterConfig { + RateLimiterConfig { ping_quota: Self::DEFAULT_PING_QUOTA, meta_data_quota: Self::DEFAULT_META_DATA_QUOTA, status_quota: Self::DEFAULT_STATUS_QUOTA, goodbye_quota: Self::DEFAULT_GOODBYE_QUOTA, blocks_by_range_quota: Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA, blocks_by_root_quota: Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA, + light_client_bootstrap_quota: Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA, } } } -impl Debug for OutboundRateLimiterConfig { +impl Debug for RateLimiterConfig { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { macro_rules! fmt_q { ($quota:expr) => { @@ -104,7 +129,7 @@ impl Debug for OutboundRateLimiterConfig { }; } - f.debug_struct("OutboundRateLimiterConfig") + f.debug_struct("RateLimiterConfig") .field("ping", fmt_q!(&self.ping_quota)) .field("metadata", fmt_q!(&self.meta_data_quota)) .field("status", fmt_q!(&self.status_quota)) @@ -119,7 +144,7 @@ impl Debug for OutboundRateLimiterConfig { /// the default values. Protocol specified more than once use only the first given Quota. /// /// The expected format is a ';' separated list of [`ProtocolQuota`]. -impl FromStr for OutboundRateLimiterConfig { +impl FromStr for RateLimiterConfig { type Err = &'static str; fn from_str(s: &str) -> Result { @@ -129,6 +154,8 @@ impl FromStr for OutboundRateLimiterConfig { let mut goodbye_quota = None; let mut blocks_by_range_quota = None; let mut blocks_by_root_quota = None; + let mut light_client_bootstrap_quota = None; + for proto_def in s.split(';') { let ProtocolQuota { protocol, quota } = proto_def.parse()?; let quota = Some(quota); @@ -139,10 +166,12 @@ impl FromStr for OutboundRateLimiterConfig { Protocol::BlocksByRoot => blocks_by_root_quota = blocks_by_root_quota.or(quota), Protocol::Ping => ping_quota = ping_quota.or(quota), Protocol::MetaData => meta_data_quota = meta_data_quota.or(quota), - Protocol::LightClientBootstrap => return Err("Lighthouse does not send LightClientBootstrap requests. Quota should not be set."), + Protocol::LightClientBootstrap => { + light_client_bootstrap_quota = light_client_bootstrap_quota.or(quota) + } } } - Ok(OutboundRateLimiterConfig { + Ok(RateLimiterConfig { ping_quota: ping_quota.unwrap_or(Self::DEFAULT_PING_QUOTA), meta_data_quota: meta_data_quota.unwrap_or(Self::DEFAULT_META_DATA_QUOTA), status_quota: status_quota.unwrap_or(Self::DEFAULT_STATUS_QUOTA), @@ -151,6 +180,8 @@ impl FromStr for OutboundRateLimiterConfig { .unwrap_or(Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA), blocks_by_root_quota: blocks_by_root_quota .unwrap_or(Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA), + light_client_bootstrap_quota: light_client_bootstrap_quota + .unwrap_or(Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA), }) } } diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 31569b820..4f7af95cf 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -17,7 +17,6 @@ use slog::{crit, debug, o}; use std::marker::PhantomData; use std::sync::Arc; use std::task::{Context, Poll}; -use std::time::Duration; use types::{EthSpec, ForkContext}; pub(crate) use handler::HandlerErr; @@ -32,7 +31,7 @@ pub use methods::{ pub(crate) use outbound::OutboundRequest; pub use protocol::{max_rpc_size, Protocol, RPCError}; -use self::config::OutboundRateLimiterConfig; +use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig}; use self::self_limiter::SelfRateLimiter; pub(crate) mod codec; @@ -112,7 +111,7 @@ type BehaviourAction = /// logic. pub struct RPC { /// Rate limiter - limiter: RateLimiter, + limiter: Option, /// Rate limiter for our own requests. self_limiter: Option>, /// Queue of events to be processed. @@ -127,32 +126,24 @@ impl RPC { pub fn new( fork_context: Arc, enable_light_client_server: bool, + inbound_rate_limiter_config: Option, outbound_rate_limiter_config: Option, log: slog::Logger, ) -> Self { let log = log.new(o!("service" => "libp2p_rpc")); - let limiter = RateLimiter::builder() - .n_every(Protocol::MetaData, 2, Duration::from_secs(5)) - .n_every(Protocol::Ping, 2, Duration::from_secs(10)) - .n_every(Protocol::Status, 5, Duration::from_secs(15)) - .one_every(Protocol::Goodbye, Duration::from_secs(10)) - .one_every(Protocol::LightClientBootstrap, Duration::from_secs(10)) - .n_every( - Protocol::BlocksByRange, - methods::MAX_REQUEST_BLOCKS, - Duration::from_secs(10), - ) - .n_every(Protocol::BlocksByRoot, 128, Duration::from_secs(10)) - .build() - .expect("Configuration parameters are valid"); + let inbound_limiter = inbound_rate_limiter_config.map(|config| { + debug!(log, "Using inbound rate limiting params"; "config" => ?config); + RateLimiter::new_with_config(config.0) + .expect("Inbound limiter configuration parameters are valid") + }); let self_limiter = outbound_rate_limiter_config.map(|config| { SelfRateLimiter::new(config, log.clone()).expect("Configuration parameters are valid") }); RPC { - limiter, + limiter: inbound_limiter, self_limiter, events: Vec::new(), fork_context, @@ -242,50 +233,60 @@ where event: ::OutEvent, ) { if let Ok(RPCReceived::Request(ref id, ref req)) = event { - // check if the request is conformant to the quota - match self.limiter.allows(&peer_id, req) { - Ok(()) => { - // send the event to the user - self.events - .push(NetworkBehaviourAction::GenerateEvent(RPCMessage { - peer_id, - conn_id, - event, - })) - } - Err(RateLimitedErr::TooLarge) => { - // we set the batch sizes, so this is a coding/config err for most protocols - let protocol = req.protocol(); - if matches!(protocol, Protocol::BlocksByRange) { - debug!(self.log, "Blocks by range request will never be processed"; "request" => %req); - } else { - crit!(self.log, "Request size too large to ever be processed"; "protocol" => %protocol); + if let Some(limiter) = self.limiter.as_mut() { + // check if the request is conformant to the quota + match limiter.allows(&peer_id, req) { + Ok(()) => { + // send the event to the user + self.events + .push(NetworkBehaviourAction::GenerateEvent(RPCMessage { + peer_id, + conn_id, + event, + })) } - // send an error code to the peer. - // the handler upon receiving the error code will send it back to the behaviour - self.send_response( - peer_id, - (conn_id, *id), - RPCCodedResponse::Error( - RPCResponseErrorCode::RateLimited, - "Rate limited. Request too large".into(), - ), - ); - } - Err(RateLimitedErr::TooSoon(wait_time)) => { - debug!(self.log, "Request exceeds the rate limit"; + Err(RateLimitedErr::TooLarge) => { + // we set the batch sizes, so this is a coding/config err for most protocols + let protocol = req.protocol(); + if matches!(protocol, Protocol::BlocksByRange) { + debug!(self.log, "Blocks by range request will never be processed"; "request" => %req); + } else { + crit!(self.log, "Request size too large to ever be processed"; "protocol" => %protocol); + } + // send an error code to the peer. + // the handler upon receiving the error code will send it back to the behaviour + self.send_response( + peer_id, + (conn_id, *id), + RPCCodedResponse::Error( + RPCResponseErrorCode::RateLimited, + "Rate limited. Request too large".into(), + ), + ); + } + Err(RateLimitedErr::TooSoon(wait_time)) => { + debug!(self.log, "Request exceeds the rate limit"; "request" => %req, "peer_id" => %peer_id, "wait_time_ms" => wait_time.as_millis()); - // send an error code to the peer. - // the handler upon receiving the error code will send it back to the behaviour - self.send_response( - peer_id, - (conn_id, *id), - RPCCodedResponse::Error( - RPCResponseErrorCode::RateLimited, - format!("Wait {:?}", wait_time).into(), - ), - ); + // send an error code to the peer. + // the handler upon receiving the error code will send it back to the behaviour + self.send_response( + peer_id, + (conn_id, *id), + RPCCodedResponse::Error( + RPCResponseErrorCode::RateLimited, + format!("Wait {:?}", wait_time).into(), + ), + ); + } } + } else { + // No rate limiting, send the event to the user + self.events + .push(NetworkBehaviourAction::GenerateEvent(RPCMessage { + peer_id, + conn_id, + event, + })) } } else { self.events @@ -303,7 +304,9 @@ where _: &mut impl PollParameters, ) -> Poll> { // let the rate limiter prune. - let _ = self.limiter.poll_unpin(cx); + if let Some(limiter) = self.limiter.as_mut() { + let _ = limiter.poll_unpin(cx); + } if let Some(self_limiter) = self.self_limiter.as_mut() { if let Poll::Ready(event) = self_limiter.poll_ready(cx) { diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index a1f7b89a2..1fdc6cce3 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -1,3 +1,4 @@ +use super::config::RateLimiterConfig; use crate::rpc::Protocol; use fnv::FnvHashMap; use libp2p::PeerId; @@ -141,29 +142,6 @@ impl RPCRateLimiterBuilder { self } - /// Allow one token every `time_period` to be used for this `protocol`. - /// This produces a hard limit. - pub fn one_every(self, protocol: Protocol, time_period: Duration) -> Self { - self.set_quota( - protocol, - Quota { - replenish_all_every: time_period, - max_tokens: 1, - }, - ) - } - - /// Allow `n` tokens to be use used every `time_period` for this `protocol`. - pub fn n_every(self, protocol: Protocol, n: u64, time_period: Duration) -> Self { - self.set_quota( - protocol, - Quota { - max_tokens: n, - replenish_all_every: time_period, - }, - ) - } - pub fn build(self) -> Result { // get our quotas let ping_quota = self.ping_quota.ok_or("Ping quota not specified")?; @@ -232,6 +210,29 @@ impl RateLimiterItem for super::OutboundRequest { } } impl RPCRateLimiter { + pub fn new_with_config(config: RateLimiterConfig) -> Result { + // Destructure to make sure every configuration value is used. + let RateLimiterConfig { + ping_quota, + meta_data_quota, + status_quota, + goodbye_quota, + blocks_by_range_quota, + blocks_by_root_quota, + light_client_bootstrap_quota, + } = config; + + Self::builder() + .set_quota(Protocol::Ping, ping_quota) + .set_quota(Protocol::MetaData, meta_data_quota) + .set_quota(Protocol::Status, status_quota) + .set_quota(Protocol::Goodbye, goodbye_quota) + .set_quota(Protocol::BlocksByRange, blocks_by_range_quota) + .set_quota(Protocol::BlocksByRoot, blocks_by_root_quota) + .set_quota(Protocol::LightClientBootstrap, light_client_bootstrap_quota) + .build() + } + /// Get a builder instance. pub fn builder() -> RPCRateLimiterBuilder { RPCRateLimiterBuilder::default() diff --git a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs index 451c6206f..6748a1947 100644 --- a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs @@ -52,28 +52,7 @@ impl SelfRateLimiter { /// Creates a new [`SelfRateLimiter`] based on configration values. pub fn new(config: OutboundRateLimiterConfig, log: Logger) -> Result { debug!(log, "Using self rate limiting params"; "config" => ?config); - // Destructure to make sure every configuration value is used. - let OutboundRateLimiterConfig { - ping_quota, - meta_data_quota, - status_quota, - goodbye_quota, - blocks_by_range_quota, - blocks_by_root_quota, - } = config; - - let limiter = RateLimiter::builder() - .set_quota(Protocol::Ping, ping_quota) - .set_quota(Protocol::MetaData, meta_data_quota) - .set_quota(Protocol::Status, status_quota) - .set_quota(Protocol::Goodbye, goodbye_quota) - .set_quota(Protocol::BlocksByRange, blocks_by_range_quota) - .set_quota(Protocol::BlocksByRoot, blocks_by_root_quota) - // Manually set the LightClientBootstrap quota, since we use the same rate limiter for - // inbound and outbound requests, and the LightClientBootstrap is an only inbound - // protocol. - .one_every(Protocol::LightClientBootstrap, Duration::from_secs(10)) - .build()?; + let limiter = RateLimiter::new_with_config(config.0)?; Ok(SelfRateLimiter { delayed_requests: Default::default(), diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index f815e3bd3..34d5a5631 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -266,6 +266,7 @@ impl Network { let eth2_rpc = RPC::new( ctx.fork_context.clone(), config.enable_light_client_server, + config.inbound_rate_limiter_config.clone(), config.outbound_rate_limiter_config.clone(), log.clone(), ); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 59a5f4b2e..10d9ffafd 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -282,7 +282,23 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { for a beacon node being referenced by validator client using the --proposer-node flag. This configuration is for enabling more secure setups.") .takes_value(false), ) - + .arg( + Arg::with_name("inbound-rate-limiter") + .long("inbound-rate-limiter") + .help( + "Configures the inbound rate limiter (requests received by this node).\ + \ + Rate limit quotas per protocol can be set in the form of \ + :/. To set quotas for multiple protocols, \ + separate them by ';'. If the inbound rate limiter is enabled and a protocol is not \ + present in the configuration, the default quotas will be used. \ + \ + This is enabled by default, using default quotas. To disable rate limiting pass \ + `disabled` to this option instead." + ) + .takes_value(true) + .hidden(true) + ) .arg( Arg::with_name("disable-backfill-rate-limiting") .long("disable-backfill-rate-limiting") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 6f626bee8..92e822819 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -1232,6 +1232,7 @@ pub fn set_network_config( // Light client server config. config.enable_light_client_server = cli_args.is_present("light-client-server"); + // The self limiter is disabled by default. // This flag can be used both with or without a value. Try to parse it first with a value, if // no value is defined but the flag is present, use the default params. config.outbound_rate_limiter_config = clap_utils::parse_optional(cli_args, "self-limiter")?; @@ -1252,7 +1253,22 @@ pub fn set_network_config( config.proposer_only = true; warn!(log, "Proposer-only mode enabled"; "info"=> "Do not connect a validator client to this node unless via the --proposer-nodes flag"); } - + // The inbound rate limiter is enabled by default unless `disabled` is passed to the + // `inbound-rate-limiter` flag. Any other value should be parsed as a configuration string. + config.inbound_rate_limiter_config = match cli_args.value_of("inbound-rate-limiter") { + None => { + // Enabled by default, with default values + Some(Default::default()) + } + Some("disabled") => { + // Explicitly disabled + None + } + Some(config_str) => { + // Enabled with a custom configuration + Some(config_str.parse()?) + } + }; Ok(()) } diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 75bcccc9d..73520dd6b 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1451,6 +1451,26 @@ fn empty_self_limiter_flag() { ) }); } + +#[test] +fn empty_inbound_rate_limiter_flag() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| { + assert_eq!( + config.network.inbound_rate_limiter_config, + Some(lighthouse_network::rpc::config::InboundRateLimiterConfig::default()) + ) + }); +} +#[test] +fn disable_inbound_rate_limiter_flag() { + CommandLineTest::new() + .flag("inbound-rate-limiter", Some("disabled")) + .run_with_zero_port() + .with_config(|config| assert_eq!(config.network.inbound_rate_limiter_config, None)); +} + #[test] fn http_allow_origin_flag() { CommandLineTest::new()