diff --git a/Cargo.lock b/Cargo.lock index a1f5c7e2a..4bb3ab99e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5531,6 +5531,7 @@ dependencies = [ "execution_layer", "sensitive_url", "tempfile", + "tokio", "types", "validator_client", "validator_dir", diff --git a/testing/node_test_rig/Cargo.toml b/testing/node_test_rig/Cargo.toml index ea5d005c1..ac77349c5 100644 --- a/testing/node_test_rig/Cargo.toml +++ b/testing/node_test_rig/Cargo.toml @@ -14,3 +14,4 @@ validator_client = { path = "../../validator_client" } validator_dir = { path = "../../common/validator_dir", features = ["insecure_keys"] } sensitive_url = { path = "../../common/sensitive_url" } execution_layer = { path = "../../beacon_node/execution_layer" } +tokio = { version = "1.14.0", features = ["time"] } diff --git a/testing/node_test_rig/src/lib.rs b/testing/node_test_rig/src/lib.rs index d4fd115be..62db67b8c 100644 --- a/testing/node_test_rig/src/lib.rs +++ b/testing/node_test_rig/src/lib.rs @@ -10,6 +10,7 @@ use std::path::PathBuf; use std::time::Duration; use std::time::{SystemTime, UNIX_EPOCH}; use tempfile::{Builder as TempBuilder, TempDir}; +use tokio::time::timeout; use types::EthSpec; use validator_client::ProductionValidatorClient; use validator_dir::insecure_keys::build_deterministic_validator_dirs; @@ -24,6 +25,8 @@ pub use validator_client::Config as ValidatorConfig; /// The global timeout for HTTP requests to the beacon node. const HTTP_TIMEOUT: Duration = Duration::from_secs(4); +/// The timeout for a beacon node to start up. +const STARTUP_TIMEOUT: Duration = Duration::from_secs(60); /// Provides a beacon node that is running in the current process on a given tokio executor (it /// is _local_ to this process). @@ -51,12 +54,16 @@ impl LocalBeaconNode { client_config.set_data_dir(datadir.path().into()); client_config.network.network_dir = PathBuf::from(datadir.path()).join("network"); - ProductionBeaconNode::new(context, client_config) - .await - .map(move |client| Self { - client: client.into_inner(), - datadir, - }) + timeout( + STARTUP_TIMEOUT, + ProductionBeaconNode::new(context, client_config), + ) + .await + .map_err(|_| format!("Beacon node startup timed out after {:?}", STARTUP_TIMEOUT))? + .map(move |client| Self { + client: client.into_inner(), + datadir, + }) } } diff --git a/testing/simulator/src/eth1_sim.rs b/testing/simulator/src/eth1_sim.rs index 3e764d27d..57c944cf1 100644 --- a/testing/simulator/src/eth1_sim.rs +++ b/testing/simulator/src/eth1_sim.rs @@ -1,14 +1,16 @@ use crate::local_network::{EXECUTION_PORT, TERMINAL_BLOCK, TERMINAL_DIFFICULTY}; -use crate::{checks, LocalNetwork, E}; +use crate::{checks, LocalNetwork}; use clap::ArgMatches; use eth1::{Eth1Endpoint, DEFAULT_CHAIN_ID}; use eth1_test_rig::AnvilEth1Instance; +use crate::retry::with_retry; use execution_layer::http::deposit_methods::Eth1Id; use futures::prelude::*; +use node_test_rig::environment::RuntimeContext; use node_test_rig::{ environment::{EnvironmentBuilder, LoggerConfig}, - testing_client_config, testing_validator_config, ClientGenesis, ValidatorFiles, + testing_client_config, testing_validator_config, ClientConfig, ClientGenesis, ValidatorFiles, }; use rayon::prelude::*; use sensitive_url::SensitiveUrl; @@ -107,71 +109,24 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { let context = env.core_context(); let main_future = async { - /* - * Deploy the deposit contract, spawn tasks to keep creating new blocks and deposit - * validators. - */ - let anvil_eth1_instance = AnvilEth1Instance::new(DEFAULT_CHAIN_ID.into()).await?; - let deposit_contract = anvil_eth1_instance.deposit_contract; - let chain_id = anvil_eth1_instance.anvil.chain_id(); - let anvil = anvil_eth1_instance.anvil; - let eth1_endpoint = SensitiveUrl::parse(anvil.endpoint().as_str()) - .expect("Unable to parse anvil endpoint."); - let deposit_contract_address = deposit_contract.address(); - - // Start a timer that produces eth1 blocks on an interval. - tokio::spawn(async move { - let mut interval = tokio::time::interval(eth1_block_time); - loop { - interval.tick().await; - let _ = anvil.evm_mine().await; - } - }); - - // Submit deposits to the deposit contract. - tokio::spawn(async move { - for i in 0..total_validator_count { - println!("Submitting deposit for validator {}...", i); - let _ = deposit_contract - .deposit_deterministic_async::(i, deposit_amount) - .await; - } - }); - - let mut beacon_config = testing_client_config(); - - beacon_config.genesis = ClientGenesis::DepositContract; - beacon_config.eth1.endpoint = Eth1Endpoint::NoAuth(eth1_endpoint); - beacon_config.eth1.deposit_contract_address = deposit_contract_address; - beacon_config.eth1.deposit_contract_deploy_block = 0; - beacon_config.eth1.lowest_cached_block_number = 0; - beacon_config.eth1.follow_distance = 1; - beacon_config.eth1.node_far_behind_seconds = 20; - beacon_config.dummy_eth1_backend = false; - beacon_config.sync_eth1_chain = true; - beacon_config.eth1.auto_update_interval_millis = eth1_block_time.as_millis() as u64; - beacon_config.eth1.chain_id = Eth1Id::from(chain_id); - beacon_config.network.target_peers = node_count + proposer_nodes - 1; - - beacon_config.network.enr_address = (Some(Ipv4Addr::LOCALHOST), None); - - if post_merge_sim { - let el_config = execution_layer::Config { - execution_endpoints: vec![SensitiveUrl::parse(&format!( - "http://localhost:{}", - EXECUTION_PORT - )) - .unwrap()], - ..Default::default() - }; - - beacon_config.execution_layer = Some(el_config); - } - /* * Create a new `LocalNetwork` with one beacon node. */ - let network = LocalNetwork::new(context.clone(), beacon_config.clone()).await?; + let max_retries = 3; + let (network, beacon_config) = with_retry(max_retries, || { + Box::pin(create_local_network( + LocalNetworkParams { + eth1_block_time, + total_validator_count, + deposit_amount, + node_count, + proposer_nodes, + post_merge_sim, + }, + context.clone(), + )) + }) + .await?; /* * One by one, add beacon nodes to the network. @@ -341,3 +296,88 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { Ok(()) } + +struct LocalNetworkParams { + eth1_block_time: Duration, + total_validator_count: usize, + deposit_amount: u64, + node_count: usize, + proposer_nodes: usize, + post_merge_sim: bool, +} + +async fn create_local_network( + LocalNetworkParams { + eth1_block_time, + total_validator_count, + deposit_amount, + node_count, + proposer_nodes, + post_merge_sim, + }: LocalNetworkParams, + context: RuntimeContext, +) -> Result<(LocalNetwork, ClientConfig), String> { + /* + * Deploy the deposit contract, spawn tasks to keep creating new blocks and deposit + * validators. + */ + let anvil_eth1_instance = AnvilEth1Instance::new(DEFAULT_CHAIN_ID.into()).await?; + let deposit_contract = anvil_eth1_instance.deposit_contract; + let chain_id = anvil_eth1_instance.anvil.chain_id(); + let anvil = anvil_eth1_instance.anvil; + let eth1_endpoint = + SensitiveUrl::parse(anvil.endpoint().as_str()).expect("Unable to parse anvil endpoint."); + let deposit_contract_address = deposit_contract.address(); + + // Start a timer that produces eth1 blocks on an interval. + tokio::spawn(async move { + let mut interval = tokio::time::interval(eth1_block_time); + loop { + interval.tick().await; + let _ = anvil.evm_mine().await; + } + }); + + // Submit deposits to the deposit contract. + tokio::spawn(async move { + for i in 0..total_validator_count { + println!("Submitting deposit for validator {}...", i); + let _ = deposit_contract + .deposit_deterministic_async::(i, deposit_amount) + .await; + } + }); + + let mut beacon_config = testing_client_config(); + + beacon_config.genesis = ClientGenesis::DepositContract; + beacon_config.eth1.endpoint = Eth1Endpoint::NoAuth(eth1_endpoint); + beacon_config.eth1.deposit_contract_address = deposit_contract_address; + beacon_config.eth1.deposit_contract_deploy_block = 0; + beacon_config.eth1.lowest_cached_block_number = 0; + beacon_config.eth1.follow_distance = 1; + beacon_config.eth1.node_far_behind_seconds = 20; + beacon_config.dummy_eth1_backend = false; + beacon_config.sync_eth1_chain = true; + beacon_config.eth1.auto_update_interval_millis = eth1_block_time.as_millis() as u64; + beacon_config.eth1.chain_id = Eth1Id::from(chain_id); + beacon_config.network.target_peers = node_count + proposer_nodes - 1; + + beacon_config.network.enr_address = (Some(Ipv4Addr::LOCALHOST), None); + + if post_merge_sim { + let el_config = execution_layer::Config { + execution_endpoints: vec![SensitiveUrl::parse(&format!( + "http://localhost:{}", + EXECUTION_PORT + )) + .unwrap()], + ..Default::default() + }; + + beacon_config.execution_layer = Some(el_config); + } + + let network = LocalNetwork::new(context, beacon_config.clone()).await?; + Ok((network, beacon_config)) +} diff --git a/testing/simulator/src/main.rs b/testing/simulator/src/main.rs index a19777c5a..e8af9c180 100644 --- a/testing/simulator/src/main.rs +++ b/testing/simulator/src/main.rs @@ -21,6 +21,7 @@ mod cli; mod eth1_sim; mod local_network; mod no_eth1_sim; +mod retry; mod sync_sim; use cli::cli_app; diff --git a/testing/simulator/src/retry.rs b/testing/simulator/src/retry.rs new file mode 100644 index 000000000..a4eb52cea --- /dev/null +++ b/testing/simulator/src/retry.rs @@ -0,0 +1,63 @@ +use std::fmt::Debug; +use std::future::Future; +use std::pin::Pin; + +/// Executes the function with a specified number of retries if the function returns an error. +/// Once it exceeds `max_retries` and still fails, the error is returned. +pub async fn with_retry(max_retries: usize, mut func: F) -> Result +where + F: FnMut() -> Pin>>>, + E: Debug, +{ + let mut retry_count = 0; + loop { + let result = Box::pin(func()).await; + if result.is_ok() || retry_count >= max_retries { + break result; + } + retry_count += 1; + + if let Err(e) = result { + eprintln!( + "Operation failed with error {:?}, retrying {} of {}", + e, retry_count, max_retries + ); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::VecDeque; + + async fn my_async_func(is_ok: bool) -> Result<(), ()> { + if is_ok { + Ok(()) + } else { + Err(()) + } + } + + #[tokio::test] + async fn test_with_retry_ok() { + let res = with_retry(3, || Box::pin(my_async_func(true))).await; + assert!(res.is_ok()); + } + + #[tokio::test] + async fn test_with_retry_2nd_ok() { + let mut mock_results = VecDeque::from([false, true]); + let res = with_retry(3, || { + Box::pin(my_async_func(mock_results.pop_front().unwrap())) + }) + .await; + assert!(res.is_ok()); + } + + #[tokio::test] + async fn test_with_retry_fail() { + let res = with_retry(3, || Box::pin(my_async_func(false))).await; + assert!(res.is_err()); + } +}