From a8d040c821918fa0b6f94c800a84b0f8050549f2 Mon Sep 17 00:00:00 2001 From: Akihito Nakano Date: Tue, 19 Jan 2021 02:54:18 +0000 Subject: [PATCH] Fix timing issue in obtaining the Fork (#2158) ## Issue Addressed Related PR: https://github.com/sigp/lighthouse/pull/2137#issuecomment-754712492 The Fork is required for the VC to perform signing. Currently, it is not guaranteed that the Fork has been obtained at the point of signing, because the Fork is obtained after ForkService starts. We will see the [error](https://github.com/sigp/lighthouse/blob/851a4dca3c5a514455006589b1a756a8a994258a/validator_client/src/validator_store.rs#L127) if the VC cannot perform the signing due to this timing issue. > Unable to get Fork for signing ## Proposed Changes Obtain the Fork in `init_from_beacon_node` to fix the timing issue. --- validator_client/src/fork_service.rs | 15 +++++++---- validator_client/src/lib.rs | 36 ++++++++++++++++++++++--- validator_client/src/validator_store.rs | 22 ++++++--------- 3 files changed, 50 insertions(+), 23 deletions(-) diff --git a/validator_client/src/fork_service.rs b/validator_client/src/fork_service.rs index 7c3b456d8..65e48a08a 100644 --- a/validator_client/src/fork_service.rs +++ b/validator_client/src/fork_service.rs @@ -34,6 +34,11 @@ impl ForkServiceBuilder { } } + pub fn fork(mut self, fork: Fork) -> Self { + self.fork = Some(fork); + self + } + pub fn slot_clock(mut self, slot_clock: T) -> Self { self.slot_clock = Some(slot_clock); self @@ -52,7 +57,7 @@ impl ForkServiceBuilder { pub fn build(self) -> Result, String> { Ok(ForkService { inner: Arc::new(Inner { - fork: RwLock::new(self.fork), + fork: RwLock::new(self.fork.ok_or("Cannot build ForkService without fork")?), slot_clock: self .slot_clock .ok_or("Cannot build ForkService without slot_clock")?, @@ -100,7 +105,7 @@ impl ForkServiceBuilder { /// Helper to minimise `Arc` usage. 
pub struct Inner { - fork: RwLock>, + fork: RwLock, beacon_nodes: Arc>, log: Logger, slot_clock: T, @@ -129,7 +134,7 @@ impl Deref for ForkService { impl ForkService { /// Returns the last fork downloaded from the beacon node, if any. - pub fn fork(&self) -> Option { + pub fn fork(&self) -> Fork { *self.fork.read() } @@ -201,8 +206,8 @@ impl ForkService { .await .map_err(|_| ())?; - if self.fork.read().as_ref() != Some(&fork) { - *(self.fork.write()) = Some(fork); + if *(self.fork.read()) != fork { + *(self.fork.write()) = fork; } debug!(self.log, "Fork update success"); diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 65256b5f5..e0c9c3120 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -27,6 +27,7 @@ use block_service::{BlockService, BlockServiceBuilder}; use clap::ArgMatches; use duties_service::{DutiesService, DutiesServiceBuilder}; use environment::RuntimeContext; +use eth2::types::StateId; use eth2::{reqwest::ClientBuilder, BeaconNodeHttpClient, StatusCode, Url}; use fork_service::{ForkService, ForkServiceBuilder}; use futures::channel::mpsc; @@ -43,7 +44,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::time::{sleep, Duration}; -use types::{EthSpec, Hash256}; +use types::{EthSpec, Fork, Hash256}; use validator_store::ValidatorStore; /// The interval between attempts to contact the beacon node during startup. @@ -234,7 +235,7 @@ impl ProductionValidatorClient { BeaconNodeFallback::new(candidates, context.eth2_config.spec.clone(), log.clone()); // Perform some potentially long-running initialization tasks. - let (genesis_time, genesis_validators_root) = tokio::select! { + let (genesis_time, genesis_validators_root, fork) = tokio::select! 
{ tuple = init_from_beacon_node(&beacon_nodes, &context) => tuple?, () = context.executor.exit() => return Err("Shutting down".to_string()) }; @@ -255,6 +256,7 @@ impl ProductionValidatorClient { start_fallback_updater_service(context.clone(), beacon_nodes.clone())?; let fork_service = ForkServiceBuilder::new() + .fork(fork) .slot_clock(slot_clock.clone()) .beacon_nodes(beacon_nodes.clone()) .log(log.clone()) @@ -394,7 +396,7 @@ impl ProductionValidatorClient { async fn init_from_beacon_node( beacon_nodes: &BeaconNodeFallback, context: &RuntimeContext, -) -> Result<(u64, Hash256), String> { +) -> Result<(u64, Hash256, Fork), String> { loop { beacon_nodes.update_unready_candidates().await; let num_available = beacon_nodes.num_available().await; @@ -453,7 +455,33 @@ async fn init_from_beacon_node( sleep(RETRY_DELAY).await; }; - Ok((genesis.genesis_time, genesis.genesis_validators_root)) + let fork = loop { + match beacon_nodes + .first_success(RequireSynced::No, |node| async move { + node.get_beacon_states_fork(StateId::Head).await + }) + .await + { + Ok(Some(fork)) => break fork.data, + Ok(None) => { + info!( + context.log(), + "Failed to get fork, state not found"; + ); + } + Err(errors) => { + error!( + context.log(), + "Failed to get fork"; + "error" => %errors + ); + } + } + + sleep(RETRY_DELAY).await; + }; + + Ok((genesis.genesis_time, genesis.genesis_validators_root, fork)) } async fn wait_for_genesis( diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index cbbc72cc9..63b752e10 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -120,13 +120,7 @@ impl ValidatorStore { self.validators.read().num_enabled() } - fn fork(&self) -> Option { - if self.fork_service.fork().is_none() { - error!( - self.log, - "Unable to get Fork for signing"; - ); - } + fn fork(&self) -> Fork { self.fork_service.fork() } @@ -134,16 +128,16 @@ impl ValidatorStore { self.validators .read() 
.voting_keypair(validator_pubkey) - .and_then(|voting_keypair| { + .map(|voting_keypair| { let domain = self.spec.get_domain( epoch, Domain::Randao, - &self.fork()?, + &self.fork(), self.genesis_validators_root, ); let message = epoch.signing_root(domain); - Some(voting_keypair.sk.sign(message)) + voting_keypair.sk.sign(message) }) } @@ -165,7 +159,7 @@ impl ValidatorStore { } // Check for slashing conditions. - let fork = self.fork()?; + let fork = self.fork(); let domain = self.spec.get_domain( block.epoch(), Domain::BeaconProposer, @@ -237,7 +231,7 @@ impl ValidatorStore { } // Checking for slashing conditions. - let fork = self.fork()?; + let fork = self.fork(); let domain = self.spec.get_domain( attestation.data.target.epoch, @@ -339,7 +333,7 @@ impl ValidatorStore { aggregate, Some(selection_proof), &voting_keypair.sk, - &self.fork()?, + &self.fork(), self.genesis_validators_root, &self.spec, )) @@ -360,7 +354,7 @@ impl ValidatorStore { Some(SelectionProof::new::( slot, &voting_keypair.sk, - &self.fork()?, + &self.fork(), self.genesis_validators_root, &self.spec, ))