Add first iteration on faster rewards processing.

This commit is contained in:
Paul Hauner 2019-03-14 11:53:50 +11:00
parent 243ef2db80
commit 6f919e6f7d
No known key found for this signature in database
GPG Key ID: D362883A9218FCC6
4 changed files with 265 additions and 102 deletions

View File

@ -150,13 +150,15 @@ fn bench_epoch_processing(c: &mut Criterion, state: &BeaconState, spec: &ChainSp
let state_clone = state.clone();
let spec_clone = spec.clone();
let active_validator_indices = calculate_active_validator_indices(&state, &spec);
c.bench(
&format!("{}/epoch_processing", desc),
Benchmark::new("calculate_attester_sets", move |b| {
b.iter_batched(
|| state_clone.clone(),
|mut state| {
calculate_attester_sets(&mut state, &spec_clone).unwrap();
calculate_attester_sets(&mut state, &active_validator_indices, &spec_clone)
.unwrap();
state
},
criterion::BatchSize::SmallInput,
@ -168,8 +170,8 @@ fn bench_epoch_processing(c: &mut Criterion, state: &BeaconState, spec: &ChainSp
let state_clone = state.clone();
let spec_clone = spec.clone();
let previous_epoch = state.previous_epoch(&spec);
let attesters = calculate_attester_sets(&state, &spec).unwrap();
let active_validator_indices = calculate_active_validator_indices(&state, &spec);
let attesters = calculate_attester_sets(&state, &active_validator_indices, &spec).unwrap();
let current_total_balance = state.get_total_balance(&active_validator_indices[..], &spec);
let previous_total_balance = state.get_total_balance(
&get_active_validator_indices(&state.validator_registry, previous_epoch)[..],
@ -185,8 +187,8 @@ fn bench_epoch_processing(c: &mut Criterion, state: &BeaconState, spec: &ChainSp
&mut state,
current_total_balance,
previous_total_balance,
attesters.previous_epoch_boundary.balance,
attesters.current_epoch_boundary.balance,
attesters.balances.previous_epoch_boundary,
attesters.balances.current_epoch_boundary,
&spec_clone,
);
state
@ -214,8 +216,8 @@ fn bench_epoch_processing(c: &mut Criterion, state: &BeaconState, spec: &ChainSp
let mut state_clone = state.clone();
let spec_clone = spec.clone();
let previous_epoch = state.previous_epoch(&spec);
let attesters = calculate_attester_sets(&state, &spec).unwrap();
let active_validator_indices = calculate_active_validator_indices(&state, &spec);
let attesters = calculate_attester_sets(&state, &active_validator_indices, &spec).unwrap();
let previous_total_balance = state.get_total_balance(
&get_active_validator_indices(&state.validator_registry, previous_epoch)[..],
&spec,
@ -229,7 +231,6 @@ fn bench_epoch_processing(c: &mut Criterion, state: &BeaconState, spec: &ChainSp
|mut state| {
process_rewards_and_penalities(
&mut state,
&active_validator_indices,
&attesters,
previous_total_balance,
&winning_root_for_shards,
@ -264,8 +265,8 @@ fn bench_epoch_processing(c: &mut Criterion, state: &BeaconState, spec: &ChainSp
let mut state_clone = state.clone();
let spec_clone = spec.clone();
let previous_epoch = state.previous_epoch(&spec);
let attesters = calculate_attester_sets(&state, &spec).unwrap();
let active_validator_indices = calculate_active_validator_indices(&state, &spec);
let attesters = calculate_attester_sets(&state, &active_validator_indices, &spec).unwrap();
let current_total_balance = state.get_total_balance(&active_validator_indices[..], spec);
let previous_total_balance = state.get_total_balance(
&get_active_validator_indices(&state.validator_registry, previous_epoch)[..],
@ -279,8 +280,8 @@ fn bench_epoch_processing(c: &mut Criterion, state: &BeaconState, spec: &ChainSp
&mut state_clone,
current_total_balance,
previous_total_balance,
attesters.previous_epoch_boundary.balance,
attesters.current_epoch_boundary.balance,
attesters.balances.previous_epoch_boundary,
attesters.balances.current_epoch_boundary,
spec,
);
assert!(

View File

@ -18,8 +18,8 @@ pub fn state_processing(c: &mut Criterion) {
Builder::from_env(Env::default().default_filter_or(LOG_LEVEL)).init();
}
bench_block_processing::bench_block_processing_n_validators(c, VALIDATOR_COUNT);
bench_epoch_processing::bench_epoch_processing_n_validators(c, VALIDATOR_COUNT);
bench_block_processing::bench_block_processing_n_validators(c, VALIDATOR_COUNT);
}
criterion_group!(benches, state_processing);

View File

@ -1,6 +1,5 @@
use attester_sets::AttesterSets;
use attesters::Attesters;
use errors::EpochProcessingError as Error;
use fnv::FnvHashMap;
use fnv::FnvHashSet;
use integer_sqrt::IntegerSquareRoot;
use rayon::prelude::*;
@ -11,6 +10,7 @@ use types::{validator_registry::get_active_validator_indices, *};
use winning_root::{winning_root, WinningRoot};
pub mod attester_sets;
pub mod attesters;
pub mod errors;
pub mod inclusion_distance;
pub mod tests;
@ -35,8 +35,6 @@ pub fn per_epoch_processing(state: &mut BeaconState, spec: &ChainSpec) -> Result
state.build_epoch_cache(RelativeEpoch::Current, spec)?;
state.build_epoch_cache(RelativeEpoch::Next, spec)?;
let attesters = calculate_attester_sets(&state, spec)?;
let active_validator_indices = calculate_active_validator_indices(&state, spec);
let current_total_balance = state.get_total_balance(&active_validator_indices[..], spec);
@ -46,14 +44,16 @@ pub fn per_epoch_processing(state: &mut BeaconState, spec: &ChainSpec) -> Result
spec,
);
let attesters = calculate_attester_sets(&state, &active_validator_indices, spec)?;
process_eth1_data(state, spec);
process_justification(
state,
current_total_balance,
previous_total_balance,
attesters.previous_epoch_boundary.balance,
attesters.current_epoch_boundary.balance,
attesters.balances.previous_epoch_boundary,
attesters.balances.current_epoch_boundary,
spec,
);
@ -63,7 +63,6 @@ pub fn per_epoch_processing(state: &mut BeaconState, spec: &ChainSpec) -> Result
// Rewards and Penalities
process_rewards_and_penalities(
state,
&active_validator_indices,
&attesters,
previous_total_balance,
&winning_root_for_shards,
@ -107,9 +106,13 @@ pub fn calculate_active_validator_indices(state: &BeaconState, spec: &ChainSpec)
/// Spec v0.4.0
pub fn calculate_attester_sets(
state: &BeaconState,
active_validator_indices: &[usize],
spec: &ChainSpec,
) -> Result<AttesterSets, BeaconStateError> {
AttesterSets::new(&state, spec)
) -> Result<Attesters, BeaconStateError> {
let mut attesters = Attesters::empty(state.validator_registry.len());
attesters.process_active_validator_indices(&active_validator_indices);
attesters.process_attestations(&state, &state.latest_attestations, spec)?;
Ok(attesters)
}
/// Spec v0.4.0
@ -283,22 +286,20 @@ pub fn process_crosslinks(
/// Spec v0.4.0
pub fn process_rewards_and_penalities(
state: &mut BeaconState,
active_validator_indices: &[usize],
attesters: &AttesterSets,
attesters: &Attesters,
previous_total_balance: u64,
winning_root_for_shards: &WinningRootHashSet,
spec: &ChainSpec,
) -> Result<(), Error> {
let next_epoch = state.next_epoch(spec);
let active_validator_indices: FnvHashSet<usize> =
FnvHashSet::from_iter(active_validator_indices.iter().cloned());
/*
let previous_epoch_attestations: Vec<&PendingAttestation> = state
.latest_attestations
.par_iter()
.filter(|a| a.data.slot.epoch(spec.slots_per_epoch) == state.previous_epoch(spec))
.collect();
*/
let base_reward_quotient = previous_total_balance.integer_sqrt() / spec.base_reward_quotient;
@ -309,6 +310,7 @@ pub fn process_rewards_and_penalities(
return Err(Error::PreviousTotalBalanceIsZero);
}
/*
// Map is ValidatorIndex -> ProposerIndex
let mut inclusion_slots: FnvHashMap<usize, (Slot, usize)> = FnvHashMap::default();
for a in &previous_epoch_attestations {
@ -330,79 +332,55 @@ pub fn process_rewards_and_penalities(
);
}
}
*/
// Justification and finalization
let epochs_since_finality = next_epoch - state.finalized_epoch;
if epochs_since_finality <= 4 {
state.validator_balances = state
.validator_balances
.par_iter()
.enumerate()
.map(|(index, &balance)| {
let mut balance = balance;
state.validator_balances = state
.validator_balances
.par_iter()
.enumerate()
.map(|(index, &balance)| {
let mut balance = balance;
let status = &attesters.statuses[index];
if epochs_since_finality <= 4 {
let base_reward = state.base_reward(index, base_reward_quotient, spec);
// Expected FFG source
if attesters.previous_epoch.indices.contains(&index) {
if status.is_previous_epoch {
safe_add_assign!(
balance,
base_reward * attesters.previous_epoch.balance / previous_total_balance
base_reward * attesters.balances.previous_epoch / previous_total_balance
);
} else if active_validator_indices.contains(&index) {
} else if status.is_active {
safe_sub_assign!(balance, base_reward);
}
// Expected FFG target
if attesters.previous_epoch_boundary.indices.contains(&index) {
if status.is_previous_epoch_boundary {
safe_add_assign!(
balance,
base_reward * attesters.previous_epoch_boundary.balance
base_reward * attesters.balances.previous_epoch_boundary
/ previous_total_balance
);
} else if active_validator_indices.contains(&index) {
} else if status.is_active {
safe_sub_assign!(balance, base_reward);
}
// Expected beacon chain head
if attesters.previous_epoch_head.indices.contains(&index) {
if status.is_previous_epoch_head {
safe_add_assign!(
balance,
base_reward * attesters.previous_epoch_head.balance
base_reward * attesters.balances.previous_epoch_head
/ previous_total_balance
);
} else if active_validator_indices.contains(&index) {
} else if status.is_active {
safe_sub_assign!(balance, base_reward);
};
if attesters.previous_epoch.indices.contains(&index) {
let base_reward = state.base_reward(index, base_reward_quotient, spec);
let (inclusion_distance, _) = inclusion_slots
.get(&index)
.expect("Inconsistent inclusion_slots.");
if *inclusion_distance > 0 {
safe_add_assign!(
balance,
base_reward * spec.min_attestation_inclusion_delay
/ inclusion_distance.as_u64()
)
}
}
balance
})
.collect();
} else {
state.validator_balances = state
.validator_balances
.par_iter()
.enumerate()
.map(|(index, &balance)| {
let mut balance = balance;
} else {
let inactivity_penalty = state.inactivity_penalty(
index,
epochs_since_finality,
@ -410,14 +388,14 @@ pub fn process_rewards_and_penalities(
spec,
);
if active_validator_indices.contains(&index) {
if !attesters.previous_epoch.indices.contains(&index) {
if status.is_active {
if !status.is_previous_epoch {
safe_sub_assign!(balance, inactivity_penalty);
}
if !attesters.previous_epoch_boundary.indices.contains(&index) {
if !status.is_previous_epoch_boundary {
safe_sub_assign!(balance, inactivity_penalty);
}
if !attesters.previous_epoch_head.indices.contains(&index) {
if !status.is_previous_epoch_head {
safe_sub_assign!(balance, inactivity_penalty);
}
@ -426,42 +404,31 @@ pub fn process_rewards_and_penalities(
safe_sub_assign!(balance, 2 * inactivity_penalty + base_reward);
}
}
}
if attesters.previous_epoch.indices.contains(&index) {
let base_reward = state.base_reward(index, base_reward_quotient, spec);
let (inclusion_distance, _) = inclusion_slots
.get(&index)
.expect("Inconsistent inclusion_slots.");
if *inclusion_distance > 0 {
safe_add_assign!(
balance,
base_reward * spec.min_attestation_inclusion_delay
/ inclusion_distance.as_u64()
)
}
}
balance
})
.collect();
}
balance
})
.collect();
// Attestation inclusion
//
for &index in &attesters.previous_epoch.indices {
let (_, proposer_index) = inclusion_slots
.get(&index)
.ok_or_else(|| Error::InclusionSlotsInconsistent(index))?;
for (index, _validator) in state.validator_registry.iter().enumerate() {
let status = &attesters.statuses[index];
let base_reward = state.base_reward(*proposer_index, base_reward_quotient, spec);
if status.is_previous_epoch {
let proposer_index = status.inclusion_info.proposer_index;
let inclusion_distance = status.inclusion_info.distance;
safe_add_assign!(
state.validator_balances[*proposer_index],
base_reward / spec.attestation_inclusion_reward_quotient
);
let base_reward = state.base_reward(proposer_index, base_reward_quotient, spec);
if inclusion_distance > 0 && inclusion_distance < Slot::max_value() {
safe_add_assign!(
state.validator_balances[proposer_index],
base_reward * spec.min_attestation_inclusion_delay
/ inclusion_distance.as_u64()
)
}
}
}
//Crosslinks

View File

@ -0,0 +1,195 @@
use types::*;
macro_rules! set_self_if_other_is_true {
($self_: ident, $other: ident, $var: ident) => {
$self_.$var = $other.$var & !$self_.$var;
};
}
#[derive(Clone)]
pub struct InclusionInfo {
pub slot: Slot,
pub distance: Slot,
pub proposer_index: usize,
}
impl Default for InclusionInfo {
fn default() -> Self {
Self {
slot: Slot::max_value(),
distance: Slot::max_value(),
proposer_index: 0,
}
}
}
impl InclusionInfo {
pub fn update(&mut self, other: &Self) {
if other.slot < self.slot {
self.slot = other.slot;
self.distance = other.distance;
self.proposer_index = other.proposer_index;
}
}
}
#[derive(Default, Clone)]
pub struct AttesterStatus {
pub is_active: bool,
pub is_current_epoch: bool,
pub is_current_epoch_boundary: bool,
pub is_previous_epoch: bool,
pub is_previous_epoch_boundary: bool,
pub is_previous_epoch_head: bool,
pub inclusion_info: InclusionInfo,
}
impl AttesterStatus {
pub fn update(&mut self, other: &Self) {
// Update all the bool fields, only updating `self` if `other` is true (never setting
// `self` to false).
set_self_if_other_is_true!(self, other, is_active);
set_self_if_other_is_true!(self, other, is_current_epoch);
set_self_if_other_is_true!(self, other, is_current_epoch_boundary);
set_self_if_other_is_true!(self, other, is_previous_epoch);
set_self_if_other_is_true!(self, other, is_previous_epoch_boundary);
set_self_if_other_is_true!(self, other, is_previous_epoch_head);
self.inclusion_info.update(&other.inclusion_info);
}
}
#[derive(Default, Clone)]
pub struct TotalBalances {
pub current_epoch: u64,
pub current_epoch_boundary: u64,
pub previous_epoch: u64,
pub previous_epoch_boundary: u64,
pub previous_epoch_head: u64,
}
pub struct Attesters {
pub statuses: Vec<AttesterStatus>,
pub balances: TotalBalances,
}
impl Attesters {
pub fn empty(num_validators: usize) -> Self {
Self {
statuses: vec![AttesterStatus::default(); num_validators],
balances: TotalBalances::default(),
}
}
pub fn process_active_validator_indices(&mut self, active_validator_indices: &[usize]) {
let status = AttesterStatus {
is_active: true,
..AttesterStatus::default()
};
for &i in active_validator_indices {
self.statuses[i].update(&status);
}
}
pub fn process_attestations(
&mut self,
state: &BeaconState,
attestations: &[PendingAttestation],
spec: &ChainSpec,
) -> Result<(), BeaconStateError> {
for a in attestations {
let attesting_indices =
state.get_attestation_participants(&a.data, &a.aggregation_bitfield, spec)?;
let attesting_balance = state.get_total_balance(&attesting_indices, spec);
let mut status = AttesterStatus::default();
// Profile this attestation, updating the total balances and generating an
// `AttesterStatus` object that applies to all participants in the attestation.
if is_from_epoch(a, state.current_epoch(spec), spec) {
self.balances.current_epoch += attesting_balance;
status.is_current_epoch = true;
if has_common_epoch_boundary_root(a, state, state.current_epoch(spec), spec)? {
self.balances.current_epoch_boundary += attesting_balance;
status.is_current_epoch_boundary = true;
}
} else if is_from_epoch(a, state.previous_epoch(spec), spec) {
self.balances.previous_epoch += attesting_balance;
status.is_previous_epoch = true;
// The inclusion slot and distance are only required for previous epoch attesters.
status.inclusion_info = InclusionInfo {
slot: a.inclusion_slot,
distance: inclusion_distance(a),
proposer_index: state.get_beacon_proposer_index(a.inclusion_slot, spec)?,
};
if has_common_epoch_boundary_root(a, state, state.previous_epoch(spec), spec)? {
self.balances.previous_epoch_boundary += attesting_balance;
status.is_previous_epoch_boundary = true;
}
if has_common_beacon_block_root(a, state, spec)? {
self.balances.previous_epoch_head += attesting_balance;
status.is_previous_epoch_head = true;
}
}
// Loop through the participating validator indices and update the status vec.
for validator_index in attesting_indices {
self.statuses[validator_index].update(&status);
}
}
Ok(())
}
}
fn inclusion_distance(a: &PendingAttestation) -> Slot {
a.inclusion_slot - a.data.slot
}
/// Returns `true` if some `PendingAttestation` is from the supplied `epoch`.
///
/// Spec v0.4.0
fn is_from_epoch(a: &PendingAttestation, epoch: Epoch, spec: &ChainSpec) -> bool {
a.data.slot.epoch(spec.slots_per_epoch) == epoch
}
/// Returns `true` if a `PendingAttestation` and `BeaconState` share the same beacon block hash for
/// the first slot of the given epoch.
///
/// Spec v0.4.0
fn has_common_epoch_boundary_root(
a: &PendingAttestation,
state: &BeaconState,
epoch: Epoch,
spec: &ChainSpec,
) -> Result<bool, BeaconStateError> {
let slot = epoch.start_slot(spec.slots_per_epoch);
let state_boundary_root = *state
.get_block_root(slot, spec)
.ok_or_else(|| BeaconStateError::InsufficientBlockRoots)?;
Ok(a.data.epoch_boundary_root == state_boundary_root)
}
/// Returns `true` if a `PendingAttestation` and `BeaconState` share the same beacon block hash for
/// the current slot of the `PendingAttestation`.
///
/// Spec v0.4.0
fn has_common_beacon_block_root(
a: &PendingAttestation,
state: &BeaconState,
spec: &ChainSpec,
) -> Result<bool, BeaconStateError> {
let state_block_root = *state
.get_block_root(a.data.slot, spec)
.ok_or_else(|| BeaconStateError::InsufficientBlockRoots)?;
Ok(a.data.beacon_block_root == state_block_root)
}