Merge branch 'docker-env' of github.com:sigp/lighthouse into docker-env

This commit is contained in:
Paul Hauner 2019-06-03 17:27:09 +10:00
commit 6f5f1061c7
No known key found for this signature in database
GPG Key ID: 5E2CFF9B75FA63DF
13 changed files with 297 additions and 193 deletions

View File

@ -88,15 +88,28 @@ pub trait BeaconChainTypes {
type EthSpec: types::EthSpec;
}
/// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block
/// operations and chooses a canonical head.
pub struct BeaconChain<T: BeaconChainTypes> {
/// Persistent storage for blocks, states, etc. Typically an on-disk store, such as LevelDB.
pub store: Arc<T::Store>,
/// Reports the current slot, typically based upon the system clock.
pub slot_clock: T::SlotClock,
/// Stores all operations (e.g., `Attestation`, `Deposit`, etc) that are candidates for
/// inclusion in a block.
pub op_pool: OperationPool<T::EthSpec>,
/// Stores a "snapshot" of the chain at the time the head-of-the-chain block was recieved.
canonical_head: RwLock<CheckPoint<T::EthSpec>>,
finalized_head: RwLock<CheckPoint<T::EthSpec>>,
/// The same state from `self.canonical_head`, but updated at the start of each slot with a
/// skip slot if no block is recieved. This is effectively a cache that avoids repeating calls
/// to `per_slot_processing`.
state: RwLock<BeaconState<T::EthSpec>>,
pub spec: ChainSpec,
/// The root of the genesis block.
genesis_block_root: Hash256,
/// A state-machine that is updated with information from the network and chooses a canonical
/// head block.
pub fork_choice: RwLock<T::ForkChoice>,
/// Stores metrics about this `BeaconChain`.
pub metrics: Metrics,
}
@ -113,18 +126,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let state_root = genesis_state.canonical_root();
store.put(&state_root, &genesis_state)?;
let block_root = genesis_block.block_header().canonical_root();
store.put(&block_root, &genesis_block)?;
let genesis_block_root = genesis_block.block_header().canonical_root();
store.put(&genesis_block_root, &genesis_block)?;
let finalized_head = RwLock::new(CheckPoint::new(
genesis_block.clone(),
block_root,
genesis_state.clone(),
state_root,
));
let canonical_head = RwLock::new(CheckPoint::new(
genesis_block.clone(),
block_root,
genesis_block_root,
genesis_state.clone(),
state_root,
));
@ -136,9 +143,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot_clock,
op_pool: OperationPool::new(),
state: RwLock::new(genesis_state),
finalized_head,
canonical_head,
spec,
genesis_block_root,
fork_choice: RwLock::new(fork_choice),
metrics: Metrics::new()?,
})
@ -168,10 +174,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot_clock,
op_pool: OperationPool::default(),
canonical_head: RwLock::new(p.canonical_head),
finalized_head: RwLock::new(p.finalized_head),
state: RwLock::new(p.state),
spec,
fork_choice: RwLock::new(fork_choice),
genesis_block_root: p.genesis_block_root,
metrics: Metrics::new()?,
}))
}
@ -180,7 +185,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn persist(&self) -> Result<(), Error> {
let p: PersistedBeaconChain<T> = PersistedBeaconChain {
canonical_head: self.canonical_head.read().clone(),
finalized_head: self.finalized_head.read().clone(),
genesis_block_root: self.genesis_block_root,
state: self.state.read().clone(),
};
@ -309,42 +314,38 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(self.store.get(block_root)?)
}
/// Update the canonical head to some new values.
fn update_canonical_head(
&self,
new_beacon_block: BeaconBlock,
new_beacon_block_root: Hash256,
new_beacon_state: BeaconState<T::EthSpec>,
new_beacon_state_root: Hash256,
) {
debug!(
"Updating canonical head with block at slot: {}",
new_beacon_block.slot
);
let mut head = self.canonical_head.write();
head.update(
new_beacon_block,
new_beacon_block_root,
new_beacon_state,
new_beacon_state_root,
);
}
/// Returns a read-lock guarded `CheckPoint` struct for reading the head (as chosen by the
/// fork-choice rule).
///
/// It is important to note that the `beacon_state` returned may not match the present slot. It
/// is the state as it was when the head block was received, which could be some slots prior to
/// now.
pub fn head(&self) -> RwLockReadGuard<CheckPoint<T::EthSpec>> {
self.canonical_head.read()
}
/// Returns the slot of the highest block in the canonical chain.
pub fn best_slot(&self) -> Slot {
self.canonical_head.read().beacon_block.slot
/// Update the canonical head to `new_head`.
fn update_canonical_head(&self, new_head: CheckPoint<T::EthSpec>) -> Result<(), Error> {
// Update the checkpoint that stores the head of the chain at the time it received the
// block.
*self.canonical_head.write() = new_head;
// Update the always-at-the-present-slot state we keep around for performance gains.
*self.state.write() = {
let mut state = self.canonical_head.read().beacon_state.clone();
let present_slot = match self.slot_clock.present_slot() {
Ok(Some(slot)) => slot,
_ => return Err(Error::UnableToReadSlot),
};
// If required, transition the new state to the present slot.
for _ in state.slot.as_u64()..present_slot.as_u64() {
per_slot_processing(&mut state, &T::EthSpec::spec())?;
}
state.build_all_caches(&T::EthSpec::spec())?;
state
};
// Save `self` to `self.store`.
self.persist()?;
Ok(())
}
/*
/// Updates the canonical `BeaconState` with the supplied state.
///
/// Advances the chain forward to the present slot. This method is better than just setting
@ -362,10 +363,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If required, transition the new state to the present slot.
for _ in state.slot.as_u64()..present_slot.as_u64() {
per_slot_processing(&mut state, &self.spec)?;
per_slot_processing(&mut state, &T::EthSpec::spec())?;
}
state.build_all_caches(&self.spec)?;
state.build_all_caches(&T::EthSpec::spec())?;
*self.state.write() = state;
@ -373,9 +374,33 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(())
}
*/
/// Returns a read-lock guarded `BeaconState` which is the `canonical_head` that has been
/// updated to match the current slot clock.
pub fn current_state(&self) -> RwLockReadGuard<BeaconState<T::EthSpec>> {
self.state.read()
}
/// Returns a read-lock guarded `CheckPoint` struct for reading the head (as chosen by the
/// fork-choice rule).
///
/// It is important to note that the `beacon_state` returned may not match the present slot. It
/// is the state as it was when the head block was received, which could be some slots prior to
/// now.
pub fn head(&self) -> RwLockReadGuard<CheckPoint<T::EthSpec>> {
self.canonical_head.read()
}
/// Returns the slot of the highest block in the canonical chain.
pub fn best_slot(&self) -> Slot {
self.canonical_head.read().beacon_block.slot
}
/// Ensures the current canonical `BeaconState` has been transitioned to match the `slot_clock`.
pub fn catchup_state(&self) -> Result<(), Error> {
let spec = &T::EthSpec::spec();
let present_slot = match self.slot_clock.present_slot() {
Ok(Some(slot)) => slot,
_ => return Err(Error::UnableToReadSlot),
@ -386,13 +411,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If required, transition the new state to the present slot.
for _ in state.slot.as_u64()..present_slot.as_u64() {
// Ensure the next epoch state caches are built in case of an epoch transition.
state.build_epoch_cache(RelativeEpoch::NextWithoutRegistryChange, &self.spec)?;
state.build_epoch_cache(RelativeEpoch::NextWithRegistryChange, &self.spec)?;
state.build_epoch_cache(RelativeEpoch::NextWithoutRegistryChange, spec)?;
state.build_epoch_cache(RelativeEpoch::NextWithRegistryChange, spec)?;
per_slot_processing(&mut *state, &self.spec)?;
per_slot_processing(&mut *state, spec)?;
}
state.build_all_caches(&self.spec)?;
state.build_all_caches(spec)?;
Ok(())
}
@ -401,34 +426,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// Ideally this shouldn't be required, however we leave it here for testing.
pub fn ensure_state_caches_are_built(&self) -> Result<(), Error> {
self.state.write().build_all_caches(&self.spec)?;
self.state.write().build_all_caches(&T::EthSpec::spec())?;
Ok(())
}
/// Update the justified head to some new values.
fn update_finalized_head(
&self,
new_beacon_block: BeaconBlock,
new_beacon_block_root: Hash256,
new_beacon_state: BeaconState<T::EthSpec>,
new_beacon_state_root: Hash256,
) {
let mut finalized_head = self.finalized_head.write();
finalized_head.update(
new_beacon_block,
new_beacon_block_root,
new_beacon_state,
new_beacon_state_root,
);
}
/// Returns a read-lock guarded `CheckPoint` struct for reading the justified head (as chosen,
/// indirectly, by the fork-choice rule).
pub fn finalized_head(&self) -> RwLockReadGuard<CheckPoint<T::EthSpec>> {
self.finalized_head.read()
}
/// Returns the validator index (if any) for the given public key.
///
/// Information is retrieved from the present `beacon_state.validator_registry`.
@ -467,13 +469,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// genesis.
pub fn slots_since_genesis(&self) -> Option<SlotHeight> {
let now = self.read_slot_clock()?;
let genesis_slot = T::EthSpec::spec().genesis_slot;
if now < self.spec.genesis_slot {
if now < genesis_slot {
None
} else {
Some(SlotHeight::from(
now.as_u64() - self.spec.genesis_slot.as_u64(),
))
Some(SlotHeight::from(now.as_u64() - genesis_slot.as_u64()))
}
}
@ -493,12 +494,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn block_proposer(&self, slot: Slot) -> Result<usize, BeaconStateError> {
self.state
.write()
.build_epoch_cache(RelativeEpoch::Current, &self.spec)?;
.build_epoch_cache(RelativeEpoch::Current, &T::EthSpec::spec())?;
let index = self.state.read().get_beacon_proposer_index(
slot,
RelativeEpoch::Current,
&self.spec,
&T::EthSpec::spec(),
)?;
Ok(index)
@ -519,7 +520,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if let Some(attestation_duty) = self
.state
.read()
.get_attestation_duties(validator_index, &self.spec)?
.get_attestation_duties(validator_index, &T::EthSpec::spec())?
{
Ok(Some((attestation_duty.slot, attestation_duty.shard)))
} else {
@ -529,7 +530,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Produce an `AttestationData` that is valid for the present `slot` and given `shard`.
pub fn produce_attestation_data(&self, shard: u64) -> Result<AttestationData, Error> {
trace!("BeaconChain::produce_attestation: shard: {}", shard);
let slots_per_epoch = T::EthSpec::spec().slots_per_epoch;
self.metrics.attestation_production_requests.inc();
let timer = self.metrics.attestation_production_times.start_timer();
@ -540,8 +541,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.state
.read()
.slot
.epoch(self.spec.slots_per_epoch)
.start_slot(self.spec.slots_per_epoch);
.epoch(slots_per_epoch)
.start_slot(slots_per_epoch);
let target_root = if state.slot == current_epoch_start_slot {
// If we're on the first slot of the state's epoch.
@ -554,7 +555,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
*self
.state
.read()
.get_block_root(current_epoch_start_slot - self.spec.slots_per_epoch)?
.get_block_root(current_epoch_start_slot - slots_per_epoch)?
}
} else {
// If we're not on the first slot of the epoch.
@ -587,9 +588,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.metrics.attestation_processing_requests.inc();
let timer = self.metrics.attestation_processing_times.start_timer();
let result = self
.op_pool
.insert_attestation(attestation, &*self.state.read(), &self.spec);
let result =
self.op_pool
.insert_attestation(attestation, &*self.state.read(), &T::EthSpec::spec());
if result.is_ok() {
self.metrics.attestation_production_successes.inc();
@ -606,19 +607,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
deposit: Deposit,
) -> Result<DepositInsertStatus, DepositValidationError> {
self.op_pool
.insert_deposit(deposit, &*self.state.read(), &self.spec)
.insert_deposit(deposit, &*self.state.read(), &T::EthSpec::spec())
}
/// Accept some exit and queue it for inclusion in an appropriate block.
pub fn process_voluntary_exit(&self, exit: VoluntaryExit) -> Result<(), ExitValidationError> {
self.op_pool
.insert_voluntary_exit(exit, &*self.state.read(), &self.spec)
.insert_voluntary_exit(exit, &*self.state.read(), &T::EthSpec::spec())
}
/// Accept some transfer and queue it for inclusion in an appropriate block.
pub fn process_transfer(&self, transfer: Transfer) -> Result<(), TransferValidationError> {
self.op_pool
.insert_transfer(transfer, &*self.state.read(), &self.spec)
.insert_transfer(transfer, &*self.state.read(), &T::EthSpec::spec())
}
/// Accept some proposer slashing and queue it for inclusion in an appropriate block.
@ -626,8 +627,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
proposer_slashing: ProposerSlashing,
) -> Result<(), ProposerSlashingValidationError> {
self.op_pool
.insert_proposer_slashing(proposer_slashing, &*self.state.read(), &self.spec)
self.op_pool.insert_proposer_slashing(
proposer_slashing,
&*self.state.read(),
&T::EthSpec::spec(),
)
}
/// Accept some attester slashing and queue it for inclusion in an appropriate block.
@ -635,8 +639,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
attester_slashing: AttesterSlashing,
) -> Result<(), AttesterSlashingValidationError> {
self.op_pool
.insert_attester_slashing(attester_slashing, &*self.state.read(), &self.spec)
self.op_pool.insert_attester_slashing(
attester_slashing,
&*self.state.read(),
&T::EthSpec::spec(),
)
}
/// Accept some block and attempt to add it to block DAG.
@ -686,7 +693,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Transition the parent state to the block slot.
let mut state: BeaconState<T::EthSpec> = parent_state;
for _ in state.slot.as_u64()..block.slot.as_u64() {
if let Err(e) = per_slot_processing(&mut state, &self.spec) {
if let Err(e) = per_slot_processing(&mut state, &T::EthSpec::spec()) {
return Ok(BlockProcessingOutcome::InvalidBlock(
InvalidBlock::SlotProcessingError(e),
));
@ -695,7 +702,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Apply the received block to its parent state (which has been transitioned into this
// slot).
if let Err(e) = per_block_processing(&mut state, &block, &self.spec) {
if let Err(e) = per_block_processing(&mut state, &block, &T::EthSpec::spec()) {
return Ok(BlockProcessingOutcome::InvalidBlock(
InvalidBlock::PerBlockProcessingError(e),
));
@ -716,7 +723,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Register the new block with the fork choice service.
self.fork_choice
.write()
.add_block(&block, &block_root, &self.spec)?;
.add_block(&block, &block_root, &T::EthSpec::spec())?;
// Execute the fork choice algorithm, enthroning a new head if discovered.
//
@ -744,7 +751,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut state = self.state.read().clone();
state.build_epoch_cache(RelativeEpoch::Current, &self.spec)?;
state.build_epoch_cache(RelativeEpoch::Current, &T::EthSpec::spec())?;
trace!("Finding attestations for new block...");
@ -752,14 +759,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.get_block_root(state.slot - 1)
.map_err(|_| BlockProductionError::UnableToGetBlockRootFromState)?;
let (proposer_slashings, attester_slashings) =
self.op_pool.get_slashings(&*self.state.read(), &self.spec);
let (proposer_slashings, attester_slashings) = self
.op_pool
.get_slashings(&*self.state.read(), &T::EthSpec::spec());
let mut block = BeaconBlock {
slot: state.slot,
previous_block_root,
state_root: Hash256::zero(), // Updated after the state is calculated.
signature: self.spec.empty_signature.clone(), // To be completed by a validator.
signature: T::EthSpec::spec().empty_signature.clone(), // To be completed by a validator.
body: BeaconBlockBody {
randao_reveal,
eth1_data: Eth1Data {
@ -771,12 +779,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
attester_slashings,
attestations: self
.op_pool
.get_attestations(&*self.state.read(), &self.spec),
deposits: self.op_pool.get_deposits(&*self.state.read(), &self.spec),
.get_attestations(&*self.state.read(), &T::EthSpec::spec()),
deposits: self
.op_pool
.get_deposits(&*self.state.read(), &T::EthSpec::spec()),
voluntary_exits: self
.op_pool
.get_voluntary_exits(&*self.state.read(), &self.spec),
transfers: self.op_pool.get_transfers(&*self.state.read(), &self.spec),
.get_voluntary_exits(&*self.state.read(), &T::EthSpec::spec()),
transfers: self
.op_pool
.get_transfers(&*self.state.read(), &T::EthSpec::spec()),
},
};
@ -785,7 +797,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block.body.attestations.len()
);
per_block_processing_without_verifying_block_signature(&mut state, &block, &self.spec)?;
per_block_processing_without_verifying_block_signature(
&mut state,
&block,
&T::EthSpec::spec(),
)?;
let state_root = state.canonical_root();
@ -801,39 +817,53 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn fork_choice(&self) -> Result<(), Error> {
self.metrics.fork_choice_requests.inc();
let present_head_root = self.finalized_head().beacon_block_root;
// Start fork choice metrics timer.
let timer = self.metrics.fork_choice_times.start_timer();
let new_head_root = self
let justified_root = {
let root = self.head().beacon_state.current_justified_root;
if root == T::EthSpec::spec().zero_hash {
self.genesis_block_root
} else {
root
}
};
// Determine the root of the block that is the head of the chain.
let beacon_block_root = self
.fork_choice
.write()
.find_head(&present_head_root, &self.spec)?;
.find_head(&justified_root, &T::EthSpec::spec())?;
// End fork choice metrics timer.
timer.observe_duration();
if new_head_root != present_head_root {
// If a new head was chosen.
if beacon_block_root != self.head().beacon_block_root {
self.metrics.fork_choice_changed_head.inc();
let block: BeaconBlock = self
let beacon_block: BeaconBlock = self
.store
.get(&new_head_root)?
.ok_or_else(|| Error::MissingBeaconBlock(new_head_root))?;
let state: BeaconState<T::EthSpec> = self
.store
.get(&block.state_root)?
.ok_or_else(|| Error::MissingBeaconState(block.state_root))?;
.get(&beacon_block_root)?
.ok_or_else(|| Error::MissingBeaconBlock(beacon_block_root))?;
// Log if we switched to a new chain.
if present_head_root != block.previous_block_root {
let beacon_state_root = beacon_block.state_root;
let beacon_state: BeaconState<T::EthSpec> = self
.store
.get(&beacon_state_root)?
.ok_or_else(|| Error::MissingBeaconState(beacon_state_root))?;
// If we switched to a new chain (instead of building atop the present chain).
if self.head().beacon_block_root != beacon_block.previous_block_root {
self.metrics.fork_choice_reorg_count.inc();
};
let state_root = block.state_root;
self.update_canonical_head(block, new_head_root, state.clone(), state_root);
// Update the canonical `BeaconState`.
self.update_state(state)?;
self.update_canonical_head(CheckPoint {
beacon_block,
beacon_block_root,
beacon_state,
beacon_state_root,
})?;
}
Ok(())
@ -863,7 +893,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
loop {
let beacon_block_root = last_slot.beacon_block.previous_block_root;
if beacon_block_root == self.spec.zero_hash {
if beacon_block_root == T::EthSpec::spec().zero_hash {
break; // Genesis has been reached.
}

View File

@ -2,7 +2,7 @@ use crate::{BeaconChainTypes, CheckPoint};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use store::{DBColumn, Error as StoreError, StoreItem};
use types::BeaconState;
use types::{BeaconState, Hash256};
/// 32-byte key for accessing the `PersistedBeaconChain`.
pub const BEACON_CHAIN_DB_KEY: &str = "PERSISTEDBEACONCHAINPERSISTEDBEA";
@ -10,8 +10,8 @@ pub const BEACON_CHAIN_DB_KEY: &str = "PERSISTEDBEACONCHAINPERSISTEDBEA";
#[derive(Encode, Decode)]
pub struct PersistedBeaconChain<T: BeaconChainTypes> {
pub canonical_head: CheckPoint<T::EthSpec>,
pub finalized_head: CheckPoint<T::EthSpec>,
// TODO: operations pool.
pub genesis_block_root: Hash256,
pub state: BeaconState<T::EthSpec>,
}

View File

@ -1,5 +1,5 @@
use beacon_chain::{
fork_choice::BitwiseLMDGhost,
fork_choice::OptimizedLMDGhost,
slot_clock::SystemTimeSlotClock,
store::{DiskStore, MemoryStore, Store},
BeaconChain, BeaconChainTypes,
@ -28,7 +28,7 @@ pub struct TestnetMemoryBeaconChainTypes;
impl BeaconChainTypes for TestnetMemoryBeaconChainTypes {
type Store = MemoryStore;
type SlotClock = SystemTimeSlotClock;
type ForkChoice = BitwiseLMDGhost<Self::Store, Self::EthSpec>;
type ForkChoice = OptimizedLMDGhost<Self::Store, Self::EthSpec>;
type EthSpec = LighthouseTestnetEthSpec;
}
@ -45,7 +45,7 @@ pub struct TestnetDiskBeaconChainTypes;
impl BeaconChainTypes for TestnetDiskBeaconChainTypes {
type Store = DiskStore;
type SlotClock = SystemTimeSlotClock;
type ForkChoice = BitwiseLMDGhost<Self::Store, Self::EthSpec>;
type ForkChoice = OptimizedLMDGhost<Self::Store, Self::EthSpec>;
type EthSpec = LighthouseTestnetEthSpec;
}

View File

@ -128,6 +128,7 @@ where
executor,
network_send,
beacon_chain.clone(),
config.db_name.clone(),
metrics_registry,
&log,
))

View File

@ -10,6 +10,7 @@ use persistent::Read;
use router::Router;
use serde_json::json;
use std::sync::Arc;
use types::EthSpec;
/// Yields a handler for the HTTP API.
pub fn build_handler<T: BeaconChainTypes + 'static>(
@ -64,7 +65,7 @@ fn handle_fork<T: BeaconChainTypes + 'static>(req: &mut Request) -> IronResult<R
let response = json!({
"fork": beacon_chain.head().beacon_state.fork,
"chain_id": beacon_chain.spec.chain_id
"chain_id": T::EthSpec::spec().chain_id
});
Ok(Response::with((Status::Ok, response.to_string())))

View File

@ -3,6 +3,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes};
use iron::typemap::Key;
use prometheus::Registry;
use std::marker::PhantomData;
use std::path::PathBuf;
use std::sync::Arc;
pub struct BeaconChainKey<T> {
@ -24,3 +25,9 @@ pub struct LocalMetricsKey;
impl Key for LocalMetricsKey {
type Value = LocalMetrics;
}
pub struct DBPathKey;
impl Key for DBPathKey {
type Value = PathBuf;
}

View File

@ -9,6 +9,7 @@ use network::NetworkMessage;
use prometheus::Registry;
use router::Router;
use slog::{info, o, warn};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::runtime::TaskExecutor;
@ -32,6 +33,7 @@ impl Default for HttpServerConfig {
/// Build the `iron` HTTP server, defining the core routes.
pub fn create_iron_http_server<T: BeaconChainTypes + 'static>(
beacon_chain: Arc<BeaconChain<T>>,
db_path: PathBuf,
metrics_registry: Registry,
) -> Iron<Router> {
let mut router = Router::new();
@ -39,7 +41,7 @@ pub fn create_iron_http_server<T: BeaconChainTypes + 'static>(
// A `GET` request to `/metrics` is handled by the `metrics` module.
router.get(
"/metrics",
metrics::build_handler(beacon_chain.clone(), metrics_registry),
metrics::build_handler(beacon_chain.clone(), db_path, metrics_registry),
"metrics",
);
@ -55,6 +57,7 @@ pub fn start_service<T: BeaconChainTypes + 'static>(
executor: &TaskExecutor,
_network_chan: crossbeam_channel::Sender<NetworkMessage>,
beacon_chain: Arc<BeaconChain<T>>,
db_path: PathBuf,
metrics_registry: Registry,
log: &slog::Logger,
) -> exit_future::Signal {
@ -66,7 +69,7 @@ pub fn start_service<T: BeaconChainTypes + 'static>(
let (shutdown_trigger, wait_for_shutdown) = exit_future::signal();
// Create an `iron` http, without starting it yet.
let iron = create_iron_http_server(beacon_chain, metrics_registry);
let iron = create_iron_http_server(beacon_chain, db_path, metrics_registry);
// Create a HTTP server future.
//

View File

@ -1,19 +1,23 @@
use crate::{
key::{BeaconChainKey, LocalMetricsKey, MetricsRegistryKey},
key::{BeaconChainKey, DBPathKey, LocalMetricsKey, MetricsRegistryKey},
map_persistent_err_to_500,
};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use iron::prelude::*;
use iron::{status::Status, Handler, IronResult, Request, Response};
use persistent::Read;
use prometheus::{Encoder, IntGauge, Opts, Registry, TextEncoder};
use slot_clock::SlotClock;
use prometheus::{Encoder, Registry, TextEncoder};
use std::path::PathBuf;
use std::sync::Arc;
use types::Slot;
pub use local_metrics::LocalMetrics;
mod local_metrics;
/// Yields a handler for the metrics endpoint.
pub fn build_handler<T: BeaconChainTypes + 'static>(
beacon_chain: Arc<BeaconChain<T>>,
db_path: PathBuf,
metrics_registry: Registry,
) -> impl Handler {
let mut chain = Chain::new(handle_metrics::<T>);
@ -24,43 +28,11 @@ pub fn build_handler<T: BeaconChainTypes + 'static>(
chain.link(Read::<BeaconChainKey<T>>::both(beacon_chain));
chain.link(Read::<MetricsRegistryKey>::both(metrics_registry));
chain.link(Read::<LocalMetricsKey>::both(local_metrics));
chain.link(Read::<DBPathKey>::both(db_path));
chain
}
pub struct LocalMetrics {
present_slot: IntGauge,
best_slot: IntGauge,
validator_count: IntGauge,
}
impl LocalMetrics {
pub fn new() -> Result<Self, prometheus::Error> {
Ok(Self {
present_slot: {
let opts = Opts::new("present_slot", "slot_at_time_of_scrape");
IntGauge::with_opts(opts)?
},
best_slot: {
let opts = Opts::new("best_slot", "slot_of_block_at_chain_head");
IntGauge::with_opts(opts)?
},
validator_count: {
let opts = Opts::new("validator_count", "number_of_validators");
IntGauge::with_opts(opts)?
},
})
}
pub fn register(&self, registry: &Registry) -> Result<(), prometheus::Error> {
registry.register(Box::new(self.present_slot.clone()))?;
registry.register(Box::new(self.best_slot.clone()))?;
registry.register(Box::new(self.validator_count.clone()))?;
Ok(())
}
}
/// Handle a request for Prometheus metrics.
///
/// Returns a text string containing all metrics.
@ -77,18 +49,12 @@ fn handle_metrics<T: BeaconChainTypes + 'static>(req: &mut Request) -> IronResul
.get::<Read<LocalMetricsKey>>()
.map_err(map_persistent_err_to_500)?;
let present_slot = beacon_chain
.slot_clock
.present_slot()
.unwrap_or_else(|_| None)
.unwrap_or_else(|| Slot::new(0));
local_metrics.present_slot.set(present_slot.as_u64() as i64);
let db_path = req
.get::<Read<DBPathKey>>()
.map_err(map_persistent_err_to_500)?;
let best_slot = beacon_chain.head().beacon_block.slot;
local_metrics.best_slot.set(best_slot.as_u64() as i64);
let validator_count = beacon_chain.head().beacon_state.validator_registry.len();
local_metrics.validator_count.set(validator_count as i64);
// Update metrics that are calculated on each scrape.
local_metrics.update(&beacon_chain, &db_path);
let mut buffer = vec![];
let encoder = TextEncoder::new();

View File

@ -0,0 +1,94 @@
use beacon_chain::{BeaconChain, BeaconChainTypes};
use prometheus::{IntGauge, Opts, Registry};
use slot_clock::SlotClock;
use std::path::PathBuf;
use std::fs::File;
use types::Slot;
// If set to `true` will iterate and sum the balances of all validators in the state for each
// scrape.
const SHOULD_SUM_VALIDATOR_BALANCES: bool = true;
pub struct LocalMetrics {
present_slot: IntGauge,
best_slot: IntGauge,
validator_count: IntGauge,
justified_epoch: IntGauge,
finalized_epoch: IntGauge,
validator_balances_sum: IntGauge,
database_size: IntGauge,
}
impl LocalMetrics {
/// Create a new instance.
pub fn new() -> Result<Self, prometheus::Error> {
Ok(Self {
present_slot: {
let opts = Opts::new("present_slot", "slot_at_time_of_scrape");
IntGauge::with_opts(opts)?
},
best_slot: {
let opts = Opts::new("best_slot", "slot_of_block_at_chain_head");
IntGauge::with_opts(opts)?
},
validator_count: {
let opts = Opts::new("validator_count", "number_of_validators");
IntGauge::with_opts(opts)?
},
justified_epoch: {
let opts = Opts::new("justified_epoch", "state_justified_epoch");
IntGauge::with_opts(opts)?
},
finalized_epoch: {
let opts = Opts::new("finalized_epoch", "state_finalized_epoch");
IntGauge::with_opts(opts)?
},
validator_balances_sum: {
let opts = Opts::new("validator_balances_sum", "sum_of_all_validator_balances");
IntGauge::with_opts(opts)?
},
database_size: {
let opts = Opts::new("database_size", "size_of_on_disk_db_in_mb");
IntGauge::with_opts(opts)?
},
})
}
/// Registry this instance with the `registry`.
pub fn register(&self, registry: &Registry) -> Result<(), prometheus::Error> {
registry.register(Box::new(self.present_slot.clone()))?;
registry.register(Box::new(self.best_slot.clone()))?;
registry.register(Box::new(self.validator_count.clone()))?;
registry.register(Box::new(self.finalized_epoch.clone()))?;
registry.register(Box::new(self.justified_epoch.clone()))?;
registry.register(Box::new(self.validator_balances_sum.clone()))?;
registry.register(Box::new(self.database_size.clone()))?;
Ok(())
}
/// Update the metrics in `self` to the latest values.
pub fn update<T: BeaconChainTypes>(&self, beacon_chain: &BeaconChain<T>, db_path: &PathBuf) {
let state = &beacon_chain.head().beacon_state;
let present_slot = beacon_chain
.slot_clock
.present_slot()
.unwrap_or_else(|_| None)
.unwrap_or_else(|| Slot::new(0));
self.present_slot.set(present_slot.as_u64() as i64);
self.best_slot.set(state.slot.as_u64() as i64);
self.validator_count.set(state.validator_registry.len() as i64);
self.justified_epoch.set(state.current_justified_epoch.as_u64() as i64);
self.finalized_epoch.set(state.finalized_epoch.as_u64() as i64);
if SHOULD_SUM_VALIDATOR_BALANCES {
self.validator_balances_sum.set(state.validator_balances.iter().sum::<u64>() as i64);
}
let db_size = File::open(db_path)
.and_then(|f| f.metadata())
.and_then(|m| Ok(m.len()))
.unwrap_or(0);
self.database_size.set(db_size as i64);
}
}

View File

@ -859,7 +859,7 @@ fn hello_message<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) -> HelloMes
latest_finalized_root: state.finalized_root,
latest_finalized_epoch: state.finalized_epoch,
best_root: beacon_chain.head().beacon_block_root,
best_slot: beacon_chain.head().beacon_block.slot,
best_slot: state.slot,
}
}

View File

@ -34,7 +34,7 @@ impl<T: BeaconChainTypes> AttestationService for AttestationServiceInstance<T> {
// verify the slot, drop lock on state afterwards
{
let slot_requested = req.get_slot();
let state = &self.chain.head().beacon_state;
let state = &self.chain.current_state();
// Start by performing some checks
// Check that the AttestionData is for the current slot (otherwise it will not be valid)

View File

@ -30,7 +30,7 @@ impl<T: BeaconChainTypes> ValidatorService for ValidatorServiceInstance<T> {
trace!(self.log, "RPC request"; "endpoint" => "GetValidatorDuties", "epoch" => req.get_epoch());
let spec = T::EthSpec::spec();
let state = &self.chain.head().beacon_state;
let state = &self.chain.current_state();
let epoch = Epoch::from(req.get_epoch());
let mut resp = GetDutiesResponse::new();
let resp_validators = resp.mut_active_validators();

View File

@ -1,6 +1,8 @@
use slot_clock;
use error_chain::error_chain;
use error_chain::{
error_chain
};
error_chain! {
links { }