diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index a090c1cc5..37d96a497 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -9,7 +9,7 @@ types = { path = "../eth2/types" } client = { path = "client" } version = { path = "version" } clap = "2.32.0" -slog = "^2.2.3" +slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_debug"] } slog-term = "^2.4.0" slog-async = "^2.3.0" ctrlc = { version = "3.1.1", features = ["termination"] } diff --git a/beacon_node/beacon_chain/src/attestation_aggregator.rs b/beacon_node/beacon_chain/src/attestation_aggregator.rs deleted file mode 100644 index 9b4e5a687..000000000 --- a/beacon_node/beacon_chain/src/attestation_aggregator.rs +++ /dev/null @@ -1,218 +0,0 @@ -use ssz::TreeHash; -use state_processing::per_block_processing::validate_attestation_without_signature; -use std::collections::{HashMap, HashSet}; -use types::*; - -const PHASE_0_CUSTODY_BIT: bool = false; - -/// Provides the functionality to: -/// -/// - Recieve a `FreeAttestation` and aggregate it into an `Attestation` (or create a new if it -/// doesn't exist). -/// - Store all aggregated or created `Attestation`s. -/// - Produce a list of attestations that would be valid for inclusion in some `BeaconState` (and -/// therefore valid for inclusion in a `BeaconBlock`. -/// -/// Note: `Attestations` are stored in memory and never deleted. This is not scalable and must be -/// rectified in a future revision. -#[derive(Default)] -pub struct AttestationAggregator { - store: HashMap, Attestation>, -} - -pub struct Outcome { - pub valid: bool, - pub message: Message, -} - -pub enum Message { - /// The free attestation was added to an existing attestation. - Aggregated, - /// The free attestation has already been aggregated to an existing attestation. - AggregationNotRequired, - /// The free attestation was transformed into a new attestation. - NewAttestationCreated, - /// The supplied `validator_index` is not in the committee for the given `shard` and `slot`. - BadValidatorIndex, - /// The given `signature` did not match the `pubkey` in the given - /// `state.validator_registry`. - BadSignature, - /// The given `slot` does not match the validators committee assignment. - BadSlot, - /// The given `shard` does not match the validators committee assignment, or is not included in - /// a committee for the given slot. - BadShard, - /// Attestation is from the epoch prior to this, ignoring. - TooOld, -} - -macro_rules! valid_outcome { - ($error: expr) => { - return Ok(Outcome { - valid: true, - message: $error, - }); - }; -} - -macro_rules! invalid_outcome { - ($error: expr) => { - return Ok(Outcome { - valid: false, - message: $error, - }); - }; -} - -impl AttestationAggregator { - /// Instantiates a new AttestationAggregator with an empty database. - pub fn new() -> Self { - Self { - store: HashMap::new(), - } - } - - /// Accepts some `FreeAttestation`, validates it and either aggregates it upon some existing - /// `Attestation` or produces a new `Attestation`. - /// - /// The "validation" provided is not complete, instead the following points are checked: - /// - The given `validator_index` is in the committee for the given `shard` for the given - /// `slot`. - /// - The signature is verified against that of the validator at `validator_index`. - pub fn process_free_attestation( - &mut self, - state: &BeaconState, - free_attestation: &FreeAttestation, - spec: &ChainSpec, - ) -> Result { - let duties = - match state.get_attestation_duties(free_attestation.validator_index as usize, spec) { - Err(BeaconStateError::EpochCacheUninitialized(e)) => { - panic!("Attempted to access unbuilt cache {:?}.", e) - } - Err(BeaconStateError::EpochOutOfBounds) => invalid_outcome!(Message::TooOld), - Err(BeaconStateError::ShardOutOfBounds) => invalid_outcome!(Message::BadShard), - Err(e) => return Err(e), - Ok(None) => invalid_outcome!(Message::BadValidatorIndex), - Ok(Some(attestation_duties)) => attestation_duties, - }; - - if free_attestation.data.slot != duties.slot { - invalid_outcome!(Message::BadSlot); - } - if free_attestation.data.shard != duties.shard { - invalid_outcome!(Message::BadShard); - } - - let signable_message = AttestationDataAndCustodyBit { - data: free_attestation.data.clone(), - custody_bit: PHASE_0_CUSTODY_BIT, - } - .hash_tree_root(); - - let validator_record = match state - .validator_registry - .get(free_attestation.validator_index as usize) - { - None => invalid_outcome!(Message::BadValidatorIndex), - Some(validator_record) => validator_record, - }; - - if !free_attestation.signature.verify( - &signable_message, - spec.get_domain(state.current_epoch(spec), Domain::Attestation, &state.fork), - &validator_record.pubkey, - ) { - invalid_outcome!(Message::BadSignature); - } - - if let Some(existing_attestation) = self.store.get(&signable_message) { - if let Some(updated_attestation) = aggregate_attestation( - existing_attestation, - &free_attestation.signature, - duties.committee_index as usize, - ) { - self.store.insert(signable_message, updated_attestation); - valid_outcome!(Message::Aggregated); - } else { - valid_outcome!(Message::AggregationNotRequired); - } - } else { - let mut aggregate_signature = AggregateSignature::new(); - aggregate_signature.add(&free_attestation.signature); - let mut aggregation_bitfield = Bitfield::new(); - aggregation_bitfield.set(duties.committee_index as usize, true); - let new_attestation = Attestation { - data: free_attestation.data.clone(), - aggregation_bitfield, - custody_bitfield: Bitfield::new(), - aggregate_signature, - }; - self.store.insert(signable_message, new_attestation); - valid_outcome!(Message::NewAttestationCreated); - } - } - - /// Returns all known attestations which are: - /// - /// - Valid for the given state - /// - Not already in `state.latest_attestations`. - pub fn get_attestations_for_state( - &self, - state: &BeaconState, - spec: &ChainSpec, - ) -> Vec { - let mut known_attestation_data: HashSet = HashSet::new(); - - state - .previous_epoch_attestations - .iter() - .chain(state.current_epoch_attestations.iter()) - .for_each(|attestation| { - known_attestation_data.insert(attestation.data.clone()); - }); - - self.store - .values() - .filter_map(|attestation| { - if validate_attestation_without_signature(&state, attestation, spec).is_ok() - && !known_attestation_data.contains(&attestation.data) - { - Some(attestation.clone()) - } else { - None - } - }) - .collect() - } -} - -/// Produces a new `Attestation` where: -/// -/// - `signature` is added to `Attestation.aggregate_signature` -/// - Attestation.aggregation_bitfield[committee_index]` is set to true. -fn aggregate_attestation( - existing_attestation: &Attestation, - signature: &Signature, - committee_index: usize, -) -> Option { - let already_signed = existing_attestation - .aggregation_bitfield - .get(committee_index) - .unwrap_or(false); - - if already_signed { - None - } else { - let mut aggregation_bitfield = existing_attestation.aggregation_bitfield.clone(); - aggregation_bitfield.set(committee_index, true); - let mut aggregate_signature = existing_attestation.aggregate_signature.clone(); - aggregate_signature.add(&signature); - - Some(Attestation { - aggregation_bitfield, - aggregate_signature, - ..existing_attestation.clone() - }) - } -} diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7c2336a28..6ca0bff73 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -11,7 +11,7 @@ use operation_pool::OperationPool; use parking_lot::{RwLock, RwLockReadGuard}; use slot_clock::SlotClock; use ssz::ssz_encode; -pub use state_processing::per_block_processing::errors::{ +use state_processing::per_block_processing::errors::{ AttestationValidationError, AttesterSlashingValidationError, DepositValidationError, ExitValidationError, ProposerSlashingValidationError, TransferValidationError, }; diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 69ff14671..7605ced15 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -1,4 +1,3 @@ -mod attestation_aggregator; mod beacon_chain; mod checkpoint; mod errors; @@ -10,9 +9,12 @@ pub use self::beacon_chain::{ }; pub use self::checkpoint::CheckPoint; pub use self::errors::{BeaconChainError, BlockProductionError}; -pub use attestation_aggregator::Outcome as AggregationOutcome; pub use db; pub use fork_choice; pub use parking_lot; pub use slot_clock; +pub use state_processing::per_block_processing::errors::{ + AttestationValidationError, AttesterSlashingValidationError, DepositValidationError, + ExitValidationError, ProposerSlashingValidationError, TransferValidationError, +}; pub use types; diff --git a/beacon_node/beacon_chain/test_harness/specs/validator_registry.yaml b/beacon_node/beacon_chain/test_harness/specs/validator_registry.yaml index 1674ecffc..ad9c899cf 100644 --- a/beacon_node/beacon_chain/test_harness/specs/validator_registry.yaml +++ b/beacon_node/beacon_chain/test_harness/specs/validator_registry.yaml @@ -48,7 +48,8 @@ test_cases: - slot: 63 num_validators: 1003 num_previous_epoch_attestations: 0 - num_current_epoch_attestations: 10 + # slots_per_epoch - attestation_inclusion_delay - skip_slots + num_current_epoch_attestations: 57 slashed_validators: [11, 12, 13, 14, 42] exited_validators: [] exit_initiated_validators: [50] diff --git a/beacon_node/beacon_chain/test_harness/src/beacon_chain_harness.rs b/beacon_node/beacon_chain/test_harness/src/beacon_chain_harness.rs index b7acac9e1..33c12d7c7 100644 --- a/beacon_node/beacon_chain/test_harness/src/beacon_chain_harness.rs +++ b/beacon_node/beacon_chain/test_harness/src/beacon_chain_harness.rs @@ -178,8 +178,8 @@ impl BeaconChainHarness { agg_sig }; - let mut aggregation_bitfield = Bitfield::with_capacity(committee.committee.len()); - let custody_bitfield = Bitfield::with_capacity(committee.committee.len()); + let mut aggregation_bitfield = Bitfield::with_capacity(duties.committee_len); + let custody_bitfield = Bitfield::with_capacity(duties.committee_len); aggregation_bitfield.set(duties.committee_index, true); diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index ad3233be7..f9adb93c1 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -179,6 +179,19 @@ pub struct BeaconBlockRootsResponse { pub roots: Vec, } +impl BeaconBlockRootsResponse { + /// Returns `true` if each `self.roots.slot[i]` is higher than the preceeding `i`. + pub fn slots_are_ascending(&self) -> bool { + for i in 1..self.roots.len() { + if self.roots[i - 1].slot >= self.roots[i].slot { + return false; + } + } + + true + } +} + /// Contains a block root and associated slot. #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BlockRootSlot { diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index c6411a020..cd2c2269a 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -13,7 +13,7 @@ beacon_chain = { path = "../beacon_chain" } eth2-libp2p = { path = "../eth2-libp2p" } version = { path = "../version" } types = { path = "../../eth2/types" } -slog = "2.4.1" +slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_debug"] } ssz = { path = "../../eth2/utils/ssz" } futures = "0.1.25" error-chain = "0.12.0" diff --git a/beacon_node/network/src/beacon_chain.rs b/beacon_node/network/src/beacon_chain.rs index 8ec8162ff..7a8efb254 100644 --- a/beacon_node/network/src/beacon_chain.rs +++ b/beacon_node/network/src/beacon_chain.rs @@ -5,7 +5,7 @@ use beacon_chain::{ parking_lot::RwLockReadGuard, slot_clock::SlotClock, types::{BeaconState, ChainSpec}, - AggregationOutcome, CheckPoint, + AttestationValidationError, CheckPoint, }; use eth2_libp2p::rpc::HelloMessage; use types::{Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; @@ -40,7 +40,7 @@ pub trait BeaconChain: Send + Sync { fn process_attestation( &self, attestation: Attestation, - ) -> Result; + ) -> Result<(), AttestationValidationError>; fn get_block_roots( &self, @@ -126,14 +126,9 @@ where fn process_attestation( &self, - _attestation: Attestation, - ) -> Result { - // Awaiting a proper operations pool before we can import attestations. - // - // Returning a useless error for now. - // - // https://github.com/sigp/lighthouse/issues/281 - return Err(BeaconChainError::DBInconsistent("CANNOT PROCESS".into())); + attestation: Attestation, + ) -> Result<(), AttestationValidationError> { + self.process_attestation(attestation) } fn get_block_roots( diff --git a/beacon_node/network/src/sync/import_queue.rs b/beacon_node/network/src/sync/import_queue.rs index 17cbd2f12..b9280440b 100644 --- a/beacon_node/network/src/sync/import_queue.rs +++ b/beacon_node/network/src/sync/import_queue.rs @@ -5,7 +5,7 @@ use slog::{debug, error}; use ssz::TreeHash; use std::sync::Arc; use std::time::{Duration, Instant}; -use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Hash256}; +use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Hash256, Slot}; /// Provides a queue for fully and partially built `BeaconBlock`s. /// @@ -113,11 +113,36 @@ impl ImportQueue { }) } - /// Returns the index of the first new root in the list of block roots. - pub fn first_new_root(&mut self, roots: &[BlockRootSlot]) -> Option { - roots + /// Adds the `block_roots` to the partials queue. + /// + /// If a `block_root` is not in the queue and has not been processed by the chain it is added + /// to the queue and it's block root is included in the output. + pub fn enqueue_block_roots( + &mut self, + block_roots: &[BlockRootSlot], + sender: PeerId, + ) -> Vec { + let new_roots: Vec = block_roots .iter() - .position(|brs| self.is_new_block(&brs.block_root)) + // Ignore any roots already processed by the chain. + .filter(|brs| self.is_new_block(&brs.block_root)) + // Ignore any roots already stored in the queue. + .filter(|brs| !self.partials.iter().any(|p| p.block_root == brs.block_root)) + .cloned() + .collect(); + + new_roots.iter().for_each(|brs| { + self.partials.push(PartialBeaconBlock { + slot: brs.slot, + block_root: brs.block_root, + sender: sender.clone(), + header: None, + body: None, + inserted: Instant::now(), + }) + }); + + new_roots } /// Adds the `headers` to the `partials` queue. Returns a list of `Hash256` block roots for @@ -171,11 +196,21 @@ impl ImportQueue { .iter() .position(|p| p.block_root == block_root) { + // Case 1: there already exists a partial with a matching block root. + // + // The `inserted` time is set to now and the header is replaced, regardless of whether + // it existed or not. + self.partials[i].header = Some(header); self.partials[i].inserted = Instant::now(); } else { + // Case 2: there was no partial with a matching block root. + // + // A new partial is added. This case permits adding a header without already known the + // root -- this is not possible in the wire protocol however we support it anyway. self.partials.push(PartialBeaconBlock { + slot: header.slot, block_root, - header, + header: Some(header), body: None, inserted: Instant::now(), sender, @@ -192,12 +227,14 @@ impl ImportQueue { let body_root = Hash256::from_slice(&body.hash_tree_root()[..]); self.partials.iter_mut().for_each(|mut p| { - if body_root == p.header.block_body_root { - p.inserted = Instant::now(); + if let Some(header) = &mut p.header { + if body_root == header.block_body_root { + p.inserted = Instant::now(); - if p.body.is_none() { - p.body = Some(body.clone()); - p.sender = sender.clone(); + if p.body.is_none() { + p.body = Some(body.clone()); + p.sender = sender.clone(); + } } } }); @@ -208,9 +245,10 @@ impl ImportQueue { /// `BeaconBlock`. #[derive(Clone, Debug)] pub struct PartialBeaconBlock { + pub slot: Slot, /// `BeaconBlock` root. pub block_root: Hash256, - pub header: BeaconBlockHeader, + pub header: Option, pub body: Option, /// The instant at which this record was created or last meaningfully modified. Used to /// determine if an entry is stale and should be removed. @@ -225,7 +263,7 @@ impl PartialBeaconBlock { pub fn complete(self) -> Option<(Hash256, BeaconBlock, PeerId)> { Some(( self.block_root, - self.header.into_block(self.body?), + self.header?.into_block(self.body?), self.sender, )) } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 85949fa98..39fe772b4 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -358,31 +358,50 @@ impl SimpleSync { if res.roots.is_empty() { warn!( self.log, - "Peer returned empty block roots response. PeerId: {:?}", peer_id + "Peer returned empty block roots response"; + "peer_id" => format!("{:?}", peer_id) ); return; } - let new_root_index = self.import_queue.first_new_root(&res.roots); - - // If a new block root is found, request it and all the headers following it. - // - // We make an assumption here that if we don't know a block then we don't know of all - // it's parents. This might not be the case if syncing becomes more sophisticated. - if let Some(i) = new_root_index { - let new = &res.roots[i]; - - self.request_block_headers( - peer_id, - BeaconBlockHeadersRequest { - start_root: new.block_root, - start_slot: new.slot, - max_headers: (res.roots.len() - i) as u64, - skip_slots: 0, - }, - network, - ) + // The wire protocol specifies that slots must be in ascending order. + if !res.slots_are_ascending() { + warn!( + self.log, + "Peer returned block roots response with bad slot ordering"; + "peer_id" => format!("{:?}", peer_id) + ); + return; } + + let new_roots = self + .import_queue + .enqueue_block_roots(&res.roots, peer_id.clone()); + + // No new roots means nothing to do. + // + // This check protects against future panics. + if new_roots.is_empty() { + return; + } + + // Determine the first (earliest) and last (latest) `BlockRootSlot` items. + // + // This logic relies upon slots to be in ascending order, which is enforced earlier. + let first = new_roots.first().expect("Non-empty list must have first"); + let last = new_roots.last().expect("Non-empty list must have last"); + + // Request all headers between the earliest and latest new `BlockRootSlot` items. + self.request_block_headers( + peer_id, + BeaconBlockHeadersRequest { + start_root: first.block_root, + start_slot: first.slot, + max_headers: (last.slot - first.slot + 1).as_u64(), + skip_slots: 0, + }, + network, + ) } /// Handle a `BeaconBlockHeaders` request from the peer. @@ -528,8 +547,17 @@ impl SimpleSync { "NewGossipBlock"; "peer" => format!("{:?}", peer_id), ); - // TODO: filter out messages that a prior to the finalized slot. - // + + // Ignore any block from a finalized slot. + if self.slot_is_finalized(msg.slot) { + warn!( + self.log, "NewGossipBlock"; + "msg" => "new block slot is finalized.", + "slot" => msg.slot, + ); + return; + } + // TODO: if the block is a few more slots ahead, try to get all block roots from then until // now. // @@ -563,12 +591,9 @@ impl SimpleSync { "peer" => format!("{:?}", peer_id), ); - // Awaiting a proper operations pool before we can import attestations. - // - // https://github.com/sigp/lighthouse/issues/281 match self.chain.process_attestation(msg) { - Ok(_) => panic!("Impossible, method not implemented."), - Err(_) => error!(self.log, "Attestation processing not implemented!"), + Ok(()) => info!(self.log, "ImportedAttestation"), + Err(e) => warn!(self.log, "InvalidAttestation"; "error" => format!("{:?}", e)), } } @@ -678,6 +703,14 @@ impl SimpleSync { network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(req)); } + fn slot_is_finalized(&self, slot: Slot) -> bool { + slot <= self + .chain + .hello_message() + .latest_finalized_epoch + .start_slot(self.chain.get_spec().slots_per_epoch) + } + /// Generates our current state in the form of a HELLO RPC message. pub fn generate_hello(&self) -> HelloMessage { self.chain.hello_message() diff --git a/beacon_node/network/tests/tests.rs b/beacon_node/network/tests/tests.rs index 9cead1b55..47d5482d3 100644 --- a/beacon_node/network/tests/tests.rs +++ b/beacon_node/network/tests/tests.rs @@ -543,7 +543,7 @@ fn sync_two_nodes() { // A provides block bodies to B. node_a.tee_block_body_response(&node_b); - std::thread::sleep(Duration::from_secs(10)); + std::thread::sleep(Duration::from_secs(20)); node_b.harness.run_fork_choice(); diff --git a/eth2/operation_pool/src/lib.rs b/eth2/operation_pool/src/lib.rs index c42527b60..9d4d0091a 100644 --- a/eth2/operation_pool/src/lib.rs +++ b/eth2/operation_pool/src/lib.rs @@ -172,6 +172,8 @@ impl OperationPool { || key.domain_bytes_match(&curr_domain_bytes) }) .flat_map(|(_, attestations)| attestations) + // That are not superseded by an attestation included in the state... + .filter(|attestation| !superior_attestation_exists_in_state(state, attestation)) // That are valid... .filter(|attestation| validate_attestation(state, attestation, spec).is_ok()) // Scored by the number of new attestations they introduce (descending) @@ -462,6 +464,31 @@ impl OperationPool { } } +/// Returns `true` if the state already contains a `PendingAttestation` that is superior to the +/// given `attestation`. +/// +/// A validator has nothing to gain from re-including an attestation and it adds load to the +/// network. +/// +/// An existing `PendingAttestation` is superior to an existing `attestation` if: +/// +/// - Their `AttestationData` is equal. +/// - `attestation` does not contain any signatures that `PendingAttestation` does not have. +fn superior_attestation_exists_in_state(state: &BeaconState, attestation: &Attestation) -> bool { + state + .current_epoch_attestations + .iter() + .chain(state.previous_epoch_attestations.iter()) + .any(|existing_attestation| { + let bitfield = &attestation.aggregation_bitfield; + let existing_bitfield = &existing_attestation.aggregation_bitfield; + + existing_attestation.data == attestation.data + && bitfield.intersection(existing_bitfield).num_set_bits() + == bitfield.num_set_bits() + }) +} + /// Filter up to a maximum number of operations out of an iterator. fn filter_limit_operations<'a, T: 'a, I, F>(operations: I, filter: F, limit: u64) -> Vec where diff --git a/eth2/types/src/test_utils/testing_beacon_state_builder.rs b/eth2/types/src/test_utils/testing_beacon_state_builder.rs index c04188920..f437240dc 100644 --- a/eth2/types/src/test_utils/testing_beacon_state_builder.rs +++ b/eth2/types/src/test_utils/testing_beacon_state_builder.rs @@ -120,7 +120,7 @@ impl TestingBeaconStateBuilder { }) .collect(); - let genesis_time = 1553950542; // arbitrary + let genesis_time = 1553977336; // arbitrary let mut state = BeaconState::genesis( genesis_time,