BN Fallback v2 (#2080)

## Issue Addressed

- Resolves #1883

## Proposed Changes

This follows on from @blacktemplar's work in #2018.

- Allows the VC to connect to multiple BN for redundancy.
  - Update the simulator so some nodes always need to rely on their fallback.
- Adds some extra deprecation warnings for `--eth1-endpoint`
- Pass `SignatureBytes` as a reference instead of by value.

## Additional Info

NA

Co-authored-by: blacktemplar <blacktemplar@a1.net>
This commit is contained in:
Paul Hauner 2020-12-18 09:17:03 +00:00
parent f998eff7ce
commit a62dc65ca4
23 changed files with 882 additions and 281 deletions

1
Cargo.lock generated
View File

@ -7141,6 +7141,7 @@ dependencies = [
"eth2_ssz",
"eth2_ssz_derive",
"exit-future",
"fallback",
"futures 0.3.8",
"hex",
"hyper 0.13.9",

View File

@ -1643,7 +1643,7 @@ impl ApiTester {
let block = self
.client
.get_validator_blocks::<E>(slot, randao_reveal, None)
.get_validator_blocks::<E>(slot, &randao_reveal, None)
.await
.unwrap()
.data;

View File

@ -170,6 +170,11 @@ pub fn get_config<E: EthSpec>(
// Defines the URL to reach the eth1 node.
if let Some(val) = cli_args.value_of("eth1-endpoint") {
warn!(
log,
"The --eth1-endpoint flag is deprecated";
"msg" => "please use --eth1-endpoints instead"
);
client_config.sync_eth1_chain = true;
client_config.eth1.endpoints = vec![val.to_string()];
} else if let Some(val) = cli_args.value_of("eth1-endpoints") {

View File

@ -83,6 +83,18 @@ pub struct BeaconNodeHttpClient {
server: Url,
}
impl fmt::Display for BeaconNodeHttpClient {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.server.fmt(f)
}
}
impl AsRef<str> for BeaconNodeHttpClient {
fn as_ref(&self) -> &str {
self.server.as_str()
}
}
impl BeaconNodeHttpClient {
pub fn new(server: Url) -> Self {
Self {
@ -848,7 +860,7 @@ impl BeaconNodeHttpClient {
pub async fn get_validator_blocks<T: EthSpec>(
&self,
slot: Slot,
randao_reveal: SignatureBytes,
randao_reveal: &SignatureBytes,
graffiti: Option<&Graffiti>,
) -> Result<GenericResponse<BeaconBlock<T>>, Error> {
let mut path = self.eth_path()?;

View File

@ -15,7 +15,10 @@ pub const ETH1_GENESIS_UPDATE_INTERVAL: Duration = Duration::from_millis(7_000);
pub fn run<T: EthSpec>(mut env: Environment<T>, matches: &ArgMatches<'_>) -> Result<(), String> {
let endpoints = matches
.value_of("eth1-endpoint")
.map(|e| vec![String::from(e)])
.map(|e| {
warn!("The --eth1-endpoint flag is deprecated. Please use --eth1-endpoints instead");
vec![String::from(e)]
})
.or_else(|| {
matches
.value_of("eth1-endpoints")

View File

@ -1,3 +1,4 @@
use crate::local_network::INVALID_ADDRESS;
use crate::{checks, LocalNetwork, E};
use clap::ArgMatches;
use eth1::http::Eth1Id;
@ -128,8 +129,12 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
/*
* One by one, add beacon nodes to the network.
*/
for _ in 0..node_count - 1 {
network.add_beacon_node(beacon_config.clone()).await?;
for i in 0..node_count - 1 {
let mut config = beacon_config.clone();
if i % 2 == 0 {
config.eth1.endpoints.insert(0, INVALID_ADDRESS.to_string());
}
network.add_beacon_node(config).await?;
}
/*
@ -137,7 +142,7 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
*/
for (i, files) in validator_files.into_iter().enumerate() {
network
.add_validator_client(testing_validator_config(), i, files)
.add_validator_client(testing_validator_config(), i, files, i % 2 == 0)
.await?;
}

View File

@ -9,6 +9,7 @@ use std::sync::Arc;
use types::{Epoch, EthSpec};
const BOOTNODE_PORT: u16 = 42424;
pub const INVALID_ADDRESS: &str = "http://127.0.0.1:42423";
/// Helper struct to reduce `Arc` usage.
pub struct Inner<E: EthSpec> {
@ -118,6 +119,7 @@ impl<E: EthSpec> LocalNetwork<E> {
mut validator_config: ValidatorConfig,
beacon_node: usize,
validator_files: ValidatorFiles,
invalid_first_beacon_node: bool, //to test beacon node fallbacks
) -> Result<(), String> {
let index = self.validator_clients.read().len();
let context = self.context.service_context(format!("validator_{}", index));
@ -133,8 +135,12 @@ impl<E: EthSpec> LocalNetwork<E> {
.expect("Must have http started")
};
validator_config.beacon_node =
format!("http://{}:{}", socket_addr.ip(), socket_addr.port());
let beacon_node = format!("http://{}:{}", socket_addr.ip(), socket_addr.port());
validator_config.beacon_nodes = if invalid_first_beacon_node {
vec![INVALID_ADDRESS.to_string(), beacon_node]
} else {
vec![beacon_node]
};
let validator_client = LocalValidatorClient::production_with_insecure_keypairs(
context,
validator_config,

View File

@ -103,7 +103,7 @@ pub fn run_no_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
let add_validators_fut = async {
for (i, files) in validator_files.into_iter().enumerate() {
network
.add_validator_client(testing_validator_config(), i, files)
.add_validator_client(testing_validator_config(), i, files, i % 2 == 0)
.await?;
}

View File

@ -94,7 +94,7 @@ fn syncing_sim(
* Add a validator client which handles all validators from the genesis state.
*/
network
.add_validator_client(testing_validator_config(), 0, validator_files)
.add_validator_client(testing_validator_config(), 0, validator_files, true)
.await?;
// Check all syncing strategies one after other.

View File

@ -63,3 +63,4 @@ rand = "0.7.3"
scrypt = { version = "0.5.0", default-features = false }
lighthouse_metrics = { path = "../common/lighthouse_metrics" }
lazy_static = "1.4.0"
fallback = { path = "../common/fallback" }

View File

@ -1,10 +1,10 @@
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::{
duties_service::{DutiesService, DutyAndProof},
http_metrics::metrics,
validator_store::ValidatorStore,
};
use environment::RuntimeContext;
use eth2::BeaconNodeHttpClient;
use futures::future::FutureExt;
use futures::StreamExt;
use slog::{crit, error, info, trace};
@ -24,7 +24,7 @@ pub struct AttestationServiceBuilder<T, E: EthSpec> {
duties_service: Option<DutiesService<T, E>>,
validator_store: Option<ValidatorStore<T, E>>,
slot_clock: Option<T>,
beacon_node: Option<BeaconNodeHttpClient>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
context: Option<RuntimeContext<E>>,
}
@ -34,7 +34,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
duties_service: None,
validator_store: None,
slot_clock: None,
beacon_node: None,
beacon_nodes: None,
context: None,
}
}
@ -54,8 +54,8 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
self
}
pub fn beacon_node(mut self, beacon_node: BeaconNodeHttpClient) -> Self {
self.beacon_node = Some(beacon_node);
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T, E>>) -> Self {
self.beacon_nodes = Some(beacon_nodes);
self
}
@ -76,9 +76,9 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
slot_clock: self
.slot_clock
.ok_or("Cannot build AttestationService without slot_clock")?,
beacon_node: self
.beacon_node
.ok_or("Cannot build AttestationService without beacon_node")?,
beacon_nodes: self
.beacon_nodes
.ok_or("Cannot build AttestationService without beacon_nodes")?,
context: self
.context
.ok_or("Cannot build AttestationService without runtime_context")?,
@ -92,7 +92,7 @@ pub struct Inner<T, E: EthSpec> {
duties_service: DutiesService<T, E>,
validator_store: ValidatorStore<T, E>,
slot_clock: T,
beacon_node: BeaconNodeHttpClient,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
context: RuntimeContext<E>,
}
@ -337,11 +337,16 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.epoch(E::slots_per_epoch());
let attestation_data = self
.beacon_node
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
beacon_node
.get_validator_attestation_data(slot, committee_index)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))?
.data;
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.await
.map_err(|e| e.to_string())?;
let mut attestations = Vec::with_capacity(validator_duties.len());
@ -408,9 +413,14 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
}
}
let attestations_slice = attestations.as_slice();
match self
.beacon_node
.post_beacon_pool_attestations(attestations.as_slice())
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
beacon_node
.post_beacon_pool_attestations(attestations_slice)
.await
})
.await
{
Ok(()) => info!(
@ -425,7 +435,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
Err(e) => error!(
log,
"Unable to publish attestations";
"error" => ?e,
"error" => %e,
"committee_index" => attestation_data.index,
"slot" => slot.as_u64(),
"type" => "unaggregated",
@ -455,16 +465,22 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
) -> Result<(), String> {
let log = self.context.log();
let attestation_data_ref = &attestation_data;
let aggregated_attestation = self
.beacon_node
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
beacon_node
.get_validator_aggregate_attestation(
attestation_data.slot,
attestation_data.tree_hash_root(),
attestation_data_ref.slot,
attestation_data_ref.tree_hash_root(),
)
.await
.map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))?
.ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))?
.data;
.ok_or_else(|| format!("No aggregate available for {:?}", attestation_data_ref))
.map(|result| result.data)
})
.await
.map_err(|e| e.to_string())?;
let mut signed_aggregate_and_proofs = Vec::new();
@ -507,9 +523,14 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
}
if !signed_aggregate_and_proofs.is_empty() {
let signed_aggregate_and_proofs_slice = signed_aggregate_and_proofs.as_slice();
match self
.beacon_node
.post_validator_aggregate_and_proof(signed_aggregate_and_proofs.as_slice())
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
beacon_node
.post_validator_aggregate_and_proof(signed_aggregate_and_proofs_slice)
.await
})
.await
{
Ok(()) => {
@ -533,7 +554,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
crit!(
log,
"Failed to publish attestation";
"error" => e.to_string(),
"error" => %e,
"committee_index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
"type" => "aggregated",

View File

@ -0,0 +1,463 @@
//! Allows for a list of `BeaconNodeHttpClient` to appear as a single entity which will exhibits
//! "fallback" behaviour; it will try a request on all of the nodes until one or none of them
//! succeed.
use crate::check_synced::check_synced;
use crate::http_metrics::metrics::{inc_counter_vec, ENDPOINT_ERRORS, ENDPOINT_REQUESTS};
use environment::RuntimeContext;
use eth2::BeaconNodeHttpClient;
use futures::future;
use slog::{error, info, warn, Logger};
use slot_clock::SlotClock;
use std::fmt;
use std::fmt::Debug;
use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use tokio::{sync::RwLock, time::sleep};
use types::{ChainSpec, EthSpec};
/// The number of seconds *prior* to slot start that we will try and update the state of fallback
/// nodes.
///
/// Ideally this should be somewhere between 2/3rds through the slot and the end of it. If we set it
/// too early, we risk switching nodes between the time of publishing an attestation and publishing
/// an aggregate; this may result in a missed aggregation. If we set this time too late, we risk not
/// having the correct nodes up and running prior to the start of the slot.
const SLOT_LOOKAHEAD: Duration = Duration::from_secs(1);
/// Starts a service that will routinely try and update the status of the provided `beacon_nodes`.
///
/// See `SLOT_LOOKAHEAD` for information about when this should run.
pub fn start_fallback_updater_service<T: SlotClock + 'static, E: EthSpec>(
context: RuntimeContext<E>,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
) -> Result<(), &'static str> {
let executor = context.executor;
if beacon_nodes.slot_clock.is_none() {
return Err("Cannot start fallback updater without slot clock");
}
let future = async move {
loop {
beacon_nodes.update_unready_candidates().await;
let sleep_time = beacon_nodes
.slot_clock
.as_ref()
.and_then(|slot_clock| {
let slot = slot_clock.now()?;
let till_next_slot = slot_clock.duration_to_slot(slot + 1)?;
till_next_slot.checked_sub(SLOT_LOOKAHEAD)
})
.unwrap_or_else(|| Duration::from_secs(1));
sleep(sleep_time).await
}
};
executor.spawn(future, "fallback");
Ok(())
}
/// Indicates if a beacon node must be synced before some action is performed on it.
#[derive(PartialEq, Clone, Copy)]
pub enum RequireSynced {
Yes,
No,
}
impl PartialEq<bool> for RequireSynced {
fn eq(&self, other: &bool) -> bool {
if *other {
*self == RequireSynced::Yes
} else {
*self == RequireSynced::No
}
}
}
#[derive(Debug)]
pub enum Error<E> {
/// The node was unavailable and we didn't attempt to contact it.
Unavailable(CandidateError),
/// We attempted to contact the node but it failed.
RequestFailed(E),
}
impl<E> Error<E> {
pub fn request_failure(&self) -> Option<&E> {
match self {
Error::RequestFailed(e) => Some(e),
_ => None,
}
}
}
/// The list of errors encountered whilst attempting to perform a query.
pub struct AllErrored<E>(pub Vec<(String, Error<E>)>);
impl<E: Debug> fmt::Display for AllErrored<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "All endpoints failed")?;
for (i, (id, error)) in self.0.iter().enumerate() {
let comma = if i + 1 < self.0.len() { "," } else { "" };
write!(f, " {} => {:?}{}", id, error, comma)?;
}
Ok(())
}
}
/// Reasons why a candidate might not be ready.
#[derive(Debug, Clone, Copy)]
pub enum CandidateError {
Uninitialized,
Offline,
Incompatible,
NotSynced,
}
/// Represents a `BeaconNodeHttpClient` inside a `BeaconNodeFallback` that may or may not be used
/// for a query.
pub struct CandidateBeaconNode<E> {
beacon_node: BeaconNodeHttpClient,
status: RwLock<Result<(), CandidateError>>,
_phantom: PhantomData<E>,
}
impl<E: EthSpec> CandidateBeaconNode<E> {
/// Instantiate a new node.
pub fn new(beacon_node: BeaconNodeHttpClient) -> Self {
Self {
beacon_node,
status: RwLock::new(Err(CandidateError::Uninitialized)),
_phantom: PhantomData,
}
}
/// Returns the status of `self`.
///
/// If `RequiredSynced::No`, any `NotSynced` node will be ignored and mapped to `Ok(())`.
pub async fn status(&self, synced: RequireSynced) -> Result<(), CandidateError> {
match *self.status.read().await {
Err(CandidateError::NotSynced) if synced == false => Ok(()),
other => other,
}
}
/// Indicate that `self` is offline.
pub async fn set_offline(&self) {
*self.status.write().await = Err(CandidateError::Offline)
}
/// Perform some queries against the node to determine if it is a good candidate, updating
/// `self.status` and returning that result.
pub async fn refresh_status<T: SlotClock>(
&self,
slot_clock: Option<&T>,
spec: &ChainSpec,
log: &Logger,
) -> Result<(), CandidateError> {
let mut status = self.status.write().await;
if let Err(e) = self.is_online(log).await {
*status = Err(e);
} else if let Err(e) = self.is_compatible(spec, log).await {
*status = Err(e);
} else if let Err(e) = self.is_synced(slot_clock, log).await {
*status = Err(e);
} else {
*status = Ok(())
}
*status
}
/// Checks if the node is reachable.
async fn is_online(&self, log: &Logger) -> Result<(), CandidateError> {
let result = self
.beacon_node
.get_node_version()
.await
.map(|body| body.data.version);
match result {
Ok(version) => {
info!(
log,
"Connected to beacon node";
"version" => version,
"endpoint" => %self.beacon_node,
);
Ok(())
}
Err(e) => {
warn!(
log,
"Offline beacon node";
"error" => %e,
"endpoint" => %self.beacon_node,
);
Err(CandidateError::Offline)
}
}
}
/// Checks if the node has the correct specification.
async fn is_compatible(&self, spec: &ChainSpec, log: &Logger) -> Result<(), CandidateError> {
let yaml_config = self
.beacon_node
.get_config_spec()
.await
.map_err(|e| {
error!(
log,
"Unable to read spec from beacon node";
"error" => %e,
"endpoint" => %self.beacon_node,
);
CandidateError::Offline
})?
.data;
let beacon_node_spec = yaml_config
.apply_to_chain_spec::<E>(&E::default_spec())
.ok_or_else(|| {
error!(
log,
"The minimal/mainnet spec type of the beacon node does not match the validator \
client. See the --network command.";
"endpoint" => %self.beacon_node,
);
CandidateError::Incompatible
})?;
if *spec == beacon_node_spec {
Ok(())
} else {
error!(
log,
"The beacon node is using a different Eth2 specification to this validator client. \
See the --network command.";
"endpoint" => %self.beacon_node,
);
Err(CandidateError::Incompatible)
}
}
/// Checks if the beacon node is synced.
async fn is_synced<T: SlotClock>(
&self,
slot_clock: Option<&T>,
log: &Logger,
) -> Result<(), CandidateError> {
if let Some(slot_clock) = slot_clock {
match check_synced(&self.beacon_node, slot_clock, Some(log)).await {
r @ Err(CandidateError::NotSynced) => {
warn!(
log,
"Beacon node is not synced";
"endpoint" => %self.beacon_node,
);
r
}
result => result,
}
} else {
// Skip this check if we don't supply a slot clock.
Ok(())
}
}
}
/// A collection of `CandidateBeaconNode` that can be used to perform requests with "fallback"
/// behaviour, where the failure of one candidate results in the next candidate receiving an
/// identical query.
pub struct BeaconNodeFallback<T, E> {
candidates: Vec<CandidateBeaconNode<E>>,
slot_clock: Option<T>,
spec: ChainSpec,
log: Logger,
}
impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
pub fn new(candidates: Vec<CandidateBeaconNode<E>>, spec: ChainSpec, log: Logger) -> Self {
Self {
candidates,
slot_clock: None,
spec,
log,
}
}
/// Used to update the slot clock post-instantiation.
///
/// This is the result of a chicken-and-egg issue where `Self` needs a slot clock for some
/// operations, but `Self` is required to obtain the slot clock since we need the genesis time
/// from a beacon node.
pub fn set_slot_clock(&mut self, slot_clock: T) {
self.slot_clock = Some(slot_clock);
}
/// The count of candidates, regardless of their state.
pub async fn num_total(&self) -> usize {
self.candidates.len()
}
/// The count of synced and ready candidates.
pub async fn num_synced(&self) -> usize {
let mut n = 0;
for candidate in &self.candidates {
if candidate.status(RequireSynced::Yes).await.is_ok() {
n += 1
}
}
n
}
/// The count of candidates that are online and compatible, but not necessarily synced.
pub async fn num_available(&self) -> usize {
let mut n = 0;
for candidate in &self.candidates {
if candidate.status(RequireSynced::No).await.is_ok() {
n += 1
}
}
n
}
/// Loop through any `self.candidates` that we don't think are online, compatible or synced and
/// poll them to see if their status has changed.
///
/// We do not poll nodes that are synced to avoid sending additional requests when everything is
/// going smoothly.
pub async fn update_unready_candidates(&self) {
let mut futures = Vec::new();
for candidate in &self.candidates {
// There is a potential race condition between having the read lock and the write
// lock. The worst case of this race is running `try_become_ready` twice, which is
// acceptable.
//
// Note: `RequireSynced` is always set to false here. This forces us to recheck the sync
// status of nodes that were previously not-synced.
if candidate.status(RequireSynced::Yes).await.is_err() {
// There exists a race-condition that could result in `refresh_status` being called
// when the status does not require refreshing anymore. This deemed is an
// acceptable inefficiency.
futures.push(candidate.refresh_status(
self.slot_clock.as_ref(),
&self.spec,
&self.log,
));
}
}
//run all updates concurrently and ignore results
let _ = future::join_all(futures).await;
}
/// Run `func` against each candidate in `self`, returning immediately if a result is found.
/// Otherwise, return all the errors encountered along the way.
///
/// First this function will try all nodes with a suitable status. If no candidates are suitable
/// or all the requests fail, it will try updating the status of all unsuitable nodes and
/// re-running `func` again.
pub async fn first_success<'a, F, O, Err, R>(
&'a self,
require_synced: RequireSynced,
func: F,
) -> Result<O, AllErrored<Err>>
where
F: Fn(&'a BeaconNodeHttpClient) -> R,
R: Future<Output = Result<O, Err>>,
{
let mut errors = vec![];
let mut to_retry = vec![];
let mut retry_unsynced = vec![];
// Run `func` using a `candidate`, returning the value or capturing errors.
//
// We use a macro instead of a closure here since it is not trivial to move `func` into a
// closure.
macro_rules! try_func {
($candidate: ident) => {{
inc_counter_vec(&ENDPOINT_REQUESTS, &[$candidate.beacon_node.as_ref()]);
// There exists a race condition where `func` may be called when the candidate is
// actually not ready. We deem this an acceptable inefficiency.
match func(&$candidate.beacon_node).await {
Ok(val) => return Ok(val),
Err(e) => {
// If we have an error on this function, make the client as not-ready.
//
// There exists a race condition where the candidate may have been marked
// as ready between the `func` call and now. We deem this an acceptable
// inefficiency.
$candidate.set_offline().await;
errors.push(($candidate.beacon_node.to_string(), Error::RequestFailed(e)));
inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]);
}
}
}};
}
// First pass: try `func` on all synced and ready candidates.
//
// This ensures that we always choose a synced node if it is available.
for candidate in &self.candidates {
match candidate.status(RequireSynced::Yes).await {
Err(e @ CandidateError::NotSynced) if require_synced == false => {
// This client is unsynced we will try it after trying all synced clients
retry_unsynced.push(candidate);
errors.push((candidate.beacon_node.to_string(), Error::Unavailable(e)));
}
Err(e) => {
// This client was not ready on the first pass, we might try it again later.
to_retry.push(candidate);
errors.push((candidate.beacon_node.to_string(), Error::Unavailable(e)));
}
_ => try_func!(candidate),
}
}
// Second pass: try `func` on ready unsynced candidates. This only runs if we permit
// unsynced candidates.
//
// Due to async race-conditions, it is possible that we will send a request to a candidate
// that has been set to an offline/unready status. This is acceptable.
if require_synced == false {
for candidate in retry_unsynced {
try_func!(candidate);
}
}
// Third pass: try again, attempting to make non-ready clients become ready.
for candidate in to_retry {
// If the candidate hasn't luckily transferred into the correct state in the meantime,
// force an update of the state.
let new_status = match candidate.status(require_synced).await {
Ok(()) => Ok(()),
Err(_) => {
candidate
.refresh_status(self.slot_clock.as_ref(), &self.spec, &self.log)
.await
}
};
match new_status {
Ok(()) => try_func!(candidate),
Err(CandidateError::NotSynced) if require_synced == false => try_func!(candidate),
Err(e) => {
errors.push((candidate.beacon_node.to_string(), Error::Unavailable(e)));
}
}
}
// There were no candidates already ready and we were unable to make any of them ready.
Err(AllErrored(errors))
}
}

View File

@ -1,6 +1,7 @@
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::{http_metrics::metrics, validator_store::ValidatorStore};
use environment::RuntimeContext;
use eth2::{types::Graffiti, BeaconNodeHttpClient};
use eth2::types::Graffiti;
use futures::channel::mpsc::Receiver;
use futures::{StreamExt, TryFutureExt};
use slog::{crit, debug, error, info, trace, warn};
@ -13,7 +14,7 @@ use types::{EthSpec, PublicKey, Slot};
pub struct BlockServiceBuilder<T, E: EthSpec> {
validator_store: Option<ValidatorStore<T, E>>,
slot_clock: Option<Arc<T>>,
beacon_node: Option<BeaconNodeHttpClient>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
context: Option<RuntimeContext<E>>,
graffiti: Option<Graffiti>,
}
@ -23,7 +24,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
Self {
validator_store: None,
slot_clock: None,
beacon_node: None,
beacon_nodes: None,
context: None,
graffiti: None,
}
@ -39,8 +40,8 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
self
}
pub fn beacon_node(mut self, beacon_node: BeaconNodeHttpClient) -> Self {
self.beacon_node = Some(beacon_node);
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T, E>>) -> Self {
self.beacon_nodes = Some(beacon_nodes);
self
}
@ -63,8 +64,8 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
slot_clock: self
.slot_clock
.ok_or("Cannot build BlockService without slot_clock")?,
beacon_node: self
.beacon_node
beacon_nodes: self
.beacon_nodes
.ok_or("Cannot build BlockService without beacon_node")?,
context: self
.context
@ -79,7 +80,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
pub struct Inner<T, E: EthSpec> {
validator_store: ValidatorStore<T, E>,
slot_clock: Arc<T>,
beacon_node: BeaconNodeHttpClient,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
context: RuntimeContext<E>,
graffiti: Option<Graffiti>,
}
@ -222,24 +223,37 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
let randao_reveal = self
.validator_store
.randao_reveal(&validator_pubkey, slot.epoch(E::slots_per_epoch()))
.ok_or("Unable to produce randao reveal")?;
.ok_or("Unable to produce randao reveal")?
.into();
let block = self
.beacon_node
.get_validator_blocks(slot, randao_reveal.into(), self.graffiti.as_ref())
let randao_reveal_ref = &randao_reveal;
let self_ref = &self;
let validator_pubkey_ref = &validator_pubkey;
let signed_block = self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let block = beacon_node
.get_validator_blocks(slot, randao_reveal_ref, self_ref.graffiti.as_ref())
.await
.map_err(|e| format!("Error from beacon node when producing block: {:?}", e))?
.data;
let signed_block = self
let signed_block = self_ref
.validator_store
.sign_block(&validator_pubkey, block, current_slot)
.sign_block(validator_pubkey_ref, block, current_slot)
.ok_or("Unable to sign block")?;
self.beacon_node
beacon_node
.post_beacon_blocks(&signed_block)
.await
.map_err(|e| format!("Error from beacon node when publishing block: {:?}", e))?;
.map_err(|e| {
format!("Error from beacon node when publishing block: {:?}", e)
})?;
Ok::<_, String>(signed_block)
})
.await
.map_err(|e| e.to_string())?;
info!(
log,

View File

@ -1,3 +1,4 @@
use crate::beacon_node_fallback::CandidateError;
use eth2::BeaconNodeHttpClient;
use slog::{debug, error, warn, Logger};
use slot_clock::SlotClock;
@ -5,33 +6,33 @@ use slot_clock::SlotClock;
/// A distance in slots.
const SYNC_TOLERANCE: u64 = 4;
/// Returns `true` if the beacon node is synced and ready for action.
/// Returns
///
/// Returns `false` if:
///
/// - The beacon node is unreachable.
/// - The beacon node indicates that it is syncing **AND** it is more than `SYNC_TOLERANCE` behind
/// the highest known slot.
/// `Ok(())` if the beacon node is synced and ready for action,
/// `Err(CandidateError::Offline)` if the beacon node is unreachable,
/// `Err(CandidateError::NotSynced)` if the beacon node indicates that it is syncing **AND**
/// it is more than `SYNC_TOLERANCE` behind the highest
/// known slot.
///
/// The second condition means the even if the beacon node thinks that it's syncing, we'll still
/// try to use it if it's close enough to the head.
pub async fn is_synced<T: SlotClock>(
pub async fn check_synced<T: SlotClock>(
beacon_node: &BeaconNodeHttpClient,
slot_clock: &T,
log_opt: Option<&Logger>,
) -> bool {
) -> Result<(), CandidateError> {
let resp = match beacon_node.get_node_syncing().await {
Ok(resp) => resp,
Err(e) => {
if let Some(log) = log_opt {
error!(
warn!(
log,
"Unable connect to beacon node";
"error" => e.to_string()
"error" => %e
)
}
return false;
return Err(CandidateError::Offline);
}
};
@ -48,9 +49,9 @@ pub async fn is_synced<T: SlotClock>(
warn!(
log,
"Beacon node is syncing";
"msg" => "not receiving new duties",
"sync_distance" => resp.data.sync_distance.as_u64(),
"head_slot" => resp.data.head_slot.as_u64(),
"endpoint" => %beacon_node,
);
}
@ -63,10 +64,15 @@ pub async fn is_synced<T: SlotClock>(
"msg" => "check the system time on this host and the beacon node",
"beacon_node_slot" => remote_slot,
"local_slot" => local_slot,
"endpoint" => %beacon_node,
);
}
}
}
is_synced
if is_synced {
Ok(())
} else {
Err(CandidateError::NotSynced)
}
}

View File

@ -9,22 +9,31 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
"When connected to a beacon node, performs the duties of a staked \
validator (e.g., proposing blocks and attestations).",
)
// This argument is deprecated, use `--beacon-nodes` instead.
.arg(
Arg::with_name("beacon-node")
.long("beacon-node")
.value_name("NETWORK_ADDRESS")
.help("Address to a beacon node HTTP API")
.help("Deprecated. Use --beacon-nodes.")
.takes_value(true)
.conflicts_with("beacon-nodes"),
)
.arg(
Arg::with_name("beacon-nodes")
.long("beacon-nodes")
.value_name("NETWORK_ADDRESSES")
.help("Comma-separated addresses to one or more beacon node HTTP APIs")
.default_value(&DEFAULT_BEACON_NODE)
.takes_value(true),
)
// This argument is deprecated, use `--beacon-node` instead.
// This argument is deprecated, use `--beacon-nodes` instead.
.arg(
Arg::with_name("server")
.long("server")
.value_name("NETWORK_ADDRESS")
.help("Deprecated. Use --beacon-node.")
.help("Deprecated. Use --beacon-nodes.")
.takes_value(true)
.conflicts_with("beacon-node"),
.conflicts_with_all(&["beacon-node", "beacon-nodes"]),
)
.arg(
Arg::with_name("validators-dir")

View File

@ -22,10 +22,10 @@ pub struct Config {
pub validator_dir: PathBuf,
/// The directory containing the passwords to unlock validator keystores.
pub secrets_dir: PathBuf,
/// The http endpoint of the beacon node API.
/// The http endpoints of the beacon node APIs.
///
/// Should be similar to `http://localhost:8080`
pub beacon_node: String,
/// Should be similar to `["http://localhost:8080"]`
pub beacon_nodes: Vec<String>,
/// If true, the validator client will still poll for duties and produce blocks even if the
/// beacon node is not synced at startup.
pub allow_unsynced_beacon_node: bool,
@ -55,7 +55,7 @@ impl Default for Config {
Self {
validator_dir,
secrets_dir,
beacon_node: DEFAULT_BEACON_NODE.to_string(),
beacon_nodes: vec![DEFAULT_BEACON_NODE.to_string()],
allow_unsynced_beacon_node: false,
disable_auto_discover: false,
init_slashing_protection: false,
@ -106,18 +106,26 @@ impl Config {
.map_err(|e| format!("Failed to create {:?}: {:?}", config.validator_dir, e))?;
}
if let Some(beacon_node) = parse_optional(cli_args, "beacon-node")? {
config.beacon_node = beacon_node;
if let Some(beacon_nodes) = parse_optional::<String>(cli_args, "beacon-nodes")? {
config.beacon_nodes = beacon_nodes.as_str().split(',').map(String::from).collect()
}
// To be deprecated.
if let Some(server) = parse_optional(cli_args, "server")? {
else if let Some(beacon_node) = parse_optional(cli_args, "beacon-node")? {
warn!(
log,
"The --beacon-node flag is deprecated";
"msg" => "please use --beacon-nodes instead"
);
config.beacon_nodes = vec![beacon_node];
}
// To be deprecated.
else if let Some(server) = parse_optional(cli_args, "server")? {
warn!(
log,
"The --server flag is deprecated";
"msg" => "please use --beacon-node instead"
"msg" => "please use --beacon-nodes instead"
);
config.beacon_node = server;
config.beacon_nodes = vec![server];
}
if cli_args.is_present("delete-lockfiles") {

View File

@ -1,9 +1,9 @@
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::{
block_service::BlockServiceNotification, http_metrics::metrics, is_synced::is_synced,
validator_duty::ValidatorDuty, validator_store::ValidatorStore,
block_service::BlockServiceNotification, http_metrics::metrics, validator_duty::ValidatorDuty,
validator_store::ValidatorStore,
};
use environment::RuntimeContext;
use eth2::BeaconNodeHttpClient;
use futures::channel::mpsc::Sender;
use futures::{SinkExt, StreamExt};
use parking_lot::RwLock;
@ -324,12 +324,12 @@ impl DutiesStore {
}
}
pub struct DutiesServiceBuilder<T, E: EthSpec> {
pub struct DutiesServiceBuilder<T: SlotClock + 'static, E: EthSpec> {
validator_store: Option<ValidatorStore<T, E>>,
slot_clock: Option<T>,
beacon_node: Option<BeaconNodeHttpClient>,
context: Option<RuntimeContext<E>>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
allow_unsynced_beacon_node: bool,
context: Option<RuntimeContext<E>>,
}
impl<T: SlotClock + 'static, E: EthSpec> DutiesServiceBuilder<T, E> {
@ -337,9 +337,9 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesServiceBuilder<T, E> {
Self {
validator_store: None,
slot_clock: None,
beacon_node: None,
context: None,
beacon_nodes: None,
allow_unsynced_beacon_node: false,
context: None,
}
}
@ -353,8 +353,13 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesServiceBuilder<T, E> {
self
}
pub fn beacon_node(mut self, beacon_node: BeaconNodeHttpClient) -> Self {
self.beacon_node = Some(beacon_node);
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T, E>>) -> Self {
self.beacon_nodes = Some(beacon_nodes);
self
}
pub fn allow_unsynced_beacon_node(mut self, allow_unsynced_beacon_node: bool) -> Self {
self.allow_unsynced_beacon_node = allow_unsynced_beacon_node;
self
}
@ -363,12 +368,6 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesServiceBuilder<T, E> {
self
}
/// Set to `true` to allow polling for duties when the beacon node is not synced.
pub fn allow_unsynced_beacon_node(mut self, allow_unsynced_beacon_node: bool) -> Self {
self.allow_unsynced_beacon_node = allow_unsynced_beacon_node;
self
}
pub fn build(self) -> Result<DutiesService<T, E>, String> {
Ok(DutiesService {
inner: Arc::new(Inner {
@ -379,13 +378,13 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesServiceBuilder<T, E> {
slot_clock: self
.slot_clock
.ok_or("Cannot build DutiesService without slot_clock")?,
beacon_node: self
.beacon_node
beacon_nodes: self
.beacon_nodes
.ok_or("Cannot build DutiesService without beacon_node")?,
allow_unsynced_beacon_node: self.allow_unsynced_beacon_node,
context: self
.context
.ok_or("Cannot build DutiesService without runtime_context")?,
allow_unsynced_beacon_node: self.allow_unsynced_beacon_node,
}),
})
}
@ -396,11 +395,9 @@ pub struct Inner<T, E: EthSpec> {
store: Arc<DutiesStore>,
validator_store: ValidatorStore<T, E>,
pub(crate) slot_clock: T,
pub(crate) beacon_node: BeaconNodeHttpClient,
context: RuntimeContext<E>,
/// If true, the duties service will poll for duties from the beacon node even if it is not
/// synced.
pub(crate) beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
allow_unsynced_beacon_node: bool,
context: RuntimeContext<E>,
}
/// Maintains a store of the duties for all voting validators in the `validator_store`.
@ -513,12 +510,6 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
let _timer =
metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::FULL_UPDATE]);
if !is_synced(&self.beacon_node, &self.slot_clock, None).await
&& !self.allow_unsynced_beacon_node
{
return;
}
let slot = if let Some(slot) = self.slot_clock.now() {
slot
} else {
@ -594,6 +585,12 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
) -> Result<(), String> {
let log = self.context.log();
let maybe_require_synced = if self.allow_unsynced_beacon_node {
RequireSynced::No
} else {
RequireSynced::Yes
};
let mut new_validator = 0;
let mut new_epoch = 0;
let mut new_proposal_slots = 0;
@ -614,21 +611,27 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
.collect();
let mut validator_subscriptions = vec![];
let remote_duties: Vec<ValidatorDuty> = match ValidatorDuty::download(
&self.beacon_node,
let pubkeys_ref = &pubkeys;
let remote_duties: Vec<ValidatorDuty> = match self
.beacon_nodes
.first_success(maybe_require_synced, |beacon_node| async move {
ValidatorDuty::download(
&beacon_node,
current_epoch,
request_epoch,
pubkeys,
pubkeys_ref.clone(),
&log,
)
.await
})
.await
{
Ok(duties) => duties,
Err(e) => {
error!(
log,
"Failed to download validator duties";
"error" => e
"error" => %e
);
vec![]
}
@ -720,10 +723,16 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
if count == 0 {
debug!(log, "No new subscriptions required");
} else {
self.beacon_node
.post_validator_beacon_committee_subscriptions(&validator_subscriptions)
let validator_subscriptions_ref = &validator_subscriptions;
self.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
beacon_node
.post_validator_beacon_committee_subscriptions(validator_subscriptions_ref)
.await
.map_err(|e| format!("Failed to subscribe validators: {:?}", e))?;
})
.await
.map_err(|e| format!("Failed to subscribe validators: {}", e))?;
debug!(
log,
"Successfully subscribed validators";

View File

@ -1,6 +1,7 @@
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::http_metrics::metrics;
use environment::RuntimeContext;
use eth2::{types::StateId, BeaconNodeHttpClient};
use eth2::types::StateId;
use futures::future::FutureExt;
use futures::StreamExt;
use parking_lot::RwLock;
@ -16,19 +17,19 @@ use types::{EthSpec, Fork};
const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(80);
/// Builds a `ForkService`.
pub struct ForkServiceBuilder<T> {
pub struct ForkServiceBuilder<T, E: EthSpec> {
fork: Option<Fork>,
slot_clock: Option<T>,
beacon_node: Option<BeaconNodeHttpClient>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
log: Option<Logger>,
}
impl<T: SlotClock + 'static> ForkServiceBuilder<T> {
impl<T: SlotClock + 'static, E: EthSpec> ForkServiceBuilder<T, E> {
pub fn new() -> Self {
Self {
fork: None,
slot_clock: None,
beacon_node: None,
beacon_nodes: None,
log: None,
}
}
@ -38,8 +39,8 @@ impl<T: SlotClock + 'static> ForkServiceBuilder<T> {
self
}
pub fn beacon_node(mut self, beacon_node: BeaconNodeHttpClient) -> Self {
self.beacon_node = Some(beacon_node);
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T, E>>) -> Self {
self.beacon_nodes = Some(beacon_nodes);
self
}
@ -48,15 +49,15 @@ impl<T: SlotClock + 'static> ForkServiceBuilder<T> {
self
}
pub fn build(self) -> Result<ForkService<T>, String> {
pub fn build(self) -> Result<ForkService<T, E>, String> {
Ok(ForkService {
inner: Arc::new(Inner {
fork: RwLock::new(self.fork),
slot_clock: self
.slot_clock
.ok_or("Cannot build ForkService without slot_clock")?,
beacon_node: self
.beacon_node
beacon_nodes: self
.beacon_nodes
.ok_or("Cannot build ForkService without beacon_node")?,
log: self
.log
@ -69,8 +70,21 @@ impl<T: SlotClock + 'static> ForkServiceBuilder<T> {
#[cfg(test)]
#[allow(dead_code)]
impl ForkServiceBuilder<slot_clock::TestingSlotClock> {
pub fn testing_only(log: Logger) -> Self {
impl<E: EthSpec> ForkServiceBuilder<slot_clock::TestingSlotClock, E> {
pub fn testing_only(spec: types::ChainSpec, log: Logger) -> Self {
use crate::beacon_node_fallback::CandidateBeaconNode;
let slot_clock = slot_clock::TestingSlotClock::new(
types::Slot::new(0),
std::time::Duration::from_secs(42),
std::time::Duration::from_secs(42),
);
let candidates = vec![CandidateBeaconNode::new(eth2::BeaconNodeHttpClient::new(
eth2::Url::parse("http://127.0.0.1").unwrap(),
))];
let mut beacon_nodes = BeaconNodeFallback::new(candidates, spec, log.clone());
beacon_nodes.set_slot_clock(slot_clock.clone());
Self {
fork: Some(types::Fork::default()),
slot_clock: Some(slot_clock::TestingSlotClock::new(
@ -78,28 +92,26 @@ impl ForkServiceBuilder<slot_clock::TestingSlotClock> {
std::time::Duration::from_secs(42),
std::time::Duration::from_secs(42),
)),
beacon_node: Some(eth2::BeaconNodeHttpClient::new(
eth2::Url::parse("http://127.0.0.1").unwrap(),
)),
beacon_nodes: Some(Arc::new(beacon_nodes)),
log: Some(log),
}
}
}
/// Helper to minimise `Arc` usage.
pub struct Inner<T> {
pub struct Inner<T, E: EthSpec> {
fork: RwLock<Option<Fork>>,
beacon_node: BeaconNodeHttpClient,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
log: Logger,
slot_clock: T,
}
/// Attempts to download the `Fork` struct from the beacon node at the start of each epoch.
pub struct ForkService<T> {
inner: Arc<Inner<T>>,
pub struct ForkService<T, E: EthSpec> {
inner: Arc<Inner<T, E>>,
}
impl<T> Clone for ForkService<T> {
impl<T, E: EthSpec> Clone for ForkService<T, E> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
@ -107,25 +119,22 @@ impl<T> Clone for ForkService<T> {
}
}
impl<T> Deref for ForkService<T> {
type Target = Inner<T>;
impl<T, E: EthSpec> Deref for ForkService<T, E> {
type Target = Inner<T, E>;
fn deref(&self) -> &Self::Target {
self.inner.deref()
}
}
impl<T: SlotClock + 'static> ForkService<T> {
impl<T: SlotClock + 'static, E: EthSpec> ForkService<T, E> {
/// Returns the last fork downloaded from the beacon node, if any.
pub fn fork(&self) -> Option<Fork> {
*self.fork.read()
}
/// Starts the service that periodically polls for the `Fork`.
pub fn start_update_service<E: EthSpec>(
self,
context: &RuntimeContext<E>,
) -> Result<(), String> {
pub fn start_update_service(self, context: &RuntimeContext<E>) -> Result<(), String> {
let spec = &context.eth2_config.spec;
let duration_to_next_epoch = self
@ -165,26 +174,32 @@ impl<T: SlotClock + 'static> ForkService<T> {
let _timer =
metrics::start_timer_vec(&metrics::FORK_SERVICE_TIMES, &[metrics::FULL_UPDATE]);
let log = &self.log;
let fork = self
.inner
.beacon_node
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
beacon_node
.get_beacon_states_fork(StateId::Head)
.await
.map_err(|e| {
trace!(
self.log,
log,
"Fork update failed";
"error" => format!("Error retrieving fork: {:?}", e)
)
})?
.ok_or_else(|| {
trace!(
self.log,
log,
"Fork update failed";
"error" => "The beacon head fork is unknown"
)
})?
.data;
})
.map(|result| result.data)
})
.await
.map_err(|_| ())?;
if self.fork.read().as_ref() != Some(&fork) {
*(self.fork.write()) = Some(fork);

View File

@ -73,7 +73,9 @@ impl ApiTester {
config.validator_dir = validator_dir.path().into();
config.secrets_dir = secrets_dir.path().into();
let fork_service = ForkServiceBuilder::testing_only(log.clone())
let spec = E::default_spec();
let fork_service = ForkServiceBuilder::testing_only(spec.clone(), log.clone())
.build()
.unwrap();
@ -84,7 +86,7 @@ impl ApiTester {
initialized_validators,
slashing_protection,
Hash256::repeat_byte(42),
E::default_spec(),
spec,
fork_service.clone(),
log.clone(),
);

View File

@ -80,6 +80,19 @@ lazy_static::lazy_static! {
"Number of attesters on this host",
&["task"]
);
/*
* Endpoint metrics
*/
pub static ref ENDPOINT_ERRORS: Result<IntCounterVec> = try_create_int_counter_vec(
"bn_endpoint_errors",
"The number of beacon node request errors for each endpoint",
&["endpoint"]
);
pub static ref ENDPOINT_REQUESTS: Result<IntCounterVec> = try_create_int_counter_vec(
"bn_endpoint_requests",
"The number of beacon node requests for each endpoint",
&["endpoint"]
);
}
pub fn gather_prometheus_metrics<T: EthSpec>(

View File

@ -1,12 +1,13 @@
mod attestation_service;
mod beacon_node_fallback;
mod block_service;
mod check_synced;
mod cli;
mod config;
mod duties_service;
mod fork_service;
mod http_metrics;
mod initialized_validators;
mod is_synced;
mod key_cache;
mod notifier;
mod validator_duty;
@ -17,6 +18,9 @@ pub mod http_api;
pub use cli::cli_app;
pub use config::Config;
use crate::beacon_node_fallback::{
start_fallback_updater_service, BeaconNodeFallback, CandidateBeaconNode, RequireSynced,
};
use account_utils::validator_definitions::ValidatorDefinitions;
use attestation_service::{AttestationService, AttestationServiceBuilder};
use block_service::{BlockService, BlockServiceBuilder};
@ -55,7 +59,7 @@ const HTTP_TIMEOUT: Duration = Duration::from_secs(12);
pub struct ProductionValidatorClient<T: EthSpec> {
context: RuntimeContext<T>,
duties_service: DutiesService<SystemTimeSlotClock, T>,
fork_service: ForkService<SystemTimeSlotClock>,
fork_service: ForkService<SystemTimeSlotClock, T>,
block_service: BlockService<SystemTimeSlotClock, T>,
attestation_service: AttestationService<SystemTimeSlotClock, T>,
validator_store: ValidatorStore<SystemTimeSlotClock, T>,
@ -84,7 +88,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
info!(
log,
"Starting validator client";
"beacon_node" => &config.beacon_node,
"beacon_nodes" => format!("{:?}", &config.beacon_nodes),
"validator_dir" => format!("{:?}", config.validator_dir),
);
@ -202,20 +206,36 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
})?;
}
let beacon_node_url: Url = config
.beacon_node
.parse()
let beacon_node_urls: Vec<Url> = config
.beacon_nodes
.iter()
.map(|s| s.parse())
.collect::<Result<_, _>>()
.map_err(|e| format!("Unable to parse beacon node URL: {:?}", e))?;
let beacon_nodes: Vec<BeaconNodeHttpClient> = beacon_node_urls
.into_iter()
.map(|url| {
let beacon_node_http_client = ClientBuilder::new()
.timeout(HTTP_TIMEOUT)
.build()
.map_err(|e| format!("Unable to build HTTP client: {:?}", e))?;
let beacon_node =
BeaconNodeHttpClient::from_components(beacon_node_url, beacon_node_http_client);
Ok(BeaconNodeHttpClient::from_components(
url,
beacon_node_http_client,
))
})
.collect::<Result<Vec<BeaconNodeHttpClient>, String>>()?;
let candidates = beacon_nodes
.into_iter()
.map(CandidateBeaconNode::new)
.collect();
let mut beacon_nodes: BeaconNodeFallback<_, T> =
BeaconNodeFallback::new(candidates, context.eth2_config.spec.clone(), log.clone());
// Perform some potentially long-running initialization tasks.
let (genesis_time, genesis_validators_root) = tokio::select! {
tuple = init_from_beacon_node(&beacon_node, &context) => tuple?,
tuple = init_from_beacon_node(&beacon_nodes, &context) => tuple?,
() = context.executor.exit() => return Err("Shutting down".to_string())
};
@ -230,9 +250,13 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
Duration::from_millis(context.eth2_config.spec.milliseconds_per_slot),
);
beacon_nodes.set_slot_clock(slot_clock.clone());
let beacon_nodes = Arc::new(beacon_nodes);
start_fallback_updater_service(context.clone(), beacon_nodes.clone())?;
let fork_service = ForkServiceBuilder::new()
.slot_clock(slot_clock.clone())
.beacon_node(beacon_node.clone())
.beacon_nodes(beacon_nodes.clone())
.log(log.clone())
.build()?;
@ -254,9 +278,9 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
let duties_service = DutiesServiceBuilder::new()
.slot_clock(slot_clock.clone())
.validator_store(validator_store.clone())
.beacon_node(beacon_node.clone())
.runtime_context(context.service_context("duties".into()))
.beacon_nodes(beacon_nodes.clone())
.allow_unsynced_beacon_node(config.allow_unsynced_beacon_node)
.runtime_context(context.service_context("duties".into()))
.build()?;
// Update the metrics server.
@ -268,7 +292,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
let block_service = BlockServiceBuilder::new()
.slot_clock(slot_clock.clone())
.validator_store(validator_store.clone())
.beacon_node(beacon_node.clone())
.beacon_nodes(beacon_nodes.clone())
.runtime_context(context.service_context("block".into()))
.graffiti(config.graffiti)
.build()?;
@ -277,7 +301,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
.duties_service(duties_service.clone())
.slot_clock(slot_clock)
.validator_store(validator_store.clone())
.beacon_node(beacon_node.clone())
.beacon_nodes(beacon_nodes.clone())
.runtime_context(context.service_context("attestation".into()))
.build()?;
@ -285,7 +309,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
//
// It seems most sensible to move this into the `start_service` function, but I'm caution
// of making too many changes this close to genesis (<1 week).
wait_for_genesis(&beacon_node, genesis_time, &context).await?;
wait_for_genesis(&beacon_nodes, genesis_time, &context).await?;
Ok(Self {
context,
@ -368,40 +392,50 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
}
async fn init_from_beacon_node<E: EthSpec>(
beacon_node: &BeaconNodeHttpClient,
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
context: &RuntimeContext<E>,
) -> Result<(u64, Hash256), String> {
// Wait for the beacon node to come online.
wait_for_connectivity(beacon_node, context.log()).await?;
let yaml_config = beacon_node
.get_config_spec()
.await
.map_err(|e| format!("Unable to read spec from beacon node: {:?}", e))?
.data;
let beacon_node_spec = yaml_config
.apply_to_chain_spec::<E>(&E::default_spec())
.ok_or_else(|| {
"The minimal/mainnet spec type of the beacon node does not match the validator client. \
See the --network command."
.to_string()
})?;
if context.eth2_config.spec != beacon_node_spec {
return Err(
"The beacon node is using a different Eth2 specification to this validator client. \
See the --network command."
.to_string(),
loop {
beacon_nodes.update_unready_candidates().await;
let num_available = beacon_nodes.num_available().await;
let num_total = beacon_nodes.num_total().await;
if num_available > 0 {
info!(
context.log(),
"Initialized beacon node connections";
"total" => num_total,
"available" => num_available,
);
break;
} else {
warn!(
context.log(),
"Unable to connect to a beacon node";
"retry in" => format!("{} seconds", RETRY_DELAY.as_secs()),
"total" => num_total,
"available" => num_available,
);
sleep(RETRY_DELAY).await;
}
}
let genesis = loop {
match beacon_node.get_beacon_genesis().await {
match beacon_nodes
.first_success(RequireSynced::No, |node| async move {
node.get_beacon_genesis().await
})
.await
{
Ok(genesis) => break genesis.data,
Err(e) => {
// A 404 error on the genesis endpoint indicates that genesis has not yet occurred.
if e.status() == Some(StatusCode::NOT_FOUND) {
Err(errors) => {
// Search for a 404 error which indicates that genesis has not yet
// occurred.
if errors
.0
.iter()
.filter_map(|(_, e)| e.request_failure())
.any(|e| e.status() == Some(StatusCode::NOT_FOUND))
{
info!(
context.log(),
"Waiting for genesis";
@ -409,8 +443,8 @@ async fn init_from_beacon_node<E: EthSpec>(
} else {
error!(
context.log(),
"Error polling beacon node";
"error" => format!("{:?}", e)
"Errors polling beacon node";
"error" => %errors
);
}
}
@ -423,7 +457,7 @@ async fn init_from_beacon_node<E: EthSpec>(
}
async fn wait_for_genesis<E: EthSpec>(
beacon_node: &BeaconNodeHttpClient,
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
genesis_time: u64,
context: &RuntimeContext<E>,
) -> Result<(), String> {
@ -447,7 +481,7 @@ async fn wait_for_genesis<E: EthSpec>(
// Start polling the node for pre-genesis information, cancelling the polling as soon as the
// timer runs out.
tokio::select! {
result = poll_whilst_waiting_for_genesis(beacon_node, genesis_time, context.log()) => result?,
result = poll_whilst_waiting_for_genesis(beacon_nodes, genesis_time, context.log()) => result?,
() = sleep(genesis_time - now) => ()
};
@ -469,50 +503,18 @@ async fn wait_for_genesis<E: EthSpec>(
/// Request the version from the node, looping back and trying again on failure. Exit once the node
/// has been contacted.
async fn wait_for_connectivity(
beacon_node: &BeaconNodeHttpClient,
log: &Logger,
) -> Result<(), String> {
// Try to get the version string from the node, looping until success is returned.
loop {
let log = log.clone();
let result = beacon_node
.get_node_version()
.await
.map_err(|e| format!("{:?}", e))
.map(|body| body.data.version);
match result {
Ok(version) => {
info!(
log,
"Connected to beacon node";
"version" => version,
);
return Ok(());
}
Err(e) => {
error!(
log,
"Unable to connect to beacon node";
"error" => format!("{:?}", e),
);
sleep(RETRY_DELAY).await;
}
}
}
}
/// Request the version from the node, looping back and trying again on failure. Exit once the node
/// has been contacted.
async fn poll_whilst_waiting_for_genesis(
beacon_node: &BeaconNodeHttpClient,
async fn poll_whilst_waiting_for_genesis<E: EthSpec>(
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
genesis_time: Duration,
log: &Logger,
) -> Result<(), String> {
loop {
match beacon_node.get_lighthouse_staking().await {
match beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
beacon_node.get_lighthouse_staking().await
})
.await
{
Ok(is_staking) => {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
@ -542,7 +544,7 @@ async fn poll_whilst_waiting_for_genesis(
error!(
log,
"Error polling beacon node";
"error" => format!("{:?}", e)
"error" => %e
);
}
}

View File

@ -1,4 +1,4 @@
use crate::{is_synced::is_synced, ProductionValidatorClient};
use crate::ProductionValidatorClient;
use futures::StreamExt;
use slog::{error, info};
use slot_clock::SlotClock;
@ -10,7 +10,6 @@ pub fn spawn_notifier<T: EthSpec>(client: &ProductionValidatorClient<T>) -> Resu
let context = client.context.service_context("notifier".into());
let executor = context.executor.clone();
let duties_service = client.duties_service.clone();
let allow_unsynced_beacon_node = client.config.allow_unsynced_beacon_node;
let slot_duration = Duration::from_millis(context.eth2_config.spec.milliseconds_per_slot);
let duration_to_next_slot = duties_service
@ -26,15 +25,25 @@ pub fn spawn_notifier<T: EthSpec>(client: &ProductionValidatorClient<T>) -> Resu
let log = context.log();
while interval.next().await.is_some() {
if !is_synced(
&duties_service.beacon_node,
&duties_service.slot_clock,
Some(&log),
let num_available = duties_service.beacon_nodes.num_available().await;
let num_synced = duties_service.beacon_nodes.num_synced().await;
let num_total = duties_service.beacon_nodes.num_total().await;
if num_synced > 0 {
info!(
log,
"Connected to beacon node(s)";
"total" => num_total,
"available" => num_available,
"synced" => num_synced,
)
} else {
error!(
log,
"No synced beacon nodes";
"total" => num_total,
"available" => num_available,
"synced" => num_synced,
)
.await
&& !allow_unsynced_beacon_node
{
continue;
}
if let Some(slot) = duties_service.slot_clock.now() {

View File

@ -6,7 +6,6 @@ use parking_lot::RwLock;
use slashing_protection::{NotSafe, Safe, SlashingDatabase};
use slog::{crit, error, warn, Logger};
use slot_clock::SlotClock;
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use tempdir::TempDir;
@ -49,8 +48,7 @@ pub struct ValidatorStore<T, E: EthSpec> {
spec: Arc<ChainSpec>,
log: Logger,
temp_dir: Option<Arc<TempDir>>,
fork_service: ForkService<T>,
_phantom: PhantomData<E>,
fork_service: ForkService<T, E>,
}
impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
@ -59,7 +57,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
slashing_protection: SlashingDatabase,
genesis_validators_root: Hash256,
spec: ChainSpec,
fork_service: ForkService<T>,
fork_service: ForkService<T, E>,
log: Logger,
) -> Self {
Self {
@ -70,7 +68,6 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
log,
temp_dir: None,
fork_service,
_phantom: PhantomData,
}
}