diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 460f53e73..a35d57403 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -41,8 +41,16 @@ const GET_DEPOSIT_LOG_TIMEOUT_MILLIS: u64 = 60_000; const WARNING_MSG: &str = "BLOCK PROPOSALS WILL FAIL WITHOUT VALID, SYNCED ETH1 CONNECTION"; -/// A factor used to reduce the eth1 follow distance to account for discrepancies in the block time. -const ETH1_BLOCK_TIME_TOLERANCE_FACTOR: u64 = 4; +/// Number of blocks to download if the node detects it is lagging behind due to an inaccurate +/// relationship between block-number-based follow distance and time-based follow distance. +const CATCHUP_BATCH_SIZE: u64 = 128; + +/// The absolute minimum follow distance to enforce when downloading catchup batches. +const CATCHUP_MIN_FOLLOW_DISTANCE: u64 = 64; + +/// To account for fast PoW blocks requiring more blocks in the cache than the block-based follow +/// distance would imply, we store `CACHE_FACTOR` more blocks in our cache. +const CACHE_FACTOR: u64 = 2; #[derive(Debug, PartialEq, Clone)] pub enum EndpointError { @@ -284,10 +292,18 @@ async fn get_remote_head_and_new_block_ranges( e }; let new_deposit_block_numbers = service - .relevant_new_block_numbers(remote_head_block.number, HeadType::Deposit) + .relevant_new_block_numbers( + remote_head_block.number, + Some(remote_head_block.timestamp), + HeadType::Deposit, + ) .map_err(handle_remote_not_synced)?; let new_block_cache_numbers = service - .relevant_new_block_numbers(remote_head_block.number, HeadType::BlockCache) + .relevant_new_block_numbers( + remote_head_block.number, + Some(remote_head_block.timestamp), + HeadType::BlockCache, + ) .map_err(handle_remote_not_synced)?; Ok(( remote_head_block, @@ -307,7 +323,7 @@ async fn relevant_new_block_numbers_from_endpoint( get_block_number(endpoint, Duration::from_millis(BLOCK_NUMBER_TIMEOUT_MILLIS)) .map_err(SingleEndpointError::GetBlockNumberFailed) .await?; - service.relevant_new_block_numbers(remote_highest_block, head_type) + service.relevant_new_block_numbers(remote_highest_block, None, head_type) } #[derive(Debug, PartialEq)] @@ -319,7 +335,7 @@ pub enum SingleEndpointError { RemoteNotSynced { next_required_block: u64, remote_highest_block: u64, - reduced_follow_distance: u64, + cache_follow_distance: u64, }, /// Failed to download a block from the eth1 node. BlockDownloadFailed(String), @@ -384,6 +400,11 @@ pub struct Config { /// /// Note: this should be less than or equal to the specification's `ETH1_FOLLOW_DISTANCE`. pub follow_distance: u64, + /// The follow distance to use for blocks in our cache. + /// + /// This can be set lower than the true follow distance in order to correct for poor timing + /// of eth1 blocks. + pub cache_follow_distance: Option, /// Specifies the seconds when we consider the head of a node far behind. /// This should be less than `ETH1_FOLLOW_DISTANCE * SECONDS_PER_ETH1_BLOCK`. pub node_far_behind_seconds: u64, @@ -410,20 +431,30 @@ impl Config { E::SlotsPerEth1VotingPeriod::to_u64() * spec.seconds_per_slot; let eth1_blocks_per_voting_period = seconds_per_voting_period / spec.seconds_per_eth1_block; - // Compute the number of extra blocks we store prior to the voting period start blocks. - let follow_distance_tolerance_blocks = - spec.eth1_follow_distance / ETH1_BLOCK_TIME_TOLERANCE_FACTOR; - // Ensure we can store two full windows of voting blocks. let voting_windows = eth1_blocks_per_voting_period * 2; - // Extend the cache to account for varying eth1 block times and the follow distance - // tolerance blocks. - let length = voting_windows - + (voting_windows / ETH1_BLOCK_TIME_TOLERANCE_FACTOR) - + follow_distance_tolerance_blocks; + // Extend the cache to account for the cache follow distance. + let extra_follow_distance_blocks = self + .follow_distance + .saturating_sub(self.cache_follow_distance()); - self.block_cache_truncation = Some(length as usize); + let length = voting_windows + extra_follow_distance_blocks; + + // Allow for more blocks to account for blocks being generated faster than expected. + // The cache expiry should really be timestamp based, but that would require a more + // extensive refactor. + let cache_size = CACHE_FACTOR * length; + + self.block_cache_truncation = Some(cache_size as usize); + } + + /// The distance at which the cache should follow the head. + /// + /// Defaults to 3/4 of `follow_distance` unless set manually. + pub fn cache_follow_distance(&self) -> u64 { + self.cache_follow_distance + .unwrap_or(3 * self.follow_distance / 4) } } @@ -438,6 +469,7 @@ impl Default for Config { deposit_contract_deploy_block: 1, lowest_cached_block_number: 1, follow_distance: 128, + cache_follow_distance: None, node_far_behind_seconds: 128 * 14, block_cache_truncation: Some(4_096), auto_update_interval_millis: 60_000, @@ -486,9 +518,8 @@ impl Service { /// /// This is useful since the spec declares `SECONDS_PER_ETH1_BLOCK` to be `14`, whilst it is /// actually `15` on Goerli. - pub fn reduced_follow_distance(&self) -> u64 { - let full = self.config().follow_distance; - full.saturating_sub(full / ETH1_BLOCK_TIME_TOLERANCE_FACTOR) + pub fn cache_follow_distance(&self) -> u64 { + self.config().cache_follow_distance() } /// Return byte representation of deposit and block caches. @@ -834,9 +865,10 @@ impl Service { fn relevant_new_block_numbers( &self, remote_highest_block: u64, + remote_highest_block_timestamp: Option, head_type: HeadType, ) -> Result>, SingleEndpointError> { - let follow_distance = self.reduced_follow_distance(); + let follow_distance = self.cache_follow_distance(); let next_required_block = match head_type { HeadType::Deposit => self .deposits() @@ -852,8 +884,16 @@ impl Service { .map(|n| n + 1) .unwrap_or_else(|| self.config().lowest_cached_block_number), }; + let latest_cached_block = self.latest_cached_block(); - relevant_block_range(remote_highest_block, next_required_block, follow_distance) + relevant_block_range( + remote_highest_block, + remote_highest_block_timestamp, + next_required_block, + follow_distance, + latest_cached_block.as_ref(), + &self.inner.spec, + ) } /// Contacts the remote eth1 node and attempts to import deposit logs up to the configured @@ -1189,24 +1229,48 @@ impl Service { /// Returns an error if `next_required_block > remote_highest_block + 1` which means the remote went /// backwards. fn relevant_block_range( - remote_highest_block: u64, + remote_highest_block_number: u64, + remote_highest_block_timestamp: Option, next_required_block: u64, - reduced_follow_distance: u64, + cache_follow_distance: u64, + latest_cached_block: Option<&Eth1Block>, + spec: &ChainSpec, ) -> Result>, SingleEndpointError> { - let remote_follow_block = remote_highest_block.saturating_sub(reduced_follow_distance); + // If the latest cached block is lagging the head block by more than `cache_follow_distance` + // times the expected block time then the eth1 block time is likely quite different from what we + // assumed. + // + // In order to catch up, load batches of `CATCHUP_BATCH_SIZE` until the situation rights itself. + // Note that we need to check this condition before the regular follow distance condition + // or we will keep downloading small numbers of blocks. + if let (Some(remote_highest_block_timestamp), Some(latest_cached_block)) = + (remote_highest_block_timestamp, latest_cached_block) + { + let lagging = latest_cached_block.timestamp + + cache_follow_distance * spec.seconds_per_eth1_block + < remote_highest_block_timestamp; + let end_block = std::cmp::min( + remote_highest_block_number.saturating_sub(CATCHUP_MIN_FOLLOW_DISTANCE), + next_required_block + CATCHUP_BATCH_SIZE, + ); + if lagging && next_required_block <= end_block { + return Ok(Some(next_required_block..=end_block)); + } + } + let remote_follow_block = remote_highest_block_number.saturating_sub(cache_follow_distance); if next_required_block <= remote_follow_block { Ok(Some(next_required_block..=remote_follow_block)) - } else if next_required_block > remote_highest_block + 1 { + } else if next_required_block > remote_highest_block_number + 1 { // If this is the case, the node must have gone "backwards" in terms of it's sync // (i.e., it's head block is lower than it was before). // - // We assume that the `reduced_follow_distance` should be sufficient to ensure this never + // We assume that the `cache_follow_distance` should be sufficient to ensure this never // happens, otherwise it is an error. Err(SingleEndpointError::RemoteNotSynced { next_required_block, - remote_highest_block, - reduced_follow_distance, + remote_highest_block: remote_highest_block_number, + cache_follow_distance, }) } else { // Return an empty range. @@ -1292,10 +1356,9 @@ mod tests { let seconds_per_voting_period = ::SlotsPerEth1VotingPeriod::to_u64() * spec.seconds_per_slot; let eth1_blocks_per_voting_period = seconds_per_voting_period / spec.seconds_per_eth1_block; - let reduce_follow_distance_blocks = - config.follow_distance / ETH1_BLOCK_TIME_TOLERANCE_FACTOR; + let cache_follow_distance_blocks = config.follow_distance - config.cache_follow_distance(); - let minimum_len = eth1_blocks_per_voting_period * 2 + reduce_follow_distance_blocks; + let minimum_len = eth1_blocks_per_voting_period * 2 + cache_follow_distance_blocks; assert!(len > minimum_len as usize); } diff --git a/beacon_node/eth1/tests/test.rs b/beacon_node/eth1/tests/test.rs index bb00ebaab..3fe3b3ca5 100644 --- a/beacon_node/eth1/tests/test.rs +++ b/beacon_node/eth1/tests/test.rs @@ -107,7 +107,7 @@ mod eth1_cache { async { let log = null_logger(); - for follow_distance in 0..2 { + for follow_distance in 0..3 { let eth1 = new_ganache_instance() .await .expect("should start eth1 environment"); @@ -116,17 +116,16 @@ mod eth1_cache { let initial_block_number = get_block_number(&web3).await; - let service = Service::new( - Config { - endpoints: vec![SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap()], - deposit_contract_address: deposit_contract.address(), - lowest_cached_block_number: initial_block_number, - follow_distance, - ..Config::default() - }, - log.clone(), - MainnetEthSpec::default_spec(), - ); + let config = Config { + endpoints: vec![SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap()], + deposit_contract_address: deposit_contract.address(), + lowest_cached_block_number: initial_block_number, + follow_distance, + ..Config::default() + }; + let cache_follow_distance = config.cache_follow_distance(); + + let service = Service::new(config, log.clone(), MainnetEthSpec::default_spec()); // Create some blocks and then consume them, performing the test `rounds` times. for round in 0..2 { @@ -139,7 +138,7 @@ mod eth1_cache { .blocks() .read() .highest_block_number() - .map(|n| n + follow_distance) + .map(|n| n + cache_follow_distance) .expect("should have a latest block after the first round") }; @@ -168,12 +167,13 @@ mod eth1_cache { .blocks() .read() .highest_block_number() - .map(|n| n + follow_distance), + .map(|n| n + cache_follow_distance), Some(initial + blocks), - "should update {} blocks in round {} (follow {})", + "should update {} blocks in round {} (follow {} i.e. {})", blocks, round, follow_distance, + cache_follow_distance ); } } diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index a1347c9b0..3102018e3 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -377,6 +377,16 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .default_value("1000") .takes_value(true) ) + .arg( + Arg::with_name("eth1-cache-follow-distance") + .long("eth1-cache-follow-distance") + .value_name("BLOCKS") + .help("Specifies the distance between the Eth1 chain head and the last block which \ + should be imported into the cache. Setting this value lower can help \ + compensate for irregular Proof-of-Work block times, but setting it too low \ + can make the node vulnerable to re-orgs.") + .takes_value(true) + ) .arg( Arg::with_name("slots-per-restore-point") .long("slots-per-restore-point") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index b1560c795..db765100c 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -236,6 +236,12 @@ pub fn get_config( client_config.eth1.purge_cache = true; } + if let Some(follow_distance) = + clap_utils::parse_optional(cli_args, "eth1-cache-follow-distance")? + { + client_config.eth1.cache_follow_distance = Some(follow_distance); + } + if cli_args.is_present("merge") || cli_args.is_present("execution-endpoints") { let mut el_config = execution_layer::Config::default(); diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 5748bbd34..effccbbd6 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -225,6 +225,25 @@ fn eth1_purge_cache_flag() { .run_with_zero_port() .with_config(|config| assert!(config.eth1.purge_cache)); } +#[test] +fn eth1_cache_follow_distance_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| { + assert_eq!(config.eth1.cache_follow_distance, None); + assert_eq!(config.eth1.cache_follow_distance(), 3 * 2048 / 4); + }); +} +#[test] +fn eth1_cache_follow_distance_manual() { + CommandLineTest::new() + .flag("eth1-cache-follow-distance", Some("128")) + .run_with_zero_port() + .with_config(|config| { + assert_eq!(config.eth1.cache_follow_distance, Some(128)); + assert_eq!(config.eth1.cache_follow_distance(), 128); + }); +} // Tests for Bellatrix flags. #[test]