# Pack attestations into blocks in parallel (#2307)

## Proposed Changes

Use two instances of max cover when packing attestations into blocks: one for the previous epoch and one for the current epoch, with the two instances run in parallel via `rayon::join`. Because max cover runs in `O(n^2)` time, splitting the input roughly halves the total work (`2 * (n/2)^2 = n^2/2`), and executing the two halves concurrently reduces the wall-clock time further. This should help alleviate some load on block proposal, particularly on Prater.
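
For a rough feel of the approach, here is a minimal, self-contained sketch (not the real Lighthouse types or `OperationPool` API; the `Item` struct, scores, and limits are invented for illustration, and the `rayon` crate is assumed as a dependency): greedy max cover is run over each epoch's attestations in parallel with `rayon::join`, then the two solutions are merged greedily by score, mirroring the `merge_solutions` helper added in this diff.

```rust
use std::collections::HashSet;

/// Toy stand-in for an attestation: the set of validator indices it would
/// newly cover if included in the block.
#[derive(Clone, Debug)]
struct Item {
    covering: HashSet<u64>,
}

impl Item {
    fn score(&self) -> usize {
        self.covering.len()
    }
}

/// Greedy maximum coverage: repeatedly take the highest-scoring item and
/// discount its elements from the remaining candidates. Each of the `limit`
/// rounds scans all candidates, so the cost grows roughly quadratically in n.
fn maximum_cover(mut items: Vec<Item>, limit: usize) -> Vec<Item> {
    let mut result = Vec::with_capacity(limit);
    for _ in 0..limit {
        let best_idx = match items
            .iter()
            .enumerate()
            .filter(|(_, item)| item.score() != 0)
            .max_by_key(|(_, item)| item.score())
        {
            Some((i, _)) => i,
            None => break,
        };
        let best = items.swap_remove(best_idx);
        // Remaining candidates only get credit for elements not yet covered.
        for item in &mut items {
            item.covering.retain(|v| !best.covering.contains(v));
        }
        result.push(best);
    }
    result
}

/// Greedy merge of two solutions, preferring higher-score items, capped at `limit`.
fn merge_solutions(mut a: Vec<Item>, mut b: Vec<Item>, limit: usize) -> Vec<Item> {
    a.append(&mut b);
    a.sort_by_key(|item| std::cmp::Reverse(item.score()));
    a.truncate(limit);
    a
}

fn main() {
    // Pretend these are the op pool's previous-epoch and current-epoch attestations.
    let prev: Vec<Item> = (0..4u64)
        .map(|i| Item { covering: (i * 8..i * 8 + 8).collect() })
        .collect();
    let curr: Vec<Item> = (0..4u64)
        .map(|i| Item { covering: (i * 8 + 4..i * 8 + 12).collect() })
        .collect();

    // Two smaller max cover instances instead of one big one, run in parallel.
    let (prev_cover, curr_cover) = rayon::join(
        || maximum_cover(prev, 8),
        || maximum_cover(curr, 8),
    );
    let packed = merge_solutions(prev_cover, curr_cover, 8);
    println!("packed {} attestations", packed.len());
}
```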
Michael Sproul 2021-04-13 05:27:42 +00:00
parent c1203f5e52
commit 3b901dc5ec
8 changed files with 191 additions and 78 deletions

Cargo.lock (generated)

@@ -4400,8 +4400,12 @@ dependencies = [
"eth2_ssz",
"eth2_ssz_derive",
"int_to_bytes",
"itertools 0.10.0",
"lazy_static",
"lighthouse_metrics",
"parking_lot",
"rand 0.7.3",
"rayon",
"serde",
"serde_derive",
"state_processing",


@@ -1160,6 +1160,26 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(signed_aggregate)
}
/// Filter an attestation from the op pool for shuffling compatibility.
///
/// Use the provided `filter_cache` map to memoize results.
pub fn filter_op_pool_attestation(
&self,
filter_cache: &mut HashMap<(Hash256, Epoch), bool>,
att: &Attestation<T::EthSpec>,
state: &BeaconState<T::EthSpec>,
) -> bool {
*filter_cache
.entry((att.data.beacon_block_root, att.data.target.epoch))
.or_insert_with(|| {
self.shuffling_is_compatible(
&att.data.beacon_block_root,
att.data.target.epoch,
&state,
)
})
}
/// Check that the shuffling at `block_root` is equal to one of the shufflings of `state`.
///
/// The `target_epoch` argument determines which shuffling to check compatibility with, it
@@ -1968,21 +1988,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.deposits_for_block_inclusion(&state, &eth1_data, &self.spec)?
.into();
// Map from attestation head block root to shuffling compatibility.
// Used to memoize the `attestation_shuffling_is_compatible` function.
let mut shuffling_filter_cache = HashMap::new();
let attestation_filter = |att: &&Attestation<T::EthSpec>| -> bool {
*shuffling_filter_cache
.entry((att.data.beacon_block_root, att.data.target.epoch))
.or_insert_with(|| {
self.shuffling_is_compatible(
&att.data.beacon_block_root,
att.data.target.epoch,
&state,
)
})
};
// Iterate through the naive aggregation pool and ensure all the attestations from there
// are included in the operation pool.
let unagg_import_timer =
@@ -2012,9 +2017,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let attestation_packing_timer =
metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES);
let mut prev_filter_cache = HashMap::new();
let prev_attestation_filter = |att: &&Attestation<T::EthSpec>| {
self.filter_op_pool_attestation(&mut prev_filter_cache, *att, &state)
};
let mut curr_filter_cache = HashMap::new();
let curr_attestation_filter = |att: &&Attestation<T::EthSpec>| {
self.filter_op_pool_attestation(&mut curr_filter_cache, *att, &state)
};
let attestations = self
.op_pool
.get_attestations(&state, attestation_filter, &self.spec)
.get_attestations(
&state,
prev_attestation_filter,
curr_attestation_filter,
&self.spec,
)
.map_err(BlockProductionError::OpPoolError)?
.into();
drop(attestation_packing_timer);


@@ -5,12 +5,16 @@ authors = ["Michael Sproul <michael@sigmaprime.io>"]
edition = "2018"
[dependencies]
itertools = "0.10.0"
int_to_bytes = { path = "../../consensus/int_to_bytes" }
lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
parking_lot = "0.11.0"
types = { path = "../../consensus/types" }
state_processing = { path = "../../consensus/state_processing" }
eth2_ssz = "0.1.2"
eth2_ssz_derive = "0.1.0"
rayon = "1.5.0"
serde = "1.0.116"
serde_derive = "1.0.116"
store = { path = "../store" }


@@ -3,6 +3,7 @@ use state_processing::common::{get_attesting_indices, get_base_reward};
use std::collections::HashMap;
use types::{Attestation, BeaconState, BitList, ChainSpec, EthSpec};
#[derive(Debug, Clone)]
pub struct AttMaxCover<'a, T: EthSpec> {
/// Underlying attestation.
att: &'a Attestation<T>,
@@ -44,8 +45,8 @@ impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> {
type Object = Attestation<T>;
type Set = HashMap<u64, u64>;
fn object(&self) -> Attestation<T> {
self.att.clone()
fn object(&self) -> &Attestation<T> {
self.att
}
fn covering_set(&self) -> &HashMap<u64, u64> {
@@ -100,8 +101,6 @@ pub fn earliest_attestation_validators<T: EthSpec>(
state_attestations
.iter()
// In a single epoch, an attester should only be attesting for one slot and index.
// TODO: we avoid including slashable attestations in the state here,
// but maybe we should do something else with them (like construct slashings).
.filter(|existing_attestation| {
existing_attestation.data.slot == attestation.data.slot
&& existing_attestation.data.index == attestation.data.index


@@ -3,6 +3,7 @@ use state_processing::per_block_processing::get_slashable_indices_modular;
use std::collections::{HashMap, HashSet};
use types::{AttesterSlashing, BeaconState, ChainSpec, EthSpec};
#[derive(Debug, Clone)]
pub struct AttesterSlashingMaxCover<'a, T: EthSpec> {
slashing: &'a AttesterSlashing<T>,
effective_balances: HashMap<u64, u64>,
@@ -46,8 +47,8 @@ impl<'a, T: EthSpec> MaxCover for AttesterSlashingMaxCover<'a, T> {
type Set = HashMap<u64, u64>;
/// Extract an object for inclusion in a solution.
fn object(&self) -> AttesterSlashing<T> {
self.slashing.clone()
fn object(&self) -> &AttesterSlashing<T> {
self.slashing
}
/// Get the set of elements covered.


@@ -2,6 +2,7 @@ mod attestation;
mod attestation_id;
mod attester_slashing;
mod max_cover;
mod metrics;
mod persistence;
pub use persistence::PersistedOperationPool;
@@ -9,7 +10,7 @@ pub use persistence::PersistedOperationPool;
use attestation::AttMaxCover;
use attestation_id::AttestationId;
use attester_slashing::AttesterSlashingMaxCover;
use max_cover::maximum_cover;
use max_cover::{maximum_cover, MaxCover};
use parking_lot::RwLock;
use state_processing::per_block_processing::errors::AttestationValidationError;
use state_processing::per_block_processing::{
@@ -96,49 +97,29 @@ impl<T: EthSpec> OperationPool<T> {
self.attestations.read().values().map(Vec::len).sum()
}
/// Get a list of attestations for inclusion in a block.
///
/// The `validity_filter` is a closure that provides extra filtering of the attestations
/// before an approximately optimal bundle is constructed. We use it to provide access
/// to the fork choice data from the `BeaconChain` struct that doesn't logically belong
/// in the operation pool.
pub fn get_attestations(
&self,
state: &BeaconState<T>,
validity_filter: impl FnMut(&&Attestation<T>) -> bool,
spec: &ChainSpec,
) -> Result<Vec<Attestation<T>>, OpPoolError> {
// Attestations for the current fork, which may be from the current or previous epoch.
let prev_epoch = state.previous_epoch();
let current_epoch = state.current_epoch();
let prev_domain_bytes = AttestationId::compute_domain_bytes(
prev_epoch,
/// Return all valid attestations for the given epoch, for use in max cover.
fn get_valid_attestations_for_epoch<'a>(
&'a self,
epoch: Epoch,
all_attestations: &'a HashMap<AttestationId, Vec<Attestation<T>>>,
state: &'a BeaconState<T>,
total_active_balance: u64,
validity_filter: impl FnMut(&&Attestation<T>) -> bool + Send,
spec: &'a ChainSpec,
) -> impl Iterator<Item = AttMaxCover<'a, T>> + Send {
let domain_bytes = AttestationId::compute_domain_bytes(
epoch,
&state.fork,
state.genesis_validators_root,
spec,
);
let curr_domain_bytes = AttestationId::compute_domain_bytes(
current_epoch,
&state.fork,
state.genesis_validators_root,
spec,
);
let reader = self.attestations.read();
let active_indices = state
.get_cached_active_validator_indices(RelativeEpoch::Current)
.map_err(OpPoolError::GetAttestationsTotalBalanceError)?;
let total_active_balance = state
.get_total_balance(&active_indices, spec)
.map_err(OpPoolError::GetAttestationsTotalBalanceError)?;
let valid_attestations = reader
all_attestations
.iter()
.filter(|(key, _)| {
key.domain_bytes_match(&prev_domain_bytes)
|| key.domain_bytes_match(&curr_domain_bytes)
})
.filter(move |(key, _)| key.domain_bytes_match(&domain_bytes))
.flat_map(|(_, attestations)| attestations)
// That are valid...
.filter(|attestation| {
.filter(move |attestation| attestation.data.target.epoch == epoch)
.filter(move |attestation| {
// Ensure attestations are valid for block inclusion
verify_attestation_for_block_inclusion(
state,
attestation,
@@ -148,10 +129,77 @@ impl<T: EthSpec> OperationPool<T> {
.is_ok()
})
.filter(validity_filter)
.flat_map(|att| AttMaxCover::new(att, state, total_active_balance, spec));
.filter_map(move |att| AttMaxCover::new(att, state, total_active_balance, spec))
}
Ok(maximum_cover(
valid_attestations,
/// Get a list of attestations for inclusion in a block.
///
/// The `validity_filter` is a closure that provides extra filtering of the attestations
/// before an approximately optimal bundle is constructed. We use it to provide access
/// to the fork choice data from the `BeaconChain` struct that doesn't logically belong
/// in the operation pool.
pub fn get_attestations(
&self,
state: &BeaconState<T>,
prev_epoch_validity_filter: impl FnMut(&&Attestation<T>) -> bool + Send,
curr_epoch_validity_filter: impl FnMut(&&Attestation<T>) -> bool + Send,
spec: &ChainSpec,
) -> Result<Vec<Attestation<T>>, OpPoolError> {
// Attestations for the current fork, which may be from the current or previous epoch.
let prev_epoch = state.previous_epoch();
let current_epoch = state.current_epoch();
let all_attestations = self.attestations.read();
let active_indices = state
.get_cached_active_validator_indices(RelativeEpoch::Current)
.map_err(OpPoolError::GetAttestationsTotalBalanceError)?;
let total_active_balance = state
.get_total_balance(&active_indices, spec)
.map_err(OpPoolError::GetAttestationsTotalBalanceError)?;
// Split attestations for the previous & current epochs, so that we
// can optimise them individually in parallel.
let prev_epoch_att = self.get_valid_attestations_for_epoch(
prev_epoch,
&*all_attestations,
state,
total_active_balance,
prev_epoch_validity_filter,
spec,
);
let curr_epoch_att = self.get_valid_attestations_for_epoch(
current_epoch,
&*all_attestations,
state,
total_active_balance,
curr_epoch_validity_filter,
spec,
);
let prev_epoch_limit = std::cmp::min(
T::MaxPendingAttestations::to_usize()
.saturating_sub(state.previous_epoch_attestations.len()),
T::MaxAttestations::to_usize(),
);
let (prev_cover, curr_cover) = rayon::join(
move || {
let _timer = metrics::start_timer(&metrics::ATTESTATION_PREV_EPOCH_PACKING_TIME);
// If we're in the genesis epoch, just use the current epoch attestations.
if prev_epoch == current_epoch {
vec![]
} else {
maximum_cover(prev_epoch_att, prev_epoch_limit)
}
},
move || {
let _timer = metrics::start_timer(&metrics::ATTESTATION_CURR_EPOCH_PACKING_TIME);
maximum_cover(curr_epoch_att, T::MaxAttestations::to_usize())
},
);
Ok(max_cover::merge_solutions(
curr_cover,
prev_cover,
T::MaxAttestations::to_usize(),
))
}
@@ -231,7 +279,10 @@ impl<T: EthSpec> OperationPool<T> {
let attester_slashings = maximum_cover(
relevant_attester_slashings,
T::MaxAttesterSlashings::to_usize(),
);
)
.into_iter()
.map(|cover| cover.object().clone())
.collect();
(proposer_slashings, attester_slashings)
}
@@ -619,7 +670,7 @@ mod release_tests {
state.slot -= 1;
assert_eq!(
op_pool
.get_attestations(state, |_| true, spec)
.get_attestations(state, |_| true, |_| true, spec)
.expect("should have attestations")
.len(),
0
@@ -629,7 +680,7 @@ mod release_tests {
state.slot += spec.min_attestation_inclusion_delay;
let block_attestations = op_pool
.get_attestations(state, |_| true, spec)
.get_attestations(state, |_| true, |_| true, spec)
.expect("Should have block attestations");
assert_eq!(block_attestations.len(), committees.len());
@@ -799,7 +850,7 @@ mod release_tests {
state.slot += spec.min_attestation_inclusion_delay;
let best_attestations = op_pool
.get_attestations(state, |_| true, spec)
.get_attestations(state, |_| true, |_| true, spec)
.expect("should have best attestations");
assert_eq!(best_attestations.len(), max_attestations);
@@ -874,7 +925,7 @@ mod release_tests {
state.slot += spec.min_attestation_inclusion_delay;
let best_attestations = op_pool
.get_attestations(state, |_| true, spec)
.get_attestations(state, |_| true, |_| true, spec)
.expect("should have valid best attestations");
assert_eq!(best_attestations.len(), max_attestations);


@@ -1,3 +1,5 @@
use itertools::Itertools;
/// Trait for types that we can compute a maximum cover for.
///
/// Terminology:
@@ -5,14 +7,14 @@
/// * `element`: something contained in a set, and covered by the covering set of an item
/// * `object`: something extracted from an item in order to comprise a solution
/// See: https://en.wikipedia.org/wiki/Maximum_coverage_problem
pub trait MaxCover {
pub trait MaxCover: Clone {
/// The result type, of which we would eventually like a collection of maximal quality.
type Object;
type Object: Clone;
/// The type used to represent sets.
type Set: Clone;
/// Extract an object for inclusion in a solution.
fn object(&self) -> Self::Object;
fn object(&self) -> &Self::Object;
/// Get the set of elements covered.
fn covering_set(&self) -> &Self::Set;
@@ -42,7 +44,7 @@ impl<T> MaxCoverItem<T> {
///
/// * Time complexity: `O(limit * items_iter.len())`
/// * Space complexity: `O(item_iter.len())`
pub fn maximum_cover<I, T>(items_iter: I, limit: usize) -> Vec<T::Object>
pub fn maximum_cover<I, T>(items_iter: I, limit: usize) -> Vec<T>
where
I: IntoIterator<Item = T>,
T: MaxCover,
@@ -58,14 +60,14 @@
for _ in 0..limit {
// Select the item with the maximum score.
let (best_item, best_cover) = match all_items
let best = match all_items
.iter_mut()
.filter(|x| x.available && x.item.score() != 0)
.max_by_key(|x| x.item.score())
{
Some(x) => {
x.available = false;
(x.item.object(), x.item.covering_set().clone())
x.item.clone()
}
None => return result,
};
@@ -75,14 +77,32 @@
all_items
.iter_mut()
.filter(|x| x.available && x.item.score() != 0)
.for_each(|x| x.item.update_covering_set(&best_item, &best_cover));
.for_each(|x| {
x.item
.update_covering_set(best.object(), best.covering_set())
});
result.push(best_item);
result.push(best);
}
result
}
/// Perform a greedy merge of two max cover solutions, preferring higher-score values.
pub fn merge_solutions<I1, I2, T>(cover1: I1, cover2: I2, limit: usize) -> Vec<T::Object>
where
I1: IntoIterator<Item = T>,
I2: IntoIterator<Item = T>,
T: MaxCover,
{
cover1
.into_iter()
.merge_by(cover2, |item1, item2| item1.score() >= item2.score())
.take(limit)
.map(|item| item.object().clone())
.collect()
}
#[cfg(test)]
mod test {
use super::*;
@@ -96,8 +116,8 @@ mod test {
type Object = Self;
type Set = Self;
fn object(&self) -> Self {
self.clone()
fn object(&self) -> &Self {
self
}
fn covering_set(&self) -> &Self {


@@ -0,0 +1,14 @@
use lazy_static::lazy_static;
pub use lighthouse_metrics::*;
lazy_static! {
pub static ref ATTESTATION_PREV_EPOCH_PACKING_TIME: Result<Histogram> = try_create_histogram(
"op_pool_attestation_prev_epoch_packing_time",
"Time to pack previous epoch attestations"
);
pub static ref ATTESTATION_CURR_EPOCH_PACKING_TIME: Result<Histogram> = try_create_histogram(
"op_pool_attestation_curr_epoch_packing_time",
"Time to pack current epoch attestations"
);
}