Deposits wait (#1036)

* Address clippy arith lints

* Make account manager wait for eth1 to sync

* Fix bug with testnet parsing

* Tidy logs
This commit is contained in:
Paul Hauner 2020-04-22 15:20:55 +10:00 committed by GitHub
parent 018a666731
commit 2b6b2354e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 164 additions and 39 deletions

1
Cargo.lock generated
View File

@ -21,6 +21,7 @@ dependencies = [
"slog-async 2.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "slog-async 2.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"slog-term 2.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "slog-term 2.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
"types 0.2.0", "types 0.2.0",
"validator_client 0.2.0", "validator_client 0.2.0",
"web3 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "web3 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -27,3 +27,4 @@ eth2_testnet_config = { path = "../eth2/utils/eth2_testnet_config" }
web3 = "0.10.0" web3 = "0.10.0"
futures = "0.1.25" futures = "0.1.25"
clap_utils = { path = "../eth2/utils/clap_utils" } clap_utils = { path = "../eth2/utils/clap_utils" }
tokio = "0.1.22"

View File

@ -1,20 +1,34 @@
use clap::{App, Arg, ArgMatches}; use clap::{App, Arg, ArgMatches};
use clap_utils; use clap_utils;
use environment::Environment; use environment::Environment;
use futures::{
future::{self, loop_fn, Loop},
Future,
};
use slog::{info, Logger};
use std::fs; use std::fs;
use std::path::PathBuf; use std::path::PathBuf;
use std::time::{Duration, Instant};
use tokio::timer::Delay;
use types::EthSpec; use types::EthSpec;
use validator_client::validator_directory::ValidatorDirectoryBuilder; use validator_client::validator_directory::ValidatorDirectoryBuilder;
use web3::{transports::Ipc, types::Address, Web3}; use web3::{
transports::Ipc,
types::{Address, SyncInfo, SyncState},
Transport, Web3,
};
const SYNCING_STATE_RETRY_DELAY: Duration = Duration::from_secs(2);
pub fn cli_app<'a, 'b>() -> App<'a, 'b> { pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
App::new("deposited") App::new("deposited")
.about("Creates new Lighthouse validator keys and directories. Each newly-created validator .about("Creates new Lighthouse validator keys and directories. Each newly-created validator
will have a deposit transaction formed and submitted to the deposit contract via will have a deposit transaction formed and submitted to the deposit contract via
--eth1-ipc. Will only write each validator keys to disk if the deposit transaction returns --eth1-ipc. This application will only write each validator keys to disk if the deposit
successfully from the eth1 node. The process exits immediately if any Eth1 tx fails. Does transaction returns successfully from the eth1 node. The process exits immediately if any
not wait for Eth1 confirmation blocks, so there is no guarantee that a deposit will be Eth1 tx fails. Does not wait for Eth1 confirmation blocks, so there is no guarantee that a
accepted in the Eth1 chain.") deposit will be accepted in the Eth1 chain. Before key generation starts, this application
will wait until the eth1 indicates that it is not syncing via the eth_syncing endpoint")
.arg( .arg(
Arg::with_name("validator-dir") Arg::with_name("validator-dir")
.long("validator-dir") .long("validator-dir")
@ -56,8 +70,8 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.takes_value(true), .takes_value(true),
) )
.arg( .arg(
Arg::with_name("at-least") Arg::with_name("at-most")
.long("at-least") .long("at-most")
.value_name("VALIDATOR_COUNT") .value_name("VALIDATOR_COUNT")
.help("Observe the number of validators in --validator-dir, only creating enough to .help("Observe the number of validators in --validator-dir, only creating enough to
ensure reach the given count. Never deletes an existing validator.") ensure reach the given count. Never deletes an existing validator.")
@ -68,6 +82,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
pub fn cli_run<T: EthSpec>(matches: &ArgMatches, mut env: Environment<T>) -> Result<(), String> { pub fn cli_run<T: EthSpec>(matches: &ArgMatches, mut env: Environment<T>) -> Result<(), String> {
let spec = env.core_context().eth2_config.spec; let spec = env.core_context().eth2_config.spec;
let log = env.core_context().log;
let validator_dir = clap_utils::parse_path_with_default_in_home_dir( let validator_dir = clap_utils::parse_path_with_default_in_home_dir(
matches, matches,
@ -79,17 +94,28 @@ pub fn cli_run<T: EthSpec>(matches: &ArgMatches, mut env: Environment<T>) -> Res
let deposit_gwei = clap_utils::parse_optional(matches, "deposit-gwei")? let deposit_gwei = clap_utils::parse_optional(matches, "deposit-gwei")?
.unwrap_or_else(|| spec.max_effective_balance); .unwrap_or_else(|| spec.max_effective_balance);
let count: Option<usize> = clap_utils::parse_optional(matches, "count")?; let count: Option<usize> = clap_utils::parse_optional(matches, "count")?;
let at_least: Option<usize> = clap_utils::parse_optional(matches, "at-least")?; let at_most: Option<usize> = clap_utils::parse_optional(matches, "at-most")?;
let n = match (count, at_least) { let starting_validator_count = existing_validator_count(&validator_dir)?;
(Some(_), Some(_)) => Err("Cannot supply --count and --at-least".to_string()),
(None, None) => Err("Must supply either --count or --at-least".to_string()), let n = match (count, at_most) {
(Some(_), Some(_)) => Err("Cannot supply --count and --at-most".to_string()),
(None, None) => Err("Must supply either --count or --at-most".to_string()),
(Some(count), None) => Ok(count), (Some(count), None) => Ok(count),
(None, Some(at_least)) => fs::read_dir(&validator_dir) (None, Some(at_most)) => Ok(at_most.saturating_sub(starting_validator_count)),
.map(|iter| at_least.saturating_sub(iter.count()))
.map_err(|e| format!("Unable to read {:?}: {}", validator_dir, e)),
}?; }?;
if n == 0 {
info!(
log,
"No need to produce and validators, exiting";
"--count" => count,
"--at-most" => at_most,
"existing_validators" => starting_validator_count,
);
return Ok(());
}
let deposit_contract = env let deposit_contract = env
.testnet .testnet
.as_ref() .as_ref()
@ -105,25 +131,125 @@ pub fn cli_run<T: EthSpec>(matches: &ArgMatches, mut env: Environment<T>) -> Res
Ipc::new(eth1_ipc_path).map_err(|e| format!("Unable to connect to eth1 IPC: {:?}", e))?; Ipc::new(eth1_ipc_path).map_err(|e| format!("Unable to connect to eth1 IPC: {:?}", e))?;
let web3 = Web3::new(transport); let web3 = Web3::new(transport);
for _ in 0..n { env.runtime()
let validator = env .block_on(poll_until_synced(web3.clone(), log.clone()))?;
.runtime()
for i in 0..n {
let tx_hash_log = log.clone();
env.runtime()
.block_on( .block_on(
ValidatorDirectoryBuilder::default() ValidatorDirectoryBuilder::default()
.spec(spec.clone()) .spec(spec.clone())
.custom_deposit_amount(deposit_gwei) .custom_deposit_amount(deposit_gwei)
.thread_random_keypairs() .thread_random_keypairs()
.submit_eth1_deposit(web3.clone(), from_address, deposit_contract), .submit_eth1_deposit(web3.clone(), from_address, deposit_contract)
.map(move |(builder, tx_hash)| {
info!(
tx_hash_log,
"Validator deposited";
"eth1_tx_hash" => format!("{:?}", tx_hash),
"index" => format!("{}/{}", i + 1, n),
);
builder
}),
)? )?
.create_directory(validator_dir.clone())? .create_directory(validator_dir.clone())?
.write_keypair_files()? .write_keypair_files()?
.write_eth1_data_file()? .write_eth1_data_file()?
.build()?; .build()?;
}
if let Some(voting_keypair) = validator.voting_keypair { let ending_validator_count = existing_validator_count(&validator_dir)?;
println!("{:?}", voting_keypair.pk) let delta = ending_validator_count.saturating_sub(starting_validator_count);
}
} info!(
log,
"Success";
"validators_created_and_deposited" => delta,
);
Ok(()) Ok(())
} }
/// Returns the number of validators that exist in the given `validator_dir`.
///
/// This function just assumes any file is a validator directory, making it likely to return a
/// higher number than accurate but never a lower one.
fn existing_validator_count(validator_dir: &PathBuf) -> Result<usize, String> {
fs::read_dir(&validator_dir)
.map(|iter| iter.count())
.map_err(|e| format!("Unable to read {:?}: {}", validator_dir, e))
}
/// Run a poll on the `eth_syncing` endpoint, blocking until the node is synced.
fn poll_until_synced<T>(web3: Web3<T>, log: Logger) -> impl Future<Item = (), Error = String> + Send
where
T: Transport + Send + 'static,
<T as Transport>::Out: Send,
{
loop_fn((web3.clone(), log.clone()), move |(web3, log)| {
web3.clone()
.eth()
.syncing()
.map_err(|e| format!("Unable to read syncing state from eth1 node: {:?}", e))
.and_then::<_, Box<dyn Future<Item = _, Error = _> + Send>>(move |sync_state| {
match sync_state {
SyncState::Syncing(SyncInfo {
current_block,
highest_block,
..
}) => {
info!(
log,
"Waiting for eth1 node to sync";
"est_highest_block" => format!("{}", highest_block),
"current_block" => format!("{}", current_block),
);
Box::new(
Delay::new(Instant::now() + SYNCING_STATE_RETRY_DELAY)
.map_err(|e| format!("Failed to trigger delay: {:?}", e))
.and_then(|_| future::ok(Loop::Continue((web3, log)))),
)
}
SyncState::NotSyncing => Box::new(
web3.clone()
.eth()
.block_number()
.map_err(|e| {
format!("Unable to read block number from eth1 node: {:?}", e)
})
.and_then::<_, Box<dyn Future<Item = _, Error = _> + Send>>(
|block_number| {
if block_number > 0.into() {
info!(
log,
"Eth1 node is synced";
"head_block" => format!("{}", block_number),
);
Box::new(future::ok(Loop::Break((web3, log))))
} else {
Box::new(
Delay::new(Instant::now() + SYNCING_STATE_RETRY_DELAY)
.map_err(|e| {
format!("Failed to trigger delay: {:?}", e)
})
.and_then(|_| {
info!(
log,
"Waiting for eth1 node to sync";
"current_block" => 0,
);
future::ok(Loop::Continue((web3, log)))
}),
)
}
},
),
),
}
})
})
.map(|_| ())
}

View File

@ -16,13 +16,10 @@ pub fn parse_testnet_dir_with_hardcoded_default<E: EthSpec>(
matches: &ArgMatches, matches: &ArgMatches,
name: &'static str, name: &'static str,
) -> Result<Eth2TestnetConfig<E>, String> { ) -> Result<Eth2TestnetConfig<E>, String> {
parse_required::<PathBuf>(matches, name) if let Some(path) = parse_optional::<PathBuf>(matches, name)? {
.and_then(|path| {
Eth2TestnetConfig::load(path.clone()) Eth2TestnetConfig::load(path.clone())
.map_err(|e| format!("Unable to open testnet dir at {:?}: {}", path, e)) .map_err(|e| format!("Unable to open testnet dir at {:?}: {}", path, e))
}) } else {
.map(Result::Ok)
.unwrap_or_else(|_| {
Eth2TestnetConfig::hard_coded().map_err(|e| { Eth2TestnetConfig::hard_coded().map_err(|e| {
format!( format!(
"The hard-coded testnet directory was invalid. \ "The hard-coded testnet directory was invalid. \
@ -31,7 +28,7 @@ pub fn parse_testnet_dir_with_hardcoded_default<E: EthSpec>(
e e
) )
}) })
}) }
} }
/// If `name` is in `matches`, parses the value as a path. Otherwise, attempts to find the user's /// If `name` is in `matches`, parses the value as a path. Otherwise, attempts to find the user's

View File

@ -308,7 +308,7 @@ impl ValidatorDirectoryBuilder {
web3: Web3<T>, web3: Web3<T>,
from: Address, from: Address,
deposit_contract: Address, deposit_contract: Address,
) -> impl Future<Item = Self, Error = String> { ) -> impl Future<Item = (Self, Hash256), Error = String> {
self.get_deposit_data() self.get_deposit_data()
.into_future() .into_future()
.and_then(move |(deposit_data, deposit_amount)| { .and_then(move |(deposit_data, deposit_amount)| {
@ -325,7 +325,7 @@ impl ValidatorDirectoryBuilder {
}) })
.map_err(|e| format!("Failed to send transaction: {:?}", e)) .map_err(|e| format!("Failed to send transaction: {:?}", e))
}) })
.map(|_tx| self) .map(|tx| (self, tx))
} }
pub fn build(self) -> Result<ValidatorDirectory, String> { pub fn build(self) -> Result<ValidatorDirectory, String> {