Push naive attestations into op pool (#1466)

## Issue Addressed

NA

## Proposed Changes

- When producing a block, go and ensure every attestation in the naive aggregation pool is included in the operation pool. This should help us increase the number of useful attestations in a block.
- Lift the `RwLock`s inside `NaiveAggregationPool` up into a single high-level lock. There were race conditions in the existing setup and it was hard to reason about.

## Additional Info

NA
This commit is contained in:
Paul Hauner 2020-08-06 07:26:46 +00:00
parent ee036cba7e
commit 0b287f6ece
2 changed files with 65 additions and 39 deletions

View File

@ -175,7 +175,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// ///
/// This pool accepts `Attestation` objects that only have one aggregation bit set and provides /// This pool accepts `Attestation` objects that only have one aggregation bit set and provides
/// a method to get an aggregated `Attestation` for some `AttestationData`. /// a method to get an aggregated `Attestation` for some `AttestationData`.
pub naive_aggregation_pool: NaiveAggregationPool<T::EthSpec>, pub naive_aggregation_pool: RwLock<NaiveAggregationPool<T::EthSpec>>,
/// Contains a store of attestations which have been observed by the beacon chain. /// Contains a store of attestations which have been observed by the beacon chain.
pub observed_attestations: ObservedAttestations<T::EthSpec>, pub observed_attestations: ObservedAttestations<T::EthSpec>,
/// Maintains a record of which validators have been seen to attest in recent epochs. /// Maintains a record of which validators have been seen to attest in recent epochs.
@ -747,7 +747,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self, &self,
data: &AttestationData, data: &AttestationData,
) -> Result<Option<Attestation<T::EthSpec>>, Error> { ) -> Result<Option<Attestation<T::EthSpec>>, Error> {
self.naive_aggregation_pool.get(data).map_err(Into::into) self.naive_aggregation_pool
.read()
.get(data)
.map_err(Into::into)
} }
/// Produce an unaggregated `Attestation` that is valid for the given `slot` and `index`. /// Produce an unaggregated `Attestation` that is valid for the given `slot` and `index`.
@ -937,7 +940,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let attestation = unaggregated_attestation.attestation(); let attestation = unaggregated_attestation.attestation();
match self.naive_aggregation_pool.insert(attestation) { match self.naive_aggregation_pool.write().insert(attestation) {
Ok(outcome) => trace!( Ok(outcome) => trace!(
self.log, self.log,
"Stored unaggregated attestation"; "Stored unaggregated attestation";
@ -1632,6 +1635,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}) })
}; };
// Iterate through the naive aggregation pool and ensure all the attestations from there
// are included in the operation pool.
for attestation in self.naive_aggregation_pool.read().iter() {
if let Err(e) = self.op_pool.insert_attestation(
attestation.clone(),
&state.fork,
state.genesis_validators_root,
&self.spec,
) {
// Don't stop block production if there's an error, just create a log.
error!(
self.log,
"Attestation did not transfer to op pool";
"reason" => format!("{:?}", e)
);
}
}
let mut block = SignedBeaconBlock { let mut block = SignedBeaconBlock {
message: BeaconBlock { message: BeaconBlock {
slot: state.slot, slot: state.slot,
@ -1852,7 +1873,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn per_slot_task(&self) { pub fn per_slot_task(&self) {
trace!(self.log, "Running beacon chain per slot tasks"); trace!(self.log, "Running beacon chain per slot tasks");
if let Some(slot) = self.slot_clock.now() { if let Some(slot) = self.slot_clock.now() {
self.naive_aggregation_pool.prune(slot); self.naive_aggregation_pool.write().prune(slot);
} }
} }

View File

@ -1,5 +1,4 @@
use crate::metrics; use crate::metrics;
use parking_lot::RwLock;
use std::collections::HashMap; use std::collections::HashMap;
use types::{Attestation, AttestationData, EthSpec, Slot}; use types::{Attestation, AttestationData, EthSpec, Slot};
@ -120,6 +119,11 @@ impl<E: EthSpec> AggregatedAttestationMap<E> {
Ok(self.map.get(data).cloned()) Ok(self.map.get(data).cloned())
} }
/// Iterate all attestations in `self`.
pub fn iter(&self) -> impl Iterator<Item = &Attestation<E>> {
self.map.iter().map(|(_key, attestation)| attestation)
}
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
self.map.len() self.map.len()
} }
@ -147,15 +151,15 @@ impl<E: EthSpec> AggregatedAttestationMap<E> {
/// than that will also be refused. Pruning is done automatically based upon the attestations it /// than that will also be refused. Pruning is done automatically based upon the attestations it
/// receives and it can be triggered manually. /// receives and it can be triggered manually.
pub struct NaiveAggregationPool<E: EthSpec> { pub struct NaiveAggregationPool<E: EthSpec> {
lowest_permissible_slot: RwLock<Slot>, lowest_permissible_slot: Slot,
maps: RwLock<HashMap<Slot, AggregatedAttestationMap<E>>>, maps: HashMap<Slot, AggregatedAttestationMap<E>>,
} }
impl<E: EthSpec> Default for NaiveAggregationPool<E> { impl<E: EthSpec> Default for NaiveAggregationPool<E> {
fn default() -> Self { fn default() -> Self {
Self { Self {
lowest_permissible_slot: RwLock::new(Slot::new(0)), lowest_permissible_slot: Slot::new(0),
maps: RwLock::new(HashMap::new()), maps: HashMap::new(),
} }
} }
} }
@ -168,10 +172,10 @@ impl<E: EthSpec> NaiveAggregationPool<E> {
/// ///
/// The pool may be pruned if the given `attestation.data` has a slot higher than any /// The pool may be pruned if the given `attestation.data` has a slot higher than any
/// previously seen. /// previously seen.
pub fn insert(&self, attestation: &Attestation<E>) -> Result<InsertOutcome, Error> { pub fn insert(&mut self, attestation: &Attestation<E>) -> Result<InsertOutcome, Error> {
let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_INSERT); let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_INSERT);
let slot = attestation.data.slot; let slot = attestation.data.slot;
let lowest_permissible_slot: Slot = *self.lowest_permissible_slot.read(); let lowest_permissible_slot = self.lowest_permissible_slot;
// Reject any attestations that are too old. // Reject any attestations that are too old.
if slot < lowest_permissible_slot { if slot < lowest_permissible_slot {
@ -183,16 +187,16 @@ impl<E: EthSpec> NaiveAggregationPool<E> {
let lock_timer = let lock_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_MAPS_WRITE_LOCK); metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_MAPS_WRITE_LOCK);
let mut maps = self.maps.write();
drop(lock_timer); drop(lock_timer);
let outcome = if let Some(map) = maps.get_mut(&slot) { let outcome = if let Some(map) = self.maps.get_mut(&slot) {
map.insert(attestation) map.insert(attestation)
} else { } else {
let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_CREATE_MAP); let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_CREATE_MAP);
// To avoid re-allocations, try and determine a rough initial capacity for the new item // To avoid re-allocations, try and determine a rough initial capacity for the new item
// by obtaining the mean size of all items in earlier epoch. // by obtaining the mean size of all items in earlier epoch.
let (count, sum) = maps let (count, sum) = self
.maps
.iter() .iter()
// Only include epochs that are less than the given slot in the average. This should // Only include epochs that are less than the given slot in the average. This should
// generally avoid including recent epochs that are still "filling up". // generally avoid including recent epochs that are still "filling up".
@ -205,12 +209,11 @@ impl<E: EthSpec> NaiveAggregationPool<E> {
let mut item = AggregatedAttestationMap::new(initial_capacity); let mut item = AggregatedAttestationMap::new(initial_capacity);
let outcome = item.insert(attestation); let outcome = item.insert(attestation);
maps.insert(slot, item); self.maps.insert(slot, item);
outcome outcome
}; };
drop(maps);
self.prune(slot); self.prune(slot);
outcome outcome
@ -219,16 +222,20 @@ impl<E: EthSpec> NaiveAggregationPool<E> {
/// Returns an aggregated `Attestation` with the given `data`, if any. /// Returns an aggregated `Attestation` with the given `data`, if any.
pub fn get(&self, data: &AttestationData) -> Result<Option<Attestation<E>>, Error> { pub fn get(&self, data: &AttestationData) -> Result<Option<Attestation<E>>, Error> {
self.maps self.maps
.read()
.iter() .iter()
.find(|(slot, _map)| **slot == data.slot) .find(|(slot, _map)| **slot == data.slot)
.map(|(_slot, map)| map.get(data)) .map(|(_slot, map)| map.get(data))
.unwrap_or_else(|| Ok(None)) .unwrap_or_else(|| Ok(None))
} }
/// Iterate all attestations in all slots of `self`.
pub fn iter(&self) -> impl Iterator<Item = &Attestation<E>> {
self.maps.iter().map(|(_slot, map)| map.iter()).flatten()
}
/// Removes any attestations with a slot lower than `current_slot` and bars any future /// Removes any attestations with a slot lower than `current_slot` and bars any future
/// attestations with a slot lower than `current_slot - SLOTS_RETAINED`. /// attestations with a slot lower than `current_slot - SLOTS_RETAINED`.
pub fn prune(&self, current_slot: Slot) { pub fn prune(&mut self, current_slot: Slot) {
let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_PRUNE); let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_PRUNE);
// Taking advantage of saturating subtraction on `Slot`. // Taking advantage of saturating subtraction on `Slot`.
@ -236,30 +243,34 @@ impl<E: EthSpec> NaiveAggregationPool<E> {
// No need to prune if the lowest permissible slot has not changed and the queue length is // No need to prune if the lowest permissible slot has not changed and the queue length is
// less than the maximum // less than the maximum
if *self.lowest_permissible_slot.read() == lowest_permissible_slot if self.lowest_permissible_slot == lowest_permissible_slot
&& self.maps.read().len() <= SLOTS_RETAINED && self.maps.len() <= SLOTS_RETAINED
{ {
return; return;
} }
*self.lowest_permissible_slot.write() = lowest_permissible_slot; self.lowest_permissible_slot = lowest_permissible_slot;
let mut maps = self.maps.write();
// Remove any maps that are definitely expired. // Remove any maps that are definitely expired.
maps.retain(|slot, _map| *slot >= lowest_permissible_slot); self.maps
.retain(|slot, _map| *slot >= lowest_permissible_slot);
// If we have too many maps, remove the lowest amount to ensure we only have // If we have too many maps, remove the lowest amount to ensure we only have
// `SLOTS_RETAINED` left. // `SLOTS_RETAINED` left.
if maps.len() > SLOTS_RETAINED { if self.maps.len() > SLOTS_RETAINED {
let mut slots = maps.iter().map(|(slot, _map)| *slot).collect::<Vec<_>>(); let mut slots = self
.maps
.iter()
.map(|(slot, _map)| *slot)
.collect::<Vec<_>>();
// Sort is generally pretty slow, however `SLOTS_RETAINED` is quite low so it should be // Sort is generally pretty slow, however `SLOTS_RETAINED` is quite low so it should be
// negligible. // negligible.
slots.sort_unstable(); slots.sort_unstable();
slots slots
.into_iter() .into_iter()
.take(maps.len().saturating_sub(SLOTS_RETAINED)) .take(self.maps.len().saturating_sub(SLOTS_RETAINED))
.for_each(|slot| { .for_each(|slot| {
maps.remove(&slot); self.maps.remove(&slot);
}) })
} }
} }
@ -304,7 +315,7 @@ mod tests {
fn single_attestation() { fn single_attestation() {
let mut a = get_attestation(Slot::new(0)); let mut a = get_attestation(Slot::new(0));
let pool = NaiveAggregationPool::default(); let mut pool = NaiveAggregationPool::default();
assert_eq!( assert_eq!(
pool.insert(&a), pool.insert(&a),
@ -352,7 +363,7 @@ mod tests {
sign(&mut a_0, 0, genesis_validators_root); sign(&mut a_0, 0, genesis_validators_root);
sign(&mut a_1, 1, genesis_validators_root); sign(&mut a_1, 1, genesis_validators_root);
let pool = NaiveAggregationPool::default(); let mut pool = NaiveAggregationPool::default();
assert_eq!( assert_eq!(
pool.insert(&a_0), pool.insert(&a_0),
@ -409,7 +420,7 @@ mod tests {
let mut base = get_attestation(Slot::new(0)); let mut base = get_attestation(Slot::new(0));
sign(&mut base, 0, Hash256::random()); sign(&mut base, 0, Hash256::random());
let pool = NaiveAggregationPool::default(); let mut pool = NaiveAggregationPool::default();
for i in 0..SLOTS_RETAINED * 2 { for i in 0..SLOTS_RETAINED * 2 {
let slot = Slot::from(i); let slot = Slot::from(i);
@ -424,22 +435,16 @@ mod tests {
if i < SLOTS_RETAINED { if i < SLOTS_RETAINED {
let len = i + 1; let len = i + 1;
assert_eq!( assert_eq!(pool.maps.len(), len, "the pool should have length {}", len);
pool.maps.read().len(),
len,
"the pool should have length {}",
len
);
} else { } else {
assert_eq!( assert_eq!(
pool.maps.read().len(), pool.maps.len(),
SLOTS_RETAINED, SLOTS_RETAINED,
"the pool should have length SLOTS_RETAINED" "the pool should have length SLOTS_RETAINED"
); );
let mut pool_slots = pool let mut pool_slots = pool
.maps .maps
.read()
.iter() .iter()
.map(|(slot, _map)| *slot) .map(|(slot, _map)| *slot)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -463,7 +468,7 @@ mod tests {
let mut base = get_attestation(Slot::new(0)); let mut base = get_attestation(Slot::new(0));
sign(&mut base, 0, Hash256::random()); sign(&mut base, 0, Hash256::random());
let pool = NaiveAggregationPool::default(); let mut pool = NaiveAggregationPool::default();
for i in 0..=MAX_ATTESTATIONS_PER_SLOT { for i in 0..=MAX_ATTESTATIONS_PER_SLOT {
let mut a = base.clone(); let mut a = base.clone();