diff --git a/beacon_node/eth1/src/deposit_cache.rs b/beacon_node/eth1/src/deposit_cache.rs index 1a237a1d9..eb31bccc5 100644 --- a/beacon_node/eth1/src/deposit_cache.rs +++ b/beacon_node/eth1/src/deposit_cache.rs @@ -1,9 +1,9 @@ use crate::DepositLog; use eth2_hashing::hash; use tree_hash::TreeHash; -use types::{Deposit, Hash256}; +use types::{Deposit, Hash256, DEPOSIT_TREE_DEPTH}; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq)] pub enum Error { /// A deposit log was added when a prior deposit was not already in the cache. /// @@ -23,6 +23,8 @@ pub enum Error { /// /// E.g., you cannot request deposit 10 when the deposit count is 9. DepositCountInvalid { deposit_count: u64, range_end: u64 }, + /// Error with the merkle tree for deposits. + DepositTreeError(merkle_proof::MerkleTreeError), /// An unexpected condition was encountered. InternalError(String), } @@ -66,18 +68,56 @@ impl DepositDataTree { proof.push(Hash256::from_slice(&self.length_bytes())); (root, proof) } + + /// Add a deposit to the merkle tree. + pub fn push_leaf(&mut self, leaf: Hash256) -> Result<(), Error> { + self.tree + .push_leaf(leaf, self.depth) + .map_err(Error::DepositTreeError)?; + self.mix_in_length += 1; + Ok(()) + } } /// Mirrors the merkle tree of deposits in the eth1 deposit contract. /// /// Provides `Deposit` objects with merkle proofs included. -#[derive(Default)] pub struct DepositCache { logs: Vec, - roots: Vec, + leaves: Vec, + deposit_contract_deploy_block: u64, + /// An incremental merkle tree which represents the current state of the + /// deposit contract tree. + deposit_tree: DepositDataTree, + /// Vector of deposit roots. `deposit_roots[i]` denotes `deposit_root` at + /// `deposit_index` `i`. + deposit_roots: Vec, +} + +impl Default for DepositCache { + fn default() -> Self { + let deposit_tree = DepositDataTree::create(&[], 0, DEPOSIT_TREE_DEPTH); + let deposit_roots = vec![deposit_tree.root()]; + DepositCache { + logs: Vec::new(), + leaves: Vec::new(), + deposit_contract_deploy_block: 1, + deposit_tree, + deposit_roots, + } + } } impl DepositCache { + /// Create new `DepositCache` given block number at which deposit + /// contract was deployed. + pub fn new(deposit_contract_deploy_block: u64) -> Self { + DepositCache { + deposit_contract_deploy_block, + ..Self::default() + } + } + /// Returns the number of deposits available in the cache. pub fn len(&self) -> usize { self.logs.len() @@ -114,10 +154,11 @@ impl DepositCache { /// - If a log with `log.index` is already known, but the given `log` is distinct to it. pub fn insert_log(&mut self, log: DepositLog) -> Result<(), Error> { if log.index == self.logs.len() as u64 { - self.roots - .push(Hash256::from_slice(&log.deposit_data.tree_hash_root())); + let deposit = Hash256::from_slice(&log.deposit_data.tree_hash_root()); + self.leaves.push(deposit); self.logs.push(log); - + self.deposit_tree.push_leaf(deposit)?; + self.deposit_roots.push(self.deposit_tree.root()); Ok(()) } else if log.index < self.logs.len() as u64 { if self.logs[log.index as usize] == log { @@ -163,7 +204,7 @@ impl DepositCache { requested: end, known_deposits: self.logs.len(), }) - } else if deposit_count > self.roots.len() as u64 { + } else if deposit_count > self.leaves.len() as u64 { // There are not `deposit_count` known deposit roots, so we can't build the merkle tree // to prove into. Err(Error::InsufficientDeposits { @@ -171,10 +212,10 @@ impl DepositCache { known_deposits: self.logs.len(), }) } else { - let roots = self - .roots + let leaves = self + .leaves .get(0..deposit_count as usize) - .ok_or_else(|| Error::InternalError("Unable to get known root".into()))?; + .ok_or_else(|| Error::InternalError("Unable to get known leaves".into()))?; // Note: there is likely a more optimal solution than recreating the `DepositDataTree` // each time this function is called. @@ -183,7 +224,7 @@ impl DepositCache { // last finalized eth1 deposit count. Then, that tree could be cloned and extended for // each of these calls. - let tree = DepositDataTree::create(roots, deposit_count as usize, tree_depth); + let tree = DepositDataTree::create(leaves, deposit_count as usize, tree_depth); let deposits = self .logs @@ -203,6 +244,50 @@ impl DepositCache { Ok((tree.root(), deposits)) } } + + /// Gets the deposit count at block height = block_number. + /// + /// Fetches the `DepositLog` that was emitted at or just before `block_number` + /// and returns the deposit count as `index + 1`. + /// + /// Returns `None` if block number queried is 0 or less than deposit_contract_deployed block. + pub fn get_deposit_count_from_cache(&self, block_number: u64) -> Option { + // Contract cannot be deployed in 0'th block + if block_number == 0 { + return None; + } + if block_number < self.deposit_contract_deploy_block { + return None; + } + // Return 0 if block_num queried is before first deposit + if let Some(first_deposit) = self.logs.first() { + if first_deposit.block_number > block_number { + return Some(0); + } + } + let index = self + .logs + .binary_search_by(|deposit| deposit.block_number.cmp(&block_number)); + match index { + Ok(index) => return self.logs.get(index).map(|x| x.index + 1), + Err(next) => { + return Some( + self.logs + .get(next.saturating_sub(1)) + .map_or(0, |x| x.index + 1), + ) + } + } + } + + /// Gets the deposit root at block height = block_number. + /// + /// Fetches the `deposit_count` on or just before the queried `block_number` + /// and queries the `deposit_roots` map to get the corresponding `deposit_root`. + pub fn get_deposit_root_from_cache(&self, block_number: u64) -> Option { + let index = self.get_deposit_count_from_cache(block_number)?; + Some(self.deposit_roots.get(index as usize)?.clone()) + } } /// Returns `int` as little-endian bytes with a length of 32. diff --git a/beacon_node/eth1/src/inner.rs b/beacon_node/eth1/src/inner.rs index 88e698147..76b19cb0a 100644 --- a/beacon_node/eth1/src/inner.rs +++ b/beacon_node/eth1/src/inner.rs @@ -8,6 +8,16 @@ pub struct DepositUpdater { pub last_processed_block: Option, } +impl DepositUpdater { + pub fn new(deposit_contract_deploy_block: u64) -> Self { + let cache = DepositCache::new(deposit_contract_deploy_block); + DepositUpdater { + cache, + last_processed_block: None, + } + } +} + #[derive(Default)] pub struct Inner { pub block_cache: RwLock, diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 2710b0398..7cfd060a3 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -2,9 +2,7 @@ use crate::metrics; use crate::{ block_cache::{BlockCache, Error as BlockCacheError, Eth1Block}, deposit_cache::Error as DepositCacheError, - http::{ - get_block, get_block_number, get_deposit_count, get_deposit_logs_in_range, get_deposit_root, - }, + http::{get_block, get_block_number, get_deposit_logs_in_range}, inner::{DepositUpdater, Inner}, DepositLog, }; @@ -27,14 +25,10 @@ const STANDARD_TIMEOUT_MILLIS: u64 = 15_000; const BLOCK_NUMBER_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS; /// Timeout when doing an eth_getBlockByNumber call. const GET_BLOCK_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS; -/// Timeout when doing an eth_call to read the deposit contract root. -const GET_DEPOSIT_ROOT_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS; -/// Timeout when doing an eth_call to read the deposit contract deposit count. -const GET_DEPOSIT_COUNT_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS; /// Timeout when doing an eth_getLogs to read the deposit contract logs. const GET_DEPOSIT_LOG_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq)] pub enum Error { /// The remote node is less synced that we expect, it is not useful until has done more /// syncing. @@ -118,8 +112,8 @@ impl Default for Config { Self { endpoint: "http://localhost:8545".into(), deposit_contract_address: "0x0000000000000000000000000000000000000000".into(), - deposit_contract_deploy_block: 0, - lowest_cached_block_number: 0, + deposit_contract_deploy_block: 1, + lowest_cached_block_number: 1, follow_distance: 128, block_cache_truncation: Some(4_096), auto_update_interval_millis: 7_000, @@ -147,6 +141,9 @@ impl Service { pub fn new(config: Config, log: Logger) -> Self { Self { inner: Arc::new(Inner { + deposit_cache: RwLock::new(DepositUpdater::new( + config.deposit_contract_deploy_block, + )), config: RwLock::new(config), ..Inner::default() }), @@ -254,6 +251,7 @@ impl Service { "Updated eth1 deposit cache"; "cached_deposits" => inner_1.deposit_cache.read().cache.len(), "logs_imported" => logs_imported, + "last_processed_eth1_block" => inner_1.deposit_cache.read().last_processed_block, ), Err(e) => error!( log_a, @@ -491,6 +489,7 @@ impl Service { let cache_3 = self.inner.clone(); let cache_4 = self.inner.clone(); let cache_5 = self.inner.clone(); + let cache_6 = self.inner.clone(); let block_cache_truncation = self.config().block_cache_truncation; let max_blocks_per_update = self @@ -527,7 +526,6 @@ impl Service { let max_size = block_cache_truncation .map(|n| n as u64) .unwrap_or_else(u64::max_value); - if range_size > max_size { // If the range of required blocks is larger than `max_size`, drop all // existing blocks and download `max_size` count of blocks. @@ -543,14 +541,22 @@ impl Service { }) // Download the range of blocks and sequentially import them into the cache. .and_then(move |required_block_numbers| { + // Last processed block in deposit cache + let latest_in_cache = cache_6 + .deposit_cache + .read() + .last_processed_block + .unwrap_or(0); + let required_block_numbers = required_block_numbers .into_iter() - .take(max_blocks_per_update); - + .filter(|x| *x <= latest_in_cache) + .take(max_blocks_per_update) + .collect::>(); // Produce a stream from the list of required block numbers and return a future that // consumes the it. stream::unfold( - required_block_numbers, + required_block_numbers.into_iter(), move |mut block_numbers| match block_numbers.next() { Some(block_number) => Some( download_eth1_block(cache_2.clone(), block_number) @@ -639,6 +645,16 @@ fn download_eth1_block<'a>( cache: Arc, block_number: u64, ) -> impl Future + 'a { + let deposit_root = cache + .deposit_cache + .read() + .cache + .get_deposit_root_from_cache(block_number); + let deposit_count = cache + .deposit_cache + .read() + .cache + .get_deposit_count_from_cache(block_number); // Performs a `get_blockByNumber` call to an eth1 node. get_block( &cache.config.read().endpoint, @@ -646,24 +662,7 @@ fn download_eth1_block<'a>( Duration::from_millis(GET_BLOCK_TIMEOUT_MILLIS), ) .map_err(Error::BlockDownloadFailed) - .join3( - // Perform 2x `eth_call` via an eth1 node to read the deposit contract root and count. - get_deposit_root( - &cache.config.read().endpoint, - &cache.config.read().deposit_contract_address, - block_number, - Duration::from_millis(GET_DEPOSIT_ROOT_TIMEOUT_MILLIS), - ) - .map_err(Error::GetDepositRootFailed), - get_deposit_count( - &cache.config.read().endpoint, - &cache.config.read().deposit_contract_address, - block_number, - Duration::from_millis(GET_DEPOSIT_COUNT_TIMEOUT_MILLIS), - ) - .map_err(Error::GetDepositCountFailed), - ) - .map(|(http_block, deposit_root, deposit_count)| Eth1Block { + .map(move |http_block| Eth1Block { hash: http_block.hash, number: http_block.number, timestamp: http_block.timestamp, diff --git a/beacon_node/eth1/tests/test.rs b/beacon_node/eth1/tests/test.rs index 603cfab48..f6312f07e 100644 --- a/beacon_node/eth1/tests/test.rs +++ b/beacon_node/eth1/tests/test.rs @@ -133,7 +133,7 @@ mod auto_update { // NOTE: this test is sensitive to the response speed of the external web3 server. If // you're experiencing failures, try increasing the update_interval. - let update_interval = Duration::from_millis(2_000); + let update_interval = Duration::from_millis(3000); assert_eq!( service.block_cache_len(), @@ -235,9 +235,12 @@ mod eth1_cache { .expect("should mine block"); } + runtime + .block_on(service.update_deposit_cache()) + .expect("should update deposit cache"); runtime .block_on(service.update_block_cache()) - .expect("should update cache"); + .expect("should update block cache"); runtime .block_on(service.update_block_cache()) @@ -294,9 +297,12 @@ mod eth1_cache { .expect("should mine block") } + runtime + .block_on(service.update_deposit_cache()) + .expect("should update deposit cache"); runtime .block_on(service.update_block_cache()) - .expect("should update cache"); + .expect("should update block cache"); assert_eq!( service.block_cache_len(), @@ -339,9 +345,12 @@ mod eth1_cache { .block_on(eth1.ganache.evm_mine()) .expect("should mine block") } + runtime + .block_on(service.update_deposit_cache()) + .expect("should update deposit cache"); runtime .block_on(service.update_block_cache()) - .expect("should update cache"); + .expect("should update block cache"); } assert_eq!( @@ -381,14 +390,20 @@ mod eth1_cache { .block_on(eth1.ganache.evm_mine()) .expect("should mine block") } - + runtime + .block_on( + service + .update_deposit_cache() + .join(service.update_deposit_cache()), + ) + .expect("should perform two simultaneous updates of deposit cache"); runtime .block_on( service .update_block_cache() .join(service.update_block_cache()), ) - .expect("should perform two simultaneous updates"); + .expect("should perform two simultaneous updates of block cache"); assert!(service.block_cache_len() >= n, "should grow the cache"); } @@ -711,3 +726,80 @@ mod http { } } } + +mod fast { + use super::*; + + // Adds deposits into deposit cache and matches deposit_count and deposit_root + // with the deposit count and root computed from the deposit cache. + #[test] + fn deposit_cache_query() { + let mut env = new_env(); + let log = env.core_context().log; + let runtime = env.runtime(); + + let eth1 = runtime + .block_on(GanacheEth1Instance::new()) + .expect("should start eth1 environment"); + let deposit_contract = ð1.deposit_contract; + let web3 = eth1.web3(); + + let now = get_block_number(runtime, &web3); + let service = Service::new( + Config { + endpoint: eth1.endpoint(), + deposit_contract_address: deposit_contract.address(), + deposit_contract_deploy_block: now, + lowest_cached_block_number: now, + follow_distance: 0, + block_cache_truncation: None, + ..Config::default() + }, + log, + ); + let n = 10; + let deposits: Vec<_> = (0..n).into_iter().map(|_| random_deposit_data()).collect(); + for deposit in &deposits { + deposit_contract + .deposit(runtime, deposit.clone()) + .expect("should perform a deposit"); + // Mine an extra block between deposits to test for corner cases + runtime + .block_on(eth1.ganache.evm_mine()) + .expect("should mine block"); + } + + runtime + .block_on(service.update_deposit_cache()) + .expect("should perform update"); + + assert!( + service.deposit_cache_len() >= n, + "should have imported n deposits" + ); + + for block_num in 0..=get_block_number(runtime, &web3) { + let expected_deposit_count = blocking_deposit_count(runtime, ð1, block_num); + let expected_deposit_root = blocking_deposit_root(runtime, ð1, block_num); + + let deposit_count = service + .deposits() + .read() + .cache + .get_deposit_count_from_cache(block_num); + let deposit_root = service + .deposits() + .read() + .cache + .get_deposit_root_from_cache(block_num); + assert_eq!( + expected_deposit_count, deposit_count, + "deposit count from cache should match queried" + ); + assert_eq!( + expected_deposit_root, deposit_root, + "deposit root from cache should match queried" + ); + } + } +}