From 77f3539654dd9cf0eedc7ca5f31e41d94c506286 Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Mon, 30 Nov 2020 20:29:17 +0000
Subject: [PATCH] Improve eth1 block sync (#2008)

## Issue Addressed

NA

## Proposed Changes

- Log about eth1 whilst waiting for genesis.
- For the block and deposit caches, update them after each download instead of
  when *all* downloads are complete.
  - This prevents the case where a single timeout error can cause us to drop
    *all* previously downloaded blocks/deposits.
- Set `max_log_requests_per_update` to avoid timeouts due to very large log
  counts in a response.
- Set `max_blocks_per_update` to prevent a single update of the block cache
  from downloading an unreasonable number of blocks.
  - This shouldn't have any effect in normal use; it's just a safeguard
    against bugs.
- Increase the timeout for eth1 calls from 15s to 60s, as per @pawanjay176's
  experience with Infura.

## Additional Info

NA
---
 beacon_node/beacon_chain/src/eth1_chain.rs |  47 ++++++--
 beacon_node/client/src/notifier.rs         |  88 +++++++++------
 beacon_node/eth1/src/service.rs            | 121 +++++++++++----------
 beacon_node/http_api/src/lib.rs            |   6 +-
 book/src/faq.md                            |  19 ++++
 5 files changed, 176 insertions(+), 105 deletions(-)
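Reviewer note (placed between the message and the diff, where applying tools ignore it): the `get_sync_status` change below invents voting periods *before* genesis so the sync status stays meaningful while waiting for genesis. A minimal standalone sketch of that arithmetic, using hypothetical stand-ins for the real `ChainSpec`/`EthSpec` values (the helper name and constants are illustrative, not from the patch):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Sketch of the pre-genesis branch of `get_sync_status`. The constants are
/// hypothetical stand-ins, not taken from any real spec.
fn pre_genesis_voting_target(genesis_time: u64) -> Option<u64> {
    let seconds_per_eth1_block: u64 = 14;
    let eth1_follow_distance: u64 = 2_048;
    let slots_per_eth1_voting_period: u64 = 1_024;
    let seconds_per_slot: u64 = 12;

    let eth1_follow_distance_seconds = seconds_per_eth1_block.saturating_mul(eth1_follow_distance);
    let voting_period_duration = slots_per_eth1_voting_period * seconds_per_slot;

    let now = SystemTime::now().duration_since(UNIX_EPOCH).ok()?.as_secs();
    let seconds_till_genesis = genesis_time.saturating_sub(now);

    // Round up, so the invented period start is never in the future.
    let voting_periods_past =
        (seconds_till_genesis + voting_period_duration - 1) / voting_period_duration;

    // Invent a voting period *before* genesis and apply the usual follow distance.
    Some(
        genesis_time
            .saturating_sub(voting_periods_past * voting_period_duration)
            .saturating_sub(eth1_follow_distance_seconds),
    )
}

fn main() {
    // Hypothetical genesis timestamp, one hour in the future.
    let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
    if let Some(target) = pre_genesis_voting_target(now + 3_600) {
        println!("pre-genesis voting_target_timestamp: {}", target);
    }
}
```

This mirrors the diff's approach: a cache that has caught up to this invented target is reported as ready, which is what lets the notifier log sensibly before genesis.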
diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs
index e67f95fba..1287ef16b 100644
--- a/beacon_node/beacon_chain/src/eth1_chain.rs
+++ b/beacon_node/beacon_chain/src/eth1_chain.rs
@@ -71,23 +71,52 @@ fn get_sync_status<T: EthSpec>(
     latest_cached_block: Option<&Eth1Block>,
     head_block: Option<&Eth1Block>,
     genesis_time: u64,
-    current_slot: Slot,
+    current_slot: Option<Slot>,
     spec: &ChainSpec,
 ) -> Option<Eth1SyncStatusData> {
-    let period = T::SlotsPerEth1VotingPeriod::to_u64();
-    // Since `period` is a "constant", we assume it is set sensibly.
-    let voting_period_start_slot = (current_slot / period) * period;
-    let voting_target_timestamp = {
+    let eth1_follow_distance_seconds = spec
+        .seconds_per_eth1_block
+        .saturating_mul(spec.eth1_follow_distance);
+
+    // The voting target timestamp needs to be special-cased when we're before
+    // genesis (as defined by `current_slot == None`).
+    //
+    // For the sake of this status, when prior to genesis we want to invent some voting periods
+    // that are *before* genesis, so that we can indicate to users that we're actually adequately
+    // cached for where they are in time.
+    let voting_target_timestamp = if let Some(current_slot) = current_slot {
+        let period = T::SlotsPerEth1VotingPeriod::to_u64();
+        let voting_period_start_slot = (current_slot / period) * period;
+
         let period_start = slot_start_seconds::<T>(
             genesis_time,
             spec.milliseconds_per_slot,
             voting_period_start_slot,
         );
-        let eth1_follow_distance_seconds = spec
-            .seconds_per_eth1_block
-            .saturating_mul(spec.eth1_follow_distance);
+
         period_start.saturating_sub(eth1_follow_distance_seconds)
+    } else {
+        // The number of seconds in an eth1 voting period.
+        let voting_period_duration =
+            T::slots_per_eth1_voting_period() as u64 * (spec.milliseconds_per_slot / 1_000);
+
+        let now = SystemTime::now().duration_since(UNIX_EPOCH).ok()?.as_secs();
+
+        // The number of seconds between now and genesis.
+        let seconds_till_genesis = genesis_time.saturating_sub(now);
+
+        // Determine how many voting periods are contained in the distance between
+        // now and genesis, rounding up.
+        let voting_periods_past =
+            (seconds_till_genesis + voting_period_duration - 1) / voting_period_duration;
+
+        // Return the start time of the current voting period*.
+        //
+        // *: This voting period doesn't *actually* exist, we're just using it to
+        // give useful logs prior to genesis.
+        genesis_time
+            .saturating_sub(voting_periods_past * voting_period_duration)
+            .saturating_sub(eth1_follow_distance_seconds)
     };
 
     let latest_cached_block_number = latest_cached_block.map(|b| b.number);
@@ -232,7 +261,7 @@ where
     pub fn sync_status(
         &self,
         genesis_time: u64,
-        current_slot: Slot,
+        current_slot: Option<Slot>,
         spec: &ChainSpec,
     ) -> Option<Eth1SyncStatusData> {
         get_sync_status::<T::EthSpec>(
diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs
index aa300310c..6f814557a 100644
--- a/beacon_node/client/src/notifier.rs
+++ b/beacon_node/client/src/notifier.rs
@@ -3,7 +3,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes};
 use eth2_libp2p::NetworkGlobals;
 use futures::prelude::*;
 use parking_lot::Mutex;
-use slog::{debug, error, info, warn};
+use slog::{debug, error, info, warn, Logger};
 use slot_clock::SlotClock;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -56,6 +56,7 @@ pub fn spawn_notifier(
                         "peers" => peer_count_pretty(network.connected_peers()),
                         "wait_time" => estimated_time_pretty(Some(next_slot.as_secs() as f64)),
                     );
+                    eth1_logging(&beacon_chain, &log);
                     sleep(slot_duration).await;
                 }
                 _ => break,
@@ -172,37 +173,7 @@ pub fn spawn_notifier(
             );
         }
 
-        // Perform some logging about the eth1 chain
-        if let Some(eth1_chain) = beacon_chain.eth1_chain.as_ref() {
-            if let Some(status) =
-                eth1_chain.sync_status(head_info.genesis_time, current_slot, &beacon_chain.spec)
-            {
-                debug!(
-                    log,
-                    "Eth1 cache sync status";
-                    "eth1_head_block" => status.head_block_number,
-                    "latest_cached_block_number" => status.latest_cached_block_number,
-                    "latest_cached_timestamp" => status.latest_cached_block_timestamp,
-                    "voting_target_timestamp" => status.voting_target_timestamp,
-                    "ready" => status.lighthouse_is_cached_and_ready
-                );
-
-                if !status.lighthouse_is_cached_and_ready {
-                    warn!(
-                        log,
-                        "Syncing eth1 block cache";
-                        "target_timestamp" => status.voting_target_timestamp,
-                        "latest_timestamp" => status.latest_cached_block_timestamp,
-                        "msg" => "block production temporarily impaired"
-                    );
-                }
-            } else {
-                error!(
-                    log,
-                    "Unable to determine eth1 sync status";
-                );
-            }
-        }
+        eth1_logging(&beacon_chain, &log);
     }
     Ok::<(), ()>(())
 };
@@ -213,6 +184,59 @@ pub fn spawn_notifier(
     Ok(())
 }
 
+fn eth1_logging<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>, log: &Logger) {
+    let current_slot_opt = beacon_chain.slot().ok();
+
+    if let Ok(head_info) = beacon_chain.head_info() {
+        // Perform some logging about the eth1 chain
+        if let Some(eth1_chain) = beacon_chain.eth1_chain.as_ref() {
+            if let Some(status) =
+                eth1_chain.sync_status(head_info.genesis_time, current_slot_opt, &beacon_chain.spec)
+            {
+                debug!(
+                    log,
+                    "Eth1 cache sync status";
+                    "eth1_head_block" => status.head_block_number,
+                    "latest_cached_block_number" => status.latest_cached_block_number,
+                    "latest_cached_timestamp" => status.latest_cached_block_timestamp,
+                    "voting_target_timestamp" => status.voting_target_timestamp,
+                    "ready" => status.lighthouse_is_cached_and_ready
+                );
+
+                if !status.lighthouse_is_cached_and_ready {
+                    let voting_target_timestamp = status.voting_target_timestamp;
+
+                    let distance = status
+                        .latest_cached_block_timestamp
+                        .map(|latest| {
+                            voting_target_timestamp.saturating_sub(latest)
+                                / beacon_chain.spec.seconds_per_eth1_block
+                        })
+                        .map(|distance| distance.to_string())
+                        .unwrap_or_else(|| "initializing deposits".to_string());
+
+                    warn!(
+                        log,
+                        "Syncing eth1 block cache";
+                        "msg" => "sync can take longer when using remote eth1 nodes",
+                        "est_blocks_remaining" => distance,
+                    );
+                }
+            } else {
+                error!(
+                    log,
+                    "Unable to determine eth1 sync status";
+                );
+            }
+        }
+    } else {
+        error!(
+            log,
+            "Unable to get head info";
+        );
+    }
+}
+
 /// Returns the peer count, returning something helpful if it's `usize::max_value` (effectively a
 /// `None` value).
 fn peer_count_pretty(peer_count: usize) -> String {
diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs
index 235db9b4b..70e5501ae 100644
--- a/beacon_node/eth1/src/service.rs
+++ b/beacon_node/eth1/src/service.rs
@@ -4,12 +4,12 @@ use crate::{
     deposit_cache::Error as DepositCacheError,
     http::{
         get_block, get_block_number, get_chain_id, get_deposit_logs_in_range, get_network_id,
-        BlockQuery, Eth1Id, Log,
+        BlockQuery, Eth1Id,
     },
     inner::{DepositUpdater, Inner},
 };
 use fallback::{Fallback, FallbackError};
-use futures::{future::TryFutureExt, stream, stream::TryStreamExt, StreamExt};
+use futures::{future::TryFutureExt, StreamExt};
 use parking_lot::{RwLock, RwLockReadGuard};
 use serde::{Deserialize, Serialize};
 use slog::{crit, debug, error, info, trace, warn, Logger};
@@ -34,7 +34,7 @@ const BLOCK_NUMBER_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
 /// Timeout when doing an eth_getBlockByNumber call.
 const GET_BLOCK_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
 /// Timeout when doing an eth_getLogs to read the deposit contract logs.
-const GET_DEPOSIT_LOG_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
+const GET_DEPOSIT_LOG_TIMEOUT_MILLIS: u64 = 60_000;
 
 const WARNING_MSG: &str = "BLOCK PROPOSALS WILL FAIL WITHOUT VALID, SYNCED ETH1 CONNECTION";
@@ -384,8 +384,8 @@ impl Default for Config {
             block_cache_truncation: Some(4_096),
             auto_update_interval_millis: 7_000,
             blocks_per_log_query: 1_000,
-            max_log_requests_per_update: None,
-            max_blocks_per_update: None,
+            max_log_requests_per_update: Some(100),
+            max_blocks_per_update: Some(8_192),
         }
     }
 }
@@ -817,38 +817,40 @@ impl Service {
             Vec::new()
         };
 
+        let mut logs_imported: usize = 0;
         let deposit_contract_address_ref: &str = &deposit_contract_address;
-        let logs: Vec<(Range<u64>, Vec<Log>)> =
-            stream::try_unfold(block_number_chunks.into_iter(), |mut chunks| async {
-                match chunks.next() {
-                    Some(chunk) => {
-                        let chunk_ref = &chunk;
-                        endpoints
-                            .first_success(|e| async move {
-                                get_deposit_logs_in_range(
-                                    e,
-                                    deposit_contract_address_ref,
-                                    chunk_ref.clone(),
-                                    Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS),
-                                )
-                                .await
-                                .map_err(SingleEndpointError::GetDepositLogsFailed)
-                            })
-                            .await
-                            .map(|logs| Some(((chunk, logs), chunks)))
-                    }
-                    None => Ok(None),
-                }
-            })
-            .try_collect()
-            .await
-            .map_err(Error::FallbackError)?;
+        for block_range in block_number_chunks.into_iter() {
+            if block_range.is_empty() {
+                debug!(
+                    self.log,
+                    "No new blocks to scan for logs";
+                );
+                continue;
+            }
 
-        let mut logs_imported = 0;
-        for (block_range, log_chunk) in logs.iter() {
+            /*
+             * Step 1. Download logs.
+             */
+            let block_range_ref = &block_range;
+            let logs = endpoints
+                .first_success(|e| async move {
+                    get_deposit_logs_in_range(
+                        e,
+                        &deposit_contract_address_ref,
+                        block_range_ref.clone(),
+                        Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS),
+                    )
+                    .await
+                    .map_err(SingleEndpointError::GetDepositLogsFailed)
+                })
+                .await
+                .map_err(Error::FallbackError)?;
+
+            /*
+             * Step 2. Import logs to cache.
+ */ let mut cache = self.deposits().write(); - log_chunk - .iter() + logs.iter() .map(|raw_log| { raw_log.to_deposit_log(self.inner.spec()).map_err(|error| { Error::FailedToParseDepositLog { @@ -881,6 +883,12 @@ impl Service { // node to choose an invalid genesis state or propose an invalid block. .collect::>()?; + debug!( + self.log, + "Imported deposit logs chunk"; + "logs" => logs.len(), + ); + cache.last_processed_block = Some(block_range.end.saturating_sub(1)); metrics::set_gauge(&metrics::DEPOSIT_CACHE_LEN, cache.cache.len() as i64); @@ -976,8 +984,9 @@ impl Service { } else { Vec::new() }; - // Download the range of blocks and sequentially import them into the cache. - // Last processed block in deposit cache + + // This value is used to prevent the block cache from importing a block that is not yet in + // the deposit cache. let latest_in_cache = self .inner .deposit_cache @@ -990,34 +999,26 @@ impl Service { .filter(|x| *x <= latest_in_cache) .take(max_blocks_per_update) .collect::>(); + + debug!( + self.log, + "Downloading eth1 blocks"; + "first" => ?required_block_numbers.first(), + "last" => ?required_block_numbers.last(), + ); + // Produce a stream from the list of required block numbers and return a future that // consumes the it. - let eth1_blocks: Vec = stream::try_unfold( - required_block_numbers.into_iter(), - |mut block_numbers| async { - match block_numbers.next() { - Some(block_number) => { - match endpoints - .first_success(|e| async move { - download_eth1_block(e, self.inner.clone(), Some(block_number)).await - }) - .await - { - Ok(eth1_block) => Ok(Some((eth1_block, block_numbers))), - Err(e) => Err(e), - } - } - None => Ok(None), - } - }, - ) - .try_collect() - .await - .map_err(Error::FallbackError)?; - let mut blocks_imported = 0; - for eth1_block in eth1_blocks { + for block_number in required_block_numbers { + let eth1_block = endpoints + .first_success(|e| async move { + download_eth1_block(e, self.inner.clone(), Some(block_number)).await + }) + .await + .map_err(Error::FallbackError)?; + self.inner .block_cache .write() diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index d37cbfb76..8c3902fcd 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -2075,9 +2075,7 @@ pub fn serve( let head_info = chain .head_info() .map_err(warp_utils::reject::beacon_chain_error)?; - let current_slot = chain - .slot() - .map_err(warp_utils::reject::beacon_chain_error)?; + let current_slot_opt = chain.slot().ok(); chain .eth1_chain @@ -2088,7 +2086,7 @@ pub fn serve( ) }) .and_then(|eth1| { - eth1.sync_status(head_info.genesis_time, current_slot, &chain.spec) + eth1.sync_status(head_info.genesis_time, current_slot_opt, &chain.spec) .ok_or_else(|| { warp_utils::reject::custom_server_error( "Unable to determine Eth1 sync status".to_string(), diff --git a/book/src/faq.md b/book/src/faq.md index 860fd3f1b..12764b230 100644 --- a/book/src/faq.md +++ b/book/src/faq.md @@ -6,6 +6,7 @@ - [What should I do if I lose my slashing protection database?](#what-should-i-do-if-i-lose-my-slashing-protection-database) - [How do I update lighthouse?](#how-do-i-update-lighthouse) - [I can't compile lighthouse](#i-cant-compile-lighthouse) +- [What is "Syncing eth1 block cache"](#what-is-syncing-eth1-block-cache) ### Why does it take so long for a validator to be activated? 
@@ -155,3 +156,21 @@ You will just also need to make sure the code you have checked out is up to date.
 ### I can't compile lighthouse
 
 See [here.](./installation-source.md#troubleshooting)
+
+### What is "Syncing eth1 block cache"?
+
+```
+Nov 30 21:04:28.268 WARN Syncing eth1 block cache est_blocks_remaining: initializing deposits, msg: sync can take longer when using remote eth1 nodes, service: slot_notifier
+```
+
+This log indicates that your beacon node is downloading blocks and deposits
+from your eth1 node. When `est_blocks_remaining` is `initializing deposits`,
+your node is downloading deposit logs. It may stay in this stage for several
+minutes. Once the deposit logs have finished downloading, the
+`est_blocks_remaining` value will start decreasing.
+
+It is perfectly normal to see this log when starting a node for the first time
+or after it has been offline for more than a few minutes.
+
+If this log continues to appear sporadically during operation, there may be an
+issue with your eth1 endpoint.
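A footnote to the FAQ entry above: the `est_blocks_remaining` value is produced by the distance calculation added to `eth1_logging` in this patch, namely the gap between the voting target timestamp and the newest cached block timestamp, divided by the eth1 block interval. A minimal standalone sketch of that calculation, with hypothetical inputs:

```rust
/// Sketch of how `eth1_logging` estimates the blocks remaining. `None` means
/// the deposit cache holds no blocks yet, which the notifier renders as
/// "initializing deposits".
fn est_blocks_remaining(
    voting_target_timestamp: u64,
    latest_cached_block_timestamp: Option<u64>,
    seconds_per_eth1_block: u64,
) -> Option<u64> {
    latest_cached_block_timestamp
        .map(|latest| voting_target_timestamp.saturating_sub(latest) / seconds_per_eth1_block)
}

fn main() {
    // Hypothetical values: the cache is 14_000 seconds behind the voting
    // target and eth1 blocks arrive roughly every 14 seconds.
    match est_blocks_remaining(1_606_824_023, Some(1_606_810_023), 14) {
        Some(blocks) => println!("est_blocks_remaining: {}", blocks), // prints 1000
        None => println!("est_blocks_remaining: initializing deposits"),
    }
}
```

Since the cached block timestamp moves forward as the cache fills, the estimate shrinks toward zero as the sync completes, matching the decreasing `est_blocks_remaining` described above.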