diff --git a/validator_client/src/fork_service.rs b/validator_client/src/fork_service.rs index 7c3b456d8..65e48a08a 100644 --- a/validator_client/src/fork_service.rs +++ b/validator_client/src/fork_service.rs @@ -34,6 +34,11 @@ impl ForkServiceBuilder { } } + pub fn fork(mut self, fork: Fork) -> Self { + self.fork = Some(fork); + self + } + pub fn slot_clock(mut self, slot_clock: T) -> Self { self.slot_clock = Some(slot_clock); self @@ -52,7 +57,7 @@ impl ForkServiceBuilder { pub fn build(self) -> Result, String> { Ok(ForkService { inner: Arc::new(Inner { - fork: RwLock::new(self.fork), + fork: RwLock::new(self.fork.ok_or("Cannot build ForkService without fork")?), slot_clock: self .slot_clock .ok_or("Cannot build ForkService without slot_clock")?, @@ -100,7 +105,7 @@ impl ForkServiceBuilder { /// Helper to minimise `Arc` usage. pub struct Inner { - fork: RwLock>, + fork: RwLock, beacon_nodes: Arc>, log: Logger, slot_clock: T, @@ -129,7 +134,7 @@ impl Deref for ForkService { impl ForkService { /// Returns the last fork downloaded from the beacon node, if any. - pub fn fork(&self) -> Option { + pub fn fork(&self) -> Fork { *self.fork.read() } @@ -201,8 +206,8 @@ impl ForkService { .await .map_err(|_| ())?; - if self.fork.read().as_ref() != Some(&fork) { - *(self.fork.write()) = Some(fork); + if *(self.fork.read()) != fork { + *(self.fork.write()) = fork; } debug!(self.log, "Fork update success"); diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 65256b5f5..e0c9c3120 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -27,6 +27,7 @@ use block_service::{BlockService, BlockServiceBuilder}; use clap::ArgMatches; use duties_service::{DutiesService, DutiesServiceBuilder}; use environment::RuntimeContext; +use eth2::types::StateId; use eth2::{reqwest::ClientBuilder, BeaconNodeHttpClient, StatusCode, Url}; use fork_service::{ForkService, ForkServiceBuilder}; use futures::channel::mpsc; @@ -43,7 +44,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::time::{sleep, Duration}; -use types::{EthSpec, Hash256}; +use types::{EthSpec, Fork, Hash256}; use validator_store::ValidatorStore; /// The interval between attempts to contact the beacon node during startup. @@ -234,7 +235,7 @@ impl ProductionValidatorClient { BeaconNodeFallback::new(candidates, context.eth2_config.spec.clone(), log.clone()); // Perform some potentially long-running initialization tasks. - let (genesis_time, genesis_validators_root) = tokio::select! { + let (genesis_time, genesis_validators_root, fork) = tokio::select! { tuple = init_from_beacon_node(&beacon_nodes, &context) => tuple?, () = context.executor.exit() => return Err("Shutting down".to_string()) }; @@ -255,6 +256,7 @@ impl ProductionValidatorClient { start_fallback_updater_service(context.clone(), beacon_nodes.clone())?; let fork_service = ForkServiceBuilder::new() + .fork(fork) .slot_clock(slot_clock.clone()) .beacon_nodes(beacon_nodes.clone()) .log(log.clone()) @@ -394,7 +396,7 @@ impl ProductionValidatorClient { async fn init_from_beacon_node( beacon_nodes: &BeaconNodeFallback, context: &RuntimeContext, -) -> Result<(u64, Hash256), String> { +) -> Result<(u64, Hash256, Fork), String> { loop { beacon_nodes.update_unready_candidates().await; let num_available = beacon_nodes.num_available().await; @@ -453,7 +455,33 @@ async fn init_from_beacon_node( sleep(RETRY_DELAY).await; }; - Ok((genesis.genesis_time, genesis.genesis_validators_root)) + let fork = loop { + match beacon_nodes + .first_success(RequireSynced::No, |node| async move { + node.get_beacon_states_fork(StateId::Head).await + }) + .await + { + Ok(Some(fork)) => break fork.data, + Ok(None) => { + info!( + context.log(), + "Failed to get fork, state not found"; + ); + } + Err(errors) => { + error!( + context.log(), + "Failed to get fork"; + "error" => %errors + ); + } + } + + sleep(RETRY_DELAY).await; + }; + + Ok((genesis.genesis_time, genesis.genesis_validators_root, fork)) } async fn wait_for_genesis( diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index cbbc72cc9..63b752e10 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -120,13 +120,7 @@ impl ValidatorStore { self.validators.read().num_enabled() } - fn fork(&self) -> Option { - if self.fork_service.fork().is_none() { - error!( - self.log, - "Unable to get Fork for signing"; - ); - } + fn fork(&self) -> Fork { self.fork_service.fork() } @@ -134,16 +128,16 @@ impl ValidatorStore { self.validators .read() .voting_keypair(validator_pubkey) - .and_then(|voting_keypair| { + .map(|voting_keypair| { let domain = self.spec.get_domain( epoch, Domain::Randao, - &self.fork()?, + &self.fork(), self.genesis_validators_root, ); let message = epoch.signing_root(domain); - Some(voting_keypair.sk.sign(message)) + voting_keypair.sk.sign(message) }) } @@ -165,7 +159,7 @@ impl ValidatorStore { } // Check for slashing conditions. - let fork = self.fork()?; + let fork = self.fork(); let domain = self.spec.get_domain( block.epoch(), Domain::BeaconProposer, @@ -237,7 +231,7 @@ impl ValidatorStore { } // Checking for slashing conditions. - let fork = self.fork()?; + let fork = self.fork(); let domain = self.spec.get_domain( attestation.data.target.epoch, @@ -339,7 +333,7 @@ impl ValidatorStore { aggregate, Some(selection_proof), &voting_keypair.sk, - &self.fork()?, + &self.fork(), self.genesis_validators_root, &self.spec, )) @@ -360,7 +354,7 @@ impl ValidatorStore { Some(SelectionProof::new::( slot, &voting_keypair.sk, - &self.fork()?, + &self.fork(), self.genesis_validators_root, &self.spec, ))