Fallback nodes for eth1 access (#1918)
## Issue Addressed

Part of #1883.

## Proposed Changes

Adds a new CLI argument `--eth1-endpoints` that can be used instead of `--eth1-endpoint` to specify a comma-separated list of endpoints. If the first endpoint returns an error for some request, the other endpoints are tried in the given order.

## Additional Info

Currently, if the first endpoint fails, the fallbacks are used silently (except for `try_fallback_test_endpoint`, which is used in `do_update` and logs a `WARN` for each endpoint that is not reachable). An open question is whether we should add more logging so that users get warned when their main endpoint is, for example, just slow and sometimes hits timeouts.
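As a rough illustration of the behaviour described above, here is a stand-alone sketch (hypothetical endpoints, a dummy request, and the `tokio` runtime assumed; this is not the Lighthouse implementation, which lives in the new `common/fallback` crate and in `eth1/src/service.rs` below): the flag value is split on commas and each endpoint is tried in order until one succeeds, otherwise all errors are collected.

```rust
// Stand-alone sketch with made-up endpoints and a dummy request.
async fn query_block_number(endpoint: &str) -> Result<u64, String> {
    // Placeholder for a real JSON-RPC call; here the first endpoint "fails".
    if endpoint.ends_with(":8545") {
        Err(format!("{} not reachable", endpoint))
    } else {
        Ok(1_234_567)
    }
}

#[tokio::main]
async fn main() {
    // Mirrors how the CLI value is split into a `Vec<String>` in this PR.
    let endpoints: Vec<String> = "http://localhost:8545,http://localhost:8546"
        .split(',')
        .map(String::from)
        .collect();

    // Try each endpoint in the given order, keeping the first success.
    let mut errors = Vec::new();
    let mut block_number = None;
    for endpoint in &endpoints {
        match query_block_number(endpoint).await {
            Ok(n) => {
                block_number = Some(n);
                break;
            }
            Err(e) => errors.push(e),
        }
    }

    match block_number {
        Some(n) => println!("block number {} ({} endpoint(s) failed first)", n, errors.len()),
        None => eprintln!("all endpoints errored: {:?}", errors),
    }
}
```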
This commit is contained in:
parent 1312844f29
commit 38b15deccb
8 Cargo.lock (generated)
@@ -1835,6 +1835,7 @@ dependencies = [
 "eth2_hashing",
 "eth2_ssz",
 "eth2_ssz_derive",
 "fallback",
 "futures 0.3.8",
 "hex",
 "lazy_static",
@@ -2152,6 +2153,13 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"

[[package]]
name = "fallback"
version = "0.1.0"
dependencies = [
 "itertools 0.9.0",
]

[[package]]
name = "fallible-iterator"
version = "0.2.0"
@@ -39,6 +39,7 @@ members = [
    "common/test_random_derive",
    "common/validator_dir",
    "common/warp_utils",
    "common/fallback",

    "consensus/cached_tree_hash",
    "consensus/int_to_bytes",
@@ -206,7 +206,7 @@ where
        info!(
            context.log(),
            "Waiting for eth2 genesis from eth1";
            "eth1_endpoint" => &config.eth1.endpoint,
            "eth1_endpoints" => format!("{:?}", &config.eth1.endpoints),
            "contract_deploy_block" => config.eth1.deposit_contract_deploy_block,
            "deposit_contract" => &config.eth1.deposit_contract_address
        );
@@ -32,3 +32,4 @@ lighthouse_metrics = { path = "../../common/lighthouse_metrics"}
lazy_static = "1.4.0"
task_executor = { path = "../../common/task_executor" }
eth2 = { path = "../../common/eth2" }
fallback = { path = "../../common/fallback" }
@@ -14,5 +14,6 @@ pub use deposit_cache::DepositCache;
pub use deposit_log::DepositLog;
pub use inner::SszEth1Cache;
pub use service::{
    BlockCacheUpdateOutcome, Config, DepositCacheUpdateOutcome, Error, Service, DEFAULT_NETWORK_ID,
    BlockCacheUpdateOutcome, Config, DepositCacheUpdateOutcome, Error, Service, DEFAULT_CHAIN_ID,
    DEFAULT_NETWORK_ID,
};
@@ -16,4 +16,14 @@ lazy_static! {
        try_create_int_gauge("eth1_deposit_cache_len", "Number of deposits in the eth1 cache");
    pub static ref HIGHEST_PROCESSED_DEPOSIT_BLOCK: Result<IntGauge> =
        try_create_int_gauge("eth1_highest_processed_deposit_block", "Number of the last block checked for deposits");

    /*
     * Eth1 endpoint errors
     */
    pub static ref ENDPOINT_ERRORS: Result<IntCounterVec> = try_create_int_counter_vec(
        "eth1_endpoint_errors", "The number of eth1 request errors for each endpoint", &["endpoint"]
    );
    pub static ref ENDPOINT_REQUESTS: Result<IntCounterVec> = try_create_int_counter_vec(
        "eth1_endpoint_requests", "The number of eth1 requests for each endpoint", &["endpoint"]
    );
}
@ -8,13 +8,17 @@ use crate::{
|
||||
},
|
||||
inner::{DepositUpdater, Inner},
|
||||
};
|
||||
use fallback::{Fallback, FallbackError};
|
||||
use futures::{future::TryFutureExt, stream, stream::TryStreamExt, StreamExt};
|
||||
use parking_lot::{RwLock, RwLockReadGuard};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use slog::{crit, debug, error, info, trace, Logger};
|
||||
use slog::{crit, debug, error, info, trace, warn, Logger};
|
||||
use std::fmt::Debug;
|
||||
use std::future::Future;
|
||||
use std::ops::{Range, RangeInclusive};
|
||||
use std::sync::Arc;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use tokio::sync::RwLock as TRwLock;
|
||||
use tokio::time::{interval_at, Duration, Instant};
|
||||
use types::{ChainSpec, EthSpec, Unsigned};
|
||||
|
||||
@ -37,8 +41,224 @@ const WARNING_MSG: &str = "BLOCK PROPOSALS WILL FAIL WITHOUT VALID, SYNCED ETH1
|
||||
/// A factor used to reduce the eth1 follow distance to account for discrepancies in the block time.
|
||||
const ETH1_BLOCK_TIME_TOLERANCE_FACTOR: u64 = 4;
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Copy)]
|
||||
pub enum EndpointError {
|
||||
NotReachable,
|
||||
WrongNetworkId,
|
||||
WrongChainId,
|
||||
FarBehind,
|
||||
}
|
||||
|
||||
type EndpointState = Result<(), EndpointError>;
|
||||
|
||||
type EndpointWithState = (String, TRwLock<Option<EndpointState>>);
|
||||
|
||||
/// A cache structure to lazily check usability of endpoints. An endpoint is usable if it is
|
||||
/// reachable and has the correct network id and chain id. Emits a `WARN` log if a checked endpoint
|
||||
/// is not usable.
|
||||
pub struct EndpointsCache {
|
||||
pub fallback: Fallback<EndpointWithState>,
|
||||
pub config_network_id: Eth1Id,
|
||||
pub config_chain_id: Eth1Id,
|
||||
pub log: Logger,
|
||||
}
|
||||
|
||||
impl EndpointsCache {
|
||||
/// Checks the usability of an endpoint. Results get cached and therefore only the first call
|
||||
/// for each endpoint does the real check.
|
||||
async fn state(&self, endpoint: &EndpointWithState) -> EndpointState {
|
||||
if let Some(result) = *endpoint.1.read().await {
|
||||
return result;
|
||||
}
|
||||
let mut value = endpoint.1.write().await;
|
||||
if let Some(result) = *value {
|
||||
return result;
|
||||
}
|
||||
crate::metrics::inc_counter_vec(&crate::metrics::ENDPOINT_REQUESTS, &[&endpoint.0]);
|
||||
let state = endpoint_state(
|
||||
&endpoint.0,
|
||||
&self.config_network_id,
|
||||
&self.config_chain_id,
|
||||
&self.log,
|
||||
)
|
||||
.await;
|
||||
*value = Some(state);
|
||||
if state.is_err() {
|
||||
crate::metrics::inc_counter_vec(&crate::metrics::ENDPOINT_ERRORS, &[&endpoint.0]);
|
||||
}
|
||||
state
|
||||
}
|
||||
|
||||
pub async fn first_success<'a, F, O, R>(
|
||||
&'a self,
|
||||
func: F,
|
||||
) -> Result<O, FallbackError<SingleEndpointError>>
|
||||
where
|
||||
F: Fn(&'a str) -> R,
|
||||
R: Future<Output = Result<O, SingleEndpointError>>,
|
||||
{
|
||||
let func = &func;
|
||||
self.fallback
|
||||
.first_success(|endpoint| async move {
|
||||
match self.state(endpoint).await {
|
||||
Ok(()) => {
|
||||
let endpoint_str = &endpoint.0;
|
||||
crate::metrics::inc_counter_vec(
|
||||
&crate::metrics::ENDPOINT_REQUESTS,
|
||||
&[endpoint_str],
|
||||
);
|
||||
match func(&endpoint.0).await {
|
||||
Ok(t) => Ok(t),
|
||||
Err(t) => {
|
||||
crate::metrics::inc_counter_vec(
|
||||
&crate::metrics::ENDPOINT_ERRORS,
|
||||
&[endpoint_str],
|
||||
);
|
||||
if let SingleEndpointError::EndpointError(e) = &t {
|
||||
*endpoint.1.write().await = Some(Err(*e));
|
||||
}
|
||||
Err(t)
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => Err(SingleEndpointError::EndpointError(e)),
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `Ok` if the endpoint is usable, i.e. is reachable and has a correct network id and
|
||||
/// chain id. Otherwise it returns `Err`.
|
||||
async fn endpoint_state(
|
||||
endpoint: &str,
|
||||
config_network_id: &Eth1Id,
|
||||
config_chain_id: &Eth1Id,
|
||||
log: &Logger,
|
||||
) -> EndpointState {
|
||||
let error_connecting = |_| {
|
||||
warn!(
|
||||
log,
|
||||
"Error connecting to eth1 node. Trying fallback ...";
|
||||
"endpoint" => endpoint,
|
||||
);
|
||||
EndpointError::NotReachable
|
||||
};
|
||||
let network_id = get_network_id(endpoint, Duration::from_millis(STANDARD_TIMEOUT_MILLIS))
|
||||
.await
|
||||
.map_err(error_connecting)?;
|
||||
if &network_id != config_network_id {
|
||||
warn!(
|
||||
log,
|
||||
"Invalid eth1 network id. Please switch to correct network id. Trying \
|
||||
fallback ...";
|
||||
"endpoint" => endpoint,
|
||||
"expected" => format!("{:?}",config_network_id),
|
||||
"received" => format!("{:?}",network_id),
|
||||
);
|
||||
return Err(EndpointError::WrongNetworkId);
|
||||
}
|
||||
let chain_id = get_chain_id(endpoint, Duration::from_millis(STANDARD_TIMEOUT_MILLIS))
|
||||
.await
|
||||
.map_err(error_connecting)?;
|
||||
// Eth1 nodes return chain_id = 0 if the node is not synced
|
||||
// Handle the special case
|
||||
if chain_id == Eth1Id::Custom(0) {
|
||||
warn!(
|
||||
log,
|
||||
"Remote eth1 node is not synced";
|
||||
"endpoint" => endpoint,
|
||||
);
|
||||
return Err(EndpointError::FarBehind);
|
||||
}
|
||||
if &chain_id != config_chain_id {
|
||||
warn!(
|
||||
log,
|
||||
"Invalid eth1 chain id. Please switch to correct chain id. Trying \
|
||||
fallback ...";
|
||||
"endpoint" => endpoint,
|
||||
"expected" => format!("{:?}",config_chain_id),
|
||||
"received" => format!("{:?}", chain_id),
|
||||
);
|
||||
Err(EndpointError::WrongChainId)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Enum for the two internal (maybe different) cached heads for cached deposits and for the block
|
||||
/// cache.
|
||||
pub enum HeadType {
|
||||
Deposit,
|
||||
BlockCache,
|
||||
}
|
||||
|
||||
/// Returns the head block and the new block ranges relevant for deposits and the block cache
|
||||
/// from the given endpoint.
|
||||
async fn get_remote_head_and_new_block_ranges(
|
||||
endpoint: &str,
|
||||
service: &Service,
|
||||
node_far_behind_seconds: u64,
|
||||
) -> Result<
|
||||
(
|
||||
Eth1Block,
|
||||
Option<RangeInclusive<u64>>,
|
||||
Option<RangeInclusive<u64>>,
|
||||
),
|
||||
SingleEndpointError,
|
||||
> {
|
||||
let remote_head_block = download_eth1_block(endpoint, service.inner.clone(), None).await?;
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map(|d| d.as_secs())
|
||||
.unwrap_or(u64::MAX);
|
||||
if remote_head_block.timestamp + node_far_behind_seconds < now {
|
||||
warn!(
|
||||
service.log,
|
||||
"Eth1 endpoint is far behind. Trying fallback ...";
|
||||
"endpoint" => endpoint,
|
||||
"last_seen_block_unix_timestamp" => remote_head_block.timestamp
|
||||
);
|
||||
return Err(SingleEndpointError::EndpointError(EndpointError::FarBehind));
|
||||
}
|
||||
|
||||
let handle_remote_not_synced = |e| {
|
||||
if let SingleEndpointError::RemoteNotSynced { .. } = e {
|
||||
warn!(service.log, "Eth1 node not synced. Trying fallback..."; "endpoint" => endpoint);
|
||||
}
|
||||
e
|
||||
};
|
||||
let new_deposit_block_numbers = service
|
||||
.relevant_new_block_numbers(remote_head_block.number, HeadType::Deposit)
|
||||
.map_err(handle_remote_not_synced)?;
|
||||
let new_block_cache_numbers = service
|
||||
.relevant_new_block_numbers(remote_head_block.number, HeadType::BlockCache)
|
||||
.map_err(handle_remote_not_synced)?;
|
||||
Ok((
|
||||
remote_head_block,
|
||||
new_deposit_block_numbers,
|
||||
new_block_cache_numbers,
|
||||
))
|
||||
}
|
||||
|
||||
/// Returns the range of new block numbers to be considered for the given head type from the given
|
||||
/// endpoint.
|
||||
async fn relevant_new_block_numbers_from_endpoint(
|
||||
endpoint: &str,
|
||||
service: &Service,
|
||||
head_type: HeadType,
|
||||
) -> Result<Option<RangeInclusive<u64>>, SingleEndpointError> {
|
||||
let remote_highest_block =
|
||||
get_block_number(endpoint, Duration::from_millis(BLOCK_NUMBER_TIMEOUT_MILLIS))
|
||||
.map_err(SingleEndpointError::GetBlockNumberFailed)
|
||||
.await?;
|
||||
service.relevant_new_block_numbers(remote_highest_block, head_type)
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum Error {
|
||||
pub enum SingleEndpointError {
|
||||
/// Endpoint is currently not functional.
|
||||
EndpointError(EndpointError),
|
||||
/// The remote node is less synced that we expect, it is not useful until has done more
|
||||
/// syncing.
|
||||
RemoteNotSynced {
|
||||
@ -56,6 +276,10 @@ pub enum Error {
|
||||
GetDepositCountFailed(String),
|
||||
/// Failed to read the deposit contract root from the eth1 node.
|
||||
GetDepositLogsFailed(String),
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum Error {
|
||||
/// There was an inconsistency when adding a block to the cache.
|
||||
FailedToInsertEth1Block(BlockCacheError),
|
||||
/// There was an inconsistency when adding a deposit to the cache.
|
||||
@ -65,6 +289,8 @@ pub enum Error {
|
||||
block_range: Range<u64>,
|
||||
error: String,
|
||||
},
|
||||
/// All possible endpoints returned a `SingleEndpointError`.
|
||||
FallbackError(FallbackError<SingleEndpointError>),
|
||||
/// There was an unexpected internal error.
|
||||
Internal(String),
|
||||
}
|
||||
@ -85,7 +311,7 @@ pub struct DepositCacheUpdateOutcome {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Config {
|
||||
/// An Eth1 node (e.g., Geth) running a HTTP JSON-RPC endpoint.
|
||||
pub endpoint: String,
|
||||
pub endpoints: Vec<String>,
|
||||
/// The address the `BlockCache` and `DepositCache` should assume is the canonical deposit contract.
|
||||
pub deposit_contract_address: String,
|
||||
/// The eth1 network id where the deposit contract is deployed (Goerli/Mainnet).
|
||||
@ -103,6 +329,9 @@ pub struct Config {
|
||||
///
|
||||
/// Note: this should be less than or equal to the specification's `ETH1_FOLLOW_DISTANCE`.
|
||||
pub follow_distance: u64,
|
||||
/// Specifies the seconds when we consider the head of a node far behind.
|
||||
/// This should be less than `ETH1_FOLLOW_DISTANCE * SECONDS_PER_ETH1_BLOCK`.
|
||||
pub node_far_behind_seconds: u64,
|
||||
/// Defines the number of blocks that should be retained each time the `BlockCache` calls truncate on
|
||||
/// itself.
|
||||
pub block_cache_truncation: Option<usize>,
|
||||
@ -144,13 +373,14 @@ impl Config {
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
endpoint: "http://localhost:8545".into(),
|
||||
endpoints: vec!["http://localhost:8545".into()],
|
||||
deposit_contract_address: "0x0000000000000000000000000000000000000000".into(),
|
||||
network_id: DEFAULT_NETWORK_ID,
|
||||
chain_id: DEFAULT_CHAIN_ID,
|
||||
deposit_contract_deploy_block: 1,
|
||||
lowest_cached_block_number: 1,
|
||||
follow_distance: 128,
|
||||
node_far_behind_seconds: 128 * 14,
|
||||
block_cache_truncation: Some(4_096),
|
||||
auto_update_interval_millis: 7_000,
|
||||
blocks_per_log_query: 1_000,
|
||||
@ -351,6 +581,23 @@ impl Service {
|
||||
self.inner.config.write().lowest_cached_block_number = block_number;
|
||||
}
|
||||
|
||||
pub fn init_endpoints(&self) -> EndpointsCache {
|
||||
let endpoints = self.config().endpoints.clone();
|
||||
let config_network_id = self.config().network_id.clone();
|
||||
let config_chain_id = self.config().chain_id.clone();
|
||||
EndpointsCache {
|
||||
fallback: Fallback::new(
|
||||
endpoints
|
||||
.into_iter()
|
||||
.map(|s| (s, TRwLock::new(None)))
|
||||
.collect(),
|
||||
),
|
||||
config_network_id,
|
||||
config_chain_id,
|
||||
log: self.log.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the deposit and block cache, returning an error if either fail.
|
||||
///
|
||||
/// ## Returns
|
||||
@ -362,18 +609,57 @@ impl Service {
|
||||
pub async fn update(
|
||||
&self,
|
||||
) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> {
|
||||
let remote_head_block = download_eth1_block(self.inner.clone(), None)
|
||||
.map_err(|e| format!("Failed to update Eth1 service: {:?}", e))
|
||||
.await?;
|
||||
let remote_head_block_number = Some(remote_head_block.number);
|
||||
let endpoints = self.init_endpoints();
|
||||
let node_far_behind_seconds = self.inner.config.read().node_far_behind_seconds;
|
||||
|
||||
let process_single_err = |e: &FallbackError<SingleEndpointError>| {
|
||||
match e {
|
||||
FallbackError::AllErrored(errors) => {
|
||||
if errors
|
||||
.iter()
|
||||
.all(|error| matches!(error, SingleEndpointError::EndpointError(_)))
|
||||
{
|
||||
crit!(
|
||||
self.log,
|
||||
"Couldn't connect to any eth1 node. Please ensure that you have an \
|
||||
eth1 http server running locally on http://localhost:8545 or specify \
|
||||
one or more (remote) endpoints using \
|
||||
`--eth1-endpoints <COMMA-SEPARATED-SERVER-ADDRESSES>`. \
|
||||
Also ensure that `eth` and `net` apis are enabled on the eth1 http \
|
||||
server";
|
||||
"warning" => WARNING_MSG
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
endpoints.fallback.map_format_error(|s| &s.0, &e)
|
||||
};
|
||||
|
||||
let process_err = |e: Error| match &e {
|
||||
Error::FallbackError(f) => process_single_err(f),
|
||||
e => format!("{:?}", e),
|
||||
};
|
||||
|
||||
let (remote_head_block, new_block_numbers_deposit, new_block_numbers_block_cache) =
|
||||
endpoints
|
||||
.first_success(|e| async move {
|
||||
get_remote_head_and_new_block_ranges(e, &self, node_far_behind_seconds).await
|
||||
})
|
||||
.await
|
||||
.map_err(|e| {
|
||||
format!(
|
||||
"Failed to update Eth1 service: {:?}",
|
||||
process_single_err(&e)
|
||||
)
|
||||
})?;
|
||||
|
||||
*self.inner.remote_head_block.write() = Some(remote_head_block);
|
||||
|
||||
let update_deposit_cache = async {
|
||||
let outcome = self
|
||||
.update_deposit_cache(remote_head_block_number)
|
||||
.update_deposit_cache(Some(new_block_numbers_deposit), &endpoints)
|
||||
.await
|
||||
.map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?;
|
||||
.map_err(|e| format!("Failed to update eth1 cache: {:?}", process_err(e)))?;
|
||||
|
||||
trace!(
|
||||
self.log,
|
||||
@ -387,9 +673,9 @@ impl Service {
|
||||
|
||||
let update_block_cache = async {
|
||||
let outcome = self
|
||||
.update_block_cache(remote_head_block_number)
|
||||
.update_block_cache(Some(new_block_numbers_block_cache), &endpoints)
|
||||
.await
|
||||
.map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?;
|
||||
.map_err(|e| format!("Failed to update eth1 cache: {:?}", process_err(e)))?;
|
||||
|
||||
trace!(
|
||||
self.log,
|
||||
@ -431,57 +717,6 @@ impl Service {
|
||||
}
|
||||
|
||||
async fn do_update(&self, update_interval: Duration) -> Result<(), ()> {
|
||||
let endpoint = self.config().endpoint.clone();
|
||||
let config_network_id = self.config().network_id.clone();
|
||||
let config_chain_id = self.config().chain_id.clone();
|
||||
let network_id_result =
|
||||
get_network_id(&endpoint, Duration::from_millis(STANDARD_TIMEOUT_MILLIS)).await;
|
||||
let chain_id_result =
|
||||
get_chain_id(&endpoint, Duration::from_millis(STANDARD_TIMEOUT_MILLIS)).await;
|
||||
match (network_id_result, chain_id_result) {
|
||||
(Ok(network_id), Ok(chain_id)) => {
|
||||
if network_id != config_network_id {
|
||||
crit!(
|
||||
self.log,
|
||||
"Invalid eth1 network id. Please switch to correct network id";
|
||||
"expected" => format!("{:?}",config_network_id),
|
||||
"received" => format!("{:?}",network_id),
|
||||
"warning" => WARNING_MSG,
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
// Eth1 nodes return chain_id = 0 if the node is not synced
|
||||
// Handle the special case
|
||||
if chain_id == Eth1Id::Custom(0) {
|
||||
crit!(
|
||||
self.log,
|
||||
"Remote eth1 node is not synced";
|
||||
"warning" => WARNING_MSG,
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
if chain_id != config_chain_id {
|
||||
crit!(
|
||||
self.log,
|
||||
"Invalid eth1 chain id. Please switch to correct chain id";
|
||||
"expected" => format!("{:?}",config_chain_id),
|
||||
"received" => format!("{:?}", chain_id),
|
||||
"warning" => WARNING_MSG,
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
crit!(
|
||||
self.log,
|
||||
"Error connecting to eth1 node. Please ensure that you have an eth1 http server running locally on http://localhost:8545 or \
|
||||
pass an external endpoint using `--eth1-endpoint <SERVER-ADDRESS>`. Also ensure that `eth` and `net` apis are enabled on the eth1 http server";
|
||||
"warning" => WARNING_MSG,
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
let update_result = self.update().await;
|
||||
match update_result {
|
||||
Err(e) => error!(
|
||||
@ -501,6 +736,32 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the range of new block numbers to be considered for the given head type.
|
||||
fn relevant_new_block_numbers(
|
||||
&self,
|
||||
remote_highest_block: u64,
|
||||
head_type: HeadType,
|
||||
) -> Result<Option<RangeInclusive<u64>>, SingleEndpointError> {
|
||||
let follow_distance = self.reduced_follow_distance();
|
||||
let next_required_block = match head_type {
|
||||
HeadType::Deposit => self
|
||||
.deposits()
|
||||
.read()
|
||||
.last_processed_block
|
||||
.map(|n| n + 1)
|
||||
.unwrap_or_else(|| self.config().deposit_contract_deploy_block),
|
||||
HeadType::BlockCache => self
|
||||
.inner
|
||||
.block_cache
|
||||
.read()
|
||||
.highest_block_number()
|
||||
.map(|n| n + 1)
|
||||
.unwrap_or_else(|| self.config().lowest_cached_block_number),
|
||||
};
|
||||
|
||||
relevant_block_range(remote_highest_block, next_required_block, follow_distance)
|
||||
}
|
||||
|
||||
/// Contacts the remote eth1 node and attempts to import deposit logs up to the configured
|
||||
/// follow-distance block.
|
||||
///
|
||||
@ -518,10 +779,9 @@ impl Service {
|
||||
/// Emits logs for debugging and errors.
|
||||
pub async fn update_deposit_cache(
|
||||
&self,
|
||||
remote_highest_block_opt: Option<u64>,
|
||||
new_block_numbers: Option<Option<RangeInclusive<u64>>>,
|
||||
endpoints: &EndpointsCache,
|
||||
) -> Result<DepositCacheUpdateOutcome, Error> {
|
||||
let endpoint = self.config().endpoint.clone();
|
||||
let reduced_follow_distance = self.reduced_follow_distance();
|
||||
let deposit_contract_address = self.config().deposit_contract_address.clone();
|
||||
|
||||
let blocks_per_log_query = self.config().blocks_per_log_query;
|
||||
@ -530,20 +790,17 @@ impl Service {
|
||||
.max_log_requests_per_update
|
||||
.unwrap_or_else(usize::max_value);
|
||||
|
||||
let next_required_block = self
|
||||
.deposits()
|
||||
.read()
|
||||
.last_processed_block
|
||||
.map(|n| n + 1)
|
||||
.unwrap_or_else(|| self.config().deposit_contract_deploy_block);
|
||||
|
||||
let range = get_new_block_numbers(
|
||||
&endpoint,
|
||||
remote_highest_block_opt,
|
||||
next_required_block,
|
||||
reduced_follow_distance,
|
||||
)
|
||||
.await?;
|
||||
let range = {
|
||||
match new_block_numbers {
|
||||
Some(range) => range,
|
||||
None => endpoints
|
||||
.first_success(|e| async move {
|
||||
relevant_new_block_numbers_from_endpoint(e, &self, HeadType::Deposit).await
|
||||
})
|
||||
.await
|
||||
.map_err(Error::FallbackError)?,
|
||||
}
|
||||
};
|
||||
|
||||
let block_number_chunks = if let Some(range) = range {
|
||||
range
|
||||
@ -560,28 +817,32 @@ impl Service {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
let deposit_contract_address_ref: &str = &deposit_contract_address;
|
||||
let logs: Vec<(Range<u64>, Vec<Log>)> =
|
||||
stream::try_unfold(block_number_chunks.into_iter(), |mut chunks| async {
|
||||
match chunks.next() {
|
||||
Some(chunk) => {
|
||||
let chunk_1 = chunk.clone();
|
||||
match get_deposit_logs_in_range(
|
||||
&endpoint,
|
||||
&deposit_contract_address,
|
||||
chunk,
|
||||
Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(logs) => Ok(Some(((chunk_1, logs), chunks))),
|
||||
Err(e) => Err(Error::GetDepositLogsFailed(e)),
|
||||
}
|
||||
let chunk_ref = &chunk;
|
||||
endpoints
|
||||
.first_success(|e| async move {
|
||||
get_deposit_logs_in_range(
|
||||
e,
|
||||
deposit_contract_address_ref,
|
||||
chunk_ref.clone(),
|
||||
Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS),
|
||||
)
|
||||
.await
|
||||
.map_err(SingleEndpointError::GetDepositLogsFailed)
|
||||
})
|
||||
.await
|
||||
.map(|logs| Some(((chunk, logs), chunks)))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
})
|
||||
.try_collect()
|
||||
.await?;
|
||||
.await
|
||||
.map_err(Error::FallbackError)?;
|
||||
|
||||
let mut logs_imported = 0;
|
||||
for (block_range, log_chunk) in logs.iter() {
|
||||
@ -665,7 +926,8 @@ impl Service {
|
||||
/// Emits logs for debugging and errors.
|
||||
pub async fn update_block_cache(
|
||||
&self,
|
||||
remote_highest_block_opt: Option<u64>,
|
||||
new_block_numbers: Option<Option<RangeInclusive<u64>>>,
|
||||
endpoints: &EndpointsCache,
|
||||
) -> Result<BlockCacheUpdateOutcome, Error> {
|
||||
let block_cache_truncation = self.config().block_cache_truncation;
|
||||
let max_blocks_per_update = self
|
||||
@ -673,24 +935,19 @@ impl Service {
|
||||
.max_blocks_per_update
|
||||
.unwrap_or_else(usize::max_value);
|
||||
|
||||
let next_required_block = self
|
||||
.inner
|
||||
.block_cache
|
||||
.read()
|
||||
.highest_block_number()
|
||||
.map(|n| n + 1)
|
||||
.unwrap_or_else(|| self.config().lowest_cached_block_number);
|
||||
let range = {
|
||||
match new_block_numbers {
|
||||
Some(range) => range,
|
||||
None => endpoints
|
||||
.first_success(|e| async move {
|
||||
relevant_new_block_numbers_from_endpoint(e, &self, HeadType::BlockCache)
|
||||
.await
|
||||
})
|
||||
.await
|
||||
.map_err(Error::FallbackError)?,
|
||||
}
|
||||
};
|
||||
|
||||
let endpoint = self.config().endpoint.clone();
|
||||
let reduced_follow_distance = self.reduced_follow_distance();
|
||||
|
||||
let range = get_new_block_numbers(
|
||||
&endpoint,
|
||||
remote_highest_block_opt,
|
||||
next_required_block,
|
||||
reduced_follow_distance,
|
||||
)
|
||||
.await?;
|
||||
// Map the range of required blocks into a Vec.
|
||||
//
|
||||
// If the required range is larger than the size of the cache, drop the exiting cache
|
||||
@ -741,7 +998,12 @@ impl Service {
|
||||
|mut block_numbers| async {
|
||||
match block_numbers.next() {
|
||||
Some(block_number) => {
|
||||
match download_eth1_block(self.inner.clone(), Some(block_number)).await {
|
||||
match endpoints
|
||||
.first_success(|e| async move {
|
||||
download_eth1_block(e, self.inner.clone(), Some(block_number)).await
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(eth1_block) => Ok(Some((eth1_block, block_numbers))),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
@ -751,7 +1013,8 @@ impl Service {
|
||||
},
|
||||
)
|
||||
.try_collect()
|
||||
.await?;
|
||||
.await
|
||||
.map_err(Error::FallbackError)?;
|
||||
|
||||
let mut blocks_imported = 0;
|
||||
for eth1_block in eth1_blocks {
|
||||
@ -822,21 +1085,15 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
/// Determine the range of blocks that need to be downloaded, given the remotes best block and
|
||||
/// the locally stored best block.
|
||||
async fn get_new_block_numbers<'a>(
|
||||
endpoint: &str,
|
||||
remote_highest_block_opt: Option<u64>,
|
||||
/// Returns the range of blocks starting from `next_required_block` that are at least
|
||||
/// `follow_distance` many blocks before `remote_highest_block`.
|
||||
/// Returns an error if `next_required_block > remote_highest_block + 1` which means the remote went
|
||||
/// backwards.
|
||||
fn relevant_block_range(
|
||||
remote_highest_block: u64,
|
||||
next_required_block: u64,
|
||||
reduced_follow_distance: u64,
|
||||
) -> Result<Option<RangeInclusive<u64>>, Error> {
|
||||
let remote_highest_block = if let Some(block_number) = remote_highest_block_opt {
|
||||
block_number
|
||||
} else {
|
||||
get_block_number(endpoint, Duration::from_millis(BLOCK_NUMBER_TIMEOUT_MILLIS))
|
||||
.map_err(Error::GetBlockNumberFailed)
|
||||
.await?
|
||||
};
|
||||
) -> Result<Option<RangeInclusive<u64>>, SingleEndpointError> {
|
||||
let remote_follow_block = remote_highest_block.saturating_sub(reduced_follow_distance);
|
||||
|
||||
if next_required_block <= remote_follow_block {
|
||||
@ -847,7 +1104,7 @@ async fn get_new_block_numbers<'a>(
|
||||
//
|
||||
// We assume that the `reduced_follow_distance` should be sufficient to ensure this never
|
||||
// happens, otherwise it is an error.
|
||||
Err(Error::RemoteNotSynced {
|
||||
Err(SingleEndpointError::RemoteNotSynced {
|
||||
next_required_block,
|
||||
remote_highest_block,
|
||||
reduced_follow_distance,
|
||||
@ -865,11 +1122,10 @@ async fn get_new_block_numbers<'a>(
|
||||
///
|
||||
/// Performs three async calls to an Eth1 HTTP JSON RPC endpoint.
|
||||
async fn download_eth1_block(
|
||||
endpoint: &str,
|
||||
cache: Arc<Inner>,
|
||||
block_number_opt: Option<u64>,
|
||||
) -> Result<Eth1Block, Error> {
|
||||
let endpoint = cache.config.read().endpoint.clone();
|
||||
|
||||
) -> Result<Eth1Block, SingleEndpointError> {
|
||||
let deposit_root = block_number_opt.and_then(|block_number| {
|
||||
cache
|
||||
.deposit_cache
|
||||
@ -888,13 +1144,13 @@ async fn download_eth1_block(
|
||||
|
||||
// Performs a `get_blockByNumber` call to an eth1 node.
|
||||
let http_block = get_block(
|
||||
&endpoint,
|
||||
endpoint,
|
||||
block_number_opt
|
||||
.map(BlockQuery::Number)
|
||||
.unwrap_or_else(|| BlockQuery::Latest),
|
||||
Duration::from_millis(GET_BLOCK_TIMEOUT_MILLIS),
|
||||
)
|
||||
.map_err(Error::BlockDownloadFailed)
|
||||
.map_err(SingleEndpointError::BlockDownloadFailed)
|
||||
.await?;
|
||||
|
||||
Ok(Eth1Block {
|
||||
|
@ -1,8 +1,8 @@
|
||||
#![cfg(test)]
|
||||
use environment::{Environment, EnvironmentBuilder};
|
||||
use eth1::http::{get_deposit_count, get_deposit_logs_in_range, get_deposit_root, Block, Log};
|
||||
use eth1::DepositCache;
|
||||
use eth1::{Config, Service};
|
||||
use eth1::{DepositCache, DEFAULT_CHAIN_ID, DEFAULT_NETWORK_ID};
|
||||
use eth1_test_rig::GanacheEth1Instance;
|
||||
use futures::compat::Future01CompatExt;
|
||||
use merkle_proof::verify_merkle_proof;
|
||||
@ -97,6 +97,10 @@ async fn get_block_number(web3: &Web3<Http>) -> u64 {
|
||||
.expect("should get block number")
|
||||
}
|
||||
|
||||
async fn new_ganache_instance() -> Result<GanacheEth1Instance, String> {
|
||||
GanacheEth1Instance::new(DEFAULT_NETWORK_ID.into(), DEFAULT_CHAIN_ID.into()).await
|
||||
}
|
||||
|
||||
mod eth1_cache {
|
||||
use super::*;
|
||||
use types::{EthSpec, MainnetEthSpec};
|
||||
@ -106,7 +110,7 @@ mod eth1_cache {
|
||||
let log = null_logger();
|
||||
|
||||
for follow_distance in 0..2 {
|
||||
let eth1 = GanacheEth1Instance::new()
|
||||
let eth1 = new_ganache_instance()
|
||||
.await
|
||||
.expect("should start eth1 environment");
|
||||
let deposit_contract = ð1.deposit_contract;
|
||||
@ -116,7 +120,7 @@ mod eth1_cache {
|
||||
|
||||
let service = Service::new(
|
||||
Config {
|
||||
endpoint: eth1.endpoint(),
|
||||
endpoints: vec![eth1.endpoint()],
|
||||
deposit_contract_address: deposit_contract.address(),
|
||||
lowest_cached_block_number: initial_block_number,
|
||||
follow_distance,
|
||||
@ -145,17 +149,19 @@ mod eth1_cache {
|
||||
eth1.ganache.evm_mine().await.expect("should mine block");
|
||||
}
|
||||
|
||||
let endpoints = service.init_endpoints();
|
||||
|
||||
service
|
||||
.update_deposit_cache(None)
|
||||
.update_deposit_cache(None, &endpoints)
|
||||
.await
|
||||
.expect("should update deposit cache");
|
||||
service
|
||||
.update_block_cache(None)
|
||||
.update_block_cache(None, &endpoints)
|
||||
.await
|
||||
.expect("should update block cache");
|
||||
|
||||
service
|
||||
.update_block_cache(None)
|
||||
.update_block_cache(None, &endpoints)
|
||||
.await
|
||||
.expect("should update cache when nothing has changed");
|
||||
|
||||
@ -181,7 +187,7 @@ mod eth1_cache {
|
||||
async fn big_skip() {
|
||||
let log = null_logger();
|
||||
|
||||
let eth1 = GanacheEth1Instance::new()
|
||||
let eth1 = new_ganache_instance()
|
||||
.await
|
||||
.expect("should start eth1 environment");
|
||||
let deposit_contract = ð1.deposit_contract;
|
||||
@ -191,7 +197,7 @@ mod eth1_cache {
|
||||
|
||||
let service = Service::new(
|
||||
Config {
|
||||
endpoint: eth1.endpoint(),
|
||||
endpoints: vec![eth1.endpoint()],
|
||||
deposit_contract_address: deposit_contract.address(),
|
||||
lowest_cached_block_number: get_block_number(&web3).await,
|
||||
follow_distance: 0,
|
||||
@ -208,12 +214,14 @@ mod eth1_cache {
|
||||
eth1.ganache.evm_mine().await.expect("should mine block")
|
||||
}
|
||||
|
||||
let endpoints = service.init_endpoints();
|
||||
|
||||
service
|
||||
.update_deposit_cache(None)
|
||||
.update_deposit_cache(None, &endpoints)
|
||||
.await
|
||||
.expect("should update deposit cache");
|
||||
service
|
||||
.update_block_cache(None)
|
||||
.update_block_cache(None, &endpoints)
|
||||
.await
|
||||
.expect("should update block cache");
|
||||
|
||||
@ -230,7 +238,7 @@ mod eth1_cache {
|
||||
async fn pruning() {
|
||||
let log = null_logger();
|
||||
|
||||
let eth1 = GanacheEth1Instance::new()
|
||||
let eth1 = new_ganache_instance()
|
||||
.await
|
||||
.expect("should start eth1 environment");
|
||||
let deposit_contract = ð1.deposit_contract;
|
||||
@ -240,7 +248,7 @@ mod eth1_cache {
|
||||
|
||||
let service = Service::new(
|
||||
Config {
|
||||
endpoint: eth1.endpoint(),
|
||||
endpoints: vec![eth1.endpoint()],
|
||||
deposit_contract_address: deposit_contract.address(),
|
||||
lowest_cached_block_number: get_block_number(&web3).await,
|
||||
follow_distance: 0,
|
||||
@ -255,12 +263,13 @@ mod eth1_cache {
|
||||
for _ in 0..cache_len / 2 {
|
||||
eth1.ganache.evm_mine().await.expect("should mine block")
|
||||
}
|
||||
let endpoints = service.init_endpoints();
|
||||
service
|
||||
.update_deposit_cache(None)
|
||||
.update_deposit_cache(None, &endpoints)
|
||||
.await
|
||||
.expect("should update deposit cache");
|
||||
service
|
||||
.update_block_cache(None)
|
||||
.update_block_cache(None, &endpoints)
|
||||
.await
|
||||
.expect("should update block cache");
|
||||
}
|
||||
@ -278,7 +287,7 @@ mod eth1_cache {
|
||||
|
||||
let n = 16;
|
||||
|
||||
let eth1 = GanacheEth1Instance::new()
|
||||
let eth1 = new_ganache_instance()
|
||||
.await
|
||||
.expect("should start eth1 environment");
|
||||
let deposit_contract = ð1.deposit_contract;
|
||||
@ -286,7 +295,7 @@ mod eth1_cache {
|
||||
|
||||
let service = Service::new(
|
||||
Config {
|
||||
endpoint: eth1.endpoint(),
|
||||
endpoints: vec![eth1.endpoint()],
|
||||
deposit_contract_address: deposit_contract.address(),
|
||||
lowest_cached_block_number: get_block_number(&web3).await,
|
||||
follow_distance: 0,
|
||||
@ -299,14 +308,16 @@ mod eth1_cache {
|
||||
for _ in 0..n {
|
||||
eth1.ganache.evm_mine().await.expect("should mine block")
|
||||
}
|
||||
|
||||
let endpoints = service.init_endpoints();
|
||||
futures::try_join!(
|
||||
service.update_deposit_cache(None),
|
||||
service.update_deposit_cache(None)
|
||||
service.update_deposit_cache(None, &endpoints),
|
||||
service.update_deposit_cache(None, &endpoints)
|
||||
)
|
||||
.expect("should perform two simultaneous updates of deposit cache");
|
||||
futures::try_join!(
|
||||
service.update_block_cache(None),
|
||||
service.update_block_cache(None)
|
||||
service.update_block_cache(None, &endpoints),
|
||||
service.update_block_cache(None, &endpoints)
|
||||
)
|
||||
.expect("should perform two simultaneous updates of block cache");
|
||||
|
||||
@ -323,7 +334,7 @@ mod deposit_tree {
|
||||
|
||||
let n = 4;
|
||||
|
||||
let eth1 = GanacheEth1Instance::new()
|
||||
let eth1 = new_ganache_instance()
|
||||
.await
|
||||
.expect("should start eth1 environment");
|
||||
let deposit_contract = ð1.deposit_contract;
|
||||
@ -333,7 +344,7 @@ mod deposit_tree {
|
||||
|
||||
let service = Service::new(
|
||||
Config {
|
||||
endpoint: eth1.endpoint(),
|
||||
endpoints: vec![eth1.endpoint()],
|
||||
deposit_contract_address: deposit_contract.address(),
|
||||
deposit_contract_deploy_block: start_block,
|
||||
follow_distance: 0,
|
||||
@ -353,13 +364,15 @@ mod deposit_tree {
|
||||
.expect("should perform a deposit");
|
||||
}
|
||||
|
||||
let endpoints = service.init_endpoints();
|
||||
|
||||
service
|
||||
.update_deposit_cache(None)
|
||||
.update_deposit_cache(None, &endpoints)
|
||||
.await
|
||||
.expect("should perform update");
|
||||
|
||||
service
|
||||
.update_deposit_cache(None)
|
||||
.update_deposit_cache(None, &endpoints)
|
||||
.await
|
||||
.expect("should perform update when nothing has changed");
|
||||
|
||||
@ -398,7 +411,7 @@ mod deposit_tree {
|
||||
|
||||
let n = 8;
|
||||
|
||||
let eth1 = GanacheEth1Instance::new()
|
||||
let eth1 = new_ganache_instance()
|
||||
.await
|
||||
.expect("should start eth1 environment");
|
||||
let deposit_contract = ð1.deposit_contract;
|
||||
@ -408,7 +421,7 @@ mod deposit_tree {
|
||||
|
||||
let service = Service::new(
|
||||
Config {
|
||||
endpoint: eth1.endpoint(),
|
||||
endpoints: vec![eth1.endpoint()],
|
||||
deposit_contract_address: deposit_contract.address(),
|
||||
deposit_contract_deploy_block: start_block,
|
||||
lowest_cached_block_number: start_block,
|
||||
@ -428,9 +441,10 @@ mod deposit_tree {
|
||||
.expect("should perform a deposit");
|
||||
}
|
||||
|
||||
let endpoints = service.init_endpoints();
|
||||
futures::try_join!(
|
||||
service.update_deposit_cache(None),
|
||||
service.update_deposit_cache(None)
|
||||
service.update_deposit_cache(None, &endpoints),
|
||||
service.update_deposit_cache(None, &endpoints)
|
||||
)
|
||||
.expect("should perform two updates concurrently");
|
||||
|
||||
@ -445,7 +459,7 @@ mod deposit_tree {
|
||||
|
||||
let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect();
|
||||
|
||||
let eth1 = GanacheEth1Instance::new()
|
||||
let eth1 = new_ganache_instance()
|
||||
.await
|
||||
.expect("should start eth1 environment");
|
||||
let deposit_contract = ð1.deposit_contract;
|
||||
@ -552,7 +566,7 @@ mod http {
|
||||
|
||||
#[tokio::test]
|
||||
async fn incrementing_deposits() {
|
||||
let eth1 = GanacheEth1Instance::new()
|
||||
let eth1 = new_ganache_instance()
|
||||
.await
|
||||
.expect("should start eth1 environment");
|
||||
let deposit_contract = ð1.deposit_contract;
|
||||
@ -644,7 +658,7 @@ mod fast {
|
||||
async fn deposit_cache_query() {
|
||||
let log = null_logger();
|
||||
|
||||
let eth1 = GanacheEth1Instance::new()
|
||||
let eth1 = new_ganache_instance()
|
||||
.await
|
||||
.expect("should start eth1 environment");
|
||||
let deposit_contract = ð1.deposit_contract;
|
||||
@ -653,7 +667,7 @@ mod fast {
|
||||
let now = get_block_number(&web3).await;
|
||||
let service = Service::new(
|
||||
Config {
|
||||
endpoint: eth1.endpoint(),
|
||||
endpoints: vec![eth1.endpoint()],
|
||||
deposit_contract_address: deposit_contract.address(),
|
||||
deposit_contract_deploy_block: now,
|
||||
lowest_cached_block_number: now,
|
||||
@ -675,8 +689,9 @@ mod fast {
|
||||
eth1.ganache.evm_mine().await.expect("should mine block");
|
||||
}
|
||||
|
||||
let endpoints = service.init_endpoints();
|
||||
service
|
||||
.update_deposit_cache(None)
|
||||
.update_deposit_cache(None, &endpoints)
|
||||
.await
|
||||
.expect("should perform update");
|
||||
|
||||
@ -717,7 +732,7 @@ mod persist {
|
||||
async fn test_persist_caches() {
|
||||
let log = null_logger();
|
||||
|
||||
let eth1 = GanacheEth1Instance::new()
|
||||
let eth1 = new_ganache_instance()
|
||||
.await
|
||||
.expect("should start eth1 environment");
|
||||
let deposit_contract = ð1.deposit_contract;
|
||||
@ -725,7 +740,7 @@ mod persist {
|
||||
|
||||
let now = get_block_number(&web3).await;
|
||||
let config = Config {
|
||||
endpoint: eth1.endpoint(),
|
||||
endpoints: vec![eth1.endpoint()],
|
||||
deposit_contract_address: deposit_contract.address(),
|
||||
deposit_contract_deploy_block: now,
|
||||
lowest_cached_block_number: now,
|
||||
@ -743,8 +758,9 @@ mod persist {
|
||||
.expect("should perform a deposit");
|
||||
}
|
||||
|
||||
let endpoints = service.init_endpoints();
|
||||
service
|
||||
.update_deposit_cache(None)
|
||||
.update_deposit_cache(None, &endpoints)
|
||||
.await
|
||||
.expect("should perform update");
|
||||
|
||||
@ -756,7 +772,7 @@ mod persist {
|
||||
let deposit_count = service.deposit_cache_len();
|
||||
|
||||
service
|
||||
.update_block_cache(None)
|
||||
.update_block_cache(None, &endpoints)
|
||||
.await
|
||||
.expect("should perform update");
|
||||
|
||||
@ -786,3 +802,273 @@ mod persist {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Tests for eth1 fallback
|
||||
mod fallbacks {
|
||||
use super::*;
|
||||
use tokio::time::delay_for;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fallback_when_offline() {
|
||||
let log = null_logger();
|
||||
let endpoint2 = new_ganache_instance()
|
||||
.await
|
||||
.expect("should start eth1 environment");
|
||||
let deposit_contract = &endpoint2.deposit_contract;
|
||||
|
||||
let initial_block_number = get_block_number(&endpoint2.web3()).await;
|
||||
|
||||
// Create some blocks and then consume them, performing the test `rounds` times.
|
||||
let new_blocks = 4;
|
||||
|
||||
for _ in 0..new_blocks {
|
||||
endpoint2
|
||||
.ganache
|
||||
.evm_mine()
|
||||
.await
|
||||
.expect("should mine block");
|
||||
}
|
||||
|
||||
let endpoint1 = endpoint2
|
||||
.ganache
|
||||
.fork()
|
||||
.expect("should start eth1 environment");
|
||||
|
||||
//mine additional blocks on top of the original endpoint
|
||||
for _ in 0..new_blocks {
|
||||
endpoint2
|
||||
.ganache
|
||||
.evm_mine()
|
||||
.await
|
||||
.expect("should mine block");
|
||||
}
|
||||
|
||||
let service = Service::new(
|
||||
Config {
|
||||
endpoints: vec![endpoint1.endpoint(), endpoint2.endpoint()],
|
||||
deposit_contract_address: deposit_contract.address(),
|
||||
lowest_cached_block_number: initial_block_number,
|
||||
follow_distance: 0,
|
||||
..Config::default()
|
||||
},
|
||||
log.clone(),
|
||||
MainnetEthSpec::default_spec(),
|
||||
);
|
||||
|
||||
let endpoint1_block_number = get_block_number(&endpoint1.web3).await;
|
||||
//the first call will only query endpoint1
|
||||
service.update().await.expect("should update deposit cache");
|
||||
assert_eq!(
|
||||
service.deposits().read().last_processed_block.unwrap(),
|
||||
endpoint1_block_number
|
||||
);
|
||||
|
||||
drop(endpoint1);
|
||||
|
||||
let endpoint2_block_number = get_block_number(&endpoint2.web3()).await;
|
||||
assert!(endpoint1_block_number < endpoint2_block_number);
|
||||
//endpoint1 is offline => query will import blocks from endpoint2
|
||||
service.update().await.expect("should update deposit cache");
|
||||
assert_eq!(
|
||||
service.deposits().read().last_processed_block.unwrap(),
|
||||
endpoint2_block_number
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fallback_when_wrong_network_id() {
|
||||
let log = null_logger();
|
||||
let correct_network_id: u64 = DEFAULT_NETWORK_ID.into();
|
||||
let wrong_network_id = correct_network_id + 1;
|
||||
let endpoint1 = GanacheEth1Instance::new(wrong_network_id, DEFAULT_CHAIN_ID.into())
|
||||
.await
|
||||
.expect("should start eth1 environment");
|
||||
let endpoint2 = new_ganache_instance()
|
||||
.await
|
||||
.expect("should start eth1 environment");
|
||||
let deposit_contract = &endpoint2.deposit_contract;
|
||||
|
||||
let initial_block_number = get_block_number(&endpoint2.web3()).await;
|
||||
|
||||
// Create some blocks and then consume them, performing the test `rounds` times.
|
||||
let new_blocks = 4;
|
||||
|
||||
for _ in 0..new_blocks {
|
||||
endpoint1
|
||||
.ganache
|
||||
.evm_mine()
|
||||
.await
|
||||
.expect("should mine block");
|
||||
endpoint2
|
||||
.ganache
|
||||
.evm_mine()
|
||||
.await
|
||||
.expect("should mine block");
|
||||
}
|
||||
|
||||
//additional blocks for endpoint1 to be able to distinguish
|
||||
for _ in 0..new_blocks {
|
||||
endpoint1
|
||||
.ganache
|
||||
.evm_mine()
|
||||
.await
|
||||
.expect("should mine block");
|
||||
}
|
||||
|
||||
let service = Service::new(
|
||||
Config {
|
||||
endpoints: vec![endpoint2.endpoint(), endpoint1.endpoint()],
|
||||
deposit_contract_address: deposit_contract.address(),
|
||||
lowest_cached_block_number: initial_block_number,
|
||||
follow_distance: 0,
|
||||
..Config::default()
|
||||
},
|
||||
log.clone(),
|
||||
MainnetEthSpec::default_spec(),
|
||||
);
|
||||
|
||||
let endpoint1_block_number = get_block_number(&endpoint1.web3()).await;
|
||||
let endpoint2_block_number = get_block_number(&endpoint2.web3()).await;
|
||||
assert!(endpoint2_block_number < endpoint1_block_number);
|
||||
//the call will fallback to endpoint2
|
||||
service.update().await.expect("should update deposit cache");
|
||||
assert_eq!(
|
||||
service.deposits().read().last_processed_block.unwrap(),
|
||||
endpoint2_block_number
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fallback_when_wrong_chain_id() {
|
||||
let log = null_logger();
|
||||
let correct_chain_id: u64 = DEFAULT_CHAIN_ID.into();
|
||||
let wrong_chain_id = correct_chain_id + 1;
|
||||
let endpoint1 = GanacheEth1Instance::new(DEFAULT_NETWORK_ID.into(), wrong_chain_id)
|
||||
.await
|
||||
.expect("should start eth1 environment");
|
||||
let endpoint2 = new_ganache_instance()
|
||||
.await
|
||||
.expect("should start eth1 environment");
|
||||
let deposit_contract = &endpoint2.deposit_contract;
|
||||
|
||||
let initial_block_number = get_block_number(&endpoint2.web3()).await;
|
||||
|
||||
// Create some blocks and then consume them, performing the test `rounds` times.
|
||||
let new_blocks = 4;
|
||||
|
||||
for _ in 0..new_blocks {
|
||||
endpoint1
|
||||
.ganache
|
||||
.evm_mine()
|
||||
.await
|
||||
.expect("should mine block");
|
||||
endpoint2
|
||||
.ganache
|
||||
.evm_mine()
|
||||
.await
|
||||
.expect("should mine block");
|
||||
}
|
||||
|
||||
//additional blocks for endpoint1 to be able to distinguish
|
||||
for _ in 0..new_blocks {
|
||||
endpoint1
|
||||
.ganache
|
||||
.evm_mine()
|
||||
.await
|
||||
.expect("should mine block");
|
||||
}
|
||||
|
||||
let service = Service::new(
|
||||
Config {
|
||||
endpoints: vec![endpoint2.endpoint(), endpoint1.endpoint()],
|
||||
deposit_contract_address: deposit_contract.address(),
|
||||
lowest_cached_block_number: initial_block_number,
|
||||
follow_distance: 0,
|
||||
..Config::default()
|
||||
},
|
||||
log.clone(),
|
||||
MainnetEthSpec::default_spec(),
|
||||
);
|
||||
|
||||
let endpoint1_block_number = get_block_number(&endpoint1.web3()).await;
|
||||
let endpoint2_block_number = get_block_number(&endpoint2.web3()).await;
|
||||
assert!(endpoint2_block_number < endpoint1_block_number);
|
||||
//the call will fallback to endpoint2
|
||||
service.update().await.expect("should update deposit cache");
|
||||
assert_eq!(
|
||||
service.deposits().read().last_processed_block.unwrap(),
|
||||
endpoint2_block_number
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fallback_when_node_far_behind() {
|
||||
let log = null_logger();
|
||||
let endpoint2 = new_ganache_instance()
|
||||
.await
|
||||
.expect("should start eth1 environment");
|
||||
let deposit_contract = &endpoint2.deposit_contract;
|
||||
|
||||
let initial_block_number = get_block_number(&endpoint2.web3()).await;
|
||||
|
||||
// Create some blocks and then consume them, performing the test `rounds` times.
|
||||
let new_blocks = 4;
|
||||
|
||||
for _ in 0..new_blocks {
|
||||
endpoint2
|
||||
.ganache
|
||||
.evm_mine()
|
||||
.await
|
||||
.expect("should mine block");
|
||||
}
|
||||
|
||||
let endpoint1 = endpoint2
|
||||
.ganache
|
||||
.fork()
|
||||
.expect("should start eth1 environment");
|
||||
|
||||
let service = Service::new(
|
||||
Config {
|
||||
endpoints: vec![endpoint1.endpoint(), endpoint2.endpoint()],
|
||||
deposit_contract_address: deposit_contract.address(),
|
||||
lowest_cached_block_number: initial_block_number,
|
||||
follow_distance: 0,
|
||||
node_far_behind_seconds: 5,
|
||||
..Config::default()
|
||||
},
|
||||
log.clone(),
|
||||
MainnetEthSpec::default_spec(),
|
||||
);
|
||||
|
||||
let endpoint1_block_number = get_block_number(&endpoint1.web3).await;
|
||||
//the first call will only query endpoint1
|
||||
service.update().await.expect("should update deposit cache");
|
||||
assert_eq!(
|
||||
service.deposits().read().last_processed_block.unwrap(),
|
||||
endpoint1_block_number
|
||||
);
|
||||
|
||||
delay_for(Duration::from_secs(7)).await;
|
||||
|
||||
//both endpoints don't have recent blocks => should return error
|
||||
assert!(service.update().await.is_err());
|
||||
|
||||
//produce some new blocks on endpoint2
|
||||
for _ in 0..new_blocks {
|
||||
endpoint2
|
||||
.ganache
|
||||
.evm_mine()
|
||||
.await
|
||||
.expect("should mine block");
|
||||
}
|
||||
|
||||
let endpoint2_block_number = get_block_number(&endpoint2.web3()).await;
|
||||
|
||||
//endpoint1 is far behind + endpoint2 not => update will import blocks from endpoint2
|
||||
service.update().await.expect("should update deposit cache");
|
||||
assert_eq!(
|
||||
service.deposits().read().last_processed_block.unwrap(),
|
||||
endpoint2_block_number
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@@ -112,9 +112,11 @@ impl Eth1GenesisService {
            "Importing eth1 deposit logs";
        );

        let endpoints = eth1_service.init_endpoints();

        loop {
            let update_result = eth1_service
                .update_deposit_cache(None)
                .update_deposit_cache(None, &endpoints)
                .await
                .map_err(|e| format!("{:?}", e));

@@ -156,7 +158,7 @@ impl Eth1GenesisService {
            }

            // Download new eth1 blocks into the cache.
            let blocks_imported = match eth1_service.update_block_cache(None).await {
            let blocks_imported = match eth1_service.update_block_cache(None, &endpoints).await {
                Ok(outcome) => {
                    debug!(
                        log,
@@ -4,6 +4,7 @@
//! dir in the root of the `lighthouse` repo.
#![cfg(test)]
use environment::{Environment, EnvironmentBuilder};
use eth1::{DEFAULT_CHAIN_ID, DEFAULT_NETWORK_ID};
use eth1_test_rig::{DelayThenDeposit, GanacheEth1Instance};
use futures::compat::Future01CompatExt;
use genesis::{Eth1Config, Eth1GenesisService};
@@ -28,7 +29,7 @@ fn basic() {
    let mut spec = env.eth2_config().spec.clone();

    env.runtime().block_on(async {
        let eth1 = GanacheEth1Instance::new()
        let eth1 = GanacheEth1Instance::new(DEFAULT_NETWORK_ID.into(), DEFAULT_CHAIN_ID.into())
            .await
            .expect("should start eth1 environment");
        let deposit_contract = &eth1.deposit_contract;
@@ -44,7 +45,7 @@ fn basic() {

        let service = Eth1GenesisService::new(
            Eth1Config {
                endpoint: eth1.endpoint(),
                endpoints: vec![eth1.endpoint()],
                deposit_contract_address: deposit_contract.address(),
                deposit_contract_deploy_block: now,
                lowest_cached_block_number: now,
@@ -284,7 +284,18 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
        Arg::with_name("eth1-endpoint")
            .long("eth1-endpoint")
            .value_name("HTTP-ENDPOINT")
            .help("Specifies the server for a web3 connection to the Eth1 chain. Also enables the --eth1 flag. Defaults to http://127.0.0.1:8545.")
            .help("Deprecated. Use --eth1-endpoints.")
            .takes_value(true)
    )
    .arg(
        Arg::with_name("eth1-endpoints")
            .long("eth1-endpoints")
            .value_name("HTTP-ENDPOINTS")
            .conflicts_with("eth1-endpoint")
            .help("One or more comma-delimited server endpoints for web3 connection. \
                If multiple endpoints are given the endpoints are used as fallback in the \
                given order. Also enables the --eth1 flag. \
                Defaults to http://127.0.0.1:8545.")
            .takes_value(true)
    )
    .arg(
@@ -7,6 +7,7 @@ use eth2_libp2p::{multiaddr::Protocol, Enr, Multiaddr, NetworkConfig, PeerIdSeri
use eth2_testnet_config::Eth2TestnetConfig;
use slog::{info, warn, Logger};
use std::cmp;
use std::cmp::max;
use std::fs;
use std::net::{IpAddr, Ipv4Addr, ToSocketAddrs};
use std::net::{TcpListener, UdpSocket};
@@ -194,7 +195,10 @@ pub fn get_config<E: EthSpec>(
    // Defines the URL to reach the eth1 node.
    if let Some(val) = cli_args.value_of("eth1-endpoint") {
        client_config.sync_eth1_chain = true;
        client_config.eth1.endpoint = val.to_string();
        client_config.eth1.endpoints = vec![val.to_string()];
    } else if let Some(val) = cli_args.value_of("eth1-endpoints") {
        client_config.sync_eth1_chain = true;
        client_config.eth1.endpoints = val.split(',').map(String::from).collect();
    }

    if let Some(val) = cli_args.value_of("eth1-blocks-per-log-query") {
@@ -264,6 +268,8 @@ pub fn get_config<E: EthSpec>(
    client_config.eth1.lowest_cached_block_number =
        client_config.eth1.deposit_contract_deploy_block;
    client_config.eth1.follow_distance = spec.eth1_follow_distance;
    client_config.eth1.node_far_behind_seconds =
        max(5, spec.eth1_follow_distance / 2) * spec.seconds_per_eth1_block;
    client_config.eth1.network_id = spec.deposit_network_id.into();
    client_config.eth1.chain_id = spec.deposit_chain_id.into();
    client_config.eth1.set_block_cache_truncation::<E>(spec);
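For illustration only (the flag value here is hypothetical), the comma-separated `--eth1-endpoints` value above ends up as a plain `Vec<String>`:

```rust
fn main() {
    // Hypothetical flag value; mirrors `val.split(',').map(String::from).collect()` above.
    let val = "http://localhost:8545,http://192.168.1.10:8545";
    let endpoints: Vec<String> = val.split(',').map(String::from).collect();
    assert_eq!(
        endpoints,
        vec!["http://localhost:8545", "http://192.168.1.10:8545"]
    );
}
```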
@@ -101,7 +101,7 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
        info!(
            log,
            "Block production enabled";
            "endpoint" => &client_config.eth1.endpoint,
            "endpoints" => format!("{:?}", &client_config.eth1.endpoints),
            "method" => "json rpc via http"
        );
        builder
10 common/fallback/Cargo.toml Normal file
@@ -0,0 +1,10 @@
[package]
name = "fallback"
version = "0.1.0"
authors = ["blacktemplar <blacktemplar@a1.net>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
itertools = "0.9.0"
58 common/fallback/src/lib.rs Normal file
@@ -0,0 +1,58 @@
use itertools::{join, zip};
use std::fmt::{Debug, Display};
use std::future::Future;

pub struct Fallback<T> {
    servers: Vec<T>,
}

#[derive(Debug, PartialEq)]
pub enum FallbackError<E> {
    AllErrored(Vec<E>),
}

impl<T> Fallback<T> {
    pub fn new(servers: Vec<T>) -> Self {
        Self { servers }
    }

    /// Return the first successful result or all errors encountered.
    pub async fn first_success<'a, F, O, E, R>(&'a self, func: F) -> Result<O, FallbackError<E>>
    where
        F: Fn(&'a T) -> R,
        R: Future<Output = Result<O, E>>,
    {
        let mut errors = vec![];
        for server in &self.servers {
            match func(server).await {
                Ok(val) => return Ok(val),
                Err(e) => errors.push(e),
            }
        }
        Err(FallbackError::AllErrored(errors))
    }

    pub fn map_format_error<'a, E, F, S>(&'a self, f: F, error: &FallbackError<E>) -> String
    where
        F: FnMut(&'a T) -> &'a S,
        S: Display + 'a,
        E: Debug,
    {
        match error {
            FallbackError::AllErrored(v) => format!(
                "All fallback errored: {}",
                join(
                    zip(self.servers.iter().map(f), v.iter())
                        .map(|(server, error)| format!("{} => {:?}", server, error)),
                    ", "
                )
            ),
        }
    }
}

impl<T: Display> Fallback<T> {
    pub fn format_error<E: Debug>(&self, error: &FallbackError<E>) -> String {
        self.map_format_error(|s| s, error)
    }
}
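A short usage sketch of the new `Fallback` helper (the endpoint addresses and error values are made up, and the `tokio` runtime is assumed): the first server always errors here, so the second one supplies the result; if every server failed, `format_error` would list each server alongside its error.

```rust
use fallback::{Fallback, FallbackError};

#[tokio::main]
async fn main() {
    let servers = Fallback::new(vec![
        "http://localhost:8545".to_string(),
        "http://localhost:8546".to_string(),
    ]);

    // Pretend the first server always errors and the second one answers.
    let result: Result<u64, FallbackError<String>> = servers
        .first_success(|endpoint| async move {
            if endpoint.ends_with(":8545") {
                Err(format!("{} refused the connection", endpoint))
            } else {
                Ok(42u64)
            }
        })
        .await;

    match &result {
        // The first success is returned as soon as one endpoint works ...
        Ok(n) => println!("got {} from a fallback endpoint", n),
        // ... otherwise every collected error is reported, labelled per server.
        Err(e) => println!("{}", servers.format_error(e)),
    }
}
```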
@@ -3,6 +3,7 @@ use environment::Environment;
use eth2_testnet_config::Eth2TestnetConfig;
use genesis::{Eth1Config, Eth1GenesisService};
use ssz::Encode;
use std::cmp::max;
use std::path::PathBuf;
use std::time::Duration;
use types::EthSpec;
@@ -11,9 +12,14 @@ use types::EthSpec;
pub const ETH1_GENESIS_UPDATE_INTERVAL: Duration = Duration::from_millis(7_000);

pub fn run<T: EthSpec>(mut env: Environment<T>, matches: &ArgMatches<'_>) -> Result<(), String> {
    let endpoint = matches
    let endpoints = matches
        .value_of("eth1-endpoint")
        .ok_or_else(|| "eth1-endpoint not specified")?;
        .map(|e| vec![String::from(e)])
        .or_else(|| {
            matches
                .value_of("eth1-endpoints")
                .map(|s| s.split(',').map(String::from).collect())
        });

    let testnet_dir = matches
        .value_of("testnet-dir")
@@ -40,11 +46,14 @@ pub fn run<T: EthSpec>(mut env: Environment<T>, matches: &ArgMatches<'_>) -> Res
    })?;

    let mut config = Eth1Config::default();
    config.endpoint = endpoint.to_string();
    if let Some(v) = endpoints.clone() {
        config.endpoints = v;
    }
    config.deposit_contract_address = format!("{:?}", spec.deposit_contract_address);
    config.deposit_contract_deploy_block = eth2_testnet_config.deposit_contract_deploy_block;
    config.lowest_cached_block_number = eth2_testnet_config.deposit_contract_deploy_block;
    config.follow_distance = spec.eth1_follow_distance / 2;
    config.node_far_behind_seconds = max(5, config.follow_distance) * spec.seconds_per_eth1_block;

    let genesis_service =
        Eth1GenesisService::new(config, env.core_context().log().clone(), spec.clone());
@@ -60,7 +69,7 @@ pub fn run<T: EthSpec>(mut env: Environment<T>, matches: &ArgMatches<'_>) -> Res
        .map_err(|e| format!("Failed to find genesis: {}", e))?;

    info!("Starting service to produce genesis BeaconState from eth1");
    info!("Connecting to eth1 http endpoint: {}", endpoint);
    info!("Connecting to eth1 http endpoints: {:?}", endpoints);

        Ok(())
    })
@ -233,8 +233,17 @@ fn main() {
                    .long("eth1-endpoint")
                    .value_name("HTTP_SERVER")
                    .takes_value(true)
                    .default_value("http://localhost:8545")
                    .help("The URL to the eth1 JSON-RPC http API."),
                    .help("Deprecated. Use --eth1-endpoints."),
            )
            .arg(
                Arg::with_name("eth1-endpoints")
                    .long("eth1-endpoints")
                    .value_name("HTTP_SERVER_LIST")
                    .takes_value(true)
                    .conflicts_with("eth1-endpoint")
                    .help("One or more comma-delimited URLs to eth1 JSON-RPC http APIs. \
                        If multiple endpoints are given the endpoints are used as \
                        fallback in the given order."),
            )
    )
    .subcommand(
@ -14,9 +14,6 @@ use web3::{
/// How long we will wait for ganache to indicate that it is ready.
const GANACHE_STARTUP_TIMEOUT_MILLIS: u64 = 10_000;

const NETWORK_ID: u64 = 42;
const CHAIN_ID: u64 = 42;

/// Provides a dedicated `ganachi-cli` instance with a connected `Web3` instance.
///
/// Requires that `ganachi-cli` is installed and available on `PATH`.
@ -25,39 +22,17 @@ pub struct GanacheInstance {
    child: Child,
    _event_loop: Arc<EventLoopHandle>,
    pub web3: Web3<Http>,
    network_id: u64,
    chain_id: u64,
}

impl GanacheInstance {
    /// Start a new `ganache-cli` process, waiting until it indicates that it is ready to accept
    /// RPC connections.
    pub fn new() -> Result<Self, String> {
        let port = unused_port()?;

        let mut child = Command::new("ganache-cli")
            .stdout(Stdio::piped())
            .arg("--defaultBalanceEther")
            .arg("1000000000")
            .arg("--gasLimit")
            .arg("1000000000")
            .arg("--accounts")
            .arg("10")
            .arg("--port")
            .arg(format!("{}", port))
            .arg("--mnemonic")
            .arg("\"vast thought differ pull jewel broom cook wrist tribe word before omit\"")
            .arg("--networkId")
            .arg(format!("{}", NETWORK_ID))
            .arg("--chainId")
            .arg(format!("{}", CHAIN_ID))
            .spawn()
            .map_err(|e| {
                format!(
                    "Failed to start ganache-cli. \
                     Is it ganache-cli installed and available on $PATH? Error: {:?}",
                    e
                )
            })?;

    fn new_from_child(
        mut child: Child,
        port: u16,
        network_id: u64,
        chain_id: u64,
    ) -> Result<Self, String> {
        let stdout = child
            .stdout
            .ok_or_else(|| "Unable to get stdout for ganache child process")?;
@ -96,9 +71,67 @@ impl GanacheInstance {
            port,
            _event_loop: Arc::new(event_loop),
            web3,
            network_id,
            chain_id,
        })
    }

    /// Start a new `ganache-cli` process, waiting until it indicates that it is ready to accept
    /// RPC connections.
    pub fn new(network_id: u64, chain_id: u64) -> Result<Self, String> {
        let port = unused_port()?;

        let child = Command::new("ganache-cli")
            .stdout(Stdio::piped())
            .arg("--defaultBalanceEther")
            .arg("1000000000")
            .arg("--gasLimit")
            .arg("1000000000")
            .arg("--accounts")
            .arg("10")
            .arg("--port")
            .arg(format!("{}", port))
            .arg("--mnemonic")
            .arg("\"vast thought differ pull jewel broom cook wrist tribe word before omit\"")
            .arg("--networkId")
            .arg(format!("{}", network_id))
            .arg("--chainId")
            .arg(format!("{}", chain_id))
            .spawn()
            .map_err(|e| {
                format!(
                    "Failed to start ganache-cli. \
                     Is it ganache-cli installed and available on $PATH? Error: {:?}",
                    e
                )
            })?;

        Self::new_from_child(child, port, network_id, chain_id)
    }

    pub fn fork(&self) -> Result<Self, String> {
        let port = unused_port()?;

        let child = Command::new("ganache-cli")
            .stdout(Stdio::piped())
            .arg("--fork")
            .arg(self.endpoint())
            .arg("--port")
            .arg(format!("{}", port))
            .arg("--chainId")
            .arg(format!("{}", self.chain_id))
            .spawn()
            .map_err(|e| {
                format!(
                    "Failed to start ganache-cli. \
                     Is it ganache-cli installed and available on $PATH? Error: {:?}",
                    e
                )
            })?;

        Self::new_from_child(child, port, self.network_id, self.chain_id)
    }

    /// Returns the endpoint that this instance is listening on.
    pub fn endpoint(&self) -> String {
        endpoint(self.port)
@ -106,12 +139,12 @@ impl GanacheInstance {

    /// Returns the network id of the ganache instance
    pub fn network_id(&self) -> u64 {
        NETWORK_ID
        self.network_id
    }

    /// Returns the chain id of the ganache instance
    pub fn chain_id(&self) -> u64 {
        CHAIN_ID
        self.chain_id
    }

    /// Increase the timestamp on future blocks by `increase_by` seconds.
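A rough sketch (not part of the commit) of how the reworked constructor and the new `fork` method might be exercised; the id values are arbitrary and `ganache-cli` must be installed.

// Illustrative test-style usage of the reworked API; assumes ganache-cli is on PATH.
fn demo() -> Result<(), String> {
    // Network and chain ids are now supplied by the caller instead of module constants.
    let original = GanacheInstance::new(4242, 4242)?;
    // fork() spawns a second ganache-cli process forked from the first,
    // inheriting its network id and chain id.
    let forked = original.fork()?;
    assert_eq!(forked.chain_id(), original.chain_id());
    println!("original: {}, fork: {}", original.endpoint(), forked.endpoint());
    Ok(())
}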
@ -31,8 +31,8 @@ pub struct GanacheEth1Instance {
}

impl GanacheEth1Instance {
    pub async fn new() -> Result<Self, String> {
        let ganache = GanacheInstance::new()?;
    pub async fn new(network_id: u64, chain_id: u64) -> Result<Self, String> {
        let ganache = GanacheInstance::new(network_id, chain_id)?;
        DepositContract::deploy(ganache.web3.clone(), 0, None)
            .await
            .map(|deposit_contract| Self {
@ -1,6 +1,7 @@
use crate::{checks, LocalNetwork, E};
use clap::ArgMatches;
use eth1::http::Eth1Id;
use eth1::{DEFAULT_CHAIN_ID, DEFAULT_NETWORK_ID};
use eth1_test_rig::GanacheEth1Instance;
use futures::prelude::*;
use node_test_rig::{
@ -73,7 +74,8 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
     * Deploy the deposit contract, spawn tasks to keep creating new blocks and deposit
     * validators.
     */
    let ganache_eth1_instance = GanacheEth1Instance::new().await?;
    let ganache_eth1_instance =
        GanacheEth1Instance::new(DEFAULT_NETWORK_ID.into(), DEFAULT_CHAIN_ID.into()).await?;
    let deposit_contract = ganache_eth1_instance.deposit_contract;
    let network_id = ganache_eth1_instance.ganache.network_id();
    let chain_id = ganache_eth1_instance.ganache.chain_id();
@ -102,15 +104,16 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
    let mut beacon_config = testing_client_config();

    beacon_config.genesis = ClientGenesis::DepositContract;
    beacon_config.eth1.endpoint = eth1_endpoint;
    beacon_config.eth1.endpoints = vec![eth1_endpoint];
    beacon_config.eth1.deposit_contract_address = deposit_contract_address;
    beacon_config.eth1.deposit_contract_deploy_block = 0;
    beacon_config.eth1.lowest_cached_block_number = 0;
    beacon_config.eth1.follow_distance = 1;
    beacon_config.eth1.node_far_behind_seconds = 20;
    beacon_config.dummy_eth1_backend = false;
    beacon_config.sync_eth1_chain = true;
    beacon_config.eth1.network_id = Eth1Id::Custom(network_id);
    beacon_config.eth1.chain_id = Eth1Id::Custom(chain_id);
    beacon_config.eth1.network_id = Eth1Id::from(network_id);
    beacon_config.eth1.chain_id = Eth1Id::from(chain_id);

    beacon_config.network.enr_address = Some(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));