Fix timing issue in obtaining the Fork (#2158)

## Issue Addressed

Related PR: https://github.com/sigp/lighthouse/pull/2137#issuecomment-754712492

The Fork is required for VC to perform signing. Currently, it is not guaranteed that the Fork has been obtained at the point of the signing as the Fork is obtained at after ForkService starts. We will see the [error](851a4dca3c/validator_client/src/validator_store.rs (L127)) if VC could not perform the signing due to the timing issue.

> Unable to get Fork for signing

## Proposed Changes

Obtain the Fork on `init_from_beacon_node` to fix the timing issue.
This commit is contained in:
Akihito Nakano 2021-01-19 02:54:18 +00:00
parent 908c8eadf3
commit a8d040c821
3 changed files with 50 additions and 23 deletions

View File

@ -34,6 +34,11 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkServiceBuilder<T, E> {
} }
} }
pub fn fork(mut self, fork: Fork) -> Self {
self.fork = Some(fork);
self
}
pub fn slot_clock(mut self, slot_clock: T) -> Self { pub fn slot_clock(mut self, slot_clock: T) -> Self {
self.slot_clock = Some(slot_clock); self.slot_clock = Some(slot_clock);
self self
@ -52,7 +57,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkServiceBuilder<T, E> {
pub fn build(self) -> Result<ForkService<T, E>, String> { pub fn build(self) -> Result<ForkService<T, E>, String> {
Ok(ForkService { Ok(ForkService {
inner: Arc::new(Inner { inner: Arc::new(Inner {
fork: RwLock::new(self.fork), fork: RwLock::new(self.fork.ok_or("Cannot build ForkService without fork")?),
slot_clock: self slot_clock: self
.slot_clock .slot_clock
.ok_or("Cannot build ForkService without slot_clock")?, .ok_or("Cannot build ForkService without slot_clock")?,
@ -100,7 +105,7 @@ impl<E: EthSpec> ForkServiceBuilder<slot_clock::TestingSlotClock, E> {
/// Helper to minimise `Arc` usage. /// Helper to minimise `Arc` usage.
pub struct Inner<T, E: EthSpec> { pub struct Inner<T, E: EthSpec> {
fork: RwLock<Option<Fork>>, fork: RwLock<Fork>,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>, beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
log: Logger, log: Logger,
slot_clock: T, slot_clock: T,
@ -129,7 +134,7 @@ impl<T, E: EthSpec> Deref for ForkService<T, E> {
impl<T: SlotClock + 'static, E: EthSpec> ForkService<T, E> { impl<T: SlotClock + 'static, E: EthSpec> ForkService<T, E> {
/// Returns the last fork downloaded from the beacon node, if any. /// Returns the last fork downloaded from the beacon node, if any.
pub fn fork(&self) -> Option<Fork> { pub fn fork(&self) -> Fork {
*self.fork.read() *self.fork.read()
} }
@ -201,8 +206,8 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkService<T, E> {
.await .await
.map_err(|_| ())?; .map_err(|_| ())?;
if self.fork.read().as_ref() != Some(&fork) { if *(self.fork.read()) != fork {
*(self.fork.write()) = Some(fork); *(self.fork.write()) = fork;
} }
debug!(self.log, "Fork update success"); debug!(self.log, "Fork update success");

View File

@ -27,6 +27,7 @@ use block_service::{BlockService, BlockServiceBuilder};
use clap::ArgMatches; use clap::ArgMatches;
use duties_service::{DutiesService, DutiesServiceBuilder}; use duties_service::{DutiesService, DutiesServiceBuilder};
use environment::RuntimeContext; use environment::RuntimeContext;
use eth2::types::StateId;
use eth2::{reqwest::ClientBuilder, BeaconNodeHttpClient, StatusCode, Url}; use eth2::{reqwest::ClientBuilder, BeaconNodeHttpClient, StatusCode, Url};
use fork_service::{ForkService, ForkServiceBuilder}; use fork_service::{ForkService, ForkServiceBuilder};
use futures::channel::mpsc; use futures::channel::mpsc;
@ -43,7 +44,7 @@ use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use types::{EthSpec, Hash256}; use types::{EthSpec, Fork, Hash256};
use validator_store::ValidatorStore; use validator_store::ValidatorStore;
/// The interval between attempts to contact the beacon node during startup. /// The interval between attempts to contact the beacon node during startup.
@ -234,7 +235,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
BeaconNodeFallback::new(candidates, context.eth2_config.spec.clone(), log.clone()); BeaconNodeFallback::new(candidates, context.eth2_config.spec.clone(), log.clone());
// Perform some potentially long-running initialization tasks. // Perform some potentially long-running initialization tasks.
let (genesis_time, genesis_validators_root) = tokio::select! { let (genesis_time, genesis_validators_root, fork) = tokio::select! {
tuple = init_from_beacon_node(&beacon_nodes, &context) => tuple?, tuple = init_from_beacon_node(&beacon_nodes, &context) => tuple?,
() = context.executor.exit() => return Err("Shutting down".to_string()) () = context.executor.exit() => return Err("Shutting down".to_string())
}; };
@ -255,6 +256,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
start_fallback_updater_service(context.clone(), beacon_nodes.clone())?; start_fallback_updater_service(context.clone(), beacon_nodes.clone())?;
let fork_service = ForkServiceBuilder::new() let fork_service = ForkServiceBuilder::new()
.fork(fork)
.slot_clock(slot_clock.clone()) .slot_clock(slot_clock.clone())
.beacon_nodes(beacon_nodes.clone()) .beacon_nodes(beacon_nodes.clone())
.log(log.clone()) .log(log.clone())
@ -394,7 +396,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
async fn init_from_beacon_node<E: EthSpec>( async fn init_from_beacon_node<E: EthSpec>(
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>, beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
context: &RuntimeContext<E>, context: &RuntimeContext<E>,
) -> Result<(u64, Hash256), String> { ) -> Result<(u64, Hash256, Fork), String> {
loop { loop {
beacon_nodes.update_unready_candidates().await; beacon_nodes.update_unready_candidates().await;
let num_available = beacon_nodes.num_available().await; let num_available = beacon_nodes.num_available().await;
@ -453,7 +455,33 @@ async fn init_from_beacon_node<E: EthSpec>(
sleep(RETRY_DELAY).await; sleep(RETRY_DELAY).await;
}; };
Ok((genesis.genesis_time, genesis.genesis_validators_root)) let fork = loop {
match beacon_nodes
.first_success(RequireSynced::No, |node| async move {
node.get_beacon_states_fork(StateId::Head).await
})
.await
{
Ok(Some(fork)) => break fork.data,
Ok(None) => {
info!(
context.log(),
"Failed to get fork, state not found";
);
}
Err(errors) => {
error!(
context.log(),
"Failed to get fork";
"error" => %errors
);
}
}
sleep(RETRY_DELAY).await;
};
Ok((genesis.genesis_time, genesis.genesis_validators_root, fork))
} }
async fn wait_for_genesis<E: EthSpec>( async fn wait_for_genesis<E: EthSpec>(

View File

@ -120,13 +120,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
self.validators.read().num_enabled() self.validators.read().num_enabled()
} }
fn fork(&self) -> Option<Fork> { fn fork(&self) -> Fork {
if self.fork_service.fork().is_none() {
error!(
self.log,
"Unable to get Fork for signing";
);
}
self.fork_service.fork() self.fork_service.fork()
} }
@ -134,16 +128,16 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
self.validators self.validators
.read() .read()
.voting_keypair(validator_pubkey) .voting_keypair(validator_pubkey)
.and_then(|voting_keypair| { .map(|voting_keypair| {
let domain = self.spec.get_domain( let domain = self.spec.get_domain(
epoch, epoch,
Domain::Randao, Domain::Randao,
&self.fork()?, &self.fork(),
self.genesis_validators_root, self.genesis_validators_root,
); );
let message = epoch.signing_root(domain); let message = epoch.signing_root(domain);
Some(voting_keypair.sk.sign(message)) voting_keypair.sk.sign(message)
}) })
} }
@ -165,7 +159,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
} }
// Check for slashing conditions. // Check for slashing conditions.
let fork = self.fork()?; let fork = self.fork();
let domain = self.spec.get_domain( let domain = self.spec.get_domain(
block.epoch(), block.epoch(),
Domain::BeaconProposer, Domain::BeaconProposer,
@ -237,7 +231,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
} }
// Checking for slashing conditions. // Checking for slashing conditions.
let fork = self.fork()?; let fork = self.fork();
let domain = self.spec.get_domain( let domain = self.spec.get_domain(
attestation.data.target.epoch, attestation.data.target.epoch,
@ -339,7 +333,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
aggregate, aggregate,
Some(selection_proof), Some(selection_proof),
&voting_keypair.sk, &voting_keypair.sk,
&self.fork()?, &self.fork(),
self.genesis_validators_root, self.genesis_validators_root,
&self.spec, &self.spec,
)) ))
@ -360,7 +354,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
Some(SelectionProof::new::<E>( Some(SelectionProof::new::<E>(
slot, slot,
&voting_keypair.sk, &voting_keypair.sk,
&self.fork()?, &self.fork(),
self.genesis_validators_root, self.genesis_validators_root,
&self.spec, &self.spec,
)) ))