op_pool: use max cover algorithm, refactor

Michael Sproul 2019-06-17 18:07:14 +10:00
parent 9a356a00c2
commit 38d2d03e3a
GPG Key ID: 77B1309D2E54E914
6 changed files with 338 additions and 120 deletions

View File

@@ -5,6 +5,7 @@ authors = ["Michael Sproul <michael@sigmaprime.io>"]
edition = "2018"
[dependencies]
boolean-bitfield = { path = "../utils/boolean-bitfield" }
int_to_bytes = { path = "../utils/int_to_bytes" }
itertools = "0.8"
parking_lot = "0.7"

View File

@@ -0,0 +1,91 @@
use crate::max_cover::MaxCover;
use boolean_bitfield::BooleanBitfield;
use types::{Attestation, BeaconState, EthSpec};
pub struct AttMaxCover<'a> {
/// Underlying attestation.
att: &'a Attestation,
/// Bitfield of validators that are covered by this attestation.
fresh_validators: BooleanBitfield,
}
impl<'a> AttMaxCover<'a> {
pub fn new(att: &'a Attestation, fresh_validators: BooleanBitfield) -> Self {
Self {
att,
fresh_validators,
}
}
}
impl<'a> MaxCover for AttMaxCover<'a> {
type Object = Attestation;
type Set = BooleanBitfield;
fn object(&self) -> Attestation {
self.att.clone()
}
fn covering_set(&self) -> &BooleanBitfield {
&self.fresh_validators
}
/// Sneaky: we keep all the attestations together in one bucket, even though
/// their aggregation bitfields refer to different committees. To avoid
/// conflating committees when updating covering sets, we only update those attestations
/// whose shard and epoch match the attestation being included in the solution, on the
/// grounds that a shard and epoch uniquely identify a committee.
fn update_covering_set(
&mut self,
best_att: &Attestation,
covered_validators: &BooleanBitfield,
) {
if self.att.data.shard == best_att.data.shard
&& self.att.data.target_epoch == best_att.data.target_epoch
{
self.fresh_validators.difference_inplace(covered_validators);
}
}
fn score(&self) -> usize {
self.fresh_validators.num_set_bits()
}
}
/// Extract the validators for which `attestation` would be their earliest in the epoch.
///
/// The reward paid to a proposer for including an attestation is proportional to the number
/// of validators for which the included attestation is their first in the epoch. The attestation
/// is judged against the state's `current_epoch_attestations` or `previous_epoch_attestations`
/// depending on when it was created, and all those validators who have already attested are
/// removed from the `aggregation_bitfield` before returning it.
// TODO: This could be optimised with a map from validator index to whether that validator has
// attested in each of the current and previous epochs. Currently quadratic in number of validators.
pub fn earliest_attestation_validators<T: EthSpec>(
attestation: &Attestation,
state: &BeaconState<T>,
) -> BooleanBitfield {
// Bitfield of validators whose attestations are new/fresh.
let mut new_validators = attestation.aggregation_bitfield.clone();
let state_attestations = if attestation.data.target_epoch == state.current_epoch() {
&state.current_epoch_attestations
} else if attestation.data.target_epoch == state.previous_epoch() {
&state.previous_epoch_attestations
} else {
return BooleanBitfield::from_elem(attestation.aggregation_bitfield.len(), false);
};
state_attestations
.iter()
// In a single epoch, an attester should only be attesting for one shard.
// TODO: we avoid including slashable attestations in the state here,
// but maybe we should do something else with them (like construct slashings).
.filter(|existing_attestation| existing_attestation.data.shard == attestation.data.shard)
.for_each(|existing_attestation| {
// Remove the validators who have signed the existing attestation (they are not new)
new_validators.difference_inplace(&existing_attestation.aggregation_bitfield);
});
new_validators
}
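How these pieces are meant to compose, as a minimal sketch rather than part of the diff (the variables `candidate_attestations`, `state` and `spec` are assumptions, mirroring the `get_attestations` changes further down):
// Wrap each candidate in an AttMaxCover whose covering set is the bitfield of
// validators it would freshly reward, then let the greedy solver pick at most
// `max_attestations` of them.
let items = candidate_attestations
    .iter()
    .map(|att| AttMaxCover::new(att, earliest_attestation_validators(att, state)));
let chosen: Vec<Attestation> = maximum_cover(items, spec.max_attestations as usize);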

View File

@@ -0,0 +1,35 @@
use int_to_bytes::int_to_bytes8;
use ssz::ssz_encode;
use types::{AttestationData, BeaconState, ChainSpec, Domain, Epoch, EthSpec};
/// Serialized `AttestationData` augmented with a domain to encode the fork info.
#[derive(PartialEq, Eq, Clone, Hash, Debug)]
pub struct AttestationId(Vec<u8>);
/// Number of domain bytes that the end of an attestation ID is padded with.
const DOMAIN_BYTES_LEN: usize = 8;
impl AttestationId {
pub fn from_data<T: EthSpec>(
attestation: &AttestationData,
state: &BeaconState<T>,
spec: &ChainSpec,
) -> Self {
let mut bytes = ssz_encode(attestation);
let epoch = attestation.target_epoch;
bytes.extend_from_slice(&AttestationId::compute_domain_bytes(epoch, state, spec));
AttestationId(bytes)
}
pub fn compute_domain_bytes<T: EthSpec>(
epoch: Epoch,
state: &BeaconState<T>,
spec: &ChainSpec,
) -> Vec<u8> {
int_to_bytes8(spec.get_domain(epoch, Domain::Attestation, &state.fork))
}
pub fn domain_bytes_match(&self, domain_bytes: &[u8]) -> bool {
&self.0[self.0.len() - DOMAIN_BYTES_LEN..] == domain_bytes
}
}
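A rough usage sketch, not part of the diff: an ID is the SSZ-encoded `AttestationData` followed by the 8-byte attestation domain, so entries can be grouped by data-plus-fork and later filtered by comparing the 8-byte suffix against the current and previous epochs' domains (variable names here are assumptions):
// Assumed in scope: `att: &Attestation`, `state: &BeaconState<T>`, `spec: &ChainSpec`.
let id = AttestationId::from_data(&att.data, state, spec);
let curr = AttestationId::compute_domain_bytes(state.current_epoch(), state, spec);
let prev = AttestationId::compute_domain_bytes(state.previous_epoch(), state, spec);
let relevant = id.domain_bytes_match(&curr) || id.domain_bytes_match(&prev);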

View File

@@ -1,7 +1,12 @@
use int_to_bytes::int_to_bytes8;
mod attestation;
mod attestation_id;
mod max_cover;
use attestation::{earliest_attestation_validators, AttMaxCover};
use attestation_id::AttestationId;
use itertools::Itertools;
use max_cover::maximum_cover;
use parking_lot::RwLock;
use ssz::ssz_encode;
use state_processing::per_block_processing::errors::{
AttestationValidationError, AttesterSlashingValidationError, DepositValidationError,
ExitValidationError, ProposerSlashingValidationError, TransferValidationError,
@@ -16,10 +21,9 @@ use state_processing::per_block_processing::{
};
use std::collections::{btree_map::Entry, hash_map, BTreeMap, HashMap, HashSet};
use std::marker::PhantomData;
use types::chain_spec::Domain;
use types::{
Attestation, AttestationData, AttesterSlashing, BeaconState, ChainSpec, Deposit, Epoch,
EthSpec, ProposerSlashing, Transfer, Validator, VoluntaryExit,
Attestation, AttesterSlashing, BeaconState, ChainSpec, Deposit, EthSpec, ProposerSlashing,
Transfer, Validator, VoluntaryExit,
};
#[derive(Default)]
@@ -43,71 +47,6 @@ pub struct OperationPool<T: EthSpec + Default> {
_phantom: PhantomData<T>,
}
/// Serialized `AttestationData` augmented with a domain to encode the fork info.
#[derive(PartialEq, Eq, Clone, Hash, Debug)]
struct AttestationId(Vec<u8>);
/// Number of domain bytes that the end of an attestation ID is padded with.
const DOMAIN_BYTES_LEN: usize = 8;
impl AttestationId {
fn from_data<T: EthSpec>(
attestation: &AttestationData,
state: &BeaconState<T>,
spec: &ChainSpec,
) -> Self {
let mut bytes = ssz_encode(attestation);
let epoch = attestation.target_epoch;
bytes.extend_from_slice(&AttestationId::compute_domain_bytes(epoch, state, spec));
AttestationId(bytes)
}
fn compute_domain_bytes<T: EthSpec>(
epoch: Epoch,
state: &BeaconState<T>,
spec: &ChainSpec,
) -> Vec<u8> {
int_to_bytes8(spec.get_domain(epoch, Domain::Attestation, &state.fork))
}
fn domain_bytes_match(&self, domain_bytes: &[u8]) -> bool {
&self.0[self.0.len() - DOMAIN_BYTES_LEN..] == domain_bytes
}
}
/// Compute a fitness score for an attestation.
///
/// The score is calculated by determining the number of *new* attestations that
/// the aggregate attestation introduces, and is proportional to the size of the reward we will
/// receive for including it in a block.
// TODO: this could be optimised with a map from validator index to whether that validator has
// attested in each of the current and previous epochs. Currently quadratic in number of validators.
fn attestation_score<T: EthSpec>(attestation: &Attestation, state: &BeaconState<T>) -> usize {
// Bitfield of validators whose attestations are new/fresh.
let mut new_validators = attestation.aggregation_bitfield.clone();
let state_attestations = if attestation.data.target_epoch == state.current_epoch() {
&state.current_epoch_attestations
} else if attestation.data.target_epoch == state.previous_epoch() {
&state.previous_epoch_attestations
} else {
return 0;
};
state_attestations
.iter()
// In a single epoch, an attester should only be attesting for one shard.
// TODO: we avoid including slashable attestations in the state here,
// but maybe we should do something else with them (like construct slashings).
.filter(|current_attestation| current_attestation.data.shard == attestation.data.shard)
.for_each(|current_attestation| {
// Remove the validators who have signed the existing attestation (they are not new)
new_validators.difference_inplace(&current_attestation.aggregation_bitfield);
});
new_validators.num_set_bits()
}
#[derive(Debug, PartialEq, Clone)]
pub enum DepositInsertStatus {
/// The deposit was not already in the pool.
@@ -176,29 +115,19 @@ impl<T: EthSpec> OperationPool<T> {
let current_epoch = state.current_epoch();
let prev_domain_bytes = AttestationId::compute_domain_bytes(prev_epoch, state, spec);
let curr_domain_bytes = AttestationId::compute_domain_bytes(current_epoch, state, spec);
self.attestations
.read()
let reader = self.attestations.read();
let valid_attestations = reader
.iter()
.filter(|(key, _)| {
key.domain_bytes_match(&prev_domain_bytes)
|| key.domain_bytes_match(&curr_domain_bytes)
})
.flat_map(|(_, attestations)| attestations)
// That are not superseded by an attestation included in the state...
.filter(|attestation| !superior_attestation_exists_in_state(state, attestation))
// That are valid...
.filter(|attestation| validate_attestation(state, attestation, spec).is_ok())
// Scored by the number of new attestations they introduce (descending)
// TODO: need to consider attestations introduced in THIS block
.map(|att| (att, attestation_score(att, state)))
// Don't include any useless attestations (score 0)
.filter(|&(_, score)| score != 0)
.sorted_by_key(|&(_, score)| std::cmp::Reverse(score))
// Limited to the maximum number of attestations per block
.take(spec.max_attestations as usize)
.map(|(att, _)| att)
.cloned()
.collect()
.map(|att| AttMaxCover::new(att, earliest_attestation_validators(att, state)));
maximum_cover(valid_attestations, spec.max_attestations as usize)
}
/// Remove attestations which are too old to be included in a block.
@@ -484,34 +413,6 @@ impl<T: EthSpec> OperationPool<T> {
}
}
/// Returns `true` if the state already contains a `PendingAttestation` that is superior to the
/// given `attestation`.
///
/// A validator has nothing to gain from re-including an attestation and it adds load to the
/// network.
///
/// An existing `PendingAttestation` is superior to an existing `attestation` if:
///
/// - Their `AttestationData` is equal.
/// - `attestation` does not contain any signatures that `PendingAttestation` does not have.
fn superior_attestation_exists_in_state<T: EthSpec>(
state: &BeaconState<T>,
attestation: &Attestation,
) -> bool {
state
.current_epoch_attestations
.iter()
.chain(state.previous_epoch_attestations.iter())
.any(|existing_attestation| {
let bitfield = &attestation.aggregation_bitfield;
let existing_bitfield = &existing_attestation.aggregation_bitfield;
existing_attestation.data == attestation.data
&& bitfield.intersection(existing_bitfield).num_set_bits()
== bitfield.num_set_bits()
})
}
/// Filter up to a maximum number of operations out of an iterator.
fn filter_limit_operations<'a, T: 'a, I, F>(operations: I, filter: F, limit: u64) -> Vec<T>
where
@@ -734,15 +635,13 @@ mod tests {
state_builder.teleport_to_slot(slot);
state_builder.build_caches(&spec).unwrap();
let (state, keypairs) = state_builder.build();
(state, keypairs, MainnetEthSpec::default_spec())
}
#[test]
fn test_attestation_score() {
fn test_earliest_attestation() {
let (ref mut state, ref keypairs, ref spec) =
attestation_test_state::<MainnetEthSpec>(1);
let slot = state.slot - 1;
let committees = state
.get_crosslink_committees_at_slot(slot)
@@ -775,9 +674,8 @@
assert_eq!(
att1.aggregation_bitfield.num_set_bits(),
attestation_score(&att1, state)
earliest_attestation_validators(&att1, state).num_set_bits()
);
state.current_epoch_attestations.push(PendingAttestation {
aggregation_bitfield: att1.aggregation_bitfield.clone(),
data: att1.data.clone(),
@@ -785,7 +683,10 @@
proposer_index: 0,
});
assert_eq!(cc.committee.len() - 2, attestation_score(&att2, state));
assert_eq!(
cc.committee.len() - 2,
earliest_attestation_validators(&att2, state).num_set_bits()
);
}
}

View File

@@ -0,0 +1,189 @@
/// Trait for types that we can compute a maximum cover for.
///
/// Terminology:
/// * `item`: something that implements this trait
/// * `element`: something contained in a set, and covered by the covering set of an item
/// * `object`: something extracted from an item in order to comprise a solution
/// See: https://en.wikipedia.org/wiki/Maximum_coverage_problem
pub trait MaxCover {
/// The result type, of which we would eventually like a collection of maximal quality.
type Object;
/// The type used to represent sets.
type Set: Clone;
/// Extract an object for inclusion in a solution.
fn object(&self) -> Self::Object;
/// Get the set of elements covered.
fn covering_set(&self) -> &Self::Set;
/// Update this item's covering set to account for the inclusion of some object in the solution.
fn update_covering_set(&mut self, max_obj: &Self::Object, max_set: &Self::Set);
/// The quality of this item's covering set, usually its cardinality.
fn score(&self) -> usize;
}
/// Helper struct to track which items of the input are still available for inclusion.
/// Saves removing elements from the work vector.
struct MaxCoverItem<T> {
item: T,
available: bool,
}
impl<T> MaxCoverItem<T> {
fn new(item: T) -> Self {
MaxCoverItem {
item,
available: true,
}
}
}
/// Compute an approximate maximum cover using a greedy algorithm.
///
/// * Time complexity: `O(limit * items_iter.len())`
/// * Space complexity: `O(items_iter.len())`
pub fn maximum_cover<'a, I, T>(items_iter: I, limit: usize) -> Vec<T::Object>
where
I: IntoIterator<Item = T>,
T: MaxCover,
{
// Construct an initial vec of all items, marked available.
let mut all_items: Vec<_> = items_iter
.into_iter()
.map(MaxCoverItem::new)
.filter(|x| x.item.score() != 0)
.collect();
let mut result = vec![];
for _ in 0..limit {
// Select the item with the maximum score.
let (best_item, best_cover) = match all_items
.iter_mut()
.filter(|x| x.available && x.item.score() != 0)
.max_by_key(|x| x.item.score())
{
Some(x) => {
x.available = false;
(x.item.object(), x.item.covering_set().clone())
}
None => return result,
};
// Update the covering sets of the other items, for the inclusion of the selected item.
// Items covered by the selected item can't be re-covered.
all_items
.iter_mut()
.filter(|x| x.available && x.item.score() != 0)
.for_each(|x| x.item.update_covering_set(&best_item, &best_cover));
result.push(best_item);
}
result
}
#[cfg(test)]
mod test {
use super::*;
use std::iter::FromIterator;
use std::{collections::HashSet, hash::Hash};
impl<T> MaxCover for HashSet<T>
where
T: Clone + Eq + Hash,
{
type Object = Self;
type Set = Self;
fn object(&self) -> Self {
self.clone()
}
fn covering_set(&self) -> &Self {
&self
}
fn update_covering_set(&mut self, _: &Self, other: &Self) {
let mut difference = &*self - other;
std::mem::swap(self, &mut difference);
}
fn score(&self) -> usize {
self.len()
}
}
fn example_system() -> Vec<HashSet<usize>> {
vec![
HashSet::from_iter(vec![3]),
HashSet::from_iter(vec![1, 2, 4, 5]),
HashSet::from_iter(vec![1, 2, 4, 5]),
HashSet::from_iter(vec![1]),
HashSet::from_iter(vec![2, 4, 5]),
]
}
#[test]
fn zero_limit() {
let cover = maximum_cover(example_system(), 0);
assert_eq!(cover.len(), 0);
}
#[test]
fn one_limit() {
let sets = example_system();
let cover = maximum_cover(sets.clone(), 1);
assert_eq!(cover.len(), 1);
assert_eq!(cover[0], sets[1]);
}
// Check that even if the limit provides room, we don't include useless items in the soln.
#[test]
fn exclude_zero_score() {
let sets = example_system();
for k in 2..10 {
let cover = maximum_cover(sets.clone(), k);
assert_eq!(cover.len(), 2);
assert_eq!(cover[0], sets[1]);
assert_eq!(cover[1], sets[0]);
}
}
fn quality<T: Eq + Hash>(solution: &[HashSet<T>]) -> usize {
solution.iter().map(HashSet::len).sum()
}
// Optimal solution is the first three sets (quality 15) but our greedy algorithm
// will select the last three (quality 11). The comment at the end of each line
// shows that set's score at each iteration, with a * indicating that it will be chosen.
#[test]
fn suboptimal() {
let sets = vec![
HashSet::from_iter(vec![0, 1, 8, 11, 14]), // 5, 3, 2
HashSet::from_iter(vec![2, 3, 7, 9, 10]), // 5, 3, 2
HashSet::from_iter(vec![4, 5, 6, 12, 13]), // 5, 4, 2
HashSet::from_iter(vec![9, 10]), // 4, 4, 2*
HashSet::from_iter(vec![5, 6, 7, 8]), // 4, 4*
HashSet::from_iter(vec![0, 1, 2, 3, 4]), // 5*
];
let cover = maximum_cover(sets.clone(), 3);
assert_eq!(quality(&cover), 11);
}
#[test]
fn intersecting_ok() {
let sets = vec![
HashSet::from_iter(vec![1, 2, 3, 4, 5, 6, 7, 8]),
HashSet::from_iter(vec![1, 2, 3, 9, 10, 11]),
HashSet::from_iter(vec![4, 5, 6, 12, 13, 14]),
HashSet::from_iter(vec![7, 8, 15, 16, 17, 18]),
HashSet::from_iter(vec![1, 2, 9, 10]),
HashSet::from_iter(vec![1, 5, 6, 8]),
HashSet::from_iter(vec![1, 7, 11, 19]),
];
let cover = maximum_cover(sets.clone(), 5);
assert_eq!(quality(&cover), 19);
assert_eq!(cover.len(), 5);
}
}
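For intuition, a rough trace of the greedy solver on `example_system()`: the first iteration picks `{1, 2, 4, 5}` (score 4); updating the remaining covering sets empties the duplicate `{1, 2, 4, 5}`, `{1}` and `{2, 4, 5}`, leaving only `{3}` with a non-zero score; the second iteration picks `{3}`, and with nothing positively scored remaining the solver returns early, which is what the `one_limit` and `exclude_zero_score` tests above check.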

View File

@@ -13,7 +13,7 @@ use std::default;
/// A BooleanBitfield represents a set of booleans compactly stored as a vector of bits.
/// The BooleanBitfield is given a fixed size during construction. Reads outside of the current size return an out-of-bounds error. Writes outside of the current size expand the size of the set.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Hash)]
pub struct BooleanBitfield(BitVec);
/// Error represents some reason a request against a bitfield was not satisfied
@@ -170,6 +170,7 @@ impl cmp::PartialEq for BooleanBitfield {
ssz::ssz_encode(self) == ssz::ssz_encode(other)
}
}
impl Eq for BooleanBitfield {}
/// Create a new bitfield that is a union of two other bitfields.
///