Refactor block_processing

- Add the rayon library for parallelization
- Move from std::sync::Rwlock to rayon::RwLock
- Add `state` field to BeaconChain
- Fix major bug in attestation validator where justified slot was
incorrectly looked up.
This commit is contained in:
Paul Hauner 2019-02-01 14:48:09 +11:00
parent b6b738e83a
commit 20e45b3369
No known key found for this signature in database
GPG Key ID: D362883A9218FCC6
20 changed files with 236 additions and 309 deletions

View File

@ -13,6 +13,7 @@ failure = "0.1"
failure_derive = "0.1" failure_derive = "0.1"
genesis = { path = "../../eth2/genesis" } genesis = { path = "../../eth2/genesis" }
hashing = { path = "../../eth2/utils/hashing" } hashing = { path = "../../eth2/utils/hashing" }
parking_lot = "0.7"
log = "0.4" log = "0.4"
env_logger = "0.6" env_logger = "0.6"
serde = "1.0" serde = "1.0"

View File

@ -19,15 +19,9 @@ where
&self, &self,
free_attestation: FreeAttestation, free_attestation: FreeAttestation,
) -> Result<ProcessOutcome, Error> { ) -> Result<ProcessOutcome, Error> {
let present_slot = self
.present_slot()
.ok_or_else(|| Error::PresentSlotUnknown)?;
let state = self.state(present_slot)?;
self.attestation_aggregator self.attestation_aggregator
.write() .write()
.expect("Aggregator unlock failed.") .process_free_attestation(&self.state.read(), &free_attestation, &self.spec)
.process_free_attestation(&state, &free_attestation, &self.spec)
.map_err(|e| e.into()) .map_err(|e| e.into())
} }
} }

View File

@ -13,30 +13,27 @@ where
T: ClientDB, T: ClientDB,
U: SlotClock, U: SlotClock,
{ {
pub fn produce_attestation_data( pub fn produce_attestation_data(&self, shard: u64) -> Result<AttestationData, Error> {
&self,
slot: u64,
shard: u64,
) -> Result<AttestationData, Error> {
let present_slot = self
.present_slot()
.ok_or_else(|| Error::PresentSlotUnknown)?;
let state = self.state(present_slot).map_err(|_| Error::StateError)?;
let justified_slot = self.justified_slot(); let justified_slot = self.justified_slot();
let justified_block_root = self
let justified_block_root = *state .state
.read()
.get_block_root(justified_slot, &self.spec) .get_block_root(justified_slot, &self.spec)
.ok_or_else(|| Error::SlotTooOld)?; .ok_or_else(|| Error::SlotTooOld)?
.clone();
let head_slot = self.head().beacon_block.slot; let epoch_boundary_root = self
let previous_epoch_start_slot = head_slot - (head_slot % self.spec.epoch_length); .state
let epoch_boundary_root = *state .read()
.get_block_root(previous_epoch_start_slot, &self.spec) .get_block_root(
.ok_or_else(|| Error::SlotTooOld)?; self.state.read().current_epoch_start_slot(&self.spec),
&self.spec,
)
.ok_or_else(|| Error::SlotTooOld)?
.clone();
Ok(AttestationData { Ok(AttestationData {
slot, slot: self.state.read().slot,
shard, shard,
beacon_block_root: self.head().beacon_block_root.clone(), beacon_block_root: self.head().beacon_block_root.clone(),
epoch_boundary_root, epoch_boundary_root,

View File

@ -28,18 +28,12 @@ where
U: SlotClock, U: SlotClock,
{ {
pub fn insert_latest_attestation_target(&self, validator_index: u64, block_root: Hash256) { pub fn insert_latest_attestation_target(&self, validator_index: u64, block_root: Hash256) {
let mut targets = self let mut targets = self.latest_attestation_targets.write();
.latest_attestation_targets
.write()
.expect("CRITICAL: CanonicalHead poisioned.");
targets.insert(validator_index, block_root); targets.insert(validator_index, block_root);
} }
pub fn get_latest_attestation_target(&self, validator_index: u64) -> Option<Hash256> { pub fn get_latest_attestation_target(&self, validator_index: u64) -> Option<Hash256> {
let targets = self let targets = self.latest_attestation_targets.read();
.latest_attestation_targets
.read()
.expect("CRITICAL: CanonicalHead poisioned.");
match targets.get(validator_index) { match targets.get(validator_index) {
Some(hash) => Some(hash.clone()), Some(hash) => Some(hash.clone()),

View File

@ -1,9 +1,9 @@
use super::state_transition::Error as TransitionError;
use super::{BeaconChain, ClientDB, DBError, SlotClock}; use super::{BeaconChain, ClientDB, DBError, SlotClock};
use log::debug; use log::debug;
use slot_clock::{SystemTimeSlotClockError, TestingSlotClockError}; use slot_clock::{SystemTimeSlotClockError, TestingSlotClockError};
use ssz::{ssz_encode, Encodable}; use ssz::{ssz_encode, Encodable};
use types::{ use types::{
beacon_state::{BlockProcessingError, SlotProcessingError},
readers::{BeaconBlockReader, BeaconStateReader}, readers::{BeaconBlockReader, BeaconStateReader},
Hash256, Hash256,
}; };
@ -16,7 +16,6 @@ pub enum ValidBlock {
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum InvalidBlock { pub enum InvalidBlock {
FutureSlot, FutureSlot,
StateTransitionFailed(TransitionError),
StateRootMismatch, StateRootMismatch,
} }
@ -29,31 +28,16 @@ pub enum Outcome {
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum Error { pub enum Error {
DBError(String), DBError(String),
SlotClockError(SystemTimeSlotClockError),
NotImplemented,
PresentSlotIsNone,
UnableToDecodeBlock, UnableToDecodeBlock,
PresentSlotIsNone,
SlotClockError(SystemTimeSlotClockError),
MissingParentState(Hash256), MissingParentState(Hash256),
InvalidParentState(Hash256), InvalidParentState(Hash256),
MissingBeaconBlock(Hash256), MissingBeaconBlock(Hash256),
InvalidBeaconBlock(Hash256), InvalidBeaconBlock(Hash256),
MissingParentBlock(Hash256), MissingParentBlock(Hash256),
NoBlockProducer, SlotProcessingError(SlotProcessingError),
StateSlotMismatch, PerBlockProcessingError(BlockProcessingError),
BadBlockSignature,
BadRandaoSignature,
MaxProposerSlashingsExceeded,
BadProposerSlashing,
MaxAttestationsExceeded,
BadAttestation,
NoBlockRoot,
MaxDepositsExceeded,
MaxExitsExceeded,
BadExit,
BadCustodyReseeds,
BadCustodyChallenges,
BadCustodyResponses,
} }
impl<T, U> BeaconChain<T, U> impl<T, U> BeaconChain<T, U>
@ -99,14 +83,13 @@ where
.into_beacon_state() .into_beacon_state()
.ok_or(Error::InvalidParentState(parent_state_root))?; .ok_or(Error::InvalidParentState(parent_state_root))?;
let state = match self.state_transition(parent_state, &block) { let mut state = parent_state;
Ok(state) => state,
Err(error) => { for _ in state.slot..present_slot {
return Ok(Outcome::InvalidBlock(InvalidBlock::StateTransitionFailed( state.per_slot_processing(parent_block_root.clone(), &self.spec)?;
error, }
)));
} state.per_block_processing(&block, &self.spec)?;
};
let state_root = state.canonical_root(); let state_root = state.canonical_root();
@ -131,6 +114,7 @@ where
state.clone(), state.clone(),
state_root.clone(), state_root.clone(),
); );
*self.state.write() = state.clone();
} }
// The block was sucessfully processed. // The block was sucessfully processed.
@ -144,6 +128,18 @@ impl From<DBError> for Error {
} }
} }
impl From<SlotProcessingError> for Error {
fn from(e: SlotProcessingError) -> Error {
Error::SlotProcessingError(e)
}
}
impl From<BlockProcessingError> for Error {
fn from(e: BlockProcessingError) -> Error {
Error::PerBlockProcessingError(e)
}
}
impl From<TestingSlotClockError> for Error { impl From<TestingSlotClockError> for Error {
fn from(_: TestingSlotClockError) -> Error { fn from(_: TestingSlotClockError) -> Error {
unreachable!(); // Testing clock never throws an error. unreachable!(); // Testing clock never throws an error.

View File

@ -1,10 +1,9 @@
use super::state_transition::Error as TransitionError;
use super::{BeaconChain, ClientDB, DBError, SlotClock}; use super::{BeaconChain, ClientDB, DBError, SlotClock};
use bls::Signature; use bls::Signature;
use log::debug; use log::debug;
use slot_clock::TestingSlotClockError; use slot_clock::{SystemTimeSlotClockError, TestingSlotClockError};
use types::{ use types::{
beacon_state::SlotProcessingError, beacon_state::{BlockProcessingError, SlotProcessingError},
readers::{BeaconBlockReader, BeaconStateReader}, readers::{BeaconBlockReader, BeaconStateReader},
BeaconBlock, BeaconBlockBody, BeaconState, Eth1Data, Hash256, BeaconBlock, BeaconBlockBody, BeaconState, Eth1Data, Hash256,
}; };
@ -12,9 +11,10 @@ use types::{
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum Error { pub enum Error {
DBError(String), DBError(String),
StateTransitionError(TransitionError),
PresentSlotIsNone, PresentSlotIsNone,
SlotProcessingError(SlotProcessingError), SlotProcessingError(SlotProcessingError),
PerBlockProcessingError(BlockProcessingError),
SlotClockError(SystemTimeSlotClockError),
} }
impl<T, U> BeaconChain<T, U> impl<T, U> BeaconChain<T, U>
@ -29,43 +29,31 @@ where
where where
Error: From<<U>::Error>, Error: From<<U>::Error>,
{ {
// TODO: allow producing a block from a previous (or future?) slot. debug!("Starting block production...");
let present_slot = self
.slot_clock
.present_slot()
.map_err(|e| e.into())?
.ok_or(Error::PresentSlotIsNone)?;
debug!("Producing block for slot {}...", present_slot); let mut state = self.state.read().clone();
let parent_root = self.head().beacon_block_root; debug!("Finding attesatations for new block...");
let parent_block_reader = self
.block_store
.get_reader(&parent_root)?
.ok_or_else(|| Error::DBError("Block not found.".to_string()))?;
let parent_state = self
.state_store
.get_reader(&parent_block_reader.state_root())?
.ok_or_else(|| Error::DBError("State not found.".to_string()))?
.into_beacon_state()
.ok_or_else(|| Error::DBError("State invalid.".to_string()))?;
debug!("Finding attesatations for block..."); let attestations = self
.attestation_aggregator
.read()
.get_attestations_for_state(&state, &self.spec);
let attestations = { debug!(
let mut next_state = parent_state.clone(); "Inserting {} attestation(s) into new block.",
next_state.per_slot_processing(Hash256::zero(), &self.spec)?; attestations.len()
self.attestation_aggregator );
.read()
.unwrap()
.get_attestations_for_state(&next_state, &self.spec)
};
debug!("Found {} attestation(s).", attestations.len()); let parent_root = state
.get_block_root(state.slot.saturating_sub(1), &self.spec)
// TODO: fix unwrap
.unwrap()
.clone();
let mut block = BeaconBlock { let mut block = BeaconBlock {
slot: present_slot, slot: state.slot,
parent_root: parent_root.clone(), parent_root,
state_root: Hash256::zero(), // Updated after the state is calculated. state_root: Hash256::zero(), // Updated after the state is calculated.
randao_reveal: randao_reveal, randao_reveal: randao_reveal,
eth1_data: Eth1Data { eth1_data: Eth1Data {
@ -86,8 +74,8 @@ where
}, },
}; };
let state = state.per_block_processing_without_verifying_block_signature(&block, &self.spec)?;
self.state_transition_without_verifying_block_signature(parent_state, &block)?;
let state_root = state.canonical_root(); let state_root = state.canonical_root();
block.state_root = state_root; block.state_root = state_root;
@ -104,9 +92,15 @@ impl From<DBError> for Error {
} }
} }
impl From<TransitionError> for Error { impl From<SlotProcessingError> for Error {
fn from(e: TransitionError) -> Error { fn from(e: SlotProcessingError) -> Error {
Error::StateTransitionError(e) Error::SlotProcessingError(e)
}
}
impl From<BlockProcessingError> for Error {
fn from(e: BlockProcessingError) -> Error {
Error::PerBlockProcessingError(e)
} }
} }
@ -116,8 +110,8 @@ impl From<TestingSlotClockError> for Error {
} }
} }
impl From<SlotProcessingError> for Error { impl From<SystemTimeSlotClockError> for Error {
fn from(e: SlotProcessingError) -> Error { fn from(e: SystemTimeSlotClockError) -> Error {
Error::SlotProcessingError(e) Error::SlotClockError(e)
} }
} }

View File

@ -1,5 +1,5 @@
use crate::{BeaconChain, CheckPoint, ClientDB, SlotClock}; use crate::{BeaconChain, CheckPoint, ClientDB, SlotClock};
use std::sync::RwLockReadGuard; use parking_lot::RwLockReadGuard;
use types::{beacon_state::SlotProcessingError, BeaconBlock, BeaconState, Hash256}; use types::{beacon_state::SlotProcessingError, BeaconBlock, BeaconState, Hash256};
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
@ -20,10 +20,7 @@ where
new_beacon_state: BeaconState, new_beacon_state: BeaconState,
new_beacon_state_root: Hash256, new_beacon_state_root: Hash256,
) { ) {
let mut head = self let mut head = self.canonical_head.write();
.canonical_head
.write()
.expect("CRITICAL: CanonicalHead poisioned.");
head.update( head.update(
new_beacon_block, new_beacon_block,
new_beacon_block_root, new_beacon_block_root,
@ -33,34 +30,18 @@ where
} }
pub fn head(&self) -> RwLockReadGuard<CheckPoint> { pub fn head(&self) -> RwLockReadGuard<CheckPoint> {
self.canonical_head self.canonical_head.read()
.read()
.expect("CRITICAL: CanonicalHead poisioned.")
} }
pub fn state(&self, slot: u64) -> Result<BeaconState, Error> { pub fn advance_state(&self, slot: u64) -> Result<(), SlotProcessingError> {
let mut state = self let state_slot = self.state.read().slot;
.canonical_head let head_block_root = self.head().beacon_block_root;
.read() for _ in state_slot..slot {
.expect("CRITICAL: CanonicalHead poisioned.") self.state
.beacon_state .write()
.clone(); .per_slot_processing(head_block_root.clone(), &self.spec)?;
let previous_block_root = self
.canonical_head
.read()
.expect("CRITICAL: CanonicalHead poisioned.")
.beacon_block_root
.clone();
match slot.checked_sub(state.slot) {
None => Err(Error::PastSlot),
Some(distance) => {
for _ in 0..distance {
state.per_slot_processing(previous_block_root.clone(), &self.spec)?
}
Ok(state)
}
} }
Ok(())
} }
} }

View File

@ -1,5 +1,5 @@
use crate::{BeaconChain, CheckPoint, ClientDB, SlotClock}; use crate::{BeaconChain, CheckPoint, ClientDB, SlotClock};
use std::sync::RwLockReadGuard; use parking_lot::RwLockReadGuard;
use types::{BeaconBlock, BeaconState, Hash256}; use types::{BeaconBlock, BeaconState, Hash256};
impl<T, U> BeaconChain<T, U> impl<T, U> BeaconChain<T, U>
@ -14,10 +14,7 @@ where
new_beacon_state: BeaconState, new_beacon_state: BeaconState,
new_beacon_state_root: Hash256, new_beacon_state_root: Hash256,
) { ) {
let mut finalized_head = self let mut finalized_head = self.finalized_head.write();
.finalized_head
.write()
.expect("CRITICAL: finalized_head poisioned.");
finalized_head.update( finalized_head.update(
new_beacon_block, new_beacon_block,
new_beacon_block_root, new_beacon_block_root,
@ -27,8 +24,6 @@ where
} }
pub fn finalized_head(&self) -> RwLockReadGuard<CheckPoint> { pub fn finalized_head(&self) -> RwLockReadGuard<CheckPoint> {
self.finalized_head self.finalized_head.read()
.read()
.expect("CRITICAL: finalized_head poisioned.")
} }
} }

View File

@ -28,10 +28,7 @@ where
} }
pub fn proposer_slots(&self, validator_index: usize) -> Option<u64> { pub fn proposer_slots(&self, validator_index: usize) -> Option<u64> {
let slot = self.present_slot()?; if let Some(validator) = self.state.read().validator_registry.get(validator_index) {
let state = self.state(slot).ok()?;
if let Some(validator) = state.validator_registry.get(validator_index) {
Some(validator.proposer_slots) Some(validator.proposer_slots)
} else { } else {
None None
@ -46,35 +43,22 @@ where
} }
pub fn block_proposer(&self, slot: u64) -> Result<usize, CommitteesError> { pub fn block_proposer(&self, slot: u64) -> Result<usize, CommitteesError> {
// TODO: fix unwrap let index = self
let present_slot = self.present_slot().unwrap(); .state
// TODO: fix unwrap .read()
let state = self.state(present_slot).unwrap(); .get_beacon_proposer_index(slot, &self.spec)?;
let index = state.get_beacon_proposer_index(slot, &self.spec)?;
Ok(index) Ok(index)
} }
pub fn justified_slot(&self) -> u64 { pub fn justified_slot(&self) -> u64 {
// TODO: fix unwrap self.state.read().justified_slot
let present_slot = self.present_slot().unwrap();
// TODO: fix unwrap
let state = self.state(present_slot).unwrap();
state.justified_slot
/*
self.justified_head
.read()
.expect("Justified head poisoned")
.beacon_block
.slot
*/
} }
pub fn validator_attestion_slot_and_shard(&self, validator_index: usize) -> Option<(u64, u64)> { pub fn validator_attestion_slot_and_shard(&self, validator_index: usize) -> Option<(u64, u64)> {
let present_slot = self.present_slot()?; let (slot, shard, _committee) = self
let state = self.state(present_slot).ok()?; .state
.read()
let (slot, shard, _committee) = state
.attestation_slot_and_shard_for_validator(validator_index, &self.spec) .attestation_slot_and_shard_for_validator(validator_index, &self.spec)
.ok()?; .ok()?;
Some((slot, shard)) Some((slot, shard))

View File

@ -10,7 +10,7 @@ pub mod dump;
mod finalized_head; mod finalized_head;
mod info; mod info;
mod lmd_ghost; mod lmd_ghost;
mod state_transition; // mod state_transition;
use self::attestation_targets::AttestationTargets; use self::attestation_targets::AttestationTargets;
use self::block_graph::BlockGraph; use self::block_graph::BlockGraph;
@ -20,9 +20,10 @@ use db::{
ClientDB, DBError, ClientDB, DBError,
}; };
use genesis::{genesis_beacon_block, genesis_beacon_state, GenesisError}; use genesis::{genesis_beacon_block, genesis_beacon_state, GenesisError};
use parking_lot::RwLock;
use slot_clock::SlotClock; use slot_clock::SlotClock;
use ssz::ssz_encode; use ssz::ssz_encode;
use std::sync::{Arc, RwLock}; use std::sync::Arc;
use types::{BeaconBlock, BeaconState, ChainSpec, Hash256}; use types::{BeaconBlock, BeaconState, ChainSpec, Hash256};
pub use self::block_processing::Outcome as BlockProcessingOutcome; pub use self::block_processing::Outcome as BlockProcessingOutcome;
@ -79,6 +80,7 @@ pub struct BeaconChain<T: ClientDB + Sized, U: SlotClock> {
canonical_head: RwLock<CheckPoint>, canonical_head: RwLock<CheckPoint>,
finalized_head: RwLock<CheckPoint>, finalized_head: RwLock<CheckPoint>,
justified_head: RwLock<CheckPoint>, justified_head: RwLock<CheckPoint>,
pub state: RwLock<BeaconState>,
pub latest_attestation_targets: RwLock<AttestationTargets>, pub latest_attestation_targets: RwLock<AttestationTargets>,
pub spec: ChainSpec, pub spec: ChainSpec,
} }
@ -137,6 +139,7 @@ where
slot_clock, slot_clock,
block_graph, block_graph,
attestation_aggregator, attestation_aggregator,
state: RwLock::new(genesis_state.clone()),
justified_head, justified_head,
finalized_head, finalized_head,
canonical_head, canonical_head,

View File

@ -1,64 +1,80 @@
use criterion::Criterion; use criterion::Criterion;
use criterion::{criterion_group, criterion_main}; use criterion::{black_box, criterion_group, criterion_main, Benchmark};
use env_logger::{Builder, Env};
use test_harness::BeaconChainHarness; use test_harness::BeaconChainHarness;
use types::ChainSpec; use types::{ChainSpec, Hash256};
fn mid_epoch_state_transition(c: &mut Criterion) { fn mid_epoch_state_transition(c: &mut Criterion) {
let validator_count = 2; Builder::from_env(Env::default().default_filter_or("debug")).init();
let validator_count = 1000;
let mut rig = BeaconChainHarness::new(ChainSpec::foundation(), validator_count); let mut rig = BeaconChainHarness::new(ChainSpec::foundation(), validator_count);
let two_and_half_epochs = (rig.spec.epoch_length * 2) + (rig.spec.epoch_length / 2); let epoch_depth = (rig.spec.epoch_length * 2) + (rig.spec.epoch_length / 2);
for _ in 0..two_and_half_epochs { for _ in 0..epoch_depth {
rig.advance_chain_with_block(); rig.advance_chain_with_block();
} }
let block = rig.advance_chain_without_block(); let state = rig.beacon_chain.state.read().clone();
let state = rig.beacon_chain.canonical_head().beacon_state.clone();
assert!((state.slot + 1) % rig.spec.epoch_length != 0);
c.bench_function("mid-epoch state transition 10k validators", move |b| { c.bench_function("mid-epoch state transition 10k validators", move |b| {
let block = block.clone();
let state = state.clone(); let state = state.clone();
b.iter(|| { b.iter(|| {
rig.beacon_chain let mut state = state.clone();
.state_transition(state.clone(), &block.clone()) black_box(state.per_slot_processing(Hash256::zero(), &rig.spec))
}) })
}); });
} }
fn epoch_boundary_state_transition(c: &mut Criterion) { fn epoch_boundary_state_transition(c: &mut Criterion) {
let validator_count = 10_000; // Builder::from_env(Env::default().default_filter_or("debug")).init();
let validator_count = 10000;
let mut rig = BeaconChainHarness::new(ChainSpec::foundation(), validator_count); let mut rig = BeaconChainHarness::new(ChainSpec::foundation(), validator_count);
let three_epochs = rig.spec.epoch_length * 3; let epoch_depth = rig.spec.epoch_length * 2;
for _ in 0..(three_epochs - 1) { for _ in 0..(epoch_depth - 1) {
rig.advance_chain_with_block(); rig.advance_chain_with_block();
} }
let state = rig.beacon_chain.canonical_head().beacon_state.clone(); let state = rig.beacon_chain.state.read().clone();
assert_eq!(
state.slot % rig.spec.epoch_length,
rig.spec.epoch_length - 1,
);
let block = rig.advance_chain_without_block();
c.bench_function("epoch boundary state transition 10k validators", move |b| { assert_eq!((state.slot + 1) % rig.spec.epoch_length, 0);
let block = block.clone();
c.bench(
"routines",
Benchmark::new("routine_1", move |b| {
let state = state.clone();
b.iter(|| {
let mut state = state.clone();
black_box(black_box(
state.per_slot_processing(Hash256::zero(), &rig.spec),
))
})
})
.sample_size(5),
);
/*
c.bench_function("mid-epoch state transition 10k validators", move |b| {
let state = state.clone(); let state = state.clone();
b.iter(|| { b.iter(|| {
let state = rig let mut state = state.clone();
.beacon_chain black_box(black_box(
.state_transition(state.clone(), &block.clone()) state.per_slot_processing(Hash256::zero(), &rig.spec),
.unwrap(); ))
assert_eq!(state.slot % rig.spec.epoch_length, 0);
}) })
}); });
*/
} }
criterion_group!( criterion_group!(
benches, benches,
mid_epoch_state_transition, // mid_epoch_state_transition,
epoch_boundary_state_transition epoch_boundary_state_transition
); );
criterion_main!(benches); criterion_main!(benches);

View File

@ -113,6 +113,7 @@ impl BeaconChainHarness {
debug!("Incrementing BeaconChain slot to {}.", slot); debug!("Incrementing BeaconChain slot to {}.", slot);
self.beacon_chain.slot_clock.set_slot(slot); self.beacon_chain.slot_clock.set_slot(slot);
self.beacon_chain.advance_state(slot).unwrap();
slot slot
} }
@ -125,8 +126,8 @@ impl BeaconChainHarness {
let attesting_validators = self let attesting_validators = self
.beacon_chain .beacon_chain
.state(present_slot) .state
.unwrap() .read()
.get_crosslink_committees_at_slot(present_slot, &self.spec) .get_crosslink_committees_at_slot(present_slot, &self.spec)
.unwrap() .unwrap()
.iter() .iter()
@ -195,13 +196,27 @@ impl BeaconChainHarness {
self.beacon_chain.process_block(block).unwrap(); self.beacon_chain.process_block(block).unwrap();
debug!("...block processed by BeaconChain."); debug!("...block processed by BeaconChain.");
debug!("Producing free attestations...");
// Produce new attestations. // Produce new attestations.
let free_attestations = self.gather_free_attesations(); let free_attestations = self.gather_free_attesations();
debug!("Processing free attestations...");
free_attestations.par_iter().for_each(|free_attestation| {
self.beacon_chain
.process_free_attestation(free_attestation.clone())
.unwrap();
});
debug!("Free attestations processed.");
/*
for free_attestation in free_attestations { for free_attestation in free_attestations {
self.beacon_chain self.beacon_chain
.process_free_attestation(free_attestation) .process_free_attestation(free_attestation)
.unwrap(); .unwrap();
} }
*/
} }
pub fn chain_dump(&self) -> Result<Vec<SlotDump>, DumpError> { pub fn chain_dump(&self) -> Result<Vec<SlotDump>, DumpError> {

View File

@ -13,10 +13,10 @@ where
{ {
fn produce_attestation_data( fn produce_attestation_data(
&self, &self,
slot: u64, _slot: u64,
shard: u64, shard: u64,
) -> Result<Option<AttestationData>, NodeError> { ) -> Result<Option<AttestationData>, NodeError> {
match self.beacon_chain.produce_attestation_data(slot, shard) { match self.beacon_chain.produce_attestation_data(shard) {
Ok(attestation_data) => Ok(Some(attestation_data)), Ok(attestation_data) => Ok(Some(attestation_data)),
Err(e) => Err(NodeError::RemoteFailure(format!("{:?}", e))), Err(e) => Err(NodeError::RemoteFailure(format!("{:?}", e))),
} }

View File

@ -2,6 +2,7 @@ use attester::{Attester, Error as AttestationPollError};
use beacon_chain::BeaconChain; use beacon_chain::BeaconChain;
use block_producer::{BlockProducer, Error as BlockPollError}; use block_producer::{BlockProducer, Error as BlockPollError};
use db::MemoryDB; use db::MemoryDB;
use log::trace;
use signer::TestSigner; use signer::TestSigner;
use slot_clock::TestingSlotClock; use slot_clock::TestingSlotClock;
use std::sync::Arc; use std::sync::Arc;

View File

@ -24,7 +24,7 @@ fn it_can_build_on_genesis_block() {
fn it_can_produce_past_first_epoch_boundary() { fn it_can_produce_past_first_epoch_boundary() {
Builder::from_env(Env::default().default_filter_or("debug")).init(); Builder::from_env(Env::default().default_filter_or("debug")).init();
let validator_count = 100; let validator_count = 128 * 1024;
debug!("Starting harness build..."); debug!("Starting harness build...");

View File

@ -12,6 +12,7 @@ hashing = { path = "../utils/hashing" }
honey-badger-split = { path = "../utils/honey-badger-split" } honey-badger-split = { path = "../utils/honey-badger-split" }
integer-sqrt = "0.1" integer-sqrt = "0.1"
log = "0.4" log = "0.4"
rayon = "1.0"
rand = "0.5.5" rand = "0.5.5"
serde = "1.0" serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"

View File

@ -61,7 +61,7 @@ impl BeaconState {
attestation.data.slot + spec.epoch_length >= self.slot, attestation.data.slot + spec.epoch_length >= self.slot,
Error::IncludedTooLate Error::IncludedTooLate
); );
if self.justified_slot >= self.slot - (self.slot % spec.epoch_length) { if attestation.data.slot >= self.current_epoch_start_slot(spec) {
ensure!( ensure!(
attestation.data.justified_slot == self.justified_slot, attestation.data.justified_slot == self.justified_slot,
Error::WrongJustifiedSlot Error::WrongJustifiedSlot

View File

@ -1,19 +1,12 @@
use super::{BeaconChain, ClientDB, DBError, SlotClock}; use crate::{
beacon_state::{AttestationValidationError, CommitteesError, SlotProcessingError},
readers::BeaconBlockReader,
BeaconBlock, BeaconState, ChainSpec, Exit, Fork, Hash256, PendingAttestation,
};
use bls::{PublicKey, Signature}; use bls::{PublicKey, Signature};
use hashing::hash; use hashing::hash;
use log::debug; use log::debug;
use slot_clock::{SystemTimeSlotClockError, TestingSlotClockError};
use ssz::{ssz_encode, TreeHash}; use ssz::{ssz_encode, TreeHash};
use types::{
beacon_state::{AttestationValidationError, CommitteesError, SlotProcessingError},
readers::BeaconBlockReader,
BeaconBlock, BeaconState, Exit, Fork, Hash256, PendingAttestation,
};
// TODO: define elsehwere.
const DOMAIN_PROPOSAL: u64 = 2;
const DOMAIN_EXIT: u64 = 3;
const DOMAIN_RANDAO: u64 = 4;
macro_rules! ensure { macro_rules! ensure {
($condition: expr, $result: expr) => { ($condition: expr, $result: expr) => {
@ -23,6 +16,11 @@ macro_rules! ensure {
}; };
} }
// TODO: define elsehwere.
const DOMAIN_PROPOSAL: u64 = 2;
const DOMAIN_EXIT: u64 = 3;
const DOMAIN_RANDAO: u64 = 4;
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum Error { pub enum Error {
DBError(String), DBError(String),
@ -50,71 +48,50 @@ pub enum Error {
BadCustodyReseeds, BadCustodyReseeds,
BadCustodyChallenges, BadCustodyChallenges,
BadCustodyResponses, BadCustodyResponses,
SlotClockError(SystemTimeSlotClockError),
CommitteesError(CommitteesError), CommitteesError(CommitteesError),
SlotProcessingError(SlotProcessingError), SlotProcessingError(SlotProcessingError),
} }
impl<T, U> BeaconChain<T, U> impl BeaconState {
where pub fn per_block_processing(
T: ClientDB, &mut self,
U: SlotClock,
{
pub fn state_transition(
&self,
state: BeaconState,
block: &BeaconBlock, block: &BeaconBlock,
) -> Result<BeaconState, Error> { spec: &ChainSpec,
self.internal_state_transition(state, block, true) ) -> Result<(), Error> {
self.per_block_processing_signature_optional(block, true, spec)
} }
pub fn state_transition_without_verifying_block_signature( pub fn per_block_processing_without_verifying_block_signature(
&self, &mut self,
state: BeaconState,
block: &BeaconBlock, block: &BeaconBlock,
) -> Result<BeaconState, Error> { spec: &ChainSpec,
self.internal_state_transition(state, block, false) ) -> Result<(), Error> {
self.per_block_processing_signature_optional(block, false, spec)
} }
fn internal_state_transition( fn per_block_processing_signature_optional(
&self, &mut self,
mut state: BeaconState,
block: &BeaconBlock, block: &BeaconBlock,
verify_block_signature: bool, verify_block_signature: bool,
) -> Result<BeaconState, Error> { spec: &ChainSpec,
ensure!(state.slot < block.slot, Error::StateAlreadyTransitioned); ) -> Result<(), Error> {
ensure!(block.slot() == self.slot, Error::StateSlotMismatch);
debug!(
"Starting state transition from slot {} to {}...",
state.slot, block.slot
);
for _ in state.slot..block.slot {
state.per_slot_processing(block.parent_root.clone(), &self.spec)?;
}
/*
* Slot
*/
ensure!(block.slot() == state.slot, Error::StateSlotMismatch);
/* /*
* Proposer Signature * Proposer Signature
*/ */
let block_proposer_index = self
let block_proposer_index = state .get_beacon_proposer_index(block.slot, spec)
.get_beacon_proposer_index(block.slot, &self.spec)
.map_err(|_| Error::NoBlockProducer)?; .map_err(|_| Error::NoBlockProducer)?;
let block_proposer = &state.validator_registry[block_proposer_index]; let block_proposer = &self.validator_registry[block_proposer_index];
if verify_block_signature { if verify_block_signature {
ensure!( ensure!(
bls_verify( bls_verify(
&block_proposer.pubkey, &block_proposer.pubkey,
&block.proposal_root(&self.spec)[..], &block.proposal_root(spec)[..],
&block.signature, &block.signature,
get_domain(&state.fork_data, state.slot, DOMAIN_PROPOSAL) get_domain(&self.fork_data, self.slot, DOMAIN_PROPOSAL)
), ),
Error::BadBlockSignature Error::BadBlockSignature
); );
@ -123,49 +100,42 @@ where
/* /*
* RANDAO * RANDAO
*/ */
ensure!( ensure!(
bls_verify( bls_verify(
&block_proposer.pubkey, &block_proposer.pubkey,
&ssz_encode(&block_proposer.proposer_slots), &ssz_encode(&block_proposer.proposer_slots),
&block.randao_reveal, &block.randao_reveal,
get_domain(&state.fork_data, state.slot, DOMAIN_RANDAO) get_domain(&self.fork_data, self.slot, DOMAIN_RANDAO)
), ),
Error::BadRandaoSignature Error::BadRandaoSignature
); );
// TODO: check this is correct. // TODO: check this is correct.
let new_mix = { let new_mix = {
let mut mix = state.latest_randao_mixes let mut mix = self.latest_randao_mixes
[(state.slot % self.spec.latest_randao_mixes_length) as usize] [(self.slot % spec.latest_randao_mixes_length) as usize]
.to_vec(); .to_vec();
mix.append(&mut ssz_encode(&block.randao_reveal)); mix.append(&mut ssz_encode(&block.randao_reveal));
Hash256::from(&hash(&mix)[..]) Hash256::from(&hash(&mix)[..])
}; };
state.latest_randao_mixes[(state.slot % self.spec.latest_randao_mixes_length) as usize] = self.latest_randao_mixes[(self.slot % spec.latest_randao_mixes_length) as usize] = new_mix;
new_mix;
/* /*
* Eth1 data * Eth1 data
*/ */
// TODO: Eth1 data stuff. // TODO: Eth1 data processing.
/*
* OPERATIONS
*/
/* /*
* Proposer slashings * Proposer slashings
*/ */
ensure!( ensure!(
block.body.proposer_slashings.len() as u64 <= self.spec.max_proposer_slashings, block.body.proposer_slashings.len() as u64 <= spec.max_proposer_slashings,
Error::MaxProposerSlashingsExceeded Error::MaxProposerSlashingsExceeded
); );
for proposer_slashing in &block.body.proposer_slashings { for proposer_slashing in &block.body.proposer_slashings {
let proposer = state let proposer = self
.validator_registry .validator_registry
.get(proposer_slashing.proposer_index as usize) .get(proposer_slashing.proposer_index as usize)
.ok_or(Error::BadProposerSlashing)?; .ok_or(Error::BadProposerSlashing)?;
@ -183,7 +153,7 @@ where
Error::BadProposerSlashing Error::BadProposerSlashing
); );
ensure!( ensure!(
proposer.penalized_slot > state.slot, proposer.penalized_slot > self.slot,
Error::BadProposerSlashing Error::BadProposerSlashing
); );
ensure!( ensure!(
@ -192,7 +162,7 @@ where
&proposer_slashing.proposal_data_1.hash_tree_root(), &proposer_slashing.proposal_data_1.hash_tree_root(),
&proposer_slashing.proposal_signature_1, &proposer_slashing.proposal_signature_1,
get_domain( get_domain(
&state.fork_data, &self.fork_data,
proposer_slashing.proposal_data_1.slot, proposer_slashing.proposal_data_1.slot,
DOMAIN_PROPOSAL DOMAIN_PROPOSAL
) )
@ -205,34 +175,34 @@ where
&proposer_slashing.proposal_data_2.hash_tree_root(), &proposer_slashing.proposal_data_2.hash_tree_root(),
&proposer_slashing.proposal_signature_2, &proposer_slashing.proposal_signature_2,
get_domain( get_domain(
&state.fork_data, &self.fork_data,
proposer_slashing.proposal_data_2.slot, proposer_slashing.proposal_data_2.slot,
DOMAIN_PROPOSAL DOMAIN_PROPOSAL
) )
), ),
Error::BadProposerSlashing Error::BadProposerSlashing
); );
penalize_validator(&state, proposer_slashing.proposer_index as usize); penalize_validator(&self, proposer_slashing.proposer_index as usize);
} }
/* /*
* Attestations * Attestations
*/ */
ensure!( ensure!(
block.body.attestations.len() as u64 <= self.spec.max_attestations, block.body.attestations.len() as u64 <= spec.max_attestations,
Error::MaxAttestationsExceeded Error::MaxAttestationsExceeded
); );
for attestation in &block.body.attestations { for attestation in &block.body.attestations {
state.validate_attestation(attestation, &self.spec)?; self.validate_attestation(attestation, spec)?;
let pending_attestation = PendingAttestation { let pending_attestation = PendingAttestation {
data: attestation.data.clone(), data: attestation.data.clone(),
aggregation_bitfield: attestation.aggregation_bitfield.clone(), aggregation_bitfield: attestation.aggregation_bitfield.clone(),
custody_bitfield: attestation.custody_bitfield.clone(), custody_bitfield: attestation.custody_bitfield.clone(),
slot_included: state.slot, slot_included: self.slot,
}; };
state.latest_attestations.push(pending_attestation); self.latest_attestations.push(pending_attestation);
} }
debug!( debug!(
@ -244,7 +214,7 @@ where
* Deposits * Deposits
*/ */
ensure!( ensure!(
block.body.deposits.len() as u64 <= self.spec.max_deposits, block.body.deposits.len() as u64 <= spec.max_deposits,
Error::MaxDepositsExceeded Error::MaxDepositsExceeded
); );
@ -255,25 +225,25 @@ where
*/ */
ensure!( ensure!(
block.body.exits.len() as u64 <= self.spec.max_exits, block.body.exits.len() as u64 <= spec.max_exits,
Error::MaxExitsExceeded Error::MaxExitsExceeded
); );
for exit in &block.body.exits { for exit in &block.body.exits {
let validator = state let validator = self
.validator_registry .validator_registry
.get(exit.validator_index as usize) .get(exit.validator_index as usize)
.ok_or(Error::BadExit)?; .ok_or(Error::BadExit)?;
ensure!( ensure!(
validator.exit_slot > state.slot + self.spec.entry_exit_delay, validator.exit_slot > self.slot + spec.entry_exit_delay,
Error::BadExit Error::BadExit
); );
ensure!(state.slot >= exit.slot, Error::BadExit); ensure!(self.slot >= exit.slot, Error::BadExit);
let exit_message = { let exit_message = {
let exit_struct = Exit { let exit_struct = Exit {
slot: exit.slot, slot: exit.slot,
validator_index: exit.validator_index, validator_index: exit.validator_index,
signature: self.spec.empty_signature.clone(), signature: spec.empty_signature.clone(),
}; };
exit_struct.hash_tree_root() exit_struct.hash_tree_root()
}; };
@ -282,11 +252,11 @@ where
&validator.pubkey, &validator.pubkey,
&exit_message, &exit_message,
&exit.signature, &exit.signature,
get_domain(&state.fork_data, exit.slot, DOMAIN_EXIT) get_domain(&self.fork_data, exit.slot, DOMAIN_EXIT)
), ),
Error::BadProposerSlashing Error::BadProposerSlashing
); );
initiate_validator_exit(&state, exit.validator_index); initiate_validator_exit(&self, exit.validator_index);
} }
/* /*
@ -307,7 +277,7 @@ where
debug!("State transition complete."); debug!("State transition complete.");
Ok(state) Ok(())
} }
} }
@ -329,24 +299,6 @@ fn bls_verify(pubkey: &PublicKey, message: &[u8], signature: &Signature, _domain
signature.verify(message, pubkey) signature.verify(message, pubkey)
} }
impl From<DBError> for Error {
fn from(e: DBError) -> Error {
Error::DBError(e.message)
}
}
impl From<TestingSlotClockError> for Error {
fn from(_: TestingSlotClockError) -> Error {
unreachable!(); // Testing clock never throws an error.
}
}
impl From<SystemTimeSlotClockError> for Error {
fn from(e: SystemTimeSlotClockError) -> Error {
Error::SlotClockError(e)
}
}
impl From<AttestationValidationError> for Error { impl From<AttestationValidationError> for Error {
fn from(e: AttestationValidationError) -> Error { fn from(e: AttestationValidationError) -> Error {
Error::InvalidAttestation(e) Error::InvalidAttestation(e)

View File

@ -7,6 +7,7 @@ use crate::{
}; };
use integer_sqrt::IntegerSquareRoot; use integer_sqrt::IntegerSquareRoot;
use log::debug; use log::debug;
use rayon::prelude::*;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::iter::FromIterator; use std::iter::FromIterator;
@ -59,7 +60,7 @@ impl BeaconState {
let current_epoch_attestations: Vec<&PendingAttestation> = self let current_epoch_attestations: Vec<&PendingAttestation> = self
.latest_attestations .latest_attestations
.iter() .par_iter()
.filter(|a| a.data.slot / spec.epoch_length == self.current_epoch(spec)) .filter(|a| a.data.slot / spec.epoch_length == self.current_epoch(spec))
.collect(); .collect();
@ -77,7 +78,7 @@ impl BeaconState {
let current_epoch_boundary_attestations: Vec<&PendingAttestation> = let current_epoch_boundary_attestations: Vec<&PendingAttestation> =
current_epoch_attestations current_epoch_attestations
.iter() .par_iter()
.filter(|a| { .filter(|a| {
match self.get_block_root(self.current_epoch_start_slot(spec), spec) { match self.get_block_root(self.current_epoch_start_slot(spec), spec) {
Some(block_root) => { Some(block_root) => {
@ -112,7 +113,7 @@ impl BeaconState {
*/ */
let previous_epoch_attestations: Vec<&PendingAttestation> = self let previous_epoch_attestations: Vec<&PendingAttestation> = self
.latest_attestations .latest_attestations
.iter() .par_iter()
.filter(|a| { .filter(|a| {
//TODO: ensure these saturating subs are correct. //TODO: ensure these saturating subs are correct.
a.data.slot / spec.epoch_length == self.previous_epoch(spec) a.data.slot / spec.epoch_length == self.previous_epoch(spec)

View File

@ -12,6 +12,7 @@ use ssz::{hash, Decodable, DecodeError, Encodable, SszStream, TreeHash};
mod attestation_participants; mod attestation_participants;
mod attestation_validation; mod attestation_validation;
mod block_processing;
mod committees; mod committees;
mod epoch_processing; mod epoch_processing;
mod shuffling; mod shuffling;
@ -20,6 +21,7 @@ mod winning_root;
pub use self::attestation_participants::Error as AttestationParticipantsError; pub use self::attestation_participants::Error as AttestationParticipantsError;
pub use self::attestation_validation::Error as AttestationValidationError; pub use self::attestation_validation::Error as AttestationValidationError;
pub use self::block_processing::Error as BlockProcessingError;
pub use self::committees::Error as CommitteesError; pub use self::committees::Error as CommitteesError;
pub use self::epoch_processing::Error as EpochProcessingError; pub use self::epoch_processing::Error as EpochProcessingError;
pub use self::slot_processing::Error as SlotProcessingError; pub use self::slot_processing::Error as SlotProcessingError;