Faster attestation production (#838)

* Start adding interop genesis state to lcli

* Use more efficient method to generate genesis state

* Remove duplicate int_to_bytes32

* Add lcli command to change state genesis time

* Add option to allow VC to start with unsynced BN

* Set VC to do parallel key loading

* Don't default to dummy eth1 backend

* Add endpoint to dump operation pool

* Add metrics for op pool

* Remove state clone for slot notifier

* Add mem size approximation for tree hash cache

* Avoid cloning tree hash when getting head

* Avoid cloning tree hash when getting head

* Add working arena-based cached tree hash

* Add another benchmark

* Add pre-allocation for caches

* Make cache nullable

* Fix bugs in cache tree hash

* Add validator tree hash optimization

* Optimize hash_concat

* Make hash32_concat return fixed-len array

* Fix failing API tests

* Add new beacon state cache struct

* Add validator-specific cache

* Separate list and values arenas

* Add parallel validator registry hashing

* Remove MultiTreeHashCache

* Remove cached tree hash macro

* Fix failing tree hash test

* Address Michael's comments

* Add CachedTreeHash impl for ef tests

* Fix messy merge conflict

* Optimize attestation production

* Add first basic optimizations

* Fix SlotOutOfBounds error

* Resolved missed merge conflicts

* Fix another missed merge conflict

* Fix more merge conflict issues

* Add `StateSkipConfig`

* Fix test compile errors

* Add failing test

* Fix bug, make tests pass

* Add comment

* Delete unused function

* Replace deleted comment
This commit is contained in:
Paul Hauner 2020-03-04 17:10:22 +11:00 committed by GitHub
parent 1f16d8fe4d
commit 12999fb06c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 350 additions and 196 deletions

View File

@ -21,6 +21,7 @@ use state_processing::per_block_processing::{
use state_processing::{
per_block_processing, per_slot_processing, BlockProcessingError, BlockSignatureStrategy,
};
use std::borrow::Cow;
use std::cmp::Ordering;
use std::fs;
use std::io::prelude::*;
@ -104,10 +105,23 @@ pub enum AttestationProcessingOutcome {
Invalid(AttestationValidationError),
}
/// Defines how a `BeaconState` should be "skipped" through skip-slots.
pub enum StateSkipConfig {
/// Calculate the state root during each skip slot, producing a fully-valid `BeaconState`.
WithStateRoots,
/// Don't calculate the state root at each slot, instead just use the zero hash. This is orders
/// of magnitude faster, however it produces a partially invalid state.
///
/// This state is useful for operations that don't use the state roots; e.g., for calculating
/// the shuffling.
WithoutStateRoots,
}
pub struct HeadInfo {
pub slot: Slot,
pub block_root: Hash256,
pub state_root: Hash256,
pub current_justified_checkpoint: types::Checkpoint,
pub finalized_checkpoint: types::Checkpoint,
pub fork: Fork,
}
@ -409,6 +423,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: head.beacon_block.slot(),
block_root: head.beacon_block_root,
state_root: head.beacon_state_root,
current_justified_checkpoint: head.beacon_state.current_justified_checkpoint.clone(),
finalized_checkpoint: head.beacon_state.finalized_checkpoint.clone(),
fork: head.beacon_state.fork.clone(),
})
@ -425,7 +440,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// Returns `None` when the state is not found in the database or there is an error skipping
/// to a future state.
pub fn state_at_slot(&self, slot: Slot) -> Result<BeaconState<T::EthSpec>, Error> {
pub fn state_at_slot(
&self,
slot: Slot,
config: StateSkipConfig,
) -> Result<BeaconState<T::EthSpec>, Error> {
let head_state = self.head()?.beacon_state;
match slot.cmp(&head_state.slot) {
@ -446,6 +465,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let head_state_slot = head_state.slot;
let mut state = head_state;
let skip_state_root = match config {
StateSkipConfig::WithStateRoots => None,
StateSkipConfig::WithoutStateRoots => Some(Hash256::zero()),
};
while state.slot < slot {
// Do not allow and forward state skip that takes longer than the maximum task duration.
//
@ -461,7 +486,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Note: supplying some `state_root` when it is known would be a cheap and easy
// optimization.
match per_slot_processing(&mut state, None, &self.spec) {
match per_slot_processing(&mut state, skip_state_root, &self.spec) {
Ok(()) => (),
Err(e) => {
warn!(
@ -501,7 +526,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Returns `None` when there is an error skipping to a future state or the slot clock cannot
/// be read.
pub fn wall_clock_state(&self) -> Result<BeaconState<T::EthSpec>, Error> {
self.state_at_slot(self.slot()?)
self.state_at_slot(self.slot()?, StateSkipConfig::WithStateRoots)
}
/// Returns the slot of the highest block in the canonical chain.
@ -545,7 +570,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut state = if epoch(slot) == epoch(head_state.slot) {
self.head()?.beacon_state
} else {
self.state_at_slot(slot)?
// The block proposer shuffling is not affected by the state roots, so we don't need to
// calculate them.
self.state_at_slot(slot, StateSkipConfig::WithoutStateRoots)?
};
state.build_committee_cache(RelativeEpoch::Current, &self.spec)?;
@ -563,43 +590,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_err(Into::into)
}
/// Returns the attestation slot and committee index for a given validator index.
///
/// Information is read from the current state, so only information from the present and prior
/// epoch is available.
pub fn validator_attestation_slot_and_index(
&self,
validator_index: usize,
epoch: Epoch,
) -> Result<Option<(Slot, u64)>, Error> {
let as_epoch = |slot: Slot| slot.epoch(T::EthSpec::slots_per_epoch());
let head_state = &self.head()?.beacon_state;
let mut state = if epoch == as_epoch(head_state.slot) {
self.head()?.beacon_state
} else {
self.state_at_slot(epoch.start_slot(T::EthSpec::slots_per_epoch()))?
};
state.build_committee_cache(RelativeEpoch::Current, &self.spec)?;
if as_epoch(state.slot) != epoch {
return Err(Error::InvariantViolated(format!(
"Epochs in consistent in attestation duties lookup: state: {}, requested: {}",
as_epoch(state.slot),
epoch
)));
}
if let Some(attestation_duty) =
state.get_attestation_duties(validator_index, RelativeEpoch::Current)?
{
Ok(Some((attestation_duty.slot, attestation_duty.index)))
} else {
Ok(None)
}
}
/// Produce an `Attestation` that is valid for the given `slot` and `index`.
///
/// Always attests to the canonical chain.
@ -608,109 +598,99 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
index: CommitteeIndex,
) -> Result<Attestation<T::EthSpec>, Error> {
let state = self.state_at_slot(slot)?;
let head = self.head()?;
// Note: we're taking a lock on the head. The work involved here should be trivial enough
// that the lock should not be held for long.
let head = self
.canonical_head
.try_read_for(HEAD_LOCK_TIMEOUT)
.ok_or_else(|| Error::CanonicalHeadLockTimeout)?;
let data = self.produce_attestation_data_for_block(
index,
head.beacon_block_root,
head.beacon_block.slot(),
&state,
)?;
if slot >= head.beacon_block.slot() {
self.produce_attestation_for_block(
slot,
index,
head.beacon_block_root,
Cow::Borrowed(&head.beacon_state),
)
} else {
// Note: this method will fail if `slot` is more than `state.block_roots.len()` slots
// prior to the head.
//
// This seems reasonable, producing an attestation at a slot so far
// in the past seems useless, definitely in mainnet spec. In minimal spec, when the
// block roots only contain two epochs of history, it's possible that you will fail to
// produce an attestation that would be valid to be included in a block. Given that
// minimal is only for testing, I think this is fine.
//
// It is important to note that what's _not_ allowed here is attesting to a slot in the
// past. You can still attest to a block an arbitrary distance in the past, just not as
// if you are in a slot in the past.
let beacon_block_root = *head.beacon_state.get_block_root(slot)?;
let state_root = *head.beacon_state.get_state_root(slot)?;
let committee_len = state.get_beacon_committee(slot, index)?.committee.len();
// Avoid holding a lock on the head whilst doing database reads. Good boi functions
// don't hog locks.
drop(head);
Ok(Attestation {
aggregation_bits: BitList::with_capacity(committee_len)?,
data,
signature: AggregateSignature::new(),
})
}
let mut state = self
.get_state_caching_only_with_committee_caches(&state_root, Some(slot))?
.ok_or_else(|| Error::MissingBeaconState(state_root))?;
/// Produce an `AttestationData` that is valid for the given `slot`, `index`.
///
/// Always attests to the canonical chain.
pub fn produce_attestation_data(
&self,
slot: Slot,
index: CommitteeIndex,
) -> Result<AttestationData, Error> {
let state = self.state_at_slot(slot)?;
let head = self.head()?;
state.build_committee_cache(RelativeEpoch::Current, &self.spec)?;
self.produce_attestation_data_for_block(
index,
head.beacon_block_root,
head.beacon_block.slot(),
&state,
)
self.produce_attestation_for_block(slot, index, beacon_block_root, Cow::Owned(state))
}
}
/// Produce an `AttestationData` that attests to the chain denoted by `block_root` and `state`.
///
/// Permits attesting to any arbitrary chain. Generally, the `produce_attestation_data`
/// function should be used as it attests to the canonical chain.
pub fn produce_attestation_data_for_block(
pub fn produce_attestation_for_block(
&self,
slot: Slot,
index: CommitteeIndex,
head_block_root: Hash256,
head_block_slot: Slot,
state: &BeaconState<T::EthSpec>,
) -> Result<AttestationData, Error> {
// Collect some metrics.
metrics::inc_counter(&metrics::ATTESTATION_PRODUCTION_REQUESTS);
let timer = metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_TIMES);
beacon_block_root: Hash256,
mut state: Cow<BeaconState<T::EthSpec>>,
) -> Result<Attestation<T::EthSpec>, Error> {
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
let slots_per_epoch = T::EthSpec::slots_per_epoch();
let current_epoch_start_slot = state.current_epoch().start_slot(slots_per_epoch);
// The `target_root` is the root of the first block of the current epoch.
//
// The `state` does not know the root of the block for it's current slot (it only knows
// about blocks from prior slots). This creates an edge-case when the state is on the first
// slot of the epoch -- we're unable to obtain the `target_root` because it is not a prior
// root.
//
// This edge case is handled in two ways:
//
// - If the head block is on the same slot as the state, we use it's root.
// - Otherwise, assume the current slot has been skipped and use the block root from the
// prior slot.
//
// For all other cases, we simply read the `target_root` from `state.latest_block_roots`.
let target_root = if state.slot == current_epoch_start_slot {
if head_block_slot == current_epoch_start_slot {
head_block_root
} else {
*state.get_block_root(current_epoch_start_slot - 1)?
if state.slot > slot {
return Err(Error::CannotAttestToFutureState);
} else if state.current_epoch() + 1 < epoch {
let mut_state = state.to_mut();
while mut_state.current_epoch() + 1 < epoch {
// Note: here we provide `Hash256::zero()` as the root of the current state. This
// has the effect of setting the values of all historic state roots to the zero
// hash. This is an optimization, we don't need the state roots so why calculate
// them?
per_slot_processing(mut_state, Some(Hash256::zero()), &self.spec)?;
}
mut_state.build_committee_cache(RelativeEpoch::Next, &self.spec)?;
}
let committee_len = state.get_beacon_committee(slot, index)?.committee.len();
let target_slot = epoch.start_slot(T::EthSpec::slots_per_epoch());
let target_root = if state.slot <= target_slot {
beacon_block_root
} else {
*state.get_block_root(current_epoch_start_slot)?
*state.get_block_root(target_slot)?
};
let target = Checkpoint {
epoch: state.current_epoch(),
root: target_root,
};
// Collect some metrics.
metrics::inc_counter(&metrics::ATTESTATION_PRODUCTION_SUCCESSES);
metrics::stop_timer(timer);
trace!(
self.log,
"Produced beacon attestation data";
"beacon_block_root" => format!("{}", head_block_root),
"slot" => state.slot,
"index" => index
);
Ok(AttestationData {
slot: state.slot,
index,
beacon_block_root: head_block_root,
source: state.current_justified_checkpoint.clone(),
target,
Ok(Attestation {
aggregation_bits: BitList::with_capacity(committee_len)?,
data: AttestationData {
slot,
index,
beacon_block_root: beacon_block_root,
source: state.current_justified_checkpoint.clone(),
target: Checkpoint {
epoch,
root: target_root,
},
},
signature: AggregateSignature::new(),
})
}
@ -815,16 +795,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.ok_or_else(|| Error::MissingBeaconState(attestation_block_root))?
} else {
let mut state = self
.store
.get_state(&attestation_block_root, Some(attestation_head_block.slot()))?
.ok_or_else(|| Error::MissingBeaconState(attestation_block_root))?;
.get_state_caching_only_with_committee_caches(
&attestation_head_block.state_root(),
Some(attestation_head_block.slot()),
)?
.ok_or_else(|| {
Error::MissingBeaconState(attestation_head_block.state_root())
})?;
// Fastforward the state to the epoch in which the attestation was made.
// NOTE: this looks like a potential DoS vector, we should probably limit
// the amount we're willing to fastforward without a valid signature.
for _ in state.slot.as_u64()..attestation_epoch.start_slot(slots_per_epoch).as_u64()
{
per_slot_processing(&mut state, None, &self.spec)?;
// Note: we provide the zero hash as the state root because the state root is
// irrelevant to attestation processing and therefore a waste of time to
// compute.
per_slot_processing(&mut state, Some(Hash256::zero()), &self.spec)?;
}
state
@ -1357,7 +1344,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
) -> Result<BeaconBlockAndState<T::EthSpec>, BlockProductionError> {
let state = self
.state_at_slot(slot - 1)
.state_at_slot(slot - 1, StateSkipConfig::WithStateRoots)
.map_err(|_| BlockProductionError::UnableToProduceAtSlot(slot))?;
self.produce_block_on_state(state, slot, randao_reveal)

View File

@ -40,6 +40,7 @@ pub enum BeaconChainError {
NoStateForAttestation {
beacon_block_root: Hash256,
},
CannotAttestToFutureState,
AttestationValidationError(AttestationValidationError),
StateSkipTooLarge {
start_slot: Slot,

View File

@ -17,6 +17,7 @@ mod timeout_rw_lock;
pub use self::beacon_chain::{
AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BlockProcessingOutcome,
StateSkipConfig,
};
pub use self::checkpoint::CheckPoint;
pub use self::errors::{BeaconChainError, BlockProductionError};

View File

@ -3,13 +3,15 @@ use crate::{
eth1_chain::CachingEth1Backend,
events::NullEventHandler,
AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BlockProcessingOutcome,
StateSkipConfig,
};
use eth1::Config as Eth1Config;
use genesis::interop_genesis_state;
use rayon::prelude::*;
use sloggers::{terminal::TerminalLoggerBuilder, types::Severity, Build};
use sloggers::{null::NullLoggerBuilder, Build};
use slot_clock::TestingSlotClock;
use state_processing::per_slot_processing;
use std::borrow::Cow;
use std::sync::Arc;
use std::time::Duration;
use store::{
@ -17,8 +19,8 @@ use store::{
DiskStore, MemoryStore, Migrate, Store,
};
use types::{
AggregateSignature, Attestation, BeaconState, BitList, ChainSpec, Domain, EthSpec, Hash256,
Keypair, SecretKey, Signature, SignedBeaconBlock, SignedRoot, Slot,
AggregateSignature, Attestation, BeaconState, ChainSpec, Domain, EthSpec, Hash256, Keypair,
SecretKey, Signature, SignedBeaconBlock, SignedRoot, Slot,
};
pub use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY};
@ -81,10 +83,7 @@ impl<E: EthSpec> BeaconChainHarness<HarnessType<E>> {
pub fn new(eth_spec_instance: E, keypairs: Vec<Keypair>) -> Self {
let spec = E::default_spec();
let log = TerminalLoggerBuilder::new()
.level(Severity::Warning)
.build()
.expect("logger should build");
let log = NullLoggerBuilder.build().expect("logger should build");
let chain = BeaconChainBuilder::new(eth_spec_instance)
.logger(log.clone())
@ -123,10 +122,7 @@ impl<E: EthSpec> BeaconChainHarness<DiskHarnessType<E>> {
) -> Self {
let spec = E::default_spec();
let log = TerminalLoggerBuilder::new()
.level(Severity::Warning)
.build()
.expect("logger should build");
let log = NullLoggerBuilder.build().expect("logger should build");
let chain = BeaconChainBuilder::new(eth_spec_instance)
.logger(log.clone())
@ -163,10 +159,7 @@ impl<E: EthSpec> BeaconChainHarness<DiskHarnessType<E>> {
) -> Self {
let spec = E::default_spec();
let log = TerminalLoggerBuilder::new()
.level(Severity::Warning)
.build()
.expect("logger should build");
let log = NullLoggerBuilder.build().expect("logger should build");
let chain = BeaconChainBuilder::new(eth_spec_instance)
.logger(log.clone())
@ -231,7 +224,7 @@ where
};
self.chain
.state_at_slot(state_slot)
.state_at_slot(state_slot, StateSkipConfig::WithStateRoots)
.expect("should find state for slot")
};
@ -374,8 +367,6 @@ where
.expect("should get committees")
.iter()
.for_each(|bc| {
let committee_size = bc.committee.len();
let mut local_attestations: Vec<Attestation<E>> = bc
.committee
.par_iter()
@ -384,30 +375,29 @@ where
// Note: searching this array is worst-case `O(n)`. A hashset could be a better
// alternative.
if attesting_validators.contains(validator_index) {
let data = self
let mut attestation = self
.chain
.produce_attestation_data_for_block(
.produce_attestation_for_block(
head_block_slot,
bc.index,
head_block_root,
head_block_slot,
state,
Cow::Borrowed(state),
)
.expect("should produce attestation data");
.expect("should produce attestation");
let mut aggregation_bits = BitList::with_capacity(committee_size)
.expect("should make aggregation bits");
aggregation_bits
attestation
.aggregation_bits
.set(i, true)
.expect("should be able to set aggregation bits");
let signature = {
attestation.signature = {
let domain = spec.get_domain(
data.target.epoch,
attestation.data.target.epoch,
Domain::BeaconAttester,
fork,
);
let message = data.signing_root(domain);
let message = attestation.data.signing_root(domain);
let mut agg_sig = AggregateSignature::new();
agg_sig.add(&Signature::new(
@ -418,12 +408,6 @@ where
agg_sig
};
let attestation = Attestation {
aggregation_bits,
data,
signature,
};
Some(attestation)
} else {
None

View File

@ -0,0 +1,127 @@
#![cfg(not(debug_assertions))]
#[macro_use]
extern crate lazy_static;
use beacon_chain::{
test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy},
StateSkipConfig,
};
use tree_hash::TreeHash;
use types::{AggregateSignature, EthSpec, Hash256, Keypair, MainnetEthSpec, RelativeEpoch, Slot};
pub const VALIDATOR_COUNT: usize = 16;
lazy_static! {
/// A cached set of keys.
static ref KEYPAIRS: Vec<Keypair> = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT);
}
/// This test builds a chain that is just long enough to finalize an epoch then it produces an
/// attestation at each slot from genesis through to three epochs past the head.
///
/// It checks the produced attestation against some locally computed values.
#[test]
fn produces_attestations() {
let num_blocks_produced = MainnetEthSpec::slots_per_epoch() * 4;
let harness = BeaconChainHarness::new(MainnetEthSpec, KEYPAIRS[..].to_vec());
// Skip past the genesis slot.
harness.advance_slot();
harness.extend_chain(
num_blocks_produced as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
);
let chain = &harness.chain;
let state = &harness.chain.head().expect("should get head").beacon_state;
assert_eq!(state.slot, num_blocks_produced, "head should have updated");
assert!(
state.finalized_checkpoint.epoch > 0,
"head should have updated"
);
let current_slot = chain.slot().expect("should get slot");
// Test all valid committee indices for all slots in the chain.
for slot in 0..=current_slot.as_u64() + MainnetEthSpec::slots_per_epoch() * 3 {
let slot = Slot::from(slot);
let state = chain
.state_at_slot(slot, StateSkipConfig::WithStateRoots)
.expect("should get state");
let block_slot = if slot > current_slot {
current_slot
} else {
slot
};
let block = chain
.block_at_slot(block_slot)
.expect("should get block")
.expect("block should not be skipped");
let block_root = Hash256::from_slice(&block.message.tree_hash_root());
let epoch_boundary_slot = state
.current_epoch()
.start_slot(MainnetEthSpec::slots_per_epoch());
let target_root = if state.slot == epoch_boundary_slot {
block_root
} else {
*state
.get_block_root(epoch_boundary_slot)
.expect("should get target block root")
};
let committee_cache = state
.committee_cache(RelativeEpoch::Current)
.expect("should get committee_cache");
let committee_count = committee_cache.committees_per_slot();
for index in 0..committee_count {
let committee_len = committee_cache
.get_beacon_committee(slot, index)
.expect("should get committee for slot")
.committee
.len();
let attestation = chain
.produce_attestation(slot, index)
.expect("should produce attestation");
let data = &attestation.data;
assert_eq!(
attestation.aggregation_bits.len(),
committee_len,
"bad committee len"
);
assert!(
attestation.aggregation_bits.is_zero(),
"some committee bits are set"
);
assert_eq!(
attestation.signature,
AggregateSignature::new(),
"bad signature"
);
assert_eq!(data.index, index, "bad index");
assert_eq!(data.slot, slot, "bad slot");
assert_eq!(data.beacon_block_root, block_root, "bad block root");
assert_eq!(
data.source, state.current_justified_checkpoint,
"bad source"
);
assert_eq!(
data.source, state.current_justified_checkpoint,
"bad source"
);
assert_eq!(data.target.epoch, state.current_epoch(), "bad target epoch");
assert_eq!(data.target.root, target_root, "bad target root");
}
}
}

View File

@ -2,7 +2,7 @@ use crate::helpers::*;
use crate::response_builder::ResponseBuilder;
use crate::validator::get_state_for_epoch;
use crate::{ApiError, ApiResult, BoxFut, UrlQuery};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use beacon_chain::{BeaconChain, BeaconChainTypes, StateSkipConfig};
use futures::{Future, Stream};
use hyper::{Body, Request};
use serde::{Deserialize, Serialize};
@ -381,7 +381,7 @@ pub fn get_committees<T: BeaconChainTypes>(
let epoch = query.epoch()?;
let mut state = get_state_for_epoch(&beacon_chain, epoch)?;
let mut state = get_state_for_epoch(&beacon_chain, epoch, StateSkipConfig::WithoutStateRoots)?;
let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), epoch).map_err(|e| {
ApiError::ServerError(format!("Failed to get state suitable for epoch: {:?}", e))
@ -471,7 +471,7 @@ pub fn get_state_root<T: BeaconChainTypes>(
let slot_string = UrlQuery::from_request(&req)?.only_one("slot")?;
let slot = parse_slot(&slot_string)?;
let root = state_root_at_slot(&beacon_chain, slot)?;
let root = state_root_at_slot(&beacon_chain, slot, StateSkipConfig::WithStateRoots)?;
ResponseBuilder::new(&req)?.body(&root)
}

View File

@ -1,5 +1,5 @@
use crate::{ApiError, ApiResult};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use beacon_chain::{BeaconChain, BeaconChainTypes, StateSkipConfig};
use bls::PublicKeyBytes;
use eth2_libp2p::GossipTopic;
use eth2_libp2p::PubsubMessage;
@ -142,7 +142,7 @@ pub fn state_at_slot<T: BeaconChainTypes>(
if head.beacon_state.slot == slot {
Ok((head.beacon_state_root, head.beacon_state))
} else {
let root = state_root_at_slot(beacon_chain, slot)?;
let root = state_root_at_slot(beacon_chain, slot, StateSkipConfig::WithStateRoots)?;
let state: BeaconState<T::EthSpec> = beacon_chain
.store
@ -161,6 +161,7 @@ pub fn state_at_slot<T: BeaconChainTypes>(
pub fn state_root_at_slot<T: BeaconChainTypes>(
beacon_chain: &BeaconChain<T>,
slot: Slot,
config: StateSkipConfig,
) -> Result<Hash256, ApiError> {
let head_state = &beacon_chain.head()?.beacon_state;
let current_slot = beacon_chain
@ -206,11 +207,16 @@ pub fn state_root_at_slot<T: BeaconChainTypes>(
let mut state = beacon_chain.head()?.beacon_state;
let spec = &T::EthSpec::default_spec();
let skip_state_root = match config {
StateSkipConfig::WithStateRoots => None,
StateSkipConfig::WithoutStateRoots => Some(Hash256::zero()),
};
for _ in state.slot.as_u64()..slot.as_u64() {
// Ensure the next epoch state caches are built in case of an epoch transition.
state.build_committee_cache(RelativeEpoch::Next, spec)?;
state_processing::per_slot_processing(&mut state, None, spec)?;
state_processing::per_slot_processing(&mut state, skip_state_root, spec)?;
}
// Note: this is an expensive operation. Once the tree hash cache is implement it may be

View File

@ -21,6 +21,21 @@ lazy_static! {
"http_server_success_total",
"Total count of HTTP 200 responses sent"
);
pub static ref VALIDATOR_GET_BLOCK_REQUEST_RESPONSE_TIME: Result<Histogram> =
try_create_histogram(
"http_server_validator_block_get_request_duration_seconds",
"Time taken to respond to GET /validator/block"
);
pub static ref VALIDATOR_GET_ATTESTATION_REQUEST_RESPONSE_TIME: Result<Histogram> =
try_create_histogram(
"http_server_validator_attestation_get_request_duration_seconds",
"Time taken to respond to GET /validator/attestation"
);
pub static ref VALIDATOR_GET_DUTIES_REQUEST_RESPONSE_TIME: Result<Histogram> =
try_create_histogram(
"http_server_validator_duties_get_request_duration_seconds",
"Time taken to respond to GET /validator/duties"
);
}
/// Returns the full set of Prometheus metrics for the Beacon Node application.

View File

@ -10,6 +10,7 @@ use hyper::{Body, Error, Method, Request, Response};
use slog::debug;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
fn into_boxfut<F: IntoFuture + 'static>(item: F) -> BoxFut
where
@ -33,6 +34,7 @@ pub fn route<T: BeaconChainTypes>(
) -> impl Future<Item = Response<Body>, Error = Error> {
metrics::inc_counter(&metrics::REQUEST_COUNT);
let timer = metrics::start_timer(&metrics::REQUEST_RESPONSE_TIME);
let received_instant = Instant::now();
let path = req.uri().path().to_string();
@ -113,7 +115,11 @@ pub fn route<T: BeaconChainTypes>(
// Methods for Validator
(&Method::POST, "/validator/duties") => {
validator::post_validator_duties::<T>(req, beacon_chain)
let timer =
metrics::start_timer(&metrics::VALIDATOR_GET_DUTIES_REQUEST_RESPONSE_TIME);
let response = validator::post_validator_duties::<T>(req, beacon_chain);
drop(timer);
into_boxfut(response)
}
(&Method::GET, "/validator/duties/all") => {
into_boxfut(validator::get_all_validator_duties::<T>(req, beacon_chain))
@ -122,13 +128,21 @@ pub fn route<T: BeaconChainTypes>(
validator::get_active_validator_duties::<T>(req, beacon_chain),
),
(&Method::GET, "/validator/block") => {
into_boxfut(validator::get_new_beacon_block::<T>(req, beacon_chain, log))
let timer =
metrics::start_timer(&metrics::VALIDATOR_GET_BLOCK_REQUEST_RESPONSE_TIME);
let response = validator::get_new_beacon_block::<T>(req, beacon_chain, log);
drop(timer);
into_boxfut(response)
}
(&Method::POST, "/validator/block") => {
validator::publish_beacon_block::<T>(req, beacon_chain, network_channel, log)
}
(&Method::GET, "/validator/attestation") => {
into_boxfut(validator::get_new_attestation::<T>(req, beacon_chain))
let timer =
metrics::start_timer(&metrics::VALIDATOR_GET_ATTESTATION_REQUEST_RESPONSE_TIME);
let response = validator::get_new_attestation::<T>(req, beacon_chain);
drop(timer);
into_boxfut(response)
}
(&Method::POST, "/validator/attestation") => {
validator::publish_attestation::<T>(req, beacon_chain, network_channel, log)
@ -176,21 +190,34 @@ pub fn route<T: BeaconChainTypes>(
// Map the Rust-friendly `Result` in to a http-friendly response. In effect, this ensures that
// any `Err` returned from our response handlers becomes a valid http response to the client
// (e.g., a response with a 404 or 500 status).
request_result.then(move |result| match result {
Ok(response) => {
debug!(local_log, "HTTP API request successful"; "path" => path);
metrics::inc_counter(&metrics::SUCCESS_COUNT);
metrics::stop_timer(timer);
request_result.then(move |result| {
let duration = Instant::now().duration_since(received_instant);
match result {
Ok(response) => {
debug!(
local_log,
"HTTP API request successful";
"path" => path,
"duration_ms" => duration.as_millis()
);
metrics::inc_counter(&metrics::SUCCESS_COUNT);
metrics::stop_timer(timer);
Ok(response)
}
Err(e) => {
let error_response = e.into();
Ok(response)
}
Err(e) => {
let error_response = e.into();
debug!(local_log, "HTTP API request failure"; "path" => path);
metrics::stop_timer(timer);
debug!(
local_log,
"HTTP API request failure";
"path" => path,
"duration_ms" => duration.as_millis()
);
metrics::stop_timer(timer);
Ok(error_response)
Ok(error_response)
}
}
})
}

View File

@ -5,6 +5,7 @@ use crate::response_builder::ResponseBuilder;
use crate::{ApiError, ApiResult, BoxFut, NetworkChannel, UrlQuery};
use beacon_chain::{
AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BlockProcessingOutcome,
StateSkipConfig,
};
use bls::PublicKeyBytes;
use futures::{Future, Stream};
@ -82,7 +83,7 @@ pub fn get_all_validator_duties<T: BeaconChainTypes>(
let epoch = query.epoch()?;
let state = get_state_for_epoch(&beacon_chain, epoch)?;
let state = get_state_for_epoch(&beacon_chain, epoch, StateSkipConfig::WithoutStateRoots)?;
let validator_pubkeys = state
.validators
@ -104,7 +105,7 @@ pub fn get_active_validator_duties<T: BeaconChainTypes>(
let epoch = query.epoch()?;
let state = get_state_for_epoch(&beacon_chain, epoch)?;
let state = get_state_for_epoch(&beacon_chain, epoch, StateSkipConfig::WithoutStateRoots)?;
let validator_pubkeys = state
.validators
@ -122,6 +123,7 @@ pub fn get_active_validator_duties<T: BeaconChainTypes>(
pub fn get_state_for_epoch<T: BeaconChainTypes>(
beacon_chain: &BeaconChain<T>,
epoch: Epoch,
config: StateSkipConfig,
) -> Result<BeaconState<T::EthSpec>, ApiError> {
let slots_per_epoch = T::EthSpec::slots_per_epoch();
let head_epoch = beacon_chain.head()?.beacon_state.current_epoch();
@ -141,7 +143,7 @@ pub fn get_state_for_epoch<T: BeaconChainTypes>(
(epoch + 2).start_slot(slots_per_epoch) - 1
};
beacon_chain.state_at_slot(slot).map_err(|e| {
beacon_chain.state_at_slot(slot, config).map_err(|e| {
ApiError::ServerError(format!("Unable to load state for epoch {}: {:?}", epoch, e))
})
}
@ -153,7 +155,7 @@ fn return_validator_duties<T: BeaconChainTypes>(
epoch: Epoch,
validator_pubkeys: Vec<PublicKeyBytes>,
) -> Result<Vec<ValidatorDuty>, ApiError> {
let mut state = get_state_for_epoch(&beacon_chain, epoch)?;
let mut state = get_state_for_epoch(&beacon_chain, epoch, StateSkipConfig::WithoutStateRoots)?;
let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), epoch)
.map_err(|_| ApiError::ServerError(String::from("Loaded state is in the wrong epoch")))?;

View File

@ -1,6 +1,6 @@
#![cfg(test)]
use beacon_chain::{BeaconChain, BeaconChainTypes};
use beacon_chain::{BeaconChain, BeaconChainTypes, StateSkipConfig};
use node_test_rig::{
environment::{Environment, EnvironmentBuilder},
testing_client_config, ClientConfig, ClientGenesis, LocalBeaconNode,
@ -242,7 +242,10 @@ fn check_duties<T: BeaconChainTypes>(
);
let mut state = beacon_chain
.state_at_slot(epoch.start_slot(T::EthSpec::slots_per_epoch()))
.state_at_slot(
epoch.start_slot(T::EthSpec::slots_per_epoch()),
StateSkipConfig::WithStateRoots,
)
.expect("should get state at slot");
state.build_all_caches(spec).expect("should build caches");
@ -469,7 +472,7 @@ fn beacon_state() {
.client
.beacon_chain()
.expect("client should have beacon chain")
.state_at_slot(Slot::new(0))
.state_at_slot(Slot::new(0), StateSkipConfig::WithStateRoots)
.expect("should find state");
db_state.drop_all_caches();

View File

@ -1,5 +1,6 @@
#![cfg(test)]
use beacon_chain::StateSkipConfig;
use node_test_rig::{
environment::{Environment, EnvironmentBuilder},
testing_client_config, LocalBeaconNode,
@ -42,7 +43,7 @@ fn http_server_genesis_state() {
.client
.beacon_chain()
.expect("client should have beacon chain")
.state_at_slot(Slot::new(0))
.state_at_slot(Slot::new(0), StateSkipConfig::WithStateRoots)
.expect("should find state");
db_state.drop_all_caches();

View File

@ -854,7 +854,7 @@ impl<T: EthSpec> BeaconState<T> {
/// Returns the cache for some `RelativeEpoch`. Returns an error if the cache has not been
/// initialized.
fn committee_cache(&self, relative_epoch: RelativeEpoch) -> Result<&CommitteeCache, Error> {
pub fn committee_cache(&self, relative_epoch: RelativeEpoch) -> Result<&CommitteeCache, Error> {
let cache = &self.committee_caches[Self::committee_cache_index(relative_epoch)];
if cache.is_initialized_at(relative_epoch.into_epoch(self.current_epoch())) {