From 0b287f6ece5cf0233516f27dbe67216a992c83b5 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Thu, 6 Aug 2020 07:26:46 +0000 Subject: [PATCH] Push naive attestations into op pool (#1466) ## Issue Addressed NA ## Proposed Changes - When producing a block, go and ensure every attestation in the naive aggregation pool is included in the operation pool. This should help us increase the number of useful attestations in a block. - Lift the `RwLock`s inside `NaiveAggregationPool` up into a single high-level lock. There were race conditions in the existing setup and it was hard to reason about. ## Additional Info NA --- beacon_node/beacon_chain/src/beacon_chain.rs | 29 ++++++- .../src/naive_aggregation_pool.rs | 75 ++++++++++--------- 2 files changed, 65 insertions(+), 39 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 433198e4e..bd2601213 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -175,7 +175,7 @@ pub struct BeaconChain { /// /// This pool accepts `Attestation` objects that only have one aggregation bit set and provides /// a method to get an aggregated `Attestation` for some `AttestationData`. - pub naive_aggregation_pool: NaiveAggregationPool, + pub naive_aggregation_pool: RwLock>, /// Contains a store of attestations which have been observed by the beacon chain. pub observed_attestations: ObservedAttestations, /// Maintains a record of which validators have been seen to attest in recent epochs. @@ -747,7 +747,10 @@ impl BeaconChain { &self, data: &AttestationData, ) -> Result>, Error> { - self.naive_aggregation_pool.get(data).map_err(Into::into) + self.naive_aggregation_pool + .read() + .get(data) + .map_err(Into::into) } /// Produce an unaggregated `Attestation` that is valid for the given `slot` and `index`. @@ -937,7 +940,7 @@ impl BeaconChain { let attestation = unaggregated_attestation.attestation(); - match self.naive_aggregation_pool.insert(attestation) { + match self.naive_aggregation_pool.write().insert(attestation) { Ok(outcome) => trace!( self.log, "Stored unaggregated attestation"; @@ -1632,6 +1635,24 @@ impl BeaconChain { }) }; + // Iterate through the naive aggregation pool and ensure all the attestations from there + // are included in the operation pool. + for attestation in self.naive_aggregation_pool.read().iter() { + if let Err(e) = self.op_pool.insert_attestation( + attestation.clone(), + &state.fork, + state.genesis_validators_root, + &self.spec, + ) { + // Don't stop block production if there's an error, just create a log. + error!( + self.log, + "Attestation did not transfer to op pool"; + "reason" => format!("{:?}", e) + ); + } + } + let mut block = SignedBeaconBlock { message: BeaconBlock { slot: state.slot, @@ -1852,7 +1873,7 @@ impl BeaconChain { pub fn per_slot_task(&self) { trace!(self.log, "Running beacon chain per slot tasks"); if let Some(slot) = self.slot_clock.now() { - self.naive_aggregation_pool.prune(slot); + self.naive_aggregation_pool.write().prune(slot); } } diff --git a/beacon_node/beacon_chain/src/naive_aggregation_pool.rs b/beacon_node/beacon_chain/src/naive_aggregation_pool.rs index 4a3e042ac..c561141a1 100644 --- a/beacon_node/beacon_chain/src/naive_aggregation_pool.rs +++ b/beacon_node/beacon_chain/src/naive_aggregation_pool.rs @@ -1,5 +1,4 @@ use crate::metrics; -use parking_lot::RwLock; use std::collections::HashMap; use types::{Attestation, AttestationData, EthSpec, Slot}; @@ -120,6 +119,11 @@ impl AggregatedAttestationMap { Ok(self.map.get(data).cloned()) } + /// Iterate all attestations in `self`. + pub fn iter(&self) -> impl Iterator> { + self.map.iter().map(|(_key, attestation)| attestation) + } + pub fn len(&self) -> usize { self.map.len() } @@ -147,15 +151,15 @@ impl AggregatedAttestationMap { /// than that will also be refused. Pruning is done automatically based upon the attestations it /// receives and it can be triggered manually. pub struct NaiveAggregationPool { - lowest_permissible_slot: RwLock, - maps: RwLock>>, + lowest_permissible_slot: Slot, + maps: HashMap>, } impl Default for NaiveAggregationPool { fn default() -> Self { Self { - lowest_permissible_slot: RwLock::new(Slot::new(0)), - maps: RwLock::new(HashMap::new()), + lowest_permissible_slot: Slot::new(0), + maps: HashMap::new(), } } } @@ -168,10 +172,10 @@ impl NaiveAggregationPool { /// /// The pool may be pruned if the given `attestation.data` has a slot higher than any /// previously seen. - pub fn insert(&self, attestation: &Attestation) -> Result { + pub fn insert(&mut self, attestation: &Attestation) -> Result { let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_INSERT); let slot = attestation.data.slot; - let lowest_permissible_slot: Slot = *self.lowest_permissible_slot.read(); + let lowest_permissible_slot = self.lowest_permissible_slot; // Reject any attestations that are too old. if slot < lowest_permissible_slot { @@ -183,16 +187,16 @@ impl NaiveAggregationPool { let lock_timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_MAPS_WRITE_LOCK); - let mut maps = self.maps.write(); drop(lock_timer); - let outcome = if let Some(map) = maps.get_mut(&slot) { + let outcome = if let Some(map) = self.maps.get_mut(&slot) { map.insert(attestation) } else { let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_CREATE_MAP); // To avoid re-allocations, try and determine a rough initial capacity for the new item // by obtaining the mean size of all items in earlier epoch. - let (count, sum) = maps + let (count, sum) = self + .maps .iter() // Only include epochs that are less than the given slot in the average. This should // generally avoid including recent epochs that are still "filling up". @@ -205,12 +209,11 @@ impl NaiveAggregationPool { let mut item = AggregatedAttestationMap::new(initial_capacity); let outcome = item.insert(attestation); - maps.insert(slot, item); + self.maps.insert(slot, item); outcome }; - drop(maps); self.prune(slot); outcome @@ -219,16 +222,20 @@ impl NaiveAggregationPool { /// Returns an aggregated `Attestation` with the given `data`, if any. pub fn get(&self, data: &AttestationData) -> Result>, Error> { self.maps - .read() .iter() .find(|(slot, _map)| **slot == data.slot) .map(|(_slot, map)| map.get(data)) .unwrap_or_else(|| Ok(None)) } + /// Iterate all attestations in all slots of `self`. + pub fn iter(&self) -> impl Iterator> { + self.maps.iter().map(|(_slot, map)| map.iter()).flatten() + } + /// Removes any attestations with a slot lower than `current_slot` and bars any future /// attestations with a slot lower than `current_slot - SLOTS_RETAINED`. - pub fn prune(&self, current_slot: Slot) { + pub fn prune(&mut self, current_slot: Slot) { let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_PRUNE); // Taking advantage of saturating subtraction on `Slot`. @@ -236,30 +243,34 @@ impl NaiveAggregationPool { // No need to prune if the lowest permissible slot has not changed and the queue length is // less than the maximum - if *self.lowest_permissible_slot.read() == lowest_permissible_slot - && self.maps.read().len() <= SLOTS_RETAINED + if self.lowest_permissible_slot == lowest_permissible_slot + && self.maps.len() <= SLOTS_RETAINED { return; } - *self.lowest_permissible_slot.write() = lowest_permissible_slot; - let mut maps = self.maps.write(); + self.lowest_permissible_slot = lowest_permissible_slot; // Remove any maps that are definitely expired. - maps.retain(|slot, _map| *slot >= lowest_permissible_slot); + self.maps + .retain(|slot, _map| *slot >= lowest_permissible_slot); // If we have too many maps, remove the lowest amount to ensure we only have // `SLOTS_RETAINED` left. - if maps.len() > SLOTS_RETAINED { - let mut slots = maps.iter().map(|(slot, _map)| *slot).collect::>(); + if self.maps.len() > SLOTS_RETAINED { + let mut slots = self + .maps + .iter() + .map(|(slot, _map)| *slot) + .collect::>(); // Sort is generally pretty slow, however `SLOTS_RETAINED` is quite low so it should be // negligible. slots.sort_unstable(); slots .into_iter() - .take(maps.len().saturating_sub(SLOTS_RETAINED)) + .take(self.maps.len().saturating_sub(SLOTS_RETAINED)) .for_each(|slot| { - maps.remove(&slot); + self.maps.remove(&slot); }) } } @@ -304,7 +315,7 @@ mod tests { fn single_attestation() { let mut a = get_attestation(Slot::new(0)); - let pool = NaiveAggregationPool::default(); + let mut pool = NaiveAggregationPool::default(); assert_eq!( pool.insert(&a), @@ -352,7 +363,7 @@ mod tests { sign(&mut a_0, 0, genesis_validators_root); sign(&mut a_1, 1, genesis_validators_root); - let pool = NaiveAggregationPool::default(); + let mut pool = NaiveAggregationPool::default(); assert_eq!( pool.insert(&a_0), @@ -409,7 +420,7 @@ mod tests { let mut base = get_attestation(Slot::new(0)); sign(&mut base, 0, Hash256::random()); - let pool = NaiveAggregationPool::default(); + let mut pool = NaiveAggregationPool::default(); for i in 0..SLOTS_RETAINED * 2 { let slot = Slot::from(i); @@ -424,22 +435,16 @@ mod tests { if i < SLOTS_RETAINED { let len = i + 1; - assert_eq!( - pool.maps.read().len(), - len, - "the pool should have length {}", - len - ); + assert_eq!(pool.maps.len(), len, "the pool should have length {}", len); } else { assert_eq!( - pool.maps.read().len(), + pool.maps.len(), SLOTS_RETAINED, "the pool should have length SLOTS_RETAINED" ); let mut pool_slots = pool .maps - .read() .iter() .map(|(slot, _map)| *slot) .collect::>(); @@ -463,7 +468,7 @@ mod tests { let mut base = get_attestation(Slot::new(0)); sign(&mut base, 0, Hash256::random()); - let pool = NaiveAggregationPool::default(); + let mut pool = NaiveAggregationPool::default(); for i in 0..=MAX_ATTESTATIONS_PER_SLOT { let mut a = base.clone();