CI fix: add retries to eth1 sim tests (#4501)
## Issue Addressed This PR attempts to workaround the recent frequent eth1 simulator failures caused by missing eth logs from Anvil. > FailedToInsertDeposit(NonConsecutive { log_index: 1, expected: 0 }) This usually occurs at the beginning of the tests, and it guarantees a timeout after a few hours if this log shows up, and this is currently causing our CIs to fail quite frequently. Example failure here: https://github.com/sigp/lighthouse/actions/runs/5525760195/jobs/10079736914 ## Proposed Changes The quick fix applied here adds a timeout to node startup and restarts the node again. - Add a 60 seconds timeout to beacon node startup in eth1 simulator tests. It takes ~10 seconds on my machine, but could take longer on CI runners. - Wrap the startup code in a retry function, that allows for 3 retries before returning an error. ## Additional Info We should probably raise an issue under the Anvil GitHub repo there so this can be further investigated.
This commit is contained in:
parent
68d5a6cf99
commit
d4a61756ca
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -5531,6 +5531,7 @@ dependencies = [
|
||||
"execution_layer",
|
||||
"sensitive_url",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"types",
|
||||
"validator_client",
|
||||
"validator_dir",
|
||||
|
@ -14,3 +14,4 @@ validator_client = { path = "../../validator_client" }
|
||||
validator_dir = { path = "../../common/validator_dir", features = ["insecure_keys"] }
|
||||
sensitive_url = { path = "../../common/sensitive_url" }
|
||||
execution_layer = { path = "../../beacon_node/execution_layer" }
|
||||
tokio = { version = "1.14.0", features = ["time"] }
|
||||
|
@ -10,6 +10,7 @@ use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use tempfile::{Builder as TempBuilder, TempDir};
|
||||
use tokio::time::timeout;
|
||||
use types::EthSpec;
|
||||
use validator_client::ProductionValidatorClient;
|
||||
use validator_dir::insecure_keys::build_deterministic_validator_dirs;
|
||||
@ -24,6 +25,8 @@ pub use validator_client::Config as ValidatorConfig;
|
||||
|
||||
/// The global timeout for HTTP requests to the beacon node.
|
||||
const HTTP_TIMEOUT: Duration = Duration::from_secs(4);
|
||||
/// The timeout for a beacon node to start up.
|
||||
const STARTUP_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
/// Provides a beacon node that is running in the current process on a given tokio executor (it
|
||||
/// is _local_ to this process).
|
||||
@ -51,12 +54,16 @@ impl<E: EthSpec> LocalBeaconNode<E> {
|
||||
client_config.set_data_dir(datadir.path().into());
|
||||
client_config.network.network_dir = PathBuf::from(datadir.path()).join("network");
|
||||
|
||||
ProductionBeaconNode::new(context, client_config)
|
||||
.await
|
||||
.map(move |client| Self {
|
||||
client: client.into_inner(),
|
||||
datadir,
|
||||
})
|
||||
timeout(
|
||||
STARTUP_TIMEOUT,
|
||||
ProductionBeaconNode::new(context, client_config),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| format!("Beacon node startup timed out after {:?}", STARTUP_TIMEOUT))?
|
||||
.map(move |client| Self {
|
||||
client: client.into_inner(),
|
||||
datadir,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,14 +1,16 @@
|
||||
use crate::local_network::{EXECUTION_PORT, TERMINAL_BLOCK, TERMINAL_DIFFICULTY};
|
||||
use crate::{checks, LocalNetwork, E};
|
||||
use crate::{checks, LocalNetwork};
|
||||
use clap::ArgMatches;
|
||||
use eth1::{Eth1Endpoint, DEFAULT_CHAIN_ID};
|
||||
use eth1_test_rig::AnvilEth1Instance;
|
||||
|
||||
use crate::retry::with_retry;
|
||||
use execution_layer::http::deposit_methods::Eth1Id;
|
||||
use futures::prelude::*;
|
||||
use node_test_rig::environment::RuntimeContext;
|
||||
use node_test_rig::{
|
||||
environment::{EnvironmentBuilder, LoggerConfig},
|
||||
testing_client_config, testing_validator_config, ClientGenesis, ValidatorFiles,
|
||||
testing_client_config, testing_validator_config, ClientConfig, ClientGenesis, ValidatorFiles,
|
||||
};
|
||||
use rayon::prelude::*;
|
||||
use sensitive_url::SensitiveUrl;
|
||||
@ -107,71 +109,24 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
|
||||
let context = env.core_context();
|
||||
|
||||
let main_future = async {
|
||||
/*
|
||||
* Deploy the deposit contract, spawn tasks to keep creating new blocks and deposit
|
||||
* validators.
|
||||
*/
|
||||
let anvil_eth1_instance = AnvilEth1Instance::new(DEFAULT_CHAIN_ID.into()).await?;
|
||||
let deposit_contract = anvil_eth1_instance.deposit_contract;
|
||||
let chain_id = anvil_eth1_instance.anvil.chain_id();
|
||||
let anvil = anvil_eth1_instance.anvil;
|
||||
let eth1_endpoint = SensitiveUrl::parse(anvil.endpoint().as_str())
|
||||
.expect("Unable to parse anvil endpoint.");
|
||||
let deposit_contract_address = deposit_contract.address();
|
||||
|
||||
// Start a timer that produces eth1 blocks on an interval.
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(eth1_block_time);
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let _ = anvil.evm_mine().await;
|
||||
}
|
||||
});
|
||||
|
||||
// Submit deposits to the deposit contract.
|
||||
tokio::spawn(async move {
|
||||
for i in 0..total_validator_count {
|
||||
println!("Submitting deposit for validator {}...", i);
|
||||
let _ = deposit_contract
|
||||
.deposit_deterministic_async::<E>(i, deposit_amount)
|
||||
.await;
|
||||
}
|
||||
});
|
||||
|
||||
let mut beacon_config = testing_client_config();
|
||||
|
||||
beacon_config.genesis = ClientGenesis::DepositContract;
|
||||
beacon_config.eth1.endpoint = Eth1Endpoint::NoAuth(eth1_endpoint);
|
||||
beacon_config.eth1.deposit_contract_address = deposit_contract_address;
|
||||
beacon_config.eth1.deposit_contract_deploy_block = 0;
|
||||
beacon_config.eth1.lowest_cached_block_number = 0;
|
||||
beacon_config.eth1.follow_distance = 1;
|
||||
beacon_config.eth1.node_far_behind_seconds = 20;
|
||||
beacon_config.dummy_eth1_backend = false;
|
||||
beacon_config.sync_eth1_chain = true;
|
||||
beacon_config.eth1.auto_update_interval_millis = eth1_block_time.as_millis() as u64;
|
||||
beacon_config.eth1.chain_id = Eth1Id::from(chain_id);
|
||||
beacon_config.network.target_peers = node_count + proposer_nodes - 1;
|
||||
|
||||
beacon_config.network.enr_address = (Some(Ipv4Addr::LOCALHOST), None);
|
||||
|
||||
if post_merge_sim {
|
||||
let el_config = execution_layer::Config {
|
||||
execution_endpoints: vec![SensitiveUrl::parse(&format!(
|
||||
"http://localhost:{}",
|
||||
EXECUTION_PORT
|
||||
))
|
||||
.unwrap()],
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
beacon_config.execution_layer = Some(el_config);
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a new `LocalNetwork` with one beacon node.
|
||||
*/
|
||||
let network = LocalNetwork::new(context.clone(), beacon_config.clone()).await?;
|
||||
let max_retries = 3;
|
||||
let (network, beacon_config) = with_retry(max_retries, || {
|
||||
Box::pin(create_local_network(
|
||||
LocalNetworkParams {
|
||||
eth1_block_time,
|
||||
total_validator_count,
|
||||
deposit_amount,
|
||||
node_count,
|
||||
proposer_nodes,
|
||||
post_merge_sim,
|
||||
},
|
||||
context.clone(),
|
||||
))
|
||||
})
|
||||
.await?;
|
||||
|
||||
/*
|
||||
* One by one, add beacon nodes to the network.
|
||||
@ -341,3 +296,88 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct LocalNetworkParams {
|
||||
eth1_block_time: Duration,
|
||||
total_validator_count: usize,
|
||||
deposit_amount: u64,
|
||||
node_count: usize,
|
||||
proposer_nodes: usize,
|
||||
post_merge_sim: bool,
|
||||
}
|
||||
|
||||
async fn create_local_network<E: EthSpec>(
|
||||
LocalNetworkParams {
|
||||
eth1_block_time,
|
||||
total_validator_count,
|
||||
deposit_amount,
|
||||
node_count,
|
||||
proposer_nodes,
|
||||
post_merge_sim,
|
||||
}: LocalNetworkParams,
|
||||
context: RuntimeContext<E>,
|
||||
) -> Result<(LocalNetwork<E>, ClientConfig), String> {
|
||||
/*
|
||||
* Deploy the deposit contract, spawn tasks to keep creating new blocks and deposit
|
||||
* validators.
|
||||
*/
|
||||
let anvil_eth1_instance = AnvilEth1Instance::new(DEFAULT_CHAIN_ID.into()).await?;
|
||||
let deposit_contract = anvil_eth1_instance.deposit_contract;
|
||||
let chain_id = anvil_eth1_instance.anvil.chain_id();
|
||||
let anvil = anvil_eth1_instance.anvil;
|
||||
let eth1_endpoint =
|
||||
SensitiveUrl::parse(anvil.endpoint().as_str()).expect("Unable to parse anvil endpoint.");
|
||||
let deposit_contract_address = deposit_contract.address();
|
||||
|
||||
// Start a timer that produces eth1 blocks on an interval.
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(eth1_block_time);
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let _ = anvil.evm_mine().await;
|
||||
}
|
||||
});
|
||||
|
||||
// Submit deposits to the deposit contract.
|
||||
tokio::spawn(async move {
|
||||
for i in 0..total_validator_count {
|
||||
println!("Submitting deposit for validator {}...", i);
|
||||
let _ = deposit_contract
|
||||
.deposit_deterministic_async::<E>(i, deposit_amount)
|
||||
.await;
|
||||
}
|
||||
});
|
||||
|
||||
let mut beacon_config = testing_client_config();
|
||||
|
||||
beacon_config.genesis = ClientGenesis::DepositContract;
|
||||
beacon_config.eth1.endpoint = Eth1Endpoint::NoAuth(eth1_endpoint);
|
||||
beacon_config.eth1.deposit_contract_address = deposit_contract_address;
|
||||
beacon_config.eth1.deposit_contract_deploy_block = 0;
|
||||
beacon_config.eth1.lowest_cached_block_number = 0;
|
||||
beacon_config.eth1.follow_distance = 1;
|
||||
beacon_config.eth1.node_far_behind_seconds = 20;
|
||||
beacon_config.dummy_eth1_backend = false;
|
||||
beacon_config.sync_eth1_chain = true;
|
||||
beacon_config.eth1.auto_update_interval_millis = eth1_block_time.as_millis() as u64;
|
||||
beacon_config.eth1.chain_id = Eth1Id::from(chain_id);
|
||||
beacon_config.network.target_peers = node_count + proposer_nodes - 1;
|
||||
|
||||
beacon_config.network.enr_address = (Some(Ipv4Addr::LOCALHOST), None);
|
||||
|
||||
if post_merge_sim {
|
||||
let el_config = execution_layer::Config {
|
||||
execution_endpoints: vec![SensitiveUrl::parse(&format!(
|
||||
"http://localhost:{}",
|
||||
EXECUTION_PORT
|
||||
))
|
||||
.unwrap()],
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
beacon_config.execution_layer = Some(el_config);
|
||||
}
|
||||
|
||||
let network = LocalNetwork::new(context, beacon_config.clone()).await?;
|
||||
Ok((network, beacon_config))
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ mod cli;
|
||||
mod eth1_sim;
|
||||
mod local_network;
|
||||
mod no_eth1_sim;
|
||||
mod retry;
|
||||
mod sync_sim;
|
||||
|
||||
use cli::cli_app;
|
||||
|
63
testing/simulator/src/retry.rs
Normal file
63
testing/simulator/src/retry.rs
Normal file
@ -0,0 +1,63 @@
|
||||
use std::fmt::Debug;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
|
||||
/// Executes the function with a specified number of retries if the function returns an error.
|
||||
/// Once it exceeds `max_retries` and still fails, the error is returned.
|
||||
pub async fn with_retry<T, E, F>(max_retries: usize, mut func: F) -> Result<T, E>
|
||||
where
|
||||
F: FnMut() -> Pin<Box<dyn Future<Output = Result<T, E>>>>,
|
||||
E: Debug,
|
||||
{
|
||||
let mut retry_count = 0;
|
||||
loop {
|
||||
let result = Box::pin(func()).await;
|
||||
if result.is_ok() || retry_count >= max_retries {
|
||||
break result;
|
||||
}
|
||||
retry_count += 1;
|
||||
|
||||
if let Err(e) = result {
|
||||
eprintln!(
|
||||
"Operation failed with error {:?}, retrying {} of {}",
|
||||
e, retry_count, max_retries
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
async fn my_async_func(is_ok: bool) -> Result<(), ()> {
|
||||
if is_ok {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_with_retry_ok() {
|
||||
let res = with_retry(3, || Box::pin(my_async_func(true))).await;
|
||||
assert!(res.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_with_retry_2nd_ok() {
|
||||
let mut mock_results = VecDeque::from([false, true]);
|
||||
let res = with_retry(3, || {
|
||||
Box::pin(my_async_func(mock_results.pop_front().unwrap()))
|
||||
})
|
||||
.await;
|
||||
assert!(res.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_with_retry_fail() {
|
||||
let res = with_retry(3, || Box::pin(my_async_func(false))).await;
|
||||
assert!(res.is_err());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user