Reduce outbound requests to eth1 endpoints (#2340)
## Issue Addressed #2282 ## Proposed Changes Reduce the outbound requests made to eth1 endpoints by caching the results from `eth_chainId` and `net_version`. Further reduce the overall request count by increasing `auto_update_interval_millis` from `7_000` (7 seconds) to `60_000` (1 minute). This will result in a reduction from ~2000 requests per hour to 360 requests per hour (during normal operation). A reduction of 82%. ## Additional Info If an endpoint fails, its state is dropped from the cache and the `eth_chainId` and `net_version` calls will be made for that endpoint again during the regular update cycle (once per minute) until it is back online. Co-authored-by: Paul Hauner <paul@paulhauner.com>
This commit is contained in:
parent
ec5cceba50
commit
0847986936
@ -2,10 +2,12 @@ use crate::Config;
|
|||||||
use crate::{
|
use crate::{
|
||||||
block_cache::{BlockCache, Eth1Block},
|
block_cache::{BlockCache, Eth1Block},
|
||||||
deposit_cache::{DepositCache, SszDepositCache},
|
deposit_cache::{DepositCache, SszDepositCache},
|
||||||
|
service::EndpointsCache,
|
||||||
};
|
};
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use ssz::{Decode, Encode};
|
use ssz::{Decode, Encode};
|
||||||
use ssz_derive::{Decode, Encode};
|
use ssz_derive::{Decode, Encode};
|
||||||
|
use std::sync::Arc;
|
||||||
use types::ChainSpec;
|
use types::ChainSpec;
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
@ -28,6 +30,7 @@ impl DepositUpdater {
|
|||||||
pub struct Inner {
|
pub struct Inner {
|
||||||
pub block_cache: RwLock<BlockCache>,
|
pub block_cache: RwLock<BlockCache>,
|
||||||
pub deposit_cache: RwLock<DepositUpdater>,
|
pub deposit_cache: RwLock<DepositUpdater>,
|
||||||
|
pub endpoints_cache: RwLock<Option<Arc<EndpointsCache>>>,
|
||||||
pub config: RwLock<Config>,
|
pub config: RwLock<Config>,
|
||||||
pub remote_head_block: RwLock<Option<Eth1Block>>,
|
pub remote_head_block: RwLock<Option<Eth1Block>>,
|
||||||
pub spec: ChainSpec,
|
pub spec: ChainSpec,
|
||||||
@ -87,6 +90,7 @@ impl SszEth1Cache {
|
|||||||
cache: self.deposit_cache.to_deposit_cache()?,
|
cache: self.deposit_cache.to_deposit_cache()?,
|
||||||
last_processed_block: self.last_processed_block,
|
last_processed_block: self.last_processed_block,
|
||||||
}),
|
}),
|
||||||
|
endpoints_cache: RwLock::new(None),
|
||||||
// Set the remote head_block zero when creating a new instance. We only care about
|
// Set the remote head_block zero when creating a new instance. We only care about
|
||||||
// present and future eth1 nodes.
|
// present and future eth1 nodes.
|
||||||
remote_head_block: RwLock::new(None),
|
remote_head_block: RwLock::new(None),
|
||||||
|
@ -54,7 +54,27 @@ pub enum EndpointError {
|
|||||||
|
|
||||||
type EndpointState = Result<(), EndpointError>;
|
type EndpointState = Result<(), EndpointError>;
|
||||||
|
|
||||||
type EndpointWithState = (SensitiveUrl, TRwLock<Option<EndpointState>>);
|
pub struct EndpointWithState {
|
||||||
|
endpoint: SensitiveUrl,
|
||||||
|
state: TRwLock<Option<EndpointState>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EndpointWithState {
|
||||||
|
pub fn new(endpoint: SensitiveUrl) -> Self {
|
||||||
|
Self {
|
||||||
|
endpoint,
|
||||||
|
state: TRwLock::new(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn reset_endpoint_state(endpoint: &EndpointWithState) {
|
||||||
|
*endpoint.state.write().await = None;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_state(endpoint: &EndpointWithState) -> Option<EndpointState> {
|
||||||
|
*endpoint.state.read().await
|
||||||
|
}
|
||||||
|
|
||||||
/// A cache structure to lazily check usability of endpoints. An endpoint is usable if it is
|
/// A cache structure to lazily check usability of endpoints. An endpoint is usable if it is
|
||||||
/// reachable and has the correct network id and chain id. Emits a `WARN` log if a checked endpoint
|
/// reachable and has the correct network id and chain id. Emits a `WARN` log if a checked endpoint
|
||||||
@ -70,19 +90,19 @@ impl EndpointsCache {
|
|||||||
/// Checks the usability of an endpoint. Results get cached and therefore only the first call
|
/// Checks the usability of an endpoint. Results get cached and therefore only the first call
|
||||||
/// for each endpoint does the real check.
|
/// for each endpoint does the real check.
|
||||||
async fn state(&self, endpoint: &EndpointWithState) -> EndpointState {
|
async fn state(&self, endpoint: &EndpointWithState) -> EndpointState {
|
||||||
if let Some(result) = *endpoint.1.read().await {
|
if let Some(result) = *endpoint.state.read().await {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
let mut value = endpoint.1.write().await;
|
let mut value = endpoint.state.write().await;
|
||||||
if let Some(result) = *value {
|
if let Some(result) = *value {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
crate::metrics::inc_counter_vec(
|
crate::metrics::inc_counter_vec(
|
||||||
&crate::metrics::ENDPOINT_REQUESTS,
|
&crate::metrics::ENDPOINT_REQUESTS,
|
||||||
&[&endpoint.0.to_string()],
|
&[&endpoint.endpoint.to_string()],
|
||||||
);
|
);
|
||||||
let state = endpoint_state(
|
let state = endpoint_state(
|
||||||
&endpoint.0,
|
&endpoint.endpoint,
|
||||||
&self.config_network_id,
|
&self.config_network_id,
|
||||||
&self.config_chain_id,
|
&self.config_chain_id,
|
||||||
&self.log,
|
&self.log,
|
||||||
@ -92,7 +112,7 @@ impl EndpointsCache {
|
|||||||
if state.is_err() {
|
if state.is_err() {
|
||||||
crate::metrics::inc_counter_vec(
|
crate::metrics::inc_counter_vec(
|
||||||
&crate::metrics::ENDPOINT_ERRORS,
|
&crate::metrics::ENDPOINT_ERRORS,
|
||||||
&[&endpoint.0.to_string()],
|
&[&endpoint.endpoint.to_string()],
|
||||||
);
|
);
|
||||||
crate::metrics::set_gauge(&metrics::ETH1_CONNECTED, 0);
|
crate::metrics::set_gauge(&metrics::ETH1_CONNECTED, 0);
|
||||||
} else {
|
} else {
|
||||||
@ -114,12 +134,12 @@ impl EndpointsCache {
|
|||||||
.first_success(|endpoint| async move {
|
.first_success(|endpoint| async move {
|
||||||
match self.state(endpoint).await {
|
match self.state(endpoint).await {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
let endpoint_str = &endpoint.0.to_string();
|
let endpoint_str = &endpoint.endpoint.to_string();
|
||||||
crate::metrics::inc_counter_vec(
|
crate::metrics::inc_counter_vec(
|
||||||
&crate::metrics::ENDPOINT_REQUESTS,
|
&crate::metrics::ENDPOINT_REQUESTS,
|
||||||
&[endpoint_str],
|
&[endpoint_str],
|
||||||
);
|
);
|
||||||
match func(&endpoint.0).await {
|
match func(&endpoint.endpoint).await {
|
||||||
Ok(t) => Ok(t),
|
Ok(t) => Ok(t),
|
||||||
Err(t) => {
|
Err(t) => {
|
||||||
crate::metrics::inc_counter_vec(
|
crate::metrics::inc_counter_vec(
|
||||||
@ -127,7 +147,10 @@ impl EndpointsCache {
|
|||||||
&[endpoint_str],
|
&[endpoint_str],
|
||||||
);
|
);
|
||||||
if let SingleEndpointError::EndpointError(e) = &t {
|
if let SingleEndpointError::EndpointError(e) = &t {
|
||||||
*endpoint.1.write().await = Some(Err(*e));
|
*endpoint.state.write().await = Some(Err(*e));
|
||||||
|
} else {
|
||||||
|
// A non-`EndpointError` error occurred, so reset the state.
|
||||||
|
reset_endpoint_state(endpoint).await;
|
||||||
}
|
}
|
||||||
Err(t)
|
Err(t)
|
||||||
}
|
}
|
||||||
@ -138,6 +161,16 @@ impl EndpointsCache {
|
|||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn reset_errorred_endpoints(&self) {
|
||||||
|
for endpoint in &self.fallback.servers {
|
||||||
|
if let Some(state) = get_state(endpoint).await {
|
||||||
|
if state.is_err() {
|
||||||
|
reset_endpoint_state(endpoint).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns `Ok` if the endpoint is usable, i.e. is reachable and has a correct network id and
|
/// Returns `Ok` if the endpoint is usable, i.e. is reachable and has a correct network id and
|
||||||
@ -405,9 +438,9 @@ impl Default for Config {
|
|||||||
follow_distance: 128,
|
follow_distance: 128,
|
||||||
node_far_behind_seconds: 128 * 14,
|
node_far_behind_seconds: 128 * 14,
|
||||||
block_cache_truncation: Some(4_096),
|
block_cache_truncation: Some(4_096),
|
||||||
auto_update_interval_millis: 7_000,
|
auto_update_interval_millis: 60_000,
|
||||||
blocks_per_log_query: 1_000,
|
blocks_per_log_query: 1_000,
|
||||||
max_log_requests_per_update: Some(100),
|
max_log_requests_per_update: Some(5_000),
|
||||||
max_blocks_per_update: Some(8_192),
|
max_blocks_per_update: Some(8_192),
|
||||||
purge_cache: false,
|
purge_cache: false,
|
||||||
}
|
}
|
||||||
@ -435,6 +468,7 @@ impl Service {
|
|||||||
deposit_cache: RwLock::new(DepositUpdater::new(
|
deposit_cache: RwLock::new(DepositUpdater::new(
|
||||||
config.deposit_contract_deploy_block,
|
config.deposit_contract_deploy_block,
|
||||||
)),
|
)),
|
||||||
|
endpoints_cache: RwLock::new(None),
|
||||||
remote_head_block: RwLock::new(None),
|
remote_head_block: RwLock::new(None),
|
||||||
config: RwLock::new(config),
|
config: RwLock::new(config),
|
||||||
spec,
|
spec,
|
||||||
@ -605,20 +639,31 @@ impl Service {
|
|||||||
self.inner.config.write().lowest_cached_block_number = block_number;
|
self.inner.config.write().lowest_cached_block_number = block_number;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn init_endpoints(&self) -> EndpointsCache {
|
/// Builds a new `EndpointsCache` with empty states.
|
||||||
|
pub fn init_endpoints(&self) -> Arc<EndpointsCache> {
|
||||||
let endpoints = self.config().endpoints.clone();
|
let endpoints = self.config().endpoints.clone();
|
||||||
let config_network_id = self.config().network_id.clone();
|
let config_network_id = self.config().network_id.clone();
|
||||||
let config_chain_id = self.config().chain_id.clone();
|
let config_chain_id = self.config().chain_id.clone();
|
||||||
EndpointsCache {
|
let new_cache = Arc::new(EndpointsCache {
|
||||||
fallback: Fallback::new(
|
fallback: Fallback::new(endpoints.into_iter().map(EndpointWithState::new).collect()),
|
||||||
endpoints
|
|
||||||
.into_iter()
|
|
||||||
.map(|s| (s, TRwLock::new(None)))
|
|
||||||
.collect(),
|
|
||||||
),
|
|
||||||
config_network_id,
|
config_network_id,
|
||||||
config_chain_id,
|
config_chain_id,
|
||||||
log: self.log.clone(),
|
log: self.log.clone(),
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut endpoints_cache = self.inner.endpoints_cache.write();
|
||||||
|
*endpoints_cache = Some(new_cache.clone());
|
||||||
|
new_cache
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the cached `EndpointsCache` if it exists or builds a new one.
|
||||||
|
pub fn get_endpoints(&self) -> Arc<EndpointsCache> {
|
||||||
|
let endpoints_cache = self.inner.endpoints_cache.read();
|
||||||
|
if let Some(cache) = endpoints_cache.clone() {
|
||||||
|
cache
|
||||||
|
} else {
|
||||||
|
drop(endpoints_cache);
|
||||||
|
self.init_endpoints()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -633,7 +678,11 @@ impl Service {
|
|||||||
pub async fn update(
|
pub async fn update(
|
||||||
&self,
|
&self,
|
||||||
) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> {
|
) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> {
|
||||||
let endpoints = self.init_endpoints();
|
let endpoints = self.get_endpoints();
|
||||||
|
|
||||||
|
// Reset the state of any endpoints which have errored so their state can be redetermined.
|
||||||
|
endpoints.reset_errorred_endpoints().await;
|
||||||
|
|
||||||
let node_far_behind_seconds = self.inner.config.read().node_far_behind_seconds;
|
let node_far_behind_seconds = self.inner.config.read().node_far_behind_seconds;
|
||||||
|
|
||||||
let process_single_err = |e: &FallbackError<SingleEndpointError>| {
|
let process_single_err = |e: &FallbackError<SingleEndpointError>| {
|
||||||
@ -656,7 +705,7 @@ impl Service {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
endpoints.fallback.map_format_error(|s| &s.0, &e)
|
endpoints.fallback.map_format_error(|s| &s.endpoint, &e)
|
||||||
};
|
};
|
||||||
|
|
||||||
let process_err = |e: Error| match &e {
|
let process_err = |e: Error| match &e {
|
||||||
|
@ -2,8 +2,9 @@ use itertools::{join, zip};
|
|||||||
use std::fmt::{Debug, Display};
|
use std::fmt::{Debug, Display};
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct Fallback<T> {
|
pub struct Fallback<T> {
|
||||||
servers: Vec<T>,
|
pub servers: Vec<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq)]
|
#[derive(Debug, PartialEq)]
|
||||||
|
Loading…
Reference in New Issue
Block a user