Improve reduced tree fork choice

This commit is contained in:
Paul Hauner 2019-06-15 18:19:08 -04:00
parent 7756a658a7
commit f4621a9f1a
No known key found for this signature in database
GPG Key ID: 303E4494BB28068C
6 changed files with 225 additions and 107 deletions

View File

@ -87,7 +87,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
mut genesis_state: BeaconState<T::EthSpec>,
genesis_block: BeaconBlock,
spec: ChainSpec,
fork_choice: ForkChoice<T>,
) -> Result<Self, Error> {
let state_root = genesis_state.canonical_root();
store.put(&state_root, &genesis_state)?;
@ -110,14 +109,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(Self {
spec,
store,
slot_clock,
op_pool: OperationPool::new(),
state: RwLock::new(genesis_state),
canonical_head,
genesis_block_root,
fork_choice,
fork_choice: ForkChoice::new(store.clone(), genesis_block_root),
metrics: Metrics::new()?,
store,
})
}
@ -139,16 +138,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
spec.seconds_per_slot,
);
// let fork_choice = T::ForkChoice::new(store.clone());
// let fork_choice: ForkChoice<T::LmdGhost, T::EthSpec> = ForkChoice::new(store.clone());
let last_finalized_root = p.canonical_head.beacon_state.finalized_root;
Ok(Some(BeaconChain {
spec,
slot_clock,
fork_choice: ForkChoice::new(store.clone(), last_finalized_root),
op_pool: OperationPool::default(),
canonical_head: RwLock::new(p.canonical_head),
state: RwLock::new(p.state),
fork_choice: ForkChoice::new(store.clone()),
genesis_block_root: p.genesis_block_root,
metrics: Metrics::new()?,
store,
@ -476,11 +474,22 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.op_pool
.insert_attestation(attestation, &*self.state.read(), &self.spec);
timer.observe_duration();
if result.is_ok() {
self.metrics.attestation_processing_successes.inc();
}
timer.observe_duration();
// TODO: process attestation. Please consider:
//
// - Because a block was not added to the op pool does not mean it's invalid (it might
// just be old).
// - The attestation should be rejected if we don't know the block (ideally it should be
// queued, but this may be overkill).
// - The attestation _must_ be validated against it's state before being added to fork
// choice.
// - You can avoid verifying some attestations by first checking if they're a latest
// message. This would involve expanding the `LmdGhost` API.
result
}

View File

@ -21,9 +21,9 @@ pub struct ForkChoice<T: BeaconChainTypes> {
}
impl<T: BeaconChainTypes> ForkChoice<T> {
pub fn new(store: Arc<T::Store>) -> Self {
pub fn new(store: Arc<T::Store>, genesis_block_root: Hash256) -> Self {
Self {
backend: T::LmdGhost::new(store),
backend: T::LmdGhost::new(store, genesis_block_root),
}
}
@ -67,7 +67,29 @@ impl<T: BeaconChainTypes> ForkChoice<T> {
.map_err(Into::into)
}
pub fn process_attestation(
/// Process all attestations in the given `block`.
///
/// Assumes the block (and therefore it's attestations) are valid. It is a logic error to
/// provide an invalid block.
pub fn process_block(
&self,
state: &BeaconState<T::EthSpec>,
block: &BeaconBlock,
) -> Result<()> {
// Note: we never count the block as a latest message, only attestations.
//
// I (Paul H) do not have an explicit reference to this, but I derive it from this
// document:
//
// https://github.com/ethereum/eth2.0-specs/blob/v0.7.0/specs/core/0_fork-choice.md
for attestation in &block.body.attestations {
self.process_attestation_from_block(state, attestation)?;
}
Ok(())
}
fn process_attestation_from_block(
&self,
state: &BeaconState<T::EthSpec>,
attestation: &Attestation,
@ -94,28 +116,6 @@ impl<T: BeaconChainTypes> ForkChoice<T> {
Ok(())
}
/// A helper function which runs `self.process_attestation` on all `Attestation` in the given `BeaconBlock`.
///
/// Assumes the block (and therefore it's attestations) are valid. It is a logic error to
/// provide an invalid block.
pub fn process_block(
&self,
state: &BeaconState<T::EthSpec>,
block: &BeaconBlock,
) -> Result<()> {
// Note: we never count the block as a latest message, only attestations.
//
// I (Paul H) do not have an explicit reference to this, however I derive it from this
// document:
//
// https://github.com/ethereum/eth2.0-specs/blob/v0.7.0/specs/core/0_fork-choice.md
for attestation in &block.body.attestations {
self.process_attestation(state, attestation)?;
}
Ok(())
}
}
impl From<BeaconStateError> for Error {

View File

@ -9,6 +9,7 @@ mod persisted_beacon_chain;
pub use self::beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
pub use self::checkpoint::CheckPoint;
pub use self::errors::{BeaconChainError, BlockProductionError};
pub use lmd_ghost;
pub use parking_lot;
pub use slot_clock;
pub use state_processing::per_block_processing::errors::{

View File

@ -1,8 +1,9 @@
use beacon_chain::{
fork_choice::OptimizedLMDGhost, slot_clock::SystemTimeSlotClock, store::Store, BeaconChain,
BeaconChainTypes,
lmd_ghost::{LmdGhost, ThreadSafeReducedTree},
slot_clock::SystemTimeSlotClock,
store::Store,
BeaconChain, BeaconChainTypes,
};
use fork_choice::ForkChoice;
use slog::{info, Logger};
use slot_clock::SlotClock;
use std::marker::PhantomData;
@ -33,7 +34,7 @@ pub struct ClientType<S: Store, E: EthSpec> {
impl<S: Store, E: EthSpec + Clone> BeaconChainTypes for ClientType<S, E> {
type Store = S;
type SlotClock = SystemTimeSlotClock;
type ForkChoice = OptimizedLMDGhost<S, E>;
type LmdGhost = ThreadSafeReducedTree<S, E>;
type EthSpec = E;
}
impl<T: Store, E: EthSpec, X: BeaconChainTypes> InitialiseBeaconChain<X> for ClientType<T, E> {}
@ -45,8 +46,8 @@ fn maybe_load_from_store_for_testnet<T, U: Store, V: EthSpec>(
log: Logger,
) -> BeaconChain<T>
where
T: BeaconChainTypes<Store = U>,
T::ForkChoice: ForkChoice<U>,
T: BeaconChainTypes<Store = U, EthSpec = V>,
T::LmdGhost: LmdGhost<U, V>,
{
if let Ok(Some(beacon_chain)) = BeaconChain::from_store(store.clone(), spec.clone()) {
info!(
@ -74,19 +75,10 @@ where
genesis_state.genesis_time,
spec.seconds_per_slot,
);
// Choose the fork choice
let fork_choice = T::ForkChoice::new(store.clone());
// Genesis chain
//TODO: Handle error correctly
BeaconChain::from_genesis(
store,
slot_clock,
genesis_state,
genesis_block,
spec,
fork_choice,
)
.expect("Terminate if beacon chain generation fails")
BeaconChain::from_genesis(store, slot_clock, genesis_state, genesis_block, spec)
.expect("Terminate if beacon chain generation fails")
}
}

View File

@ -9,7 +9,7 @@ pub use reduced_tree::ThreadSafeReducedTree;
pub type Result<T> = std::result::Result<T, String>;
pub trait LmdGhost<S: Store, E: EthSpec>: Send + Sync {
fn new(store: Arc<S>) -> Self;
fn new(store: Arc<S>, genesis_root: Hash256) -> Self;
fn process_message(
&self,
@ -20,5 +20,7 @@ pub trait LmdGhost<S: Store, E: EthSpec>: Send + Sync {
fn find_head<F>(&self, start_block_root: Hash256, weight: F) -> Result<Hash256>
where
F: Fn(usize) -> Option<u64>;
F: Fn(usize) -> Option<u64> + Copy;
fn update_finalized_root(&self, new_root: Hash256) -> Result<()>;
}

View File

@ -16,6 +16,7 @@ pub enum Error {
NotInTree(Hash256),
NoCommonAncestor((Hash256, Hash256)),
StoreError(StoreError),
ValidatorWeightUnknown(usize),
}
impl From<StoreError> for Error {
@ -24,43 +25,8 @@ impl From<StoreError> for Error {
}
}
pub type Height = usize;
#[derive(Default, Clone)]
pub struct Node {
pub parent_hash: Option<Hash256>,
pub children: Vec<Hash256>,
pub score: u64,
pub height: Height,
pub block_hash: Hash256,
pub voters: Vec<usize>,
}
impl Node {
pub fn remove_voter(&mut self, voter: usize) -> Option<usize> {
let i = self.voters.iter().position(|&v| v == voter)?;
Some(self.voters.remove(i))
}
pub fn add_voter(&mut self, voter: usize) {
self.voters.push(voter);
}
pub fn has_votes(&self) -> bool {
!self.voters.is_empty()
}
}
impl Node {
fn does_not_have_children(&self) -> bool {
self.children.is_empty()
}
}
#[derive(Debug, Clone, Copy)]
pub struct Vote {
hash: Hash256,
slot: Slot,
pub struct ThreadSafeReducedTree<T, E> {
core: RwLock<ReducedTree<T, E>>,
}
impl<T, E> LmdGhost<T, E> for ThreadSafeReducedTree<T, E>
@ -68,9 +34,9 @@ where
T: Store,
E: EthSpec,
{
fn new(store: Arc<T>) -> Self {
fn new(store: Arc<T>, genesis_root: Hash256) -> Self {
ThreadSafeReducedTree {
core: RwLock::new(ReducedTree::new(store)),
core: RwLock::new(ReducedTree::new(store, genesis_root)),
}
}
@ -86,30 +52,29 @@ where
.map_err(Into::into)
}
fn find_head<F>(&self, _start_block_root: Hash256, _weight: F) -> SuperResult<Hash256>
fn find_head<F>(&self, start_block_root: Hash256, weight_fn: F) -> SuperResult<Hash256>
where
F: Fn(usize) -> Option<u64>,
F: Fn(usize) -> Option<u64> + Copy,
{
unimplemented!();
self.core
.write()
.update_weights_and_find_head(start_block_root, weight_fn)
.map_err(Into::into)
}
fn update_finalized_root(&self, new_root: Hash256) -> SuperResult<()> {
self.core.write().update_root(new_root).map_err(Into::into)
}
}
impl From<Error> for String {
fn from(e: Error) -> String {
format!("{:?}", e)
}
}
pub struct ThreadSafeReducedTree<T, E> {
pub core: RwLock<ReducedTree<T, E>>,
}
pub struct ReducedTree<T, E> {
struct ReducedTree<T, E> {
store: Arc<T>,
/// Stores all nodes of the tree, keyed by the block hash contained in the node.
nodes: HashMap<Hash256, Node>,
/// Maps validator indices to their latest votes.
latest_votes: ElasticList<Option<Vote>>,
/// Stores the root of the tree, used for pruning.
root: Hash256,
_phantom: PhantomData<E>,
}
@ -118,15 +83,54 @@ where
T: Store,
E: EthSpec,
{
pub fn new(store: Arc<T>) -> Self {
pub fn new(store: Arc<T>, genesis_root: Hash256) -> Self {
let mut nodes = HashMap::new();
// Insert the genesis node.
nodes.insert(
genesis_root,
Node {
block_hash: genesis_root,
..Node::default()
},
);
Self {
store,
nodes: HashMap::new(),
nodes,
latest_votes: ElasticList::default(),
root: genesis_root,
_phantom: PhantomData,
}
}
pub fn update_root(&mut self, new_root: Hash256) -> Result<()> {
if !self.nodes.contains_key(&new_root) {
self.add_node(new_root, vec![])?;
}
self.retain_subtree(self.root, new_root)?;
self.root = new_root;
Ok(())
}
fn retain_subtree(&mut self, current_hash: Hash256, subtree_hash: Hash256) -> Result<()> {
if current_hash != subtree_hash {
// Clone satisifies the borrow checker.
let children = self.get_node(current_hash)?.children.clone();
for child_hash in children {
self.retain_subtree(child_hash, subtree_hash)?;
}
self.nodes.remove(&current_hash);
}
Ok(())
}
pub fn process_message(
&mut self,
validator_index: usize,
@ -143,7 +147,7 @@ where
} else if previous_vote.slot == slot && previous_vote.hash != block_hash {
// Vote is an equivocation (double-vote), ignore it.
//
// TODO: flag this as slashable.
// TODO: this is slashable.
return Ok(());
} else {
// Given vote is newer or different to current vote, replace the current vote.
@ -156,6 +160,76 @@ where
Ok(())
}
pub fn update_weights_and_find_head<F>(
&mut self,
start_block_root: Hash256,
weight_fn: F,
) -> Result<Hash256>
where
F: Fn(usize) -> Option<u64> + Copy,
{
let _root_weight = self.update_weight(start_block_root, weight_fn)?;
let start_node = self.get_node(start_block_root)?;
let head_node = self.find_head_from(start_node)?;
Ok(head_node.block_hash)
}
fn find_head_from<'a>(&'a self, start_node: &'a Node) -> Result<&'a Node> {
if start_node.does_not_have_children() {
Ok(start_node)
} else {
let children = start_node
.children
.iter()
.map(|hash| self.get_node(*hash))
.collect::<Result<Vec<&Node>>>()?;
// TODO: check if `max_by` is `O(n^2)`.
let best_child = children
.iter()
.max_by(|a, b| {
if a.weight != b.weight {
a.weight.cmp(&b.weight)
} else {
a.block_hash.cmp(&b.block_hash)
}
})
// There can only be no maximum if there are no children. This code path is guarded
// against that condition.
.expect("There must be a maximally weighted node.");
self.find_head_from(best_child)
}
}
fn update_weight<F>(&mut self, start_block_root: Hash256, weight_fn: F) -> Result<u64>
where
F: Fn(usize) -> Option<u64> + Copy,
{
let weight = {
let node = self.get_node(start_block_root)?.clone();
let mut weight = 0;
for &child in &node.children {
weight += self.update_weight(child, weight_fn)?;
}
for &voter in &node.voters {
weight += weight_fn(voter).ok_or_else(|| Error::ValidatorWeightUnknown(voter))?;
}
weight
};
let node = self.get_mut_node(start_block_root)?;
node.weight = weight;
Ok(weight)
}
fn remove_latest_message(&mut self, validator_index: usize) -> Result<()> {
if self.latest_votes.get(validator_index).is_some() {
// Unwrap is safe as prior `if` statements ensures the result is `Some`.
@ -390,6 +464,40 @@ where
}
}
#[derive(Default, Clone)]
pub struct Node {
pub parent_hash: Option<Hash256>,
pub children: Vec<Hash256>,
pub weight: u64,
pub block_hash: Hash256,
pub voters: Vec<usize>,
}
impl Node {
pub fn does_not_have_children(&self) -> bool {
self.children.is_empty()
}
pub fn remove_voter(&mut self, voter: usize) -> Option<usize> {
let i = self.voters.iter().position(|&v| v == voter)?;
Some(self.voters.remove(i))
}
pub fn add_voter(&mut self, voter: usize) {
self.voters.push(voter);
}
pub fn has_votes(&self) -> bool {
!self.voters.is_empty()
}
}
#[derive(Debug, Clone, Copy)]
pub struct Vote {
hash: Hash256,
slot: Slot,
}
/// A Vec-wrapper which will grow to match any request.
///
/// E.g., a `get` or `insert` to an out-of-bounds element will cause the Vec to grow (using
@ -417,3 +525,9 @@ where
self.0[i] = element;
}
}
impl From<Error> for String {
fn from(e: Error) -> String {
format!("{:?}", e)
}
}