Merge latest gossip branch

This commit is contained in:
Age Manning 2019-03-31 11:02:57 +11:00
commit 77fb738c78
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
14 changed files with 169 additions and 278 deletions

View File

@ -9,7 +9,7 @@ types = { path = "../eth2/types" }
client = { path = "client" } client = { path = "client" }
version = { path = "version" } version = { path = "version" }
clap = "2.32.0" clap = "2.32.0"
slog = "^2.2.3" slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_debug"] }
slog-term = "^2.4.0" slog-term = "^2.4.0"
slog-async = "^2.3.0" slog-async = "^2.3.0"
ctrlc = { version = "3.1.1", features = ["termination"] } ctrlc = { version = "3.1.1", features = ["termination"] }

View File

@ -1,218 +0,0 @@
use ssz::TreeHash;
use state_processing::per_block_processing::validate_attestation_without_signature;
use std::collections::{HashMap, HashSet};
use types::*;
const PHASE_0_CUSTODY_BIT: bool = false;
/// Provides the functionality to:
///
/// - Recieve a `FreeAttestation` and aggregate it into an `Attestation` (or create a new if it
/// doesn't exist).
/// - Store all aggregated or created `Attestation`s.
/// - Produce a list of attestations that would be valid for inclusion in some `BeaconState` (and
/// therefore valid for inclusion in a `BeaconBlock`.
///
/// Note: `Attestations` are stored in memory and never deleted. This is not scalable and must be
/// rectified in a future revision.
#[derive(Default)]
pub struct AttestationAggregator {
store: HashMap<Vec<u8>, Attestation>,
}
pub struct Outcome {
pub valid: bool,
pub message: Message,
}
pub enum Message {
/// The free attestation was added to an existing attestation.
Aggregated,
/// The free attestation has already been aggregated to an existing attestation.
AggregationNotRequired,
/// The free attestation was transformed into a new attestation.
NewAttestationCreated,
/// The supplied `validator_index` is not in the committee for the given `shard` and `slot`.
BadValidatorIndex,
/// The given `signature` did not match the `pubkey` in the given
/// `state.validator_registry`.
BadSignature,
/// The given `slot` does not match the validators committee assignment.
BadSlot,
/// The given `shard` does not match the validators committee assignment, or is not included in
/// a committee for the given slot.
BadShard,
/// Attestation is from the epoch prior to this, ignoring.
TooOld,
}
macro_rules! valid_outcome {
($error: expr) => {
return Ok(Outcome {
valid: true,
message: $error,
});
};
}
macro_rules! invalid_outcome {
($error: expr) => {
return Ok(Outcome {
valid: false,
message: $error,
});
};
}
impl AttestationAggregator {
/// Instantiates a new AttestationAggregator with an empty database.
pub fn new() -> Self {
Self {
store: HashMap::new(),
}
}
/// Accepts some `FreeAttestation`, validates it and either aggregates it upon some existing
/// `Attestation` or produces a new `Attestation`.
///
/// The "validation" provided is not complete, instead the following points are checked:
/// - The given `validator_index` is in the committee for the given `shard` for the given
/// `slot`.
/// - The signature is verified against that of the validator at `validator_index`.
pub fn process_free_attestation(
&mut self,
state: &BeaconState,
free_attestation: &FreeAttestation,
spec: &ChainSpec,
) -> Result<Outcome, BeaconStateError> {
let duties =
match state.get_attestation_duties(free_attestation.validator_index as usize, spec) {
Err(BeaconStateError::EpochCacheUninitialized(e)) => {
panic!("Attempted to access unbuilt cache {:?}.", e)
}
Err(BeaconStateError::EpochOutOfBounds) => invalid_outcome!(Message::TooOld),
Err(BeaconStateError::ShardOutOfBounds) => invalid_outcome!(Message::BadShard),
Err(e) => return Err(e),
Ok(None) => invalid_outcome!(Message::BadValidatorIndex),
Ok(Some(attestation_duties)) => attestation_duties,
};
if free_attestation.data.slot != duties.slot {
invalid_outcome!(Message::BadSlot);
}
if free_attestation.data.shard != duties.shard {
invalid_outcome!(Message::BadShard);
}
let signable_message = AttestationDataAndCustodyBit {
data: free_attestation.data.clone(),
custody_bit: PHASE_0_CUSTODY_BIT,
}
.hash_tree_root();
let validator_record = match state
.validator_registry
.get(free_attestation.validator_index as usize)
{
None => invalid_outcome!(Message::BadValidatorIndex),
Some(validator_record) => validator_record,
};
if !free_attestation.signature.verify(
&signable_message,
spec.get_domain(state.current_epoch(spec), Domain::Attestation, &state.fork),
&validator_record.pubkey,
) {
invalid_outcome!(Message::BadSignature);
}
if let Some(existing_attestation) = self.store.get(&signable_message) {
if let Some(updated_attestation) = aggregate_attestation(
existing_attestation,
&free_attestation.signature,
duties.committee_index as usize,
) {
self.store.insert(signable_message, updated_attestation);
valid_outcome!(Message::Aggregated);
} else {
valid_outcome!(Message::AggregationNotRequired);
}
} else {
let mut aggregate_signature = AggregateSignature::new();
aggregate_signature.add(&free_attestation.signature);
let mut aggregation_bitfield = Bitfield::new();
aggregation_bitfield.set(duties.committee_index as usize, true);
let new_attestation = Attestation {
data: free_attestation.data.clone(),
aggregation_bitfield,
custody_bitfield: Bitfield::new(),
aggregate_signature,
};
self.store.insert(signable_message, new_attestation);
valid_outcome!(Message::NewAttestationCreated);
}
}
/// Returns all known attestations which are:
///
/// - Valid for the given state
/// - Not already in `state.latest_attestations`.
pub fn get_attestations_for_state(
&self,
state: &BeaconState,
spec: &ChainSpec,
) -> Vec<Attestation> {
let mut known_attestation_data: HashSet<AttestationData> = HashSet::new();
state
.previous_epoch_attestations
.iter()
.chain(state.current_epoch_attestations.iter())
.for_each(|attestation| {
known_attestation_data.insert(attestation.data.clone());
});
self.store
.values()
.filter_map(|attestation| {
if validate_attestation_without_signature(&state, attestation, spec).is_ok()
&& !known_attestation_data.contains(&attestation.data)
{
Some(attestation.clone())
} else {
None
}
})
.collect()
}
}
/// Produces a new `Attestation` where:
///
/// - `signature` is added to `Attestation.aggregate_signature`
/// - Attestation.aggregation_bitfield[committee_index]` is set to true.
fn aggregate_attestation(
existing_attestation: &Attestation,
signature: &Signature,
committee_index: usize,
) -> Option<Attestation> {
let already_signed = existing_attestation
.aggregation_bitfield
.get(committee_index)
.unwrap_or(false);
if already_signed {
None
} else {
let mut aggregation_bitfield = existing_attestation.aggregation_bitfield.clone();
aggregation_bitfield.set(committee_index, true);
let mut aggregate_signature = existing_attestation.aggregate_signature.clone();
aggregate_signature.add(&signature);
Some(Attestation {
aggregation_bitfield,
aggregate_signature,
..existing_attestation.clone()
})
}
}

View File

@ -11,7 +11,7 @@ use operation_pool::OperationPool;
use parking_lot::{RwLock, RwLockReadGuard}; use parking_lot::{RwLock, RwLockReadGuard};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use ssz::ssz_encode; use ssz::ssz_encode;
pub use state_processing::per_block_processing::errors::{ use state_processing::per_block_processing::errors::{
AttestationValidationError, AttesterSlashingValidationError, DepositValidationError, AttestationValidationError, AttesterSlashingValidationError, DepositValidationError,
ExitValidationError, ProposerSlashingValidationError, TransferValidationError, ExitValidationError, ProposerSlashingValidationError, TransferValidationError,
}; };

View File

@ -1,4 +1,3 @@
mod attestation_aggregator;
mod beacon_chain; mod beacon_chain;
mod checkpoint; mod checkpoint;
mod errors; mod errors;
@ -10,9 +9,12 @@ pub use self::beacon_chain::{
}; };
pub use self::checkpoint::CheckPoint; pub use self::checkpoint::CheckPoint;
pub use self::errors::{BeaconChainError, BlockProductionError}; pub use self::errors::{BeaconChainError, BlockProductionError};
pub use attestation_aggregator::Outcome as AggregationOutcome;
pub use db; pub use db;
pub use fork_choice; pub use fork_choice;
pub use parking_lot; pub use parking_lot;
pub use slot_clock; pub use slot_clock;
pub use state_processing::per_block_processing::errors::{
AttestationValidationError, AttesterSlashingValidationError, DepositValidationError,
ExitValidationError, ProposerSlashingValidationError, TransferValidationError,
};
pub use types; pub use types;

View File

@ -48,7 +48,8 @@ test_cases:
- slot: 63 - slot: 63
num_validators: 1003 num_validators: 1003
num_previous_epoch_attestations: 0 num_previous_epoch_attestations: 0
num_current_epoch_attestations: 10 # slots_per_epoch - attestation_inclusion_delay - skip_slots
num_current_epoch_attestations: 57
slashed_validators: [11, 12, 13, 14, 42] slashed_validators: [11, 12, 13, 14, 42]
exited_validators: [] exited_validators: []
exit_initiated_validators: [50] exit_initiated_validators: [50]

View File

@ -178,8 +178,8 @@ impl BeaconChainHarness {
agg_sig agg_sig
}; };
let mut aggregation_bitfield = Bitfield::with_capacity(committee.committee.len()); let mut aggregation_bitfield = Bitfield::with_capacity(duties.committee_len);
let custody_bitfield = Bitfield::with_capacity(committee.committee.len()); let custody_bitfield = Bitfield::with_capacity(duties.committee_len);
aggregation_bitfield.set(duties.committee_index, true); aggregation_bitfield.set(duties.committee_index, true);

View File

@ -179,6 +179,19 @@ pub struct BeaconBlockRootsResponse {
pub roots: Vec<BlockRootSlot>, pub roots: Vec<BlockRootSlot>,
} }
impl BeaconBlockRootsResponse {
/// Returns `true` if each `self.roots.slot[i]` is higher than the preceeding `i`.
pub fn slots_are_ascending(&self) -> bool {
for i in 1..self.roots.len() {
if self.roots[i - 1].slot >= self.roots[i].slot {
return false;
}
}
true
}
}
/// Contains a block root and associated slot. /// Contains a block root and associated slot.
#[derive(Encode, Decode, Clone, Debug, PartialEq)] #[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BlockRootSlot { pub struct BlockRootSlot {

View File

@ -13,7 +13,7 @@ beacon_chain = { path = "../beacon_chain" }
eth2-libp2p = { path = "../eth2-libp2p" } eth2-libp2p = { path = "../eth2-libp2p" }
version = { path = "../version" } version = { path = "../version" }
types = { path = "../../eth2/types" } types = { path = "../../eth2/types" }
slog = "2.4.1" slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_debug"] }
ssz = { path = "../../eth2/utils/ssz" } ssz = { path = "../../eth2/utils/ssz" }
futures = "0.1.25" futures = "0.1.25"
error-chain = "0.12.0" error-chain = "0.12.0"

View File

@ -5,7 +5,7 @@ use beacon_chain::{
parking_lot::RwLockReadGuard, parking_lot::RwLockReadGuard,
slot_clock::SlotClock, slot_clock::SlotClock,
types::{BeaconState, ChainSpec}, types::{BeaconState, ChainSpec},
AggregationOutcome, CheckPoint, AttestationValidationError, CheckPoint,
}; };
use eth2_libp2p::rpc::HelloMessage; use eth2_libp2p::rpc::HelloMessage;
use types::{Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; use types::{Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot};
@ -40,7 +40,7 @@ pub trait BeaconChain: Send + Sync {
fn process_attestation( fn process_attestation(
&self, &self,
attestation: Attestation, attestation: Attestation,
) -> Result<AggregationOutcome, BeaconChainError>; ) -> Result<(), AttestationValidationError>;
fn get_block_roots( fn get_block_roots(
&self, &self,
@ -126,14 +126,9 @@ where
fn process_attestation( fn process_attestation(
&self, &self,
_attestation: Attestation, attestation: Attestation,
) -> Result<AggregationOutcome, BeaconChainError> { ) -> Result<(), AttestationValidationError> {
// Awaiting a proper operations pool before we can import attestations. self.process_attestation(attestation)
//
// Returning a useless error for now.
//
// https://github.com/sigp/lighthouse/issues/281
return Err(BeaconChainError::DBInconsistent("CANNOT PROCESS".into()));
} }
fn get_block_roots( fn get_block_roots(

View File

@ -5,7 +5,7 @@ use slog::{debug, error};
use ssz::TreeHash; use ssz::TreeHash;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Hash256}; use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Hash256, Slot};
/// Provides a queue for fully and partially built `BeaconBlock`s. /// Provides a queue for fully and partially built `BeaconBlock`s.
/// ///
@ -113,11 +113,36 @@ impl ImportQueue {
}) })
} }
/// Returns the index of the first new root in the list of block roots. /// Adds the `block_roots` to the partials queue.
pub fn first_new_root(&mut self, roots: &[BlockRootSlot]) -> Option<usize> { ///
roots /// If a `block_root` is not in the queue and has not been processed by the chain it is added
/// to the queue and it's block root is included in the output.
pub fn enqueue_block_roots(
&mut self,
block_roots: &[BlockRootSlot],
sender: PeerId,
) -> Vec<BlockRootSlot> {
let new_roots: Vec<BlockRootSlot> = block_roots
.iter() .iter()
.position(|brs| self.is_new_block(&brs.block_root)) // Ignore any roots already processed by the chain.
.filter(|brs| self.is_new_block(&brs.block_root))
// Ignore any roots already stored in the queue.
.filter(|brs| !self.partials.iter().any(|p| p.block_root == brs.block_root))
.cloned()
.collect();
new_roots.iter().for_each(|brs| {
self.partials.push(PartialBeaconBlock {
slot: brs.slot,
block_root: brs.block_root,
sender: sender.clone(),
header: None,
body: None,
inserted: Instant::now(),
})
});
new_roots
} }
/// Adds the `headers` to the `partials` queue. Returns a list of `Hash256` block roots for /// Adds the `headers` to the `partials` queue. Returns a list of `Hash256` block roots for
@ -171,11 +196,21 @@ impl ImportQueue {
.iter() .iter()
.position(|p| p.block_root == block_root) .position(|p| p.block_root == block_root)
{ {
// Case 1: there already exists a partial with a matching block root.
//
// The `inserted` time is set to now and the header is replaced, regardless of whether
// it existed or not.
self.partials[i].header = Some(header);
self.partials[i].inserted = Instant::now(); self.partials[i].inserted = Instant::now();
} else { } else {
// Case 2: there was no partial with a matching block root.
//
// A new partial is added. This case permits adding a header without already known the
// root -- this is not possible in the wire protocol however we support it anyway.
self.partials.push(PartialBeaconBlock { self.partials.push(PartialBeaconBlock {
slot: header.slot,
block_root, block_root,
header, header: Some(header),
body: None, body: None,
inserted: Instant::now(), inserted: Instant::now(),
sender, sender,
@ -192,7 +227,8 @@ impl ImportQueue {
let body_root = Hash256::from_slice(&body.hash_tree_root()[..]); let body_root = Hash256::from_slice(&body.hash_tree_root()[..]);
self.partials.iter_mut().for_each(|mut p| { self.partials.iter_mut().for_each(|mut p| {
if body_root == p.header.block_body_root { if let Some(header) = &mut p.header {
if body_root == header.block_body_root {
p.inserted = Instant::now(); p.inserted = Instant::now();
if p.body.is_none() { if p.body.is_none() {
@ -200,6 +236,7 @@ impl ImportQueue {
p.sender = sender.clone(); p.sender = sender.clone();
} }
} }
}
}); });
} }
} }
@ -208,9 +245,10 @@ impl ImportQueue {
/// `BeaconBlock`. /// `BeaconBlock`.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct PartialBeaconBlock { pub struct PartialBeaconBlock {
pub slot: Slot,
/// `BeaconBlock` root. /// `BeaconBlock` root.
pub block_root: Hash256, pub block_root: Hash256,
pub header: BeaconBlockHeader, pub header: Option<BeaconBlockHeader>,
pub body: Option<BeaconBlockBody>, pub body: Option<BeaconBlockBody>,
/// The instant at which this record was created or last meaningfully modified. Used to /// The instant at which this record was created or last meaningfully modified. Used to
/// determine if an entry is stale and should be removed. /// determine if an entry is stale and should be removed.
@ -225,7 +263,7 @@ impl PartialBeaconBlock {
pub fn complete(self) -> Option<(Hash256, BeaconBlock, PeerId)> { pub fn complete(self) -> Option<(Hash256, BeaconBlock, PeerId)> {
Some(( Some((
self.block_root, self.block_root,
self.header.into_block(self.body?), self.header?.into_block(self.body?),
self.sender, self.sender,
)) ))
} }

View File

@ -358,32 +358,51 @@ impl SimpleSync {
if res.roots.is_empty() { if res.roots.is_empty() {
warn!( warn!(
self.log, self.log,
"Peer returned empty block roots response. PeerId: {:?}", peer_id "Peer returned empty block roots response";
"peer_id" => format!("{:?}", peer_id)
); );
return; return;
} }
let new_root_index = self.import_queue.first_new_root(&res.roots); // The wire protocol specifies that slots must be in ascending order.
if !res.slots_are_ascending() {
warn!(
self.log,
"Peer returned block roots response with bad slot ordering";
"peer_id" => format!("{:?}", peer_id)
);
return;
}
// If a new block root is found, request it and all the headers following it. let new_roots = self
.import_queue
.enqueue_block_roots(&res.roots, peer_id.clone());
// No new roots means nothing to do.
// //
// We make an assumption here that if we don't know a block then we don't know of all // This check protects against future panics.
// it's parents. This might not be the case if syncing becomes more sophisticated. if new_roots.is_empty() {
if let Some(i) = new_root_index { return;
let new = &res.roots[i]; }
// Determine the first (earliest) and last (latest) `BlockRootSlot` items.
//
// This logic relies upon slots to be in ascending order, which is enforced earlier.
let first = new_roots.first().expect("Non-empty list must have first");
let last = new_roots.last().expect("Non-empty list must have last");
// Request all headers between the earliest and latest new `BlockRootSlot` items.
self.request_block_headers( self.request_block_headers(
peer_id, peer_id,
BeaconBlockHeadersRequest { BeaconBlockHeadersRequest {
start_root: new.block_root, start_root: first.block_root,
start_slot: new.slot, start_slot: first.slot,
max_headers: (res.roots.len() - i) as u64, max_headers: (last.slot - first.slot + 1).as_u64(),
skip_slots: 0, skip_slots: 0,
}, },
network, network,
) )
} }
}
/// Handle a `BeaconBlockHeaders` request from the peer. /// Handle a `BeaconBlockHeaders` request from the peer.
pub fn on_beacon_block_headers_request( pub fn on_beacon_block_headers_request(
@ -528,8 +547,17 @@ impl SimpleSync {
"NewGossipBlock"; "NewGossipBlock";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
); );
// TODO: filter out messages that a prior to the finalized slot.
// // Ignore any block from a finalized slot.
if self.slot_is_finalized(msg.slot) {
warn!(
self.log, "NewGossipBlock";
"msg" => "new block slot is finalized.",
"slot" => msg.slot,
);
return;
}
// TODO: if the block is a few more slots ahead, try to get all block roots from then until // TODO: if the block is a few more slots ahead, try to get all block roots from then until
// now. // now.
// //
@ -563,12 +591,9 @@ impl SimpleSync {
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
); );
// Awaiting a proper operations pool before we can import attestations.
//
// https://github.com/sigp/lighthouse/issues/281
match self.chain.process_attestation(msg) { match self.chain.process_attestation(msg) {
Ok(_) => panic!("Impossible, method not implemented."), Ok(()) => info!(self.log, "ImportedAttestation"),
Err(_) => error!(self.log, "Attestation processing not implemented!"), Err(e) => warn!(self.log, "InvalidAttestation"; "error" => format!("{:?}", e)),
} }
} }
@ -678,6 +703,14 @@ impl SimpleSync {
network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(req)); network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(req));
} }
fn slot_is_finalized(&self, slot: Slot) -> bool {
slot <= self
.chain
.hello_message()
.latest_finalized_epoch
.start_slot(self.chain.get_spec().slots_per_epoch)
}
/// Generates our current state in the form of a HELLO RPC message. /// Generates our current state in the form of a HELLO RPC message.
pub fn generate_hello(&self) -> HelloMessage { pub fn generate_hello(&self) -> HelloMessage {
self.chain.hello_message() self.chain.hello_message()

View File

@ -543,7 +543,7 @@ fn sync_two_nodes() {
// A provides block bodies to B. // A provides block bodies to B.
node_a.tee_block_body_response(&node_b); node_a.tee_block_body_response(&node_b);
std::thread::sleep(Duration::from_secs(10)); std::thread::sleep(Duration::from_secs(20));
node_b.harness.run_fork_choice(); node_b.harness.run_fork_choice();

View File

@ -172,6 +172,8 @@ impl OperationPool {
|| key.domain_bytes_match(&curr_domain_bytes) || key.domain_bytes_match(&curr_domain_bytes)
}) })
.flat_map(|(_, attestations)| attestations) .flat_map(|(_, attestations)| attestations)
// That are not superseded by an attestation included in the state...
.filter(|attestation| !superior_attestation_exists_in_state(state, attestation))
// That are valid... // That are valid...
.filter(|attestation| validate_attestation(state, attestation, spec).is_ok()) .filter(|attestation| validate_attestation(state, attestation, spec).is_ok())
// Scored by the number of new attestations they introduce (descending) // Scored by the number of new attestations they introduce (descending)
@ -462,6 +464,31 @@ impl OperationPool {
} }
} }
/// Returns `true` if the state already contains a `PendingAttestation` that is superior to the
/// given `attestation`.
///
/// A validator has nothing to gain from re-including an attestation and it adds load to the
/// network.
///
/// An existing `PendingAttestation` is superior to an existing `attestation` if:
///
/// - Their `AttestationData` is equal.
/// - `attestation` does not contain any signatures that `PendingAttestation` does not have.
fn superior_attestation_exists_in_state(state: &BeaconState, attestation: &Attestation) -> bool {
state
.current_epoch_attestations
.iter()
.chain(state.previous_epoch_attestations.iter())
.any(|existing_attestation| {
let bitfield = &attestation.aggregation_bitfield;
let existing_bitfield = &existing_attestation.aggregation_bitfield;
existing_attestation.data == attestation.data
&& bitfield.intersection(existing_bitfield).num_set_bits()
== bitfield.num_set_bits()
})
}
/// Filter up to a maximum number of operations out of an iterator. /// Filter up to a maximum number of operations out of an iterator.
fn filter_limit_operations<'a, T: 'a, I, F>(operations: I, filter: F, limit: u64) -> Vec<T> fn filter_limit_operations<'a, T: 'a, I, F>(operations: I, filter: F, limit: u64) -> Vec<T>
where where

View File

@ -120,7 +120,7 @@ impl TestingBeaconStateBuilder {
}) })
.collect(); .collect();
let genesis_time = 1553950542; // arbitrary let genesis_time = 1553977336; // arbitrary
let mut state = BeaconState::genesis( let mut state = BeaconState::genesis(
genesis_time, genesis_time,