diff --git a/Cargo.lock b/Cargo.lock index f3beeca8b..95a6cd428 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -21,6 +21,7 @@ dependencies = [ "slog-async 2.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "slog-term 2.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", "types 0.2.0", "validator_client 0.2.0", "web3 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/account_manager/Cargo.toml b/account_manager/Cargo.toml index eac3ceb06..9588767fa 100644 --- a/account_manager/Cargo.toml +++ b/account_manager/Cargo.toml @@ -27,3 +27,4 @@ eth2_testnet_config = { path = "../eth2/utils/eth2_testnet_config" } web3 = "0.10.0" futures = "0.1.25" clap_utils = { path = "../eth2/utils/clap_utils" } +tokio = "0.1.22" diff --git a/account_manager/src/deposits.rs b/account_manager/src/deposits.rs index c29bb5a21..5f6cc82fc 100644 --- a/account_manager/src/deposits.rs +++ b/account_manager/src/deposits.rs @@ -1,20 +1,34 @@ use clap::{App, Arg, ArgMatches}; use clap_utils; use environment::Environment; +use futures::{ + future::{self, loop_fn, Loop}, + Future, +}; +use slog::{info, Logger}; use std::fs; use std::path::PathBuf; +use std::time::{Duration, Instant}; +use tokio::timer::Delay; use types::EthSpec; use validator_client::validator_directory::ValidatorDirectoryBuilder; -use web3::{transports::Ipc, types::Address, Web3}; +use web3::{ + transports::Ipc, + types::{Address, SyncInfo, SyncState}, + Transport, Web3, +}; + +const SYNCING_STATE_RETRY_DELAY: Duration = Duration::from_secs(2); pub fn cli_app<'a, 'b>() -> App<'a, 'b> { App::new("deposited") .about("Creates new Lighthouse validator keys and directories. Each newly-created validator will have a deposit transaction formed and submitted to the deposit contract via - --eth1-ipc. Will only write each validator keys to disk if the deposit transaction returns - successfully from the eth1 node. The process exits immediately if any Eth1 tx fails. Does - not wait for Eth1 confirmation blocks, so there is no guarantee that a deposit will be - accepted in the Eth1 chain.") + --eth1-ipc. This application will only write each validator keys to disk if the deposit + transaction returns successfully from the eth1 node. The process exits immediately if any + Eth1 tx fails. Does not wait for Eth1 confirmation blocks, so there is no guarantee that a + deposit will be accepted in the Eth1 chain. Before key generation starts, this application + will wait until the eth1 indicates that it is not syncing via the eth_syncing endpoint") .arg( Arg::with_name("validator-dir") .long("validator-dir") @@ -56,8 +70,8 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true), ) .arg( - Arg::with_name("at-least") - .long("at-least") + Arg::with_name("at-most") + .long("at-most") .value_name("VALIDATOR_COUNT") .help("Observe the number of validators in --validator-dir, only creating enough to ensure reach the given count. Never deletes an existing validator.") @@ -68,6 +82,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { pub fn cli_run(matches: &ArgMatches, mut env: Environment) -> Result<(), String> { let spec = env.core_context().eth2_config.spec; + let log = env.core_context().log; let validator_dir = clap_utils::parse_path_with_default_in_home_dir( matches, @@ -79,17 +94,28 @@ pub fn cli_run(matches: &ArgMatches, mut env: Environment) -> Res let deposit_gwei = clap_utils::parse_optional(matches, "deposit-gwei")? .unwrap_or_else(|| spec.max_effective_balance); let count: Option = clap_utils::parse_optional(matches, "count")?; - let at_least: Option = clap_utils::parse_optional(matches, "at-least")?; + let at_most: Option = clap_utils::parse_optional(matches, "at-most")?; - let n = match (count, at_least) { - (Some(_), Some(_)) => Err("Cannot supply --count and --at-least".to_string()), - (None, None) => Err("Must supply either --count or --at-least".to_string()), + let starting_validator_count = existing_validator_count(&validator_dir)?; + + let n = match (count, at_most) { + (Some(_), Some(_)) => Err("Cannot supply --count and --at-most".to_string()), + (None, None) => Err("Must supply either --count or --at-most".to_string()), (Some(count), None) => Ok(count), - (None, Some(at_least)) => fs::read_dir(&validator_dir) - .map(|iter| at_least.saturating_sub(iter.count())) - .map_err(|e| format!("Unable to read {:?}: {}", validator_dir, e)), + (None, Some(at_most)) => Ok(at_most.saturating_sub(starting_validator_count)), }?; + if n == 0 { + info!( + log, + "No need to produce and validators, exiting"; + "--count" => count, + "--at-most" => at_most, + "existing_validators" => starting_validator_count, + ); + return Ok(()); + } + let deposit_contract = env .testnet .as_ref() @@ -105,25 +131,125 @@ pub fn cli_run(matches: &ArgMatches, mut env: Environment) -> Res Ipc::new(eth1_ipc_path).map_err(|e| format!("Unable to connect to eth1 IPC: {:?}", e))?; let web3 = Web3::new(transport); - for _ in 0..n { - let validator = env - .runtime() + env.runtime() + .block_on(poll_until_synced(web3.clone(), log.clone()))?; + + for i in 0..n { + let tx_hash_log = log.clone(); + + env.runtime() .block_on( ValidatorDirectoryBuilder::default() .spec(spec.clone()) .custom_deposit_amount(deposit_gwei) .thread_random_keypairs() - .submit_eth1_deposit(web3.clone(), from_address, deposit_contract), + .submit_eth1_deposit(web3.clone(), from_address, deposit_contract) + .map(move |(builder, tx_hash)| { + info!( + tx_hash_log, + "Validator deposited"; + "eth1_tx_hash" => format!("{:?}", tx_hash), + "index" => format!("{}/{}", i + 1, n), + ); + builder + }), )? .create_directory(validator_dir.clone())? .write_keypair_files()? .write_eth1_data_file()? .build()?; - - if let Some(voting_keypair) = validator.voting_keypair { - println!("{:?}", voting_keypair.pk) - } } + let ending_validator_count = existing_validator_count(&validator_dir)?; + let delta = ending_validator_count.saturating_sub(starting_validator_count); + + info!( + log, + "Success"; + "validators_created_and_deposited" => delta, + ); + Ok(()) } + +/// Returns the number of validators that exist in the given `validator_dir`. +/// +/// This function just assumes any file is a validator directory, making it likely to return a +/// higher number than accurate but never a lower one. +fn existing_validator_count(validator_dir: &PathBuf) -> Result { + fs::read_dir(&validator_dir) + .map(|iter| iter.count()) + .map_err(|e| format!("Unable to read {:?}: {}", validator_dir, e)) +} + +/// Run a poll on the `eth_syncing` endpoint, blocking until the node is synced. +fn poll_until_synced(web3: Web3, log: Logger) -> impl Future + Send +where + T: Transport + Send + 'static, + ::Out: Send, +{ + loop_fn((web3.clone(), log.clone()), move |(web3, log)| { + web3.clone() + .eth() + .syncing() + .map_err(|e| format!("Unable to read syncing state from eth1 node: {:?}", e)) + .and_then::<_, Box + Send>>(move |sync_state| { + match sync_state { + SyncState::Syncing(SyncInfo { + current_block, + highest_block, + .. + }) => { + info!( + log, + "Waiting for eth1 node to sync"; + "est_highest_block" => format!("{}", highest_block), + "current_block" => format!("{}", current_block), + ); + + Box::new( + Delay::new(Instant::now() + SYNCING_STATE_RETRY_DELAY) + .map_err(|e| format!("Failed to trigger delay: {:?}", e)) + .and_then(|_| future::ok(Loop::Continue((web3, log)))), + ) + } + SyncState::NotSyncing => Box::new( + web3.clone() + .eth() + .block_number() + .map_err(|e| { + format!("Unable to read block number from eth1 node: {:?}", e) + }) + .and_then::<_, Box + Send>>( + |block_number| { + if block_number > 0.into() { + info!( + log, + "Eth1 node is synced"; + "head_block" => format!("{}", block_number), + ); + Box::new(future::ok(Loop::Break((web3, log)))) + } else { + Box::new( + Delay::new(Instant::now() + SYNCING_STATE_RETRY_DELAY) + .map_err(|e| { + format!("Failed to trigger delay: {:?}", e) + }) + .and_then(|_| { + info!( + log, + "Waiting for eth1 node to sync"; + "current_block" => 0, + ); + future::ok(Loop::Continue((web3, log))) + }), + ) + } + }, + ), + ), + } + }) + }) + .map(|_| ()) +} diff --git a/eth2/utils/clap_utils/src/lib.rs b/eth2/utils/clap_utils/src/lib.rs index ecd1b698b..d8002d76f 100644 --- a/eth2/utils/clap_utils/src/lib.rs +++ b/eth2/utils/clap_utils/src/lib.rs @@ -16,22 +16,19 @@ pub fn parse_testnet_dir_with_hardcoded_default( matches: &ArgMatches, name: &'static str, ) -> Result, String> { - parse_required::(matches, name) - .and_then(|path| { - Eth2TestnetConfig::load(path.clone()) - .map_err(|e| format!("Unable to open testnet dir at {:?}: {}", path, e)) - }) - .map(Result::Ok) - .unwrap_or_else(|_| { - Eth2TestnetConfig::hard_coded().map_err(|e| { - format!( - "The hard-coded testnet directory was invalid. \ - This happens when Lighthouse is migrating between spec versions. \ - Error : {}", - e - ) - }) + if let Some(path) = parse_optional::(matches, name)? { + Eth2TestnetConfig::load(path.clone()) + .map_err(|e| format!("Unable to open testnet dir at {:?}: {}", path, e)) + } else { + Eth2TestnetConfig::hard_coded().map_err(|e| { + format!( + "The hard-coded testnet directory was invalid. \ + This happens when Lighthouse is migrating between spec versions. \ + Error : {}", + e + ) }) + } } /// If `name` is in `matches`, parses the value as a path. Otherwise, attempts to find the user's diff --git a/validator_client/src/validator_directory.rs b/validator_client/src/validator_directory.rs index 82f02cf34..197e1cb44 100644 --- a/validator_client/src/validator_directory.rs +++ b/validator_client/src/validator_directory.rs @@ -308,7 +308,7 @@ impl ValidatorDirectoryBuilder { web3: Web3, from: Address, deposit_contract: Address, - ) -> impl Future { + ) -> impl Future { self.get_deposit_data() .into_future() .and_then(move |(deposit_data, deposit_amount)| { @@ -325,7 +325,7 @@ impl ValidatorDirectoryBuilder { }) .map_err(|e| format!("Failed to send transaction: {:?}", e)) }) - .map(|_tx| self) + .map(|tx| (self, tx)) } pub fn build(self) -> Result {