diff --git a/Cargo.lock b/Cargo.lock index 0e02e4091..b6f4e42c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1835,6 +1835,7 @@ dependencies = [ "eth2_hashing", "eth2_ssz", "eth2_ssz_derive", + "fallback", "futures 0.3.8", "hex", "lazy_static", @@ -2152,6 +2153,13 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" +[[package]] +name = "fallback" +version = "0.1.0" +dependencies = [ + "itertools 0.9.0", +] + [[package]] name = "fallible-iterator" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index f08276104..3d2973261 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ members = [ "common/test_random_derive", "common/validator_dir", "common/warp_utils", + "common/fallback", "consensus/cached_tree_hash", "consensus/int_to_bytes", diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 723b099f6..bfb481547 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -206,7 +206,7 @@ where info!( context.log(), "Waiting for eth2 genesis from eth1"; - "eth1_endpoint" => &config.eth1.endpoint, + "eth1_endpoints" => format!("{:?}", &config.eth1.endpoints), "contract_deploy_block" => config.eth1.deposit_contract_deploy_block, "deposit_contract" => &config.eth1.deposit_contract_address ); diff --git a/beacon_node/eth1/Cargo.toml b/beacon_node/eth1/Cargo.toml index bb7d01670..26eb42f25 100644 --- a/beacon_node/eth1/Cargo.toml +++ b/beacon_node/eth1/Cargo.toml @@ -32,3 +32,4 @@ lighthouse_metrics = { path = "../../common/lighthouse_metrics"} lazy_static = "1.4.0" task_executor = { path = "../../common/task_executor" } eth2 = { path = "../../common/eth2" } +fallback = { path = "../../common/fallback" } diff --git a/beacon_node/eth1/src/lib.rs b/beacon_node/eth1/src/lib.rs index a7aba85a2..cf724201a 100644 --- a/beacon_node/eth1/src/lib.rs +++ b/beacon_node/eth1/src/lib.rs @@ -14,5 +14,6 @@ 
pub use deposit_cache::DepositCache; pub use deposit_log::DepositLog; pub use inner::SszEth1Cache; pub use service::{ - BlockCacheUpdateOutcome, Config, DepositCacheUpdateOutcome, Error, Service, DEFAULT_NETWORK_ID, + BlockCacheUpdateOutcome, Config, DepositCacheUpdateOutcome, Error, Service, DEFAULT_CHAIN_ID, + DEFAULT_NETWORK_ID, }; diff --git a/beacon_node/eth1/src/metrics.rs b/beacon_node/eth1/src/metrics.rs index 1d3381f91..bbf2f6d83 100644 --- a/beacon_node/eth1/src/metrics.rs +++ b/beacon_node/eth1/src/metrics.rs @@ -16,4 +16,14 @@ lazy_static! { try_create_int_gauge("eth1_deposit_cache_len", "Number of deposits in the eth1 cache"); pub static ref HIGHEST_PROCESSED_DEPOSIT_BLOCK: Result = try_create_int_gauge("eth1_highest_processed_deposit_block", "Number of the last block checked for deposits"); + + /* + * Eth1 endpoint errors + */ + pub static ref ENDPOINT_ERRORS: Result = try_create_int_counter_vec( + "eth1_endpoint_errors", "The number of eth1 request errors for each endpoint", &["endpoint"] + ); + pub static ref ENDPOINT_REQUESTS: Result = try_create_int_counter_vec( + "eth1_endpoint_requests", "The number of eth1 requests for each endpoint", &["endpoint"] + ); } diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 0bc5e9432..235db9b4b 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -8,13 +8,17 @@ use crate::{ }, inner::{DepositUpdater, Inner}, }; +use fallback::{Fallback, FallbackError}; use futures::{future::TryFutureExt, stream, stream::TryStreamExt, StreamExt}; use parking_lot::{RwLock, RwLockReadGuard}; use serde::{Deserialize, Serialize}; -use slog::{crit, debug, error, info, trace, Logger}; +use slog::{crit, debug, error, info, trace, warn, Logger}; +use std::fmt::Debug; +use std::future::Future; use std::ops::{Range, RangeInclusive}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::sync::RwLock as TRwLock; use tokio::time::{interval_at, Duration, 
Instant}; use types::{ChainSpec, EthSpec, Unsigned}; @@ -37,8 +41,224 @@ const WARNING_MSG: &str = "BLOCK PROPOSALS WILL FAIL WITHOUT VALID, SYNCED ETH1 /// A factor used to reduce the eth1 follow distance to account for discrepancies in the block time. const ETH1_BLOCK_TIME_TOLERANCE_FACTOR: u64 = 4; +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum EndpointError { + NotReachable, + WrongNetworkId, + WrongChainId, + FarBehind, +} + +type EndpointState = Result<(), EndpointError>; + +type EndpointWithState = (String, TRwLock>); + +/// A cache structure to lazily check usability of endpoints. An endpoint is usable if it is +/// reachable and has the correct network id and chain id. Emits a `WARN` log if a checked endpoint +/// is not usable. +pub struct EndpointsCache { + pub fallback: Fallback, + pub config_network_id: Eth1Id, + pub config_chain_id: Eth1Id, + pub log: Logger, +} + +impl EndpointsCache { + /// Checks the usability of an endpoint. Results get cached and therefore only the first call + /// for each endpoint does the real check. 
+ async fn state(&self, endpoint: &EndpointWithState) -> EndpointState { + if let Some(result) = *endpoint.1.read().await { + return result; + } + let mut value = endpoint.1.write().await; + if let Some(result) = *value { + return result; + } + crate::metrics::inc_counter_vec(&crate::metrics::ENDPOINT_REQUESTS, &[&endpoint.0]); + let state = endpoint_state( + &endpoint.0, + &self.config_network_id, + &self.config_chain_id, + &self.log, + ) + .await; + *value = Some(state); + if state.is_err() { + crate::metrics::inc_counter_vec(&crate::metrics::ENDPOINT_ERRORS, &[&endpoint.0]); + } + state + } + + pub async fn first_success<'a, F, O, R>( + &'a self, + func: F, + ) -> Result> + where + F: Fn(&'a str) -> R, + R: Future>, + { + let func = &func; + self.fallback + .first_success(|endpoint| async move { + match self.state(endpoint).await { + Ok(()) => { + let endpoint_str = &endpoint.0; + crate::metrics::inc_counter_vec( + &crate::metrics::ENDPOINT_REQUESTS, + &[endpoint_str], + ); + match func(&endpoint.0).await { + Ok(t) => Ok(t), + Err(t) => { + crate::metrics::inc_counter_vec( + &crate::metrics::ENDPOINT_ERRORS, + &[endpoint_str], + ); + if let SingleEndpointError::EndpointError(e) = &t { + *endpoint.1.write().await = Some(Err(*e)); + } + Err(t) + } + } + } + Err(e) => Err(SingleEndpointError::EndpointError(e)), + } + }) + .await + } +} + +/// Returns `Ok` if the endpoint is usable, i.e. is reachable and has a correct network id and +/// chain id. Otherwise it returns `Err`. +async fn endpoint_state( + endpoint: &str, + config_network_id: &Eth1Id, + config_chain_id: &Eth1Id, + log: &Logger, +) -> EndpointState { + let error_connecting = |_| { + warn!( + log, + "Error connecting to eth1 node. 
Trying fallback ..."; + "endpoint" => endpoint, + ); + EndpointError::NotReachable + }; + let network_id = get_network_id(endpoint, Duration::from_millis(STANDARD_TIMEOUT_MILLIS)) + .await + .map_err(error_connecting)?; + if &network_id != config_network_id { + warn!( + log, + "Invalid eth1 network id. Please switch to correct network id. Trying \ + fallback ..."; + "endpoint" => endpoint, + "expected" => format!("{:?}",config_network_id), + "received" => format!("{:?}",network_id), + ); + return Err(EndpointError::WrongNetworkId); + } + let chain_id = get_chain_id(endpoint, Duration::from_millis(STANDARD_TIMEOUT_MILLIS)) + .await + .map_err(error_connecting)?; + // Eth1 nodes return chain_id = 0 if the node is not synced + // Handle the special case + if chain_id == Eth1Id::Custom(0) { + warn!( + log, + "Remote eth1 node is not synced"; + "endpoint" => endpoint, + ); + return Err(EndpointError::FarBehind); + } + if &chain_id != config_chain_id { + warn!( + log, + "Invalid eth1 chain id. Please switch to correct chain id. Trying \ + fallback ..."; + "endpoint" => endpoint, + "expected" => format!("{:?}",config_chain_id), + "received" => format!("{:?}", chain_id), + ); + Err(EndpointError::WrongChainId) + } else { + Ok(()) + } +} + +/// Enum for the two internal (maybe different) cached heads for cached deposits and for the block +/// cache. +pub enum HeadType { + Deposit, + BlockCache, +} + +/// Returns the head block and the new block ranges relevant for deposits and the block cache +/// from the given endpoint. 
+async fn get_remote_head_and_new_block_ranges( + endpoint: &str, + service: &Service, + node_far_behind_seconds: u64, +) -> Result< + ( + Eth1Block, + Option>, + Option>, + ), + SingleEndpointError, +> { + let remote_head_block = download_eth1_block(endpoint, service.inner.clone(), None).await?; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(u64::MAX); + if remote_head_block.timestamp + node_far_behind_seconds < now { + warn!( + service.log, + "Eth1 endpoint is far behind. Trying fallback ..."; + "endpoint" => endpoint, + "last_seen_block_unix_timestamp" => remote_head_block.timestamp + ); + return Err(SingleEndpointError::EndpointError(EndpointError::FarBehind)); + } + + let handle_remote_not_synced = |e| { + if let SingleEndpointError::RemoteNotSynced { .. } = e { + warn!(service.log, "Eth1 node not synced. Trying fallback..."; "endpoint" => endpoint); + } + e + }; + let new_deposit_block_numbers = service + .relevant_new_block_numbers(remote_head_block.number, HeadType::Deposit) + .map_err(handle_remote_not_synced)?; + let new_block_cache_numbers = service + .relevant_new_block_numbers(remote_head_block.number, HeadType::BlockCache) + .map_err(handle_remote_not_synced)?; + Ok(( + remote_head_block, + new_deposit_block_numbers, + new_block_cache_numbers, + )) +} + +/// Returns the range of new block numbers to be considered for the given head type from the given +/// endpoint. +async fn relevant_new_block_numbers_from_endpoint( + endpoint: &str, + service: &Service, + head_type: HeadType, +) -> Result>, SingleEndpointError> { + let remote_highest_block = + get_block_number(endpoint, Duration::from_millis(BLOCK_NUMBER_TIMEOUT_MILLIS)) + .map_err(SingleEndpointError::GetBlockNumberFailed) + .await?; + service.relevant_new_block_numbers(remote_highest_block, head_type) +} + #[derive(Debug, PartialEq)] -pub enum Error { +pub enum SingleEndpointError { + /// Endpoint is currently not functional. 
+ EndpointError(EndpointError), /// The remote node is less synced that we expect, it is not useful until has done more /// syncing. RemoteNotSynced { @@ -56,6 +276,10 @@ pub enum Error { GetDepositCountFailed(String), /// Failed to read the deposit contract root from the eth1 node. GetDepositLogsFailed(String), +} + +#[derive(Debug, PartialEq)] +pub enum Error { /// There was an inconsistency when adding a block to the cache. FailedToInsertEth1Block(BlockCacheError), /// There was an inconsistency when adding a deposit to the cache. @@ -65,6 +289,8 @@ pub enum Error { block_range: Range, error: String, }, + /// All possible endpoints returned a `SingleEndpointError`. + FallbackError(FallbackError), /// There was an unexpected internal error. Internal(String), } @@ -85,7 +311,7 @@ pub struct DepositCacheUpdateOutcome { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { /// An Eth1 node (e.g., Geth) running a HTTP JSON-RPC endpoint. - pub endpoint: String, + pub endpoints: Vec, /// The address the `BlockCache` and `DepositCache` should assume is the canonical deposit contract. pub deposit_contract_address: String, /// The eth1 network id where the deposit contract is deployed (Goerli/Mainnet). @@ -103,6 +329,9 @@ pub struct Config { /// /// Note: this should be less than or equal to the specification's `ETH1_FOLLOW_DISTANCE`. pub follow_distance: u64, + /// Specifies the seconds when we consider the head of a node far behind. + /// This should be less than `ETH1_FOLLOW_DISTANCE * SECONDS_PER_ETH1_BLOCK`. + pub node_far_behind_seconds: u64, /// Defines the number of blocks that should be retained each time the `BlockCache` calls truncate on /// itself. 
pub block_cache_truncation: Option, @@ -144,13 +373,14 @@ impl Config { impl Default for Config { fn default() -> Self { Self { - endpoint: "http://localhost:8545".into(), + endpoints: vec!["http://localhost:8545".into()], deposit_contract_address: "0x0000000000000000000000000000000000000000".into(), network_id: DEFAULT_NETWORK_ID, chain_id: DEFAULT_CHAIN_ID, deposit_contract_deploy_block: 1, lowest_cached_block_number: 1, follow_distance: 128, + node_far_behind_seconds: 128 * 14, block_cache_truncation: Some(4_096), auto_update_interval_millis: 7_000, blocks_per_log_query: 1_000, @@ -351,6 +581,23 @@ impl Service { self.inner.config.write().lowest_cached_block_number = block_number; } + pub fn init_endpoints(&self) -> EndpointsCache { + let endpoints = self.config().endpoints.clone(); + let config_network_id = self.config().network_id.clone(); + let config_chain_id = self.config().chain_id.clone(); + EndpointsCache { + fallback: Fallback::new( + endpoints + .into_iter() + .map(|s| (s, TRwLock::new(None))) + .collect(), + ), + config_network_id, + config_chain_id, + log: self.log.clone(), + } + } + /// Update the deposit and block cache, returning an error if either fail. /// /// ## Returns @@ -362,18 +609,57 @@ impl Service { pub async fn update( &self, ) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> { - let remote_head_block = download_eth1_block(self.inner.clone(), None) - .map_err(|e| format!("Failed to update Eth1 service: {:?}", e)) - .await?; - let remote_head_block_number = Some(remote_head_block.number); + let endpoints = self.init_endpoints(); + let node_far_behind_seconds = self.inner.config.read().node_far_behind_seconds; + + let process_single_err = |e: &FallbackError| { + match e { + FallbackError::AllErrored(errors) => { + if errors + .iter() + .all(|error| matches!(error, SingleEndpointError::EndpointError(_))) + { + crit!( + self.log, + "Couldn't connect to any eth1 node. 
Please ensure that you have an \ + eth1 http server running locally on http://localhost:8545 or specify \ + one or more (remote) endpoints using \ + `--eth1-endpoints `. \ + Also ensure that `eth` and `net` apis are enabled on the eth1 http \ + server"; + "warning" => WARNING_MSG + ); + } + } + } + endpoints.fallback.map_format_error(|s| &s.0, &e) + }; + + let process_err = |e: Error| match &e { + Error::FallbackError(f) => process_single_err(f), + e => format!("{:?}", e), + }; + + let (remote_head_block, new_block_numbers_deposit, new_block_numbers_block_cache) = + endpoints + .first_success(|e| async move { + get_remote_head_and_new_block_ranges(e, &self, node_far_behind_seconds).await + }) + .await + .map_err(|e| { + format!( + "Failed to update Eth1 service: {:?}", + process_single_err(&e) + ) + })?; *self.inner.remote_head_block.write() = Some(remote_head_block); let update_deposit_cache = async { let outcome = self - .update_deposit_cache(remote_head_block_number) + .update_deposit_cache(Some(new_block_numbers_deposit), &endpoints) .await - .map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?; + .map_err(|e| format!("Failed to update eth1 cache: {:?}", process_err(e)))?; trace!( self.log, @@ -387,9 +673,9 @@ impl Service { let update_block_cache = async { let outcome = self - .update_block_cache(remote_head_block_number) + .update_block_cache(Some(new_block_numbers_block_cache), &endpoints) .await - .map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?; + .map_err(|e| format!("Failed to update eth1 cache: {:?}", process_err(e)))?; trace!( self.log, @@ -431,57 +717,6 @@ impl Service { } async fn do_update(&self, update_interval: Duration) -> Result<(), ()> { - let endpoint = self.config().endpoint.clone(); - let config_network_id = self.config().network_id.clone(); - let config_chain_id = self.config().chain_id.clone(); - let network_id_result = - get_network_id(&endpoint, Duration::from_millis(STANDARD_TIMEOUT_MILLIS)).await; - let 
chain_id_result = - get_chain_id(&endpoint, Duration::from_millis(STANDARD_TIMEOUT_MILLIS)).await; - match (network_id_result, chain_id_result) { - (Ok(network_id), Ok(chain_id)) => { - if network_id != config_network_id { - crit!( - self.log, - "Invalid eth1 network id. Please switch to correct network id"; - "expected" => format!("{:?}",config_network_id), - "received" => format!("{:?}",network_id), - "warning" => WARNING_MSG, - ); - return Ok(()); - } - // Eth1 nodes return chain_id = 0 if the node is not synced - // Handle the special case - if chain_id == Eth1Id::Custom(0) { - crit!( - self.log, - "Remote eth1 node is not synced"; - "warning" => WARNING_MSG, - ); - return Ok(()); - } - if chain_id != config_chain_id { - crit!( - self.log, - "Invalid eth1 chain id. Please switch to correct chain id"; - "expected" => format!("{:?}",config_chain_id), - "received" => format!("{:?}", chain_id), - "warning" => WARNING_MSG, - ); - return Ok(()); - } - } - _ => { - crit!( - self.log, - "Error connecting to eth1 node. Please ensure that you have an eth1 http server running locally on http://localhost:8545 or \ - pass an external endpoint using `--eth1-endpoint `. Also ensure that `eth` and `net` apis are enabled on the eth1 http server"; - "warning" => WARNING_MSG, - ); - return Ok(()); - } - } - let update_result = self.update().await; match update_result { Err(e) => error!( @@ -501,6 +736,32 @@ impl Service { Ok(()) } + /// Returns the range of new block numbers to be considered for the given head type. 
+ fn relevant_new_block_numbers( + &self, + remote_highest_block: u64, + head_type: HeadType, + ) -> Result>, SingleEndpointError> { + let follow_distance = self.reduced_follow_distance(); + let next_required_block = match head_type { + HeadType::Deposit => self + .deposits() + .read() + .last_processed_block + .map(|n| n + 1) + .unwrap_or_else(|| self.config().deposit_contract_deploy_block), + HeadType::BlockCache => self + .inner + .block_cache + .read() + .highest_block_number() + .map(|n| n + 1) + .unwrap_or_else(|| self.config().lowest_cached_block_number), + }; + + relevant_block_range(remote_highest_block, next_required_block, follow_distance) + } + /// Contacts the remote eth1 node and attempts to import deposit logs up to the configured /// follow-distance block. /// @@ -518,10 +779,9 @@ impl Service { /// Emits logs for debugging and errors. pub async fn update_deposit_cache( &self, - remote_highest_block_opt: Option, + new_block_numbers: Option>>, + endpoints: &EndpointsCache, ) -> Result { - let endpoint = self.config().endpoint.clone(); - let reduced_follow_distance = self.reduced_follow_distance(); let deposit_contract_address = self.config().deposit_contract_address.clone(); let blocks_per_log_query = self.config().blocks_per_log_query; @@ -530,20 +790,17 @@ impl Service { .max_log_requests_per_update .unwrap_or_else(usize::max_value); - let next_required_block = self - .deposits() - .read() - .last_processed_block - .map(|n| n + 1) - .unwrap_or_else(|| self.config().deposit_contract_deploy_block); - - let range = get_new_block_numbers( - &endpoint, - remote_highest_block_opt, - next_required_block, - reduced_follow_distance, - ) - .await?; + let range = { + match new_block_numbers { + Some(range) => range, + None => endpoints + .first_success(|e| async move { + relevant_new_block_numbers_from_endpoint(e, &self, HeadType::Deposit).await + }) + .await + .map_err(Error::FallbackError)?, + } + }; let block_number_chunks = if let Some(range) = range { 
range @@ -560,28 +817,32 @@ impl Service { Vec::new() }; + let deposit_contract_address_ref: &str = &deposit_contract_address; let logs: Vec<(Range, Vec)> = stream::try_unfold(block_number_chunks.into_iter(), |mut chunks| async { match chunks.next() { Some(chunk) => { - let chunk_1 = chunk.clone(); - match get_deposit_logs_in_range( - &endpoint, - &deposit_contract_address, - chunk, - Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS), - ) - .await - { - Ok(logs) => Ok(Some(((chunk_1, logs), chunks))), - Err(e) => Err(Error::GetDepositLogsFailed(e)), - } + let chunk_ref = &chunk; + endpoints + .first_success(|e| async move { + get_deposit_logs_in_range( + e, + deposit_contract_address_ref, + chunk_ref.clone(), + Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS), + ) + .await + .map_err(SingleEndpointError::GetDepositLogsFailed) + }) + .await + .map(|logs| Some(((chunk, logs), chunks))) } None => Ok(None), } }) .try_collect() - .await?; + .await + .map_err(Error::FallbackError)?; let mut logs_imported = 0; for (block_range, log_chunk) in logs.iter() { @@ -665,7 +926,8 @@ impl Service { /// Emits logs for debugging and errors. 
pub async fn update_block_cache( &self, - remote_highest_block_opt: Option, + new_block_numbers: Option>>, + endpoints: &EndpointsCache, ) -> Result { let block_cache_truncation = self.config().block_cache_truncation; let max_blocks_per_update = self @@ -673,24 +935,19 @@ impl Service { .max_blocks_per_update .unwrap_or_else(usize::max_value); - let next_required_block = self - .inner - .block_cache - .read() - .highest_block_number() - .map(|n| n + 1) - .unwrap_or_else(|| self.config().lowest_cached_block_number); + let range = { + match new_block_numbers { + Some(range) => range, + None => endpoints + .first_success(|e| async move { + relevant_new_block_numbers_from_endpoint(e, &self, HeadType::BlockCache) + .await + }) + .await + .map_err(Error::FallbackError)?, + } + }; - let endpoint = self.config().endpoint.clone(); - let reduced_follow_distance = self.reduced_follow_distance(); - - let range = get_new_block_numbers( - &endpoint, - remote_highest_block_opt, - next_required_block, - reduced_follow_distance, - ) - .await?; // Map the range of required blocks into a Vec. // // If the required range is larger than the size of the cache, drop the exiting cache @@ -741,7 +998,12 @@ impl Service { |mut block_numbers| async { match block_numbers.next() { Some(block_number) => { - match download_eth1_block(self.inner.clone(), Some(block_number)).await { + match endpoints + .first_success(|e| async move { + download_eth1_block(e, self.inner.clone(), Some(block_number)).await + }) + .await + { Ok(eth1_block) => Ok(Some((eth1_block, block_numbers))), Err(e) => Err(e), } @@ -751,7 +1013,8 @@ impl Service { }, ) .try_collect() - .await?; + .await + .map_err(Error::FallbackError)?; let mut blocks_imported = 0; for eth1_block in eth1_blocks { @@ -822,21 +1085,15 @@ impl Service { } } -/// Determine the range of blocks that need to be downloaded, given the remotes best block and -/// the locally stored best block. 
-async fn get_new_block_numbers<'a>( - endpoint: &str, - remote_highest_block_opt: Option, +/// Returns the range of blocks starting from `next_required_block` that are at least +/// `follow_distance` many blocks before `remote_highest_block`. +/// Returns an error if `next_required_block > remote_highest_block + 1` which means the remote went +/// backwards. +fn relevant_block_range( + remote_highest_block: u64, next_required_block: u64, reduced_follow_distance: u64, -) -> Result>, Error> { - let remote_highest_block = if let Some(block_number) = remote_highest_block_opt { - block_number - } else { - get_block_number(endpoint, Duration::from_millis(BLOCK_NUMBER_TIMEOUT_MILLIS)) - .map_err(Error::GetBlockNumberFailed) - .await? - }; +) -> Result>, SingleEndpointError> { let remote_follow_block = remote_highest_block.saturating_sub(reduced_follow_distance); if next_required_block <= remote_follow_block { @@ -847,7 +1104,7 @@ async fn get_new_block_numbers<'a>( // // We assume that the `reduced_follow_distance` should be sufficient to ensure this never // happens, otherwise it is an error. - Err(Error::RemoteNotSynced { + Err(SingleEndpointError::RemoteNotSynced { next_required_block, remote_highest_block, reduced_follow_distance, @@ -865,11 +1122,10 @@ async fn get_new_block_numbers<'a>( /// /// Performs three async calls to an Eth1 HTTP JSON RPC endpoint. async fn download_eth1_block( + endpoint: &str, cache: Arc, block_number_opt: Option, -) -> Result { - let endpoint = cache.config.read().endpoint.clone(); - +) -> Result { let deposit_root = block_number_opt.and_then(|block_number| { cache .deposit_cache @@ -888,13 +1144,13 @@ async fn download_eth1_block( // Performs a `get_blockByNumber` call to an eth1 node. 
let http_block = get_block( - &endpoint, + endpoint, block_number_opt .map(BlockQuery::Number) .unwrap_or_else(|| BlockQuery::Latest), Duration::from_millis(GET_BLOCK_TIMEOUT_MILLIS), ) - .map_err(Error::BlockDownloadFailed) + .map_err(SingleEndpointError::BlockDownloadFailed) .await?; Ok(Eth1Block { diff --git a/beacon_node/eth1/tests/test.rs b/beacon_node/eth1/tests/test.rs index 399890213..5a90a1c43 100644 --- a/beacon_node/eth1/tests/test.rs +++ b/beacon_node/eth1/tests/test.rs @@ -1,8 +1,8 @@ #![cfg(test)] use environment::{Environment, EnvironmentBuilder}; use eth1::http::{get_deposit_count, get_deposit_logs_in_range, get_deposit_root, Block, Log}; -use eth1::DepositCache; use eth1::{Config, Service}; +use eth1::{DepositCache, DEFAULT_CHAIN_ID, DEFAULT_NETWORK_ID}; use eth1_test_rig::GanacheEth1Instance; use futures::compat::Future01CompatExt; use merkle_proof::verify_merkle_proof; @@ -97,6 +97,10 @@ async fn get_block_number(web3: &Web3) -> u64 { .expect("should get block number") } +async fn new_ganache_instance() -> Result { + GanacheEth1Instance::new(DEFAULT_NETWORK_ID.into(), DEFAULT_CHAIN_ID.into()).await +} + mod eth1_cache { use super::*; use types::{EthSpec, MainnetEthSpec}; @@ -106,7 +110,7 @@ mod eth1_cache { let log = null_logger(); for follow_distance in 0..2 { - let eth1 = GanacheEth1Instance::new() + let eth1 = new_ganache_instance() .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; @@ -116,7 +120,7 @@ mod eth1_cache { let service = Service::new( Config { - endpoint: eth1.endpoint(), + endpoints: vec![eth1.endpoint()], deposit_contract_address: deposit_contract.address(), lowest_cached_block_number: initial_block_number, follow_distance, @@ -145,17 +149,19 @@ mod eth1_cache { eth1.ganache.evm_mine().await.expect("should mine block"); } + let endpoints = service.init_endpoints(); + service - .update_deposit_cache(None) + .update_deposit_cache(None, &endpoints) .await .expect("should update deposit 
cache"); service - .update_block_cache(None) + .update_block_cache(None, &endpoints) .await .expect("should update block cache"); service - .update_block_cache(None) + .update_block_cache(None, &endpoints) .await .expect("should update cache when nothing has changed"); @@ -181,7 +187,7 @@ mod eth1_cache { async fn big_skip() { let log = null_logger(); - let eth1 = GanacheEth1Instance::new() + let eth1 = new_ganache_instance() .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; @@ -191,7 +197,7 @@ mod eth1_cache { let service = Service::new( Config { - endpoint: eth1.endpoint(), + endpoints: vec![eth1.endpoint()], deposit_contract_address: deposit_contract.address(), lowest_cached_block_number: get_block_number(&web3).await, follow_distance: 0, @@ -208,12 +214,14 @@ mod eth1_cache { eth1.ganache.evm_mine().await.expect("should mine block") } + let endpoints = service.init_endpoints(); + service - .update_deposit_cache(None) + .update_deposit_cache(None, &endpoints) .await .expect("should update deposit cache"); service - .update_block_cache(None) + .update_block_cache(None, &endpoints) .await .expect("should update block cache"); @@ -230,7 +238,7 @@ mod eth1_cache { async fn pruning() { let log = null_logger(); - let eth1 = GanacheEth1Instance::new() + let eth1 = new_ganache_instance() .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; @@ -240,7 +248,7 @@ mod eth1_cache { let service = Service::new( Config { - endpoint: eth1.endpoint(), + endpoints: vec![eth1.endpoint()], deposit_contract_address: deposit_contract.address(), lowest_cached_block_number: get_block_number(&web3).await, follow_distance: 0, @@ -255,12 +263,13 @@ mod eth1_cache { for _ in 0..cache_len / 2 { eth1.ganache.evm_mine().await.expect("should mine block") } + let endpoints = service.init_endpoints(); service - .update_deposit_cache(None) + .update_deposit_cache(None, &endpoints) .await .expect("should update 
deposit cache"); service - .update_block_cache(None) + .update_block_cache(None, &endpoints) .await .expect("should update block cache"); } @@ -278,7 +287,7 @@ mod eth1_cache { let n = 16; - let eth1 = GanacheEth1Instance::new() + let eth1 = new_ganache_instance() .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; @@ -286,7 +295,7 @@ mod eth1_cache { let service = Service::new( Config { - endpoint: eth1.endpoint(), + endpoints: vec![eth1.endpoint()], deposit_contract_address: deposit_contract.address(), lowest_cached_block_number: get_block_number(&web3).await, follow_distance: 0, @@ -299,14 +308,16 @@ mod eth1_cache { for _ in 0..n { eth1.ganache.evm_mine().await.expect("should mine block") } + + let endpoints = service.init_endpoints(); futures::try_join!( - service.update_deposit_cache(None), - service.update_deposit_cache(None) + service.update_deposit_cache(None, &endpoints), + service.update_deposit_cache(None, &endpoints) ) .expect("should perform two simultaneous updates of deposit cache"); futures::try_join!( - service.update_block_cache(None), - service.update_block_cache(None) + service.update_block_cache(None, &endpoints), + service.update_block_cache(None, &endpoints) ) .expect("should perform two simultaneous updates of block cache"); @@ -323,7 +334,7 @@ mod deposit_tree { let n = 4; - let eth1 = GanacheEth1Instance::new() + let eth1 = new_ganache_instance() .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; @@ -333,7 +344,7 @@ mod deposit_tree { let service = Service::new( Config { - endpoint: eth1.endpoint(), + endpoints: vec![eth1.endpoint()], deposit_contract_address: deposit_contract.address(), deposit_contract_deploy_block: start_block, follow_distance: 0, @@ -353,13 +364,15 @@ mod deposit_tree { .expect("should perform a deposit"); } + let endpoints = service.init_endpoints(); + service - .update_deposit_cache(None) + .update_deposit_cache(None, &endpoints) 
.await .expect("should perform update"); service - .update_deposit_cache(None) + .update_deposit_cache(None, &endpoints) .await .expect("should perform update when nothing has changed"); @@ -398,7 +411,7 @@ mod deposit_tree { let n = 8; - let eth1 = GanacheEth1Instance::new() + let eth1 = new_ganache_instance() .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; @@ -408,7 +421,7 @@ mod deposit_tree { let service = Service::new( Config { - endpoint: eth1.endpoint(), + endpoints: vec![eth1.endpoint()], deposit_contract_address: deposit_contract.address(), deposit_contract_deploy_block: start_block, lowest_cached_block_number: start_block, @@ -428,9 +441,10 @@ mod deposit_tree { .expect("should perform a deposit"); } + let endpoints = service.init_endpoints(); futures::try_join!( - service.update_deposit_cache(None), - service.update_deposit_cache(None) + service.update_deposit_cache(None, &endpoints), + service.update_deposit_cache(None, &endpoints) ) .expect("should perform two updates concurrently"); @@ -445,7 +459,7 @@ mod deposit_tree { let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect(); - let eth1 = GanacheEth1Instance::new() + let eth1 = new_ganache_instance() .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; @@ -552,7 +566,7 @@ mod http { #[tokio::test] async fn incrementing_deposits() { - let eth1 = GanacheEth1Instance::new() + let eth1 = new_ganache_instance() .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; @@ -644,7 +658,7 @@ mod fast { async fn deposit_cache_query() { let log = null_logger(); - let eth1 = GanacheEth1Instance::new() + let eth1 = new_ganache_instance() .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; @@ -653,7 +667,7 @@ mod fast { let now = get_block_number(&web3).await; let service = Service::new( Config { - endpoint: eth1.endpoint(), + endpoints: 
vec![eth1.endpoint()], deposit_contract_address: deposit_contract.address(), deposit_contract_deploy_block: now, lowest_cached_block_number: now, @@ -675,8 +689,9 @@ mod fast { eth1.ganache.evm_mine().await.expect("should mine block"); } + let endpoints = service.init_endpoints(); service - .update_deposit_cache(None) + .update_deposit_cache(None, &endpoints) .await .expect("should perform update"); @@ -717,7 +732,7 @@ mod persist { async fn test_persist_caches() { let log = null_logger(); - let eth1 = GanacheEth1Instance::new() + let eth1 = new_ganache_instance() .await .expect("should start eth1 environment"); let deposit_contract = &eth1.deposit_contract; @@ -725,7 +740,7 @@ mod persist { let now = get_block_number(&web3).await; let config = Config { - endpoint: eth1.endpoint(), + endpoints: vec![eth1.endpoint()], deposit_contract_address: deposit_contract.address(), deposit_contract_deploy_block: now, lowest_cached_block_number: now, @@ -743,8 +758,9 @@ mod persist { .expect("should perform a deposit"); } + let endpoints = service.init_endpoints(); service - .update_deposit_cache(None) + .update_deposit_cache(None, &endpoints) .await .expect("should perform update"); @@ -756,7 +772,7 @@ mod persist { let deposit_count = service.deposit_cache_len(); service - .update_block_cache(None) + .update_block_cache(None, &endpoints) .await .expect("should perform update"); @@ -786,3 +802,273 @@ mod persist { ); } } + +/// Tests for eth1 fallback +mod fallbacks { + use super::*; + use tokio::time::delay_for; + + #[tokio::test] + async fn test_fallback_when_offline() { + let log = null_logger(); + let endpoint2 = new_ganache_instance() + .await + .expect("should start eth1 environment"); + let deposit_contract = &endpoint2.deposit_contract; + + let initial_block_number = get_block_number(&endpoint2.web3()).await; + + // Create some blocks and then consume them, performing the test `rounds` times. 
+ let new_blocks = 4; + + for _ in 0..new_blocks { + endpoint2 + .ganache + .evm_mine() + .await + .expect("should mine block"); + } + + let endpoint1 = endpoint2 + .ganache + .fork() + .expect("should start eth1 environment"); + + //mine additional blocks on top of the original endpoint + for _ in 0..new_blocks { + endpoint2 + .ganache + .evm_mine() + .await + .expect("should mine block"); + } + + let service = Service::new( + Config { + endpoints: vec![endpoint1.endpoint(), endpoint2.endpoint()], + deposit_contract_address: deposit_contract.address(), + lowest_cached_block_number: initial_block_number, + follow_distance: 0, + ..Config::default() + }, + log.clone(), + MainnetEthSpec::default_spec(), + ); + + let endpoint1_block_number = get_block_number(&endpoint1.web3).await; + //the first call will only query endpoint1 + service.update().await.expect("should update deposit cache"); + assert_eq!( + service.deposits().read().last_processed_block.unwrap(), + endpoint1_block_number + ); + + drop(endpoint1); + + let endpoint2_block_number = get_block_number(&endpoint2.web3()).await; + assert!(endpoint1_block_number < endpoint2_block_number); + //endpoint1 is offline => query will import blocks from endpoint2 + service.update().await.expect("should update deposit cache"); + assert_eq!( + service.deposits().read().last_processed_block.unwrap(), + endpoint2_block_number + ); + } + + #[tokio::test] + async fn test_fallback_when_wrong_network_id() { + let log = null_logger(); + let correct_network_id: u64 = DEFAULT_NETWORK_ID.into(); + let wrong_network_id = correct_network_id + 1; + let endpoint1 = GanacheEth1Instance::new(wrong_network_id, DEFAULT_CHAIN_ID.into()) + .await + .expect("should start eth1 environment"); + let endpoint2 = new_ganache_instance() + .await + .expect("should start eth1 environment"); + let deposit_contract = &endpoint2.deposit_contract; + + let initial_block_number = get_block_number(&endpoint2.web3()).await; + + // Create some blocks and then 
consume them, performing the test `rounds` times. + let new_blocks = 4; + + for _ in 0..new_blocks { + endpoint1 + .ganache + .evm_mine() + .await + .expect("should mine block"); + endpoint2 + .ganache + .evm_mine() + .await + .expect("should mine block"); + } + + //additional blocks for endpoint1 to be able to distinguish + for _ in 0..new_blocks { + endpoint1 + .ganache + .evm_mine() + .await + .expect("should mine block"); + } + + let service = Service::new( + Config { + endpoints: vec![endpoint2.endpoint(), endpoint1.endpoint()], + deposit_contract_address: deposit_contract.address(), + lowest_cached_block_number: initial_block_number, + follow_distance: 0, + ..Config::default() + }, + log.clone(), + MainnetEthSpec::default_spec(), + ); + + let endpoint1_block_number = get_block_number(&endpoint1.web3()).await; + let endpoint2_block_number = get_block_number(&endpoint2.web3()).await; + assert!(endpoint2_block_number < endpoint1_block_number); + //the call will fallback to endpoint2 + service.update().await.expect("should update deposit cache"); + assert_eq!( + service.deposits().read().last_processed_block.unwrap(), + endpoint2_block_number + ); + } + + #[tokio::test] + async fn test_fallback_when_wrong_chain_id() { + let log = null_logger(); + let correct_chain_id: u64 = DEFAULT_CHAIN_ID.into(); + let wrong_chain_id = correct_chain_id + 1; + let endpoint1 = GanacheEth1Instance::new(DEFAULT_NETWORK_ID.into(), wrong_chain_id) + .await + .expect("should start eth1 environment"); + let endpoint2 = new_ganache_instance() + .await + .expect("should start eth1 environment"); + let deposit_contract = &endpoint2.deposit_contract; + + let initial_block_number = get_block_number(&endpoint2.web3()).await; + + // Create some blocks and then consume them, performing the test `rounds` times. 
+ let new_blocks = 4; + + for _ in 0..new_blocks { + endpoint1 + .ganache + .evm_mine() + .await + .expect("should mine block"); + endpoint2 + .ganache + .evm_mine() + .await + .expect("should mine block"); + } + + //additional blocks for endpoint1 to be able to distinguish + for _ in 0..new_blocks { + endpoint1 + .ganache + .evm_mine() + .await + .expect("should mine block"); + } + + let service = Service::new( + Config { + endpoints: vec![endpoint2.endpoint(), endpoint1.endpoint()], + deposit_contract_address: deposit_contract.address(), + lowest_cached_block_number: initial_block_number, + follow_distance: 0, + ..Config::default() + }, + log.clone(), + MainnetEthSpec::default_spec(), + ); + + let endpoint1_block_number = get_block_number(&endpoint1.web3()).await; + let endpoint2_block_number = get_block_number(&endpoint2.web3()).await; + assert!(endpoint2_block_number < endpoint1_block_number); + //the call will fallback to endpoint2 + service.update().await.expect("should update deposit cache"); + assert_eq!( + service.deposits().read().last_processed_block.unwrap(), + endpoint2_block_number + ); + } + + #[tokio::test] + async fn test_fallback_when_node_far_behind() { + let log = null_logger(); + let endpoint2 = new_ganache_instance() + .await + .expect("should start eth1 environment"); + let deposit_contract = &endpoint2.deposit_contract; + + let initial_block_number = get_block_number(&endpoint2.web3()).await; + + // Create some blocks and then consume them, performing the test `rounds` times. 
+ let new_blocks = 4; + + for _ in 0..new_blocks { + endpoint2 + .ganache + .evm_mine() + .await + .expect("should mine block"); + } + + let endpoint1 = endpoint2 + .ganache + .fork() + .expect("should start eth1 environment"); + + let service = Service::new( + Config { + endpoints: vec![endpoint1.endpoint(), endpoint2.endpoint()], + deposit_contract_address: deposit_contract.address(), + lowest_cached_block_number: initial_block_number, + follow_distance: 0, + node_far_behind_seconds: 5, + ..Config::default() + }, + log.clone(), + MainnetEthSpec::default_spec(), + ); + + let endpoint1_block_number = get_block_number(&endpoint1.web3).await; + //the first call will only query endpoint1 + service.update().await.expect("should update deposit cache"); + assert_eq!( + service.deposits().read().last_processed_block.unwrap(), + endpoint1_block_number + ); + + delay_for(Duration::from_secs(7)).await; + + //both endpoints don't have recent blocks => should return error + assert!(service.update().await.is_err()); + + //produce some new blocks on endpoint2 + for _ in 0..new_blocks { + endpoint2 + .ganache + .evm_mine() + .await + .expect("should mine block"); + } + + let endpoint2_block_number = get_block_number(&endpoint2.web3()).await; + + //endpoint1 is far behind + endpoint2 not => update will import blocks from endpoint2 + service.update().await.expect("should update deposit cache"); + assert_eq!( + service.deposits().read().last_processed_block.unwrap(), + endpoint2_block_number + ); + } +} diff --git a/beacon_node/genesis/src/eth1_genesis_service.rs b/beacon_node/genesis/src/eth1_genesis_service.rs index e16e577af..34bcd7dc8 100644 --- a/beacon_node/genesis/src/eth1_genesis_service.rs +++ b/beacon_node/genesis/src/eth1_genesis_service.rs @@ -112,9 +112,11 @@ impl Eth1GenesisService { "Importing eth1 deposit logs"; ); + let endpoints = eth1_service.init_endpoints(); + loop { let update_result = eth1_service - .update_deposit_cache(None) + .update_deposit_cache(None, 
&endpoints) .await .map_err(|e| format!("{:?}", e)); @@ -156,7 +158,7 @@ impl Eth1GenesisService { } // Download new eth1 blocks into the cache. - let blocks_imported = match eth1_service.update_block_cache(None).await { + let blocks_imported = match eth1_service.update_block_cache(None, &endpoints).await { Ok(outcome) => { debug!( log, diff --git a/beacon_node/genesis/tests/tests.rs b/beacon_node/genesis/tests/tests.rs index feab02e41..2b7e81412 100644 --- a/beacon_node/genesis/tests/tests.rs +++ b/beacon_node/genesis/tests/tests.rs @@ -4,6 +4,7 @@ //! dir in the root of the `lighthouse` repo. #![cfg(test)] use environment::{Environment, EnvironmentBuilder}; +use eth1::{DEFAULT_CHAIN_ID, DEFAULT_NETWORK_ID}; use eth1_test_rig::{DelayThenDeposit, GanacheEth1Instance}; use futures::compat::Future01CompatExt; use genesis::{Eth1Config, Eth1GenesisService}; @@ -28,7 +29,7 @@ fn basic() { let mut spec = env.eth2_config().spec.clone(); env.runtime().block_on(async { - let eth1 = GanacheEth1Instance::new() + let eth1 = GanacheEth1Instance::new(DEFAULT_NETWORK_ID.into(), DEFAULT_CHAIN_ID.into()) .await .expect("should start eth1 environment"); let deposit_contract = &eth1.deposit_contract; @@ -44,7 +45,7 @@ fn basic() { let service = Eth1GenesisService::new( Eth1Config { - endpoint: eth1.endpoint(), + endpoints: vec![eth1.endpoint()], deposit_contract_address: deposit_contract.address(), deposit_contract_deploy_block: now, lowest_cached_block_number: now, diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 3a1361184..233135d0b 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -284,7 +284,18 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { Arg::with_name("eth1-endpoint") .long("eth1-endpoint") .value_name("HTTP-ENDPOINT") - .help("Specifies the server for a web3 connection to the Eth1 chain. Also enables the --eth1 flag. Defaults to http://127.0.0.1:8545.") + .help("Deprecated. 
Use --eth1-endpoints.") + .takes_value(true) + ) + .arg( + Arg::with_name("eth1-endpoints") + .long("eth1-endpoints") + .value_name("HTTP-ENDPOINTS") + .conflicts_with("eth1-endpoint") + .help("One or more comma-delimited server endpoints for web3 connection. \ + If multiple endpoints are given the endpoints are used as fallback in the \ + given order. Also enables the --eth1 flag. \ + Defaults to http://127.0.0.1:8545.") .takes_value(true) ) .arg( diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 830975799..367d43e10 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -7,6 +7,7 @@ use eth2_libp2p::{multiaddr::Protocol, Enr, Multiaddr, NetworkConfig, PeerIdSeri use eth2_testnet_config::Eth2TestnetConfig; use slog::{info, warn, Logger}; use std::cmp; +use std::cmp::max; use std::fs; use std::net::{IpAddr, Ipv4Addr, ToSocketAddrs}; use std::net::{TcpListener, UdpSocket}; @@ -194,7 +195,10 @@ pub fn get_config( // Defines the URL to reach the eth1 node. 
if let Some(val) = cli_args.value_of("eth1-endpoint") { client_config.sync_eth1_chain = true; - client_config.eth1.endpoint = val.to_string(); + client_config.eth1.endpoints = vec![val.to_string()]; + } else if let Some(val) = cli_args.value_of("eth1-endpoints") { + client_config.sync_eth1_chain = true; + client_config.eth1.endpoints = val.split(',').map(String::from).collect(); } if let Some(val) = cli_args.value_of("eth1-blocks-per-log-query") { @@ -264,6 +268,8 @@ pub fn get_config( client_config.eth1.lowest_cached_block_number = client_config.eth1.deposit_contract_deploy_block; client_config.eth1.follow_distance = spec.eth1_follow_distance; + client_config.eth1.node_far_behind_seconds = + max(5, spec.eth1_follow_distance / 2) * spec.seconds_per_eth1_block; client_config.eth1.network_id = spec.deposit_network_id.into(); client_config.eth1.chain_id = spec.deposit_chain_id.into(); client_config.eth1.set_block_cache_truncation::(spec); diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index a468871a5..e9585dfd9 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -101,7 +101,7 @@ impl ProductionBeaconNode { info!( log, "Block production enabled"; - "endpoint" => &client_config.eth1.endpoint, + "endpoints" => format!("{:?}", &client_config.eth1.endpoints), "method" => "json rpc via http" ); builder diff --git a/common/fallback/Cargo.toml b/common/fallback/Cargo.toml new file mode 100644 index 000000000..dd0f98cd4 --- /dev/null +++ b/common/fallback/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "fallback" +version = "0.1.0" +authors = ["blacktemplar "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +itertools = "0.9.0" \ No newline at end of file diff --git a/common/fallback/src/lib.rs b/common/fallback/src/lib.rs new file mode 100644 index 000000000..1e3cb9cf0 --- /dev/null +++ b/common/fallback/src/lib.rs @@ -0,0 +1,58 @@ +use itertools::{join, zip}; 
+use std::fmt::{Debug, Display}; +use std::future::Future; + +pub struct Fallback<T> { + servers: Vec<T>, +} + +#[derive(Debug, PartialEq)] +pub enum FallbackError<E> { + AllErrored(Vec<E>), +} + +impl<T> Fallback<T> { + pub fn new(servers: Vec<T>) -> Self { + Self { servers } + } + + /// Return the first successful result or all errors encountered. + pub async fn first_success<'a, F, O, E, R>(&'a self, func: F) -> Result<O, FallbackError<E>> + where + F: Fn(&'a T) -> R, + R: Future<Output = Result<O, E>>, + { + let mut errors = vec![]; + for server in &self.servers { + match func(server).await { + Ok(val) => return Ok(val), + Err(e) => errors.push(e), + } + } + Err(FallbackError::AllErrored(errors)) + } + + pub fn map_format_error<'a, E, F, S>(&'a self, f: F, error: &FallbackError<E>) -> String + where + F: FnMut(&'a T) -> &'a S, + S: Display + 'a, + E: Debug, + { + match error { + FallbackError::AllErrored(v) => format!( + "All fallback errored: {}", + join( + zip(self.servers.iter().map(f), v.iter()) + .map(|(server, error)| format!("{} => {:?}", server, error)), + ", " + ) + ), + } + } +} + +impl<T: Display> Fallback<T> { + pub fn format_error<E: Debug>(&self, error: &FallbackError<E>) -> String { + self.map_format_error(|s| s, error) + } +} diff --git a/lcli/src/eth1_genesis.rs b/lcli/src/eth1_genesis.rs index bd03cef5e..c16435f9d 100644 --- a/lcli/src/eth1_genesis.rs +++ b/lcli/src/eth1_genesis.rs @@ -3,6 +3,7 @@ use environment::Environment; use eth2_testnet_config::Eth2TestnetConfig; use genesis::{Eth1Config, Eth1GenesisService}; use ssz::Encode; +use std::cmp::max; use std::path::PathBuf; use std::time::Duration; use types::EthSpec; @@ -11,9 +12,14 @@ use types::EthSpec; pub const ETH1_GENESIS_UPDATE_INTERVAL: Duration = Duration::from_millis(7_000); pub fn run<T: EthSpec>(mut env: Environment<T>, matches: &ArgMatches<'_>) -> Result<(), String> { - let endpoint = matches + let endpoints = matches .value_of("eth1-endpoint") - .ok_or_else(|| "eth1-endpoint not specified")?; + .map(|e| vec![String::from(e)]) + .or_else(|| { + matches + .value_of("eth1-endpoints") + 
.map(|s| s.split(',').map(String::from).collect()) + }); let testnet_dir = matches .value_of("testnet-dir") @@ -40,11 +46,14 @@ pub fn run(mut env: Environment, matches: &ArgMatches<'_>) -> Res })?; let mut config = Eth1Config::default(); - config.endpoint = endpoint.to_string(); + if let Some(v) = endpoints.clone() { + config.endpoints = v; + } config.deposit_contract_address = format!("{:?}", spec.deposit_contract_address); config.deposit_contract_deploy_block = eth2_testnet_config.deposit_contract_deploy_block; config.lowest_cached_block_number = eth2_testnet_config.deposit_contract_deploy_block; config.follow_distance = spec.eth1_follow_distance / 2; + config.node_far_behind_seconds = max(5, config.follow_distance) * spec.seconds_per_eth1_block; let genesis_service = Eth1GenesisService::new(config, env.core_context().log().clone(), spec.clone()); @@ -60,7 +69,7 @@ pub fn run(mut env: Environment, matches: &ArgMatches<'_>) -> Res .map_err(|e| format!("Failed to find genesis: {}", e))?; info!("Starting service to produce genesis BeaconState from eth1"); - info!("Connecting to eth1 http endpoint: {}", endpoint); + info!("Connecting to eth1 http endpoints: {:?}", endpoints); Ok(()) }) diff --git a/lcli/src/main.rs b/lcli/src/main.rs index 596ee3e89..8381d8be5 100644 --- a/lcli/src/main.rs +++ b/lcli/src/main.rs @@ -233,8 +233,17 @@ fn main() { .long("eth1-endpoint") .value_name("HTTP_SERVER") .takes_value(true) - .default_value("http://localhost:8545") - .help("The URL to the eth1 JSON-RPC http API."), + .help("Deprecated. Use --eth1-endpoints."), + ) + .arg( + Arg::with_name("eth1-endpoints") + .long("eth1-endpoints") + .value_name("HTTP_SERVER_LIST") + .takes_value(true) + .conflicts_with("eth1-endpoint") + .help("One or more comma-delimited URLs to eth1 JSON-RPC http APIs. 
\ + If multiple endpoints are given the endpoints are used as \ + fallback in the given order."), ) ) .subcommand( diff --git a/testing/eth1_test_rig/src/ganache.rs b/testing/eth1_test_rig/src/ganache.rs index d9f15f4b4..d044e5131 100644 --- a/testing/eth1_test_rig/src/ganache.rs +++ b/testing/eth1_test_rig/src/ganache.rs @@ -14,9 +14,6 @@ use web3::{ /// How long we will wait for ganache to indicate that it is ready. const GANACHE_STARTUP_TIMEOUT_MILLIS: u64 = 10_000; -const NETWORK_ID: u64 = 42; -const CHAIN_ID: u64 = 42; - /// Provides a dedicated `ganachi-cli` instance with a connected `Web3` instance. /// /// Requires that `ganachi-cli` is installed and available on `PATH`. @@ -25,39 +22,17 @@ pub struct GanacheInstance { child: Child, _event_loop: Arc, pub web3: Web3, + network_id: u64, + chain_id: u64, } impl GanacheInstance { - /// Start a new `ganache-cli` process, waiting until it indicates that it is ready to accept - /// RPC connections. - pub fn new() -> Result { - let port = unused_port()?; - - let mut child = Command::new("ganache-cli") - .stdout(Stdio::piped()) - .arg("--defaultBalanceEther") - .arg("1000000000") - .arg("--gasLimit") - .arg("1000000000") - .arg("--accounts") - .arg("10") - .arg("--port") - .arg(format!("{}", port)) - .arg("--mnemonic") - .arg("\"vast thought differ pull jewel broom cook wrist tribe word before omit\"") - .arg("--networkId") - .arg(format!("{}", NETWORK_ID)) - .arg("--chainId") - .arg(format!("{}", CHAIN_ID)) - .spawn() - .map_err(|e| { - format!( - "Failed to start ganache-cli. \ - Is it ganache-cli installed and available on $PATH? 
Error: {:?}", - e - ) - })?; - + fn new_from_child( + mut child: Child, + port: u16, + network_id: u64, + chain_id: u64, + ) -> Result { let stdout = child .stdout .ok_or_else(|| "Unable to get stdout for ganache child process")?; @@ -96,9 +71,67 @@ impl GanacheInstance { port, _event_loop: Arc::new(event_loop), web3, + network_id, + chain_id, }) } + /// Start a new `ganache-cli` process, waiting until it indicates that it is ready to accept + /// RPC connections. + pub fn new(network_id: u64, chain_id: u64) -> Result { + let port = unused_port()?; + + let child = Command::new("ganache-cli") + .stdout(Stdio::piped()) + .arg("--defaultBalanceEther") + .arg("1000000000") + .arg("--gasLimit") + .arg("1000000000") + .arg("--accounts") + .arg("10") + .arg("--port") + .arg(format!("{}", port)) + .arg("--mnemonic") + .arg("\"vast thought differ pull jewel broom cook wrist tribe word before omit\"") + .arg("--networkId") + .arg(format!("{}", network_id)) + .arg("--chainId") + .arg(format!("{}", chain_id)) + .spawn() + .map_err(|e| { + format!( + "Failed to start ganache-cli. \ + Is it ganache-cli installed and available on $PATH? Error: {:?}", + e + ) + })?; + + Self::new_from_child(child, port, network_id, chain_id) + } + + pub fn fork(&self) -> Result { + let port = unused_port()?; + + let child = Command::new("ganache-cli") + .stdout(Stdio::piped()) + .arg("--fork") + .arg(self.endpoint()) + .arg("--port") + .arg(format!("{}", port)) + .arg("--chainId") + .arg(format!("{}", self.chain_id)) + .spawn() + .map_err(|e| { + format!( + "Failed to start ganache-cli. \ + Is it ganache-cli installed and available on $PATH? Error: {:?}", + e + ) + })?; + + Self::new_from_child(child, port, self.network_id, self.chain_id) + } + /// Returns the endpoint that this instance is listening on. 
pub fn endpoint(&self) -> String { endpoint(self.port) @@ -106,12 +139,12 @@ impl GanacheInstance { /// Returns the network id of the ganache instance pub fn network_id(&self) -> u64 { - NETWORK_ID + self.network_id } /// Returns the chain id of the ganache instance pub fn chain_id(&self) -> u64 { - CHAIN_ID + self.chain_id } /// Increase the timestamp on future blocks by `increase_by` seconds. diff --git a/testing/eth1_test_rig/src/lib.rs b/testing/eth1_test_rig/src/lib.rs index 53715133b..45902f4df 100644 --- a/testing/eth1_test_rig/src/lib.rs +++ b/testing/eth1_test_rig/src/lib.rs @@ -31,8 +31,8 @@ pub struct GanacheEth1Instance { } impl GanacheEth1Instance { - pub async fn new() -> Result { - let ganache = GanacheInstance::new()?; + pub async fn new(network_id: u64, chain_id: u64) -> Result { + let ganache = GanacheInstance::new(network_id, chain_id)?; DepositContract::deploy(ganache.web3.clone(), 0, None) .await .map(|deposit_contract| Self { diff --git a/testing/simulator/src/eth1_sim.rs b/testing/simulator/src/eth1_sim.rs index 33fb3a2b6..e9250c96b 100644 --- a/testing/simulator/src/eth1_sim.rs +++ b/testing/simulator/src/eth1_sim.rs @@ -1,6 +1,7 @@ use crate::{checks, LocalNetwork, E}; use clap::ArgMatches; use eth1::http::Eth1Id; +use eth1::{DEFAULT_CHAIN_ID, DEFAULT_NETWORK_ID}; use eth1_test_rig::GanacheEth1Instance; use futures::prelude::*; use node_test_rig::{ @@ -73,7 +74,8 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { * Deploy the deposit contract, spawn tasks to keep creating new blocks and deposit * validators. 
*/ - let ganache_eth1_instance = GanacheEth1Instance::new().await?; + let ganache_eth1_instance = + GanacheEth1Instance::new(DEFAULT_NETWORK_ID.into(), DEFAULT_CHAIN_ID.into()).await?; let deposit_contract = ganache_eth1_instance.deposit_contract; let network_id = ganache_eth1_instance.ganache.network_id(); let chain_id = ganache_eth1_instance.ganache.chain_id(); @@ -102,15 +104,16 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { let mut beacon_config = testing_client_config(); beacon_config.genesis = ClientGenesis::DepositContract; - beacon_config.eth1.endpoint = eth1_endpoint; + beacon_config.eth1.endpoints = vec![eth1_endpoint]; beacon_config.eth1.deposit_contract_address = deposit_contract_address; beacon_config.eth1.deposit_contract_deploy_block = 0; beacon_config.eth1.lowest_cached_block_number = 0; beacon_config.eth1.follow_distance = 1; + beacon_config.eth1.node_far_behind_seconds = 20; beacon_config.dummy_eth1_backend = false; beacon_config.sync_eth1_chain = true; - beacon_config.eth1.network_id = Eth1Id::Custom(network_id); - beacon_config.eth1.chain_id = Eth1Id::Custom(chain_id); + beacon_config.eth1.network_id = Eth1Id::from(network_id); + beacon_config.eth1.chain_id = Eth1Id::from(chain_id); beacon_config.network.enr_address = Some(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));