diff --git a/Cargo.lock b/Cargo.lock index cfefa6c11..8fb8c5492 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1661,7 +1661,6 @@ dependencies = [ "eth2_ssz", "eth2_ssz_derive", "execution_layer", - "fallback", "futures", "hex", "lazy_static", @@ -2117,13 +2116,6 @@ dependencies = [ "futures", ] -[[package]] -name = "fallback" -version = "0.1.0" -dependencies = [ - "itertools", -] - [[package]] name = "fallible-iterator" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 27120e217..02cf4d943 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,6 @@ members = [ "common/unused_port", "common/validator_dir", "common/warp_utils", - "common/fallback", "common/monitoring_api", "database_manager", diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 916ebd235..051b84f81 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -897,7 +897,7 @@ where .ok_or("dummy_eth1_backend requires a log")?; let backend = - CachingEth1Backend::new(Eth1Config::default(), log.clone(), self.spec.clone()); + CachingEth1Backend::new(Eth1Config::default(), log.clone(), self.spec.clone())?; self.eth1_chain = Some(Eth1Chain::new_dummy(backend)); diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index 8dd101b72..3d24becc8 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -431,12 +431,13 @@ impl CachingEth1Backend { /// Instantiates `self` with empty caches. /// /// Does not connect to the eth1 node or start any tasks to keep the cache updated. - pub fn new(config: Eth1Config, log: Logger, spec: ChainSpec) -> Self { - Self { - core: HttpService::new(config, log.clone(), spec), + pub fn new(config: Eth1Config, log: Logger, spec: ChainSpec) -> Result { + Ok(Self { + core: HttpService::new(config, log.clone(), spec) + .map_err(|e| format!("Failed to create eth1 http service: {:?}", e))?, log, _phantom: PhantomData, - } + }) } /// Starts the routine which connects to the external eth1 node and updates the caches. @@ -730,11 +731,9 @@ mod test { }; let log = null_logger().unwrap(); - Eth1Chain::new(CachingEth1Backend::new( - eth1_config, - log, - MainnetEthSpec::default_spec(), - )) + Eth1Chain::new( + CachingEth1Backend::new(eth1_config, log, MainnetEthSpec::default_spec()).unwrap(), + ) } fn get_deposit_log(i: u64, spec: &ChainSpec) -> DepositLog { diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 752ba3b7b..a46d91ad1 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -370,7 +370,7 @@ where info!( context.log(), "Waiting for eth2 genesis from eth1"; - "eth1_endpoints" => format!("{:?}", &config.eth1.endpoints), + "eth1_endpoints" => format!("{:?}", &config.eth1.endpoint), "contract_deploy_block" => config.eth1.deposit_contract_deploy_block, "deposit_contract" => &config.eth1.deposit_contract_address ); @@ -379,7 +379,7 @@ where config.eth1, context.log().clone(), context.eth2_config().spec.clone(), - ); + )?; // If the HTTP API server is enabled, start an instance of it where it only // contains a reference to the eth1 service (all non-eth1 endpoints will fail @@ -875,7 +875,7 @@ where CachingEth1Backend::from_service(eth1_service_from_genesis) } else if config.purge_cache { - CachingEth1Backend::new(config, context.log().clone(), spec) + CachingEth1Backend::new(config, context.log().clone(), spec)? } else { beacon_chain_builder .get_persisted_eth1_backend()? @@ -889,11 +889,7 @@ where .map(|chain| chain.into_backend()) }) .unwrap_or_else(|| { - Ok(CachingEth1Backend::new( - config, - context.log().clone(), - spec.clone(), - )) + CachingEth1Backend::new(config, context.log().clone(), spec.clone()) })? }; diff --git a/beacon_node/eth1/Cargo.toml b/beacon_node/eth1/Cargo.toml index 403869cc9..930301256 100644 --- a/beacon_node/eth1/Cargo.toml +++ b/beacon_node/eth1/Cargo.toml @@ -31,5 +31,4 @@ lighthouse_metrics = { path = "../../common/lighthouse_metrics"} lazy_static = "1.4.0" task_executor = { path = "../../common/task_executor" } eth2 = { path = "../../common/eth2" } -fallback = { path = "../../common/fallback" } sensitive_url = { path = "../../common/sensitive_url" } diff --git a/beacon_node/eth1/src/inner.rs b/beacon_node/eth1/src/inner.rs index 9a57f450e..b0a951bef 100644 --- a/beacon_node/eth1/src/inner.rs +++ b/beacon_node/eth1/src/inner.rs @@ -1,14 +1,14 @@ +use crate::service::endpoint_from_config; use crate::Config; use crate::{ block_cache::{BlockCache, Eth1Block}, deposit_cache::{DepositCache, SszDepositCache}, - service::EndpointsCache, }; +use execution_layer::HttpJsonRpc; use parking_lot::RwLock; use ssz::four_byte_option_impl; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; -use std::sync::Arc; use types::ChainSpec; // Define "legacy" implementations of `Option` which use four bytes for encoding the union @@ -31,11 +31,10 @@ impl DepositUpdater { } } -#[derive(Default)] pub struct Inner { pub block_cache: RwLock, pub deposit_cache: RwLock, - pub endpoints_cache: RwLock>>, + pub endpoint: HttpJsonRpc, pub config: RwLock, pub remote_head_block: RwLock>, pub spec: ChainSpec, @@ -96,7 +95,8 @@ impl SszEth1Cache { cache: self.deposit_cache.to_deposit_cache()?, last_processed_block: self.last_processed_block, }), - endpoints_cache: RwLock::new(None), + endpoint: endpoint_from_config(&config) + .map_err(|e| format!("Failed to create endpoint: {:?}", e))?, // Set the remote head_block zero when creating a new instance. We only care about // present and future eth1 nodes. remote_head_block: RwLock::new(None), diff --git a/beacon_node/eth1/src/metrics.rs b/beacon_node/eth1/src/metrics.rs index f3d9483b2..5441b40d7 100644 --- a/beacon_node/eth1/src/metrics.rs +++ b/beacon_node/eth1/src/metrics.rs @@ -17,16 +17,6 @@ lazy_static! { pub static ref HIGHEST_PROCESSED_DEPOSIT_BLOCK: Result = try_create_int_gauge("eth1_highest_processed_deposit_block", "Number of the last block checked for deposits"); - /* - * Eth1 endpoint errors - */ - pub static ref ENDPOINT_ERRORS: Result = try_create_int_counter_vec( - "eth1_endpoint_errors", "The number of eth1 request errors for each endpoint", &["endpoint"] - ); - pub static ref ENDPOINT_REQUESTS: Result = try_create_int_counter_vec( - "eth1_endpoint_requests", "The number of eth1 requests for each endpoint", &["endpoint"] - ); - /* * Eth1 rpc connection */ @@ -35,14 +25,4 @@ lazy_static! { "sync_eth1_connected", "Set to 1 if connected to an eth1 node, otherwise set to 0" ); - pub static ref ETH1_FALLBACK_CONFIGURED: Result = try_create_int_gauge( - "sync_eth1_fallback_configured", "Number of configured eth1 fallbacks" - ); - - // Note: This metric only checks if an eth1 fallback is configured, not if it is connected and synced. - // Checking for liveness of the fallback would require moving away from lazy checking of fallbacks. - pub static ref ETH1_FALLBACK_CONNECTED: Result = try_create_int_gauge( - "eth1_sync_fallback_connected", "Set to 1 if an eth1 fallback is connected, otherwise set to 0" - ); - } diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index a4d4e5e25..fae6eef9c 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -9,19 +9,16 @@ use execution_layer::http::{ deposit_methods::{BlockQuery, Eth1Id}, HttpJsonRpc, }; -use fallback::{Fallback, FallbackError}; use futures::future::TryFutureExt; use parking_lot::{RwLock, RwLockReadGuard}; use sensitive_url::SensitiveUrl; use serde::{Deserialize, Serialize}; use slog::{debug, error, info, trace, warn, Logger}; use std::fmt::Debug; -use std::future::Future; use std::ops::{Range, RangeInclusive}; use std::path::PathBuf; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; -use tokio::sync::RwLock as TRwLock; use tokio::time::{interval_at, Duration, Instant}; use types::{ChainSpec, EthSpec, Unsigned}; @@ -53,127 +50,12 @@ const CACHE_FACTOR: u64 = 2; #[derive(Debug, PartialEq, Clone)] pub enum EndpointError { RequestFailed(String), - WrongNetworkId, WrongChainId, FarBehind, } type EndpointState = Result<(), EndpointError>; -pub struct EndpointWithState { - client: HttpJsonRpc, - state: TRwLock>, -} - -impl EndpointWithState { - pub fn new(client: HttpJsonRpc) -> Self { - Self { - client, - state: TRwLock::new(None), - } - } -} - -async fn reset_endpoint_state(endpoint: &EndpointWithState) { - *endpoint.state.write().await = None; -} - -async fn get_state(endpoint: &EndpointWithState) -> Option { - endpoint.state.read().await.clone() -} - -/// A cache structure to lazily check usability of endpoints. An endpoint is usable if it is -/// reachable and has the correct network id and chain id. Emits a `WARN` log if a checked endpoint -/// is not usable. -pub struct EndpointsCache { - pub fallback: Fallback, - pub config_chain_id: Eth1Id, - pub log: Logger, -} - -impl EndpointsCache { - /// Checks the usability of an endpoint. Results get cached and therefore only the first call - /// for each endpoint does the real check. - async fn state(&self, endpoint: &EndpointWithState) -> EndpointState { - if let Some(result) = endpoint.state.read().await.clone() { - return result; - } - let mut value = endpoint.state.write().await; - if let Some(result) = value.clone() { - return result; - } - crate::metrics::inc_counter_vec( - &crate::metrics::ENDPOINT_REQUESTS, - &[&endpoint.client.to_string()], - ); - let state = endpoint_state(&endpoint.client, &self.config_chain_id, &self.log).await; - *value = Some(state.clone()); - if state.is_err() { - crate::metrics::inc_counter_vec( - &crate::metrics::ENDPOINT_ERRORS, - &[&endpoint.client.to_string()], - ); - crate::metrics::set_gauge(&metrics::ETH1_CONNECTED, 0); - } else { - crate::metrics::set_gauge(&metrics::ETH1_CONNECTED, 1); - } - state - } - - /// Return the first successful result along with number of previous errors encountered - /// or all the errors encountered if every none of the fallback endpoints return required output. - pub async fn first_success<'a, F, O, R>( - &'a self, - func: F, - ) -> Result<(O, usize), FallbackError> - where - F: Fn(&'a HttpJsonRpc) -> R, - R: Future>, - { - let func = &func; - self.fallback - .first_success(|endpoint| async move { - match self.state(endpoint).await { - Ok(()) => { - let endpoint_str = &endpoint.client.to_string(); - crate::metrics::inc_counter_vec( - &crate::metrics::ENDPOINT_REQUESTS, - &[endpoint_str], - ); - match func(&endpoint.client).await { - Ok(t) => Ok(t), - Err(t) => { - crate::metrics::inc_counter_vec( - &crate::metrics::ENDPOINT_ERRORS, - &[endpoint_str], - ); - if let SingleEndpointError::EndpointError(e) = &t { - *endpoint.state.write().await = Some(Err(e.clone())); - } else { - // A non-`EndpointError` error occurred, so reset the state. - reset_endpoint_state(endpoint).await; - } - Err(t) - } - } - } - Err(e) => Err(SingleEndpointError::EndpointError(e)), - } - }) - .await - } - - pub async fn reset_errorred_endpoints(&self) { - for endpoint in &self.fallback.servers { - if let Some(state) = get_state(endpoint).await { - if state.is_err() { - reset_endpoint_state(endpoint).await; - } - } - } - } -} - /// Returns `Ok` if the endpoint is usable, i.e. is reachable and has a correct network id and /// chain id. Otherwise it returns `Err`. async fn endpoint_state( @@ -186,7 +68,6 @@ async fn endpoint_state( log, "Error connecting to eth1 node endpoint"; "endpoint" => %endpoint, - "action" => "trying fallbacks" ); EndpointError::RequestFailed(e) }; @@ -202,7 +83,6 @@ async fn endpoint_state( log, "Remote execution node is not synced"; "endpoint" => %endpoint, - "action" => "trying fallbacks" ); return Err(EndpointError::FarBehind); } @@ -211,7 +91,6 @@ async fn endpoint_state( log, "Invalid execution chain ID. Please switch to correct chain ID on endpoint"; "endpoint" => %endpoint, - "action" => "trying fallbacks", "expected" => ?config_chain_id, "received" => ?chain_id, ); @@ -240,7 +119,7 @@ async fn get_remote_head_and_new_block_ranges( Option>, Option>, ), - SingleEndpointError, + Error, > { let remote_head_block = download_eth1_block(endpoint, service.inner.clone(), None).await?; let now = SystemTime::now() @@ -253,18 +132,16 @@ async fn get_remote_head_and_new_block_ranges( "Execution endpoint is not synced"; "endpoint" => %endpoint, "last_seen_block_unix_timestamp" => remote_head_block.timestamp, - "action" => "trying fallback" ); - return Err(SingleEndpointError::EndpointError(EndpointError::FarBehind)); + return Err(Error::EndpointError(EndpointError::FarBehind)); } let handle_remote_not_synced = |e| { - if let SingleEndpointError::RemoteNotSynced { .. } = e { + if let Error::RemoteNotSynced { .. } = e { warn!( service.log, "Execution endpoint is not synced"; "endpoint" => %endpoint, - "action" => "trying fallbacks" ); } e @@ -296,16 +173,25 @@ async fn relevant_new_block_numbers_from_endpoint( endpoint: &HttpJsonRpc, service: &Service, head_type: HeadType, -) -> Result>, SingleEndpointError> { +) -> Result>, Error> { let remote_highest_block = endpoint .get_block_number(Duration::from_millis(BLOCK_NUMBER_TIMEOUT_MILLIS)) - .map_err(SingleEndpointError::GetBlockNumberFailed) + .map_err(Error::GetBlockNumberFailed) .await?; service.relevant_new_block_numbers(remote_highest_block, None, head_type) } #[derive(Debug, PartialEq)] -pub enum SingleEndpointError { +pub enum Error { + /// There was an inconsistency when adding a block to the cache. + FailedToInsertEth1Block(BlockCacheError), + /// There was an inconsistency when adding a deposit to the cache. + FailedToInsertDeposit(DepositCacheError), + /// A log downloaded from the eth1 contract was not well formed. + FailedToParseDepositLog { + block_range: Range, + error: String, + }, /// Endpoint is currently not functional. EndpointError(EndpointError), /// The remote node is less synced that we expect, it is not useful until has done more @@ -325,21 +211,6 @@ pub enum SingleEndpointError { GetDepositCountFailed(String), /// Failed to read the deposit contract root from the eth1 node. GetDepositLogsFailed(String), -} - -#[derive(Debug, PartialEq)] -pub enum Error { - /// There was an inconsistency when adding a block to the cache. - FailedToInsertEth1Block(BlockCacheError), - /// There was an inconsistency when adding a deposit to the cache. - FailedToInsertDeposit(DepositCacheError), - /// A log downloaded from the eth1 contract was not well formed. - FailedToParseDepositLog { - block_range: Range, - error: String, - }, - /// All possible endpoints returned a `SingleEndpointError`. - FallbackError(FallbackError), /// There was an unexpected internal error. Internal(String), } @@ -367,21 +238,14 @@ pub enum Eth1Endpoint { jwt_id: Option, jwt_version: Option, }, - NoAuth(Vec), + NoAuth(SensitiveUrl), } impl Eth1Endpoint { - fn len(&self) -> usize { + pub fn get_endpoint(&self) -> SensitiveUrl { match &self { - Self::Auth { .. } => 1, - Self::NoAuth(urls) => urls.len(), - } - } - - pub fn get_endpoints(&self) -> Vec { - match &self { - Self::Auth { endpoint, .. } => vec![endpoint.clone()], - Self::NoAuth(endpoints) => endpoints.clone(), + Self::Auth { endpoint, .. } => endpoint.clone(), + Self::NoAuth(endpoint) => endpoint.clone(), } } } @@ -389,7 +253,7 @@ impl Eth1Endpoint { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { /// An Eth1 node (e.g., Geth) running a HTTP JSON-RPC endpoint. - pub endpoints: Eth1Endpoint, + pub endpoint: Eth1Endpoint, /// The address the `BlockCache` and `DepositCache` should assume is the canonical deposit contract. pub deposit_contract_address: String, /// The eth1 chain id where the deposit contract is deployed (Goerli/Mainnet). @@ -466,8 +330,10 @@ impl Config { impl Default for Config { fn default() -> Self { Self { - endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse(DEFAULT_ETH1_ENDPOINT) - .expect("The default Eth1 endpoint must always be a valid URL.")]), + endpoint: Eth1Endpoint::NoAuth( + SensitiveUrl::parse(DEFAULT_ETH1_ENDPOINT) + .expect("The default Eth1 endpoint must always be a valid URL."), + ), deposit_contract_address: "0x0000000000000000000000000000000000000000".into(), chain_id: DEFAULT_CHAIN_ID, deposit_contract_deploy_block: 1, @@ -485,6 +351,24 @@ impl Default for Config { } } +pub fn endpoint_from_config(config: &Config) -> Result { + match config.endpoint.clone() { + Eth1Endpoint::Auth { + endpoint, + jwt_path, + jwt_id, + jwt_version, + } => { + let auth = Auth::new_with_path(jwt_path, jwt_id, jwt_version) + .map_err(|e| format!("Failed to initialize jwt auth: {:?}", e))?; + HttpJsonRpc::new_with_auth(endpoint, auth) + .map_err(|e| format!("Failed to create eth1 json rpc client: {:?}", e)) + } + Eth1Endpoint::NoAuth(endpoint) => HttpJsonRpc::new(endpoint) + .map_err(|e| format!("Failed to create eth1 json rpc client: {:?}", e)), + } +} + /// Provides a set of Eth1 caches and async functions to update them. /// /// Stores the following caches: @@ -499,20 +383,24 @@ pub struct Service { impl Service { /// Creates a new service. Does not attempt to connect to the eth1 node. - pub fn new(config: Config, log: Logger, spec: ChainSpec) -> Self { - Self { + pub fn new(config: Config, log: Logger, spec: ChainSpec) -> Result { + Ok(Self { inner: Arc::new(Inner { block_cache: <_>::default(), deposit_cache: RwLock::new(DepositUpdater::new( config.deposit_contract_deploy_block, )), - endpoints_cache: RwLock::new(None), + endpoint: endpoint_from_config(&config)?, remote_head_block: RwLock::new(None), config: RwLock::new(config), spec, }), log, - } + }) + } + + pub fn client(&self) -> &HttpJsonRpc { + &self.inner.endpoint } /// Returns the follow distance that has been shortened to accommodate for differences in the @@ -676,52 +564,6 @@ impl Service { self.inner.config.write().lowest_cached_block_number = block_number; } - /// Builds a new `EndpointsCache` with empty states. - pub fn init_endpoints(&self) -> Result, String> { - let endpoints = self.config().endpoints.clone(); - let config_chain_id = self.config().chain_id.clone(); - - let servers = match endpoints { - Eth1Endpoint::Auth { - jwt_path, - endpoint, - jwt_id, - jwt_version, - } => { - let auth = Auth::new_with_path(jwt_path, jwt_id, jwt_version) - .map_err(|e| format!("Failed to initialize jwt auth: {:?}", e))?; - vec![HttpJsonRpc::new_with_auth(endpoint, auth) - .map_err(|e| format!("Failed to build auth enabled json rpc {:?}", e))?] - } - Eth1Endpoint::NoAuth(urls) => urls - .into_iter() - .map(|url| { - HttpJsonRpc::new(url).map_err(|e| format!("Failed to build json rpc {:?}", e)) - }) - .collect::>()?, - }; - let new_cache = Arc::new(EndpointsCache { - fallback: Fallback::new(servers.into_iter().map(EndpointWithState::new).collect()), - config_chain_id, - log: self.log.clone(), - }); - - let mut endpoints_cache = self.inner.endpoints_cache.write(); - *endpoints_cache = Some(new_cache.clone()); - Ok(new_cache) - } - - /// Returns the cached `EndpointsCache` if it exists or builds a new one. - pub fn get_endpoints(&self) -> Result, String> { - let endpoints_cache = self.inner.endpoints_cache.read(); - if let Some(cache) = endpoints_cache.clone() { - Ok(cache) - } else { - drop(endpoints_cache); - self.init_endpoints() - } - } - /// Update the deposit and block cache, returning an error if either fail. /// /// ## Returns @@ -733,56 +575,28 @@ impl Service { pub async fn update( &self, ) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> { - let endpoints = self.get_endpoints()?; - - // Reset the state of any endpoints which have errored so their state can be redetermined. - endpoints.reset_errorred_endpoints().await; - + let client = self.client(); + let log = self.log.clone(); + let chain_id = self.config().chain_id.clone(); let node_far_behind_seconds = self.inner.config.read().node_far_behind_seconds; - let process_single_err = |e: &FallbackError| { - match e { - FallbackError::AllErrored(errors) => { - if errors - .iter() - .all(|error| matches!(error, SingleEndpointError::EndpointError(_))) - { - error!( - self.log, - "No synced execution endpoint"; - "advice" => "ensure you have an execution node configured via \ - --execution-endpoint or if pre-merge, --eth1-endpoints" - ); - } - } + match endpoint_state(client, &chain_id, &log).await { + Ok(()) => crate::metrics::set_gauge(&metrics::ETH1_CONNECTED, 1), + Err(e) => { + crate::metrics::set_gauge(&metrics::ETH1_CONNECTED, 0); + return Err(format!("Invalid endpoint state: {:?}", e)); } - endpoints.fallback.map_format_error(|s| &s.client, e) - }; - - let process_err = |e: Error| match &e { - Error::FallbackError(f) => process_single_err(f), - e => format!("{:?}", e), - }; - - let ( - (remote_head_block, new_block_numbers_deposit, new_block_numbers_block_cache), - num_errors, - ) = endpoints - .first_success(|e| async move { - get_remote_head_and_new_block_ranges(e, self, node_far_behind_seconds).await - }) - .await - .map_err(|e| format!("{:?}", process_single_err(&e)))?; - - if num_errors > 0 { - info!(self.log, "Fetched data from fallback"; "fallback_number" => num_errors); } + let (remote_head_block, new_block_numbers_deposit, new_block_numbers_block_cache) = + get_remote_head_and_new_block_ranges(client, self, node_far_behind_seconds) + .await + .map_err(|e| format!("Failed to get remote head and new block ranges: {:?}", e))?; *self.inner.remote_head_block.write() = Some(remote_head_block); let update_deposit_cache = async { let outcome_result = self - .update_deposit_cache(Some(new_block_numbers_deposit), &endpoints) + .update_deposit_cache(Some(new_block_numbers_deposit)) .await; // Reset the `last_procesed block` to the last valid deposit's block number. @@ -804,8 +618,8 @@ impl Service { deposit_cache.last_processed_block = deposit_cache.cache.latest_block_number(); } - let outcome = outcome_result - .map_err(|e| format!("Failed to update deposit cache: {:?}", process_err(e)))?; + let outcome = + outcome_result.map_err(|e| format!("Failed to update deposit cache: {:?}", e))?; trace!( self.log, @@ -819,14 +633,9 @@ impl Service { let update_block_cache = async { let outcome = self - .update_block_cache(Some(new_block_numbers_block_cache), &endpoints) + .update_block_cache(Some(new_block_numbers_block_cache)) .await - .map_err(|e| { - format!( - "Failed to update deposit contract block cache: {:?}", - process_err(e) - ) - })?; + .map_err(|e| format!("Failed to update deposit contract block cache: {:?}", e))?; trace!( self.log, @@ -858,7 +667,6 @@ impl Service { let mut interval = interval_at(Instant::now(), update_interval); - let num_fallbacks = self.config().endpoints.len() - 1; let update_future = async move { loop { interval.tick().await; @@ -866,15 +674,6 @@ impl Service { } }; - // Set the number of configured eth1 servers - metrics::set_gauge(&metrics::ETH1_FALLBACK_CONFIGURED, num_fallbacks as i64); - // Since we lazily update eth1 fallbacks, it's not possible to know connection status of fallback. - // Hence, we set it to 1 if we have atleast one configured fallback. - if num_fallbacks > 0 { - metrics::set_gauge(&metrics::ETH1_FALLBACK_CONNECTED, 1); - } else { - metrics::set_gauge(&metrics::ETH1_FALLBACK_CONNECTED, 0); - } handle.spawn(update_future, "eth1"); } @@ -904,7 +703,7 @@ impl Service { remote_highest_block_number: u64, remote_highest_block_timestamp: Option, head_type: HeadType, - ) -> Result>, SingleEndpointError> { + ) -> Result>, Error> { let follow_distance = self.cache_follow_distance(); let latest_cached_block = self.latest_cached_block(); let next_required_block = match head_type { @@ -948,8 +747,8 @@ impl Service { pub async fn update_deposit_cache( &self, new_block_numbers: Option>>, - endpoints: &EndpointsCache, ) -> Result { + let client = self.client(); let deposit_contract_address = self.config().deposit_contract_address.clone(); let blocks_per_log_query = self.config().blocks_per_log_query; @@ -961,13 +760,10 @@ impl Service { let range = { match new_block_numbers { Some(range) => range, - None => endpoints - .first_success(|e| async move { - relevant_new_block_numbers_from_endpoint(e, self, HeadType::Deposit).await - }) - .await - .map(|(res, _)| res) - .map_err(Error::FallbackError)?, + None => { + relevant_new_block_numbers_from_endpoint(client, self, HeadType::Deposit) + .await? + } } }; @@ -1001,20 +797,14 @@ impl Service { * Step 1. Download logs. */ let block_range_ref = &block_range; - let logs = endpoints - .first_success(|endpoint| async move { - endpoint - .get_deposit_logs_in_range( - deposit_contract_address_ref, - block_range_ref.clone(), - Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS), - ) - .await - .map_err(SingleEndpointError::GetDepositLogsFailed) - }) + let logs = client + .get_deposit_logs_in_range( + deposit_contract_address_ref, + block_range_ref.clone(), + Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS), + ) .await - .map(|(res, _)| res) - .map_err(Error::FallbackError)?; + .map_err(Error::GetDepositLogsFailed)?; /* * Step 2. Import logs to cache. @@ -1050,7 +840,7 @@ impl Service { logs_imported += 1; } - Ok(()) + Ok::<_, Error>(()) })?; debug!( @@ -1105,8 +895,8 @@ impl Service { pub async fn update_block_cache( &self, new_block_numbers: Option>>, - endpoints: &EndpointsCache, ) -> Result { + let client = self.client(); let block_cache_truncation = self.config().block_cache_truncation; let max_blocks_per_update = self .config() @@ -1116,14 +906,10 @@ impl Service { let range = { match new_block_numbers { Some(range) => range, - None => endpoints - .first_success(|e| async move { - relevant_new_block_numbers_from_endpoint(e, self, HeadType::BlockCache) - .await - }) - .await - .map(|(res, _)| res) - .map_err(Error::FallbackError)?, + None => { + relevant_new_block_numbers_from_endpoint(client, self, HeadType::BlockCache) + .await? + } } }; @@ -1183,13 +969,8 @@ impl Service { let mut blocks_imported = 0; for block_number in required_block_numbers { - let eth1_block = endpoints - .first_success(|e| async move { - download_eth1_block(e, self.inner.clone(), Some(block_number)).await - }) - .await - .map(|(res, _)| res) - .map_err(Error::FallbackError)?; + let eth1_block = + download_eth1_block(client, self.inner.clone(), Some(block_number)).await?; self.inner .block_cache @@ -1269,7 +1050,7 @@ fn relevant_block_range( cache_follow_distance: u64, latest_cached_block: Option<&Eth1Block>, spec: &ChainSpec, -) -> Result>, SingleEndpointError> { +) -> Result>, Error> { // If the latest cached block is lagging the head block by more than `cache_follow_distance` // times the expected block time then the eth1 block time is likely quite different from what we // assumed. @@ -1304,7 +1085,7 @@ fn relevant_block_range( // // We assume that the `cache_follow_distance` should be sufficient to ensure this never // happens, otherwise it is an error. - Err(SingleEndpointError::RemoteNotSynced { + Err(Error::RemoteNotSynced { next_required_block, remote_highest_block: remote_highest_block_number, cache_follow_distance, @@ -1325,7 +1106,7 @@ async fn download_eth1_block( endpoint: &HttpJsonRpc, cache: Arc, block_number_opt: Option, -) -> Result { +) -> Result { let deposit_root = block_number_opt.and_then(|block_number| { cache .deposit_cache @@ -1350,7 +1131,7 @@ async fn download_eth1_block( .unwrap_or_else(|| BlockQuery::Latest), Duration::from_millis(GET_BLOCK_TIMEOUT_MILLIS), ) - .map_err(SingleEndpointError::BlockDownloadFailed) + .map_err(Error::BlockDownloadFailed) .await?; Ok(Eth1Block { diff --git a/beacon_node/eth1/tests/test.rs b/beacon_node/eth1/tests/test.rs index f7f3b6e70..9f81f91e1 100644 --- a/beacon_node/eth1/tests/test.rs +++ b/beacon_node/eth1/tests/test.rs @@ -117,10 +117,9 @@ mod eth1_cache { let initial_block_number = get_block_number(&web3).await; let config = Config { - endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse( - eth1.endpoint().as_str(), - ) - .unwrap()]), + endpoint: Eth1Endpoint::NoAuth( + SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap(), + ), deposit_contract_address: deposit_contract.address(), lowest_cached_block_number: initial_block_number, follow_distance, @@ -128,7 +127,8 @@ mod eth1_cache { }; let cache_follow_distance = config.cache_follow_distance(); - let service = Service::new(config, log.clone(), MainnetEthSpec::default_spec()); + let service = + Service::new(config, log.clone(), MainnetEthSpec::default_spec()).unwrap(); // Create some blocks and then consume them, performing the test `rounds` times. for round in 0..2 { @@ -149,19 +149,17 @@ mod eth1_cache { eth1.ganache.evm_mine().await.expect("should mine block"); } - let endpoints = service.init_endpoints().unwrap(); - service - .update_deposit_cache(None, &endpoints) + .update_deposit_cache(None) .await .expect("should update deposit cache"); service - .update_block_cache(None, &endpoints) + .update_block_cache(None) .await .expect("should update block cache"); service - .update_block_cache(None, &endpoints) + .update_block_cache(None) .await .expect("should update cache when nothing has changed"); @@ -201,10 +199,9 @@ mod eth1_cache { let service = Service::new( Config { - endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse( - eth1.endpoint().as_str(), - ) - .unwrap()]), + endpoint: Eth1Endpoint::NoAuth( + SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap(), + ), deposit_contract_address: deposit_contract.address(), lowest_cached_block_number: get_block_number(&web3).await, follow_distance: 0, @@ -213,7 +210,8 @@ mod eth1_cache { }, log, MainnetEthSpec::default_spec(), - ); + ) + .unwrap(); let blocks = cache_len * 2; @@ -221,14 +219,12 @@ mod eth1_cache { eth1.ganache.evm_mine().await.expect("should mine block") } - let endpoints = service.init_endpoints().unwrap(); - service - .update_deposit_cache(None, &endpoints) + .update_deposit_cache(None) .await .expect("should update deposit cache"); service - .update_block_cache(None, &endpoints) + .update_block_cache(None) .await .expect("should update block cache"); @@ -258,10 +254,9 @@ mod eth1_cache { let service = Service::new( Config { - endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse( - eth1.endpoint().as_str(), - ) - .unwrap()]), + endpoint: Eth1Endpoint::NoAuth( + SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap(), + ), deposit_contract_address: deposit_contract.address(), lowest_cached_block_number: get_block_number(&web3).await, follow_distance: 0, @@ -270,19 +265,19 @@ mod eth1_cache { }, log, MainnetEthSpec::default_spec(), - ); + ) + .unwrap(); for _ in 0..4u8 { for _ in 0..cache_len / 2 { eth1.ganache.evm_mine().await.expect("should mine block") } - let endpoints = service.init_endpoints().unwrap(); service - .update_deposit_cache(None, &endpoints) + .update_deposit_cache(None) .await .expect("should update deposit cache"); service - .update_block_cache(None, &endpoints) + .update_block_cache(None) .await .expect("should update block cache"); } @@ -311,10 +306,9 @@ mod eth1_cache { let service = Service::new( Config { - endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse( - eth1.endpoint().as_str(), - ) - .unwrap()]), + endpoint: Eth1Endpoint::NoAuth( + SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap(), + ), deposit_contract_address: deposit_contract.address(), lowest_cached_block_number: get_block_number(&web3).await, follow_distance: 0, @@ -322,21 +316,21 @@ mod eth1_cache { }, log, MainnetEthSpec::default_spec(), - ); + ) + .unwrap(); for _ in 0..n { eth1.ganache.evm_mine().await.expect("should mine block") } - let endpoints = service.init_endpoints().unwrap(); futures::try_join!( - service.update_deposit_cache(None, &endpoints), - service.update_deposit_cache(None, &endpoints) + service.update_deposit_cache(None), + service.update_deposit_cache(None) ) .expect("should perform two simultaneous updates of deposit cache"); futures::try_join!( - service.update_block_cache(None, &endpoints), - service.update_block_cache(None, &endpoints) + service.update_block_cache(None), + service.update_block_cache(None) ) .expect("should perform two simultaneous updates of block cache"); @@ -366,10 +360,9 @@ mod deposit_tree { let service = Service::new( Config { - endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse( - eth1.endpoint().as_str(), - ) - .unwrap()]), + endpoint: Eth1Endpoint::NoAuth( + SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap(), + ), deposit_contract_address: deposit_contract.address(), deposit_contract_deploy_block: start_block, follow_distance: 0, @@ -377,7 +370,8 @@ mod deposit_tree { }, log, MainnetEthSpec::default_spec(), - ); + ) + .unwrap(); for round in 0..3 { let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect(); @@ -389,15 +383,13 @@ mod deposit_tree { .expect("should perform a deposit"); } - let endpoints = service.init_endpoints().unwrap(); - service - .update_deposit_cache(None, &endpoints) + .update_deposit_cache(None) .await .expect("should perform update"); service - .update_deposit_cache(None, &endpoints) + .update_deposit_cache(None) .await .expect("should perform update when nothing has changed"); @@ -449,10 +441,9 @@ mod deposit_tree { let service = Service::new( Config { - endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse( - eth1.endpoint().as_str(), - ) - .unwrap()]), + endpoint: Eth1Endpoint::NoAuth( + SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap(), + ), deposit_contract_address: deposit_contract.address(), deposit_contract_deploy_block: start_block, lowest_cached_block_number: start_block, @@ -461,7 +452,8 @@ mod deposit_tree { }, log, MainnetEthSpec::default_spec(), - ); + ) + .unwrap(); let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect(); @@ -472,10 +464,9 @@ mod deposit_tree { .expect("should perform a deposit"); } - let endpoints = service.init_endpoints().unwrap(); futures::try_join!( - service.update_deposit_cache(None, &endpoints), - service.update_deposit_cache(None, &endpoints) + service.update_deposit_cache(None), + service.update_deposit_cache(None) ) .expect("should perform two updates concurrently"); @@ -706,10 +697,9 @@ mod fast { let now = get_block_number(&web3).await; let service = Service::new( Config { - endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse( - eth1.endpoint().as_str(), - ) - .unwrap()]), + endpoint: Eth1Endpoint::NoAuth( + SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap(), + ), deposit_contract_address: deposit_contract.address(), deposit_contract_deploy_block: now, lowest_cached_block_number: now, @@ -719,7 +709,8 @@ mod fast { }, log, MainnetEthSpec::default_spec(), - ); + ) + .unwrap(); let client = HttpJsonRpc::new(SensitiveUrl::parse(ð1.endpoint()).unwrap()).unwrap(); let n = 10; let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect(); @@ -732,9 +723,8 @@ mod fast { eth1.ganache.evm_mine().await.expect("should mine block"); } - let endpoints = service.init_endpoints().unwrap(); service - .update_deposit_cache(None, &endpoints) + .update_deposit_cache(None) .await .expect("should perform update"); @@ -787,10 +777,9 @@ mod persist { let now = get_block_number(&web3).await; let config = Config { - endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse( - eth1.endpoint().as_str(), - ) - .unwrap()]), + endpoint: Eth1Endpoint::NoAuth( + SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap(), + ), deposit_contract_address: deposit_contract.address(), deposit_contract_deploy_block: now, lowest_cached_block_number: now, @@ -798,7 +787,8 @@ mod persist { block_cache_truncation: None, ..Config::default() }; - let service = Service::new(config.clone(), log.clone(), MainnetEthSpec::default_spec()); + let service = + Service::new(config.clone(), log.clone(), MainnetEthSpec::default_spec()).unwrap(); let n = 10; let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect(); for deposit in &deposits { @@ -808,9 +798,8 @@ mod persist { .expect("should perform a deposit"); } - let endpoints = service.init_endpoints().unwrap(); service - .update_deposit_cache(None, &endpoints) + .update_deposit_cache(None) .await .expect("should perform update"); @@ -822,7 +811,7 @@ mod persist { let deposit_count = service.deposit_cache_len(); service - .update_block_cache(None, &endpoints) + .update_block_cache(None) .await .expect("should perform update"); @@ -855,228 +844,3 @@ mod persist { .await; } } - -/// Tests for eth1 fallback -mod fallbacks { - use super::*; - use tokio::time::sleep; - - #[tokio::test] - async fn test_fallback_when_offline() { - async { - let log = null_logger(); - let endpoint2 = new_ganache_instance() - .await - .expect("should start eth1 environment"); - let deposit_contract = &endpoint2.deposit_contract; - - let initial_block_number = get_block_number(&endpoint2.web3()).await; - - // Create some blocks and then consume them, performing the test `rounds` times. - let new_blocks = 4; - - for _ in 0..new_blocks { - endpoint2 - .ganache - .evm_mine() - .await - .expect("should mine block"); - } - - let endpoint1 = endpoint2 - .ganache - .fork() - .expect("should start eth1 environment"); - - //mine additional blocks on top of the original endpoint - for _ in 0..new_blocks { - endpoint2 - .ganache - .evm_mine() - .await - .expect("should mine block"); - } - - let service = Service::new( - Config { - endpoints: Eth1Endpoint::NoAuth(vec![ - SensitiveUrl::parse(endpoint1.endpoint().as_str()).unwrap(), - SensitiveUrl::parse(endpoint2.endpoint().as_str()).unwrap(), - ]), - deposit_contract_address: deposit_contract.address(), - lowest_cached_block_number: initial_block_number, - follow_distance: 0, - ..Config::default() - }, - log.clone(), - MainnetEthSpec::default_spec(), - ); - - let endpoint1_block_number = get_block_number(&endpoint1.web3).await; - //the first call will only query endpoint1 - service.update().await.expect("should update deposit cache"); - assert_eq!( - service.deposits().read().last_processed_block.unwrap(), - endpoint1_block_number - ); - - drop(endpoint1); - - let endpoint2_block_number = get_block_number(&endpoint2.web3()).await; - assert!(endpoint1_block_number < endpoint2_block_number); - //endpoint1 is offline => query will import blocks from endpoint2 - service.update().await.expect("should update deposit cache"); - assert_eq!( - service.deposits().read().last_processed_block.unwrap(), - endpoint2_block_number - ); - } - .await; - } - - #[tokio::test] - async fn test_fallback_when_wrong_chain_id() { - async { - let log = null_logger(); - let correct_chain_id: u64 = DEFAULT_CHAIN_ID.into(); - let wrong_chain_id = correct_chain_id + 1; - let endpoint1 = GanacheEth1Instance::new(wrong_chain_id) - .await - .expect("should start eth1 environment"); - let endpoint2 = new_ganache_instance() - .await - .expect("should start eth1 environment"); - let deposit_contract = &endpoint2.deposit_contract; - - let initial_block_number = get_block_number(&endpoint2.web3()).await; - - // Create some blocks and then consume them, performing the test `rounds` times. - let new_blocks = 4; - - for _ in 0..new_blocks { - endpoint1 - .ganache - .evm_mine() - .await - .expect("should mine block"); - endpoint2 - .ganache - .evm_mine() - .await - .expect("should mine block"); - } - - //additional blocks for endpoint1 to be able to distinguish - for _ in 0..new_blocks { - endpoint1 - .ganache - .evm_mine() - .await - .expect("should mine block"); - } - - let service = Service::new( - Config { - endpoints: Eth1Endpoint::NoAuth(vec![ - SensitiveUrl::parse(endpoint2.endpoint().as_str()).unwrap(), - SensitiveUrl::parse(endpoint1.endpoint().as_str()).unwrap(), - ]), - deposit_contract_address: deposit_contract.address(), - lowest_cached_block_number: initial_block_number, - follow_distance: 0, - ..Config::default() - }, - log.clone(), - MainnetEthSpec::default_spec(), - ); - - let endpoint1_block_number = get_block_number(&endpoint1.web3()).await; - let endpoint2_block_number = get_block_number(&endpoint2.web3()).await; - assert!(endpoint2_block_number < endpoint1_block_number); - //the call will fallback to endpoint2 - service.update().await.expect("should update deposit cache"); - assert_eq!( - service.deposits().read().last_processed_block.unwrap(), - endpoint2_block_number - ); - } - .await; - } - - #[tokio::test] - async fn test_fallback_when_node_far_behind() { - async { - let log = null_logger(); - let endpoint2 = new_ganache_instance() - .await - .expect("should start eth1 environment"); - let deposit_contract = &endpoint2.deposit_contract; - - let initial_block_number = get_block_number(&endpoint2.web3()).await; - - // Create some blocks and then consume them, performing the test `rounds` times. - let new_blocks = 4; - - for _ in 0..new_blocks { - endpoint2 - .ganache - .evm_mine() - .await - .expect("should mine block"); - } - - let endpoint1 = endpoint2 - .ganache - .fork() - .expect("should start eth1 environment"); - - let service = Service::new( - Config { - endpoints: Eth1Endpoint::NoAuth(vec![ - SensitiveUrl::parse(endpoint1.endpoint().as_str()).unwrap(), - SensitiveUrl::parse(endpoint2.endpoint().as_str()).unwrap(), - ]), - deposit_contract_address: deposit_contract.address(), - lowest_cached_block_number: initial_block_number, - follow_distance: 0, - node_far_behind_seconds: 5, - ..Config::default() - }, - log.clone(), - MainnetEthSpec::default_spec(), - ); - - let endpoint1_block_number = get_block_number(&endpoint1.web3).await; - //the first call will only query endpoint1 - service.update().await.expect("should update deposit cache"); - assert_eq!( - service.deposits().read().last_processed_block.unwrap(), - endpoint1_block_number - ); - - sleep(Duration::from_secs(7)).await; - - //both endpoints don't have recent blocks => should return error - assert!(service.update().await.is_err()); - - //produce some new blocks on endpoint2 - for _ in 0..new_blocks { - endpoint2 - .ganache - .evm_mine() - .await - .expect("should mine block"); - } - - let endpoint2_block_number = get_block_number(&endpoint2.web3()).await; - - //endpoint1 is far behind + endpoint2 not => update will import blocks from endpoint2 - service.update().await.expect("should update deposit cache"); - assert_eq!( - service.deposits().read().last_processed_block.unwrap(), - endpoint2_block_number - ); - } - .await; - } -} diff --git a/beacon_node/genesis/src/eth1_genesis_service.rs b/beacon_node/genesis/src/eth1_genesis_service.rs index 089f79aa1..5614e237f 100644 --- a/beacon_node/genesis/src/eth1_genesis_service.rs +++ b/beacon_node/genesis/src/eth1_genesis_service.rs @@ -43,7 +43,7 @@ impl Eth1GenesisService { /// Creates a new service. Does not attempt to connect to the Eth1 node. /// /// Modifies the given `config` to make it more suitable to the task of listening to genesis. - pub fn new(config: Eth1Config, log: Logger, spec: ChainSpec) -> Self { + pub fn new(config: Eth1Config, log: Logger, spec: ChainSpec) -> Result { let config = Eth1Config { // Truncating the block cache makes searching for genesis more // complicated. @@ -64,15 +64,16 @@ impl Eth1GenesisService { ..config }; - Self { - eth1_service: Eth1Service::new(config, log, spec), + Ok(Self { + eth1_service: Eth1Service::new(config, log, spec) + .map_err(|e| format!("Failed to create eth1 service: {:?}", e))?, stats: Arc::new(Statistics { highest_processed_block: AtomicU64::new(0), active_validator_count: AtomicUsize::new(0), total_deposit_count: AtomicUsize::new(0), latest_timestamp: AtomicU64::new(0), }), - } + }) } /// Returns the first eth1 block that has enough deposits that it's a (potentially invalid) @@ -112,11 +113,9 @@ impl Eth1GenesisService { "Importing eth1 deposit logs"; ); - let endpoints = eth1_service.init_endpoints()?; - loop { let update_result = eth1_service - .update_deposit_cache(None, &endpoints) + .update_deposit_cache(None) .await .map_err(|e| format!("{:?}", e)); @@ -158,7 +157,7 @@ impl Eth1GenesisService { } // Download new eth1 blocks into the cache. - let blocks_imported = match eth1_service.update_block_cache(None, &endpoints).await { + let blocks_imported = match eth1_service.update_block_cache(None).await { Ok(outcome) => { debug!( log, diff --git a/beacon_node/genesis/tests/tests.rs b/beacon_node/genesis/tests/tests.rs index 74a054fcc..58f28702b 100644 --- a/beacon_node/genesis/tests/tests.rs +++ b/beacon_node/genesis/tests/tests.rs @@ -44,10 +44,9 @@ fn basic() { let service = Eth1GenesisService::new( Eth1Config { - endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse( - eth1.endpoint().as_str(), - ) - .unwrap()]), + endpoint: Eth1Endpoint::NoAuth( + SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap(), + ), deposit_contract_address: deposit_contract.address(), deposit_contract_deploy_block: now, lowest_cached_block_number: now, @@ -57,7 +56,8 @@ fn basic() { }, log, spec.clone(), - ); + ) + .unwrap(); // NOTE: this test is sensitive to the response speed of the external web3 server. If // you're experiencing failures, try increasing the update_interval. diff --git a/beacon_node/http_api/tests/common.rs b/beacon_node/http_api/tests/common.rs index a0dbf40b2..eaf91ce9d 100644 --- a/beacon_node/http_api/tests/common.rs +++ b/beacon_node/http_api/tests/common.rs @@ -131,7 +131,8 @@ pub async fn create_api_server_on_port( pm.inject_connection_established(&peer_id, &con_id, &connected_point, None, 0); *network_globals.sync_state.write() = SyncState::Synced; - let eth1_service = eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()); + let eth1_service = + eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()).unwrap(); let context = Arc::new(Context { config: Config { diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 9b5f65622..51e8762f1 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -372,9 +372,9 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .long("eth1-endpoints") .value_name("HTTP-ENDPOINTS") .conflicts_with("eth1-endpoint") - .help("One or more comma-delimited server endpoints for web3 connection. \ - If multiple endpoints are given the endpoints are used as fallback in the \ - given order. Also enables the --eth1 flag. \ + .help("One http endpoint for a web3 connection to an execution node. \ + Note: This flag is now only useful for testing, use `--execution-endpoint` \ + flag to connect to an execution node on mainnet and testnets. Defaults to http://127.0.0.1:8545.") .takes_value(true) ) diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 982cb82ed..f1d0fb35a 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -230,17 +230,14 @@ pub fn get_config( ); client_config.sync_eth1_chain = true; - let endpoints = vec![SensitiveUrl::parse(endpoint) - .map_err(|e| format!("eth1-endpoint was an invalid URL: {:?}", e))?]; - client_config.eth1.endpoints = Eth1Endpoint::NoAuth(endpoints); - } else if let Some(endpoints) = cli_args.value_of("eth1-endpoints") { + let endpoint = SensitiveUrl::parse(endpoint) + .map_err(|e| format!("eth1-endpoint was an invalid URL: {:?}", e))?; + client_config.eth1.endpoint = Eth1Endpoint::NoAuth(endpoint); + } else if let Some(endpoint) = cli_args.value_of("eth1-endpoints") { client_config.sync_eth1_chain = true; - let endpoints = endpoints - .split(',') - .map(SensitiveUrl::parse) - .collect::>() + let endpoint = SensitiveUrl::parse(endpoint) .map_err(|e| format!("eth1-endpoints contains an invalid URL {:?}", e))?; - client_config.eth1.endpoints = Eth1Endpoint::NoAuth(endpoints); + client_config.eth1.endpoint = Eth1Endpoint::NoAuth(endpoint); } if let Some(val) = cli_args.value_of("eth1-blocks-per-log-query") { @@ -326,7 +323,7 @@ pub fn get_config( --eth1-endpoints has been deprecated for post-merge configurations" ); } - client_config.eth1.endpoints = Eth1Endpoint::Auth { + client_config.eth1.endpoint = Eth1Endpoint::Auth { endpoint: execution_endpoint, jwt_path: secret_file, jwt_id: el_config.jwt_id.clone(), diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 690271022..9fd688220 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -117,7 +117,7 @@ impl ProductionBeaconNode { info!( log, "Block production enabled"; - "endpoints" => format!("{:?}", &client_config.eth1.endpoints), + "endpoint" => format!("{:?}", &client_config.eth1.endpoint), "method" => "json rpc via http" ); builder diff --git a/common/fallback/Cargo.toml b/common/fallback/Cargo.toml deleted file mode 100644 index 0d71bbbd2..000000000 --- a/common/fallback/Cargo.toml +++ /dev/null @@ -1,10 +0,0 @@ -[package] -name = "fallback" -version = "0.1.0" -authors = ["blacktemplar "] -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -itertools = "0.10.0" diff --git a/common/fallback/src/lib.rs b/common/fallback/src/lib.rs deleted file mode 100644 index 70f327d20..000000000 --- a/common/fallback/src/lib.rs +++ /dev/null @@ -1,63 +0,0 @@ -use itertools::{join, zip}; -use std::fmt::{Debug, Display}; -use std::future::Future; - -#[derive(Clone)] -pub struct Fallback { - pub servers: Vec, -} - -#[derive(Debug, PartialEq)] -pub enum FallbackError { - AllErrored(Vec), -} - -impl Fallback { - pub fn new(servers: Vec) -> Self { - Self { servers } - } - - /// Return the first successful result along with number of previous errors encountered - /// or all the errors encountered if every server fails. - pub async fn first_success<'a, F, O, E, R>( - &'a self, - func: F, - ) -> Result<(O, usize), FallbackError> - where - F: Fn(&'a T) -> R, - R: Future>, - { - let mut errors = vec![]; - for server in &self.servers { - match func(server).await { - Ok(val) => return Ok((val, errors.len())), - Err(e) => errors.push(e), - } - } - Err(FallbackError::AllErrored(errors)) - } - - pub fn map_format_error<'a, E, F, S>(&'a self, f: F, error: &FallbackError) -> String - where - F: FnMut(&'a T) -> &'a S, - S: Display + 'a, - E: Debug, - { - match error { - FallbackError::AllErrored(v) => format!( - "All fallbacks errored: {}", - join( - zip(self.servers.iter().map(f), v.iter()) - .map(|(server, error)| format!("{} => {:?}", server, error)), - ", " - ) - ), - } - } -} - -impl Fallback { - pub fn format_error(&self, error: &FallbackError) -> String { - self.map_format_error(|s| s, error) - } -} diff --git a/common/monitoring_api/src/gather.rs b/common/monitoring_api/src/gather.rs index 8699a8cf2..b59a6dfb8 100644 --- a/common/monitoring_api/src/gather.rs +++ b/common/monitoring_api/src/gather.rs @@ -43,6 +43,16 @@ impl JsonMetric { } } } + + /// Return a default json value given given the metric type. + fn get_typed_value_default(&self) -> serde_json::Value { + match self.ty { + JsonType::Integer => json!(0), + JsonType::Boolean => { + json!(false) + } + } + } } /// The required metrics for the beacon and validator processes. @@ -155,6 +165,16 @@ pub fn gather_metrics(metrics_map: &HashMap) -> Option( .value_of("eth1-endpoint") .map(|e| { warn!("The --eth1-endpoint flag is deprecated. Please use --eth1-endpoints instead"); - vec![String::from(e)] + String::from(e) }) - .or_else(|| { - matches - .value_of("eth1-endpoints") - .map(|s| s.split(',').map(String::from).collect()) - }); + .or_else(|| matches.value_of("eth1-endpoints").map(String::from)); let mut eth2_network_config = Eth2NetworkConfig::load(testnet_dir.clone())?; @@ -35,12 +31,9 @@ pub fn run( let mut config = Eth1Config::default(); if let Some(v) = endpoints.clone() { - let endpoints = v - .iter() - .map(|s| SensitiveUrl::parse(s)) - .collect::>() + let endpoint = SensitiveUrl::parse(&v) .map_err(|e| format!("Unable to parse eth1 endpoint URL: {:?}", e))?; - config.endpoints = Eth1Endpoint::NoAuth(endpoints); + config.endpoint = Eth1Endpoint::NoAuth(endpoint); } config.deposit_contract_address = format!("{:?}", spec.deposit_contract_address); config.deposit_contract_deploy_block = eth2_network_config.deposit_contract_deploy_block; @@ -49,7 +42,7 @@ pub fn run( config.node_far_behind_seconds = max(5, config.follow_distance) * spec.seconds_per_eth1_block; let genesis_service = - Eth1GenesisService::new(config, env.core_context().log().clone(), spec.clone()); + Eth1GenesisService::new(config, env.core_context().log().clone(), spec.clone())?; env.runtime().block_on(async { let _ = genesis_service diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 661bbcdb0..288d18c1f 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -68,7 +68,7 @@ fn staking_flag() { assert!(config.http_api.enabled); assert!(config.sync_eth1_chain); assert_eq!( - config.eth1.endpoints.get_endpoints()[0].to_string(), + config.eth1.endpoint.get_endpoint().to_string(), DEFAULT_ETH1_ENDPOINT ); }); @@ -293,28 +293,17 @@ fn eth1_flag() { #[test] fn eth1_endpoints_flag() { CommandLineTest::new() - .flag( - "eth1-endpoints", - Some("http://localhost:9545,https://infura.io/secret"), - ) + .flag("eth1-endpoints", Some("http://localhost:9545")) .run_with_zero_port() .with_config(|config| { assert_eq!( - config.eth1.endpoints.get_endpoints()[0].full.to_string(), + config.eth1.endpoint.get_endpoint().full.to_string(), "http://localhost:9545/" ); assert_eq!( - config.eth1.endpoints.get_endpoints()[0].to_string(), + config.eth1.endpoint.get_endpoint().to_string(), "http://localhost:9545/" ); - assert_eq!( - config.eth1.endpoints.get_endpoints()[1].full.to_string(), - "https://infura.io/secret" - ); - assert_eq!( - config.eth1.endpoints.get_endpoints()[1].to_string(), - "https://infura.io/" - ); assert!(config.sync_eth1_chain); }); } @@ -429,7 +418,7 @@ fn run_execution_endpoints_overrides_eth1_endpoints_test(eth1_flag: &str, execut // The eth1 endpoint should have been set to the --execution-endpoint value in defiance // of --eth1-endpoints. assert_eq!( - config.eth1.endpoints, + config.eth1.endpoint, Eth1Endpoint::Auth { endpoint: SensitiveUrl::parse(execution_endpoint).unwrap(), jwt_path: jwt_path.clone(), @@ -624,7 +613,7 @@ fn run_jwt_optional_flags_test(jwt_flag: &str, jwt_id_flag: &str, jwt_version_fl assert_eq!(el_config.jwt_id, Some(id.to_string())); assert_eq!(el_config.jwt_version, Some(version.to_string())); assert_eq!( - config.eth1.endpoints, + config.eth1.endpoint, Eth1Endpoint::Auth { endpoint: SensitiveUrl::parse(execution_endpoint).unwrap(), jwt_path: dir.path().join(jwt_file), diff --git a/testing/simulator/src/eth1_sim.rs b/testing/simulator/src/eth1_sim.rs index 5e346d546..182a66b49 100644 --- a/testing/simulator/src/eth1_sim.rs +++ b/testing/simulator/src/eth1_sim.rs @@ -1,4 +1,4 @@ -use crate::local_network::{EXECUTION_PORT, INVALID_ADDRESS, TERMINAL_BLOCK, TERMINAL_DIFFICULTY}; +use crate::local_network::{EXECUTION_PORT, TERMINAL_BLOCK, TERMINAL_DIFFICULTY}; use crate::{checks, LocalNetwork, E}; use clap::ArgMatches; use eth1::{Eth1Endpoint, DEFAULT_CHAIN_ID}; @@ -138,7 +138,7 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { let mut beacon_config = testing_client_config(); beacon_config.genesis = ClientGenesis::DepositContract; - beacon_config.eth1.endpoints = Eth1Endpoint::NoAuth(vec![eth1_endpoint]); + beacon_config.eth1.endpoint = Eth1Endpoint::NoAuth(eth1_endpoint); beacon_config.eth1.deposit_contract_address = deposit_contract_address; beacon_config.eth1.deposit_contract_deploy_block = 0; beacon_config.eth1.lowest_cached_block_number = 0; @@ -173,18 +173,8 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { /* * One by one, add beacon nodes to the network. */ - for i in 0..node_count - 1 { - let mut config = beacon_config.clone(); - if i % 2 == 0 { - if let Eth1Endpoint::NoAuth(endpoints) = &mut config.eth1.endpoints { - endpoints.insert( - 0, - SensitiveUrl::parse(INVALID_ADDRESS) - .expect("Unable to parse invalid address"), - ) - } - } - network.add_beacon_node(config).await?; + for _ in 0..node_count - 1 { + network.add_beacon_node(beacon_config.clone()).await?; } /*