Merge branch 'master' of github.com:sigp/lighthouse into v0.5.0-state-transition-tests

This commit is contained in:
Kirk Baird 2019-03-22 09:23:56 +11:00
commit 192a49a23d
66 changed files with 2874 additions and 695 deletions

9
.editorconfig Normal file
View File

@ -0,0 +1,9 @@
root = true
[*]
indent_style=space
indent_size=4
end_of_line=lf
charset=utf-8
trim_trailing_whitespace=true
max_line_length=100
insert_final_newline=false

View File

@ -20,6 +20,11 @@ members = [
"eth2/utils/test_random_derive",
"beacon_node",
"beacon_node/db",
"beacon_node/client",
"beacon_node/network",
"beacon_node/eth2-libp2p",
"beacon_node/rpc",
"beacon_node/version",
"beacon_node/beacon_chain",
"beacon_node/beacon_chain/test_harness",
"protos",

View File

@ -1,25 +1,19 @@
[package]
name = "beacon_node"
version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
authors = ["Paul Hauner <paul@paulhauner.com>", "Age Manning <Age@AgeManning.com"]
edition = "2018"
[dependencies]
bls = { path = "../eth2/utils/bls" }
beacon_chain = { path = "beacon_chain" }
grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] }
protobuf = "2.0.2"
protos = { path = "../protos" }
types = { path = "../eth2/types" }
client = { path = "client" }
version = { path = "version" }
clap = "2.32.0"
db = { path = "db" }
dirs = "1.0.3"
futures = "0.1.23"
fork_choice = { path = "../eth2/fork_choice" }
slog = "^2.2.3"
slot_clock = { path = "../eth2/utils/slot_clock" }
slog-term = "^2.4.0"
slog-async = "^2.3.0"
ctrlc = { version = "3.1.1", features = ["termination"] }
tokio = "0.1.15"
futures = "0.1.25"
exit-future = "0.1.3"
state_processing = { path = "../eth2/state_processing" }
types = { path = "../eth2/types" }
ssz = { path = "../eth2/utils/ssz" }
tokio = "0.1"

View File

@ -1,7 +1,7 @@
[package]
name = "beacon_chain"
version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
authors = ["Paul Hauner <paul@paulhauner.com>", "Age Manning <Age@AgeManning.com>"]
edition = "2018"
[dependencies]

View File

@ -82,7 +82,7 @@ where
let state_root = genesis_state.canonical_root();
state_store.put(&state_root, &ssz_encode(&genesis_state)[..])?;
let block_root = genesis_block.into_header().canonical_root();
let block_root = genesis_block.block_header().canonical_root();
block_store.put(&block_root, &ssz_encode(&genesis_block)[..])?;
let finalized_head = RwLock::new(CheckPoint::new(
@ -189,7 +189,7 @@ where
pub fn advance_state(&self, slot: Slot) -> Result<(), SlotProcessingError> {
let state_slot = self.state.read().slot;
let latest_block_header = self.head().beacon_block.into_header();
let latest_block_header = self.head().beacon_block.block_header();
for _ in state_slot.as_u64()..slot.as_u64() {
per_slot_processing(&mut *self.state.write(), &latest_block_header, &self.spec)?;
@ -561,7 +561,7 @@ where
pub fn process_block(&self, block: BeaconBlock) -> Result<BlockProcessingOutcome, Error> {
debug!("Processing block with slot {}...", block.slot);
let block_root = block.into_header().canonical_root();
let block_root = block.block_header().canonical_root();
let present_slot = self.present_slot();
@ -596,7 +596,7 @@ where
// Transition the parent state to the present slot.
let mut state = parent_state;
let previous_block_header = parent_block.into_header();
let previous_block_header = parent_block.block_header();
for _ in state.slot.as_u64()..present_slot.as_u64() {
if let Err(e) = per_slot_processing(&mut state, &previous_block_header, &self.spec) {
return Ok(BlockProcessingOutcome::InvalidBlock(

View File

@ -0,0 +1,94 @@
// Initialisation functions to generate a new BeaconChain.
// Note: A new version of ClientTypes may need to be implemented for the lighthouse
// testnet. These are examples. Also. there is code duplication which can/should be cleaned up.
use crate::BeaconChain;
use db::stores::{BeaconBlockStore, BeaconStateStore};
use db::{DiskDB, MemoryDB};
use fork_choice::BitwiseLMDGhost;
use slot_clock::SystemTimeSlotClock;
use ssz::TreeHash;
use std::path::PathBuf;
use std::sync::Arc;
use types::test_utils::TestingBeaconStateBuilder;
use types::{BeaconBlock, ChainSpec, Hash256};
//TODO: Correct this for prod
//TODO: Account for historical db
pub fn initialise_beacon_chain(
spec: &ChainSpec,
db_name: Option<&PathBuf>,
) -> Arc<BeaconChain<DiskDB, SystemTimeSlotClock, BitwiseLMDGhost<DiskDB>>> {
// set up the db
let db = Arc::new(DiskDB::open(
db_name.expect("Database directory must be included"),
None,
));
let block_store = Arc::new(BeaconBlockStore::new(db.clone()));
let state_store = Arc::new(BeaconStateStore::new(db.clone()));
let state_builder = TestingBeaconStateBuilder::from_deterministic_keypairs(8, &spec);
let (genesis_state, _keypairs) = state_builder.build();
let mut genesis_block = BeaconBlock::empty(&spec);
genesis_block.state_root = Hash256::from_slice(&genesis_state.hash_tree_root());
// Slot clock
let slot_clock = SystemTimeSlotClock::new(genesis_state.genesis_time, spec.seconds_per_slot)
.expect("Unable to load SystemTimeSlotClock");
// Choose the fork choice
let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone());
// Genesis chain
//TODO: Handle error correctly
Arc::new(
BeaconChain::from_genesis(
state_store.clone(),
block_store.clone(),
slot_clock,
genesis_state,
genesis_block,
spec.clone(),
fork_choice,
)
.expect("Terminate if beacon chain generation fails"),
)
}
/// Initialisation of a test beacon chain, uses an in memory db with fixed genesis time.
pub fn initialise_test_beacon_chain(
spec: &ChainSpec,
_db_name: Option<&PathBuf>,
) -> Arc<BeaconChain<MemoryDB, SystemTimeSlotClock, BitwiseLMDGhost<MemoryDB>>> {
let db = Arc::new(MemoryDB::open());
let block_store = Arc::new(BeaconBlockStore::new(db.clone()));
let state_store = Arc::new(BeaconStateStore::new(db.clone()));
let state_builder = TestingBeaconStateBuilder::from_deterministic_keypairs(8, spec);
let (genesis_state, _keypairs) = state_builder.build();
let mut genesis_block = BeaconBlock::empty(spec);
genesis_block.state_root = Hash256::from_slice(&genesis_state.hash_tree_root());
// Slot clock
let slot_clock = SystemTimeSlotClock::new(genesis_state.genesis_time, spec.seconds_per_slot)
.expect("Unable to load SystemTimeSlotClock");
// Choose the fork choice
let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone());
// Genesis chain
//TODO: Handle error correctly
Arc::new(
BeaconChain::from_genesis(
state_store.clone(),
block_store.clone(),
slot_clock,
genesis_state,
genesis_block,
spec.clone(),
fork_choice,
)
.expect("Terminate if beacon chain generation fails"),
)
}

View File

@ -2,8 +2,13 @@ mod attestation_aggregator;
mod beacon_chain;
mod checkpoint;
mod errors;
pub mod initialise;
pub use self::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock, ValidBlock};
pub use self::checkpoint::CheckPoint;
pub use self::errors::BeaconChainError;
pub use fork_choice::{ForkChoice, ForkChoiceAlgorithm, ForkChoiceError};
pub use db;
pub use fork_choice;
pub use parking_lot;
pub use slot_clock;
pub use types;

View File

@ -0,0 +1,21 @@
[package]
name = "client"
version = "0.1.0"
authors = ["Age Manning <Age@AgeManning.com>"]
edition = "2018"
[dependencies]
beacon_chain = { path = "../beacon_chain" }
network = { path = "../network" }
db = { path = "../db" }
rpc = { path = "../rpc" }
fork_choice = { path = "../../eth2/fork_choice" }
types = { path = "../../eth2/types" }
slot_clock = { path = "../../eth2/utils/slot_clock" }
error-chain = "0.12.0"
slog = "^2.2.3"
tokio = "0.1.15"
clap = "2.32.0"
dirs = "1.0.3"
exit-future = "0.1.3"
futures = "0.1.25"

View File

@ -0,0 +1,124 @@
use clap::ArgMatches;
use db::DBType;
use fork_choice::ForkChoiceAlgorithm;
use network::NetworkConfig;
use slog::error;
use std::fs;
use std::net::SocketAddr;
use std::net::{IpAddr, Ipv4Addr};
use std::path::PathBuf;
use types::multiaddr::Protocol;
use types::multiaddr::ToMultiaddr;
use types::ChainSpec;
/// Stores the client configuration for this Lighthouse instance.
#[derive(Debug, Clone)]
pub struct ClientConfig {
pub data_dir: PathBuf,
pub spec: ChainSpec,
pub net_conf: network::NetworkConfig,
pub fork_choice: ForkChoiceAlgorithm,
pub db_type: DBType,
pub db_name: PathBuf,
pub rpc_conf: rpc::RPCConfig,
//pub ipc_conf:
}
impl Default for ClientConfig {
/// Build a new lighthouse configuration from defaults.
fn default() -> Self {
let data_dir = {
let home = dirs::home_dir().expect("Unable to determine home dir.");
home.join(".lighthouse/")
};
fs::create_dir_all(&data_dir)
.unwrap_or_else(|_| panic!("Unable to create {:?}", &data_dir));
let default_spec = ChainSpec::lighthouse_testnet();
let default_net_conf = NetworkConfig::new(default_spec.boot_nodes.clone());
Self {
data_dir: data_dir.clone(),
// default to foundation for chain specs
spec: default_spec,
net_conf: default_net_conf,
// default to bitwise LMD Ghost
fork_choice: ForkChoiceAlgorithm::BitwiseLMDGhost,
// default to memory db for now
db_type: DBType::Memory,
// default db name for disk-based dbs
db_name: data_dir.join("chain.db"),
rpc_conf: rpc::RPCConfig::default(),
}
}
}
impl ClientConfig {
/// Parses the CLI arguments into a `Config` struct.
pub fn parse_args(args: ArgMatches, log: &slog::Logger) -> Result<Self, &'static str> {
let mut config = ClientConfig::default();
/* Network related arguments */
// Custom p2p listen port
if let Some(port_str) = args.value_of("port") {
if let Ok(port) = port_str.parse::<u16>() {
config.net_conf.listen_port = port;
// update the listening multiaddrs
for address in &mut config.net_conf.listen_addresses {
address.pop();
address.append(Protocol::Tcp(port));
}
} else {
error!(log, "Invalid port"; "port" => port_str);
return Err("Invalid port");
}
}
// Custom listening address ipv4/ipv6
// TODO: Handle list of addresses
if let Some(listen_address_str) = args.value_of("listen_address") {
if let Ok(listen_address) = listen_address_str.parse::<IpAddr>() {
let multiaddr = SocketAddr::new(listen_address, config.net_conf.listen_port)
.to_multiaddr()
.expect("Invalid listen address format");
config.net_conf.listen_addresses = vec![multiaddr];
} else {
error!(log, "Invalid IP Address"; "Address" => listen_address_str);
return Err("Invalid IP Address");
}
}
/* Filesystem related arguments */
// Custom datadir
if let Some(dir) = args.value_of("datadir") {
config.data_dir = PathBuf::from(dir.to_string());
};
/* RPC related arguments */
if args.is_present("rpc") {
config.rpc_conf.enabled = true;
}
if let Some(rpc_address) = args.value_of("rpc-address") {
if let Ok(listen_address) = rpc_address.parse::<Ipv4Addr>() {
config.rpc_conf.listen_address = listen_address;
} else {
error!(log, "Invalid RPC listen address"; "Address" => rpc_address);
return Err("Invalid RPC listen address");
}
}
if let Some(rpc_port) = args.value_of("rpc-port") {
if let Ok(port) = rpc_port.parse::<u16>() {
config.rpc_conf.port = port;
} else {
error!(log, "Invalid RPC port"; "port" => rpc_port);
return Err("Invalid RPC port");
}
}
Ok(config)
}
}

View File

@ -0,0 +1,49 @@
use crate::ClientConfig;
use beacon_chain::{
db::{ClientDB, DiskDB, MemoryDB},
fork_choice::BitwiseLMDGhost,
initialise,
slot_clock::{SlotClock, SystemTimeSlotClock},
BeaconChain,
};
use fork_choice::ForkChoice;
use std::sync::Arc;
pub trait ClientTypes {
type DB: ClientDB + 'static;
type SlotClock: SlotClock + 'static;
type ForkChoice: ForkChoice + 'static;
fn initialise_beacon_chain(
config: &ClientConfig,
) -> Arc<BeaconChain<Self::DB, Self::SlotClock, Self::ForkChoice>>;
}
pub struct StandardClientType;
impl ClientTypes for StandardClientType {
type DB = DiskDB;
type SlotClock = SystemTimeSlotClock;
type ForkChoice = BitwiseLMDGhost<DiskDB>;
fn initialise_beacon_chain(
config: &ClientConfig,
) -> Arc<BeaconChain<Self::DB, Self::SlotClock, Self::ForkChoice>> {
initialise::initialise_beacon_chain(&config.spec, Some(&config.db_name))
}
}
pub struct TestingClientType;
impl ClientTypes for TestingClientType {
type DB = MemoryDB;
type SlotClock = SystemTimeSlotClock;
type ForkChoice = BitwiseLMDGhost<MemoryDB>;
fn initialise_beacon_chain(
config: &ClientConfig,
) -> Arc<BeaconChain<Self::DB, Self::SlotClock, Self::ForkChoice>> {
initialise::initialise_test_beacon_chain(&config.spec, None)
}
}

View File

@ -0,0 +1,14 @@
// generates error types
use network;
use error_chain::{
error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed,
impl_extract_backtrace,
};
error_chain! {
links {
Network(network::error::Error, network::error::ErrorKind);
}
}

View File

@ -0,0 +1,77 @@
extern crate slog;
mod client_config;
pub mod client_types;
pub mod error;
pub mod notifier;
use beacon_chain::BeaconChain;
pub use client_config::ClientConfig;
pub use client_types::ClientTypes;
use exit_future::Signal;
use network::Service as NetworkService;
use slog::o;
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::runtime::TaskExecutor;
/// Main beacon node client service. This provides the connection and initialisation of the clients
/// sub-services in multiple threads.
pub struct Client<T: ClientTypes> {
/// Configuration for the lighthouse client.
config: ClientConfig,
/// The beacon chain for the running client.
beacon_chain: Arc<BeaconChain<T::DB, T::SlotClock, T::ForkChoice>>,
/// Reference to the network service.
pub network: Arc<NetworkService>,
/// Future to stop and begin shutdown of the Client.
//TODO: Decide best way to handle shutdown
pub exit: exit_future::Exit,
/// The sending future to call to terminate the Client.
//TODO: Decide best way to handle shutdown
pub exit_signal: Signal,
/// The clients logger.
log: slog::Logger,
/// Marker to pin the beacon chain generics.
phantom: PhantomData<T>,
}
impl<TClientType: ClientTypes> Client<TClientType> {
/// Generate an instance of the client. Spawn and link all internal sub-processes.
pub fn new(
config: ClientConfig,
log: slog::Logger,
executor: &TaskExecutor,
) -> error::Result<Self> {
let (exit_signal, exit) = exit_future::signal();
// generate a beacon chain
let beacon_chain = TClientType::initialise_beacon_chain(&config);
// Start the network service, libp2p and syncing threads
// TODO: Add beacon_chain reference to network parameters
let network_config = &config.net_conf;
let network_logger = log.new(o!("Service" => "Network"));
let (network, _network_send) = NetworkService::new(
beacon_chain.clone(),
network_config,
executor,
network_logger,
)?;
// spawn the RPC server
if config.rpc_conf.enabled {
rpc::start_server(&config.rpc_conf, &log);
}
Ok(Client {
config,
beacon_chain,
exit,
exit_signal,
log,
network,
phantom: PhantomData,
})
}
}

View File

@ -0,0 +1,45 @@
use crate::Client;
use crate::ClientTypes;
use exit_future::Exit;
use futures::{Future, Stream};
use slog::{debug, info, o};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::runtime::TaskExecutor;
use tokio::timer::Interval;
/// Thread that monitors the client and reports useful statistics to the user.
pub fn run<T: ClientTypes>(client: &Client<T>, executor: TaskExecutor, exit: Exit) {
// notification heartbeat
let interval = Interval::new(Instant::now(), Duration::from_secs(5));
let log = client.log.new(o!("Service" => "Notifier"));
// TODO: Debugging only
let counter = Arc::new(Mutex::new(0));
let network = client.network.clone();
// build heartbeat logic here
let heartbeat = move |_| {
info!(log, "Temp heartbeat output");
//TODO: Remove this logic. Testing only
let mut count = counter.lock().unwrap();
*count += 1;
if *count % 5 == 0 {
debug!(log, "Sending Message");
network.send_message();
}
Ok(())
};
// map error and spawn
let log = client.log.clone();
let heartbeat_interval = interval
.map_err(move |e| debug!(log, "Timer error {}", e))
.for_each(heartbeat);
executor.spawn(exit.until(heartbeat_interval).map(|_| ()));
}

View File

@ -12,3 +12,10 @@ use self::stores::COLUMNS;
pub use self::disk_db::DiskDB;
pub use self::memory_db::MemoryDB;
pub use self::traits::{ClientDB, DBError, DBValue};
/// Currently available database options
#[derive(Debug, Clone)]
pub enum DBType {
Memory,
RocksDB,
}

View File

@ -0,0 +1,17 @@
[package]
name = "eth2-libp2p"
version = "0.1.0"
authors = ["Age Manning <Age@AgeManning.com>"]
edition = "2018"
[dependencies]
# SigP repository until PR is merged
libp2p = { git = "https://github.com/SigP/rust-libp2p", branch = "gossipsub" }
types = { path = "../../eth2/types" }
ssz = { path = "../../eth2/utils/ssz" }
ssz_derive = { path = "../../eth2/utils/ssz_derive" }
slog = "2.4.1"
version = { path = "../version" }
tokio = "0.1.16"
futures = "0.1.25"
error-chain = "0.12.0"

View File

@ -0,0 +1,96 @@
use crate::rpc::{RPCEvent, RPCMessage, Rpc};
use futures::prelude::*;
use libp2p::{
core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent},
tokio_io::{AsyncRead, AsyncWrite},
NetworkBehaviour, PeerId,
};
use types::Topic;
/// Builds the network behaviour for the libp2p Swarm.
/// Implements gossipsub message routing.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourEvent", poll_method = "poll")]
pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite> {
gossipsub: Gossipsub<TSubstream>,
// TODO: Add Kademlia for peer discovery
/// The events generated by this behaviour to be consumed in the swarm poll.
serenity_rpc: Rpc<TSubstream>,
#[behaviour(ignore)]
events: Vec<BehaviourEvent>,
}
// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubEvent>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, event: GossipsubEvent) {
match event {
GossipsubEvent::Message(message) => {
let gs_message = String::from_utf8_lossy(&message.data);
// TODO: Remove this type - debug only
self.events
.push(BehaviourEvent::Message(gs_message.to_string()))
}
_ => {}
}
}
}
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<RPCMessage>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, event: RPCMessage) {
match event {
RPCMessage::PeerDialed(peer_id) => {
self.events.push(BehaviourEvent::PeerDialed(peer_id))
}
RPCMessage::RPC(peer_id, rpc_event) => {
self.events.push(BehaviourEvent::RPC(peer_id, rpc_event))
}
}
}
}
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig, log: &slog::Logger) -> Self {
Behaviour {
gossipsub: Gossipsub::new(local_peer_id, gs_config),
serenity_rpc: Rpc::new(log),
events: Vec::new(),
}
}
/// Consumes the events list when polled.
fn poll<TBehaviourIn>(
&mut self,
) -> Async<NetworkBehaviourAction<TBehaviourIn, BehaviourEvent>> {
if !self.events.is_empty() {
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
}
Async::NotReady
}
}
/// Implements the combined behaviour for the libp2p service.
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
/// Subscribes to a gossipsub topic.
pub fn subscribe(&mut self, topic: Topic) -> bool {
self.gossipsub.subscribe(topic)
}
/// Sends an RPC Request/Response via the RPC protocol.
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
self.serenity_rpc.send_rpc(peer_id, rpc_event);
}
}
/// The types of events than can be obtained from polling the behaviour.
pub enum BehaviourEvent {
RPC(PeerId, RPCEvent),
PeerDialed(PeerId),
// TODO: This is a stub at the moment
Message(String),
}

View File

@ -0,0 +1,8 @@
// generates error types
use error_chain::{
error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed,
impl_extract_backtrace,
};
error_chain! {}

View File

@ -0,0 +1,20 @@
/// This crate contains the main link for lighthouse to rust-libp2p. It therefore re-exports
/// all required libp2p functionality.
///
/// This crate builds and manages the libp2p services required by the beacon node.
pub mod behaviour;
pub mod error;
mod network_config;
pub mod rpc;
mod service;
pub use libp2p::{
gossipsub::{GossipsubConfig, GossipsubConfigBuilder},
PeerId,
};
pub use network_config::NetworkConfig;
pub use rpc::{HelloMessage, RPCEvent};
pub use service::Libp2pEvent;
pub use service::Service;
pub use types::multiaddr;
pub use types::Multiaddr;

View File

@ -0,0 +1,59 @@
use crate::Multiaddr;
use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder};
use libp2p::secio;
use std::fmt;
#[derive(Clone)]
/// Network configuration for lighthouse.
pub struct NetworkConfig {
//TODO: stubbing networking initial params, change in the future
/// IP address to listen on.
pub listen_addresses: Vec<Multiaddr>,
/// Listen port UDP/TCP.
pub listen_port: u16,
/// Gossipsub configuration parameters.
pub gs_config: GossipsubConfig,
/// List of nodes to initially connect to.
pub boot_nodes: Vec<Multiaddr>,
/// Peer key related to this nodes PeerId.
pub local_private_key: secio::SecioKeyPair,
/// Client version
pub client_version: String,
/// List of topics to subscribe to as strings
pub topics: Vec<String>,
}
impl Default for NetworkConfig {
/// Generate a default network configuration.
fn default() -> Self {
// TODO: Currently using secp256k1 key pairs. Wire protocol specifies RSA. Waiting for this
// PR to be merged to generate RSA keys: https://github.com/briansmith/ring/pull/733
NetworkConfig {
listen_addresses: vec!["/ip4/127.0.0.1/tcp/9000"
.parse()
.expect("is a correct multi-address")],
listen_port: 9000,
gs_config: GossipsubConfigBuilder::new().build(),
boot_nodes: Vec::new(),
local_private_key: secio::SecioKeyPair::secp256k1_generated().unwrap(),
client_version: version::version(),
topics: vec![String::from("beacon_chain")],
}
}
}
impl NetworkConfig {
pub fn new(boot_nodes: Vec<Multiaddr>) -> Self {
let mut conf = NetworkConfig::default();
conf.boot_nodes = boot_nodes;
conf
}
}
impl fmt::Debug for NetworkConfig {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "NetworkConfig: listen_addresses: {:?}, listen_port: {:?}, gs_config: {:?}, boot_nodes: {:?}, local_private_key: <Secio-PubKey {:?}>, client_version: {:?}", self.listen_addresses, self.listen_port, self.gs_config, self.boot_nodes, self.local_private_key.to_public_key(), self.client_version)
}
}

View File

@ -0,0 +1,161 @@
/// Available RPC methods types and ids.
use ssz_derive::{Decode, Encode};
use types::{BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot};
#[derive(Debug)]
/// Available Serenity Libp2p RPC methods
pub enum RPCMethod {
/// Initialise handshake between connecting peers.
Hello,
/// Terminate a connection providing a reason.
Goodbye,
/// Requests a number of beacon block roots.
BeaconBlockRoots,
/// Requests a number of beacon block headers.
BeaconBlockHeaders,
/// Requests a number of beacon block bodies.
BeaconBlockBodies,
/// Requests values for a merkle proof for the current blocks state root.
BeaconChainState, // Note: experimental, not complete.
/// Unknown method received.
Unknown,
}
impl From<u16> for RPCMethod {
fn from(method_id: u16) -> Self {
match method_id {
0 => RPCMethod::Hello,
1 => RPCMethod::Goodbye,
10 => RPCMethod::BeaconBlockRoots,
11 => RPCMethod::BeaconBlockHeaders,
12 => RPCMethod::BeaconBlockBodies,
13 => RPCMethod::BeaconChainState,
_ => RPCMethod::Unknown,
}
}
}
impl Into<u16> for RPCMethod {
fn into(self) -> u16 {
match self {
RPCMethod::Hello => 0,
RPCMethod::Goodbye => 1,
RPCMethod::BeaconBlockRoots => 10,
RPCMethod::BeaconBlockHeaders => 11,
RPCMethod::BeaconBlockBodies => 12,
RPCMethod::BeaconChainState => 13,
_ => 0,
}
}
}
#[derive(Debug, Clone)]
pub enum RPCRequest {
Hello(HelloMessage),
Goodbye(u64),
BeaconBlockRoots(BeaconBlockRootsRequest),
BeaconBlockHeaders(BeaconBlockHeadersRequest),
BeaconBlockBodies(BeaconBlockBodiesRequest),
BeaconChainState(BeaconChainStateRequest),
}
#[derive(Debug, Clone)]
pub enum RPCResponse {
Hello(HelloMessage),
BeaconBlockRoots(BeaconBlockRootsResponse),
BeaconBlockHeaders(BeaconBlockHeadersResponse),
BeaconBlockBodies(BeaconBlockBodiesResponse),
BeaconChainState(BeaconChainStateResponse),
}
/* Request/Response data structures for RPC methods */
/// The HELLO request/response handshake message.
#[derive(Encode, Decode, Clone, Debug)]
pub struct HelloMessage {
/// The network ID of the peer.
pub network_id: u8,
/// The peers last finalized root.
pub latest_finalized_root: Hash256,
/// The peers last finalized epoch.
pub latest_finalized_epoch: Epoch,
/// The peers last block root.
pub best_root: Hash256,
/// The peers last slot.
pub best_slot: Slot,
}
/// Request a number of beacon block roots from a peer.
#[derive(Encode, Decode, Clone, Debug)]
pub struct BeaconBlockRootsRequest {
/// The starting slot of the requested blocks.
start_slot: Slot,
/// The number of blocks from the start slot.
count: u64, // this must be less than 32768. //TODO: Enforce this in the lower layers
}
/// Response containing a number of beacon block roots from a peer.
#[derive(Encode, Decode, Clone, Debug)]
pub struct BeaconBlockRootsResponse {
/// List of requested blocks and associated slots.
roots: Vec<BlockRootSlot>,
}
/// Contains a block root and associated slot.
#[derive(Encode, Decode, Clone, Debug)]
pub struct BlockRootSlot {
/// The block root.
block_root: Hash256,
/// The block slot.
slot: Slot,
}
/// Request a number of beacon block headers from a peer.
#[derive(Encode, Decode, Clone, Debug)]
pub struct BeaconBlockHeadersRequest {
/// The starting header hash of the requested headers.
start_root: Hash256,
/// The starting slot of the requested headers.
start_slot: Slot,
/// The maximum number of headers than can be returned.
max_headers: u64,
/// The maximum number of slots to skip between blocks.
skip_slots: u64,
}
/// Response containing requested block headers.
#[derive(Encode, Decode, Clone, Debug)]
pub struct BeaconBlockHeadersResponse {
/// The list of requested beacon block headers.
headers: Vec<BeaconBlockHeader>,
}
/// Request a number of beacon block bodies from a peer.
#[derive(Encode, Decode, Clone, Debug)]
pub struct BeaconBlockBodiesRequest {
/// The list of beacon block bodies being requested.
block_roots: Hash256,
}
/// Response containing the list of requested beacon block bodies.
#[derive(Encode, Decode, Clone, Debug)]
pub struct BeaconBlockBodiesResponse {
/// The list of beacon block bodies being requested.
block_bodies: Vec<BeaconBlockBody>,
}
/// Request values for tree hashes which yield a blocks `state_root`.
#[derive(Encode, Decode, Clone, Debug)]
pub struct BeaconChainStateRequest {
/// The tree hashes that a value is requested for.
hashes: Vec<Hash256>,
}
/// Request values for tree hashes which yield a blocks `state_root`.
// Note: TBD
#[derive(Encode, Decode, Clone, Debug)]
pub struct BeaconChainStateResponse {
/// The values corresponding the to the requested tree hashes.
values: bool, //TBD - stubbed with encodeable bool
}

View File

@ -0,0 +1,138 @@
/// RPC Protocol over libp2p.
///
/// This is purpose built for Ethereum 2.0 serenity and the protocol listens on
/// `/eth/serenity/rpc/1.0.0`
mod methods;
mod protocol;
use futures::prelude::*;
use libp2p::core::protocols_handler::{OneShotHandler, ProtocolsHandler};
use libp2p::core::swarm::{
ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use libp2p::{Multiaddr, PeerId};
pub use methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse};
pub use protocol::{RPCEvent, RPCProtocol};
use slog::o;
use std::marker::PhantomData;
use tokio::io::{AsyncRead, AsyncWrite};
/// The network behaviour handles RPC requests/responses as specified in the Eth 2.0 phase 0
/// specification.
pub struct Rpc<TSubstream> {
/// Queue of events to processed.
events: Vec<NetworkBehaviourAction<RPCEvent, RPCMessage>>,
/// Pins the generic substream.
marker: PhantomData<TSubstream>,
/// Slog logger for RPC behaviour.
log: slog::Logger,
}
impl<TSubstream> Rpc<TSubstream> {
pub fn new(log: &slog::Logger) -> Self {
let log = log.new(o!("Service" => "Libp2p-RPC"));
Rpc {
events: Vec::new(),
marker: PhantomData,
log,
}
}
/// Submits and RPC request.
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id,
event: rpc_event,
});
}
}
impl<TSubstream> NetworkBehaviour for Rpc<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
type ProtocolsHandler = OneShotHandler<TSubstream, RPCProtocol, RPCEvent, OneShotEvent>;
type OutEvent = RPCMessage;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
Default::default()
}
fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}
fn inject_connected(&mut self, peer_id: PeerId, connected_point: ConnectedPoint) {
// if initialised the connection, report this upwards to send the HELLO request
if let ConnectedPoint::Dialer { address: _ } = connected_point {
self.events.push(NetworkBehaviourAction::GenerateEvent(
RPCMessage::PeerDialed(peer_id),
));
}
}
fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {}
fn inject_node_event(
&mut self,
source: PeerId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
// ignore successful send events
let event = match event {
OneShotEvent::Rx(event) => event,
OneShotEvent::Sent => return,
};
// send the event to the user
self.events
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage::RPC(
source, event,
)));
}
fn poll(
&mut self,
_: &mut PollParameters<'_>,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
if !self.events.is_empty() {
return Async::Ready(self.events.remove(0));
}
Async::NotReady
}
}
/// Messages sent to the user from the RPC protocol.
pub enum RPCMessage {
RPC(PeerId, RPCEvent),
PeerDialed(PeerId),
}
/// Transmission between the `OneShotHandler` and the `RPCEvent`.
#[derive(Debug)]
pub enum OneShotEvent {
/// We received an RPC from a remote.
Rx(RPCEvent),
/// We successfully sent an RPC request.
Sent,
}
impl From<RPCEvent> for OneShotEvent {
#[inline]
fn from(rpc: RPCEvent) -> OneShotEvent {
OneShotEvent::Rx(rpc)
}
}
impl From<()> for OneShotEvent {
#[inline]
fn from(_: ()) -> OneShotEvent {
OneShotEvent::Sent
}
}

View File

@ -0,0 +1,181 @@
use super::methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse};
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use ssz::{ssz_encode, Decodable, Encodable, SszStream};
use std::io;
use std::iter;
use tokio::io::{AsyncRead, AsyncWrite};
/// The maximum bytes that can be sent across the RPC.
const MAX_READ_SIZE: usize = 2048;
/// Implementation of the `ConnectionUpgrade` for the rpc protocol.
#[derive(Debug, Clone)]
pub struct RPCProtocol;
impl UpgradeInfo for RPCProtocol {
type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>;
#[inline]
fn protocol_info(&self) -> Self::InfoIter {
iter::once(b"/eth/serenity/rpc/1.0.0")
}
}
impl Default for RPCProtocol {
fn default() -> Self {
RPCProtocol
}
}
/// The RPC types which are sent/received in this protocol.
#[derive(Debug, Clone)]
pub enum RPCEvent {
Request {
id: u64,
method_id: u16,
body: RPCRequest,
},
Response {
id: u64,
method_id: u16, //TODO: Remove and process decoding upstream
result: RPCResponse,
},
}
impl UpgradeInfo for RPCEvent {
type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>;
#[inline]
fn protocol_info(&self) -> Self::InfoIter {
iter::once(b"/eth/serenity/rpc/1.0.0")
}
}
impl<TSocket> InboundUpgrade<TSocket> for RPCProtocol
where
TSocket: AsyncRead + AsyncWrite,
{
type Output = RPCEvent;
type Error = DecodeError;
type Future =
upgrade::ReadOneThen<TSocket, (), fn(Vec<u8>, ()) -> Result<RPCEvent, DecodeError>>;
fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
upgrade::read_one_then(socket, MAX_READ_SIZE, (), |packet, ()| Ok(decode(packet)?))
}
}
fn decode(packet: Vec<u8>) -> Result<RPCEvent, DecodeError> {
// decode the header of the rpc
// request/response
let (request, index) = bool::ssz_decode(&packet, 0)?;
let (id, index) = u64::ssz_decode(&packet, index)?;
let (method_id, index) = u16::ssz_decode(&packet, index)?;
if request {
let body = match RPCMethod::from(method_id) {
RPCMethod::Hello => {
let (hello_body, _index) = HelloMessage::ssz_decode(&packet, index)?;
RPCRequest::Hello(hello_body)
}
RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod),
};
Ok(RPCEvent::Request {
id,
method_id,
body,
})
}
// we have received a response
else {
let result = match RPCMethod::from(method_id) {
RPCMethod::Hello => {
let (body, _index) = HelloMessage::ssz_decode(&packet, index)?;
RPCResponse::Hello(body)
}
RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod),
};
Ok(RPCEvent::Response {
id,
method_id,
result,
})
}
}
impl<TSocket> OutboundUpgrade<TSocket> for RPCEvent
where
TSocket: AsyncWrite,
{
type Output = ();
type Error = io::Error;
type Future = upgrade::WriteOne<TSocket>;
#[inline]
fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
let bytes = ssz_encode(&self);
upgrade::write_one(socket, bytes)
}
}
impl Encodable for RPCEvent {
fn ssz_append(&self, s: &mut SszStream) {
match self {
RPCEvent::Request {
id,
method_id,
body,
} => {
s.append(&true);
s.append(id);
s.append(method_id);
match body {
RPCRequest::Hello(body) => {
s.append(body);
}
_ => {}
}
}
RPCEvent::Response {
id,
method_id,
result,
} => {
s.append(&false);
s.append(id);
s.append(method_id);
match result {
RPCResponse::Hello(response) => {
s.append(response);
}
_ => {}
}
}
}
}
}
#[derive(Debug)]
pub enum DecodeError {
ReadError(upgrade::ReadOneError),
SSZDecodeError(ssz::DecodeError),
UnknownRPCMethod,
}
impl From<upgrade::ReadOneError> for DecodeError {
#[inline]
fn from(err: upgrade::ReadOneError) -> Self {
DecodeError::ReadError(err)
}
}
impl From<ssz::DecodeError> for DecodeError {
#[inline]
fn from(err: ssz::DecodeError) -> Self {
DecodeError::SSZDecodeError(err)
}
}

View File

@ -0,0 +1,163 @@
use crate::behaviour::{Behaviour, BehaviourEvent};
use crate::error;
use crate::multiaddr::Protocol;
use crate::rpc::RPCEvent;
use crate::NetworkConfig;
use futures::prelude::*;
use futures::Stream;
use libp2p::core::{
muxing::StreamMuxerBox,
nodes::Substream,
transport::boxed::Boxed,
upgrade::{InboundUpgradeExt, OutboundUpgradeExt},
};
use libp2p::{core, secio, Transport};
use libp2p::{PeerId, Swarm};
use slog::{debug, info, trace, warn};
use std::io::{Error, ErrorKind};
use std::time::Duration;
use types::TopicBuilder;
/// The configuration and state of the libp2p components for the beacon node.
pub struct Service {
/// The libp2p Swarm handler.
//TODO: Make this private
pub swarm: Swarm<Boxed<(PeerId, StreamMuxerBox), Error>, Behaviour<Substream<StreamMuxerBox>>>,
/// This node's PeerId.
local_peer_id: PeerId,
/// The libp2p logger handle.
pub log: slog::Logger,
}
impl Service {
pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result<Self> {
debug!(log, "Libp2p Service starting");
let local_private_key = config.local_private_key;
let local_peer_id = local_private_key.to_peer_id();
info!(log, "Local peer id: {:?}", local_peer_id);
let mut swarm = {
// Set up the transport
let transport = build_transport(local_private_key);
// Set up gossipsub routing
let behaviour = Behaviour::new(local_peer_id.clone(), config.gs_config, &log);
// Set up Topology
let topology = local_peer_id.clone();
Swarm::new(transport, behaviour, topology)
};
// listen on all addresses
for address in &config.listen_addresses {
match Swarm::listen_on(&mut swarm, address.clone()) {
Ok(mut listen_addr) => {
listen_addr.append(Protocol::P2p(local_peer_id.clone().into()));
info!(log, "Listening on: {}", listen_addr);
}
Err(err) => warn!(log, "Cannot listen on: {} : {:?}", address, err),
};
}
// connect to boot nodes - these are currently stored as multiaddrs
// Once we have discovery, can set to peerId
for bootnode in config.boot_nodes {
match Swarm::dial_addr(&mut swarm, bootnode.clone()) {
Ok(()) => debug!(log, "Dialing bootnode: {}", bootnode),
Err(err) => debug!(
log,
"Could not connect to bootnode: {} error: {:?}", bootnode, err
),
};
}
// subscribe to default gossipsub topics
let mut subscribed_topics = vec![];
for topic in config.topics {
let t = TopicBuilder::new(topic.to_string()).build();
if swarm.subscribe(t) {
trace!(log, "Subscribed to topic: {:?}", topic);
subscribed_topics.push(topic);
} else {
warn!(log, "Could not subscribe to topic: {:?}", topic)
}
}
info!(log, "Subscribed to topics: {:?}", subscribed_topics);
Ok(Service {
local_peer_id,
swarm,
log,
})
}
}
impl Stream for Service {
type Item = Libp2pEvent;
type Error = crate::error::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
// TODO: Currently only gossipsub events passed here.
// Build a type for more generic events
match self.swarm.poll() {
Ok(Async::Ready(Some(BehaviourEvent::Message(m)))) => {
// TODO: Stub here for debugging
debug!(self.log, "Message received: {}", m);
return Ok(Async::Ready(Some(Libp2pEvent::Message(m))));
}
Ok(Async::Ready(Some(BehaviourEvent::RPC(peer_id, event)))) => {
return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))));
}
Ok(Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id)))) => {
return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))));
}
Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"),
Ok(Async::NotReady) => break,
_ => break,
}
}
Ok(Async::NotReady)
}
}
/// The implementation supports TCP/IP, WebSockets over TCP/IP, secio as the encryption layer, and
/// mplex or yamux as the multiplexing layer.
fn build_transport(
local_private_key: secio::SecioKeyPair,
) -> Boxed<(PeerId, StreamMuxerBox), Error> {
// TODO: The Wire protocol currently doesn't specify encryption and this will need to be customised
// in the future.
let transport = libp2p::tcp::TcpConfig::new();
let transport = libp2p::dns::DnsConfig::new(transport);
#[cfg(feature = "libp2p-websocket")]
let transport = {
let trans_clone = transport.clone();
transport.or_transport(websocket::WsConfig::new(trans_clone))
};
transport
.with_upgrade(secio::SecioConfig::new(local_private_key))
.and_then(move |out, endpoint| {
let peer_id = out.remote_key.into_peer_id();
let peer_id2 = peer_id.clone();
let upgrade = core::upgrade::SelectUpgrade::new(
libp2p::yamux::Config::default(),
libp2p::mplex::MplexConfig::new(),
)
// TODO: use a single `.map` instead of two maps
.map_inbound(move |muxer| (peer_id, muxer))
.map_outbound(move |muxer| (peer_id2, muxer));
core::upgrade::apply(out.stream, upgrade, endpoint)
.map(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer)))
})
.with_timeout(Duration::from_secs(20))
.map_err(|err| Error::new(ErrorKind::Other, err))
.boxed()
}
/// Events that can be obtained from polling the Libp2p Service.
pub enum Libp2pEvent {
// We have received an RPC event on the swarm
RPC(PeerId, RPCEvent),
PeerDialed(PeerId),
Message(String),
}

View File

@ -0,0 +1,16 @@
[package]
name = "network"
version = "0.1.0"
authors = ["Age Manning <Age@AgeManning.com>"]
edition = "2018"
[dependencies]
beacon_chain = { path = "../beacon_chain" }
eth2-libp2p = { path = "../eth2-libp2p" }
version = { path = "../version" }
types = { path = "../../eth2/types" }
slog = "2.4.1"
futures = "0.1.25"
error-chain = "0.12.0"
crossbeam-channel = "0.3.8"
tokio = "0.1.16"

View File

@ -0,0 +1,43 @@
use beacon_chain::BeaconChain as RawBeaconChain;
use beacon_chain::{
db::ClientDB,
fork_choice::ForkChoice,
parking_lot::RwLockReadGuard,
slot_clock::SlotClock,
types::{BeaconState, ChainSpec},
CheckPoint,
};
/// The network's API to the beacon chain.
pub trait BeaconChain: Send + Sync {
fn get_spec(&self) -> &ChainSpec;
fn get_state(&self) -> RwLockReadGuard<BeaconState>;
fn head(&self) -> RwLockReadGuard<CheckPoint>;
fn finalized_head(&self) -> RwLockReadGuard<CheckPoint>;
}
impl<T, U, F> BeaconChain for RawBeaconChain<T, U, F>
where
T: ClientDB + Sized,
U: SlotClock,
F: ForkChoice,
{
fn get_spec(&self) -> &ChainSpec {
&self.spec
}
fn get_state(&self) -> RwLockReadGuard<BeaconState> {
self.state.read()
}
fn head(&self) -> RwLockReadGuard<CheckPoint> {
self.head()
}
fn finalized_head(&self) -> RwLockReadGuard<CheckPoint> {
self.finalized_head()
}
}

View File

@ -0,0 +1,13 @@
// generates error types
use eth2_libp2p;
use error_chain::{
error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed,
impl_extract_backtrace,
};
error_chain! {
links {
Libp2p(eth2_libp2p::error::Error, eth2_libp2p::error::ErrorKind);
}
}

View File

@ -0,0 +1,9 @@
/// This crate provides the network server for Lighthouse.
pub mod beacon_chain;
pub mod error;
mod message_handler;
mod service;
pub mod sync;
pub use eth2_libp2p::NetworkConfig;
pub use service::Service;

View File

@ -0,0 +1,225 @@
use crate::beacon_chain::BeaconChain;
use crate::error;
use crate::service::{NetworkMessage, OutgoingMessage};
use crate::sync::SimpleSync;
use crossbeam_channel::{unbounded as channel, Sender};
use eth2_libp2p::{
rpc::{RPCMethod, RPCRequest, RPCResponse},
HelloMessage, PeerId, RPCEvent,
};
use futures::future;
use slog::warn;
use slog::{debug, trace};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Timeout for RPC requests.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
/// Timeout before banning a peer for non-identification.
const HELLO_TIMEOUT: Duration = Duration::from_secs(30);
/// Handles messages received from the network and client and organises syncing.
pub struct MessageHandler {
/// Currently loaded and initialised beacon chain.
chain: Arc<BeaconChain>,
/// The syncing framework.
sync: SimpleSync,
/// The network channel to relay messages to the Network service.
network_send: crossbeam_channel::Sender<NetworkMessage>,
/// A mapping of peers and the RPC id we have sent an RPC request to.
requests: HashMap<(PeerId, u64), Instant>,
/// A counter of request id for each peer.
request_ids: HashMap<PeerId, u64>,
/// The `MessageHandler` logger.
log: slog::Logger,
}
/// Types of messages the handler can receive.
#[derive(Debug, Clone)]
pub enum HandlerMessage {
/// We have initiated a connection to a new peer.
PeerDialed(PeerId),
/// Peer has disconnected,
PeerDisconnected(PeerId),
/// An RPC response/request has been received.
RPC(PeerId, RPCEvent),
/// A block has been imported.
BlockImported(), //TODO: This comes from pub-sub - decide its contents
}
impl MessageHandler {
/// Initializes and runs the MessageHandler.
pub fn spawn(
beacon_chain: Arc<BeaconChain>,
network_send: crossbeam_channel::Sender<NetworkMessage>,
executor: &tokio::runtime::TaskExecutor,
log: slog::Logger,
) -> error::Result<Sender<HandlerMessage>> {
debug!(log, "Service starting");
let (handler_send, handler_recv) = channel();
// Initialise sync and begin processing in thread
// generate the Message handler
let sync = SimpleSync::new(beacon_chain.clone(), &log);
let mut handler = MessageHandler {
// TODO: The handler may not need a chain, perhaps only sync?
chain: beacon_chain.clone(),
sync,
network_send,
requests: HashMap::new(),
request_ids: HashMap::new(),
log: log.clone(),
};
// spawn handler task
// TODO: Handle manual termination of thread
executor.spawn(future::poll_fn(move || -> Result<_, _> {
loop {
handler.handle_message(handler_recv.recv().map_err(|_| {
debug!(log, "Network message handler terminated.");
})?);
}
}));
Ok(handler_send)
}
/// Handle all messages incoming from the network service.
fn handle_message(&mut self, message: HandlerMessage) {
match message {
// we have initiated a connection to a peer
HandlerMessage::PeerDialed(peer_id) => {
let id = self.generate_request_id(&peer_id);
self.send_hello(peer_id, id, true);
}
// we have received an RPC message request/response
HandlerMessage::RPC(peer_id, rpc_event) => {
self.handle_rpc_message(peer_id, rpc_event);
}
//TODO: Handle all messages
_ => {}
}
}
/* RPC - Related functionality */
/// Handle RPC messages
fn handle_rpc_message(&mut self, peer_id: PeerId, rpc_message: RPCEvent) {
match rpc_message {
RPCEvent::Request { id, body, .. // TODO: Clean up RPC Message types, have a cleaner type by this point.
} => self.handle_rpc_request(peer_id, id, body),
RPCEvent::Response { id, result, .. } => self.handle_rpc_response(peer_id, id, result),
}
}
/// A new RPC request has been received from the network.
fn handle_rpc_request(&mut self, peer_id: PeerId, id: u64, request: RPCRequest) {
match request {
RPCRequest::Hello(hello_message) => {
self.handle_hello_request(peer_id, id, hello_message)
}
// TODO: Handle all requests
_ => {}
}
}
/// An RPC response has been received from the network.
// we match on id and ignore responses past the timeout.
fn handle_rpc_response(&mut self, peer_id: PeerId, id: u64, response: RPCResponse) {
// if response id is related to a request, ignore (likely RPC timeout)
if self.requests.remove(&(peer_id.clone(), id)).is_none() {
debug!(self.log, "Unrecognized response from peer: {:?}", peer_id);
return;
}
match response {
RPCResponse::Hello(hello_message) => {
debug!(self.log, "Hello response received from peer: {:?}", peer_id);
self.validate_hello(peer_id, hello_message);
}
// TODO: Handle all responses
_ => {}
}
}
/// Handle a HELLO RPC request message.
fn handle_hello_request(&mut self, peer_id: PeerId, id: u64, hello_message: HelloMessage) {
// send back a HELLO message
self.send_hello(peer_id.clone(), id, false);
// validate the peer
self.validate_hello(peer_id, hello_message);
}
/// Validate a HELLO RPC message.
fn validate_hello(&mut self, peer_id: PeerId, message: HelloMessage) {
// validate the peer
if !self.sync.validate_peer(peer_id.clone(), message) {
debug!(
self.log,
"Peer dropped due to mismatching HELLO messages: {:?}", peer_id
);
//TODO: block/ban the peer
}
}
/* General RPC helper functions */
/// Generates a new request id for a peer.
fn generate_request_id(&mut self, peer_id: &PeerId) -> u64 {
// generate a unique id for the peer
let id = {
let borrowed_id = self.request_ids.entry(peer_id.clone()).or_insert_with(|| 0);
let id = borrowed_id.clone();
//increment the counter
*borrowed_id += 1;
id
};
// register RPC request
self.requests.insert((peer_id.clone(), id), Instant::now());
debug!(
self.log,
"Hello request registered with peer: {:?}", peer_id
);
id
}
/// Sends a HELLO RPC request or response to a newly connected peer.
//TODO: The boolean determines if sending request/respond, will be cleaner in the RPC re-write
fn send_hello(&mut self, peer_id: PeerId, id: u64, is_request: bool) {
let rpc_event = if is_request {
RPCEvent::Request {
id,
method_id: RPCMethod::Hello.into(),
body: RPCRequest::Hello(self.sync.generate_hello()),
}
} else {
RPCEvent::Response {
id,
method_id: RPCMethod::Hello.into(),
result: RPCResponse::Hello(self.sync.generate_hello()),
}
};
// send the hello request to the network
trace!(self.log, "Sending HELLO message to peer {:?}", peer_id);
self.send_rpc(peer_id, rpc_event);
}
/// Sends an RPC request/response to the network server.
fn send_rpc(&self, peer_id: PeerId, rpc_event: RPCEvent) {
self.network_send
.send(NetworkMessage::Send(
peer_id,
OutgoingMessage::RPC(rpc_event),
))
.unwrap_or_else(|_| {
warn!(
self.log,
"Could not send RPC message to the network service"
)
});
}
}

View File

@ -0,0 +1,180 @@
use crate::beacon_chain::BeaconChain;
use crate::error;
use crate::message_handler::{HandlerMessage, MessageHandler};
use crate::NetworkConfig;
use crossbeam_channel::{unbounded as channel, Sender, TryRecvError};
use eth2_libp2p::RPCEvent;
use eth2_libp2p::Service as LibP2PService;
use eth2_libp2p::{Libp2pEvent, PeerId};
use futures::prelude::*;
use futures::sync::oneshot;
use futures::Stream;
use slog::{debug, info, o, trace};
use std::sync::Arc;
use tokio::runtime::TaskExecutor;
/// Service that handles communication between internal services and the eth2_libp2p network service.
pub struct Service {
//eth2_libp2p_service: Arc<Mutex<LibP2PService>>,
eth2_libp2p_exit: oneshot::Sender<()>,
network_send: crossbeam_channel::Sender<NetworkMessage>,
//message_handler: MessageHandler,
//message_handler_send: Sender<HandlerMessage>,
}
impl Service {
pub fn new(
beacon_chain: Arc<BeaconChain>,
config: &NetworkConfig,
executor: &TaskExecutor,
log: slog::Logger,
) -> error::Result<(Arc<Self>, Sender<NetworkMessage>)> {
// build the network channel
let (network_send, network_recv) = channel::<NetworkMessage>();
// launch message handler thread
let message_handler_log = log.new(o!("Service" => "MessageHandler"));
let message_handler_send = MessageHandler::spawn(
beacon_chain,
network_send.clone(),
executor,
message_handler_log,
)?;
// launch eth2_libp2p service
let eth2_libp2p_log = log.new(o!("Service" => "Libp2p"));
let eth2_libp2p_service = LibP2PService::new(config.clone(), eth2_libp2p_log)?;
// TODO: Spawn thread to handle eth2_libp2p messages and pass to message handler thread.
let eth2_libp2p_exit = spawn_service(
eth2_libp2p_service,
network_recv,
message_handler_send,
executor,
log,
)?;
let network_service = Service {
eth2_libp2p_exit,
network_send: network_send.clone(),
};
Ok((Arc::new(network_service), network_send))
}
// TODO: Testing only
pub fn send_message(&self) {
self.network_send
.send(NetworkMessage::Send(
PeerId::random(),
OutgoingMessage::NotifierTest,
))
.unwrap();
}
}
fn spawn_service(
eth2_libp2p_service: LibP2PService,
network_recv: crossbeam_channel::Receiver<NetworkMessage>,
message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
executor: &TaskExecutor,
log: slog::Logger,
) -> error::Result<oneshot::Sender<()>> {
let (network_exit, exit_rx) = oneshot::channel();
// spawn on the current executor
executor.spawn(
network_service(
eth2_libp2p_service,
network_recv,
message_handler_send,
log.clone(),
)
// allow for manual termination
.select(exit_rx.then(|_| Ok(())))
.then(move |_| {
info!(log.clone(), "Network service shutdown");
Ok(())
}),
);
Ok(network_exit)
}
fn network_service(
mut eth2_libp2p_service: LibP2PService,
network_recv: crossbeam_channel::Receiver<NetworkMessage>,
message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
log: slog::Logger,
) -> impl futures::Future<Item = (), Error = eth2_libp2p::error::Error> {
futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> {
// poll the swarm
loop {
match eth2_libp2p_service.poll() {
Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, rpc_event)))) => {
trace!(
eth2_libp2p_service.log,
"RPC Event: RPC message received: {:?}",
rpc_event
);
message_handler_send
.send(HandlerMessage::RPC(peer_id, rpc_event))
.map_err(|_| "failed to send rpc to handler")?;
}
Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => {
debug!(eth2_libp2p_service.log, "Peer Dialed: {:?}", peer_id);
message_handler_send
.send(HandlerMessage::PeerDialed(peer_id))
.map_err(|_| "failed to send rpc to handler")?;
}
Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!(
eth2_libp2p_service.log,
"Network Service: Message received: {}", m
),
_ => break,
}
}
// poll the network channel
// TODO: refactor - combine poll_fn's?
loop {
match network_recv.try_recv() {
// TODO: Testing message - remove
Ok(NetworkMessage::Send(peer_id, outgoing_message)) => {
match outgoing_message {
OutgoingMessage::RPC(rpc_event) => {
trace!(log, "Sending RPC Event: {:?}", rpc_event);
//TODO: Make swarm private
//TODO: Implement correct peer id topic message handling
eth2_libp2p_service.swarm.send_rpc(peer_id, rpc_event);
}
OutgoingMessage::NotifierTest => {
debug!(log, "Received message from notifier");
}
};
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
return Err(eth2_libp2p::error::Error::from(
"Network channel disconnected",
));
}
}
}
Ok(Async::NotReady)
})
}
/// Types of messages that the network service can receive.
#[derive(Debug, Clone)]
pub enum NetworkMessage {
/// Send a message to eth2_libp2p service.
//TODO: Define typing for messages across the wire
Send(PeerId, OutgoingMessage),
}
/// Type of outgoing messages that can be sent through the network service.
#[derive(Debug, Clone)]
pub enum OutgoingMessage {
/// Send an RPC request/response.
RPC(RPCEvent),
//TODO: Remove
NotifierTest,
}

View File

@ -0,0 +1,11 @@
/// Syncing for lighthouse.
///
/// Stores the various syncing methods for the beacon chain.
mod simple_sync;
pub use simple_sync::SimpleSync;
/// Currently implemented sync methods.
pub enum SyncMethod {
SimpleSync,
}

View File

@ -0,0 +1,112 @@
use crate::beacon_chain::BeaconChain;
use eth2_libp2p::rpc::HelloMessage;
use eth2_libp2p::PeerId;
use slog::{debug, o};
use std::collections::HashMap;
use std::sync::Arc;
use types::{Epoch, Hash256, Slot};
/// The number of slots that we can import blocks ahead of us, before going into full Sync mode.
const SLOT_IMPORT_TOLERANCE: u64 = 100;
/// Keeps track of syncing information for known connected peers.
pub struct PeerSyncInfo {
latest_finalized_root: Hash256,
latest_finalized_epoch: Epoch,
best_root: Hash256,
best_slot: Slot,
}
/// The current syncing state.
#[derive(PartialEq)]
pub enum SyncState {
Idle,
Downloading,
Stopped,
}
/// Simple Syncing protocol.
//TODO: Decide for HELLO messages whether its better to keep current in RAM or build on the fly
//when asked.
pub struct SimpleSync {
/// A reference to the underlying beacon chain.
chain: Arc<BeaconChain>,
/// A mapping of Peers to their respective PeerSyncInfo.
known_peers: HashMap<PeerId, PeerSyncInfo>,
/// The current state of the syncing protocol.
state: SyncState,
/// The network id, for quick HELLO RPC message lookup.
network_id: u8,
/// The latest epoch of the syncing chain.
latest_finalized_epoch: Epoch,
/// The latest block of the syncing chain.
latest_slot: Slot,
/// Sync logger.
log: slog::Logger,
}
impl SimpleSync {
pub fn new(beacon_chain: Arc<BeaconChain>, log: &slog::Logger) -> Self {
let state = beacon_chain.get_state();
let sync_logger = log.new(o!("Service"=> "Sync"));
SimpleSync {
chain: beacon_chain.clone(),
known_peers: HashMap::new(),
state: SyncState::Idle,
network_id: beacon_chain.get_spec().network_id,
latest_finalized_epoch: state.finalized_epoch,
latest_slot: state.slot - 1, //TODO: Build latest block function into Beacon chain and correct this
log: sync_logger,
}
}
/// Generates our current state in the form of a HELLO RPC message.
pub fn generate_hello(&self) -> HelloMessage {
let state = &self.chain.get_state();
//TODO: Paul to verify the logic of these fields.
HelloMessage {
network_id: self.network_id,
latest_finalized_root: state.finalized_root,
latest_finalized_epoch: state.finalized_epoch,
best_root: Hash256::zero(), //TODO: build correct value as a beacon chain function
best_slot: state.slot - 1,
}
}
pub fn validate_peer(&mut self, peer_id: PeerId, hello_message: HelloMessage) -> bool {
// network id must match
if hello_message.network_id != self.network_id {
return false;
}
// compare latest epoch and finalized root to see if they exist in our chain
if hello_message.latest_finalized_epoch <= self.latest_finalized_epoch {
// ensure their finalized root is in our chain
// TODO: Get the finalized root at hello_message.latest_epoch and ensure they match
//if (hello_message.latest_finalized_root == self.chain.get_state() {
// return false;
// }
}
// the client is valid, add it to our list of known_peers and request sync if required
// update peer list if peer already exists
let peer_info = PeerSyncInfo {
latest_finalized_root: hello_message.latest_finalized_root,
latest_finalized_epoch: hello_message.latest_finalized_epoch,
best_root: hello_message.best_root,
best_slot: hello_message.best_slot,
};
debug!(self.log, "Handshake successful. Peer: {:?}", peer_id);
self.known_peers.insert(peer_id, peer_info);
// set state to sync
if self.state == SyncState::Idle
&& hello_message.best_slot > self.latest_slot + SLOT_IMPORT_TOLERANCE
{
self.state = SyncState::Downloading;
//TODO: Start requesting blocks from known peers. Ideally in batches
}
true
}
}

View File

@ -0,0 +1,23 @@
[package]
name = "rpc"
version = "0.1.0"
authors = ["Age Manning <Age@AgeManning.com>"]
edition = "2018"
[dependencies]
bls = { path = "../../eth2/utils/bls" }
beacon_chain = { path = "../beacon_chain" }
protos = { path = "../../protos" }
grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] }
protobuf = "2.0.2"
clap = "2.32.0"
db = { path = "../db" }
dirs = "1.0.3"
futures = "0.1.23"
slog = "^2.2.3"
slot_clock = { path = "../../eth2/utils/slot_clock" }
slog-term = "^2.4.0"
slog-async = "^2.3.0"
types = { path = "../../eth2/types" }
ssz = { path = "../../eth2/utils/ssz" }

View File

@ -0,0 +1,22 @@
use std::net::Ipv4Addr;
/// RPC Configuration
#[derive(Debug, Clone)]
pub struct Config {
/// Enable the RPC server.
pub enabled: bool,
/// The IPv4 address the RPC will listen on.
pub listen_address: Ipv4Addr,
/// The port the RPC will listen on.
pub port: u16,
}
impl Default for Config {
fn default() -> Self {
Config {
enabled: false, // rpc disabled by default
listen_address: Ipv4Addr::new(127, 0, 0, 1),
port: 5051,
}
}
}

View File

@ -1,16 +1,18 @@
mod beacon_block;
pub mod config;
mod validator;
use self::beacon_block::BeaconBlockServiceInstance;
use self::validator::ValidatorServiceInstance;
pub use config::Config as RPCConfig;
use grpcio::{Environment, Server, ServerBuilder};
use protos::services_grpc::{create_beacon_block_service, create_validator_service};
use std::sync::Arc;
use slog::{info, Logger};
use slog::{info, o};
pub fn start_server(log: Logger) -> Server {
let log_clone = log.clone();
pub fn start_server(config: &RPCConfig, log: &slog::Logger) -> Server {
let log = log.new(o!("Service"=>"RPC"));
let env = Arc::new(Environment::new(1));
let beacon_block_service = {
@ -25,12 +27,12 @@ pub fn start_server(log: Logger) -> Server {
let mut server = ServerBuilder::new(env)
.register_service(beacon_block_service)
.register_service(validator_service)
.bind("127.0.0.1", 50_051)
.bind(config.listen_address.to_string(), config.port)
.build()
.unwrap();
server.start();
for &(ref host, port) in server.bind_addrs() {
info!(log_clone, "gRPC listening on {}:{}", host, port);
info!(log, "gRPC listening on {}:{}", host, port);
}
server
}

View File

@ -1,30 +0,0 @@
use std::fs;
use std::path::PathBuf;
/// Stores the core configuration for this Lighthouse instance.
/// This struct is general, other components may implement more
/// specialized config structs.
#[derive(Clone)]
pub struct LighthouseConfig {
pub data_dir: PathBuf,
pub p2p_listen_port: u16,
}
const DEFAULT_LIGHTHOUSE_DIR: &str = ".lighthouse";
impl LighthouseConfig {
/// Build a new lighthouse configuration from defaults.
pub fn default() -> Self {
let data_dir = {
let home = dirs::home_dir().expect("Unable to determine home dir.");
home.join(DEFAULT_LIGHTHOUSE_DIR)
};
fs::create_dir_all(&data_dir)
.unwrap_or_else(|_| panic!("Unable to create {:?}", &data_dir));
let p2p_listen_port = 0;
Self {
data_dir,
p2p_listen_port,
}
}
}

View File

@ -1,35 +1,20 @@
extern crate slog;
mod config;
mod rpc;
mod run;
use std::path::PathBuf;
use crate::config::LighthouseConfig;
use crate::rpc::start_server;
use beacon_chain::BeaconChain;
use clap::{App, Arg};
use db::{
stores::{BeaconBlockStore, BeaconStateStore},
MemoryDB,
};
use fork_choice::BitwiseLMDGhost;
use slog::{error, info, o, Drain};
use slot_clock::SystemTimeSlotClock;
use ssz::TreeHash;
use std::sync::Arc;
use types::test_utils::TestingBeaconStateBuilder;
use types::*;
use client::ClientConfig;
use slog::{error, o, Drain};
fn main() {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::CompactFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
let log = slog::Logger::root(drain, o!());
let logger = slog::Logger::root(drain, o!());
let matches = App::new("Lighthouse")
.version("0.0.1")
.author("Sigma Prime <paul@sigmaprime.io>")
.version(version::version().as_str())
.author("Sigma Prime <contact@sigmaprime.io>")
.about("Eth 2.0 Client")
.arg(
Arg::with_name("datadir")
@ -38,6 +23,13 @@ fn main() {
.help("Data directory for keys and databases.")
.takes_value(true),
)
.arg(
Arg::with_name("listen_address")
.long("listen-address")
.value_name("Listen Address")
.help("The Network address to listen for p2p connections.")
.takes_value(true),
)
.arg(
Arg::with_name("port")
.long("port")
@ -45,64 +37,34 @@ fn main() {
.help("Network listen port for p2p connections.")
.takes_value(true),
)
.arg(
Arg::with_name("rpc")
.long("rpc")
.value_name("RPC")
.help("Enable the RPC server.")
.takes_value(false),
)
.arg(
Arg::with_name("rpc-address")
.long("rpc-address")
.value_name("RPCADDRESS")
.help("Listen address for RPC endpoint.")
.takes_value(true),
)
.arg(
Arg::with_name("rpc-port")
.long("rpc-port")
.value_name("RPCPORT")
.help("Listen port for RPC endpoint.")
.takes_value(true),
)
.get_matches();
let mut config = LighthouseConfig::default();
// invalid arguments, panic
let config = ClientConfig::parse_args(matches, &logger).unwrap();
// Custom datadir
if let Some(dir) = matches.value_of("datadir") {
config.data_dir = PathBuf::from(dir.to_string());
}
// Custom p2p listen port
if let Some(port_str) = matches.value_of("port") {
if let Ok(port) = port_str.parse::<u16>() {
config.p2p_listen_port = port;
} else {
error!(log, "Invalid port"; "port" => port_str);
return;
}
}
// Log configuration
info!(log, "";
"data_dir" => &config.data_dir.to_str(),
"port" => &config.p2p_listen_port);
// Specification (presently fixed to foundation).
let spec = ChainSpec::foundation();
// Database (presently in-memory)
let db = Arc::new(MemoryDB::open());
let block_store = Arc::new(BeaconBlockStore::new(db.clone()));
let state_store = Arc::new(BeaconStateStore::new(db.clone()));
let state_builder = TestingBeaconStateBuilder::from_deterministic_keypairs(8, &spec);
let (genesis_state, _keypairs) = state_builder.build();
let mut genesis_block = BeaconBlock::empty(&spec);
genesis_block.state_root = Hash256::from_slice(&genesis_state.hash_tree_root());
// Slot clock
let slot_clock = SystemTimeSlotClock::new(genesis_state.genesis_time, spec.seconds_per_slot)
.expect("Unable to load SystemTimeSlotClock");
// Choose the fork choice
let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone());
// Genesis chain
let _chain_result = BeaconChain::from_genesis(
state_store.clone(),
block_store.clone(),
slot_clock,
genesis_state,
genesis_block,
spec,
fork_choice,
);
let _server = start_server(log.clone());
loop {
std::thread::sleep(std::time::Duration::from_secs(1));
match run::run_beacon_node(config, &logger) {
Ok(_) => {}
Err(e) => error!(logger, "Beacon node failed because {:?}", e),
}
}

51
beacon_node/src/run.rs Normal file
View File

@ -0,0 +1,51 @@
use client::client_types::TestingClientType;
use client::error;
use client::{notifier, Client, ClientConfig};
use futures::sync::oneshot;
use futures::Future;
use slog::info;
use std::cell::RefCell;
use tokio::runtime::Builder;
pub fn run_beacon_node(config: ClientConfig, log: &slog::Logger) -> error::Result<()> {
let mut runtime = Builder::new()
.name_prefix("main-")
.build()
.map_err(|e| format!("{:?}", e))?;
// Log configuration
info!(log, "Listening on {:?}", &config.net_conf.listen_addresses;
"data_dir" => &config.data_dir.to_str(),
"port" => &config.net_conf.listen_port);
// run service until ctrl-c
let (ctrlc_send, ctrlc) = oneshot::channel();
let ctrlc_send_c = RefCell::new(Some(ctrlc_send));
ctrlc::set_handler(move || {
if let Some(ctrlc_send) = ctrlc_send_c.try_borrow_mut().unwrap().take() {
ctrlc_send.send(()).expect("Error sending ctrl-c message");
}
})
.map_err(|e| format!("Could not set ctrlc hander: {:?}", e))?;
let (exit_signal, exit) = exit_future::signal();
let executor = runtime.executor();
// currently testing - using TestingClientType
let client: Client<TestingClientType> = Client::new(config, log.clone(), &executor)?;
notifier::run(&client, executor, exit);
runtime
.block_on(ctrlc)
.map_err(|e| format!("Ctrlc oneshot failed: {:?}", e))?;
// perform global shutdown operations.
info!(log, "Shutting down..");
exit_signal.fire();
// shutdown the client
// client.exit_signal.fire();
drop(client);
runtime.shutdown_on_idle().wait().unwrap();
Ok(())
}

View File

@ -0,0 +1,8 @@
[package]
name = "version"
version = "0.1.0"
authors = ["Age Manning <Age@AgeManning.com>"]
edition = "2018"
[dependencies]
target_info = "0.1.0"

View File

@ -0,0 +1,25 @@
//TODO: Build the version and hash of the built lighthouse binary
/// Version information for the Lighthouse beacon node.
// currently only supports unstable release
extern crate target_info;
use target_info::Target;
const TRACK: &str = "unstable";
/// Provides the current platform
pub fn platform() -> String {
format!("{}-{}", Target::arch(), Target::os())
}
/// Version of the beacon node.
// TODO: Find the sha3 hash, date and rust version used to build the beacon_node binary
pub fn version() -> String {
format!(
"Lighthouse/v{}-{}/{}",
env!("CARGO_PKG_VERSION"),
TRACK,
platform()
)
}

View File

@ -96,6 +96,7 @@ impl From<BeaconBlockAtSlotError> for ForkChoiceError {
}
/// Fork choice options that are currently implemented.
#[derive(Debug, Clone)]
pub enum ForkChoiceAlgorithm {
/// Chooses the longest chain becomes the head. Not for production.
LongestChain,

View File

@ -215,10 +215,8 @@ impl<T: ClientDB + Sized> ForkChoice for SlowLMDGhost<T> {
head_vote_count = vote_count;
}
// resolve ties - choose smaller hash
else if vote_count == head_vote_count {
if *child_hash < head_hash {
head_hash = *child_hash;
}
else if vote_count == head_vote_count && *child_hash < head_hash {
head_hash = *child_hash;
}
}
}

View File

@ -4,7 +4,7 @@ use types::*;
///
/// Is title `verify_bitfield` in spec.
///
/// Spec v0.4.0
/// Spec v0.5.0
pub fn verify_bitfield_length(bitfield: &Bitfield, committee_size: usize) -> bool {
if bitfield.num_bytes() != ((committee_size + 7) / 8) {
return false;

View File

@ -109,7 +109,7 @@ pub fn process_block_header(
Invalid::ParentBlockRootMismatch
);
state.latest_block_header = block.into_temporary_header(spec);
state.latest_block_header = block.temporary_block_header(spec);
Ok(())
}
@ -388,7 +388,7 @@ pub fn process_deposits(
// Create a new validator.
let validator = Validator {
pubkey: deposit_input.pubkey.clone(),
withdrawal_credentials: deposit_input.withdrawal_credentials.clone(),
withdrawal_credentials: deposit_input.withdrawal_credentials,
activation_epoch: spec.far_future_epoch,
exit_epoch: spec.far_future_epoch,
withdrawable_epoch: spec.far_future_epoch,

View File

@ -176,17 +176,7 @@ fn validate_attestation_signature_optional(
);
if verify_signature {
let attestation_epoch = attestation.data.slot.epoch(spec.slots_per_epoch);
verify_attestation_signature(
state,
committee,
attestation_epoch,
&attestation.aggregation_bitfield,
&attestation.custody_bitfield,
&attestation.data,
&attestation.aggregate_signature,
spec,
)?;
verify_attestation_signature(state, committee, attestation, spec)?;
}
// Crosslink data root is zero (to be removed in phase 1).
@ -210,32 +200,29 @@ fn validate_attestation_signature_optional(
fn verify_attestation_signature(
state: &BeaconState,
committee: &[usize],
attestation_epoch: Epoch,
aggregation_bitfield: &Bitfield,
custody_bitfield: &Bitfield,
attestation_data: &AttestationData,
aggregate_signature: &AggregateSignature,
a: &Attestation,
spec: &ChainSpec,
) -> Result<(), Error> {
let mut aggregate_pubs = vec![AggregatePublicKey::new(); 2];
let mut message_exists = vec![false; 2];
let attestation_epoch = a.data.slot.epoch(spec.slots_per_epoch);
for (i, v) in committee.iter().enumerate() {
let validator_signed = aggregation_bitfield.get(i).map_err(|_| {
let validator_signed = a.aggregation_bitfield.get(i).map_err(|_| {
Error::Invalid(Invalid::BadAggregationBitfieldLength {
committee_len: committee.len(),
bitfield_len: aggregation_bitfield.len(),
bitfield_len: a.aggregation_bitfield.len(),
})
})?;
if validator_signed {
let custody_bit: bool = match custody_bitfield.get(i) {
let custody_bit: bool = match a.custody_bitfield.get(i) {
Ok(bit) => bit,
// Invalidate signature if custody_bitfield.len() < committee
Err(_) => {
return Err(Error::Invalid(Invalid::BadCustodyBitfieldLength {
committee_len: committee.len(),
bitfield_len: aggregation_bitfield.len(),
bitfield_len: a.aggregation_bitfield.len(),
}));
}
};
@ -254,14 +241,14 @@ fn verify_attestation_signature(
// Message when custody bitfield is `false`
let message_0 = AttestationDataAndCustodyBit {
data: attestation_data.clone(),
data: a.data.clone(),
custody_bit: false,
}
.hash_tree_root();
// Message when custody bitfield is `true`
let message_1 = AttestationDataAndCustodyBit {
data: attestation_data.clone(),
data: a.data.clone(),
custody_bit: true,
}
.hash_tree_root();
@ -283,7 +270,8 @@ fn verify_attestation_signature(
let domain = spec.get_domain(attestation_epoch, Domain::Attestation, &state.fork);
verify!(
aggregate_signature.verify_multiple(&messages[..], domain, &keys[..]),
a.aggregate_signature
.verify_multiple(&messages[..], domain, &keys[..]),
Invalid::BadSignature
);

View File

@ -71,9 +71,7 @@ pub fn get_existing_validator_index(
) -> Result<Option<u64>, Error> {
let deposit_input = &deposit.deposit_data.deposit_input;
let validator_index = state
.get_validator_index(&deposit_input.pubkey)?
.and_then(|i| Some(i));
let validator_index = state.get_validator_index(&deposit_input.pubkey)?;
match validator_index {
None => Ok(None),

View File

@ -1,25 +1,24 @@
use apply_rewards::apply_rewards;
use errors::EpochProcessingError as Error;
use integer_sqrt::IntegerSquareRoot;
use process_ejections::process_ejections;
use process_exit_queue::process_exit_queue;
use process_slashings::process_slashings;
use process_validator_registry::process_validator_registry;
use rayon::prelude::*;
use ssz::TreeHash;
use std::collections::HashMap;
use types::*;
use update_registry_and_shuffling_data::update_registry_and_shuffling_data;
use validator_statuses::{TotalBalances, ValidatorStatuses};
use winning_root::{winning_root, WinningRoot};
pub mod apply_rewards;
pub mod errors;
pub mod get_attestation_participants;
pub mod inclusion_distance;
pub mod process_ejections;
pub mod process_exit_queue;
pub mod process_slashings;
pub mod process_validator_registry;
pub mod tests;
pub mod update_validator_registry;
pub mod update_registry_and_shuffling_data;
pub mod validator_statuses;
pub mod winning_root;
@ -33,36 +32,51 @@ pub type WinningRootHashSet = HashMap<u64, WinningRoot>;
/// Mutates the given `BeaconState`, returning early if an error is encountered. If an error is
/// returned, a state might be "half-processed" and therefore in an invalid state.
///
/// Spec v0.4.0
/// Spec v0.5.0
pub fn per_epoch_processing(state: &mut BeaconState, spec: &ChainSpec) -> Result<(), Error> {
// Ensure the previous and next epoch caches are built.
state.build_epoch_cache(RelativeEpoch::Previous, spec)?;
state.build_epoch_cache(RelativeEpoch::Current, spec)?;
let mut statuses = initialize_validator_statuses(&state, spec)?;
// Load the struct we use to assign validators into sets based on their participation.
//
// E.g., attestation in the previous epoch, attested to the head, etc.
let mut validator_statuses = ValidatorStatuses::new(state, spec)?;
validator_statuses.process_attestations(&state, spec)?;
process_eth1_data(state, spec);
// Justification.
update_justification_and_finalization(state, &validator_statuses.total_balances, spec)?;
process_justification(state, &statuses.total_balances, spec);
// Crosslinks
// Crosslinks.
let winning_root_for_shards = process_crosslinks(state, spec)?;
// Rewards and Penalities
process_rewards_and_penalities(state, &mut statuses, &winning_root_for_shards, spec)?;
// Eth1 data.
maybe_reset_eth1_period(state, spec);
// Ejections
// Rewards and Penalities.
apply_rewards(
state,
&mut validator_statuses,
&winning_root_for_shards,
spec,
)?;
// Ejections.
process_ejections(state, spec)?;
// Validator Registry
process_validator_registry(state, spec)?;
process_slashings(state, spec)?;
// Validator Registry.
update_registry_and_shuffling_data(
state,
validator_statuses.total_balances.current_epoch,
spec,
)?;
// Slashings and exit queue.
process_slashings(state, validator_statuses.total_balances.current_epoch, spec)?;
process_exit_queue(state, spec);
// Final updates
update_active_tree_index_roots(state, spec)?;
update_latest_slashed_balances(state, spec)?;
clean_attestations(state);
// Final updates.
finish_epoch_update(state, spec)?;
// Rotate the epoch caches to suit the epoch transition.
state.advance_caches();
@ -70,29 +84,10 @@ pub fn per_epoch_processing(state: &mut BeaconState, spec: &ChainSpec) -> Result
Ok(())
}
/// Calculates various sets of attesters, including:
///
/// - current epoch attesters
/// - current epoch boundary attesters
/// - previous epoch attesters
/// - etc.
///
/// Spec v0.5.0
pub fn initialize_validator_statuses(
state: &BeaconState,
spec: &ChainSpec,
) -> Result<ValidatorStatuses, BeaconStateError> {
let mut statuses = ValidatorStatuses::new(state, spec)?;
statuses.process_attestations(&state, spec)?;
Ok(statuses)
}
/// Maybe resets the eth1 period.
///
/// Spec v0.5.0
pub fn process_eth1_data(state: &mut BeaconState, spec: &ChainSpec) {
pub fn maybe_reset_eth1_period(state: &mut BeaconState, spec: &ChainSpec) {
let next_epoch = state.next_epoch(spec);
let voting_period = spec.epochs_per_eth1_voting_period;
@ -113,83 +108,68 @@ pub fn process_eth1_data(state: &mut BeaconState, spec: &ChainSpec) {
/// - `justified_epoch`
/// - `previous_justified_epoch`
///
/// Spec v0.4.0
pub fn process_justification(
/// Spec v0.5.0
pub fn update_justification_and_finalization(
state: &mut BeaconState,
total_balances: &TotalBalances,
spec: &ChainSpec,
) {
) -> Result<(), Error> {
let previous_epoch = state.previous_epoch(spec);
let current_epoch = state.current_epoch(spec);
let mut new_justified_epoch = state.current_justified_epoch;
let mut new_finalized_epoch = state.finalized_epoch;
// Rotate the justification bitfield up one epoch to make room for the current epoch.
state.justification_bitfield <<= 1;
// If > 2/3 of the total balance attested to the previous epoch boundary
//
// - Set the 2nd bit of the bitfield.
// - Set the previous epoch to be justified.
if (3 * total_balances.previous_epoch_boundary_attesters) >= (2 * total_balances.previous_epoch)
// If the previous epoch gets justified, full the second last bit.
if (total_balances.previous_epoch_boundary_attesters * 3) >= (total_balances.previous_epoch * 2)
{
state.justification_bitfield |= 2;
new_justified_epoch = previous_epoch;
state.justification_bitfield |= 2;
}
// If > 2/3 of the total balance attested to the previous epoch boundary
//
// - Set the 1st bit of the bitfield.
// - Set the current epoch to be justified.
if (3 * total_balances.current_epoch_boundary_attesters) >= (2 * total_balances.current_epoch) {
state.justification_bitfield |= 1;
// If the current epoch gets justified, fill the last bit.
if (total_balances.current_epoch_boundary_attesters * 3) >= (total_balances.current_epoch * 2) {
new_justified_epoch = current_epoch;
state.justification_bitfield |= 1;
}
// If:
//
// - All three epochs prior to this epoch have been justified.
// - The previous justified justified epoch was three epochs ago.
//
// Then, set the finalized epoch to be three epochs ago.
if ((state.justification_bitfield >> 1) % 8 == 0b111)
& (state.previous_justified_epoch == previous_epoch - 2)
{
state.finalized_epoch = state.previous_justified_epoch;
let bitfield = state.justification_bitfield;
// The 2nd/3rd/4th most recent epochs are all justified, the 2nd using the 4th as source.
if ((bitfield >> 1) % 8 == 0b111) & (state.previous_justified_epoch == current_epoch - 3) {
new_finalized_epoch = state.previous_justified_epoch;
}
// If:
//
// - Both two epochs prior to this epoch have been justified.
// - The previous justified epoch was two epochs ago.
//
// Then, set the finalized epoch to two epochs ago.
if ((state.justification_bitfield >> 1) % 4 == 0b11)
& (state.previous_justified_epoch == previous_epoch - 1)
{
state.finalized_epoch = state.previous_justified_epoch;
// The 2nd/3rd most recent epochs are both justified, the 2nd using the 3rd as source.
if ((bitfield >> 1) % 4 == 0b11) & (state.previous_justified_epoch == current_epoch - 2) {
new_finalized_epoch = state.previous_justified_epoch;
}
// If:
//
// - This epoch and the two prior have been justified.
// - The presently justified epoch was two epochs ago.
//
// Then, set the finalized epoch to two epochs ago.
if (state.justification_bitfield % 8 == 0b111)
& (state.current_justified_epoch == previous_epoch - 1)
{
state.finalized_epoch = state.current_justified_epoch;
// The 1st/2nd/3rd most recent epochs are all justified, the 1st using the 2nd as source.
if (bitfield % 8 == 0b111) & (state.current_justified_epoch == current_epoch - 2) {
new_finalized_epoch = state.current_justified_epoch;
}
// If:
//
// - This epoch and the epoch prior to it have been justified.
// - Set the previous epoch to be justified.
//
// Then, set the finalized epoch to be the previous epoch.
if (state.justification_bitfield % 4 == 0b11)
& (state.current_justified_epoch == previous_epoch)
{
state.finalized_epoch = state.current_justified_epoch;
// The 1st/2nd most recent epochs are both justified, the 1st using the 2nd as source.
if (bitfield % 4 == 0b11) & (state.current_justified_epoch == current_epoch - 1) {
new_finalized_epoch = state.current_justified_epoch;
}
state.previous_justified_epoch = state.current_justified_epoch;
state.current_justified_epoch = new_justified_epoch;
state.previous_justified_root = state.current_justified_root;
if new_justified_epoch != state.current_justified_epoch {
state.current_justified_epoch = new_justified_epoch;
state.current_justified_root =
*state.get_block_root(new_justified_epoch.start_slot(spec.slots_per_epoch), spec)?;
}
if new_finalized_epoch != state.finalized_epoch {
state.finalized_epoch = new_finalized_epoch;
state.finalized_root =
*state.get_block_root(new_finalized_epoch.start_slot(spec.slots_per_epoch), spec)?;
}
Ok(())
}
/// Updates the following fields on the `BeaconState`:
@ -239,243 +219,53 @@ pub fn process_crosslinks(
Ok(winning_root_for_shards)
}
/// Updates the following fields on the BeaconState:
///
/// - `validator_balances`
///
/// Spec v0.4.0
pub fn process_rewards_and_penalities(
state: &mut BeaconState,
statuses: &mut ValidatorStatuses,
winning_root_for_shards: &WinningRootHashSet,
spec: &ChainSpec,
) -> Result<(), Error> {
let next_epoch = state.next_epoch(spec);
statuses.process_winning_roots(state, winning_root_for_shards, spec)?;
let total_balances = &statuses.total_balances;
let base_reward_quotient =
total_balances.previous_epoch.integer_sqrt() / spec.base_reward_quotient;
// Guard against a divide-by-zero during the validator balance update.
if base_reward_quotient == 0 {
return Err(Error::BaseRewardQuotientIsZero);
}
// Guard against a divide-by-zero during the validator balance update.
if total_balances.previous_epoch == 0 {
return Err(Error::PreviousTotalBalanceIsZero);
}
// Guard against an out-of-bounds during the validator balance update.
if statuses.statuses.len() != state.validator_balances.len() {
return Err(Error::ValidatorStatusesInconsistent);
}
// Justification and finalization
let epochs_since_finality = next_epoch - state.finalized_epoch;
state.validator_balances = state
.validator_balances
.par_iter()
.enumerate()
.map(|(index, &balance)| {
let mut balance = balance;
let status = &statuses.statuses[index];
let base_reward = get_base_reward(state, index, total_balances.previous_epoch, spec)
.expect(
"Cannot fail to access a validator balance when iterating validator balances.",
);
if epochs_since_finality <= 4 {
// Expected FFG source
if status.is_previous_epoch_attester {
safe_add_assign!(
balance,
base_reward * total_balances.previous_epoch_attesters
/ total_balances.previous_epoch
);
} else if status.is_active_in_previous_epoch {
safe_sub_assign!(balance, base_reward);
}
// Expected FFG target
if status.is_previous_epoch_boundary_attester {
safe_add_assign!(
balance,
base_reward * total_balances.previous_epoch_boundary_attesters
/ total_balances.previous_epoch
);
} else if status.is_active_in_previous_epoch {
safe_sub_assign!(balance, base_reward);
}
// Expected beacon chain head
if status.is_previous_epoch_head_attester {
safe_add_assign!(
balance,
base_reward * total_balances.previous_epoch_head_attesters
/ total_balances.previous_epoch
);
} else if status.is_active_in_previous_epoch {
safe_sub_assign!(balance, base_reward);
};
} else {
let inactivity_penalty = get_inactivity_penalty(
state,
index,
epochs_since_finality.as_u64(),
total_balances.previous_epoch,
spec,
)
.expect(
"Cannot fail to access a validator balance when iterating validator balances.",
);
if status.is_active_in_previous_epoch {
if !status.is_previous_epoch_attester {
safe_sub_assign!(balance, inactivity_penalty);
}
if !status.is_previous_epoch_boundary_attester {
safe_sub_assign!(balance, inactivity_penalty);
}
if !status.is_previous_epoch_head_attester {
safe_sub_assign!(balance, inactivity_penalty);
}
if state.validator_registry[index].slashed {
let base_reward =
get_base_reward(state, index, total_balances.previous_epoch, spec).expect(
"Cannot fail to access a validator balance when iterating validator balances.",
);
safe_sub_assign!(balance, 2 * inactivity_penalty + base_reward);
}
}
}
// Crosslinks
if let Some(ref info) = status.winning_root_info {
safe_add_assign!(
balance,
base_reward * info.total_attesting_balance / info.total_committee_balance
);
} else {
safe_sub_assign!(balance, base_reward);
}
balance
})
.collect();
// Attestation inclusion
// Guard against an out-of-bounds during the attester inclusion balance update.
if statuses.statuses.len() != state.validator_registry.len() {
return Err(Error::ValidatorStatusesInconsistent);
}
for (index, _validator) in state.validator_registry.iter().enumerate() {
let status = &statuses.statuses[index];
if status.is_previous_epoch_attester {
let proposer_index = status.inclusion_info.proposer_index;
let inclusion_distance = status.inclusion_info.distance;
let base_reward =
get_base_reward(state, proposer_index, total_balances.previous_epoch, spec).expect(
"Cannot fail to access a validator balance when iterating validator balances.",
);
if inclusion_distance > 0 && inclusion_distance < Slot::max_value() {
safe_add_assign!(
state.validator_balances[proposer_index],
base_reward * spec.min_attestation_inclusion_delay
/ inclusion_distance.as_u64()
)
}
}
}
Ok(())
}
/// Returns the base reward for some validator.
/// Finish up an epoch update.
///
/// Spec v0.5.0
pub fn get_base_reward(
state: &BeaconState,
index: usize,
previous_total_balance: u64,
spec: &ChainSpec,
) -> Result<u64, BeaconStateError> {
if previous_total_balance == 0 {
Ok(0)
} else {
let adjusted_quotient = previous_total_balance.integer_sqrt() / spec.base_reward_quotient;
Ok(state.get_effective_balance(index, spec)? / adjusted_quotient / 5)
}
}
/// Returns the inactivity penalty for some validator.
///
/// Spec v0.5.0
pub fn get_inactivity_penalty(
state: &BeaconState,
index: usize,
epochs_since_finality: u64,
previous_total_balance: u64,
spec: &ChainSpec,
) -> Result<u64, BeaconStateError> {
Ok(get_base_reward(state, index, previous_total_balance, spec)?
+ state.get_effective_balance(index, spec)? * epochs_since_finality
/ spec.inactivity_penalty_quotient
/ 2)
}
/// Updates the state's `latest_active_index_roots` field with a tree hash the active validator
/// indices for the next epoch.
///
/// Spec v0.4.0
pub fn update_active_tree_index_roots(
state: &mut BeaconState,
spec: &ChainSpec,
) -> Result<(), Error> {
let next_epoch = state.next_epoch(spec);
let active_tree_root = state
.get_active_validator_indices(next_epoch + Epoch::from(spec.activation_exit_delay))
.to_vec()
.hash_tree_root();
state.set_active_index_root(next_epoch, Hash256::from_slice(&active_tree_root[..]), spec)?;
Ok(())
}
/// Advances the state's `latest_slashed_balances` field.
///
/// Spec v0.4.0
pub fn update_latest_slashed_balances(
state: &mut BeaconState,
spec: &ChainSpec,
) -> Result<(), Error> {
pub fn finish_epoch_update(state: &mut BeaconState, spec: &ChainSpec) -> Result<(), Error> {
let current_epoch = state.current_epoch(spec);
let next_epoch = state.next_epoch(spec);
state.set_slashed_balance(
next_epoch,
state.get_slashed_balance(current_epoch, spec)?,
spec,
)?;
// This is a hack to allow us to update index roots and slashed balances for the next epoch.
//
// The indentation here is to make it obvious where the weird stuff happens.
{
state.slot += 1;
// Set active index root
let active_index_root = Hash256::from_slice(
&state
.get_active_validator_indices(next_epoch + spec.activation_exit_delay)
.hash_tree_root()[..],
);
state.set_active_index_root(next_epoch, active_index_root, spec)?;
// Set total slashed balances
state.set_slashed_balance(
next_epoch,
state.get_slashed_balance(current_epoch, spec)?,
spec,
)?;
// Set randao mix
state.set_randao_mix(
next_epoch,
*state.get_randao_mix(current_epoch, spec)?,
spec,
)?;
state.slot -= 1;
}
if next_epoch.as_u64() % (spec.slots_per_historical_root as u64 / spec.slots_per_epoch) == 0 {
let historical_batch: HistoricalBatch = state.historical_batch();
state
.historical_roots
.push(Hash256::from_slice(&historical_batch.hash_tree_root()[..]));
}
state.previous_epoch_attestations = state.current_epoch_attestations.clone();
state.current_epoch_attestations = vec![];
Ok(())
}
/// Removes all pending attestations from the previous epoch.
///
/// Spec v0.4.0
pub fn clean_attestations(state: &mut BeaconState) {
state.previous_epoch_attestations = vec![];
}

View File

@ -0,0 +1,334 @@
use super::validator_statuses::{TotalBalances, ValidatorStatus, ValidatorStatuses};
use super::{Error, WinningRootHashSet};
use integer_sqrt::IntegerSquareRoot;
use types::*;
/// Use to track the changes to a validators balance.
#[derive(Default, Clone)]
pub struct Delta {
rewards: u64,
penalties: u64,
}
impl Delta {
/// Reward the validator with the `reward`.
pub fn reward(&mut self, reward: u64) {
self.rewards += reward;
}
/// Penalize the validator with the `penalty`.
pub fn penalize(&mut self, penalty: u64) {
self.penalties += penalty;
}
}
impl std::ops::AddAssign for Delta {
/// Use wrapping addition as that is how it's defined in the spec.
fn add_assign(&mut self, other: Delta) {
self.rewards += other.rewards;
self.penalties += other.penalties;
}
}
/// Apply attester and proposer rewards.
///
/// Spec v0.5.0
pub fn apply_rewards(
state: &mut BeaconState,
validator_statuses: &mut ValidatorStatuses,
winning_root_for_shards: &WinningRootHashSet,
spec: &ChainSpec,
) -> Result<(), Error> {
// Guard against an out-of-bounds during the validator balance update.
if validator_statuses.statuses.len() != state.validator_balances.len() {
return Err(Error::ValidatorStatusesInconsistent);
}
// Guard against an out-of-bounds during the attester inclusion balance update.
if validator_statuses.statuses.len() != state.validator_registry.len() {
return Err(Error::ValidatorStatusesInconsistent);
}
let mut deltas = vec![Delta::default(); state.validator_balances.len()];
get_justification_and_finalization_deltas(&mut deltas, state, &validator_statuses, spec)?;
get_crosslink_deltas(&mut deltas, state, &validator_statuses, spec)?;
// Apply the proposer deltas if we are finalizing normally.
//
// This is executed slightly differently to the spec because of the way our functions are
// structured. It should be functionally equivalent.
if epochs_since_finality(state, spec) <= 4 {
get_proposer_deltas(
&mut deltas,
state,
validator_statuses,
winning_root_for_shards,
spec,
)?;
}
// Apply the deltas, over-flowing but not under-flowing (saturating at 0 instead).
for (i, delta) in deltas.iter().enumerate() {
state.validator_balances[i] += delta.rewards;
state.validator_balances[i] = state.validator_balances[i].saturating_sub(delta.penalties);
}
Ok(())
}
/// Applies the attestation inclusion reward to each proposer for every validator who included an
/// attestation in the previous epoch.
///
/// Spec v0.5.0
fn get_proposer_deltas(
deltas: &mut Vec<Delta>,
state: &mut BeaconState,
validator_statuses: &mut ValidatorStatuses,
winning_root_for_shards: &WinningRootHashSet,
spec: &ChainSpec,
) -> Result<(), Error> {
// Update statuses with the information from winning roots.
validator_statuses.process_winning_roots(state, winning_root_for_shards, spec)?;
for (index, validator) in validator_statuses.statuses.iter().enumerate() {
let mut delta = Delta::default();
if validator.is_previous_epoch_attester {
let inclusion = validator
.inclusion_info
.expect("It is a logic error for an attester not to have an inclusion distance.");
let base_reward = get_base_reward(
state,
inclusion.proposer_index,
validator_statuses.total_balances.previous_epoch,
spec,
)?;
if inclusion.proposer_index >= deltas.len() {
return Err(Error::ValidatorStatusesInconsistent);
}
delta.reward(base_reward / spec.attestation_inclusion_reward_quotient);
}
deltas[index] += delta;
}
Ok(())
}
/// Apply rewards for participation in attestations during the previous epoch.
///
/// Spec v0.5.0
fn get_justification_and_finalization_deltas(
deltas: &mut Vec<Delta>,
state: &BeaconState,
validator_statuses: &ValidatorStatuses,
spec: &ChainSpec,
) -> Result<(), Error> {
let epochs_since_finality = epochs_since_finality(state, spec);
for (index, validator) in validator_statuses.statuses.iter().enumerate() {
let base_reward = get_base_reward(
state,
index,
validator_statuses.total_balances.previous_epoch,
spec,
)?;
let inactivity_penalty = get_inactivity_penalty(
state,
index,
epochs_since_finality.as_u64(),
validator_statuses.total_balances.previous_epoch,
spec,
)?;
let delta = if epochs_since_finality <= 4 {
compute_normal_justification_and_finalization_delta(
&validator,
&validator_statuses.total_balances,
base_reward,
spec,
)
} else {
compute_inactivity_leak_delta(&validator, base_reward, inactivity_penalty, spec)
};
deltas[index] += delta;
}
Ok(())
}
/// Determine the delta for a single validator, if the chain is finalizing normally.
///
/// Spec v0.5.0
fn compute_normal_justification_and_finalization_delta(
validator: &ValidatorStatus,
total_balances: &TotalBalances,
base_reward: u64,
spec: &ChainSpec,
) -> Delta {
let mut delta = Delta::default();
let boundary_attesting_balance = total_balances.previous_epoch_boundary_attesters;
let total_balance = total_balances.previous_epoch;
let total_attesting_balance = total_balances.previous_epoch_attesters;
let matching_head_balance = total_balances.previous_epoch_boundary_attesters;
// Expected FFG source.
if validator.is_previous_epoch_attester {
delta.reward(base_reward * total_attesting_balance / total_balance);
// Inclusion speed bonus
let inclusion = validator
.inclusion_info
.expect("It is a logic error for an attester not to have an inclusion distance.");
delta.reward(
base_reward * spec.min_attestation_inclusion_delay / inclusion.distance.as_u64(),
);
} else if validator.is_active_in_previous_epoch {
delta.penalize(base_reward);
}
// Expected FFG target.
if validator.is_previous_epoch_boundary_attester {
delta.reward(base_reward / boundary_attesting_balance / total_balance);
} else if validator.is_active_in_previous_epoch {
delta.penalize(base_reward);
}
// Expected head.
if validator.is_previous_epoch_head_attester {
delta.reward(base_reward * matching_head_balance / total_balance);
} else if validator.is_active_in_previous_epoch {
delta.penalize(base_reward);
};
// Proposer bonus is handled in `apply_proposer_deltas`.
//
// This function only computes the delta for a single validator, so it cannot also return a
// delta for a validator.
delta
}
/// Determine the delta for a single delta, assuming the chain is _not_ finalizing normally.
///
/// Spec v0.5.0
fn compute_inactivity_leak_delta(
validator: &ValidatorStatus,
base_reward: u64,
inactivity_penalty: u64,
spec: &ChainSpec,
) -> Delta {
let mut delta = Delta::default();
if validator.is_active_in_previous_epoch {
if !validator.is_previous_epoch_attester {
delta.penalize(inactivity_penalty);
} else {
// If a validator did attest, apply a small penalty for getting attestations included
// late.
let inclusion = validator
.inclusion_info
.expect("It is a logic error for an attester not to have an inclusion distance.");
delta.reward(
base_reward * spec.min_attestation_inclusion_delay / inclusion.distance.as_u64(),
);
delta.penalize(base_reward);
}
if !validator.is_previous_epoch_boundary_attester {
delta.reward(inactivity_penalty);
}
if !validator.is_previous_epoch_head_attester {
delta.penalize(inactivity_penalty);
}
}
// Penalize slashed-but-inactive validators as though they were active but offline.
if !validator.is_active_in_previous_epoch
& validator.is_slashed
& !validator.is_withdrawable_in_current_epoch
{
delta.penalize(2 * inactivity_penalty + base_reward);
}
delta
}
/// Calculate the deltas based upon the winning roots for attestations during the previous epoch.
///
/// Spec v0.5.0
fn get_crosslink_deltas(
deltas: &mut Vec<Delta>,
state: &BeaconState,
validator_statuses: &ValidatorStatuses,
spec: &ChainSpec,
) -> Result<(), Error> {
for (index, validator) in validator_statuses.statuses.iter().enumerate() {
let mut delta = Delta::default();
let base_reward = get_base_reward(
state,
index,
validator_statuses.total_balances.previous_epoch,
spec,
)?;
if let Some(ref winning_root) = validator.winning_root_info {
delta.reward(
base_reward * winning_root.total_attesting_balance
/ winning_root.total_committee_balance,
);
} else {
delta.penalize(base_reward);
}
deltas[index] += delta;
}
Ok(())
}
/// Returns the base reward for some validator.
///
/// Spec v0.5.0
fn get_base_reward(
state: &BeaconState,
index: usize,
previous_total_balance: u64,
spec: &ChainSpec,
) -> Result<u64, BeaconStateError> {
if previous_total_balance == 0 {
Ok(0)
} else {
let adjusted_quotient = previous_total_balance.integer_sqrt() / spec.base_reward_quotient;
Ok(state.get_effective_balance(index, spec)? / adjusted_quotient / 5)
}
}
/// Returns the inactivity penalty for some validator.
///
/// Spec v0.5.0
fn get_inactivity_penalty(
state: &BeaconState,
index: usize,
epochs_since_finality: u64,
previous_total_balance: u64,
spec: &ChainSpec,
) -> Result<u64, BeaconStateError> {
Ok(get_base_reward(state, index, previous_total_balance, spec)?
+ state.get_effective_balance(index, spec)? * epochs_since_finality
/ spec.inactivity_penalty_quotient
/ 2)
}
/// Returns the epochs since the last finalized epoch.
///
/// Spec v0.5.0
fn epochs_since_finality(state: &BeaconState, spec: &ChainSpec) -> Epoch {
state.current_epoch(spec) + 1 - state.finalized_epoch
}

View File

@ -9,6 +9,7 @@ pub enum EpochProcessingError {
PreviousTotalBalanceIsZero,
InclusionDistanceZero,
ValidatorStatusesInconsistent,
DeltasInconsistent,
/// Unable to get the inclusion distance for a validator that should have an inclusion
/// distance. This indicates an internal inconsistency.
///

View File

@ -28,7 +28,7 @@ pub fn get_attestation_participants(
let mut participants = Vec::with_capacity(committee.len());
for (i, validator_index) in committee.iter().enumerate() {
match bitfield.get(i) {
Ok(bit) if bit == true => participants.push(*validator_index),
Ok(bit) if bit => participants.push(*validator_index),
_ => {}
}
}

View File

@ -21,8 +21,8 @@ pub fn process_exit_queue(state: &mut BeaconState, spec: &ChainSpec) {
.collect();
eligable_indices.sort_by_key(|i| state.validator_registry[*i].exit_epoch);
for (withdrawn_so_far, index) in eligable_indices.iter().enumerate() {
if withdrawn_so_far as u64 >= spec.max_exit_dequeues_per_epoch {
for (dequeues, index) in eligable_indices.iter().enumerate() {
if dequeues as u64 >= spec.max_exit_dequeues_per_epoch {
break;
}
prepare_validator_for_withdrawal(state, *index, spec);

View File

@ -2,34 +2,32 @@ use types::{BeaconStateError as Error, *};
/// Process slashings.
///
/// Note: Utilizes the cache and will fail if the appropriate cache is not initialized.
///
/// Spec v0.4.0
pub fn process_slashings(state: &mut BeaconState, spec: &ChainSpec) -> Result<(), Error> {
/// Spec v0.5.0
pub fn process_slashings(
state: &mut BeaconState,
current_total_balance: u64,
spec: &ChainSpec,
) -> Result<(), Error> {
let current_epoch = state.current_epoch(spec);
let active_validator_indices =
state.get_cached_active_validator_indices(RelativeEpoch::Current, spec)?;
let total_balance = state.get_total_balance(&active_validator_indices[..], spec)?;
let total_at_start = state.get_slashed_balance(current_epoch + 1, spec)?;
let total_at_end = state.get_slashed_balance(current_epoch, spec)?;
let total_penalities = total_at_end - total_at_start;
for (index, validator) in state.validator_registry.iter().enumerate() {
if validator.slashed
&& (current_epoch
== validator.withdrawable_epoch - Epoch::from(spec.latest_slashed_exit_length / 2))
{
// TODO: check the following two lines are correct.
let total_at_start = state.get_slashed_balance(current_epoch + 1, spec)?;
let total_at_end = state.get_slashed_balance(current_epoch, spec)?;
let total_penalities = total_at_end.saturating_sub(total_at_start);
let should_penalize = current_epoch.as_usize()
== validator.withdrawable_epoch.as_usize() - spec.latest_slashed_exit_length / 2;
if validator.slashed && should_penalize {
let effective_balance = state.get_effective_balance(index, spec)?;
let penalty = std::cmp::max(
effective_balance * std::cmp::min(total_penalities * 3, total_balance)
/ total_balance,
effective_balance * std::cmp::min(total_penalities * 3, current_total_balance)
/ current_total_balance,
effective_balance / spec.min_penalty_quotient,
);
safe_sub_assign!(state.validator_balances[index], penalty);
state.validator_balances[index] -= penalty;
}
}

View File

@ -1,70 +0,0 @@
use super::update_validator_registry::update_validator_registry;
use super::Error;
use types::*;
/// Peforms a validator registry update, if required.
///
/// Spec v0.4.0
pub fn process_validator_registry(state: &mut BeaconState, spec: &ChainSpec) -> Result<(), Error> {
let current_epoch = state.current_epoch(spec);
let next_epoch = state.next_epoch(spec);
state.previous_shuffling_epoch = state.current_shuffling_epoch;
state.previous_shuffling_start_shard = state.current_shuffling_start_shard;
state.previous_shuffling_seed = state.current_shuffling_seed;
if should_update_validator_registry(state, spec)? {
update_validator_registry(state, spec)?;
state.current_shuffling_epoch = next_epoch;
state.current_shuffling_start_shard = (state.current_shuffling_start_shard
+ spec.get_epoch_committee_count(
state
.get_cached_active_validator_indices(RelativeEpoch::Current, spec)?
.len(),
) as u64)
% spec.shard_count;
state.current_shuffling_seed = state.generate_seed(state.current_shuffling_epoch, spec)?
} else {
let epochs_since_last_registry_update =
current_epoch - state.validator_registry_update_epoch;
if (epochs_since_last_registry_update > 1)
& epochs_since_last_registry_update.is_power_of_two()
{
state.current_shuffling_epoch = next_epoch;
state.current_shuffling_seed =
state.generate_seed(state.current_shuffling_epoch, spec)?
}
}
Ok(())
}
/// Returns `true` if the validator registry should be updated during an epoch processing.
///
/// Spec v0.5.0
pub fn should_update_validator_registry(
state: &BeaconState,
spec: &ChainSpec,
) -> Result<bool, BeaconStateError> {
if state.finalized_epoch <= state.validator_registry_update_epoch {
return Ok(false);
}
let num_active_validators = state
.get_cached_active_validator_indices(RelativeEpoch::Current, spec)?
.len();
let current_epoch_committee_count = spec.get_epoch_committee_count(num_active_validators);
for shard in (0..current_epoch_committee_count)
.into_iter()
.map(|i| (state.current_shuffling_start_shard + i as u64) % spec.shard_count)
{
if state.latest_crosslinks[shard as usize].epoch <= state.validator_registry_update_epoch {
return Ok(false);
}
}
Ok(true)
}

View File

@ -0,0 +1,150 @@
use super::super::common::exit_validator;
use super::Error;
use types::*;
/// Peforms a validator registry update, if required.
///
/// Spec v0.5.0
pub fn update_registry_and_shuffling_data(
state: &mut BeaconState,
current_total_balance: u64,
spec: &ChainSpec,
) -> Result<(), Error> {
// First set previous shuffling data to current shuffling data.
state.previous_shuffling_epoch = state.current_shuffling_epoch;
state.previous_shuffling_start_shard = state.previous_shuffling_start_shard;
state.previous_shuffling_seed = state.previous_shuffling_seed;
let current_epoch = state.current_epoch(spec);
let next_epoch = current_epoch + 1;
// Check we should update, and if so, update.
if should_update_validator_registry(state, spec)? {
update_validator_registry(state, current_total_balance, spec)?;
// If we update the registry, update the shuffling data and shards as well.
state.current_shuffling_epoch = next_epoch;
state.current_shuffling_start_shard = {
let active_validators =
state.get_cached_active_validator_indices(RelativeEpoch::Current, spec)?;
let epoch_committee_count = spec.get_epoch_committee_count(active_validators.len());
(state.current_shuffling_start_shard + epoch_committee_count) % spec.shard_count
};
state.current_shuffling_seed = state.generate_seed(state.current_shuffling_epoch, spec)?;
} else {
// If processing at least on crosslink keeps failing, the reshuffle every power of two, but
// don't update the current_shuffling_start_shard.
let epochs_since_last_update = current_epoch - state.validator_registry_update_epoch;
if epochs_since_last_update > 1 && epochs_since_last_update.is_power_of_two() {
state.current_shuffling_epoch = next_epoch;
state.current_shuffling_seed =
state.generate_seed(state.current_shuffling_epoch, spec)?;
}
}
Ok(())
}
/// Returns `true` if the validator registry should be updated during an epoch processing.
///
/// Spec v0.5.0
pub fn should_update_validator_registry(
state: &BeaconState,
spec: &ChainSpec,
) -> Result<bool, BeaconStateError> {
if state.finalized_epoch <= state.validator_registry_update_epoch {
return Ok(false);
}
let num_active_validators = state
.get_cached_active_validator_indices(RelativeEpoch::Current, spec)?
.len();
let current_epoch_committee_count = spec.get_epoch_committee_count(num_active_validators);
for shard in (0..current_epoch_committee_count)
.map(|i| (state.current_shuffling_start_shard + i as u64) % spec.shard_count)
{
if state.latest_crosslinks[shard as usize].epoch <= state.validator_registry_update_epoch {
return Ok(false);
}
}
Ok(true)
}
/// Update validator registry, activating/exiting validators if possible.
///
/// Note: Utilizes the cache and will fail if the appropriate cache is not initialized.
///
/// Spec v0.5.0
pub fn update_validator_registry(
state: &mut BeaconState,
current_total_balance: u64,
spec: &ChainSpec,
) -> Result<(), Error> {
let current_epoch = state.current_epoch(spec);
let max_balance_churn = std::cmp::max(
spec.max_deposit_amount,
current_total_balance / (2 * spec.max_balance_churn_quotient),
);
// Activate validators within the allowable balance churn.
let mut balance_churn = 0;
for index in 0..state.validator_registry.len() {
let not_activated =
state.validator_registry[index].activation_epoch == spec.far_future_epoch;
let has_enough_balance = state.validator_balances[index] >= spec.max_deposit_amount;
if not_activated && has_enough_balance {
// Check the balance churn would be within the allowance.
balance_churn += state.get_effective_balance(index, spec)?;
if balance_churn > max_balance_churn {
break;
}
activate_validator(state, index, false, spec);
}
}
// Exit validators within the allowable balance churn.
let mut balance_churn = 0;
for index in 0..state.validator_registry.len() {
let not_exited = state.validator_registry[index].exit_epoch == spec.far_future_epoch;
let has_initiated_exit = state.validator_registry[index].initiated_exit;
if not_exited && has_initiated_exit {
// Check the balance churn would be within the allowance.
balance_churn += state.get_effective_balance(index, spec)?;
if balance_churn > max_balance_churn {
break;
}
exit_validator(state, index, spec)?;
}
}
state.validator_registry_update_epoch = current_epoch;
Ok(())
}
/// Activate the validator of the given ``index``.
///
/// Spec v0.5.0
pub fn activate_validator(
state: &mut BeaconState,
validator_index: usize,
is_genesis: bool,
spec: &ChainSpec,
) {
let current_epoch = state.current_epoch(spec);
state.validator_registry[validator_index].activation_epoch = if is_genesis {
spec.genesis_epoch
} else {
state.get_delayed_activation_exit_epoch(current_epoch, spec)
}
}

View File

@ -1,52 +0,0 @@
use crate::common::exit_validator;
use types::{BeaconStateError as Error, *};
/// Update validator registry, activating/exiting validators if possible.
///
/// Note: Utilizes the cache and will fail if the appropriate cache is not initialized.
///
/// Spec v0.4.0
pub fn update_validator_registry(state: &mut BeaconState, spec: &ChainSpec) -> Result<(), Error> {
let current_epoch = state.current_epoch(spec);
let active_validator_indices =
state.get_cached_active_validator_indices(RelativeEpoch::Current, spec)?;
let total_balance = state.get_total_balance(&active_validator_indices[..], spec)?;
let max_balance_churn = std::cmp::max(
spec.max_deposit_amount,
total_balance / (2 * spec.max_balance_churn_quotient),
);
let mut balance_churn = 0;
for index in 0..state.validator_registry.len() {
let validator = &state.validator_registry[index];
if (validator.activation_epoch == spec.far_future_epoch)
& (state.validator_balances[index] == spec.max_deposit_amount)
{
balance_churn += state.get_effective_balance(index, spec)?;
if balance_churn > max_balance_churn {
break;
}
state.activate_validator(index, false, spec);
}
}
let mut balance_churn = 0;
for index in 0..state.validator_registry.len() {
let validator = &state.validator_registry[index];
if (validator.exit_epoch == spec.far_future_epoch) & (validator.initiated_exit) {
balance_churn += state.get_effective_balance(index, spec)?;
if balance_churn > max_balance_churn {
break;
}
exit_validator(state, index, spec)?;
}
}
state.validator_registry_update_epoch = current_epoch;
Ok(())
}

View File

@ -23,7 +23,7 @@ pub struct WinningRootInfo {
}
/// The information required to reward a block producer for including an attestation in a block.
#[derive(Clone)]
#[derive(Clone, Copy)]
pub struct InclusionInfo {
/// The earliest slot a validator had an attestation included in the previous epoch.
pub slot: Slot,
@ -59,7 +59,11 @@ impl InclusionInfo {
/// Information required to reward some validator during the current and previous epoch.
#[derive(Default, Clone)]
pub struct AttesterStatus {
pub struct ValidatorStatus {
/// True if the validator has been slashed, ever.
pub is_slashed: bool,
/// True if the validator can withdraw in the current epoch.
pub is_withdrawable_in_current_epoch: bool,
/// True if the validator was active in the state's _current_ epoch.
pub is_active_in_current_epoch: bool,
/// True if the validator was active in the state's _previous_ epoch.
@ -81,14 +85,14 @@ pub struct AttesterStatus {
/// Information used to reward the block producer of this validators earliest-included
/// attestation.
pub inclusion_info: InclusionInfo,
pub inclusion_info: Option<InclusionInfo>,
/// Information used to reward/penalize the validator if they voted in the super-majority for
/// some shard block.
pub winning_root_info: Option<WinningRootInfo>,
}
impl AttesterStatus {
/// Accepts some `other` `AttesterStatus` and updates `self` if required.
impl ValidatorStatus {
/// Accepts some `other` `ValidatorStatus` and updates `self` if required.
///
/// Will never set one of the `bool` fields to `false`, it will only set it to `true` if other
/// contains a `true` field.
@ -97,6 +101,8 @@ impl AttesterStatus {
pub fn update(&mut self, other: &Self) {
// Update all the bool fields, only updating `self` if `other` is true (never setting
// `self` to false).
set_self_if_other_is_true!(self, other, is_slashed);
set_self_if_other_is_true!(self, other, is_withdrawable_in_current_epoch);
set_self_if_other_is_true!(self, other, is_active_in_current_epoch);
set_self_if_other_is_true!(self, other, is_active_in_previous_epoch);
set_self_if_other_is_true!(self, other, is_current_epoch_attester);
@ -105,7 +111,13 @@ impl AttesterStatus {
set_self_if_other_is_true!(self, other, is_previous_epoch_boundary_attester);
set_self_if_other_is_true!(self, other, is_previous_epoch_head_attester);
self.inclusion_info.update(&other.inclusion_info);
if let Some(other_info) = other.inclusion_info {
if let Some(self_info) = self.inclusion_info.as_mut() {
self_info.update(&other_info);
} else {
self.inclusion_info = other.inclusion_info;
}
}
}
}
@ -137,7 +149,7 @@ pub struct TotalBalances {
#[derive(Clone)]
pub struct ValidatorStatuses {
/// Information about each individual validator from the state's validator registy.
pub statuses: Vec<AttesterStatus>,
pub statuses: Vec<ValidatorStatus>,
/// Summed balances for various sets of validators.
pub total_balances: TotalBalances,
}
@ -154,7 +166,12 @@ impl ValidatorStatuses {
let mut total_balances = TotalBalances::default();
for (i, validator) in state.validator_registry.iter().enumerate() {
let mut status = AttesterStatus::default();
let mut status = ValidatorStatus {
is_slashed: validator.slashed,
is_withdrawable_in_current_epoch: validator
.is_withdrawable_at(state.current_epoch(spec)),
..ValidatorStatus::default()
};
if validator.is_active_at(state.current_epoch(spec)) {
status.is_active_in_current_epoch = true;
@ -193,10 +210,10 @@ impl ValidatorStatuses {
get_attestation_participants(state, &a.data, &a.aggregation_bitfield, spec)?;
let attesting_balance = state.get_total_balance(&attesting_indices, spec)?;
let mut status = AttesterStatus::default();
let mut status = ValidatorStatus::default();
// Profile this attestation, updating the total balances and generating an
// `AttesterStatus` object that applies to all participants in the attestation.
// `ValidatorStatus` object that applies to all participants in the attestation.
if is_from_epoch(a, state.current_epoch(spec), spec) {
self.total_balances.current_epoch_attesters += attesting_balance;
status.is_current_epoch_attester = true;
@ -211,7 +228,7 @@ impl ValidatorStatuses {
// The inclusion slot and distance are only required for previous epoch attesters.
let relative_epoch = RelativeEpoch::from_slot(state.slot, a.data.slot, spec)?;
status.inclusion_info = InclusionInfo {
status.inclusion_info = Some(InclusionInfo {
slot: a.inclusion_slot,
distance: inclusion_distance(a),
proposer_index: state.get_beacon_proposer_index(
@ -219,7 +236,7 @@ impl ValidatorStatuses {
relative_epoch,
spec,
)?,
};
});
if has_common_epoch_boundary_root(a, state, state.previous_epoch(spec), spec)? {
self.total_balances.previous_epoch_boundary_attesters += attesting_balance;

View File

@ -25,6 +25,7 @@ ssz = { path = "../utils/ssz" }
ssz_derive = { path = "../utils/ssz_derive" }
swap_or_not_shuffle = { path = "../utils/swap_or_not_shuffle" }
test_random_derive = { path = "../utils/test_random_derive" }
libp2p = { git = "https://github.com/SigP/rust-libp2p", branch = "gossipsub" }
[dev-dependencies]
env_logger = "0.6.0"

View File

@ -71,7 +71,7 @@ impl BeaconBlock {
/// Note: performs a full tree-hash of `self.body`.
///
/// Spec v0.5.0
pub fn into_header(&self) -> BeaconBlockHeader {
pub fn block_header(&self) -> BeaconBlockHeader {
BeaconBlockHeader {
slot: self.slot,
previous_block_root: self.previous_block_root,
@ -84,11 +84,11 @@ impl BeaconBlock {
/// Returns a "temporary" header, where the `state_root` is `spec.zero_hash`.
///
/// Spec v0.5.0
pub fn into_temporary_header(&self, spec: &ChainSpec) -> BeaconBlockHeader {
pub fn temporary_block_header(&self, spec: &ChainSpec) -> BeaconBlockHeader {
BeaconBlockHeader {
state_root: spec.zero_hash,
signature: spec.empty_signature.clone(),
..self.into_header()
..self.block_header()
}
}
}

View File

@ -162,7 +162,7 @@ impl BeaconState {
latest_state_roots: vec![spec.zero_hash; spec.slots_per_historical_root],
latest_active_index_roots: vec![spec.zero_hash; spec.latest_active_index_roots_length],
latest_slashed_balances: vec![0; spec.latest_slashed_exit_length],
latest_block_header: BeaconBlock::empty(spec).into_temporary_header(spec),
latest_block_header: BeaconBlock::empty(spec).temporary_block_header(spec),
historical_roots: vec![],
/*
@ -193,6 +193,13 @@ impl BeaconState {
Hash256::from_slice(&self.hash_tree_root()[..])
}
pub fn historical_batch(&self) -> HistoricalBatch {
HistoricalBatch {
block_roots: self.latest_block_roots.clone(),
state_roots: self.latest_state_roots.clone(),
}
}
/// If a validator pubkey exists in the validator registry, returns `Some(i)`, otherwise
/// returns `None`.
///
@ -379,7 +386,28 @@ impl BeaconState {
spec: &ChainSpec,
) -> Result<(), BeaconStateError> {
let i = self.get_latest_block_roots_index(slot, spec)?;
Ok(self.latest_block_roots[i] = block_root)
self.latest_block_roots[i] = block_root;
Ok(())
}
/// Safely obtains the index for `latest_randao_mixes`
///
/// Spec v0.5.0
fn get_randao_mix_index(&self, epoch: Epoch, spec: &ChainSpec) -> Result<usize, Error> {
let current_epoch = self.current_epoch(spec);
if (current_epoch - (spec.latest_randao_mixes_length as u64) < epoch)
& (epoch <= current_epoch)
{
let i = epoch.as_usize() % spec.latest_randao_mixes_length;
if i < self.latest_randao_mixes.len() {
Ok(i)
} else {
Err(Error::InsufficientRandaoMixes)
}
} else {
Err(Error::EpochOutOfBounds)
}
}
/// XOR-assigns the existing `epoch` randao mix with the hash of the `signature`.
@ -406,24 +434,24 @@ impl BeaconState {
/// Return the randao mix at a recent ``epoch``.
///
/// # Errors:
/// - `InsufficientRandaoMixes` if `self.latest_randao_mixes` is shorter than
/// `spec.latest_randao_mixes_length`.
/// - `EpochOutOfBounds` if the state no longer stores randao mixes for the given `epoch`.
///
/// Spec v0.5.0
pub fn get_randao_mix(&self, epoch: Epoch, spec: &ChainSpec) -> Result<&Hash256, Error> {
let current_epoch = self.current_epoch(spec);
let i = self.get_randao_mix_index(epoch, spec)?;
Ok(&self.latest_randao_mixes[i])
}
if (current_epoch - (spec.latest_randao_mixes_length as u64) < epoch)
& (epoch <= current_epoch)
{
self.latest_randao_mixes
.get(epoch.as_usize() % spec.latest_randao_mixes_length)
.ok_or_else(|| Error::InsufficientRandaoMixes)
} else {
Err(Error::EpochOutOfBounds)
}
/// Set the randao mix at a recent ``epoch``.
///
/// Spec v0.5.0
pub fn set_randao_mix(
&mut self,
epoch: Epoch,
mix: Hash256,
spec: &ChainSpec,
) -> Result<(), Error> {
let i = self.get_randao_mix_index(epoch, spec)?;
self.latest_randao_mixes[i] = mix;
Ok(())
}
/// Safely obtains the index for `latest_active_index_roots`, given some `epoch`.
@ -466,7 +494,8 @@ impl BeaconState {
spec: &ChainSpec,
) -> Result<(), Error> {
let i = self.get_active_index_root_index(epoch, spec)?;
Ok(self.latest_active_index_roots[i] = index_root)
self.latest_active_index_roots[i] = index_root;
Ok(())
}
/// Replace `active_index_roots` with clones of `index_root`.
@ -511,7 +540,8 @@ impl BeaconState {
spec: &ChainSpec,
) -> Result<(), Error> {
let i = self.get_latest_state_roots_index(slot, spec)?;
Ok(self.latest_state_roots[i] = state_root)
self.latest_state_roots[i] = state_root;
Ok(())
}
/// Safely obtains the index for `latest_slashed_balances`, given some `epoch`.
@ -547,7 +577,8 @@ impl BeaconState {
spec: &ChainSpec,
) -> Result<(), Error> {
let i = self.get_slashed_balance_index(epoch, spec)?;
Ok(self.latest_slashed_balances[i] = balance)
self.latest_slashed_balances[i] = balance;
Ok(())
}
/// Generate a seed for the given `epoch`.
@ -588,24 +619,6 @@ impl BeaconState {
epoch + 1 + spec.activation_exit_delay
}
/// Activate the validator of the given ``index``.
///
/// Spec v0.5.0
pub fn activate_validator(
&mut self,
validator_index: usize,
is_genesis: bool,
spec: &ChainSpec,
) {
let current_epoch = self.current_epoch(spec);
self.validator_registry[validator_index].activation_epoch = if is_genesis {
spec.genesis_epoch
} else {
self.get_delayed_activation_exit_epoch(current_epoch, spec)
}
}
/// Initiate an exit for the validator of the given `index`.
///
/// Spec v0.5.0

View File

@ -107,6 +107,7 @@ impl EpochCache {
})
}
/// Return a vec of `CrosslinkCommittee` for a given slot.
pub fn get_crosslink_committees_at_slot(
&self,
slot: Slot,
@ -116,6 +117,8 @@ impl EpochCache {
.get_crosslink_committees_at_slot(slot, spec)
}
/// Return `Some(CrosslinkCommittee)` if the given shard has a committee during the given
/// `epoch`.
pub fn get_crosslink_committee_for_shard(
&self,
shard: Shard,
@ -131,6 +134,10 @@ impl EpochCache {
}
}
/// Returns a list of all `validator_registry` indices where the validator is active at the given
/// `epoch`.
///
/// Spec v0.5.0
pub fn get_active_validator_indices(validators: &[Validator], epoch: Epoch) -> Vec<usize> {
let mut active = Vec::with_capacity(validators.len());
@ -145,13 +152,17 @@ pub fn get_active_validator_indices(validators: &[Validator], epoch: Epoch) -> V
active
}
/// Contains all `CrosslinkCommittees` for an epoch.
#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)]
pub struct EpochCrosslinkCommittees {
/// The epoch the committees are present in.
epoch: Epoch,
/// Each commitee for each slot of the epoch.
pub crosslink_committees: Vec<Vec<CrosslinkCommittee>>,
}
impl EpochCrosslinkCommittees {
/// Return a new instances where all slots have zero committees.
fn new(epoch: Epoch, spec: &ChainSpec) -> Self {
Self {
epoch,
@ -159,6 +170,7 @@ impl EpochCrosslinkCommittees {
}
}
/// Return a vec of `CrosslinkCommittee` for a given slot.
fn get_crosslink_committees_at_slot(
&self,
slot: Slot,
@ -176,6 +188,7 @@ impl EpochCrosslinkCommittees {
}
}
/// Builds an `EpochCrosslinkCommittees` object.
pub struct EpochCrosslinkCommitteesBuilder {
epoch: Epoch,
shuffling_start_shard: Shard,
@ -185,6 +198,7 @@ pub struct EpochCrosslinkCommitteesBuilder {
}
impl EpochCrosslinkCommitteesBuilder {
/// Instantiates a builder that will build for the `state`'s previous epoch.
pub fn for_previous_epoch(
state: &BeaconState,
active_validator_indices: Vec<usize>,
@ -199,6 +213,7 @@ impl EpochCrosslinkCommitteesBuilder {
}
}
/// Instantiates a builder that will build for the `state`'s next epoch.
pub fn for_current_epoch(
state: &BeaconState,
active_validator_indices: Vec<usize>,
@ -213,6 +228,10 @@ impl EpochCrosslinkCommitteesBuilder {
}
}
/// Instantiates a builder that will build for the `state`'s next epoch.
///
/// Note: there are two possible epoch builds for the next epoch, one where there is a registry
/// change and one where there is not.
pub fn for_next_epoch(
state: &BeaconState,
active_validator_indices: Vec<usize>,
@ -257,6 +276,7 @@ impl EpochCrosslinkCommitteesBuilder {
})
}
/// Consumes the builder, returning a fully-build `EpochCrosslinkCommittee`.
pub fn build(self, spec: &ChainSpec) -> Result<EpochCrosslinkCommittees, Error> {
// The shuffler fails on a empty list, so if there are no active validator indices, simply
// return an empty list.
@ -284,7 +304,6 @@ impl EpochCrosslinkCommitteesBuilder {
for (i, slot) in self.epoch.slot_iter(spec.slots_per_epoch).enumerate() {
for j in (0..committees.len())
.into_iter()
.skip(i * committees_per_slot)
.take(committees_per_slot)
{

View File

@ -114,6 +114,13 @@ pub struct ChainSpec {
domain_deposit: u32,
domain_exit: u32,
domain_transfer: u32,
/*
* Network specific parameters
*
*/
pub boot_nodes: Vec<Multiaddr>,
pub network_id: u8,
}
impl ChainSpec {
@ -245,6 +252,30 @@ impl ChainSpec {
domain_deposit: 3,
domain_exit: 4,
domain_transfer: 5,
/*
* Boot nodes
*/
boot_nodes: vec![],
network_id: 1, // foundation network id
}
}
/// Returns a `ChainSpec` compatible with the Lighthouse testnet specification.
///
/// Spec v0.4.0
pub fn lighthouse_testnet() -> Self {
/*
* Lighthouse testnet bootnodes
*/
let boot_nodes = vec!["/ip4/127.0.0.1/tcp/9000"
.parse()
.expect("correct multiaddr")];
Self {
boot_nodes,
network_id: 2, // lighthouse testnet network id
..ChainSpec::few_validators()
}
}

View File

@ -85,3 +85,6 @@ pub type AttesterMap = HashMap<(u64, u64), Vec<usize>>;
pub type ProposerMap = HashMap<u64, usize>;
pub use bls::{AggregatePublicKey, AggregateSignature, Keypair, PublicKey, SecretKey, Signature};
pub use libp2p::floodsub::{Topic, TopicBuilder};
pub use libp2p::multiaddr;
pub use libp2p::Multiaddr;

View File

@ -33,7 +33,7 @@ impl RelativeEpoch {
/// Returns the `epoch` that `self` refers to, with respect to the `base` epoch.
///
/// Spec v0.5.0
pub fn into_epoch(&self, base: Epoch) -> Epoch {
pub fn into_epoch(self, base: Epoch) -> Epoch {
match self {
RelativeEpoch::Previous => base - 1,
RelativeEpoch::Current => base,

View File

@ -214,7 +214,7 @@ impl TestingBeaconStateBuilder {
- spec.min_attestation_inclusion_delay;
let last_slot = std::cmp::min(state.slot.as_u64(), last_slot);
for slot in first_slot..last_slot + 1 {
for slot in first_slot..=last_slot {
let slot = Slot::from(slot);
let committees = state

View File

@ -47,7 +47,7 @@ impl TestingDepositBuilder {
self.deposit
.deposit_data
.deposit_input
.withdrawal_credentials = withdrawal_credentials.clone();
.withdrawal_credentials = withdrawal_credentials;
self.deposit.deposit_data.deposit_input.proof_of_possession = self
.deposit