From a413b43fedbc6ed7e17a37437e9312889fb96363 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 27 Jul 2020 00:08:12 +0000 Subject: [PATCH] Add eth1 deposit confirmations (#1370) ## Issue Addressed NA ## Proposed Changes Allow `lighthouse account validator deposit` to await for confirmations after deposit submissions. ## Additional Info NA --- account_manager/src/validator/deposit.rs | 123 +++++++++++++++++------ 1 file changed, 92 insertions(+), 31 deletions(-) diff --git a/account_manager/src/validator/deposit.rs b/account_manager/src/validator/deposit.rs index 23f970ed2..f83e11891 100644 --- a/account_manager/src/validator/deposit.rs +++ b/account_manager/src/validator/deposit.rs @@ -2,7 +2,10 @@ use crate::VALIDATOR_DIR_FLAG; use clap::{App, Arg, ArgMatches}; use deposit_contract::DEPOSIT_GAS; use environment::Environment; -use futures::compat::Future01CompatExt; +use futures::{ + compat::Future01CompatExt, + stream::{FuturesUnordered, StreamExt}, +}; use slog::{info, Logger}; use std::path::PathBuf; use tokio::time::{delay_until, Duration, Instant}; @@ -20,11 +23,15 @@ pub const VALIDATOR_FLAG: &str = "validator"; pub const ETH1_IPC_FLAG: &str = "eth1-ipc"; pub const ETH1_HTTP_FLAG: &str = "eth1-http"; pub const FROM_ADDRESS_FLAG: &str = "from-address"; +pub const CONFIRMATION_COUNT_FLAG: &str = "confirmation-count"; +pub const CONFIRMATION_BATCH_SIZE_FLAG: &str = "confirmation-batch-size"; const GWEI: u64 = 1_000_000_000; const SYNCING_STATE_RETRY_DELAY: Duration = Duration::from_secs(2); +const CONFIRMATIONS_POLL_TIME: Duration = Duration::from_secs(2); + pub fn cli_app<'a, 'b>() -> App<'a, 'b> { App::new("deposit") .about( @@ -33,10 +40,10 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { have been created and exist on the file-system. The process will exit immediately \ with an error if any error occurs. After each deposit is submitted to the Eth1 \ node, a file will be saved in the validator directory with the transaction hash. \ - The application does not wait for confirmations so there is not guarantee that \ - the transaction is included in the Eth1 chain; use a block explorer and the \ - transaction hash to check for confirmations. The deposit contract address will \ - be determined by the --testnet-dir flag on the primary Lighthouse binary.", + If confirmations are set to non-zero then the application will wait for confirmations \ + before saving the transaction hash and moving onto the next batch of deposits. \ + The deposit contract address will be determined by the --testnet-dir flag on the \ + primary Lighthouse binary.", ) .arg( Arg::with_name(VALIDATOR_DIR_FLAG) @@ -86,15 +93,41 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true) .required(true), ) + .arg( + Arg::with_name(CONFIRMATION_COUNT_FLAG) + .long(CONFIRMATION_COUNT_FLAG) + .value_name("CONFIRMATION_COUNT") + .help( + "The number of Eth1 block confirmations required \ + before a transaction is considered complete. Set to \ + 0 for no confirmations.", + ) + .takes_value(true) + .default_value("1"), + ) + .arg( + Arg::with_name(CONFIRMATION_BATCH_SIZE_FLAG) + .long(CONFIRMATION_BATCH_SIZE_FLAG) + .value_name("BATCH_SIZE") + .help( + "Perform BATCH_SIZE deposits and wait for confirmations \ + in parallel. Useful for achieving faster bulk deposits.", + ) + .takes_value(true) + .default_value("10"), + ) } +#[allow(clippy::too_many_arguments)] fn send_deposit_transactions( mut env: Environment, log: Logger, - eth1_deposit_datas: Vec<(ValidatorDir, Eth1DepositData)>, + mut eth1_deposit_datas: Vec<(ValidatorDir, Eth1DepositData)>, from_address: Address, deposit_contract: Address, transport: T2, + confirmation_count: usize, + confirmation_batch_size: usize, ) -> Result<(), String> where T1: EthSpec, @@ -106,32 +139,53 @@ where let deposits_fut = async { poll_until_synced(web3.clone(), log.clone()).await?; - for (mut validator_dir, eth1_deposit_data) in eth1_deposit_datas { - let tx_hash = web3 - .eth() - .send_transaction(TransactionRequest { - from: from_address, - to: Some(deposit_contract), - gas: Some(DEPOSIT_GAS.into()), - gas_price: None, - value: Some(from_gwei(eth1_deposit_data.deposit_data.amount)), - data: Some(eth1_deposit_data.rlp.into()), - nonce: None, - condition: None, - }) - .compat() + for chunk in eth1_deposit_datas.chunks_mut(confirmation_batch_size) { + let futures = FuturesUnordered::default(); + + for (ref mut validator_dir, eth1_deposit_data) in chunk.iter_mut() { + let web3 = web3.clone(); + let log = log.clone(); + futures.push(async move { + let tx_hash = web3 + .send_transaction_with_confirmation( + TransactionRequest { + from: from_address, + to: Some(deposit_contract), + gas: Some(DEPOSIT_GAS.into()), + gas_price: None, + value: Some(from_gwei(eth1_deposit_data.deposit_data.amount)), + data: Some(eth1_deposit_data.rlp.clone().into()), + nonce: None, + condition: None, + }, + CONFIRMATIONS_POLL_TIME, + confirmation_count, + ) + .compat() + .await + .map_err(|e| format!("Failed to send transaction: {:?}", e))?; + + info!( + log, + "Submitted deposit"; + "tx_hash" => format!("{:?}", tx_hash), + ); + + validator_dir + .save_eth1_deposit_tx_hash(&format!("{:?}", tx_hash)) + .map_err(|e| { + format!("Failed to save tx hash {:?} to disk: {:?}", tx_hash, e) + })?; + + Ok::<(), String>(()) + }); + } + + futures + .collect::>() .await - .map_err(|e| format!("Failed to send transaction: {:?}", e))?; - - info!( - log, - "Submitted deposit"; - "tx_hash" => format!("{:?}", tx_hash), - ); - - validator_dir - .save_eth1_deposit_tx_hash(&format!("{:?}", tx_hash)) - .map_err(|e| format!("Failed to save tx hash {:?} to disk: {:?}", tx_hash, e))?; + .into_iter() + .collect::>()?; } Ok::<(), String>(()) @@ -157,6 +211,9 @@ pub fn cli_run( let eth1_ipc_path: Option = clap_utils::parse_optional(matches, ETH1_IPC_FLAG)?; let eth1_http_url: Option = clap_utils::parse_optional(matches, ETH1_HTTP_FLAG)?; let from_address: Address = clap_utils::parse_required(matches, FROM_ADDRESS_FLAG)?; + let confirmation_count: usize = clap_utils::parse_required(matches, CONFIRMATION_COUNT_FLAG)?; + let confirmation_batch_size: usize = + clap_utils::parse_required(matches, CONFIRMATION_BATCH_SIZE_FLAG)?; let manager = ValidatorManager::open(&data_dir) .map_err(|e| format!("Unable to read --{}: {:?}", VALIDATOR_DIR_FLAG, e))?; @@ -250,6 +307,8 @@ pub fn cli_run( from_address, deposit_contract, ipc_transport, + confirmation_count, + confirmation_batch_size, ) } (None, Some(http_url)) => { @@ -262,6 +321,8 @@ pub fn cli_run( from_address, deposit_contract, http_transport, + confirmation_count, + confirmation_batch_size, ) } }