Push RwLock down into OperationPool

There used to be one massive lock on `BeaconChain.op_pool`, however that
would cause unnecessary blocking.
This commit is contained in:
Paul Hauner 2019-03-30 12:26:25 +11:00
parent 1840248af8
commit cd9494181c
No known key found for this signature in database
GPG Key ID: D362883A9218FCC6
3 changed files with 75 additions and 90 deletions

View File

@ -89,7 +89,7 @@ pub struct BeaconChain<T: ClientDB + Sized, U: SlotClock, F: ForkChoice> {
pub state_store: Arc<BeaconStateStore<T>>,
pub slot_clock: U,
pub attestation_aggregator: RwLock<AttestationAggregator>,
pub op_pool: RwLock<OperationPool>,
pub op_pool: OperationPool,
canonical_head: RwLock<CheckPoint>,
finalized_head: RwLock<CheckPoint>,
pub state: RwLock<BeaconState>,
@ -143,7 +143,7 @@ where
state_store,
slot_clock,
attestation_aggregator,
op_pool: RwLock::new(OperationPool::new()),
op_pool: OperationPool::new(),
state: RwLock::new(genesis_state),
finalized_head,
canonical_head,
@ -545,7 +545,6 @@ where
attestation: Attestation,
) -> Result<(), AttestationValidationError> {
self.op_pool
.write()
.insert_attestation(attestation, &*self.state.read(), &self.spec)
}
@ -555,21 +554,18 @@ where
deposit: Deposit,
) -> Result<DepositInsertStatus, DepositValidationError> {
self.op_pool
.write()
.insert_deposit(deposit, &*self.state.read(), &self.spec)
}
/// Accept some exit and queue it for inclusion in an appropriate block.
pub fn process_voluntary_exit(&self, exit: VoluntaryExit) -> Result<(), ExitValidationError> {
self.op_pool
.write()
.insert_voluntary_exit(exit, &*self.state.read(), &self.spec)
}
/// Accept some transfer and queue it for inclusion in an appropriate block.
pub fn process_transfer(&self, transfer: Transfer) -> Result<(), TransferValidationError> {
self.op_pool
.write()
.insert_transfer(transfer, &*self.state.read(), &self.spec)
}
@ -578,11 +574,8 @@ where
&self,
proposer_slashing: ProposerSlashing,
) -> Result<(), ProposerSlashingValidationError> {
self.op_pool.write().insert_proposer_slashing(
proposer_slashing,
&*self.state.read(),
&self.spec,
)
self.op_pool
.insert_proposer_slashing(proposer_slashing, &*self.state.read(), &self.spec)
}
/// Accept some attester slashing and queue it for inclusion in an appropriate block.
@ -590,11 +583,8 @@ where
&self,
attester_slashing: AttesterSlashing,
) -> Result<(), AttesterSlashingValidationError> {
self.op_pool.write().insert_attester_slashing(
attester_slashing,
&*self.state.read(),
&self.spec,
)
self.op_pool
.insert_attester_slashing(attester_slashing, &*self.state.read(), &self.spec)
}
/// Accept some block and attempt to add it to block DAG.
@ -705,24 +695,12 @@ where
trace!("Finding attestations for new block...");
let attestations = self
.op_pool
.read()
.get_attestations(&*self.state.read(), &self.spec);
trace!(
"Inserting {} attestation(s) into new block.",
attestations.len()
);
let previous_block_root = *state
.get_block_root(state.slot - 1, &self.spec)
.map_err(|_| BlockProductionError::UnableToGetBlockRootFromState)?;
let (proposer_slashings, attester_slashings) = self
.op_pool
.read()
.get_slashings(&*self.state.read(), &self.spec);
let (proposer_slashings, attester_slashings) =
self.op_pool.get_slashings(&*self.state.read(), &self.spec);
let mut block = BeaconBlock {
slot: state.slot,
@ -738,19 +716,14 @@ where
},
proposer_slashings,
attester_slashings,
attestations,
deposits: self
attestations: self
.op_pool
.read()
.get_deposits(&*self.state.read(), &self.spec),
.get_attestations(&*self.state.read(), &self.spec),
deposits: self.op_pool.get_deposits(&*self.state.read(), &self.spec),
voluntary_exits: self
.op_pool
.read()
.get_voluntary_exits(&*self.state.read(), &self.spec),
transfers: self
.op_pool
.read()
.get_transfers(&*self.state.read(), &self.spec),
transfers: self.op_pool.get_transfers(&*self.state.read(), &self.spec),
},
};

View File

@ -7,6 +7,7 @@ edition = "2018"
[dependencies]
int_to_bytes = { path = "../utils/int_to_bytes" }
itertools = "0.8"
parking_lot = "0.7"
types = { path = "../types" }
state_processing = { path = "../state_processing" }
ssz = { path = "../utils/ssz" }

View File

@ -1,5 +1,6 @@
use int_to_bytes::int_to_bytes8;
use itertools::Itertools;
use parking_lot::RwLock;
use ssz::ssz_encode;
use state_processing::per_block_processing::errors::{
AttestationValidationError, AttesterSlashingValidationError, DepositValidationError,
@ -26,21 +27,21 @@ const VERIFY_DEPOSIT_PROOFS: bool = false; // TODO: enable this
#[derive(Default)]
pub struct OperationPool {
/// Map from attestation ID (see below) to vectors of attestations.
attestations: HashMap<AttestationId, Vec<Attestation>>,
attestations: RwLock<HashMap<AttestationId, Vec<Attestation>>>,
/// Map from deposit index to deposit data.
// NOTE: We assume that there is only one deposit per index
// because the Eth1 data is updated (at most) once per epoch,
// and the spec doesn't seem to accomodate for re-orgs on a time-frame
// longer than an epoch
deposits: BTreeMap<u64, Deposit>,
deposits: RwLock<BTreeMap<u64, Deposit>>,
/// Map from two attestation IDs to a slashing for those IDs.
attester_slashings: HashMap<(AttestationId, AttestationId), AttesterSlashing>,
attester_slashings: RwLock<HashMap<(AttestationId, AttestationId), AttesterSlashing>>,
/// Map from proposer index to slashing.
proposer_slashings: HashMap<u64, ProposerSlashing>,
proposer_slashings: RwLock<HashMap<u64, ProposerSlashing>>,
/// Map from exiting validator to their exit data.
voluntary_exits: HashMap<u64, VoluntaryExit>,
voluntary_exits: RwLock<HashMap<u64, VoluntaryExit>>,
/// Set of transfers.
transfers: HashSet<Transfer>,
transfers: RwLock<HashSet<Transfer>>,
}
/// Serialized `AttestationData` augmented with a domain to encode the fork info.
@ -109,7 +110,7 @@ impl OperationPool {
/// Insert an attestation into the pool, aggregating it with existing attestations if possible.
pub fn insert_attestation(
&mut self,
&self,
attestation: Attestation,
state: &BeaconState,
spec: &ChainSpec,
@ -119,7 +120,10 @@ impl OperationPool {
let id = AttestationId::from_data(&attestation.data, state, spec);
let existing_attestations = match self.attestations.entry(id) {
// Take a write lock on the attestations map.
let mut attestations = self.attestations.write();
let existing_attestations = match attestations.entry(id) {
hash_map::Entry::Vacant(entry) => {
entry.insert(vec![attestation]);
return Ok(());
@ -146,7 +150,11 @@ impl OperationPool {
/// Total number of attestations in the pool, including attestations for the same data.
pub fn num_attestations(&self) -> usize {
self.attestations.values().map(|atts| atts.len()).sum()
self.attestations
.read()
.values()
.map(|atts| atts.len())
.sum()
}
/// Get a list of attestations for inclusion in a block.
@ -157,6 +165,7 @@ impl OperationPool {
let prev_domain_bytes = AttestationId::compute_domain_bytes(prev_epoch, state, spec);
let curr_domain_bytes = AttestationId::compute_domain_bytes(current_epoch, state, spec);
self.attestations
.read()
.iter()
.filter(|(key, _)| {
key.domain_bytes_match(&prev_domain_bytes)
@ -180,8 +189,8 @@ impl OperationPool {
// TODO: we could probably prune other attestations here:
// - ones that are completely covered by attestations included in the state
// - maybe ones invalidated by the confirmation of one fork over another
pub fn prune_attestations(&mut self, finalized_state: &BeaconState, spec: &ChainSpec) {
self.attestations.retain(|_, attestations| {
pub fn prune_attestations(&self, finalized_state: &BeaconState, spec: &ChainSpec) {
self.attestations.write().retain(|_, attestations| {
// All the attestations in this bucket have the same data, so we only need to
// check the first one.
attestations.first().map_or(false, |att| {
@ -194,14 +203,14 @@ impl OperationPool {
///
/// No two distinct deposits should be added with the same index.
pub fn insert_deposit(
&mut self,
&self,
deposit: Deposit,
state: &BeaconState,
spec: &ChainSpec,
) -> Result<DepositInsertStatus, DepositValidationError> {
use DepositInsertStatus::*;
match self.deposits.entry(deposit.index) {
match self.deposits.write().entry(deposit.index) {
Entry::Vacant(entry) => {
verify_deposit(state, &deposit, VERIFY_DEPOSIT_PROOFS, spec)?;
entry.insert(deposit);
@ -224,27 +233,26 @@ impl OperationPool {
pub fn get_deposits(&self, state: &BeaconState, spec: &ChainSpec) -> Vec<Deposit> {
let start_idx = state.deposit_index;
(start_idx..start_idx + spec.max_deposits)
.map(|idx| self.deposits.get(&idx))
.map(|idx| self.deposits.read().get(&idx).cloned())
.take_while(Option::is_some)
.flatten()
.cloned()
.collect()
}
/// Remove all deposits with index less than the deposit index of the latest finalised block.
pub fn prune_deposits(&mut self, state: &BeaconState) -> BTreeMap<u64, Deposit> {
let deposits_keep = self.deposits.split_off(&state.deposit_index);
std::mem::replace(&mut self.deposits, deposits_keep)
pub fn prune_deposits(&self, state: &BeaconState) -> BTreeMap<u64, Deposit> {
let deposits_keep = self.deposits.write().split_off(&state.deposit_index);
std::mem::replace(&mut self.deposits.write(), deposits_keep)
}
/// The number of deposits stored in the pool.
pub fn num_deposits(&self) -> usize {
self.deposits.len()
self.deposits.read().len()
}
/// Insert a proposer slashing into the pool.
pub fn insert_proposer_slashing(
&mut self,
&self,
slashing: ProposerSlashing,
state: &BeaconState,
spec: &ChainSpec,
@ -253,6 +261,7 @@ impl OperationPool {
// because they could *become* known later
verify_proposer_slashing(&slashing, state, spec)?;
self.proposer_slashings
.write()
.insert(slashing.proposer_index, slashing);
Ok(())
}
@ -273,14 +282,14 @@ impl OperationPool {
/// Insert an attester slashing into the pool.
pub fn insert_attester_slashing(
&mut self,
&self,
slashing: AttesterSlashing,
state: &BeaconState,
spec: &ChainSpec,
) -> Result<(), AttesterSlashingValidationError> {
verify_attester_slashing(state, &slashing, true, spec)?;
let id = Self::attester_slashing_id(&slashing, state, spec);
self.attester_slashings.insert(id, slashing);
self.attester_slashings.write().insert(id, slashing);
Ok(())
}
@ -295,7 +304,7 @@ impl OperationPool {
spec: &ChainSpec,
) -> (Vec<ProposerSlashing>, Vec<AttesterSlashing>) {
let proposer_slashings = filter_limit_operations(
self.proposer_slashings.values(),
self.proposer_slashings.read().values(),
|slashing| {
state
.validator_registry
@ -314,6 +323,7 @@ impl OperationPool {
let attester_slashings = self
.attester_slashings
.read()
.iter()
.filter(|(id, slashing)| {
// Check the fork.
@ -345,9 +355,9 @@ impl OperationPool {
}
/// Prune proposer slashings for all slashed or withdrawn validators.
pub fn prune_proposer_slashings(&mut self, finalized_state: &BeaconState, spec: &ChainSpec) {
pub fn prune_proposer_slashings(&self, finalized_state: &BeaconState, spec: &ChainSpec) {
prune_validator_hash_map(
&mut self.proposer_slashings,
&mut self.proposer_slashings.write(),
|validator| {
validator.slashed
|| validator.is_withdrawable_at(finalized_state.current_epoch(spec))
@ -358,8 +368,8 @@ impl OperationPool {
/// Prune attester slashings for all slashed or withdrawn validators, or attestations on another
/// fork.
pub fn prune_attester_slashings(&mut self, finalized_state: &BeaconState, spec: &ChainSpec) {
self.attester_slashings.retain(|id, slashing| {
pub fn prune_attester_slashings(&self, finalized_state: &BeaconState, spec: &ChainSpec) {
self.attester_slashings.write().retain(|id, slashing| {
let fork_ok = &Self::attester_slashing_id(slashing, finalized_state, spec) == id;
let curr_epoch = finalized_state.current_epoch(spec);
let slashing_ok = gather_attester_slashing_indices_modular(
@ -375,29 +385,31 @@ impl OperationPool {
/// Insert a voluntary exit, validating it almost-entirely (future exits are permitted).
pub fn insert_voluntary_exit(
&mut self,
&self,
exit: VoluntaryExit,
state: &BeaconState,
spec: &ChainSpec,
) -> Result<(), ExitValidationError> {
verify_exit_time_independent_only(state, &exit, spec)?;
self.voluntary_exits.insert(exit.validator_index, exit);
self.voluntary_exits
.write()
.insert(exit.validator_index, exit);
Ok(())
}
/// Get a list of voluntary exits for inclusion in a block.
pub fn get_voluntary_exits(&self, state: &BeaconState, spec: &ChainSpec) -> Vec<VoluntaryExit> {
filter_limit_operations(
self.voluntary_exits.values(),
self.voluntary_exits.read().values(),
|exit| verify_exit(state, exit, spec).is_ok(),
spec.max_voluntary_exits,
)
}
/// Prune if validator has already exited at the last finalized state.
pub fn prune_voluntary_exits(&mut self, finalized_state: &BeaconState, spec: &ChainSpec) {
pub fn prune_voluntary_exits(&self, finalized_state: &BeaconState, spec: &ChainSpec) {
prune_validator_hash_map(
&mut self.voluntary_exits,
&mut self.voluntary_exits.write(),
|validator| validator.is_exited_at(finalized_state.current_epoch(spec)),
finalized_state,
);
@ -405,7 +417,7 @@ impl OperationPool {
/// Insert a transfer into the pool, checking it for validity in the process.
pub fn insert_transfer(
&mut self,
&self,
transfer: Transfer,
state: &BeaconState,
spec: &ChainSpec,
@ -414,7 +426,7 @@ impl OperationPool {
// it before we insert into the HashSet, we can't end up with duplicate
// transactions.
verify_transfer_time_independent_only(state, &transfer, spec)?;
self.transfers.insert(transfer);
self.transfers.write().insert(transfer);
Ok(())
}
@ -423,6 +435,7 @@ impl OperationPool {
// dependencies between transfers in the same block e.g. A pays B, B pays C
pub fn get_transfers(&self, state: &BeaconState, spec: &ChainSpec) -> Vec<Transfer> {
self.transfers
.read()
.iter()
.filter(|transfer| verify_transfer(state, transfer, spec).is_ok())
.sorted_by_key(|transfer| std::cmp::Reverse(transfer.fee))
@ -432,16 +445,14 @@ impl OperationPool {
}
/// Prune the set of transfers by removing all those whose slot has already passed.
pub fn prune_transfers(&mut self, finalized_state: &BeaconState) {
self.transfers = self
.transfers
.drain()
.filter(|transfer| transfer.slot > finalized_state.slot)
.collect();
pub fn prune_transfers(&self, finalized_state: &BeaconState) {
self.transfers
.write()
.retain(|transfer| transfer.slot > finalized_state.slot)
}
/// Prune all types of transactions given the latest finalized state.
pub fn prune_all(&mut self, finalized_state: &BeaconState, spec: &ChainSpec) {
pub fn prune_all(&self, finalized_state: &BeaconState, spec: &ChainSpec) {
self.prune_attestations(finalized_state, spec);
self.prune_deposits(finalized_state);
self.prune_proposer_slashings(finalized_state, spec);
@ -497,7 +508,7 @@ mod tests {
fn insert_deposit() {
let rng = &mut XorShiftRng::from_seed([42; 16]);
let (ref spec, ref state) = test_state(rng);
let mut op_pool = OperationPool::new();
let op_pool = OperationPool::new();
let deposit1 = make_deposit(rng, state, spec);
let mut deposit2 = make_deposit(rng, state, spec);
deposit2.index = deposit1.index;
@ -520,7 +531,7 @@ mod tests {
fn get_deposits_max() {
let rng = &mut XorShiftRng::from_seed([42; 16]);
let (spec, mut state) = test_state(rng);
let mut op_pool = OperationPool::new();
let op_pool = OperationPool::new();
let start = 10000;
let max_deposits = spec.max_deposits;
let extra = 5;
@ -550,7 +561,7 @@ mod tests {
fn prune_deposits() {
let rng = &mut XorShiftRng::from_seed([42; 16]);
let (spec, state) = test_state(rng);
let mut op_pool = OperationPool::new();
let op_pool = OperationPool::new();
let start1 = 100;
// test is super slow in debug mode if this parameter is too high
@ -734,7 +745,7 @@ mod tests {
fn attestation_aggregation_insert_get_prune() {
let spec = &ChainSpec::foundation();
let (ref mut state, ref keypairs) = attestation_test_state(spec, 1);
let mut op_pool = OperationPool::new();
let op_pool = OperationPool::new();
let slot = state.slot - 1;
let committees = state
@ -765,7 +776,7 @@ mod tests {
}
}
assert_eq!(op_pool.attestations.len(), committees.len());
assert_eq!(op_pool.attestations.read().len(), committees.len());
assert_eq!(op_pool.num_attestations(), committees.len());
// Before the min attestation inclusion delay, get_attestations shouldn't return anything.
@ -799,7 +810,7 @@ mod tests {
fn attestation_duplicate() {
let spec = &ChainSpec::foundation();
let (ref mut state, ref keypairs) = attestation_test_state(spec, 1);
let mut op_pool = OperationPool::new();
let op_pool = OperationPool::new();
let slot = state.slot - 1;
let committees = state
@ -825,7 +836,7 @@ mod tests {
fn attestation_pairwise_overlapping() {
let spec = &ChainSpec::foundation();
let (ref mut state, ref keypairs) = attestation_test_state(spec, 1);
let mut op_pool = OperationPool::new();
let op_pool = OperationPool::new();
let slot = state.slot - 1;
let committees = state
@ -854,7 +865,7 @@ mod tests {
// The attestations should get aggregated into two attestations that comprise all
// validators.
assert_eq!(op_pool.attestations.len(), committees.len());
assert_eq!(op_pool.attestations.read().len(), committees.len());
assert_eq!(op_pool.num_attestations(), 2 * committees.len());
}
@ -869,7 +880,7 @@ mod tests {
let small_step_size = 2;
let big_step_size = 4;
let (ref mut state, ref keypairs) = attestation_test_state(spec, big_step_size);
let mut op_pool = OperationPool::new();
let op_pool = OperationPool::new();
let slot = state.slot - 1;
let committees = state
@ -907,7 +918,7 @@ mod tests {
let num_small = target_committee_size / small_step_size;
let num_big = target_committee_size / big_step_size;
assert_eq!(op_pool.attestations.len(), committees.len());
assert_eq!(op_pool.attestations.read().len(), committees.len());
assert_eq!(
op_pool.num_attestations(),
(num_small + num_big) * committees.len()