Implement op pool for all ops except attestations

Paul Hauner 2019-03-29 18:30:03 +11:00
parent dd2351020c
commit 46a978a5a9
No known key found for this signature in database
GPG Key ID: D362883A9218FCC6
3 changed files with 51 additions and 210 deletions
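
The diff below replaces the per-operation `*_for_inclusion` queues in `BeaconChain` with a single `OperationPool` from the `eth2/operation_pool` crate. For orientation, here is a minimal sketch of the pool interface as it can be inferred from the call sites in this commit; the storage, error type, and `&mut self` receivers are assumptions, not the actual crate code.

use types::{
    AttesterSlashing, BeaconState, ChainSpec, Deposit, ProposerSlashing, Transfer, VoluntaryExit,
};

/// Inferred interface only; the pool's internal storage is not shown by this commit.
#[derive(Default)]
pub struct OperationPool;

impl OperationPool {
    pub fn new() -> Self {
        Self::default()
    }

    // `insert_*` validate an operation against the given state and queue it.
    // `BeaconChain` discards the result ("bad deposits are ignored").
    pub fn insert_deposit(
        &mut self,
        _deposit: Deposit,
        _state: &BeaconState,
        _spec: &ChainSpec,
    ) -> Result<(), ()> {
        Ok(()) // placeholder; the real pool validates and stores the deposit
    }

    // insert_voluntary_exit, insert_transfer, insert_proposer_slashing and
    // insert_attester_slashing follow the same shape and are omitted here.

    // `get_*` return operations suitable for inclusion in a block built on `state`.
    pub fn get_deposits(&self, _state: &BeaconState, _spec: &ChainSpec) -> Vec<Deposit> {
        Vec::new()
    }

    pub fn get_voluntary_exits(&self, _state: &BeaconState, _spec: &ChainSpec) -> Vec<VoluntaryExit> {
        Vec::new()
    }

    pub fn get_transfers(&self, _state: &BeaconState, _spec: &ChainSpec) -> Vec<Transfer> {
        Vec::new()
    }

    pub fn get_slashings(
        &self,
        _state: &BeaconState,
        _spec: &ChainSpec,
    ) -> (Vec<ProposerSlashing>, Vec<AttesterSlashing>) {
        (Vec::new(), Vec::new())
    }
}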

View File

@@ -15,6 +15,7 @@ hashing = { path = "../../eth2/utils/hashing" }
 fork_choice = { path = "../../eth2/fork_choice" }
 parking_lot = "0.7"
 log = "0.4"
+operation_pool = { path = "../../eth2/operation_pool" }
 env_logger = "0.6"
 serde = "1.0"
 serde_derive = "1.0"

View File

@@ -6,7 +6,8 @@ use db::{
     ClientDB, DBError,
 };
 use fork_choice::{ForkChoice, ForkChoiceError};
-use log::{debug, trace};
+use log::{debug, trace, warn};
+use operation_pool::OperationPool;
 use parking_lot::{RwLock, RwLockReadGuard};
 use slot_clock::SlotClock;
 use ssz::ssz_encode;
@@ -83,6 +84,7 @@ pub struct BeaconChain<T: ClientDB + Sized, U: SlotClock, F: ForkChoice> {
     pub state_store: Arc<BeaconStateStore<T>>,
     pub slot_clock: U,
     pub attestation_aggregator: RwLock<AttestationAggregator>,
+    pub op_pool: RwLock<OperationPool>,
     pub deposits_for_inclusion: RwLock<Vec<Deposit>>,
     pub exits_for_inclusion: RwLock<Vec<VoluntaryExit>>,
     pub transfers_for_inclusion: RwLock<Vec<Transfer>>,
@@ -141,6 +143,7 @@ where
             state_store,
             slot_clock,
             attestation_aggregator,
+            op_pool: RwLock::new(OperationPool::new()),
             deposits_for_inclusion: RwLock::new(vec![]),
             exits_for_inclusion: RwLock::new(vec![]),
             transfers_for_inclusion: RwLock::new(vec![]),
@@ -540,218 +543,48 @@ where
     /// Accept some deposit and queue it for inclusion in an appropriate block.
     pub fn receive_deposit_for_inclusion(&self, deposit: Deposit) {
-        // TODO: deposits are not checked for validity; check them.
-        //
-        // https://github.com/sigp/lighthouse/issues/276
-        self.deposits_for_inclusion.write().push(deposit);
-    }
-
-    /// Return a vec of deposits suitable for inclusion in some block.
-    pub fn get_deposits_for_block(&self) -> Vec<Deposit> {
-        // TODO: deposits are indiscriminately included; check them for validity.
-        //
-        // https://github.com/sigp/lighthouse/issues/275
-        self.deposits_for_inclusion.read().clone()
-    }
-
-    /// Takes a list of `Deposits` that were included in recent blocks and removes them from the
-    /// inclusion queue.
-    ///
-    /// This ensures that `Deposits` are not included twice in successive blocks.
-    pub fn set_deposits_as_included(&self, included_deposits: &[Deposit]) {
-        // TODO: method does not take forks into account; consider this.
-        //
-        // https://github.com/sigp/lighthouse/issues/275
-        let mut indices_to_delete = vec![];
-        for included in included_deposits {
-            for (i, for_inclusion) in self.deposits_for_inclusion.read().iter().enumerate() {
-                if included == for_inclusion {
-                    indices_to_delete.push(i);
-                }
-            }
-        }
-        let deposits_for_inclusion = &mut self.deposits_for_inclusion.write();
-        for i in indices_to_delete {
-            deposits_for_inclusion.remove(i);
-        }
+        // Bad deposits are ignored.
+        let _ = self
+            .op_pool
+            .write()
+            .insert_deposit(deposit, &*self.state.read(), &self.spec);
     }

     /// Accept some exit and queue it for inclusion in an appropriate block.
     pub fn receive_exit_for_inclusion(&self, exit: VoluntaryExit) {
-        // TODO: exits are not checked for validity; check them.
-        //
-        // https://github.com/sigp/lighthouse/issues/276
-        self.exits_for_inclusion.write().push(exit);
-    }
-
-    /// Return a vec of exits suitable for inclusion in some block.
-    pub fn get_exits_for_block(&self) -> Vec<VoluntaryExit> {
-        // TODO: exits are indiscriminately included; check them for validity.
-        //
-        // https://github.com/sigp/lighthouse/issues/275
-        self.exits_for_inclusion.read().clone()
-    }
-
-    /// Takes a list of `Deposits` that were included in recent blocks and removes them from the
-    /// inclusion queue.
-    ///
-    /// This ensures that `Deposits` are not included twice in successive blocks.
-    pub fn set_exits_as_included(&self, included_exits: &[VoluntaryExit]) {
-        // TODO: method does not take forks into account; consider this.
-        let mut indices_to_delete = vec![];
-        for included in included_exits {
-            for (i, for_inclusion) in self.exits_for_inclusion.read().iter().enumerate() {
-                if included == for_inclusion {
-                    indices_to_delete.push(i);
-                }
-            }
-        }
-        let exits_for_inclusion = &mut self.exits_for_inclusion.write();
-        for i in indices_to_delete {
-            exits_for_inclusion.remove(i);
-        }
+        // Bad exits are ignored
+        let _ = self
+            .op_pool
+            .write()
+            .insert_voluntary_exit(exit, &*self.state.read(), &self.spec);
     }

     /// Accept some transfer and queue it for inclusion in an appropriate block.
     pub fn receive_transfer_for_inclusion(&self, transfer: Transfer) {
-        // TODO: transfers are not checked for validity; check them.
-        //
-        // https://github.com/sigp/lighthouse/issues/276
-        self.transfers_for_inclusion.write().push(transfer);
-    }
-
-    /// Return a vec of transfers suitable for inclusion in some block.
-    pub fn get_transfers_for_block(&self) -> Vec<Transfer> {
-        // TODO: transfers are indiscriminately included; check them for validity.
-        //
-        // https://github.com/sigp/lighthouse/issues/275
-        self.transfers_for_inclusion.read().clone()
-    }
-
-    /// Takes a list of `Deposits` that were included in recent blocks and removes them from the
-    /// inclusion queue.
-    ///
-    /// This ensures that `Deposits` are not included twice in successive blocks.
-    pub fn set_transfers_as_included(&self, included_transfers: &[Transfer]) {
-        // TODO: method does not take forks into account; consider this.
-        let mut indices_to_delete = vec![];
-        for included in included_transfers {
-            for (i, for_inclusion) in self.transfers_for_inclusion.read().iter().enumerate() {
-                if included == for_inclusion {
-                    indices_to_delete.push(i);
-                }
-            }
-        }
-        let transfers_for_inclusion = &mut self.transfers_for_inclusion.write();
-        for i in indices_to_delete {
-            transfers_for_inclusion.remove(i);
-        }
+        // Bad transfers are ignored.
+        let _ = self
+            .op_pool
+            .write()
+            .insert_transfer(transfer, &*self.state.read(), &self.spec);
     }

     /// Accept some proposer slashing and queue it for inclusion in an appropriate block.
     pub fn receive_proposer_slashing_for_inclusion(&self, proposer_slashing: ProposerSlashing) {
-        // TODO: proposer_slashings are not checked for validity; check them.
-        //
-        // https://github.com/sigp/lighthouse/issues/276
-        self.proposer_slashings_for_inclusion
-            .write()
-            .push(proposer_slashing);
-    }
-
-    /// Return a vec of proposer slashings suitable for inclusion in some block.
-    pub fn get_proposer_slashings_for_block(&self) -> Vec<ProposerSlashing> {
-        // TODO: proposer_slashings are indiscriminately included; check them for validity.
-        //
-        // https://github.com/sigp/lighthouse/issues/275
-        self.proposer_slashings_for_inclusion.read().clone()
-    }
-
-    /// Takes a list of `ProposerSlashings` that were included in recent blocks and removes them
-    /// from the inclusion queue.
-    ///
-    /// This ensures that `ProposerSlashings` are not included twice in successive blocks.
-    pub fn set_proposer_slashings_as_included(
-        &self,
-        included_proposer_slashings: &[ProposerSlashing],
-    ) {
-        // TODO: method does not take forks into account; consider this.
-        //
-        // https://github.com/sigp/lighthouse/issues/275
-        let mut indices_to_delete = vec![];
-        for included in included_proposer_slashings {
-            for (i, for_inclusion) in self
-                .proposer_slashings_for_inclusion
-                .read()
-                .iter()
-                .enumerate()
-            {
-                if included == for_inclusion {
-                    indices_to_delete.push(i);
-                }
-            }
-        }
-        let proposer_slashings_for_inclusion = &mut self.proposer_slashings_for_inclusion.write();
-        for i in indices_to_delete {
-            proposer_slashings_for_inclusion.remove(i);
-        }
+        // Bad proposer slashings are ignored.
+        let _ = self.op_pool.write().insert_proposer_slashing(
+            proposer_slashing,
+            &*self.state.read(),
+            &self.spec,
+        );
     }

     /// Accept some attester slashing and queue it for inclusion in an appropriate block.
     pub fn receive_attester_slashing_for_inclusion(&self, attester_slashing: AttesterSlashing) {
-        // TODO: attester_slashings are not checked for validity; check them.
-        //
-        // https://github.com/sigp/lighthouse/issues/276
-        self.attester_slashings_for_inclusion
-            .write()
-            .push(attester_slashing);
-    }
-
-    /// Return a vec of attester slashings suitable for inclusion in some block.
-    pub fn get_attester_slashings_for_block(&self) -> Vec<AttesterSlashing> {
-        // TODO: attester_slashings are indiscriminately included; check them for validity.
-        //
-        // https://github.com/sigp/lighthouse/issues/275
-        self.attester_slashings_for_inclusion.read().clone()
-    }
-
-    /// Takes a list of `AttesterSlashings` that were included in recent blocks and removes them
-    /// from the inclusion queue.
-    ///
-    /// This ensures that `AttesterSlashings` are not included twice in successive blocks.
-    pub fn set_attester_slashings_as_included(
-        &self,
-        included_attester_slashings: &[AttesterSlashing],
-    ) {
-        // TODO: method does not take forks into account; consider this.
-        //
-        // https://github.com/sigp/lighthouse/issues/275
-        let mut indices_to_delete = vec![];
-        for included in included_attester_slashings {
-            for (i, for_inclusion) in self
-                .attester_slashings_for_inclusion
-                .read()
-                .iter()
-                .enumerate()
-            {
-                if included == for_inclusion {
-                    indices_to_delete.push(i);
-                }
-            }
-        }
-        let attester_slashings_for_inclusion = &mut self.attester_slashings_for_inclusion.write();
-        for i in indices_to_delete {
-            attester_slashings_for_inclusion.remove(i);
-        }
+        let _ = self.op_pool.write().insert_attester_slashing(
+            attester_slashing,
+            &*self.state.read(),
+            &self.spec,
+        );
     }

     /// Returns `true` if the given block root has not been processed.
@@ -832,13 +665,6 @@ where
         self.block_store.put(&block_root, &ssz_encode(&block)[..])?;
         self.state_store.put(&state_root, &ssz_encode(&state)[..])?;

-        // Update the inclusion queues so they aren't re-submitted.
-        self.set_deposits_as_included(&block.body.deposits[..]);
-        self.set_transfers_as_included(&block.body.transfers[..]);
-        self.set_exits_as_included(&block.body.voluntary_exits[..]);
-        self.set_proposer_slashings_as_included(&block.body.proposer_slashings[..]);
-        self.set_attester_slashings_as_included(&block.body.attester_slashings[..]);
-
         // run the fork_choice add_block logic
         self.fork_choice
             .write()
@@ -888,6 +714,11 @@ where
             .get_block_root(state.slot - 1, &self.spec)
             .map_err(|_| BlockProductionError::UnableToGetBlockRootFromState)?;

+        let (proposer_slashings, attester_slashings) = self
+            .op_pool
+            .read()
+            .get_slashings(&*self.state.read(), &self.spec);
+
         let mut block = BeaconBlock {
             slot: state.slot,
             previous_block_root,
@@ -900,12 +731,21 @@ where
                     deposit_root: Hash256::zero(),
                     block_hash: Hash256::zero(),
                 },
-                proposer_slashings: self.get_proposer_slashings_for_block(),
-                attester_slashings: self.get_attester_slashings_for_block(),
+                proposer_slashings,
+                attester_slashings,
                 attestations,
-                deposits: self.get_deposits_for_block(),
-                voluntary_exits: self.get_exits_for_block(),
-                transfers: self.get_transfers_for_block(),
+                deposits: self
+                    .op_pool
+                    .read()
+                    .get_deposits(&*self.state.read(), &self.spec),
+                voluntary_exits: self
+                    .op_pool
+                    .read()
+                    .get_voluntary_exits(&*self.state.read(), &self.spec),
+                transfers: self
+                    .op_pool
+                    .read()
+                    .get_transfers(&*self.state.read(), &self.spec),
             },
         };
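
A design note on the `beacon_chain.rs` hunks above: the pool sits behind `parking_lot::RwLock`, just like the old queues, so each `receive_*_for_inclusion` call takes a short-lived write lock while block production only needs read locks. A small self-contained sketch of that locking pattern (the `Pool` type and its methods are placeholders, not the real operation pool API):

use parking_lot::RwLock;

#[derive(Default)]
struct Pool {
    items: Vec<u64>,
}

impl Pool {
    fn insert(&mut self, item: u64) {
        self.items.push(item);
    }

    fn get_all(&self) -> Vec<u64> {
        self.items.clone()
    }
}

fn main() {
    let pool = RwLock::new(Pool::default());

    // Receiving an operation takes a short-lived write lock...
    pool.write().insert(42);

    // ...while block production only needs a read lock.
    assert_eq!(pool.read().get_all(), vec![42]);
}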

View File

@@ -21,7 +21,7 @@ use types::{
 #[cfg(test)]
 const VERIFY_DEPOSIT_PROOFS: bool = false;
 #[cfg(not(test))]
-const VERIFY_DEPOSIT_PROOFS: bool = true;
+const VERIFY_DEPOSIT_PROOFS: bool = false; // TODO: enable this

 #[derive(Default)]
 pub struct OperationPool {
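
The final hunk also disables deposit proof verification outside of tests, with a TODO to re-enable it. Below is a self-contained illustration of how the cfg-gated constant behaves; `proof_is_valid` is a hypothetical stand-in for the real deposit Merkle-proof check:

#[cfg(test)]
const VERIFY_DEPOSIT_PROOFS: bool = false;
#[cfg(not(test))]
const VERIFY_DEPOSIT_PROOFS: bool = false; // TODO: enable this

fn proof_is_valid() -> bool {
    // The real check would verify the deposit's Merkle branch here.
    true
}

fn deposit_passes_proof_check() -> bool {
    // With the flag off (in tests, and temporarily in normal builds per this
    // commit), the proof check is skipped and every deposit passes.
    !VERIFY_DEPOSIT_PROOFS || proof_is_valid()
}

fn main() {
    assert!(deposit_passes_proof_check());
}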