Fixed merge conflicts with master, with new release testing options.

This commit is contained in:
Luke Anderson 2019-03-28 19:57:00 +11:00
commit d38f4ed547
No known key found for this signature in database
GPG Key ID: 44408169EC61E228
50 changed files with 3808 additions and 711 deletions

View File

@ -8,10 +8,12 @@ before_install:
- sudo chown -R $USER /usr/local/include/google
script:
- cargo build --verbose --all
- cargo build --verbose --release --all
- cargo test --verbose --all
- cargo test --verbose --release --all
- cargo fmt --all -- --check
# No clippy until later...
#- cargo clippy
- cargo test --verbose --all
rust:
- stable
- beta

View File

@ -29,4 +29,5 @@ members = [
"beacon_node/beacon_chain/test_harness",
"protos",
"validator_client",
"account_manager",
]

22
Jenkinsfile vendored
View File

@ -1,16 +1,17 @@
pipeline {
agent {
agent {
dockerfile {
filename 'Dockerfile'
args '-v cargo-cache:/cargocache:rw'
}
}
stages {
stage('Build') {
steps {
sh 'cargo build'
}
}
stages {
stage('Build') {
steps {
sh 'cargo build --verbose --all'
sh 'cargo build --verbose --all --release'
}
}
stage('Check') {
steps {
sh 'cargo fmt --all -- --check'
@ -18,10 +19,11 @@ pipeline {
//sh 'cargo clippy'
}
}
stage('Test') {
stage('Test') {
steps {
sh 'cargo test --all'
sh 'cargo test --verbose --all'
sh 'cargo test --verbose --all --release'
}
}
}
}
}

View File

@ -0,0 +1,13 @@
[package]
name = "account_manager"
version = "0.0.1"
authors = ["Luke Anderson <luke@sigmaprime.io>"]
edition = "2018"
[dependencies]
bls = { path = "../eth2/utils/bls" }
clap = "2.32.0"
slog = "^2.2.3"
slog-term = "^2.4.0"
slog-async = "^2.3.0"
validator_client = { path = "../validator_client" }

24
account_manager/README.md Normal file
View File

@ -0,0 +1,24 @@
# Lighthouse Accounts Manager
The accounts manager (AM) is a stand-alone binary which allows
users to generate and manage the cryptographic keys necessary to
interact with Ethereum Serenity.
## Roles
The AM is responsible for the following tasks:
- Generation of cryptographic key pairs
- Must acquire sufficient entropy to ensure keys are generated securely (TBD)
- Secure storage of private keys
- Keys must be encrypted while at rest on the disk (TBD)
- The format is compatible with the validator client
- Produces messages and transactions necessary to initiate
staking on Ethereum 1.x (TPD)
## Implementation
The AM is not a service, and does not run continuously, nor does it
interact with any running services.
It is intended to be executed separately from other Lighthouse binaries
and produce files which can be consumed by them.

View File

@ -0,0 +1,58 @@
use bls::Keypair;
use clap::{App, Arg, SubCommand};
use slog::{debug, info, o, Drain};
use std::path::PathBuf;
use validator_client::Config as ValidatorClientConfig;
fn main() {
// Logging
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::CompactFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
let log = slog::Logger::root(drain, o!());
// CLI
let matches = App::new("Lighthouse Accounts Manager")
.version("0.0.1")
.author("Sigma Prime <contact@sigmaprime.io>")
.about("Eth 2.0 Accounts Manager")
.arg(
Arg::with_name("datadir")
.long("datadir")
.value_name("DIR")
.help("Data directory for keys and databases.")
.takes_value(true),
)
.subcommand(
SubCommand::with_name("generate")
.about("Generates a new validator private key")
.version("0.0.1")
.author("Sigma Prime <contact@sigmaprime.io>"),
)
.get_matches();
let config = ValidatorClientConfig::parse_args(&matches, &log)
.expect("Unable to build a configuration for the account manager.");
// Log configuration
info!(log, "";
"data_dir" => &config.data_dir.to_str());
match matches.subcommand() {
("generate", Some(_gen_m)) => {
let keypair = Keypair::random();
let key_path: PathBuf = config
.save_key(&keypair)
.expect("Unable to save newly generated private key.");
debug!(
log,
"Keypair generated {:?}, saved to: {:?}",
keypair.identifier(),
key_path.to_string_lossy()
);
}
_ => panic!(
"The account manager must be run with a subcommand. See help for more information."
),
}
}

View File

@ -26,7 +26,10 @@ pub enum ValidBlock {
#[derive(Debug, PartialEq)]
pub enum InvalidBlock {
/// The block slot is greater than the present slot.
FutureSlot,
FutureSlot {
present_slot: Slot,
block_slot: Slot,
},
/// The block state_root does not match the generated state.
StateRootMismatch,
/// The blocks parent_root is unknown.
@ -46,6 +49,35 @@ pub enum BlockProcessingOutcome {
InvalidBlock(InvalidBlock),
}
impl BlockProcessingOutcome {
/// Returns `true` if the block was objectively invalid and we should disregard the peer who
/// sent it.
pub fn is_invalid(&self) -> bool {
match self {
BlockProcessingOutcome::ValidBlock(_) => false,
BlockProcessingOutcome::InvalidBlock(r) => match r {
InvalidBlock::FutureSlot { .. } => true,
InvalidBlock::StateRootMismatch => true,
InvalidBlock::ParentUnknown => false,
InvalidBlock::SlotProcessingError(_) => false,
InvalidBlock::PerBlockProcessingError(e) => match e {
BlockProcessingError::Invalid(_) => true,
BlockProcessingError::BeaconStateError(_) => false,
},
},
}
}
/// Returns `true` if the block was successfully processed and can be removed from any import
/// queues or temporary storage.
pub fn sucessfully_processed(&self) -> bool {
match self {
BlockProcessingOutcome::ValidBlock(_) => true,
_ => false,
}
}
}
pub struct BeaconChain<T: ClientDB + Sized, U: SlotClock, F: ForkChoice> {
pub block_store: Arc<BeaconBlockStore<T>>,
pub state_store: Arc<BeaconStateStore<T>>,
@ -122,6 +154,126 @@ where
})
}
/// Returns the beacon block body for each beacon block root in `roots`.
///
/// Fails if any root in `roots` does not have a corresponding block.
pub fn get_block_bodies(&self, roots: &[Hash256]) -> Result<Vec<BeaconBlockBody>, Error> {
let bodies: Result<Vec<BeaconBlockBody>, _> = roots
.iter()
.map(|root| match self.get_block(root)? {
Some(block) => Ok(block.body),
None => Err(Error::DBInconsistent("Missing block".into())),
})
.collect();
Ok(bodies?)
}
/// Returns the beacon block header for each beacon block root in `roots`.
///
/// Fails if any root in `roots` does not have a corresponding block.
pub fn get_block_headers(&self, roots: &[Hash256]) -> Result<Vec<BeaconBlockHeader>, Error> {
let headers: Result<Vec<BeaconBlockHeader>, _> = roots
.iter()
.map(|root| match self.get_block(root)? {
Some(block) => Ok(block.block_header()),
None => Err(Error::DBInconsistent("Missing block".into())),
})
.collect();
Ok(headers?)
}
/// Returns `count `beacon block roots, starting from `start_slot` with an
/// interval of `skip` slots between each root.
///
/// ## Errors:
///
/// - `SlotOutOfBounds`: Unable to return the full specified range.
/// - `SlotOutOfBounds`: Unable to load a state from the DB.
/// - `SlotOutOfBounds`: Start slot is higher than the first slot.
/// - Other: BeaconState` is inconsistent.
pub fn get_block_roots(
&self,
earliest_slot: Slot,
count: usize,
skip: usize,
) -> Result<Vec<Hash256>, Error> {
let spec = &self.spec;
let step_by = Slot::from(skip + 1);
let mut roots: Vec<Hash256> = vec![];
// The state for reading block roots. Will be updated with an older state if slots go too
// far back in history.
let mut state = self.state.read().clone();
// The final slot in this series, will be reduced by `skip` each loop iteration.
let mut slot = earliest_slot + Slot::from(count * (skip + 1)) - 1;
// If the highest slot requested is that of the current state insert the root of the
// head block, unless the head block's slot is not matching.
if slot == state.slot && self.head().beacon_block.slot == slot {
roots.push(self.head().beacon_block_root);
slot -= step_by;
} else if slot >= state.slot {
return Err(BeaconStateError::SlotOutOfBounds.into());
}
loop {
// If the slot is within the range of the current state's block roots, append the root
// to the output vec.
//
// If we get `SlotOutOfBounds` error, load the oldest available historic
// state from the DB.
match state.get_block_root(slot, spec) {
Ok(root) => {
if slot < earliest_slot {
break;
} else {
roots.push(*root);
slot -= step_by;
}
}
Err(BeaconStateError::SlotOutOfBounds) => {
// Read the earliest historic state in the current slot.
let earliest_historic_slot =
state.slot - Slot::from(spec.slots_per_historical_root);
// Load the earlier state from disk.
let new_state_root = state.get_state_root(earliest_historic_slot, spec)?;
// Break if the DB is unable to load the state.
state = match self.state_store.get_deserialized(&new_state_root) {
Ok(Some(state)) => state,
_ => break,
}
}
Err(e) => return Err(e.into()),
};
}
// Return the results if they pass a sanity check.
if (slot <= earliest_slot) && (roots.len() == count) {
// Reverse the ordering of the roots. We extracted them in reverse order to make it
// simpler to lookup historic states.
//
// This is a potential optimisation target.
Ok(roots.iter().rev().cloned().collect())
} else {
Err(BeaconStateError::SlotOutOfBounds.into())
}
}
/// Returns the block at the given root, if any.
///
/// ## Errors
///
/// May return a database error.
pub fn get_block(&self, block_root: &Hash256) -> Result<Option<BeaconBlock>, Error> {
Ok(self.block_store.get_deserialized(block_root)?)
}
/// Update the canonical head to some new values.
pub fn update_canonical_head(
&self,
@ -153,6 +305,49 @@ where
self.canonical_head.read()
}
/// Updates the canonical `BeaconState` with the supplied state.
///
/// Advances the chain forward to the present slot. This method is better than just setting
/// state and calling `catchup_state` as it will not result in an old state being installed and
/// then having it iteratively updated -- in such a case it's possible for another thread to
/// find the state at an old slot.
pub fn update_state(&self, mut state: BeaconState) -> Result<(), Error> {
let latest_block_header = self.head().beacon_block.block_header();
let present_slot = match self.slot_clock.present_slot() {
Ok(Some(slot)) => slot,
_ => return Err(Error::UnableToReadSlot),
};
// If required, transition the new state to the present slot.
for _ in state.slot.as_u64()..present_slot.as_u64() {
per_slot_processing(&mut state, &latest_block_header, &self.spec)?;
}
*self.state.write() = state;
Ok(())
}
/// Ensures the current canonical `BeaconState` has been transitioned to match the `slot_clock`.
pub fn catchup_state(&self) -> Result<(), Error> {
let latest_block_header = self.head().beacon_block.block_header();
let present_slot = match self.slot_clock.present_slot() {
Ok(Some(slot)) => slot,
_ => return Err(Error::UnableToReadSlot),
};
let mut state = self.state.write();
// If required, transition the new state to the present slot.
for _ in state.slot.as_u64()..present_slot.as_u64() {
per_slot_processing(&mut *state, &latest_block_header, &self.spec)?;
}
Ok(())
}
/// Update the justified head to some new values.
pub fn update_finalized_head(
&self,
@ -176,28 +371,6 @@ where
self.finalized_head.read()
}
/// Advance the `self.state` `BeaconState` to the supplied slot.
///
/// This will perform per_slot and per_epoch processing as required.
///
/// The `previous_block_root` will be set to the root of the current head block (as determined
/// by the fork-choice rule).
///
/// It is important to note that this is _not_ the state corresponding to the canonical head
/// block, instead it is that state which may or may not have had additional per slot/epoch
/// processing applied to it.
pub fn advance_state(&self, slot: Slot) -> Result<(), SlotProcessingError> {
let state_slot = self.state.read().slot;
let latest_block_header = self.head().beacon_block.block_header();
for _ in state_slot.as_u64()..slot.as_u64() {
per_slot_processing(&mut *self.state.write(), &latest_block_header, &self.spec)?;
}
Ok(())
}
/// Returns the validator index (if any) for the given public key.
///
/// Information is retrieved from the present `beacon_state.validator_registry`.
@ -246,7 +419,10 @@ where
/// Information is read from the present `beacon_state` shuffling, so only information from the
/// present and prior epoch is available.
pub fn block_proposer(&self, slot: Slot) -> Result<usize, BeaconStateError> {
trace!("BeaconChain::block_proposer: slot: {}", slot);
self.state
.write()
.build_epoch_cache(RelativeEpoch::Current, &self.spec)?;
let index = self.state.read().get_beacon_proposer_index(
slot,
RelativeEpoch::Current,
@ -555,6 +731,11 @@ where
}
}
/// Returns `true` if the given block root has not been processed.
pub fn is_new_block_root(&self, beacon_block_root: &Hash256) -> Result<bool, Error> {
Ok(!self.block_store.exists(beacon_block_root)?)
}
/// Accept some block and attempt to add it to block DAG.
///
/// Will accept blocks from prior slots, however it will reject any block from a future slot.
@ -567,7 +748,10 @@ where
if block.slot > present_slot {
return Ok(BlockProcessingOutcome::InvalidBlock(
InvalidBlock::FutureSlot,
InvalidBlock::FutureSlot {
present_slot,
block_slot: block.slot,
},
));
}
@ -594,10 +778,10 @@ where
// TODO: check the block proposer signature BEFORE doing a state transition. This will
// significantly lower exposure surface to DoS attacks.
// Transition the parent state to the present slot.
// Transition the parent state to the block slot.
let mut state = parent_state;
let previous_block_header = parent_block.block_header();
for _ in state.slot.as_u64()..present_slot.as_u64() {
for _ in state.slot.as_u64()..block.slot.as_u64() {
if let Err(e) = per_slot_processing(&mut state, &previous_block_header, &self.spec) {
return Ok(BlockProcessingOutcome::InvalidBlock(
InvalidBlock::SlotProcessingError(e),
@ -643,8 +827,9 @@ where
// run instead.
if self.head().beacon_block_root == parent_block_root {
self.update_canonical_head(block.clone(), block_root, state.clone(), state_root);
// Update the local state variable.
*self.state.write() = state;
// Update the canonical `BeaconState`.
self.update_state(state)?;
}
Ok(BlockProcessingOutcome::ValidBlock(ValidBlock::Processed))
@ -662,6 +847,8 @@ where
let mut state = self.state.read().clone();
state.build_epoch_cache(RelativeEpoch::Current, &self.spec)?;
trace!("Finding attestations for new block...");
let attestations = self
@ -732,7 +919,10 @@ where
.ok_or_else(|| Error::MissingBeaconState(block.state_root))?;
let state_root = state.canonical_root();
self.update_canonical_head(block, block_root, state, state_root);
self.update_canonical_head(block, block_root, state.clone(), state_root);
// Update the canonical `BeaconState`.
self.update_state(state)?;
}
Ok(())

View File

@ -3,7 +3,7 @@ use types::{BeaconBlock, BeaconState, Hash256};
/// Represents some block and it's associated state. Generally, this will be used for tracking the
/// head, justified head and finalized head.
#[derive(Clone, Serialize)]
#[derive(Clone, Serialize, PartialEq, Debug)]
pub struct CheckPoint {
pub beacon_block: BeaconBlock,
pub beacon_block_root: Hash256,

View File

@ -1,5 +1,6 @@
use fork_choice::ForkChoiceError;
use state_processing::BlockProcessingError;
use state_processing::SlotProcessingError;
use types::*;
macro_rules! easy_from_to {
@ -16,18 +17,24 @@ macro_rules! easy_from_to {
pub enum BeaconChainError {
InsufficientValidators,
BadRecentBlockRoots,
UnableToReadSlot,
BeaconStateError(BeaconStateError),
DBInconsistent(String),
DBError(String),
ForkChoiceError(ForkChoiceError),
MissingBeaconBlock(Hash256),
MissingBeaconState(Hash256),
SlotProcessingError(SlotProcessingError),
}
easy_from_to!(SlotProcessingError, BeaconChainError);
#[derive(Debug, PartialEq)]
pub enum BlockProductionError {
UnableToGetBlockRootFromState,
BlockProcessingError(BlockProcessingError),
BeaconStateError(BeaconStateError),
}
easy_from_to!(BlockProcessingError, BlockProductionError);
easy_from_to!(BeaconStateError, BlockProductionError);

View File

@ -3,10 +3,12 @@ mod beacon_chain;
mod checkpoint;
mod errors;
pub mod initialise;
pub mod test_utils;
pub use self::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock, ValidBlock};
pub use self::checkpoint::CheckPoint;
pub use self::errors::BeaconChainError;
pub use attestation_aggregator::Outcome as AggregationOutcome;
pub use db;
pub use fork_choice;
pub use parking_lot;

View File

@ -0,0 +1,3 @@
mod testing_beacon_chain_builder;
pub use testing_beacon_chain_builder::TestingBeaconChainBuilder;

View File

@ -0,0 +1,50 @@
pub use crate::{BeaconChain, BeaconChainError, CheckPoint};
use db::{
stores::{BeaconBlockStore, BeaconStateStore},
MemoryDB,
};
use fork_choice::BitwiseLMDGhost;
use slot_clock::TestingSlotClock;
use ssz::TreeHash;
use std::sync::Arc;
use types::test_utils::TestingBeaconStateBuilder;
use types::*;
type TestingBeaconChain = BeaconChain<MemoryDB, TestingSlotClock, BitwiseLMDGhost<MemoryDB>>;
pub struct TestingBeaconChainBuilder {
state_builder: TestingBeaconStateBuilder,
}
impl TestingBeaconChainBuilder {
pub fn build(self, spec: &ChainSpec) -> TestingBeaconChain {
let db = Arc::new(MemoryDB::open());
let block_store = Arc::new(BeaconBlockStore::new(db.clone()));
let state_store = Arc::new(BeaconStateStore::new(db.clone()));
let slot_clock = TestingSlotClock::new(spec.genesis_slot.as_u64());
let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone());
let (genesis_state, _keypairs) = self.state_builder.build();
let mut genesis_block = BeaconBlock::empty(&spec);
genesis_block.state_root = Hash256::from_slice(&genesis_state.hash_tree_root());
// Create the Beacon Chain
BeaconChain::from_genesis(
state_store.clone(),
block_store.clone(),
slot_clock,
genesis_state,
genesis_block,
spec.clone(),
fork_choice,
)
.unwrap()
}
}
impl From<TestingBeaconStateBuilder> for TestingBeaconChainBuilder {
fn from(state_builder: TestingBeaconStateBuilder) -> TestingBeaconChainBuilder {
TestingBeaconChainBuilder { state_builder }
}
}

View File

@ -15,6 +15,8 @@ use std::iter::FromIterator;
use std::sync::Arc;
use types::{test_utils::TestingBeaconStateBuilder, *};
type TestingBeaconChain = BeaconChain<MemoryDB, TestingSlotClock, BitwiseLMDGhost<MemoryDB>>;
/// The beacon chain harness simulates a single beacon node with `validator_count` validators connected
/// to it. Each validator is provided a borrow to the beacon chain, where it may read
/// information and submit blocks/attestations for processing.
@ -23,7 +25,7 @@ use types::{test_utils::TestingBeaconStateBuilder, *};
/// is not useful for testing that multiple beacon nodes can reach consensus.
pub struct BeaconChainHarness {
pub db: Arc<MemoryDB>,
pub beacon_chain: Arc<BeaconChain<MemoryDB, TestingSlotClock, BitwiseLMDGhost<MemoryDB>>>,
pub beacon_chain: Arc<TestingBeaconChain>,
pub block_store: Arc<BeaconBlockStore<MemoryDB>>,
pub state_store: Arc<BeaconStateStore<MemoryDB>>,
pub validators: Vec<ValidatorHarness>,
@ -36,19 +38,39 @@ impl BeaconChainHarness {
/// - A keypair, `BlockProducer` and `Attester` for each validator.
/// - A new BeaconChain struct where the given validators are in the genesis.
pub fn new(spec: ChainSpec, validator_count: usize) -> Self {
let state_builder =
TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(validator_count, &spec);
Self::from_beacon_state_builder(state_builder, spec)
}
pub fn from_beacon_state_builder(
state_builder: TestingBeaconStateBuilder,
spec: ChainSpec,
) -> Self {
let db = Arc::new(MemoryDB::open());
let block_store = Arc::new(BeaconBlockStore::new(db.clone()));
let state_store = Arc::new(BeaconStateStore::new(db.clone()));
let slot_clock = TestingSlotClock::new(spec.genesis_slot.as_u64());
let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone());
let state_builder =
TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(validator_count, &spec);
let (genesis_state, keypairs) = state_builder.build();
let (mut genesis_state, keypairs) = state_builder.build();
let mut genesis_block = BeaconBlock::empty(&spec);
genesis_block.state_root = Hash256::from_slice(&genesis_state.hash_tree_root());
genesis_state
.build_epoch_cache(RelativeEpoch::Previous, &spec)
.unwrap();
genesis_state
.build_epoch_cache(RelativeEpoch::Current, &spec)
.unwrap();
genesis_state
.build_epoch_cache(RelativeEpoch::NextWithoutRegistryChange, &spec)
.unwrap();
genesis_state
.build_epoch_cache(RelativeEpoch::NextWithRegistryChange, &spec)
.unwrap();
// Create the Beacon Chain
let beacon_chain = Arc::new(
BeaconChain::from_genesis(
@ -109,7 +131,9 @@ impl BeaconChainHarness {
);
self.beacon_chain.slot_clock.set_slot(slot.as_u64());
self.beacon_chain.advance_state(slot).unwrap();
self.beacon_chain
.catchup_state()
.expect("Failed to catch state");
slot
}
@ -187,7 +211,6 @@ impl BeaconChainHarness {
self.increment_beacon_chain_slot();
// Produce a new block.
debug!("Producing block...");
let block = self.produce_block();
debug!("Submitting block for processing...");
match self.beacon_chain.process_block(block) {

View File

@ -5,8 +5,9 @@ authors = ["Age Manning <Age@AgeManning.com>"]
edition = "2018"
[dependencies]
beacon_chain = { path = "../beacon_chain" }
# SigP repository until PR is merged
libp2p = { git = "https://github.com/SigP/rust-libp2p", branch = "gossipsub" }
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "b3c32d9a821ae6cc89079499cc6e8a6bab0bffc3" }
types = { path = "../../eth2/types" }
ssz = { path = "../../eth2/utils/ssz" }
ssz_derive = { path = "../../eth2/utils/ssz_derive" }

View File

@ -1,11 +1,21 @@
use crate::rpc::methods::BlockRootSlot;
use crate::rpc::{RPCEvent, RPCMessage, Rpc};
use crate::NetworkConfig;
use futures::prelude::*;
use libp2p::{
core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent},
core::{
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
PublicKey,
},
gossipsub::{Gossipsub, GossipsubEvent},
identify::{protocol::IdentifyInfo, Identify, IdentifyEvent},
ping::{Ping, PingEvent},
tokio_io::{AsyncRead, AsyncWrite},
NetworkBehaviour, PeerId,
};
use slog::{debug, o};
use ssz_derive::{Decode, Encode};
use types::Attestation;
use types::Topic;
/// Builds the network behaviour for the libp2p Swarm.
@ -13,12 +23,22 @@ use types::Topic;
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourEvent", poll_method = "poll")]
pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite> {
/// The routing pub-sub mechanism for eth2.
gossipsub: Gossipsub<TSubstream>,
// TODO: Add Kademlia for peer discovery
/// The events generated by this behaviour to be consumed in the swarm poll.
serenity_rpc: Rpc<TSubstream>,
/// Allows discovery of IP addresses for peers on the network.
identify: Identify<TSubstream>,
/// Keep regular connection to peers and disconnect if absent.
// TODO: Keepalive, likely remove this later.
// TODO: Make the ping time customizeable.
ping: Ping<TSubstream>,
#[behaviour(ignore)]
events: Vec<BehaviourEvent>,
/// Logger for behaviour actions.
#[behaviour(ignore)]
log: slog::Logger,
}
// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour
@ -53,12 +73,54 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<RPCMessage
}
}
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<IdentifyEvent>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, event: IdentifyEvent) {
match event {
IdentifyEvent::Identified {
peer_id, mut info, ..
} => {
if info.listen_addrs.len() > 20 {
debug!(
self.log,
"More than 20 peers have been identified, truncating"
);
info.listen_addrs.truncate(20);
}
self.events.push(BehaviourEvent::Identified(peer_id, info));
}
IdentifyEvent::Error { .. } => {}
IdentifyEvent::SendBack { .. } => {}
}
}
}
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<PingEvent>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, _event: PingEvent) {
// not interested in ping responses at the moment.
}
}
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig, log: &slog::Logger) -> Self {
pub fn new(local_public_key: PublicKey, net_conf: &NetworkConfig, log: &slog::Logger) -> Self {
let local_peer_id = local_public_key.clone().into_peer_id();
let identify_config = net_conf.identify_config.clone();
let behaviour_log = log.new(o!());
Behaviour {
gossipsub: Gossipsub::new(local_peer_id, gs_config),
gossipsub: Gossipsub::new(local_peer_id, net_conf.gs_config.clone()),
serenity_rpc: Rpc::new(log),
identify: Identify::new(
identify_config.version,
identify_config.user_agent,
local_public_key,
),
ping: Ping::new(),
events: Vec::new(),
log: behaviour_log,
}
}
@ -91,6 +153,25 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
pub enum BehaviourEvent {
RPC(PeerId, RPCEvent),
PeerDialed(PeerId),
Identified(PeerId, IdentifyInfo),
// TODO: This is a stub at the moment
Message(String),
}
#[derive(Debug, Clone)]
pub enum IncomingGossip {
Block(BlockGossip),
Attestation(AttestationGossip),
}
/// Gossipsub message providing notification of a new block.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BlockGossip {
pub root: BlockRootSlot,
}
/// Gossipsub message providing notification of a new attestation.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct AttestationGossip {
pub attestation: Attestation,
}

View File

@ -1,11 +1,9 @@
use crate::Multiaddr;
use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder};
use libp2p::secio;
use std::fmt;
#[derive(Clone)]
#[derive(Clone, Debug)]
/// Network configuration for lighthouse.
pub struct NetworkConfig {
pub struct Config {
//TODO: stubbing networking initial params, change in the future
/// IP address to listen on.
pub listen_addresses: Vec<Multiaddr>,
@ -13,47 +11,58 @@ pub struct NetworkConfig {
pub listen_port: u16,
/// Gossipsub configuration parameters.
pub gs_config: GossipsubConfig,
/// Configuration parameters for node identification protocol.
pub identify_config: IdentifyConfig,
/// List of nodes to initially connect to.
pub boot_nodes: Vec<Multiaddr>,
/// Peer key related to this nodes PeerId.
pub local_private_key: secio::SecioKeyPair,
/// Client version
pub client_version: String,
/// List of topics to subscribe to as strings
pub topics: Vec<String>,
}
impl Default for NetworkConfig {
impl Default for Config {
/// Generate a default network configuration.
fn default() -> Self {
// TODO: Currently using secp256k1 key pairs. Wire protocol specifies RSA. Waiting for this
// PR to be merged to generate RSA keys: https://github.com/briansmith/ring/pull/733
NetworkConfig {
Config {
listen_addresses: vec!["/ip4/127.0.0.1/tcp/9000"
.parse()
.expect("is a correct multi-address")],
listen_port: 9000,
gs_config: GossipsubConfigBuilder::new().build(),
gs_config: GossipsubConfigBuilder::new()
.max_gossip_size(4_000_000)
.build(),
identify_config: IdentifyConfig::default(),
boot_nodes: Vec::new(),
local_private_key: secio::SecioKeyPair::secp256k1_generated().unwrap(),
client_version: version::version(),
topics: vec![String::from("beacon_chain")],
}
}
}
impl NetworkConfig {
impl Config {
pub fn new(boot_nodes: Vec<Multiaddr>) -> Self {
let mut conf = NetworkConfig::default();
let mut conf = Config::default();
conf.boot_nodes = boot_nodes;
conf
}
}
impl fmt::Debug for NetworkConfig {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "NetworkConfig: listen_addresses: {:?}, listen_port: {:?}, gs_config: {:?}, boot_nodes: {:?}, local_private_key: <Secio-PubKey {:?}>, client_version: {:?}", self.listen_addresses, self.listen_port, self.gs_config, self.boot_nodes, self.local_private_key.to_public_key(), self.client_version)
/// The configuration parameters for the Identify protocol
#[derive(Debug, Clone)]
pub struct IdentifyConfig {
/// The protocol version to listen on.
pub version: String,
/// The client's name and version for identification.
pub user_agent: String,
}
impl Default for IdentifyConfig {
fn default() -> Self {
Self {
version: "/eth/serenity/1.0".to_string(),
user_agent: version::version(),
}
}
}

View File

@ -3,16 +3,16 @@
///
/// This crate builds and manages the libp2p services required by the beacon node.
pub mod behaviour;
mod config;
pub mod error;
mod network_config;
pub mod rpc;
mod service;
pub use config::Config as NetworkConfig;
pub use libp2p::{
gossipsub::{GossipsubConfig, GossipsubConfigBuilder},
PeerId,
};
pub use network_config::NetworkConfig;
pub use rpc::{HelloMessage, RPCEvent};
pub use service::Libp2pEvent;
pub use service::Service;

View File

@ -1,6 +1,7 @@
use ssz::{Decodable, DecodeError, Encodable, SszStream};
/// Available RPC methods types and ids.
use ssz_derive::{Decode, Encode};
use types::{BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot};
use types::{Attestation, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot};
#[derive(Debug)]
/// Available Serenity Libp2p RPC methods
@ -53,13 +54,27 @@ impl Into<u16> for RPCMethod {
#[derive(Debug, Clone)]
pub enum RPCRequest {
Hello(HelloMessage),
Goodbye(u64),
Goodbye(GoodbyeReason),
BeaconBlockRoots(BeaconBlockRootsRequest),
BeaconBlockHeaders(BeaconBlockHeadersRequest),
BeaconBlockBodies(BeaconBlockBodiesRequest),
BeaconChainState(BeaconChainStateRequest),
}
impl RPCRequest {
pub fn method_id(&self) -> u16 {
let method = match self {
RPCRequest::Hello(_) => RPCMethod::Hello,
RPCRequest::Goodbye(_) => RPCMethod::Goodbye,
RPCRequest::BeaconBlockRoots(_) => RPCMethod::BeaconBlockRoots,
RPCRequest::BeaconBlockHeaders(_) => RPCMethod::BeaconBlockHeaders,
RPCRequest::BeaconBlockBodies(_) => RPCMethod::BeaconBlockBodies,
RPCRequest::BeaconChainState(_) => RPCMethod::BeaconChainState,
};
method.into()
}
}
#[derive(Debug, Clone)]
pub enum RPCResponse {
Hello(HelloMessage),
@ -69,6 +84,19 @@ pub enum RPCResponse {
BeaconChainState(BeaconChainStateResponse),
}
impl RPCResponse {
pub fn method_id(&self) -> u16 {
let method = match self {
RPCResponse::Hello(_) => RPCMethod::Hello,
RPCResponse::BeaconBlockRoots(_) => RPCMethod::BeaconBlockRoots,
RPCResponse::BeaconBlockHeaders(_) => RPCMethod::BeaconBlockHeaders,
RPCResponse::BeaconBlockBodies(_) => RPCMethod::BeaconBlockBodies,
RPCResponse::BeaconChainState(_) => RPCMethod::BeaconChainState,
};
method.into()
}
}
/* Request/Response data structures for RPC methods */
/// The HELLO request/response handshake message.
@ -86,76 +114,125 @@ pub struct HelloMessage {
pub best_slot: Slot,
}
/// The reason given for a `Goodbye` message.
///
/// Note: any unknown `u64::into(n)` will resolve to `GoodbyeReason::Unknown` for any unknown `n`,
/// however `GoodbyeReason::Unknown.into()` will go into `0_u64`. Therefore de-serializing then
/// re-serializing may not return the same bytes.
#[derive(Debug, Clone)]
pub enum GoodbyeReason {
ClientShutdown,
IrreleventNetwork,
Fault,
Unknown,
}
impl From<u64> for GoodbyeReason {
fn from(id: u64) -> GoodbyeReason {
match id {
1 => GoodbyeReason::ClientShutdown,
2 => GoodbyeReason::IrreleventNetwork,
3 => GoodbyeReason::Fault,
_ => GoodbyeReason::Unknown,
}
}
}
impl Into<u64> for GoodbyeReason {
fn into(self) -> u64 {
match self {
GoodbyeReason::Unknown => 0,
GoodbyeReason::ClientShutdown => 1,
GoodbyeReason::IrreleventNetwork => 2,
GoodbyeReason::Fault => 3,
}
}
}
impl Encodable for GoodbyeReason {
fn ssz_append(&self, s: &mut SszStream) {
let id: u64 = (*self).clone().into();
id.ssz_append(s);
}
}
impl Decodable for GoodbyeReason {
fn ssz_decode(bytes: &[u8], index: usize) -> Result<(Self, usize), DecodeError> {
let (id, index) = u64::ssz_decode(bytes, index)?;
Ok((Self::from(id), index))
}
}
/// Request a number of beacon block roots from a peer.
#[derive(Encode, Decode, Clone, Debug)]
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BeaconBlockRootsRequest {
/// The starting slot of the requested blocks.
start_slot: Slot,
pub start_slot: Slot,
/// The number of blocks from the start slot.
count: u64, // this must be less than 32768. //TODO: Enforce this in the lower layers
pub count: u64, // this must be less than 32768. //TODO: Enforce this in the lower layers
}
/// Response containing a number of beacon block roots from a peer.
#[derive(Encode, Decode, Clone, Debug)]
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BeaconBlockRootsResponse {
/// List of requested blocks and associated slots.
roots: Vec<BlockRootSlot>,
pub roots: Vec<BlockRootSlot>,
}
/// Contains a block root and associated slot.
#[derive(Encode, Decode, Clone, Debug)]
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BlockRootSlot {
/// The block root.
block_root: Hash256,
pub block_root: Hash256,
/// The block slot.
slot: Slot,
pub slot: Slot,
}
/// Request a number of beacon block headers from a peer.
#[derive(Encode, Decode, Clone, Debug)]
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BeaconBlockHeadersRequest {
/// The starting header hash of the requested headers.
start_root: Hash256,
pub start_root: Hash256,
/// The starting slot of the requested headers.
start_slot: Slot,
pub start_slot: Slot,
/// The maximum number of headers than can be returned.
max_headers: u64,
pub max_headers: u64,
/// The maximum number of slots to skip between blocks.
skip_slots: u64,
pub skip_slots: u64,
}
/// Response containing requested block headers.
#[derive(Encode, Decode, Clone, Debug)]
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BeaconBlockHeadersResponse {
/// The list of requested beacon block headers.
headers: Vec<BeaconBlockHeader>,
pub headers: Vec<BeaconBlockHeader>,
}
/// Request a number of beacon block bodies from a peer.
#[derive(Encode, Decode, Clone, Debug)]
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BeaconBlockBodiesRequest {
/// The list of beacon block bodies being requested.
block_roots: Hash256,
pub block_roots: Vec<Hash256>,
}
/// Response containing the list of requested beacon block bodies.
#[derive(Encode, Decode, Clone, Debug)]
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BeaconBlockBodiesResponse {
/// The list of beacon block bodies being requested.
block_bodies: Vec<BeaconBlockBody>,
pub block_bodies: Vec<BeaconBlockBody>,
}
/// Request values for tree hashes which yield a blocks `state_root`.
#[derive(Encode, Decode, Clone, Debug)]
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BeaconChainStateRequest {
/// The tree hashes that a value is requested for.
hashes: Vec<Hash256>,
pub hashes: Vec<Hash256>,
}
/// Request values for tree hashes which yield a blocks `state_root`.
// Note: TBD
#[derive(Encode, Decode, Clone, Debug)]
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BeaconChainStateResponse {
/// The values corresponding the to the requested tree hashes.
values: bool, //TBD - stubbed with encodeable bool
pub values: bool, //TBD - stubbed with encodeable bool
}

View File

@ -2,7 +2,7 @@
///
/// This is purpose built for Ethereum 2.0 serenity and the protocol listens on
/// `/eth/serenity/rpc/1.0.0`
mod methods;
pub mod methods;
mod protocol;
use futures::prelude::*;
@ -12,7 +12,7 @@ use libp2p::core::swarm::{
};
use libp2p::{Multiaddr, PeerId};
pub use methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse};
pub use protocol::{RPCEvent, RPCProtocol};
pub use protocol::{RPCEvent, RPCProtocol, RequestId};
use slog::o;
use std::marker::PhantomData;
use tokio::io::{AsyncRead, AsyncWrite};

View File

@ -1,12 +1,13 @@
use super::methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse};
use super::methods::*;
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use ssz::{ssz_encode, Decodable, Encodable, SszStream};
use ssz::{ssz_encode, Decodable, DecodeError as SSZDecodeError, Encodable, SszStream};
use std::hash::{Hash, Hasher};
use std::io;
use std::iter;
use tokio::io::{AsyncRead, AsyncWrite};
/// The maximum bytes that can be sent across the RPC.
const MAX_READ_SIZE: usize = 2048;
const MAX_READ_SIZE: usize = 4_194_304; // 4M
/// Implementation of the `ConnectionUpgrade` for the rpc protocol.
@ -29,16 +30,65 @@ impl Default for RPCProtocol {
}
}
/// A monotonic counter for ordering `RPCRequest`s.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct RequestId(u64);
impl RequestId {
/// Increment the request id.
pub fn increment(&mut self) {
self.0 += 1
}
/// Return the previous id.
pub fn previous(&self) -> Self {
Self(self.0 - 1)
}
}
impl Eq for RequestId {}
impl Hash for RequestId {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.hash(state);
}
}
impl From<u64> for RequestId {
fn from(x: u64) -> RequestId {
RequestId(x)
}
}
impl Into<u64> for RequestId {
fn into(self) -> u64 {
self.0
}
}
impl Encodable for RequestId {
fn ssz_append(&self, s: &mut SszStream) {
self.0.ssz_append(s);
}
}
impl Decodable for RequestId {
fn ssz_decode(bytes: &[u8], index: usize) -> Result<(Self, usize), SSZDecodeError> {
let (id, index) = u64::ssz_decode(bytes, index)?;
Ok((Self::from(id), index))
}
}
/// The RPC types which are sent/received in this protocol.
#[derive(Debug, Clone)]
pub enum RPCEvent {
Request {
id: u64,
id: RequestId,
method_id: u16,
body: RPCRequest,
},
Response {
id: u64,
id: RequestId,
method_id: u16, //TODO: Remove and process decoding upstream
result: RPCResponse,
},
@ -60,10 +110,13 @@ where
{
type Output = RPCEvent;
type Error = DecodeError;
type Future =
upgrade::ReadOneThen<TSocket, (), fn(Vec<u8>, ()) -> Result<RPCEvent, DecodeError>>;
type Future = upgrade::ReadOneThen<
upgrade::Negotiated<TSocket>,
(),
fn(Vec<u8>, ()) -> Result<RPCEvent, DecodeError>,
>;
fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
fn upgrade_inbound(self, socket: upgrade::Negotiated<TSocket>, _: Self::Info) -> Self::Future {
upgrade::read_one_then(socket, MAX_READ_SIZE, (), |packet, ()| Ok(decode(packet)?))
}
}
@ -72,7 +125,7 @@ fn decode(packet: Vec<u8>) -> Result<RPCEvent, DecodeError> {
// decode the header of the rpc
// request/response
let (request, index) = bool::ssz_decode(&packet, 0)?;
let (id, index) = u64::ssz_decode(&packet, index)?;
let (id, index) = RequestId::ssz_decode(&packet, index)?;
let (method_id, index) = u16::ssz_decode(&packet, index)?;
if request {
@ -81,7 +134,31 @@ fn decode(packet: Vec<u8>) -> Result<RPCEvent, DecodeError> {
let (hello_body, _index) = HelloMessage::ssz_decode(&packet, index)?;
RPCRequest::Hello(hello_body)
}
RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod),
RPCMethod::Goodbye => {
let (goodbye_reason, _index) = GoodbyeReason::ssz_decode(&packet, index)?;
RPCRequest::Goodbye(goodbye_reason)
}
RPCMethod::BeaconBlockRoots => {
let (block_roots_request, _index) =
BeaconBlockRootsRequest::ssz_decode(&packet, index)?;
RPCRequest::BeaconBlockRoots(block_roots_request)
}
RPCMethod::BeaconBlockHeaders => {
let (block_headers_request, _index) =
BeaconBlockHeadersRequest::ssz_decode(&packet, index)?;
RPCRequest::BeaconBlockHeaders(block_headers_request)
}
RPCMethod::BeaconBlockBodies => {
let (block_bodies_request, _index) =
BeaconBlockBodiesRequest::ssz_decode(&packet, index)?;
RPCRequest::BeaconBlockBodies(block_bodies_request)
}
RPCMethod::BeaconChainState => {
let (chain_state_request, _index) =
BeaconChainStateRequest::ssz_decode(&packet, index)?;
RPCRequest::BeaconChainState(chain_state_request)
}
RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod),
};
Ok(RPCEvent::Request {
@ -97,7 +174,24 @@ fn decode(packet: Vec<u8>) -> Result<RPCEvent, DecodeError> {
let (body, _index) = HelloMessage::ssz_decode(&packet, index)?;
RPCResponse::Hello(body)
}
RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod),
RPCMethod::Goodbye => unreachable!("Should never receive a goodbye response"),
RPCMethod::BeaconBlockRoots => {
let (body, _index) = BeaconBlockRootsResponse::ssz_decode(&packet, index)?;
RPCResponse::BeaconBlockRoots(body)
}
RPCMethod::BeaconBlockHeaders => {
let (body, _index) = BeaconBlockHeadersResponse::ssz_decode(&packet, index)?;
RPCResponse::BeaconBlockHeaders(body)
}
RPCMethod::BeaconBlockBodies => {
let (body, _index) = BeaconBlockBodiesResponse::ssz_decode(&packet, index)?;
RPCResponse::BeaconBlockBodies(body)
}
RPCMethod::BeaconChainState => {
let (body, _index) = BeaconChainStateResponse::ssz_decode(&packet, index)?;
RPCResponse::BeaconChainState(body)
}
RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod),
};
Ok(RPCEvent::Response {
id,
@ -113,10 +207,10 @@ where
{
type Output = ();
type Error = io::Error;
type Future = upgrade::WriteOne<TSocket>;
type Future = upgrade::WriteOne<upgrade::Negotiated<TSocket>>;
#[inline]
fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
fn upgrade_outbound(self, socket: upgrade::Negotiated<TSocket>, _: Self::Info) -> Self::Future {
let bytes = ssz_encode(&self);
upgrade::write_one(socket, bytes)
}
@ -137,7 +231,21 @@ impl Encodable for RPCEvent {
RPCRequest::Hello(body) => {
s.append(body);
}
_ => {}
RPCRequest::Goodbye(body) => {
s.append(body);
}
RPCRequest::BeaconBlockRoots(body) => {
s.append(body);
}
RPCRequest::BeaconBlockHeaders(body) => {
s.append(body);
}
RPCRequest::BeaconBlockBodies(body) => {
s.append(body);
}
RPCRequest::BeaconChainState(body) => {
s.append(body);
}
}
}
RPCEvent::Response {
@ -152,7 +260,18 @@ impl Encodable for RPCEvent {
RPCResponse::Hello(response) => {
s.append(response);
}
_ => {}
RPCResponse::BeaconBlockRoots(response) => {
s.append(response);
}
RPCResponse::BeaconBlockHeaders(response) => {
s.append(response);
}
RPCResponse::BeaconBlockBodies(response) => {
s.append(response);
}
RPCResponse::BeaconChainState(response) => {
s.append(response);
}
}
}
}

View File

@ -6,13 +6,14 @@ use crate::NetworkConfig;
use futures::prelude::*;
use futures::Stream;
use libp2p::core::{
identity,
muxing::StreamMuxerBox,
nodes::Substream,
transport::boxed::Boxed,
upgrade::{InboundUpgradeExt, OutboundUpgradeExt},
};
use libp2p::{core, secio, Transport};
use libp2p::{PeerId, Swarm};
use libp2p::identify::protocol::IdentifyInfo;
use libp2p::{core, secio, PeerId, Swarm, Transport};
use slog::{debug, info, trace, warn};
use std::io::{Error, ErrorKind};
use std::time::Duration;
@ -33,15 +34,20 @@ impl Service {
pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result<Self> {
debug!(log, "Libp2p Service starting");
let local_private_key = config.local_private_key;
let local_peer_id = local_private_key.to_peer_id();
// TODO: Currently using secp256k1 key pairs. Wire protocol specifies RSA. Waiting for this
// PR to be merged to generate RSA keys: https://github.com/briansmith/ring/pull/733
// TODO: Save and recover node key from disk
let local_private_key = identity::Keypair::generate_secp256k1();
let local_public_key = local_private_key.public();
let local_peer_id = PeerId::from(local_private_key.public());
info!(log, "Local peer id: {:?}", local_peer_id);
let mut swarm = {
// Set up the transport
let transport = build_transport(local_private_key);
// Set up gossipsub routing
let behaviour = Behaviour::new(local_peer_id.clone(), config.gs_config, &log);
let behaviour = Behaviour::new(local_public_key.clone(), &config, &log);
// Set up Topology
let topology = local_peer_id.clone();
Swarm::new(transport, behaviour, topology)
@ -99,17 +105,23 @@ impl Stream for Service {
// TODO: Currently only gossipsub events passed here.
// Build a type for more generic events
match self.swarm.poll() {
Ok(Async::Ready(Some(BehaviourEvent::Message(m)))) => {
//Behaviour events
Ok(Async::Ready(Some(event))) => match event {
// TODO: Stub here for debugging
debug!(self.log, "Message received: {}", m);
return Ok(Async::Ready(Some(Libp2pEvent::Message(m))));
}
Ok(Async::Ready(Some(BehaviourEvent::RPC(peer_id, event)))) => {
return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))));
}
Ok(Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id)))) => {
return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))));
}
BehaviourEvent::Message(m) => {
debug!(self.log, "Message received: {}", m);
return Ok(Async::Ready(Some(Libp2pEvent::Message(m))));
}
BehaviourEvent::RPC(peer_id, event) => {
return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))));
}
BehaviourEvent::PeerDialed(peer_id) => {
return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))));
}
BehaviourEvent::Identified(peer_id, info) => {
return Ok(Async::Ready(Some(Libp2pEvent::Identified(peer_id, info))));
}
},
Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"),
Ok(Async::NotReady) => break,
_ => break,
@ -121,9 +133,7 @@ impl Stream for Service {
/// The implementation supports TCP/IP, WebSockets over TCP/IP, secio as the encryption layer, and
/// mplex or yamux as the multiplexing layer.
fn build_transport(
local_private_key: secio::SecioKeyPair,
) -> Boxed<(PeerId, StreamMuxerBox), Error> {
fn build_transport(local_private_key: identity::Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> {
// TODO: The Wire protocol currently doesn't specify encryption and this will need to be customised
// in the future.
let transport = libp2p::tcp::TcpConfig::new();
@ -156,8 +166,12 @@ fn build_transport(
/// Events that can be obtained from polling the Libp2p Service.
pub enum Libp2pEvent {
// We have received an RPC event on the swarm
/// An RPC response request has been received on the swarm.
RPC(PeerId, RPCEvent),
/// Initiated the connection to a new peer.
PeerDialed(PeerId),
/// Received information about a peer on the network.
Identified(PeerId, IdentifyInfo),
// TODO: Pub-sub testing only.
Message(String),
}

View File

@ -4,12 +4,17 @@ version = "0.1.0"
authors = ["Age Manning <Age@AgeManning.com>"]
edition = "2018"
[dev-dependencies]
test_harness = { path = "../beacon_chain/test_harness" }
sloggers = "0.3.2"
[dependencies]
beacon_chain = { path = "../beacon_chain" }
eth2-libp2p = { path = "../eth2-libp2p" }
version = { path = "../version" }
types = { path = "../../eth2/types" }
slog = "2.4.1"
ssz = { path = "../../eth2/utils/ssz" }
futures = "0.1.25"
error-chain = "0.12.0"
crossbeam-channel = "0.3.8"

View File

@ -5,8 +5,12 @@ use beacon_chain::{
parking_lot::RwLockReadGuard,
slot_clock::SlotClock,
types::{BeaconState, ChainSpec},
CheckPoint,
AggregationOutcome, CheckPoint,
};
use eth2_libp2p::HelloMessage;
use types::{Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot};
pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome};
/// The network's API to the beacon chain.
pub trait BeaconChain: Send + Sync {
@ -14,9 +18,48 @@ pub trait BeaconChain: Send + Sync {
fn get_state(&self) -> RwLockReadGuard<BeaconState>;
fn slot(&self) -> Slot;
fn head(&self) -> RwLockReadGuard<CheckPoint>;
fn get_block(&self, block_root: &Hash256) -> Result<Option<BeaconBlock>, BeaconChainError>;
fn best_slot(&self) -> Slot;
fn best_block_root(&self) -> Hash256;
fn finalized_head(&self) -> RwLockReadGuard<CheckPoint>;
fn finalized_epoch(&self) -> Epoch;
fn hello_message(&self) -> HelloMessage;
fn process_block(&self, block: BeaconBlock)
-> Result<BlockProcessingOutcome, BeaconChainError>;
fn process_attestation(
&self,
attestation: Attestation,
) -> Result<AggregationOutcome, BeaconChainError>;
fn get_block_roots(
&self,
start_slot: Slot,
count: usize,
skip: usize,
) -> Result<Vec<Hash256>, BeaconChainError>;
fn get_block_headers(
&self,
start_slot: Slot,
count: usize,
skip: usize,
) -> Result<Vec<BeaconBlockHeader>, BeaconChainError>;
fn get_block_bodies(&self, roots: &[Hash256])
-> Result<Vec<BeaconBlockBody>, BeaconChainError>;
fn is_new_block_root(&self, beacon_block_root: &Hash256) -> Result<bool, BeaconChainError>;
}
impl<T, U, F> BeaconChain for RawBeaconChain<T, U, F>
@ -33,11 +76,93 @@ where
self.state.read()
}
fn slot(&self) -> Slot {
self.get_state().slot
}
fn head(&self) -> RwLockReadGuard<CheckPoint> {
self.head()
}
fn get_block(&self, block_root: &Hash256) -> Result<Option<BeaconBlock>, BeaconChainError> {
self.get_block(block_root)
}
fn finalized_epoch(&self) -> Epoch {
self.get_state().finalized_epoch
}
fn finalized_head(&self) -> RwLockReadGuard<CheckPoint> {
self.finalized_head()
}
fn best_slot(&self) -> Slot {
self.head().beacon_block.slot
}
fn best_block_root(&self) -> Hash256 {
self.head().beacon_block_root
}
fn hello_message(&self) -> HelloMessage {
let spec = self.get_spec();
let state = self.get_state();
HelloMessage {
network_id: spec.network_id,
latest_finalized_root: state.finalized_root,
latest_finalized_epoch: state.finalized_epoch,
best_root: self.best_block_root(),
best_slot: self.best_slot(),
}
}
fn process_block(
&self,
block: BeaconBlock,
) -> Result<BlockProcessingOutcome, BeaconChainError> {
self.process_block(block)
}
fn process_attestation(
&self,
_attestation: Attestation,
) -> Result<AggregationOutcome, BeaconChainError> {
// Awaiting a proper operations pool before we can import attestations.
//
// Returning a useless error for now.
//
// https://github.com/sigp/lighthouse/issues/281
return Err(BeaconChainError::DBInconsistent("CANNOT PROCESS".into()));
}
fn get_block_roots(
&self,
start_slot: Slot,
count: usize,
skip: usize,
) -> Result<Vec<Hash256>, BeaconChainError> {
self.get_block_roots(start_slot, count, skip)
}
fn get_block_headers(
&self,
start_slot: Slot,
count: usize,
skip: usize,
) -> Result<Vec<BeaconBlockHeader>, BeaconChainError> {
let roots = self.get_block_roots(start_slot, count, skip)?;
self.get_block_headers(&roots)
}
fn get_block_bodies(
&self,
roots: &[Hash256],
) -> Result<Vec<BeaconBlockBody>, BeaconChainError> {
self.get_block_bodies(roots)
}
fn is_new_block_root(&self, beacon_block_root: &Hash256) -> Result<bool, BeaconChainError> {
self.is_new_block_root(beacon_block_root)
}
}

View File

@ -1,8 +1,8 @@
/// This crate provides the network server for Lighthouse.
pub mod beacon_chain;
pub mod error;
mod message_handler;
mod service;
pub mod message_handler;
pub mod service;
pub mod sync;
pub use eth2_libp2p::NetworkConfig;

View File

@ -4,33 +4,29 @@ use crate::service::{NetworkMessage, OutgoingMessage};
use crate::sync::SimpleSync;
use crossbeam_channel::{unbounded as channel, Sender};
use eth2_libp2p::{
rpc::{RPCMethod, RPCRequest, RPCResponse},
HelloMessage, PeerId, RPCEvent,
behaviour::IncomingGossip,
rpc::{methods::GoodbyeReason, RPCRequest, RPCResponse, RequestId},
PeerId, RPCEvent,
};
use futures::future;
use slog::warn;
use slog::{debug, trace};
use slog::{debug, warn};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;
/// Timeout for RPC requests.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
// const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
/// Timeout before banning a peer for non-identification.
const HELLO_TIMEOUT: Duration = Duration::from_secs(30);
// const HELLO_TIMEOUT: Duration = Duration::from_secs(30);
/// Handles messages received from the network and client and organises syncing.
pub struct MessageHandler {
/// Currently loaded and initialised beacon chain.
chain: Arc<BeaconChain>,
_chain: Arc<BeaconChain>,
/// The syncing framework.
sync: SimpleSync,
/// The network channel to relay messages to the Network service.
network_send: crossbeam_channel::Sender<NetworkMessage>,
/// A mapping of peers and the RPC id we have sent an RPC request to.
requests: HashMap<(PeerId, u64), Instant>,
/// A counter of request id for each peer.
request_ids: HashMap<PeerId, u64>,
/// The context required to send messages to, and process messages from peers.
network_context: NetworkContext,
/// The `MessageHandler` logger.
log: slog::Logger,
}
@ -44,8 +40,8 @@ pub enum HandlerMessage {
PeerDisconnected(PeerId),
/// An RPC response/request has been received.
RPC(PeerId, RPCEvent),
/// A block has been imported.
BlockImported(), //TODO: This comes from pub-sub - decide its contents
/// A gossip message has been received.
IncomingGossip(PeerId, IncomingGossip),
}
impl MessageHandler {
@ -65,13 +61,9 @@ impl MessageHandler {
let sync = SimpleSync::new(beacon_chain.clone(), &log);
let mut handler = MessageHandler {
// TODO: The handler may not need a chain, perhaps only sync?
chain: beacon_chain.clone(),
_chain: beacon_chain.clone(),
sync,
network_send,
requests: HashMap::new(),
request_ids: HashMap::new(),
network_context: NetworkContext::new(network_send, log.clone()),
log: log.clone(),
};
@ -93,13 +85,16 @@ impl MessageHandler {
match message {
// we have initiated a connection to a peer
HandlerMessage::PeerDialed(peer_id) => {
let id = self.generate_request_id(&peer_id);
self.send_hello(peer_id, id, true);
self.sync.on_connect(peer_id, &mut self.network_context);
}
// we have received an RPC message request/response
HandlerMessage::RPC(peer_id, rpc_event) => {
self.handle_rpc_message(peer_id, rpc_event);
}
// we have received an RPC message request/response
HandlerMessage::IncomingGossip(peer_id, gossip) => {
self.handle_gossip(peer_id, gossip);
}
//TODO: Handle all messages
_ => {}
}
@ -117,109 +112,195 @@ impl MessageHandler {
}
/// A new RPC request has been received from the network.
fn handle_rpc_request(&mut self, peer_id: PeerId, id: u64, request: RPCRequest) {
fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: RequestId, request: RPCRequest) {
// TODO: process the `id`.
match request {
RPCRequest::Hello(hello_message) => {
self.handle_hello_request(peer_id, id, hello_message)
RPCRequest::Hello(hello_message) => self.sync.on_hello_request(
peer_id,
request_id,
hello_message,
&mut self.network_context,
),
RPCRequest::Goodbye(goodbye_reason) => self.sync.on_goodbye(peer_id, goodbye_reason),
RPCRequest::BeaconBlockRoots(request) => self.sync.on_beacon_block_roots_request(
peer_id,
request_id,
request,
&mut self.network_context,
),
RPCRequest::BeaconBlockHeaders(request) => self.sync.on_beacon_block_headers_request(
peer_id,
request_id,
request,
&mut self.network_context,
),
RPCRequest::BeaconBlockBodies(request) => self.sync.on_beacon_block_bodies_request(
peer_id,
request_id,
request,
&mut self.network_context,
),
RPCRequest::BeaconChainState(_) => {
// We do not implement this endpoint, it is not required and will only likely be
// useful for light-client support in later phases.
warn!(self.log, "BeaconChainState RPC call is not supported.");
}
// TODO: Handle all requests
_ => {}
}
}
/// An RPC response has been received from the network.
// we match on id and ignore responses past the timeout.
fn handle_rpc_response(&mut self, peer_id: PeerId, id: u64, response: RPCResponse) {
// if response id is related to a request, ignore (likely RPC timeout)
if self.requests.remove(&(peer_id.clone(), id)).is_none() {
debug!(self.log, "Unrecognized response from peer: {:?}", peer_id);
fn handle_rpc_response(&mut self, peer_id: PeerId, id: RequestId, response: RPCResponse) {
// if response id is not related to a request, ignore (likely RPC timeout)
if self
.network_context
.outstanding_outgoing_request_ids
.remove(&(peer_id.clone(), id.clone()))
.is_none()
{
warn!(
self.log,
"Unknown ResponseId for incoming RPCRequest";
"peer" => format!("{:?}", peer_id),
"request_id" => format!("{:?}", id)
);
return;
}
match response {
RPCResponse::Hello(hello_message) => {
debug!(self.log, "Hello response received from peer: {:?}", peer_id);
self.validate_hello(peer_id, hello_message);
self.sync
.on_hello_response(peer_id, hello_message, &mut self.network_context);
}
RPCResponse::BeaconBlockRoots(response) => {
self.sync.on_beacon_block_roots_response(
peer_id,
response,
&mut self.network_context,
);
}
RPCResponse::BeaconBlockHeaders(response) => {
self.sync.on_beacon_block_headers_response(
peer_id,
response,
&mut self.network_context,
);
}
RPCResponse::BeaconBlockBodies(response) => {
self.sync.on_beacon_block_bodies_response(
peer_id,
response,
&mut self.network_context,
);
}
RPCResponse::BeaconChainState(_) => {
// We do not implement this endpoint, it is not required and will only likely be
// useful for light-client support in later phases.
//
// Theoretically, we shouldn't reach this code because we should never send a
// beacon state RPC request.
warn!(self.log, "BeaconChainState RPC call is not supported.");
}
// TODO: Handle all responses
_ => {}
}
}
/// Handle a HELLO RPC request message.
fn handle_hello_request(&mut self, peer_id: PeerId, id: u64, hello_message: HelloMessage) {
// send back a HELLO message
self.send_hello(peer_id.clone(), id, false);
// validate the peer
self.validate_hello(peer_id, hello_message);
}
/// Validate a HELLO RPC message.
fn validate_hello(&mut self, peer_id: PeerId, message: HelloMessage) {
// validate the peer
if !self.sync.validate_peer(peer_id.clone(), message) {
debug!(
self.log,
"Peer dropped due to mismatching HELLO messages: {:?}", peer_id
);
//TODO: block/ban the peer
}
}
/* General RPC helper functions */
/// Generates a new request id for a peer.
fn generate_request_id(&mut self, peer_id: &PeerId) -> u64 {
// generate a unique id for the peer
let id = {
let borrowed_id = self.request_ids.entry(peer_id.clone()).or_insert_with(|| 0);
let id = borrowed_id.clone();
//increment the counter
*borrowed_id += 1;
id
};
// register RPC request
self.requests.insert((peer_id.clone(), id), Instant::now());
debug!(
self.log,
"Hello request registered with peer: {:?}", peer_id
);
id
}
/// Sends a HELLO RPC request or response to a newly connected peer.
//TODO: The boolean determines if sending request/respond, will be cleaner in the RPC re-write
fn send_hello(&mut self, peer_id: PeerId, id: u64, is_request: bool) {
let rpc_event = if is_request {
/// Handle RPC messages
fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: IncomingGossip) {
match gossip_message {
IncomingGossip::Block(message) => {
self.sync
.on_block_gossip(peer_id, message, &mut self.network_context)
}
IncomingGossip::Attestation(message) => {
self.sync
.on_attestation_gossip(peer_id, message, &mut self.network_context)
}
}
}
}
pub struct NetworkContext {
/// The network channel to relay messages to the Network service.
network_send: crossbeam_channel::Sender<NetworkMessage>,
/// A mapping of peers and the RPC id we have sent an RPC request to.
outstanding_outgoing_request_ids: HashMap<(PeerId, RequestId), Instant>,
/// Stores the next `RequestId` we should include on an outgoing `RPCRequest` to a `PeerId`.
outgoing_request_ids: HashMap<PeerId, RequestId>,
/// The `MessageHandler` logger.
log: slog::Logger,
}
impl NetworkContext {
pub fn new(network_send: crossbeam_channel::Sender<NetworkMessage>, log: slog::Logger) -> Self {
Self {
network_send,
outstanding_outgoing_request_ids: HashMap::new(),
outgoing_request_ids: HashMap::new(),
log,
}
}
pub fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
self.send_rpc_request(peer_id, RPCRequest::Goodbye(reason))
// TODO: disconnect peers.
}
pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) {
let id = self.generate_request_id(&peer_id);
self.outstanding_outgoing_request_ids
.insert((peer_id.clone(), id.clone()), Instant::now());
self.send_rpc_event(
peer_id,
RPCEvent::Request {
id,
method_id: RPCMethod::Hello.into(),
body: RPCRequest::Hello(self.sync.generate_hello()),
}
} else {
RPCEvent::Response {
id,
method_id: RPCMethod::Hello.into(),
result: RPCResponse::Hello(self.sync.generate_hello()),
}
};
// send the hello request to the network
trace!(self.log, "Sending HELLO message to peer {:?}", peer_id);
self.send_rpc(peer_id, rpc_event);
method_id: rpc_request.method_id(),
body: rpc_request,
},
);
}
/// Sends an RPC request/response to the network server.
fn send_rpc(&self, peer_id: PeerId, rpc_event: RPCEvent) {
pub fn send_rpc_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
rpc_response: RPCResponse,
) {
self.send_rpc_event(
peer_id,
RPCEvent::Response {
id: request_id,
method_id: rpc_response.method_id(),
result: rpc_response,
},
);
}
fn send_rpc_event(&self, peer_id: PeerId, rpc_event: RPCEvent) {
self.send(peer_id, OutgoingMessage::RPC(rpc_event))
}
fn send(&self, peer_id: PeerId, outgoing_message: OutgoingMessage) {
self.network_send
.send(NetworkMessage::Send(
peer_id,
OutgoingMessage::RPC(rpc_event),
))
.send(NetworkMessage::Send(peer_id, outgoing_message))
.unwrap_or_else(|_| {
warn!(
self.log,
"Could not send RPC message to the network service"
)
});
//
}
/// Returns the next `RequestId` for sending an `RPCRequest` to the `peer_id`.
fn generate_request_id(&mut self, peer_id: &PeerId) -> RequestId {
let next_id = self
.outgoing_request_ids
.entry(peer_id.clone())
.and_modify(|id| id.increment())
.or_insert_with(|| RequestId::from(1));
next_id.previous()
}
}

View File

@ -15,8 +15,8 @@ use tokio::runtime::TaskExecutor;
/// Service that handles communication between internal services and the eth2_libp2p network service.
pub struct Service {
//eth2_libp2p_service: Arc<Mutex<LibP2PService>>,
eth2_libp2p_exit: oneshot::Sender<()>,
//libp2p_service: Arc<Mutex<LibP2PService>>,
libp2p_exit: oneshot::Sender<()>,
network_send: crossbeam_channel::Sender<NetworkMessage>,
//message_handler: MessageHandler,
//message_handler_send: Sender<HandlerMessage>,
@ -40,20 +40,20 @@ impl Service {
message_handler_log,
)?;
// launch eth2_libp2p service
let eth2_libp2p_log = log.new(o!("Service" => "Libp2p"));
let eth2_libp2p_service = LibP2PService::new(config.clone(), eth2_libp2p_log)?;
// launch libp2p service
let libp2p_log = log.new(o!("Service" => "Libp2p"));
let libp2p_service = LibP2PService::new(config.clone(), libp2p_log)?;
// TODO: Spawn thread to handle eth2_libp2p messages and pass to message handler thread.
let eth2_libp2p_exit = spawn_service(
eth2_libp2p_service,
// TODO: Spawn thread to handle libp2p messages and pass to message handler thread.
let libp2p_exit = spawn_service(
libp2p_service,
network_recv,
message_handler_send,
executor,
log,
)?;
let network_service = Service {
eth2_libp2p_exit,
libp2p_exit,
network_send: network_send.clone(),
};
@ -72,7 +72,7 @@ impl Service {
}
fn spawn_service(
eth2_libp2p_service: LibP2PService,
libp2p_service: LibP2PService,
network_recv: crossbeam_channel::Receiver<NetworkMessage>,
message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
executor: &TaskExecutor,
@ -83,7 +83,7 @@ fn spawn_service(
// spawn on the current executor
executor.spawn(
network_service(
eth2_libp2p_service,
libp2p_service,
network_recv,
message_handler_send,
log.clone(),
@ -100,7 +100,7 @@ fn spawn_service(
}
fn network_service(
mut eth2_libp2p_service: LibP2PService,
mut libp2p_service: LibP2PService,
network_recv: crossbeam_channel::Receiver<NetworkMessage>,
message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
log: slog::Logger,
@ -108,28 +108,34 @@ fn network_service(
futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> {
// poll the swarm
loop {
match eth2_libp2p_service.poll() {
Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, rpc_event)))) => {
trace!(
eth2_libp2p_service.log,
"RPC Event: RPC message received: {:?}",
rpc_event
);
message_handler_send
.send(HandlerMessage::RPC(peer_id, rpc_event))
.map_err(|_| "failed to send rpc to handler")?;
}
Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => {
debug!(eth2_libp2p_service.log, "Peer Dialed: {:?}", peer_id);
message_handler_send
.send(HandlerMessage::PeerDialed(peer_id))
.map_err(|_| "failed to send rpc to handler")?;
}
Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!(
eth2_libp2p_service.log,
"Network Service: Message received: {}", m
),
_ => break,
match libp2p_service.poll() {
Ok(Async::Ready(Some(event))) => match event {
Libp2pEvent::RPC(peer_id, rpc_event) => {
trace!(log, "RPC Event: RPC message received: {:?}", rpc_event);
message_handler_send
.send(HandlerMessage::RPC(peer_id, rpc_event))
.map_err(|_| "failed to send rpc to handler")?;
}
Libp2pEvent::PeerDialed(peer_id) => {
debug!(log, "Peer Dialed: {:?}", peer_id);
message_handler_send
.send(HandlerMessage::PeerDialed(peer_id))
.map_err(|_| "failed to send rpc to handler")?;
}
Libp2pEvent::Identified(peer_id, info) => {
debug!(
log,
"We have identified peer: {:?} with {:?}", peer_id, info
);
}
Libp2pEvent::Message(m) => debug!(
libp2p_service.log,
"Network Service: Message received: {}", m
),
},
Ok(Async::Ready(None)) => unreachable!("Stream never ends"),
Ok(Async::NotReady) => break,
Err(_) => break,
}
}
// poll the network channel
@ -143,7 +149,7 @@ fn network_service(
trace!(log, "Sending RPC Event: {:?}", rpc_event);
//TODO: Make swarm private
//TODO: Implement correct peer id topic message handling
eth2_libp2p_service.swarm.send_rpc(peer_id, rpc_event);
libp2p_service.swarm.send_rpc(peer_id, rpc_event);
}
OutgoingMessage::NotifierTest => {
debug!(log, "Received message from notifier");
@ -165,7 +171,7 @@ fn network_service(
/// Types of messages that the network service can receive.
#[derive(Debug, Clone)]
pub enum NetworkMessage {
/// Send a message to eth2_libp2p service.
/// Send a message to libp2p service.
//TODO: Define typing for messages across the wire
Send(PeerId, OutgoingMessage),
}

View File

@ -0,0 +1,232 @@
use crate::beacon_chain::BeaconChain;
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::PeerId;
use slog::{debug, error};
use ssz::TreeHash;
use std::sync::Arc;
use std::time::{Duration, Instant};
use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Hash256};
/// Provides a queue for fully and partially built `BeaconBlock`s.
///
/// The queue is fundamentally a `Vec<PartialBeaconBlock>` where no two items have the same
/// `item.block_root`. This struct it backed by a `Vec` not a `HashMap` for the following two
/// reasons:
///
/// - When we receive a `BeaconBlockBody`, the only way we can find it's matching
/// `BeaconBlockHeader` is to find a header such that `header.beacon_block_body ==
/// hash_tree_root(body)`. Therefore, if we used a `HashMap` we would need to use the root of
/// `BeaconBlockBody` as the key.
/// - It is possible for multiple distinct blocks to have identical `BeaconBlockBodies`. Therefore
/// we cannot use a `HashMap` keyed by the root of `BeaconBlockBody`.
pub struct ImportQueue {
pub chain: Arc<BeaconChain>,
/// Partially imported blocks, keyed by the root of `BeaconBlockBody`.
pub partials: Vec<PartialBeaconBlock>,
/// Time before a queue entry is considered state.
pub stale_time: Duration,
/// Logging
log: slog::Logger,
}
impl ImportQueue {
/// Return a new, empty queue.
pub fn new(chain: Arc<BeaconChain>, stale_time: Duration, log: slog::Logger) -> Self {
Self {
chain,
partials: vec![],
stale_time,
log,
}
}
/// Completes all possible partials into `BeaconBlock` and returns them, sorted by increasing
/// slot number. Does not delete the partials from the queue, this must be done manually.
///
/// Returns `(queue_index, block, sender)`:
///
/// - `block_root`: may be used to remove the entry if it is successfully processed.
/// - `block`: the completed block.
/// - `sender`: the `PeerId` the provided the `BeaconBlockBody` which completed the partial.
pub fn complete_blocks(&self) -> Vec<(Hash256, BeaconBlock, PeerId)> {
let mut complete: Vec<(Hash256, BeaconBlock, PeerId)> = self
.partials
.iter()
.filter_map(|partial| partial.clone().complete())
.collect();
// Sort the completable partials to be in ascending slot order.
complete.sort_unstable_by(|a, b| a.1.slot.partial_cmp(&b.1.slot).unwrap());
complete
}
/// Removes the first `PartialBeaconBlock` with a matching `block_root`, returning the partial
/// if it exists.
pub fn remove(&mut self, block_root: Hash256) -> Option<PartialBeaconBlock> {
let position = self
.partials
.iter()
.position(|p| p.block_root == block_root)?;
Some(self.partials.remove(position))
}
/// Flushes all stale entries from the queue.
///
/// An entry is stale if it has as a `inserted` time that is more than `self.stale_time` in the
/// past.
pub fn remove_stale(&mut self) {
let stale_indices: Vec<usize> = self
.partials
.iter()
.enumerate()
.filter_map(|(i, partial)| {
if partial.inserted + self.stale_time <= Instant::now() {
Some(i)
} else {
None
}
})
.collect();
if !stale_indices.is_empty() {
debug!(
self.log,
"ImportQueue removing stale entries";
"stale_items" => stale_indices.len(),
"stale_time_seconds" => self.stale_time.as_secs()
);
}
stale_indices.iter().for_each(|&i| {
self.partials.remove(i);
});
}
/// Returns `true` if `self.chain` has not yet processed this block.
pub fn is_new_block(&self, block_root: &Hash256) -> bool {
self.chain
.is_new_block_root(&block_root)
.unwrap_or_else(|_| {
error!(self.log, "Unable to determine if block is new.");
true
})
}
/// Returns the index of the first new root in the list of block roots.
pub fn first_new_root(&mut self, roots: &[BlockRootSlot]) -> Option<usize> {
roots
.iter()
.position(|brs| self.is_new_block(&brs.block_root))
}
/// Adds the `headers` to the `partials` queue. Returns a list of `Hash256` block roots for
/// which we should use to request `BeaconBlockBodies`.
///
/// If a `header` is not in the queue and has not been processed by the chain it is added to
/// the queue and it's block root is included in the output.
///
/// If a `header` is already in the queue, but not yet processed by the chain the block root is
/// included in the output and the `inserted` time for the partial record is set to
/// `Instant::now()`. Updating the `inserted` time stops the partial from becoming stale.
///
/// Presently the queue enforces that a `BeaconBlockHeader` _must_ be received before its
/// `BeaconBlockBody`. This is not a natural requirement and we could enhance the queue to lift
/// this restraint.
pub fn enqueue_headers(
&mut self,
headers: Vec<BeaconBlockHeader>,
sender: PeerId,
) -> Vec<Hash256> {
let mut required_bodies: Vec<Hash256> = vec![];
for header in headers {
let block_root = Hash256::from_slice(&header.hash_tree_root()[..]);
if self.is_new_block(&block_root) {
self.insert_header(block_root, header, sender.clone());
required_bodies.push(block_root)
}
}
required_bodies
}
/// If there is a matching `header` for this `body`, adds it to the queue.
///
/// If there is no `header` for the `body`, the body is simply discarded.
pub fn enqueue_bodies(&mut self, bodies: Vec<BeaconBlockBody>, sender: PeerId) {
for body in bodies {
self.insert_body(body, sender.clone());
}
}
/// Inserts a header to the queue.
///
/// If the header already exists, the `inserted` time is set to `now` and not other
/// modifications are made.
fn insert_header(&mut self, block_root: Hash256, header: BeaconBlockHeader, sender: PeerId) {
if let Some(i) = self
.partials
.iter()
.position(|p| p.block_root == block_root)
{
self.partials[i].inserted = Instant::now();
} else {
self.partials.push(PartialBeaconBlock {
block_root,
header,
body: None,
inserted: Instant::now(),
sender,
})
}
}
/// Updates an existing partial with the `body`.
///
/// If there is no header for the `body`, the body is simply discarded.
///
/// If the body already existed, the `inserted` time is set to `now`.
fn insert_body(&mut self, body: BeaconBlockBody, sender: PeerId) {
let body_root = Hash256::from_slice(&body.hash_tree_root()[..]);
self.partials.iter_mut().for_each(|mut p| {
if body_root == p.header.block_body_root {
p.inserted = Instant::now();
if p.body.is_none() {
p.body = Some(body.clone());
p.sender = sender.clone();
}
}
});
}
}
/// Individual components of a `BeaconBlock`, potentially all that are required to form a full
/// `BeaconBlock`.
#[derive(Clone, Debug)]
pub struct PartialBeaconBlock {
/// `BeaconBlock` root.
pub block_root: Hash256,
pub header: BeaconBlockHeader,
pub body: Option<BeaconBlockBody>,
/// The instant at which this record was created or last meaningfully modified. Used to
/// determine if an entry is stale and should be removed.
pub inserted: Instant,
/// The `PeerId` that last meaningfully contributed to this item.
pub sender: PeerId,
}
impl PartialBeaconBlock {
/// Consumes `self` and returns a full built `BeaconBlock`, it's root and the `sender`
/// `PeerId`, if enough information exists to complete the block. Otherwise, returns `None`.
pub fn complete(self) -> Option<(Hash256, BeaconBlock, PeerId)> {
Some((
self.block_root,
self.header.into_block(self.body?),
self.sender,
))
}
}

View File

@ -1,3 +1,4 @@
mod import_queue;
/// Syncing for lighthouse.
///
/// Stores the various syncing methods for the beacon chain.

View File

@ -1,112 +1,686 @@
use super::import_queue::ImportQueue;
use crate::beacon_chain::BeaconChain;
use eth2_libp2p::rpc::HelloMessage;
use crate::message_handler::NetworkContext;
use eth2_libp2p::behaviour::{AttestationGossip, BlockGossip};
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId};
use eth2_libp2p::PeerId;
use slog::{debug, o};
use slog::{debug, error, info, o, warn};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use types::{Epoch, Hash256, Slot};
/// The number of slots that we can import blocks ahead of us, before going into full Sync mode.
const SLOT_IMPORT_TOLERANCE: u64 = 100;
/// The amount of seconds a block (or partial block) may exist in the import queue.
const QUEUE_STALE_SECS: u64 = 60;
/// Keeps track of syncing information for known connected peers.
#[derive(Clone, Copy, Debug)]
pub struct PeerSyncInfo {
network_id: u8,
latest_finalized_root: Hash256,
latest_finalized_epoch: Epoch,
best_root: Hash256,
best_slot: Slot,
}
impl PeerSyncInfo {
/// Returns `true` if the has a different network ID to `other`.
fn has_different_network_id_to(&self, other: Self) -> bool {
self.network_id != other.network_id
}
/// Returns `true` if the peer has a higher finalized epoch than `other`.
fn has_higher_finalized_epoch_than(&self, other: Self) -> bool {
self.latest_finalized_epoch > other.latest_finalized_epoch
}
/// Returns `true` if the peer has a higher best slot than `other`.
fn has_higher_best_slot_than(&self, other: Self) -> bool {
self.best_slot > other.best_slot
}
}
/// The status of a peers view on the chain, relative to some other view of the chain (presumably
/// our view).
#[derive(PartialEq, Clone, Copy, Debug)]
pub enum PeerStatus {
/// The peer is on a completely different chain.
DifferentNetworkId,
/// The peer lists a finalized epoch for which we have a different root.
FinalizedEpochNotInChain,
/// The peer has a higher finalized epoch.
HigherFinalizedEpoch,
/// The peer has a higher best slot.
HigherBestSlot,
/// The peer has the same or lesser view of the chain. We have nothing to request of them.
NotInteresting,
}
impl PeerStatus {
pub fn should_handshake(&self) -> bool {
match self {
PeerStatus::DifferentNetworkId => false,
PeerStatus::FinalizedEpochNotInChain => false,
PeerStatus::HigherFinalizedEpoch => true,
PeerStatus::HigherBestSlot => true,
PeerStatus::NotInteresting => true,
}
}
}
impl From<HelloMessage> for PeerSyncInfo {
fn from(hello: HelloMessage) -> PeerSyncInfo {
PeerSyncInfo {
network_id: hello.network_id,
latest_finalized_root: hello.latest_finalized_root,
latest_finalized_epoch: hello.latest_finalized_epoch,
best_root: hello.best_root,
best_slot: hello.best_slot,
}
}
}
impl From<&Arc<BeaconChain>> for PeerSyncInfo {
fn from(chain: &Arc<BeaconChain>) -> PeerSyncInfo {
Self::from(chain.hello_message())
}
}
/// The current syncing state.
#[derive(PartialEq)]
pub enum SyncState {
Idle,
Downloading,
Stopped,
_Stopped,
}
/// Simple Syncing protocol.
//TODO: Decide for HELLO messages whether its better to keep current in RAM or build on the fly
//when asked.
pub struct SimpleSync {
/// A reference to the underlying beacon chain.
chain: Arc<BeaconChain>,
/// A mapping of Peers to their respective PeerSyncInfo.
known_peers: HashMap<PeerId, PeerSyncInfo>,
/// A queue to allow importing of blocks
import_queue: ImportQueue,
/// The current state of the syncing protocol.
state: SyncState,
/// The network id, for quick HELLO RPC message lookup.
network_id: u8,
/// The latest epoch of the syncing chain.
latest_finalized_epoch: Epoch,
/// The latest block of the syncing chain.
latest_slot: Slot,
/// Sync logger.
log: slog::Logger,
}
impl SimpleSync {
/// Instantiate a `SimpleSync` instance, with no peers and an empty queue.
pub fn new(beacon_chain: Arc<BeaconChain>, log: &slog::Logger) -> Self {
let state = beacon_chain.get_state();
let sync_logger = log.new(o!("Service"=> "Sync"));
let queue_item_stale_time = Duration::from_secs(QUEUE_STALE_SECS);
let import_queue =
ImportQueue::new(beacon_chain.clone(), queue_item_stale_time, log.clone());
SimpleSync {
chain: beacon_chain.clone(),
known_peers: HashMap::new(),
import_queue,
state: SyncState::Idle,
network_id: beacon_chain.get_spec().network_id,
latest_finalized_epoch: state.finalized_epoch,
latest_slot: state.slot - 1, //TODO: Build latest block function into Beacon chain and correct this
log: sync_logger,
}
}
/// Generates our current state in the form of a HELLO RPC message.
pub fn generate_hello(&self) -> HelloMessage {
let state = &self.chain.get_state();
//TODO: Paul to verify the logic of these fields.
HelloMessage {
network_id: self.network_id,
latest_finalized_root: state.finalized_root,
latest_finalized_epoch: state.finalized_epoch,
best_root: Hash256::zero(), //TODO: build correct value as a beacon chain function
best_slot: state.slot - 1,
/// Handle a `Goodbye` message from a peer.
///
/// Removes the peer from `known_peers`.
pub fn on_goodbye(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
info!(
self.log, "PeerGoodbye";
"peer" => format!("{:?}", peer_id),
"reason" => format!("{:?}", reason),
);
self.known_peers.remove(&peer_id);
}
/// Handle the connection of a new peer.
///
/// Sends a `Hello` message to the peer.
pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) {
info!(self.log, "PeerConnect"; "peer" => format!("{:?}", peer_id));
network.send_rpc_request(peer_id, RPCRequest::Hello(self.chain.hello_message()));
}
/// Handle a `Hello` request.
///
/// Processes the `HelloMessage` from the remote peer and sends back our `Hello`.
pub fn on_hello_request(
&mut self,
peer_id: PeerId,
request_id: RequestId,
hello: HelloMessage,
network: &mut NetworkContext,
) {
debug!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id));
// Say hello back.
network.send_rpc_response(
peer_id.clone(),
request_id,
RPCResponse::Hello(self.chain.hello_message()),
);
self.process_hello(peer_id, hello, network);
}
/// Process a `Hello` response from a peer.
pub fn on_hello_response(
&mut self,
peer_id: PeerId,
hello: HelloMessage,
network: &mut NetworkContext,
) {
debug!(self.log, "HelloResponse"; "peer" => format!("{:?}", peer_id));
// Process the hello message, without sending back another hello.
self.process_hello(peer_id, hello, network);
}
/// Returns a `PeerStatus` for some peer.
fn peer_status(&self, peer: PeerSyncInfo) -> PeerStatus {
let local = PeerSyncInfo::from(&self.chain);
if peer.has_different_network_id_to(local) {
return PeerStatus::DifferentNetworkId;
}
if local.has_higher_finalized_epoch_than(peer) {
let peer_finalized_slot = peer
.latest_finalized_epoch
.start_slot(self.chain.get_spec().slots_per_epoch);
let local_roots = self.chain.get_block_roots(peer_finalized_slot, 1, 0);
if let Ok(local_roots) = local_roots {
if let Some(local_root) = local_roots.get(0) {
if *local_root != peer.latest_finalized_root {
return PeerStatus::FinalizedEpochNotInChain;
}
} else {
error!(
self.log,
"Cannot get root for peer finalized slot.";
"error" => "empty roots"
);
}
} else {
error!(
self.log,
"Cannot get root for peer finalized slot.";
"error" => format!("{:?}", local_roots)
);
}
}
if peer.has_higher_finalized_epoch_than(local) {
PeerStatus::HigherFinalizedEpoch
} else if peer.has_higher_best_slot_than(local) {
PeerStatus::HigherBestSlot
} else {
PeerStatus::NotInteresting
}
}
pub fn validate_peer(&mut self, peer_id: PeerId, hello_message: HelloMessage) -> bool {
// network id must match
if hello_message.network_id != self.network_id {
return false;
}
// compare latest epoch and finalized root to see if they exist in our chain
if hello_message.latest_finalized_epoch <= self.latest_finalized_epoch {
// ensure their finalized root is in our chain
// TODO: Get the finalized root at hello_message.latest_epoch and ensure they match
//if (hello_message.latest_finalized_root == self.chain.get_state() {
// return false;
// }
/// Process a `Hello` message, requesting new blocks if appropriate.
///
/// Disconnects the peer if required.
fn process_hello(
&mut self,
peer_id: PeerId,
hello: HelloMessage,
network: &mut NetworkContext,
) {
let spec = self.chain.get_spec();
let remote = PeerSyncInfo::from(hello);
let local = PeerSyncInfo::from(&self.chain);
let remote_status = self.peer_status(remote);
if remote_status.should_handshake() {
info!(self.log, "HandshakeSuccess"; "peer" => format!("{:?}", peer_id));
self.known_peers.insert(peer_id.clone(), remote);
} else {
info!(
self.log, "HandshakeFailure";
"peer" => format!("{:?}", peer_id),
"reason" => "network_id"
);
network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork);
}
// the client is valid, add it to our list of known_peers and request sync if required
// update peer list if peer already exists
let peer_info = PeerSyncInfo {
latest_finalized_root: hello_message.latest_finalized_root,
latest_finalized_epoch: hello_message.latest_finalized_epoch,
best_root: hello_message.best_root,
best_slot: hello_message.best_slot,
// If required, send additional requests.
match remote_status {
PeerStatus::HigherFinalizedEpoch => {
let start_slot = remote
.latest_finalized_epoch
.start_slot(spec.slots_per_epoch);
let required_slots = start_slot - local.best_slot;
self.request_block_roots(
peer_id,
BeaconBlockRootsRequest {
start_slot,
count: required_slots.into(),
},
network,
);
}
PeerStatus::HigherBestSlot => {
let required_slots = remote.best_slot - local.best_slot;
self.request_block_roots(
peer_id,
BeaconBlockRootsRequest {
start_slot: local.best_slot + 1,
count: required_slots.into(),
},
network,
);
}
PeerStatus::FinalizedEpochNotInChain => {}
PeerStatus::DifferentNetworkId => {}
PeerStatus::NotInteresting => {}
}
}
/// Handle a `BeaconBlockRoots` request from the peer.
pub fn on_beacon_block_roots_request(
&mut self,
peer_id: PeerId,
request_id: RequestId,
req: BeaconBlockRootsRequest,
network: &mut NetworkContext,
) {
debug!(
self.log,
"BlockRootsRequest";
"peer" => format!("{:?}", peer_id),
"count" => req.count,
);
let roots = match self
.chain
.get_block_roots(req.start_slot, req.count as usize, 0)
{
Ok(roots) => roots,
Err(e) => {
// TODO: return RPC error.
warn!(
self.log,
"RPCRequest"; "peer" => format!("{:?}", peer_id),
"req" => "BeaconBlockRoots",
"error" => format!("{:?}", e)
);
return;
}
};
debug!(self.log, "Handshake successful. Peer: {:?}", peer_id);
self.known_peers.insert(peer_id, peer_info);
let roots = roots
.iter()
.enumerate()
.map(|(i, &block_root)| BlockRootSlot {
slot: req.start_slot + Slot::from(i),
block_root,
})
.collect();
// set state to sync
if self.state == SyncState::Idle
&& hello_message.best_slot > self.latest_slot + SLOT_IMPORT_TOLERANCE
{
self.state = SyncState::Downloading;
//TODO: Start requesting blocks from known peers. Ideally in batches
network.send_rpc_response(
peer_id,
request_id,
RPCResponse::BeaconBlockRoots(BeaconBlockRootsResponse { roots }),
)
}
/// Handle a `BeaconBlockRoots` response from the peer.
pub fn on_beacon_block_roots_response(
&mut self,
peer_id: PeerId,
res: BeaconBlockRootsResponse,
network: &mut NetworkContext,
) {
debug!(
self.log,
"BlockRootsResponse";
"peer" => format!("{:?}", peer_id),
"count" => res.roots.len(),
);
if res.roots.is_empty() {
warn!(
self.log,
"Peer returned empty block roots response. PeerId: {:?}", peer_id
);
return;
}
true
let new_root_index = self.import_queue.first_new_root(&res.roots);
// If a new block root is found, request it and all the headers following it.
//
// We make an assumption here that if we don't know a block then we don't know of all
// it's parents. This might not be the case if syncing becomes more sophisticated.
if let Some(i) = new_root_index {
let new = &res.roots[i];
self.request_block_headers(
peer_id,
BeaconBlockHeadersRequest {
start_root: new.block_root,
start_slot: new.slot,
max_headers: (res.roots.len() - i) as u64,
skip_slots: 0,
},
network,
)
}
}
/// Handle a `BeaconBlockHeaders` request from the peer.
pub fn on_beacon_block_headers_request(
&mut self,
peer_id: PeerId,
request_id: RequestId,
req: BeaconBlockHeadersRequest,
network: &mut NetworkContext,
) {
debug!(
self.log,
"BlockHeadersRequest";
"peer" => format!("{:?}", peer_id),
"count" => req.max_headers,
);
let headers = match self.chain.get_block_headers(
req.start_slot,
req.max_headers as usize,
req.skip_slots as usize,
) {
Ok(headers) => headers,
Err(e) => {
// TODO: return RPC error.
warn!(
self.log,
"RPCRequest"; "peer" => format!("{:?}", peer_id),
"req" => "BeaconBlockHeaders",
"error" => format!("{:?}", e)
);
return;
}
};
network.send_rpc_response(
peer_id,
request_id,
RPCResponse::BeaconBlockHeaders(BeaconBlockHeadersResponse { headers }),
)
}
/// Handle a `BeaconBlockHeaders` response from the peer.
pub fn on_beacon_block_headers_response(
&mut self,
peer_id: PeerId,
res: BeaconBlockHeadersResponse,
network: &mut NetworkContext,
) {
debug!(
self.log,
"BlockHeadersResponse";
"peer" => format!("{:?}", peer_id),
"count" => res.headers.len(),
);
if res.headers.is_empty() {
warn!(
self.log,
"Peer returned empty block headers response. PeerId: {:?}", peer_id
);
return;
}
// Enqueue the headers, obtaining a list of the roots of the headers which were newly added
// to the queue.
let block_roots = self
.import_queue
.enqueue_headers(res.headers, peer_id.clone());
self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network);
}
/// Handle a `BeaconBlockBodies` request from the peer.
pub fn on_beacon_block_bodies_request(
&mut self,
peer_id: PeerId,
request_id: RequestId,
req: BeaconBlockBodiesRequest,
network: &mut NetworkContext,
) {
debug!(
self.log,
"BlockBodiesRequest";
"peer" => format!("{:?}", peer_id),
"count" => req.block_roots.len(),
);
let block_bodies = match self.chain.get_block_bodies(&req.block_roots) {
Ok(bodies) => bodies,
Err(e) => {
// TODO: return RPC error.
warn!(
self.log,
"RPCRequest"; "peer" => format!("{:?}", peer_id),
"req" => "BeaconBlockBodies",
"error" => format!("{:?}", e)
);
return;
}
};
network.send_rpc_response(
peer_id,
request_id,
RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { block_bodies }),
)
}
/// Handle a `BeaconBlockBodies` response from the peer.
pub fn on_beacon_block_bodies_response(
&mut self,
peer_id: PeerId,
res: BeaconBlockBodiesResponse,
network: &mut NetworkContext,
) {
debug!(
self.log,
"BlockBodiesResponse";
"peer" => format!("{:?}", peer_id),
"count" => res.block_bodies.len(),
);
self.import_queue
.enqueue_bodies(res.block_bodies, peer_id.clone());
// Clear out old entries
self.import_queue.remove_stale();
// Import blocks, if possible.
self.process_import_queue(network);
}
/// Process a gossip message declaring a new block.
pub fn on_block_gossip(
&mut self,
peer_id: PeerId,
msg: BlockGossip,
network: &mut NetworkContext,
) {
debug!(
self.log,
"BlockGossip";
"peer" => format!("{:?}", peer_id),
);
// TODO: filter out messages that a prior to the finalized slot.
//
// TODO: if the block is a few more slots ahead, try to get all block roots from then until
// now.
//
// Note: only requests the new block -- will fail if we don't have its parents.
if self.import_queue.is_new_block(&msg.root.block_root) {
self.request_block_headers(
peer_id,
BeaconBlockHeadersRequest {
start_root: msg.root.block_root,
start_slot: msg.root.slot,
max_headers: 1,
skip_slots: 0,
},
network,
)
}
}
/// Process a gossip message declaring a new attestation.
///
/// Not currently implemented.
pub fn on_attestation_gossip(
&mut self,
peer_id: PeerId,
msg: AttestationGossip,
_network: &mut NetworkContext,
) {
debug!(
self.log,
"AttestationGossip";
"peer" => format!("{:?}", peer_id),
);
// Awaiting a proper operations pool before we can import attestations.
//
// https://github.com/sigp/lighthouse/issues/281
match self.chain.process_attestation(msg.attestation) {
Ok(_) => panic!("Impossible, method not implemented."),
Err(_) => error!(self.log, "Attestation processing not implemented!"),
}
}
/// Iterate through the `import_queue` and process any complete blocks.
///
/// If a block is successfully processed it is removed from the queue, otherwise it remains in
/// the queue.
pub fn process_import_queue(&mut self, network: &mut NetworkContext) {
let mut successful = 0;
let mut invalid = 0;
let mut errored = 0;
// Loop through all of the complete blocks in the queue.
for (block_root, block, sender) in self.import_queue.complete_blocks() {
match self.chain.process_block(block) {
Ok(outcome) => {
if outcome.is_invalid() {
invalid += 1;
warn!(
self.log,
"InvalidBlock";
"sender_peer_id" => format!("{:?}", sender),
"reason" => format!("{:?}", outcome),
);
network.disconnect(sender, GoodbyeReason::Fault);
}
// If this results to true, the item will be removed from the queue.
if outcome.sucessfully_processed() {
successful += 1;
self.import_queue.remove(block_root);
}
}
Err(e) => {
errored += 1;
error!(self.log, "BlockProcessingError"; "error" => format!("{:?}", e));
}
}
}
if successful > 0 {
info!(self.log, "Imported {} blocks", successful)
}
if invalid > 0 {
warn!(self.log, "Rejected {} invalid blocks", invalid)
}
if errored > 0 {
warn!(self.log, "Failed to process {} blocks", errored)
}
}
/// Request some `BeaconBlockRoots` from the remote peer.
fn request_block_roots(
&mut self,
peer_id: PeerId,
req: BeaconBlockRootsRequest,
network: &mut NetworkContext,
) {
// Potentially set state to sync.
if self.state == SyncState::Idle && req.count > SLOT_IMPORT_TOLERANCE {
debug!(self.log, "Entering downloading sync state.");
self.state = SyncState::Downloading;
}
debug!(
self.log,
"RPCRequest(BeaconBlockRoots)";
"count" => req.count,
"peer" => format!("{:?}", peer_id)
);
// TODO: handle count > max count.
network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockRoots(req));
}
/// Request some `BeaconBlockHeaders` from the remote peer.
fn request_block_headers(
&mut self,
peer_id: PeerId,
req: BeaconBlockHeadersRequest,
network: &mut NetworkContext,
) {
debug!(
self.log,
"RPCRequest(BeaconBlockHeaders)";
"max_headers" => req.max_headers,
"peer" => format!("{:?}", peer_id)
);
network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockHeaders(req));
}
/// Request some `BeaconBlockBodies` from the remote peer.
fn request_block_bodies(
&mut self,
peer_id: PeerId,
req: BeaconBlockBodiesRequest,
network: &mut NetworkContext,
) {
debug!(
self.log,
"RPCRequest(BeaconBlockBodies)";
"count" => req.block_roots.len(),
"peer" => format!("{:?}", peer_id)
);
network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(req));
}
/// Generates our current state in the form of a HELLO RPC message.
pub fn generate_hello(&self) -> HelloMessage {
self.chain.hello_message()
}
}

View File

@ -0,0 +1,570 @@
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::{RPCMethod, RPCRequest, RPCResponse, RequestId};
use eth2_libp2p::{PeerId, RPCEvent};
use network::beacon_chain::BeaconChain as NetworkBeaconChain;
use network::message_handler::{HandlerMessage, MessageHandler};
use network::service::{NetworkMessage, OutgoingMessage};
use sloggers::terminal::{Destination, TerminalLoggerBuilder};
use sloggers::types::Severity;
use sloggers::Build;
use std::time::Duration;
use test_harness::BeaconChainHarness;
use tokio::runtime::TaskExecutor;
use types::{test_utils::TestingBeaconStateBuilder, *};
pub struct SyncNode {
pub id: usize,
sender: Sender<HandlerMessage>,
receiver: Receiver<NetworkMessage>,
peer_id: PeerId,
harness: BeaconChainHarness,
}
impl SyncNode {
fn from_beacon_state_builder(
id: usize,
executor: &TaskExecutor,
state_builder: TestingBeaconStateBuilder,
spec: &ChainSpec,
logger: slog::Logger,
) -> Self {
let harness = BeaconChainHarness::from_beacon_state_builder(state_builder, spec.clone());
let (network_sender, network_receiver) = unbounded();
let message_handler_sender = MessageHandler::spawn(
harness.beacon_chain.clone(),
network_sender,
executor,
logger,
)
.unwrap();
Self {
id,
sender: message_handler_sender,
receiver: network_receiver,
peer_id: PeerId::random(),
harness,
}
}
fn increment_beacon_chain_slot(&mut self) {
self.harness.increment_beacon_chain_slot();
}
fn send(&self, message: HandlerMessage) {
self.sender.send(message).unwrap();
}
fn recv(&self) -> Result<NetworkMessage, RecvTimeoutError> {
self.receiver.recv_timeout(Duration::from_millis(500))
}
fn hello_message(&self) -> HelloMessage {
self.harness.beacon_chain.hello_message()
}
pub fn connect_to(&mut self, node: &SyncNode) {
let message = HandlerMessage::PeerDialed(self.peer_id.clone());
node.send(message);
}
/// Reads the receive queue from one node and passes the message to the other. Also returns a
/// copy of the message.
///
/// self -----> node
/// |
/// us
///
/// Named after the unix `tee` command.
fn tee(&mut self, node: &SyncNode) -> NetworkMessage {
let network_message = self.recv().expect("Timeout on tee");
let handler_message = match network_message.clone() {
NetworkMessage::Send(_to_peer_id, OutgoingMessage::RPC(event)) => {
HandlerMessage::RPC(self.peer_id.clone(), event)
}
_ => panic!("tee cannot parse {:?}", network_message),
};
node.send(handler_message);
network_message
}
fn tee_hello_request(&mut self, node: &SyncNode) -> HelloMessage {
let request = self.tee_rpc_request(node);
match request {
RPCRequest::Hello(message) => message,
_ => panic!("tee_hello_request got: {:?}", request),
}
}
fn tee_hello_response(&mut self, node: &SyncNode) -> HelloMessage {
let response = self.tee_rpc_response(node);
match response {
RPCResponse::Hello(message) => message,
_ => panic!("tee_hello_response got: {:?}", response),
}
}
fn tee_block_root_request(&mut self, node: &SyncNode) -> BeaconBlockRootsRequest {
let msg = self.tee_rpc_request(node);
match msg {
RPCRequest::BeaconBlockRoots(data) => data,
_ => panic!("tee_block_root_request got: {:?}", msg),
}
}
fn tee_block_root_response(&mut self, node: &SyncNode) -> BeaconBlockRootsResponse {
let msg = self.tee_rpc_response(node);
match msg {
RPCResponse::BeaconBlockRoots(data) => data,
_ => panic!("tee_block_root_response got: {:?}", msg),
}
}
fn tee_block_header_request(&mut self, node: &SyncNode) -> BeaconBlockHeadersRequest {
let msg = self.tee_rpc_request(node);
match msg {
RPCRequest::BeaconBlockHeaders(data) => data,
_ => panic!("tee_block_header_request got: {:?}", msg),
}
}
fn tee_block_header_response(&mut self, node: &SyncNode) -> BeaconBlockHeadersResponse {
let msg = self.tee_rpc_response(node);
match msg {
RPCResponse::BeaconBlockHeaders(data) => data,
_ => panic!("tee_block_header_response got: {:?}", msg),
}
}
fn tee_block_body_request(&mut self, node: &SyncNode) -> BeaconBlockBodiesRequest {
let msg = self.tee_rpc_request(node);
match msg {
RPCRequest::BeaconBlockBodies(data) => data,
_ => panic!("tee_block_body_request got: {:?}", msg),
}
}
fn tee_block_body_response(&mut self, node: &SyncNode) -> BeaconBlockBodiesResponse {
let msg = self.tee_rpc_response(node);
match msg {
RPCResponse::BeaconBlockBodies(data) => data,
_ => panic!("tee_block_body_response got: {:?}", msg),
}
}
fn tee_rpc_request(&mut self, node: &SyncNode) -> RPCRequest {
let network_message = self.tee(node);
match network_message {
NetworkMessage::Send(
_peer_id,
OutgoingMessage::RPC(RPCEvent::Request {
id: _,
method_id: _,
body,
}),
) => body,
_ => panic!("tee_rpc_request failed! got {:?}", network_message),
}
}
fn tee_rpc_response(&mut self, node: &SyncNode) -> RPCResponse {
let network_message = self.tee(node);
match network_message {
NetworkMessage::Send(
_peer_id,
OutgoingMessage::RPC(RPCEvent::Response {
id: _,
method_id: _,
result,
}),
) => result,
_ => panic!("tee_rpc_response failed! got {:?}", network_message),
}
}
pub fn get_block_root_request(&self) -> BeaconBlockRootsRequest {
let request = self.recv_rpc_request().expect("No block root request");
match request {
RPCRequest::BeaconBlockRoots(request) => request,
_ => panic!("Did not get block root request"),
}
}
pub fn get_block_headers_request(&self) -> BeaconBlockHeadersRequest {
let request = self.recv_rpc_request().expect("No block headers request");
match request {
RPCRequest::BeaconBlockHeaders(request) => request,
_ => panic!("Did not get block headers request"),
}
}
pub fn get_block_bodies_request(&self) -> BeaconBlockBodiesRequest {
let request = self.recv_rpc_request().expect("No block bodies request");
match request {
RPCRequest::BeaconBlockBodies(request) => request,
_ => panic!("Did not get block bodies request"),
}
}
fn _recv_rpc_response(&self) -> Result<RPCResponse, RecvTimeoutError> {
let network_message = self.recv()?;
Ok(match network_message {
NetworkMessage::Send(
_peer_id,
OutgoingMessage::RPC(RPCEvent::Response {
id: _,
method_id: _,
result,
}),
) => result,
_ => panic!("get_rpc_response failed! got {:?}", network_message),
})
}
fn recv_rpc_request(&self) -> Result<RPCRequest, RecvTimeoutError> {
let network_message = self.recv()?;
Ok(match network_message {
NetworkMessage::Send(
_peer_id,
OutgoingMessage::RPC(RPCEvent::Request {
id: _,
method_id: _,
body,
}),
) => body,
_ => panic!("get_rpc_request failed! got {:?}", network_message),
})
}
}
fn get_logger() -> slog::Logger {
let mut builder = TerminalLoggerBuilder::new();
builder.level(Severity::Debug);
builder.destination(Destination::Stderr);
builder.build().unwrap()
}
pub struct SyncMaster {
harness: BeaconChainHarness,
peer_id: PeerId,
response_ids: Vec<RequestId>,
}
impl SyncMaster {
fn from_beacon_state_builder(
state_builder: TestingBeaconStateBuilder,
node_count: usize,
spec: &ChainSpec,
) -> Self {
let harness = BeaconChainHarness::from_beacon_state_builder(state_builder, spec.clone());
let peer_id = PeerId::random();
let response_ids = vec![RequestId::from(0); node_count];
Self {
harness,
peer_id,
response_ids,
}
}
pub fn response_id(&mut self, node: &SyncNode) -> RequestId {
let id = self.response_ids[node.id].clone();
self.response_ids[node.id].increment();
id
}
pub fn do_hello_with(&mut self, node: &SyncNode) {
let message = HandlerMessage::PeerDialed(self.peer_id.clone());
node.send(message);
let request = node.recv_rpc_request().expect("No hello response");
match request {
RPCRequest::Hello(_hello) => {
let hello = self.harness.beacon_chain.hello_message();
let response = self.rpc_response(node, RPCResponse::Hello(hello));
node.send(response);
}
_ => panic!("Got message other than hello from node."),
}
}
pub fn respond_to_block_roots_request(
&mut self,
node: &SyncNode,
request: BeaconBlockRootsRequest,
) {
let roots = self
.harness
.beacon_chain
.get_block_roots(request.start_slot, request.count as usize, 0)
.expect("Beacon chain did not give block roots")
.iter()
.enumerate()
.map(|(i, root)| BlockRootSlot {
block_root: *root,
slot: Slot::from(i) + request.start_slot,
})
.collect();
let response = RPCResponse::BeaconBlockRoots(BeaconBlockRootsResponse { roots });
self.send_rpc_response(node, response)
}
pub fn respond_to_block_headers_request(
&mut self,
node: &SyncNode,
request: BeaconBlockHeadersRequest,
) {
let roots = self
.harness
.beacon_chain
.get_block_roots(
request.start_slot,
request.max_headers as usize,
request.skip_slots as usize,
)
.expect("Beacon chain did not give blocks");
if roots.is_empty() {
panic!("Roots was empty when trying to get headers.")
}
assert_eq!(
roots[0], request.start_root,
"Got the wrong start root when getting headers"
);
let headers: Vec<BeaconBlockHeader> = roots
.iter()
.map(|root| {
let block = self
.harness
.beacon_chain
.get_block(root)
.expect("Failed to load block")
.expect("Block did not exist");
block.block_header()
})
.collect();
let response = RPCResponse::BeaconBlockHeaders(BeaconBlockHeadersResponse { headers });
self.send_rpc_response(node, response)
}
pub fn respond_to_block_bodies_request(
&mut self,
node: &SyncNode,
request: BeaconBlockBodiesRequest,
) {
let block_bodies: Vec<BeaconBlockBody> = request
.block_roots
.iter()
.map(|root| {
let block = self
.harness
.beacon_chain
.get_block(root)
.expect("Failed to load block")
.expect("Block did not exist");
block.body
})
.collect();
let response = RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { block_bodies });
self.send_rpc_response(node, response)
}
fn send_rpc_response(&mut self, node: &SyncNode, rpc_response: RPCResponse) {
node.send(self.rpc_response(node, rpc_response));
}
fn rpc_response(&mut self, node: &SyncNode, rpc_response: RPCResponse) -> HandlerMessage {
HandlerMessage::RPC(
self.peer_id.clone(),
RPCEvent::Response {
id: self.response_id(node),
method_id: RPCMethod::Hello.into(),
result: rpc_response,
},
)
}
}
fn test_setup(
state_builder: TestingBeaconStateBuilder,
node_count: usize,
spec: &ChainSpec,
logger: slog::Logger,
) -> (tokio::runtime::Runtime, SyncMaster, Vec<SyncNode>) {
let runtime = tokio::runtime::Runtime::new().unwrap();
let mut nodes = Vec::with_capacity(node_count);
for id in 0..node_count {
let node = SyncNode::from_beacon_state_builder(
id,
&runtime.executor(),
state_builder.clone(),
&spec,
logger.clone(),
);
nodes.push(node);
}
let master = SyncMaster::from_beacon_state_builder(state_builder, node_count, &spec);
(runtime, master, nodes)
}
pub fn build_blocks(blocks: usize, master: &mut SyncMaster, nodes: &mut Vec<SyncNode>) {
for _ in 0..blocks {
master.harness.advance_chain_with_block();
for i in 0..nodes.len() {
nodes[i].increment_beacon_chain_slot();
}
}
master.harness.run_fork_choice();
for i in 0..nodes.len() {
nodes[i].harness.run_fork_choice();
}
}
#[test]
#[ignore]
fn sync_node_with_master() {
let logger = get_logger();
let spec = ChainSpec::few_validators();
let validator_count = 8;
let node_count = 1;
let state_builder =
TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(validator_count, &spec);
let (runtime, mut master, mut nodes) =
test_setup(state_builder, node_count, &spec, logger.clone());
let original_node_slot = nodes[0].hello_message().best_slot;
build_blocks(2, &mut master, &mut nodes);
master.do_hello_with(&nodes[0]);
let roots_request = nodes[0].get_block_root_request();
assert_eq!(roots_request.start_slot, original_node_slot + 1);
assert_eq!(roots_request.count, 2);
master.respond_to_block_roots_request(&nodes[0], roots_request);
let headers_request = nodes[0].get_block_headers_request();
assert_eq!(headers_request.start_slot, original_node_slot + 1);
assert_eq!(headers_request.max_headers, 2);
assert_eq!(headers_request.skip_slots, 0);
master.respond_to_block_headers_request(&nodes[0], headers_request);
let bodies_request = nodes[0].get_block_bodies_request();
assert_eq!(bodies_request.block_roots.len(), 2);
master.respond_to_block_bodies_request(&nodes[0], bodies_request);
std::thread::sleep(Duration::from_millis(10000));
runtime.shutdown_now();
}
#[test]
#[ignore]
fn sync_two_nodes() {
let logger = get_logger();
let spec = ChainSpec::few_validators();
let validator_count = 8;
let node_count = 2;
let state_builder =
TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(validator_count, &spec);
let (runtime, _master, mut nodes) =
test_setup(state_builder, node_count, &spec, logger.clone());
// let original_node_slot = nodes[0].hello_message().best_slot;
let mut node_a = nodes.remove(0);
let mut node_b = nodes.remove(0);
let blocks = 2;
// Node A builds out a longer, better chain.
for _ in 0..blocks {
// Node A should build a block.
node_a.harness.advance_chain_with_block();
// Node B should just increment it's slot without a block.
node_b.harness.increment_beacon_chain_slot();
}
node_a.harness.run_fork_choice();
// A connects to B.
node_a.connect_to(&node_b);
// B says hello to A.
node_b.tee_hello_request(&node_a);
// A says hello back.
node_a.tee_hello_response(&node_b);
// B requests block roots from A.
node_b.tee_block_root_request(&node_a);
// A provides block roots to A.
node_a.tee_block_root_response(&node_b);
// B requests block headers from A.
node_b.tee_block_header_request(&node_a);
// A provides block headers to B.
node_a.tee_block_header_response(&node_b);
// B requests block bodies from A.
node_b.tee_block_body_request(&node_a);
// A provides block bodies to B.
node_a.tee_block_body_response(&node_b);
std::thread::sleep(Duration::from_secs(10));
node_b.harness.run_fork_choice();
let node_a_chain = node_a
.harness
.beacon_chain
.chain_dump()
.expect("Can't dump node a chain");
let node_b_chain = node_b
.harness
.beacon_chain
.chain_dump()
.expect("Can't dump node b chain");
assert_eq!(
node_a_chain.len(),
node_b_chain.len(),
"Chains should be equal length"
);
assert_eq!(node_a_chain, node_b_chain, "Chains should be identical");
runtime.shutdown_now();
}

File diff suppressed because it is too large Load Diff

View File

@ -15,12 +15,10 @@ pub struct TestDoc {
pub title: String,
pub summary: String,
pub fork: String,
pub version: String,
pub test_cases: Vec<TestCase>,
}
#[test]
#[ignore]
fn yaml() {
use serde_yaml;
use std::{fs::File, io::prelude::*, path::PathBuf};

View File

@ -10,6 +10,7 @@ boolean-bitfield = { path = "../utils/boolean-bitfield" }
dirs = "1.0"
ethereum-types = "0.5"
hashing = { path = "../utils/hashing" }
hex = "0.3"
honey-badger-split = { path = "../utils/honey-badger-split" }
int_to_bytes = { path = "../utils/int_to_bytes" }
log = "0.4"
@ -24,7 +25,7 @@ ssz = { path = "../utils/ssz" }
ssz_derive = { path = "../utils/ssz_derive" }
swap_or_not_shuffle = { path = "../utils/swap_or_not_shuffle" }
test_random_derive = { path = "../utils/test_random_derive" }
libp2p = { git = "https://github.com/SigP/rust-libp2p", branch = "gossipsub" }
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "b3c32d9a821ae6cc89079499cc6e8a6bab0bffc3" }
[dev-dependencies]
env_logger = "0.6.0"

View File

@ -37,6 +37,19 @@ impl BeaconBlockHeader {
pub fn canonical_root(&self) -> Hash256 {
Hash256::from_slice(&self.hash_tree_root()[..])
}
/// Given a `body`, consumes `self` and returns a complete `BeaconBlock`.
///
/// Spec v0.5.0
pub fn into_block(self, body: BeaconBlockBody) -> BeaconBlock {
BeaconBlock {
slot: self.slot,
previous_block_root: self.previous_block_root,
state_root: self.state_root,
body,
signature: self.signature,
}
}
}
#[cfg(test)]

View File

@ -2,6 +2,7 @@ use crate::*;
use bls::Signature;
use int_to_bytes::int_to_bytes4;
use serde_derive::Deserialize;
use test_utils::u8_from_hex_str;
const GWEI: u64 = 1_000_000_000;
@ -57,6 +58,7 @@ pub struct ChainSpec {
pub far_future_epoch: Epoch,
pub zero_hash: Hash256,
pub empty_signature: Signature,
#[serde(deserialize_with = "u8_from_hex_str")]
pub bls_withdrawal_prefix_byte: u8,
/*

View File

@ -1,4 +1,7 @@
use crate::{test_utils::TestRandom, ChainSpec, Epoch};
use crate::{
test_utils::{fork_from_hex_str, TestRandom},
ChainSpec, Epoch,
};
use int_to_bytes::int_to_bytes4;
use rand::RngCore;
use serde_derive::{Deserialize, Serialize};
@ -12,7 +15,9 @@ use test_random_derive::TestRandom;
Debug, Clone, PartialEq, Default, Serialize, Deserialize, Encode, Decode, TreeHash, TestRandom,
)]
pub struct Fork {
#[serde(deserialize_with = "fork_from_hex_str")]
pub previous_version: [u8; 4],
#[serde(deserialize_with = "fork_from_hex_str")]
pub current_version: [u8; 4],
pub epoch: Epoch,
}

View File

@ -2,6 +2,7 @@
mod macros;
mod generate_deterministic_keypairs;
mod keypairs_file;
mod serde_utils;
mod test_random;
mod testing_attestation_builder;
mod testing_attestation_data_builder;
@ -17,6 +18,7 @@ mod testing_voluntary_exit_builder;
pub use generate_deterministic_keypairs::generate_deterministic_keypairs;
pub use keypairs_file::KeypairsFile;
pub use rand::{prng::XorShiftRng, SeedableRng};
pub use serde_utils::{fork_from_hex_str, u8_from_hex_str};
pub use test_random::TestRandom;
pub use testing_attestation_builder::TestingAttestationBuilder;
pub use testing_attestation_data_builder::TestingAttestationDataBuilder;

View File

@ -0,0 +1,28 @@
use serde::de::Error;
use serde::{Deserialize, Deserializer};
pub fn u8_from_hex_str<'de, D>(deserializer: D) -> Result<u8, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
u8::from_str_radix(&s.as_str()[2..], 16).map_err(D::Error::custom)
}
pub fn fork_from_hex_str<'de, D>(deserializer: D) -> Result<[u8; 4], D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
let mut array = [0 as u8; 4];
let decoded: Vec<u8> = hex::decode(&s.as_str()[2..]).map_err(D::Error::custom)?;
for (i, item) in array.iter_mut().enumerate() {
if i > decoded.len() {
break;
}
*item = decoded[i];
}
Ok(array)
}

View File

@ -23,6 +23,7 @@ pub fn keypairs_path() -> PathBuf {
/// Builds a beacon state to be used for testing purposes.
///
/// This struct should **never be used for production purposes.**
#[derive(Clone)]
pub struct TestingBeaconStateBuilder {
state: BeaconState,
keypairs: Vec<Keypair>,

View File

@ -0,0 +1,117 @@
use super::{fake_signature::FakeSignature, AggregatePublicKey};
use serde::de::{Deserialize, Deserializer};
use serde::ser::{Serialize, Serializer};
use serde_hex::{encode as hex_encode, PrefixedHexVisitor};
use ssz::{
decode_ssz_list, hash, ssz_encode, Decodable, DecodeError, Encodable, SszStream, TreeHash,
};
const SIGNATURE_LENGTH: usize = 48;
/// A BLS aggregate signature.
///
/// This struct is a wrapper upon a base type and provides helper functions (e.g., SSZ
/// serialization).
#[derive(Debug, PartialEq, Clone, Default, Eq)]
pub struct FakeAggregateSignature {
bytes: Vec<u8>,
}
impl FakeAggregateSignature {
/// Creates a new all-zero's signature
pub fn new() -> Self {
Self::zero()
}
/// Creates a new all-zero's signature
pub fn zero() -> Self {
Self {
bytes: vec![0; SIGNATURE_LENGTH],
}
}
/// Does glorious nothing.
pub fn add(&mut self, _signature: &FakeSignature) {
// Do nothing.
}
/// _Always_ returns `true`.
pub fn verify(
&self,
_msg: &[u8],
_domain: u64,
_aggregate_public_key: &AggregatePublicKey,
) -> bool {
true
}
/// _Always_ returns `true`.
pub fn verify_multiple(
&self,
_messages: &[&[u8]],
_domain: u64,
_aggregate_public_keys: &[&AggregatePublicKey],
) -> bool {
true
}
}
impl Encodable for FakeAggregateSignature {
fn ssz_append(&self, s: &mut SszStream) {
s.append_vec(&self.bytes);
}
}
impl Decodable for FakeAggregateSignature {
fn ssz_decode(bytes: &[u8], i: usize) -> Result<(Self, usize), DecodeError> {
let (sig_bytes, i) = decode_ssz_list(bytes, i)?;
Ok((FakeAggregateSignature { bytes: sig_bytes }, i))
}
}
impl Serialize for FakeAggregateSignature {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&hex_encode(ssz_encode(self)))
}
}
impl<'de> Deserialize<'de> for FakeAggregateSignature {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let bytes = deserializer.deserialize_str(PrefixedHexVisitor)?;
let (obj, _) = <_>::ssz_decode(&bytes[..], 0)
.map_err(|e| serde::de::Error::custom(format!("invalid ssz ({:?})", e)))?;
Ok(obj)
}
}
impl TreeHash for FakeAggregateSignature {
fn hash_tree_root(&self) -> Vec<u8> {
hash(&self.bytes)
}
}
#[cfg(test)]
mod tests {
use super::super::{Keypair, Signature};
use super::*;
use ssz::ssz_encode;
#[test]
pub fn test_ssz_round_trip() {
let keypair = Keypair::random();
let mut original = FakeAggregateSignature::new();
original.add(&Signature::new(&[42, 42], 0, &keypair.sk));
let bytes = ssz_encode(&original);
let (decoded, _) = FakeAggregateSignature::ssz_decode(&bytes, 0).unwrap();
assert_eq!(original, decoded);
}
}

View File

@ -0,0 +1,117 @@
use super::serde_vistors::HexVisitor;
use super::{PublicKey, SecretKey};
use hex::encode as hex_encode;
use serde::de::{Deserialize, Deserializer};
use serde::ser::{Serialize, Serializer};
use ssz::{
decode_ssz_list, hash, ssz_encode, Decodable, DecodeError, Encodable, SszStream, TreeHash,
};
const SIGNATURE_LENGTH: usize = 48;
/// A single BLS signature.
///
/// This struct is a wrapper upon a base type and provides helper functions (e.g., SSZ
/// serialization).
#[derive(Debug, PartialEq, Clone, Eq)]
pub struct FakeSignature {
bytes: Vec<u8>,
}
impl FakeSignature {
/// Creates a new all-zero's signature
pub fn new(_msg: &[u8], _domain: u64, _sk: &SecretKey) -> Self {
FakeSignature::zero()
}
/// Creates a new all-zero's signature
pub fn zero() -> Self {
Self {
bytes: vec![0; SIGNATURE_LENGTH],
}
}
/// Creates a new all-zero's signature
pub fn new_hashed(_x_real_hashed: &[u8], _x_imaginary_hashed: &[u8], _sk: &SecretKey) -> Self {
FakeSignature::zero()
}
/// _Always_ returns `true`.
pub fn verify(&self, _msg: &[u8], _domain: u64, _pk: &PublicKey) -> bool {
true
}
/// _Always_ returns true.
pub fn verify_hashed(
&self,
_x_real_hashed: &[u8],
_x_imaginary_hashed: &[u8],
_pk: &PublicKey,
) -> bool {
true
}
/// Returns a new empty signature.
pub fn empty_signature() -> Self {
FakeSignature::zero()
}
}
impl Encodable for FakeSignature {
fn ssz_append(&self, s: &mut SszStream) {
s.append_vec(&self.bytes);
}
}
impl Decodable for FakeSignature {
fn ssz_decode(bytes: &[u8], i: usize) -> Result<(Self, usize), DecodeError> {
let (sig_bytes, i) = decode_ssz_list(bytes, i)?;
Ok((FakeSignature { bytes: sig_bytes }, i))
}
}
impl TreeHash for FakeSignature {
fn hash_tree_root(&self) -> Vec<u8> {
hash(&self.bytes)
}
}
impl Serialize for FakeSignature {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&hex_encode(ssz_encode(self)))
}
}
impl<'de> Deserialize<'de> for FakeSignature {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let bytes = deserializer.deserialize_str(HexVisitor)?;
let (pubkey, _) = <_>::ssz_decode(&bytes[..], 0)
.map_err(|e| serde::de::Error::custom(format!("invalid ssz ({:?})", e)))?;
Ok(pubkey)
}
}
#[cfg(test)]
mod tests {
use super::super::Keypair;
use super::*;
use ssz::ssz_encode;
#[test]
pub fn test_ssz_round_trip() {
let keypair = Keypair::random();
let original = FakeSignature::new(&[42, 42], 0, &keypair.sk);
let bytes = ssz_encode(&original);
let (decoded, _) = FakeSignature::ssz_decode(&bytes, 0).unwrap();
assert_eq!(original, decoded);
}
}

View File

@ -14,4 +14,8 @@ impl Keypair {
let pk = PublicKey::from_secret_key(&sk);
Keypair { sk, pk }
}
pub fn identifier(&self) -> String {
self.pk.concatenated_hex_id()
}
}

View File

@ -2,19 +2,33 @@ extern crate bls_aggregates;
extern crate ssz;
mod aggregate_public_key;
mod aggregate_signature;
mod keypair;
mod public_key;
mod secret_key;
mod serde_vistors;
#[cfg(not(debug_assertions))]
mod aggregate_signature;
#[cfg(not(debug_assertions))]
mod signature;
#[cfg(not(debug_assertions))]
pub use crate::aggregate_signature::AggregateSignature;
#[cfg(not(debug_assertions))]
pub use crate::signature::Signature;
#[cfg(debug_assertions)]
mod fake_aggregate_signature;
#[cfg(debug_assertions)]
mod fake_signature;
#[cfg(debug_assertions)]
pub use crate::fake_aggregate_signature::FakeAggregateSignature as AggregateSignature;
#[cfg(debug_assertions)]
pub use crate::fake_signature::FakeSignature as Signature;
pub use crate::aggregate_public_key::AggregatePublicKey;
pub use crate::aggregate_signature::AggregateSignature;
pub use crate::keypair::Keypair;
pub use crate::public_key::PublicKey;
pub use crate::secret_key::SecretKey;
pub use crate::signature::Signature;
pub const BLS_AGG_SIG_BYTE_SIZE: usize = 96;

View File

@ -8,13 +8,14 @@ impl<'de> Visitor<'de> for HexVisitor {
type Value = Vec<u8>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a hex string (without 0x prefix)")
formatter.write_str("a hex string (irrelevant of prefix)")
}
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(hex::decode(value).map_err(|e| de::Error::custom(format!("invalid hex ({:?})", e)))?)
Ok(hex::decode(value.trim_start_matches("0x"))
.map_err(|e| de::Error::custom(format!("invalid hex ({:?})", e)))?)
}
}

View File

@ -13,27 +13,35 @@ use ssz::{
/// This struct is a wrapper upon a base type and provides helper functions (e.g., SSZ
/// serialization).
#[derive(Debug, PartialEq, Clone, Eq)]
pub struct Signature(RawSignature);
pub struct Signature {
signature: RawSignature,
is_empty: bool,
}
impl Signature {
/// Instantiate a new Signature from a message and a SecretKey.
pub fn new(msg: &[u8], domain: u64, sk: &SecretKey) -> Self {
Signature(RawSignature::new(msg, domain, sk.as_raw()))
Signature {
signature: RawSignature::new(msg, domain, sk.as_raw()),
is_empty: false,
}
}
/// Instantiate a new Signature from a message and a SecretKey, where the message has already
/// been hashed.
pub fn new_hashed(x_real_hashed: &[u8], x_imaginary_hashed: &[u8], sk: &SecretKey) -> Self {
Signature(RawSignature::new_hashed(
x_real_hashed,
x_imaginary_hashed,
sk.as_raw(),
))
Signature {
signature: RawSignature::new_hashed(x_real_hashed, x_imaginary_hashed, sk.as_raw()),
is_empty: false,
}
}
/// Verify the Signature against a PublicKey.
pub fn verify(&self, msg: &[u8], domain: u64, pk: &PublicKey) -> bool {
self.0.verify(msg, domain, pk.as_raw())
if self.is_empty {
return false;
}
self.signature.verify(msg, domain, pk.as_raw())
}
/// Verify the Signature against a PublicKey, where the message has already been hashed.
@ -43,44 +51,72 @@ impl Signature {
x_imaginary_hashed: &[u8],
pk: &PublicKey,
) -> bool {
self.0
self.signature
.verify_hashed(x_real_hashed, x_imaginary_hashed, pk.as_raw())
}
/// Returns the underlying signature.
pub fn as_raw(&self) -> &RawSignature {
&self.0
&self.signature
}
/// Returns a new empty signature.
pub fn empty_signature() -> Self {
// Empty Signature is currently being represented as BLS::Signature.point_at_infinity()
// However it should be represented as vec![0; 96] but this
// would require all signatures to be represented in byte form as opposed to Signature
// Set RawSignature = infinity
let mut empty: Vec<u8> = vec![0; 96];
// Sets C_flag and B_flag to 1 and all else to 0
empty[0] += u8::pow(2, 6) + u8::pow(2, 7);
Signature(RawSignature::from_bytes(&empty).unwrap())
Signature {
signature: RawSignature::from_bytes(&empty).unwrap(),
is_empty: true,
}
}
// Converts a BLS Signature to bytes
pub fn as_bytes(&self) -> Vec<u8> {
if self.is_empty {
return vec![0; 96];
}
self.signature.as_bytes()
}
// Convert bytes to BLS Signature
pub fn from_bytes(bytes: &[u8]) -> Result<Self, DecodeError> {
for byte in bytes {
if *byte != 0 {
let raw_signature =
RawSignature::from_bytes(&bytes).map_err(|_| DecodeError::Invalid)?;
return Ok(Signature {
signature: raw_signature,
is_empty: false,
});
}
}
Ok(Signature::empty_signature())
}
// Check for empty Signature
pub fn is_empty(&self) -> bool {
self.is_empty
}
}
impl Encodable for Signature {
fn ssz_append(&self, s: &mut SszStream) {
s.append_vec(&self.0.as_bytes());
s.append_vec(&self.as_bytes());
}
}
impl Decodable for Signature {
fn ssz_decode(bytes: &[u8], i: usize) -> Result<(Self, usize), DecodeError> {
let (sig_bytes, i) = decode_ssz_list(bytes, i)?;
let raw_sig = RawSignature::from_bytes(&sig_bytes).map_err(|_| DecodeError::TooShort)?;
Ok((Signature(raw_sig), i))
let signature = Signature::from_bytes(&sig_bytes)?;
Ok((signature, i))
}
}
impl TreeHash for Signature {
fn hash_tree_root(&self) -> Vec<u8> {
hash(&self.0.as_bytes())
hash(&self.as_bytes())
}
}

View File

@ -4,6 +4,15 @@ version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018"
[[bin]]
name = "validator_client"
path = "src/main.rs"
[lib]
name = "validator_client"
path = "src/lib.rs"
[dependencies]
block_proposer = { path = "../eth2/block_proposer" }
bls = { path = "../eth2/utils/bls" }
@ -18,3 +27,4 @@ slog = "^2.2.3"
slog-term = "^2.4.0"
slog-async = "^2.3.0"
ssz = { path = "../eth2/utils/ssz" }
bincode = "^1.1.2"

View File

@ -57,10 +57,30 @@ complete and return a block from the BN.
### Configuration
Presently the validator specifics (pubkey, etc.) are randomly generated and the
chain specification (slot length, BLS domain, etc.) are fixed to foundation
parameters. This is temporary and will be upgrade so these parameters can be
read from file (or initialized on first-boot).
Validator configurations are stored in a separate data directory from the main Beacon Node
binary. The validator data directory defaults to:
`$HOME/.lighthouse-validator`, however an alternative can be specified on the command line
with `--datadir`.
The configuration directory structure looks like:
```
~/.lighthouse-validator
├── 3cf4210d58ec
│   └── private.key
├── 9b5d8b5be4e7
│   └── private.key
└── cf6e07188f48
└── private.key
```
Where the hex value of the directory is a portion of the validator public key.
Validator keys must be generated using the separate `accounts_manager` binary, which will
place the keys into this directory structure in a format compatible with the validator client.
The chain specification (slot length, BLS domain, etc.) defaults to foundation
parameters, however is temporary and an upgrade will allow these parameters to be
read from a file (or initialized on first-boot).
## BN Communication

View File

@ -1,28 +1,39 @@
use bincode;
use bls::Keypair;
use clap::ArgMatches;
use slog::{debug, error, info};
use std::fs;
use std::fs::File;
use std::io::{Error, ErrorKind};
use std::path::PathBuf;
use types::ChainSpec;
/// Stores the core configuration for this validator instance.
#[derive(Clone)]
pub struct ClientConfig {
pub struct Config {
/// The data directory, which stores all validator databases
pub data_dir: PathBuf,
/// The server at which the Beacon Node can be contacted
pub server: String,
/// The chain specification that we are connecting to
pub spec: ChainSpec,
}
const DEFAULT_LIGHTHOUSE_DIR: &str = ".lighthouse-validators";
const DEFAULT_PRIVATE_KEY_FILENAME: &str = "private.key";
impl ClientConfig {
/// Build a new configuration from defaults.
pub fn default() -> Self {
impl Default for Config {
fn default() -> Self {
let data_dir = {
let home = dirs::home_dir().expect("Unable to determine home dir.");
home.join(DEFAULT_LIGHTHOUSE_DIR)
let home = dirs::home_dir().expect("Unable to determine home directory.");
home.join(".lighthouse-validator")
};
fs::create_dir_all(&data_dir)
.unwrap_or_else(|_| panic!("Unable to create {:?}", &data_dir));
let server = "localhost:50051".to_string();
let spec = ChainSpec::foundation();
Self {
data_dir,
server,
@ -30,3 +41,114 @@ impl ClientConfig {
}
}
}
impl Config {
/// Build a new configuration from defaults, which are overrided by arguments provided.
pub fn parse_args(args: &ArgMatches, log: &slog::Logger) -> Result<Self, Error> {
let mut config = Config::default();
// Use the specified datadir, or default in the home directory
if let Some(datadir) = args.value_of("datadir") {
config.data_dir = PathBuf::from(datadir);
fs::create_dir_all(&config.data_dir)
.unwrap_or_else(|_| panic!("Unable to create {:?}", &config.data_dir));
info!(log, "Using custom data dir: {:?}", &config.data_dir);
};
if let Some(srv) = args.value_of("server") {
//TODO: I don't think this parses correctly a server & port combo
config.server = srv.to_string();
info!(log, "Using custom server: {:?}", &config.server);
};
// TODO: Permit loading a custom spec from file.
if let Some(spec_str) = args.value_of("spec") {
info!(log, "Using custom spec: {:?}", spec_str);
config.spec = match spec_str {
"foundation" => ChainSpec::foundation(),
"few_validators" => ChainSpec::few_validators(),
// Should be impossible due to clap's `possible_values(..)` function.
_ => unreachable!(),
};
};
Ok(config)
}
/// Try to load keys from validator_dir, returning None if none are found or an error.
pub fn fetch_keys(&self, log: &slog::Logger) -> Option<Vec<Keypair>> {
let key_pairs: Vec<Keypair> = fs::read_dir(&self.data_dir)
.unwrap()
.filter_map(|validator_dir| {
let validator_dir = validator_dir.ok()?;
if !(validator_dir.file_type().ok()?.is_dir()) {
// Skip non-directories (i.e. no files/symlinks)
return None;
}
let key_filename = validator_dir.path().join(DEFAULT_PRIVATE_KEY_FILENAME);
if !(key_filename.is_file()) {
info!(
log,
"Private key is not a file: {:?}",
key_filename.to_str()
);
return None;
}
debug!(
log,
"Deserializing private key from file: {:?}",
key_filename.to_str()
);
let mut key_file = File::open(key_filename.clone()).ok()?;
let key: Keypair = if let Ok(key_ok) = bincode::deserialize_from(&mut key_file) {
key_ok
} else {
error!(
log,
"Unable to deserialize the private key file: {:?}", key_filename
);
return None;
};
let ki = key.identifier();
if ki != validator_dir.file_name().into_string().ok()? {
error!(
log,
"The validator key ({:?}) did not match the directory filename {:?}.",
ki,
&validator_dir.path().to_string_lossy()
);
return None;
}
Some(key)
})
.collect();
// Check if it's an empty vector, and return none.
if key_pairs.is_empty() {
None
} else {
Some(key_pairs)
}
}
/// Saves a keypair to a file inside the appropriate validator directory. Returns the saved path filename.
pub fn save_key(&self, key: &Keypair) -> Result<PathBuf, Error> {
let validator_config_path = self.data_dir.join(key.identifier());
let key_path = validator_config_path.join(DEFAULT_PRIVATE_KEY_FILENAME);
fs::create_dir_all(&validator_config_path)?;
let mut key_file = File::create(&key_path)?;
bincode::serialize_into(&mut key_file, &key)
.map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
Ok(key_path)
}
}

View File

@ -0,0 +1,3 @@
pub mod config;
pub use crate::config::Config;

View File

@ -1,17 +1,14 @@
use self::block_producer_service::{BeaconBlockGrpcClient, BlockProducerService};
use self::duties::{DutiesManager, DutiesManagerService, EpochDutiesMap};
use crate::config::ClientConfig;
use crate::config::Config;
use block_proposer::{test_utils::LocalSigner, BlockProducer};
use bls::Keypair;
use clap::{App, Arg};
use grpcio::{ChannelBuilder, EnvBuilder};
use protos::services_grpc::{BeaconBlockServiceClient, ValidatorServiceClient};
use slog::{error, info, o, Drain};
use slog::{info, o, Drain};
use slot_clock::SystemTimeSlotClock;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;
use types::ChainSpec;
mod block_producer_service;
mod config;
@ -55,36 +52,11 @@ fn main() {
)
.get_matches();
let mut config = ClientConfig::default();
// Custom datadir
if let Some(dir) = matches.value_of("datadir") {
config.data_dir = PathBuf::from(dir.to_string());
}
// Custom server port
if let Some(server_str) = matches.value_of("server") {
if let Ok(addr) = server_str.parse::<u16>() {
config.server = addr.to_string();
} else {
error!(log, "Invalid address"; "server" => server_str);
return;
}
}
// TODO: Permit loading a custom spec from file.
// Custom spec
if let Some(spec_str) = matches.value_of("spec") {
match spec_str {
"foundation" => config.spec = ChainSpec::foundation(),
"few_validators" => config.spec = ChainSpec::few_validators(),
// Should be impossible due to clap's `possible_values(..)` function.
_ => unreachable!(),
};
}
let config = Config::parse_args(&matches, &log)
.expect("Unable to build a configuration for the validator client.");
// Log configuration
info!(log, "";
info!(log, "Configuration parameters:";
"data_dir" => &config.data_dir.to_str(),
"server" => &config.server);
@ -119,13 +91,13 @@ fn main() {
let poll_interval_millis = spec.seconds_per_slot * 1000 / 10; // 10% epoch time precision.
info!(log, "Starting block producer service"; "polls_per_epoch" => spec.seconds_per_slot * 1000 / poll_interval_millis);
let keypairs = config.fetch_keys(&log)
.expect("No key pairs found in configuration, they must first be generated with: account_manager generate.");
/*
* Start threads.
*/
let mut threads = vec![];
// TODO: keypairs are randomly generated; they should be loaded from a file or generated.
// https://github.com/sigp/lighthouse/issues/160
let keypairs = vec![Keypair::random()];
for keypair in keypairs {
info!(log, "Starting validator services"; "validator" => keypair.pk.concatenated_hex_id());