Add first pass at removing speculative state

This commit is contained in:
Paul Hauner 2019-08-29 19:14:52 +10:00
parent 29c8477242
commit 682081ef07
No known key found for this signature in database
GPG Key ID: 5E2CFF9B75FA63DF
10 changed files with 225 additions and 185 deletions

View File

@ -5,7 +5,6 @@ use crate::iter::{ReverseBlockRootIterator, ReverseStateRootIterator};
use crate::metrics; use crate::metrics;
use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY}; use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY};
use lmd_ghost::LmdGhost; use lmd_ghost::LmdGhost;
use log::trace;
use operation_pool::DepositInsertStatus; use operation_pool::DepositInsertStatus;
use operation_pool::{OperationPool, PersistedOperationPool}; use operation_pool::{OperationPool, PersistedOperationPool};
use parking_lot::{RwLock, RwLockReadGuard}; use parking_lot::{RwLock, RwLockReadGuard};
@ -77,6 +76,20 @@ pub enum AttestationProcessingOutcome {
Invalid(AttestationValidationError), Invalid(AttestationValidationError),
} }
pub enum StateCow<'a, T: EthSpec> {
Borrowed(RwLockReadGuard<'a, CheckPoint<T>>),
Owned(BeaconState<T>),
}
impl<'a, T: EthSpec> AsRef<BeaconState<T>> for StateCow<'a, T> {
fn as_ref(&self) -> &BeaconState<T> {
match self {
StateCow::Borrowed(checkpoint) => &checkpoint.beacon_state,
StateCow::Owned(state) => &state,
}
}
}
pub trait BeaconChainTypes: Send + Sync + 'static { pub trait BeaconChainTypes: Send + Sync + 'static {
type Store: store::Store; type Store: store::Store;
type SlotClock: slot_clock::SlotClock; type SlotClock: slot_clock::SlotClock;
@ -97,10 +110,6 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub op_pool: OperationPool<T::EthSpec>, pub op_pool: OperationPool<T::EthSpec>,
/// Stores a "snapshot" of the chain at the time the head-of-the-chain block was received. /// Stores a "snapshot" of the chain at the time the head-of-the-chain block was received.
canonical_head: RwLock<CheckPoint<T::EthSpec>>, canonical_head: RwLock<CheckPoint<T::EthSpec>>,
/// The same state from `self.canonical_head`, but updated at the start of each slot with a
/// skip slot if no block is received. This is effectively a cache that avoids repeating calls
/// to `per_slot_processing`.
state: RwLock<BeaconState<T::EthSpec>>,
/// The root of the genesis block. /// The root of the genesis block.
pub genesis_block_root: Hash256, pub genesis_block_root: Hash256,
/// A state-machine that is updated with information from the network and chooses a canonical /// A state-machine that is updated with information from the network and chooses a canonical
@ -158,7 +167,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
spec, spec,
slot_clock, slot_clock,
op_pool: OperationPool::new(), op_pool: OperationPool::new(),
state: RwLock::new(genesis_state),
canonical_head, canonical_head,
genesis_block_root, genesis_block_root,
fork_choice: ForkChoice::new(store.clone(), &genesis_block, genesis_block_root), fork_choice: ForkChoice::new(store.clone(), &genesis_block, genesis_block_root),
@ -180,9 +188,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(Some(p)) => p, Ok(Some(p)) => p,
}; };
let state = &p.canonical_head.beacon_state;
let slot_clock = T::SlotClock::from_eth2_genesis( let slot_clock = T::SlotClock::from_eth2_genesis(
spec.genesis_slot, spec.genesis_slot,
p.state.genesis_time, state.genesis_time,
Duration::from_millis(spec.milliseconds_per_slot), Duration::from_millis(spec.milliseconds_per_slot),
) )
.ok_or_else(|| Error::SlotClockDidNotStart)?; .ok_or_else(|| Error::SlotClockDidNotStart)?;
@ -190,7 +200,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let last_finalized_root = p.canonical_head.beacon_state.finalized_checkpoint.root; let last_finalized_root = p.canonical_head.beacon_state.finalized_checkpoint.root;
let last_finalized_block = &p.canonical_head.beacon_block; let last_finalized_block = &p.canonical_head.beacon_block;
let op_pool = p.op_pool.into_operation_pool(&p.state, &spec); let op_pool = p.op_pool.into_operation_pool(state, &spec);
Ok(Some(BeaconChain { Ok(Some(BeaconChain {
spec, spec,
@ -198,7 +208,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
fork_choice: ForkChoice::new(store.clone(), last_finalized_block, last_finalized_root), fork_choice: ForkChoice::new(store.clone(), last_finalized_block, last_finalized_root),
op_pool, op_pool,
canonical_head: RwLock::new(p.canonical_head), canonical_head: RwLock::new(p.canonical_head),
state: RwLock::new(p.state),
genesis_block_root: p.genesis_block_root, genesis_block_root: p.genesis_block_root,
store, store,
log, log,
@ -213,7 +222,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
canonical_head: self.canonical_head.read().clone(), canonical_head: self.canonical_head.read().clone(),
op_pool: PersistedOperationPool::from_operation_pool(&self.op_pool), op_pool: PersistedOperationPool::from_operation_pool(&self.op_pool),
genesis_block_root: self.genesis_block_root, genesis_block_root: self.genesis_block_root,
state: self.state.read().clone(),
}; };
let key = Hash256::from_slice(&BEACON_CHAIN_DB_KEY.as_bytes()); let key = Hash256::from_slice(&BEACON_CHAIN_DB_KEY.as_bytes());
@ -233,6 +241,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.slot_clock.now().ok_or_else(|| Error::UnableToReadSlot) self.slot_clock.now().ok_or_else(|| Error::UnableToReadSlot)
} }
/// Returns the epoch _right now_ according to `self.slot_clock`. Returns `Err` if the epoch is
/// unavailable.
///
/// The epoch might be unavailable due to an error with the system clock, or if the present time
/// is before genesis (i.e., a negative epoch).
pub fn epoch(&self) -> Result<Epoch, Error> {
self.slot()
.map(|slot| slot.epoch(T::EthSpec::slots_per_epoch()))
}
/// Returns the beacon block body for each beacon block root in `roots`. /// Returns the beacon block body for each beacon block root in `roots`.
/// ///
/// Fails if any root in `roots` does not have a corresponding block. /// Fails if any root in `roots` does not have a corresponding block.
@ -318,12 +336,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(self.store.get(block_root)?) Ok(self.store.get(block_root)?)
} }
/// Returns a read-lock guarded `BeaconState` which is the `canonical_head` that has been
/// updated to match the current slot clock.
pub fn speculative_state(&self) -> Result<RwLockReadGuard<BeaconState<T::EthSpec>>, Error> {
Ok(self.state.read())
}
/// Returns a read-lock guarded `CheckPoint` struct for reading the head (as chosen by the /// Returns a read-lock guarded `CheckPoint` struct for reading the head (as chosen by the
/// fork-choice rule). /// fork-choice rule).
/// ///
@ -334,43 +346,74 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.canonical_head.read() self.canonical_head.read()
} }
/// Returns the `BeaconState` at the given slot.
///
/// May return:
///
/// - A new state loaded from the database (for states prior to the head)
/// - A reference to the head state (note: this keeps a read lock on the head, try to use
/// sparingly).
/// - The head state, but with skipped slots (for states later than the head).
///
/// Returns `None` when the state is not found in the database or there is an error skipping
/// to a future state.
pub fn state_at_slot(&self, slot: Slot) -> Result<StateCow<T::EthSpec>, Error> {
let head_state = &self.head().beacon_state;
if slot == head_state.slot {
Ok(StateCow::Borrowed(self.head()))
} else if slot > head_state.slot {
let head_state_slot = head_state.slot;
let mut state = head_state.clone();
drop(head_state);
while state.slot < slot {
match per_slot_processing(&mut state, &self.spec) {
Ok(()) => (),
Err(e) => {
warn!(
self.log,
"Unable to load state at slot";
"error" => format!("{:?}", e),
"head_slot" => head_state_slot,
"requested_slot" => slot
);
return Err(Error::NoStateForSlot(slot));
}
};
}
Ok(StateCow::Owned(state))
} else {
let state_root = self
.rev_iter_state_roots()
.find(|(_root, s)| *s == slot)
.map(|(root, _slot)| root)
.ok_or_else(|| Error::NoStateForSlot(slot))?;
Ok(StateCow::Owned(
self.store
.get(&state_root)?
.ok_or_else(|| Error::NoStateForSlot(slot))?,
))
}
}
/// Returns the `BeaconState` the current slot (viz., `self.slot()`).
///
/// - A reference to the head state (note: this keeps a read lock on the head, try to use
/// sparingly).
/// - The head state, but with skipped slots (for states later than the head).
///
/// Returns `None` when there is an error skipping to a future state or the slot clock cannot
/// be read.
pub fn state_now(&self) -> Result<StateCow<T::EthSpec>, Error> {
self.state_at_slot(self.slot()?)
}
/// Returns the slot of the highest block in the canonical chain. /// Returns the slot of the highest block in the canonical chain.
pub fn best_slot(&self) -> Slot { pub fn best_slot(&self) -> Slot {
self.canonical_head.read().beacon_block.slot self.canonical_head.read().beacon_block.slot
} }
/// Ensures the current canonical `BeaconState` has been transitioned to match the `slot_clock`.
pub fn catchup_state(&self) -> Result<(), Error> {
let spec = &self.spec;
let present_slot = self.slot()?;
if self.state.read().slot < present_slot {
let mut state = self.state.write();
// If required, transition the new state to the present slot.
for _ in state.slot.as_u64()..present_slot.as_u64() {
// Ensure the next epoch state caches are built in case of an epoch transition.
state.build_committee_cache(RelativeEpoch::Next, spec)?;
per_slot_processing(&mut *state, spec)?;
}
state.build_all_caches(spec)?;
}
Ok(())
}
/// Build all of the caches on the current state.
///
/// Ideally this shouldn't be required, however we leave it here for testing.
pub fn ensure_state_caches_are_built(&self) -> Result<(), Error> {
self.state.write().build_all_caches(&self.spec)?;
Ok(())
}
/// Returns the validator index (if any) for the given public key. /// Returns the validator index (if any) for the given public key.
/// ///
/// Information is retrieved from the present `beacon_state.validators`. /// Information is retrieved from the present `beacon_state.validators`.
@ -401,18 +444,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Information is read from the present `beacon_state` shuffling, only information from the /// Information is read from the present `beacon_state` shuffling, only information from the
/// present epoch is available. /// present epoch is available.
pub fn block_proposer(&self, slot: Slot) -> Result<usize, Error> { pub fn block_proposer(&self, slot: Slot) -> Result<usize, Error> {
// Ensures that the present state has been advanced to the present slot, skipping slots if let epoch = |slot: Slot| slot.epoch(T::EthSpec::slots_per_epoch());
// blocks are not present. let head_state = &self.head().beacon_state;
self.catchup_state()?;
// TODO: permit lookups of the proposer at any slot. let state = if epoch(slot) == epoch(head_state.slot) {
let index = self.state.read().get_beacon_proposer_index( StateCow::Borrowed(self.head())
slot, } else {
RelativeEpoch::Current, self.state_at_slot(slot)?
&self.spec, };
)?;
Ok(index) state
.as_ref()
.get_beacon_proposer_index(slot, RelativeEpoch::Current, &self.spec)
.map_err(Into::into)
} }
/// Returns the attestation slot and shard for a given validator index. /// Returns the attestation slot and shard for a given validator index.
@ -422,14 +466,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn validator_attestation_slot_and_shard( pub fn validator_attestation_slot_and_shard(
&self, &self,
validator_index: usize, validator_index: usize,
) -> Result<Option<(Slot, u64)>, BeaconStateError> { epoch: Epoch,
trace!( ) -> Result<Option<(Slot, u64)>, Error> {
"BeaconChain::validator_attestation_slot_and_shard: validator_index: {}", let as_epoch = |slot: Slot| slot.epoch(T::EthSpec::slots_per_epoch());
validator_index let head_state = &self.head().beacon_state;
);
if let Some(attestation_duty) = self let state = if epoch == as_epoch(head_state.slot) {
.state StateCow::Borrowed(self.head())
.read() } else {
self.state_at_slot(epoch.start_slot(T::EthSpec::slots_per_epoch()))?
};
if let Some(attestation_duty) = state
.as_ref()
.get_attestation_duties(validator_index, RelativeEpoch::Current)? .get_attestation_duties(validator_index, RelativeEpoch::Current)?
{ {
Ok(Some((attestation_duty.slot, attestation_duty.shard))) Ok(Some((attestation_duty.slot, attestation_duty.shard)))
@ -438,15 +487,25 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
} }
} }
/// Produce an `AttestationData` that is valid for the present `slot` and given `shard`. /// Produce an `AttestationData` that is valid for the given `slot` `shard`.
/// ///
/// Attests to the canonical chain. /// Always attests to the canonical chain.
pub fn produce_attestation_data(&self, shard: u64) -> Result<AttestationData, Error> { pub fn produce_attestation_data(
let state = self.state.read(); &self,
shard: u64,
slot: Slot,
) -> Result<AttestationData, Error> {
let state = self.state_at_slot(slot)?;
let head_block_root = self.head().beacon_block_root; let head_block_root = self.head().beacon_block_root;
let head_block_slot = self.head().beacon_block.slot; let head_block_slot = self.head().beacon_block.slot;
self.produce_attestation_data_for_block(shard, head_block_root, head_block_slot, &*state) self.produce_attestation_data_for_block(
shard,
head_block_root,
head_block_slot,
state.as_ref(),
)
} }
/// Produce an `AttestationData` that attests to the chain denoted by `block_root` and `state`. /// Produce an `AttestationData` that attests to the chain denoted by `block_root` and `state`.
@ -765,14 +824,38 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Accept some exit and queue it for inclusion in an appropriate block. /// Accept some exit and queue it for inclusion in an appropriate block.
pub fn process_voluntary_exit(&self, exit: VoluntaryExit) -> Result<(), ExitValidationError> { pub fn process_voluntary_exit(&self, exit: VoluntaryExit) -> Result<(), ExitValidationError> {
self.op_pool match self.state_now() {
.insert_voluntary_exit(exit, &*self.state.read(), &self.spec) Ok(state) => self
.op_pool
.insert_voluntary_exit(exit, state.as_ref(), &self.spec),
Err(e) => {
error!(
&self.log,
"Unable to process voluntary exit";
"error" => format!("{:?}", e),
"reason" => "no state"
);
Ok(())
}
}
} }
/// Accept some transfer and queue it for inclusion in an appropriate block. /// Accept some transfer and queue it for inclusion in an appropriate block.
pub fn process_transfer(&self, transfer: Transfer) -> Result<(), TransferValidationError> { pub fn process_transfer(&self, transfer: Transfer) -> Result<(), TransferValidationError> {
self.op_pool match self.state_now() {
.insert_transfer(transfer, &*self.state.read(), &self.spec) Ok(state) => self
.op_pool
.insert_transfer(transfer, state.as_ref(), &self.spec),
Err(e) => {
error!(
&self.log,
"Unable to process transfer";
"error" => format!("{:?}", e),
"reason" => "no state"
);
Ok(())
}
}
} }
/// Accept some proposer slashing and queue it for inclusion in an appropriate block. /// Accept some proposer slashing and queue it for inclusion in an appropriate block.
@ -780,8 +863,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self, &self,
proposer_slashing: ProposerSlashing, proposer_slashing: ProposerSlashing,
) -> Result<(), ProposerSlashingValidationError> { ) -> Result<(), ProposerSlashingValidationError> {
match self.state_now() {
Ok(state) => {
self.op_pool self.op_pool
.insert_proposer_slashing(proposer_slashing, &*self.state.read(), &self.spec) .insert_proposer_slashing(proposer_slashing, state.as_ref(), &self.spec)
}
Err(e) => {
error!(
&self.log,
"Unable to process proposer slashing";
"error" => format!("{:?}", e),
"reason" => "no state"
);
Ok(())
}
}
} }
/// Accept some attester slashing and queue it for inclusion in an appropriate block. /// Accept some attester slashing and queue it for inclusion in an appropriate block.
@ -789,8 +885,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self, &self,
attester_slashing: AttesterSlashing<T::EthSpec>, attester_slashing: AttesterSlashing<T::EthSpec>,
) -> Result<(), AttesterSlashingValidationError> { ) -> Result<(), AttesterSlashingValidationError> {
match self.state_now() {
Ok(state) => {
self.op_pool self.op_pool
.insert_attester_slashing(attester_slashing, &*self.state.read(), &self.spec) .insert_attester_slashing(attester_slashing, state.as_ref(), &self.spec)
}
Err(e) => {
error!(
&self.log,
"Unable to process attester slashing";
"error" => format!("{:?}", e),
"reason" => "no state"
);
Ok(())
}
}
} }
/// Accept some block and attempt to add it to block DAG. /// Accept some block and attempt to add it to block DAG.
@ -804,8 +913,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); let full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
let finalized_slot = self let finalized_slot = self
.state .head()
.read() .beacon_state
.finalized_checkpoint .finalized_checkpoint
.epoch .epoch
.start_slot(T::EthSpec::slots_per_epoch()); .start_slot(T::EthSpec::slots_per_epoch());
@ -987,20 +1096,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(BlockProcessingOutcome::Processed { block_root }) Ok(BlockProcessingOutcome::Processed { block_root })
} }
/// Produce a new block at the present slot. /// Produce a new block at the given `slot`.
/// ///
/// The produced block will not be inherently valid, it must be signed by a block producer. /// The produced block will not be inherently valid, it must be signed by a block producer.
/// Block signing is out of the scope of this function and should be done by a separate program. /// Block signing is out of the scope of this function and should be done by a separate program.
pub fn produce_block( pub fn produce_block(
&self, &self,
randao_reveal: Signature, randao_reveal: Signature,
slot: Slot,
) -> Result<(BeaconBlock<T::EthSpec>, BeaconState<T::EthSpec>), BlockProductionError> { ) -> Result<(BeaconBlock<T::EthSpec>, BeaconState<T::EthSpec>), BlockProductionError> {
let state = self.state.read().clone(); let state = self
.state_at_slot(slot)
.map_err(|_| BlockProductionError::UnableToProduceAtSlot(slot))?;
let slot = self let slot = self
.slot() .slot()
.map_err(|_| BlockProductionError::UnableToReadSlot)?; .map_err(|_| BlockProductionError::UnableToReadSlot)?;
self.produce_block_on_state(state, slot, randao_reveal) self.produce_block_on_state(state.as_ref().clone(), slot, randao_reveal)
} }
/// Produce a block for some `slot` upon the given `state`. /// Produce a block for some `slot` upon the given `state`.
@ -1169,29 +1282,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
} }
/// Update the canonical head to `new_head`. /// Update the canonical head to `new_head`.
fn update_canonical_head(&self, new_head: CheckPoint<T::EthSpec>) -> Result<(), Error> { fn update_canonical_head(&self, mut new_head: CheckPoint<T::EthSpec>) -> Result<(), Error> {
let timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES); let timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES);
new_head.beacon_state.build_all_caches(&self.spec)?;
// Update the checkpoint that stores the head of the chain at the time it received the // Update the checkpoint that stores the head of the chain at the time it received the
// block. // block.
*self.canonical_head.write() = new_head; *self.canonical_head.write() = new_head;
// Update the always-at-the-present-slot state we keep around for performance gains.
*self.state.write() = {
let mut state = self.canonical_head.read().beacon_state.clone();
let present_slot = self.slot()?;
// If required, transition the new state to the present slot.
for _ in state.slot.as_u64()..present_slot.as_u64() {
per_slot_processing(&mut state, &self.spec)?;
}
state.build_all_caches(&self.spec)?;
state
};
// Save `self` to `self.store`. // Save `self` to `self.store`.
self.persist()?; self.persist()?;

View File

@ -24,6 +24,7 @@ pub enum BeaconChainError {
new_epoch: Epoch, new_epoch: Epoch,
}, },
SlotClockDidNotStart, SlotClockDidNotStart,
NoStateForSlot(Slot),
UnableToFindTargetRoot(Slot), UnableToFindTargetRoot(Slot),
BeaconStateError(BeaconStateError), BeaconStateError(BeaconStateError),
DBInconsistent(String), DBInconsistent(String),
@ -44,6 +45,7 @@ easy_from_to!(SlotProcessingError, BeaconChainError);
pub enum BlockProductionError { pub enum BlockProductionError {
UnableToGetBlockRootFromState, UnableToGetBlockRootFromState,
UnableToReadSlot, UnableToReadSlot,
UnableToProduceAtSlot(Slot),
SlotProcessingError(SlotProcessingError), SlotProcessingError(SlotProcessingError),
BlockProcessingError(BlockProcessingError), BlockProcessingError(BlockProcessingError),
BeaconStateError(BeaconStateError), BeaconStateError(BeaconStateError),

View File

@ -3,7 +3,7 @@ use operation_pool::PersistedOperationPool;
use ssz::{Decode, Encode}; use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode}; use ssz_derive::{Decode, Encode};
use store::{DBColumn, Error as StoreError, StoreItem}; use store::{DBColumn, Error as StoreError, StoreItem};
use types::{BeaconState, Hash256}; use types::Hash256;
/// 32-byte key for accessing the `PersistedBeaconChain`. /// 32-byte key for accessing the `PersistedBeaconChain`.
pub const BEACON_CHAIN_DB_KEY: &str = "PERSISTEDBEACONCHAINPERSISTEDBEA"; pub const BEACON_CHAIN_DB_KEY: &str = "PERSISTEDBEACONCHAINPERSISTEDBEA";
@ -13,7 +13,6 @@ pub struct PersistedBeaconChain<T: BeaconChainTypes> {
pub canonical_head: CheckPoint<T::EthSpec>, pub canonical_head: CheckPoint<T::EthSpec>,
pub op_pool: PersistedOperationPool<T::EthSpec>, pub op_pool: PersistedOperationPool<T::EthSpec>,
pub genesis_block_root: Hash256, pub genesis_block_root: Hash256,
pub state: BeaconState<T::EthSpec>,
} }
impl<T: BeaconChainTypes> StoreItem for PersistedBeaconChain<T> { impl<T: BeaconChainTypes> StoreItem for PersistedBeaconChain<T> {

View File

@ -130,7 +130,6 @@ where
/// Does not produce blocks or attestations. /// Does not produce blocks or attestations.
pub fn advance_slot(&self) { pub fn advance_slot(&self) {
self.chain.slot_clock.advance_slot(); self.chain.slot_clock.advance_slot();
self.chain.catchup_state().expect("should catchup state");
} }
/// Extend the `BeaconChain` with some blocks and attestations. Returns the root of the /// Extend the `BeaconChain` with some blocks and attestations. Returns the root of the

View File

@ -322,7 +322,9 @@ fn roundtrip_operation_pool() {
let p: PersistedBeaconChain<CommonTypes<TestForkChoice, MinimalEthSpec>> = let p: PersistedBeaconChain<CommonTypes<TestForkChoice, MinimalEthSpec>> =
harness.chain.store.get(&key).unwrap().unwrap(); harness.chain.store.get(&key).unwrap().unwrap();
let restored_op_pool = p.op_pool.into_operation_pool(&p.state, &harness.spec); let restored_op_pool = p
.op_pool
.into_operation_pool(&p.canonical_head.beacon_state, &harness.spec);
assert_eq!(harness.chain.op_pool, restored_op_pool); assert_eq!(harness.chain.op_pool, restored_op_pool);
} }

View File

@ -143,7 +143,6 @@ where
"catchup_distance" => wall_clock_slot - state_slot, "catchup_distance" => wall_clock_slot - state_slot,
); );
} }
do_state_catchup(&beacon_chain, &log);
let network_config = &client_config.network; let network_config = &client_config.network;
let (network, network_send) = let (network, network_send) =
@ -199,7 +198,7 @@ where
exit.until( exit.until(
interval interval
.for_each(move |_| { .for_each(move |_| {
do_state_catchup(&chain, &log); log_new_slot(&chain, &log);
Ok(()) Ok(())
}) })
@ -229,35 +228,19 @@ impl<T: BeaconChainTypes> Drop for Client<T> {
} }
} }
fn do_state_catchup<T: BeaconChainTypes>(chain: &Arc<BeaconChain<T>>, log: &slog::Logger) { fn log_new_slot<T: BeaconChainTypes>(chain: &Arc<BeaconChain<T>>, log: &slog::Logger) {
// Only attempt to `catchup_state` if we can read the slot clock.
if let Ok(current_slot) = chain.slot() {
let state_catchup_result = chain.catchup_state();
let best_slot = chain.head().beacon_block.slot; let best_slot = chain.head().beacon_block.slot;
let latest_block_root = chain.head().beacon_block_root; let latest_block_root = chain.head().beacon_block_root;
let common = o!( if let Ok(current_slot) = chain.slot() {
info!(
log,
"Slot start";
"skip_slots" => current_slot.saturating_sub(best_slot), "skip_slots" => current_slot.saturating_sub(best_slot),
"best_block_root" => format!("{}", latest_block_root), "best_block_root" => format!("{}", latest_block_root),
"best_block_slot" => best_slot, "best_block_slot" => best_slot,
"slot" => current_slot, "slot" => current_slot,
);
if let Err(e) = state_catchup_result {
error!(
log,
"State catchup failed";
"error" => format!("{:?}", e),
common
) )
} else {
info!(
log,
"Slot start";
common
)
}
} else { } else {
error!( error!(
log, log,

View File

@ -39,12 +39,7 @@ pub fn get_validator_duties<T: BeaconChainTypes + 'static>(req: Request<Body>) -
.extensions() .extensions()
.get::<Arc<BeaconChain<T>>>() .get::<Arc<BeaconChain<T>>>()
.ok_or_else(|| ApiError::ServerError("Beacon chain extension missing".to_string()))?; .ok_or_else(|| ApiError::ServerError("Beacon chain extension missing".to_string()))?;
let _ = beacon_chain let head_state = &beacon_chain.head().beacon_state;
.ensure_state_caches_are_built()
.map_err(|e| ApiError::ServerError(format!("Unable to build state caches: {:?}", e)))?;
let head_state = beacon_chain
.speculative_state()
.expect("This is legacy code and should be removed.");
// Parse and check query parameters // Parse and check query parameters
let query = UrlQuery::from_request(&req)?; let query = UrlQuery::from_request(&req)?;

View File

@ -14,7 +14,7 @@ use slog::{error, info, trace, warn};
use ssz::{ssz_encode, Decode, Encode}; use ssz::{ssz_encode, Decode, Encode};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use types::Attestation; use types::{Attestation, Slot};
#[derive(Clone)] #[derive(Clone)]
pub struct AttestationServiceInstance<T: BeaconChainTypes> { pub struct AttestationServiceInstance<T: BeaconChainTypes> {
@ -37,49 +37,13 @@ impl<T: BeaconChainTypes> AttestationService for AttestationServiceInstance<T> {
req.get_slot() req.get_slot()
); );
// verify the slot, drop lock on state afterwards
{
let slot_requested = req.get_slot();
// TODO: this whole module is legacy and not maintained well.
let state = &self
.chain
.speculative_state()
.expect("This is legacy code and should be removed");
// Start by performing some checks
// Check that the AttestationData is for the current slot (otherwise it will not be valid)
if slot_requested > state.slot.as_u64() {
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::OutOfRange,
Some(
"AttestationData request for a slot that is in the future.".to_string(),
),
))
.map_err(move |e| {
error!(log_clone, "Failed to reply with failure {:?}: {:?}", req, e)
});
return ctx.spawn(f);
}
// currently cannot handle past slots. TODO: Handle this case
else if slot_requested < state.slot.as_u64() {
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::InvalidArgument,
Some("AttestationData request for a slot that is in the past.".to_string()),
))
.map_err(move |e| {
error!(log_clone, "Failed to reply with failure {:?}: {:?}", req, e)
});
return ctx.spawn(f);
}
}
// Then get the AttestationData from the beacon chain // Then get the AttestationData from the beacon chain
let shard = req.get_shard(); let shard = req.get_shard();
let attestation_data = match self.chain.produce_attestation_data(shard) { let slot_requested = req.get_slot();
let attestation_data = match self
.chain
.produce_attestation_data(shard, Slot::from(slot_requested))
{
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
// Could not produce an attestation // Could not produce an attestation

View File

@ -35,7 +35,7 @@ impl<T: BeaconChainTypes> BeaconBlockService for BeaconBlockServiceInstance<T> {
// decode the request // decode the request
// TODO: requested slot currently unused, see: https://github.com/sigp/lighthouse/issues/336 // TODO: requested slot currently unused, see: https://github.com/sigp/lighthouse/issues/336
let _requested_slot = Slot::from(req.get_slot()); let requested_slot = Slot::from(req.get_slot());
let randao_reveal = match Signature::from_ssz_bytes(req.get_randao_reveal()) { let randao_reveal = match Signature::from_ssz_bytes(req.get_randao_reveal()) {
Ok(reveal) => reveal, Ok(reveal) => reveal,
Err(_) => { Err(_) => {
@ -51,7 +51,7 @@ impl<T: BeaconChainTypes> BeaconBlockService for BeaconBlockServiceInstance<T> {
} }
}; };
let produced_block = match self.chain.produce_block(randao_reveal) { let produced_block = match self.chain.produce_block(randao_reveal, requested_slot) {
Ok((block, _state)) => block, Ok((block, _state)) => block,
Err(e) => { Err(e) => {
// could not produce a block // could not produce a block

View File

@ -30,10 +30,7 @@ impl<T: BeaconChainTypes> ValidatorService for ValidatorServiceInstance<T> {
let spec = &self.chain.spec; let spec = &self.chain.spec;
// TODO: this whole module is legacy and not maintained well. // TODO: this whole module is legacy and not maintained well.
let state = &self let state = &self.chain.head().beacon_state;
.chain
.speculative_state()
.expect("This is legacy code and should be removed");
let epoch = Epoch::from(req.get_epoch()); let epoch = Epoch::from(req.get_epoch());
let mut resp = GetDutiesResponse::new(); let mut resp = GetDutiesResponse::new();
let resp_validators = resp.mut_active_validators(); let resp_validators = resp.mut_active_validators();