Merge branch 'master' into shuffling

This commit is contained in:
Age Manning 2018-10-10 14:41:49 +11:00
commit ddc8037487
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
17 changed files with 265 additions and 886 deletions

View File

@ -12,7 +12,6 @@ clap = "2.32.0"
db = { path = "lighthouse/db" }
dirs = "1.0.3"
futures = "0.1.23"
network-libp2p = { path = "network-libp2p" }
rand = "0.3"
rlp = { git = "https://github.com/paritytech/parity-common" }
slog = "^2.2.3"

267
README.md
View File

@ -1,59 +1,252 @@
# Lighthouse: a (future) Ethereum 2.0 client
# Lighthouse: an Ethereum 2.0 client
[![Build Status](https://travis-ci.org/sigp/lighthouse.svg?branch=master)](https://travis-ci.org/sigp/lighthouse) [![Gitter](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/sigp/lighthouse?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge)
A **work-in-progress** implementation of the Ethereum 2.0 Beacon Chain in Rust.
A work-in-progress, open-source implementation of the Ethereum 2.0 Beacon
Chain, maintained by Sigma Prime.
It is an implementation of the [Full PoS Casper chain
v2](https://notes.ethereum.org/SCIg8AH5SA-O4C1G1LYZHQ?view) spec and is also
largely based upon the
[ethereum/beacon_chain](https://github.com/ethereum/beacon_chain) repo.
## Introduction
**NOTE: the cryptography libraries used in this implementation are very
experimental and as such all cryptography should be assumed to be insecure.**
This readme is split into two major sections:
## Motivation
- [Lighthouse Client](#lighthouse-client): information about this
implementation.
- [What is Ethereum 2.0](#what-is-ethereum-20): an introduction to Ethereum 2.0.
The objective of this project is to build an Ethereum 2.0-only client from
the ground up.
If you'd like some background on Sigma Prime, please see the [Lighthouse Update
\#00](https://lighthouse.sigmaprime.io/update-00.html) blog post or the
[company website](https://sigmaprime.io).
As such, the early days of Lighthouse will be very much a research effort -- it
will be evolving on the bleeding edge of the specification without needing to
maintain prod-grade stability or backwards-compatibility for the existing PoW
chain.
## Lighthouse Client
Whilst the Beacon Chain relies upon the PoW chain for block hashes, Lighthouse
will need to run alongside an existing client (e.g., Geth, Parity Ethereum),
only being able to stand by itself once the PoW chain has been deprecated.
Lighthouse is an open-source Ethereum 2.0 client that is currently under
development. Designed as an Ethereum 2.0-only client, Lighthouse will not
re-implement the existing proof-of-work protocol. Maintaining a forward focus
on Ethereum 2.0 ensures that Lighthouse avoids reproducing the high-quality
work already undertaken by existing projects. As such, Lighthouse will connect
to existing clients, such as
[Geth](https://github.com/ethereum/go-ethereum) or
[Parity-Ethereum](https://github.com/paritytech/parity-ethereum), via RPC to enable
present-Ethereum functionality.
Lighthouse aims to assist in advancing the following Ethereum technologies:
### Goals
- Proof-of-Stake
- Sharding
- EVM alternatives (e.g., WASM)
- Scalable, topic-based P2P networks (e.g., libp2p-gossipsub)
- Scalable signature schemes (e.g., BLS aggregates)
The purpose of this project is to further research and development towards a
secure, efficient, and decentralized Ethereum protocol, facilitated by a new
open-source Ethereum 2.0 client.
## Progress
In addition to implementing a new client, the project seeks to maintain and
improve the Ethereum protocol wherever possible.
As of 02/08/2018, there is a basic libp2p implementation alongside a series of
state objects and state transition functions. There are no syncing capabilities.
### Components
## Usage
The following list describes some of the components actively under development
by the team:
You can run the tests like this:
- **BLS cryptography**: Lighthouse presently uses the [Apache
Milagro](https://milagro.apache.org/) cryptography library to create and
verify BLS aggregate signatures. BLS signatures are core to Eth 2.0 as they
allow the signatures of many validators to be compressed into a constant 96
bytes and efficiently verified. The Lighthouse project is presently
maintaining its own [BLS aggregates
library](https://github.com/sigp/signature-schemes), gratefully forked from
[@lovesh](https://github.com/lovesh).
- **DoS-resistant block pre-processing**: Processing blocks in proof-of-stake
is more resource intensive than proof-of-work. As such, clients need to
ensure that bad blocks can be rejected as efficiently as possible. At
present, blocks with 10 million ETH staked can be processed in 0.006
seconds, and invalid blocks are rejected even more quickly. See [issue
#103](https://github.com/ethereum/beacon_chain/issues/103) on
[ethereum/beacon_chain](https://github.com/ethereum/beacon_chain).
- **P2P networking**: Eth 2.0 will likely use the [libp2p
framework](https://libp2p.io/). Lighthouse aims to work alongside
[Parity](https://www.parity.io/) to ensure
[libp2p-rust](https://github.com/libp2p/rust-libp2p) is fit for purpose.
- **Validator duties**: The project involves the development of "validator
services" for users who wish to stake ETH. To fulfill their duties,
validators require a consistent view of the chain and the ability to vote
upon blocks from both shard and beacon chains.
- **New serialization formats**: Lighthouse is working alongside researchers
from the Ethereum Foundation to develop *simpleserialize* (SSZ), a
purpose-built serialization format for sending information across a network.
Check out the [SSZ
implementation](https://github.com/sigp/lighthouse/tree/master/beacon_chain/utils/ssz)
and this
[research](https://github.com/sigp/serialization_sandbox/blob/report/report/serialization_report.md)
on serialization formats for more information (a toy sketch of the idea follows this list).
- **Casper FFG fork-choice**: The [Casper
FFG](https://arxiv.org/abs/1710.09437) fork-choice rules allow the chain to
select a canonical chain in the case of a fork.
- **Efficient state transition logic**: State transition logic governs
updates to the validator set as validators log in/out, penalizes/rewards
validators, rotates validators across shards, and implements other core tasks.
- **Fuzzing and testing environments**: Implementation of lab environments with
continuous integration (CI) workflows, providing automated security analysis.
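
To make the *simpleserialize* item above concrete, here is a minimal,
illustrative sketch of a fixed-layout, length-prefixed encoding in Rust. It is
a toy only: it does not follow the actual SSZ specification or the API of the
`ssz` crate linked above, and the function names are hypothetical.

```
/// Toy, SSZ-flavoured encoding: fixed-width big-endian integers and
/// length-prefixed byte lists. Illustrative only; not the real SSZ spec.
fn encode_u64(value: u64, buf: &mut Vec<u8>) {
    // Fixed 8-byte, big-endian layout: no varints, no field tags.
    buf.extend_from_slice(&value.to_be_bytes());
}

fn encode_bytes(bytes: &[u8], buf: &mut Vec<u8>) {
    // A 4-byte big-endian length prefix followed by the raw bytes.
    buf.extend_from_slice(&(bytes.len() as u32).to_be_bytes());
    buf.extend_from_slice(bytes);
}

fn main() {
    let mut buf = Vec::new();
    encode_u64(42, &mut buf);              // e.g., a slot number
    encode_bytes(&[0xde, 0xad], &mut buf); // e.g., an opaque payload
    assert_eq!(buf.len(), 8 + 4 + 2);
    println!("{:02x?}", buf);
}
```

The property the sketch is meant to highlight is determinism: the same object
always encodes to the same bytes, with no per-message schema negotiation.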
In addition to these components we are also working on database schemas, RPC
frameworks, specification development, database optimizations (e.g.,
bloom-filters), and tons of other interesting stuff (at least we think so).
### Contributing
**Lighthouse welcomes contributors with open arms.**
Layer-1 infrastructure is a critical component for the ecosystem and relies
heavily on contributions from the community. Building Ethereum 2.0 is a huge
task and we refuse to conduct an inappropriate ICO or charge licensing fees.
Instead, we fund development through grants and support from Sigma Prime.
If you would like to learn more about Ethereum 2.0 and/or
[Rust](https://www.rust-lang.org/), we are more than happy to on-board you
and assign you some tasks. We aim to be as accepting and understanding as
possible; we are more than happy to up-skill contributors in exchange for their
assistance with the project.
Alternatively, if you are an ETH/Rust veteran, we'd love your input. We're
always looking for the best way to implement things and welcome all
respectful criticisms.
If you'd like to contribute, try having a look through the [open
issues](https://github.com/sigp/lighthouse/issues) (tip: look for the [good
first
issue](https://github.com/sigp/lighthouse/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22)
tag) and ping us on the [gitter](https://gitter.im/sigp/lighthouse) channel. We need
your support!
### Running
**NOTE: The cryptography libraries used in this implementation are
experimental. As such, all cryptography should be assumed to be insecure.**
This code-base is still very much under development and does not provide any
user-facing functionality. For developers and researchers, there are several
tests and benchmarks which may be of interest.
To run tests, use:
```
$ git clone <url>
$ cd rust_beacon_chain
$ cargo test
$ cargo test --all
```
To run benchmarks, use:
```
$ cargo bench --all
```
Lighthouse presently runs on Rust `stable`; however, benchmarks currently require the
`nightly` version.
### Engineering Ethos
Lighthouse aims to produce many small, easily-tested components, each separated
into individual crates wherever possible.
Generally, tests can be kept in the same file, as is typical in Rust.
Integration tests should be placed in the `tests` directory in the crate's
root. Particularly large (line-count) tests should be placed into a separate
file.
A function is not considered complete until a test exists for it. We produce
tests to protect against regression (accidentally breaking things) and to
provide examples that help readers of the code base understand how functions
should (or should not) be used.
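
As a concrete (and purely illustrative) example of the testing convention
described above, a small function and its unit tests can live together in the
same file; the helper below is hypothetical and exists only to show the layout.

```
/// Returns the number of validators per committee given a validator count and
/// a committee count. (Hypothetical helper, for illustration only.)
fn committee_size(validator_count: usize, committee_count: usize) -> usize {
    if committee_count == 0 {
        return 0;
    }
    validator_count / committee_count
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn divides_evenly() {
        assert_eq!(committee_size(100, 10), 10);
    }

    #[test]
    fn handles_zero_committees() {
        assert_eq!(committee_size(100, 0), 0);
    }
}
```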
Each pull request is to be reviewed by at least one "core developer" (i.e.,
someone with write-access to the repository). This helps to ensure bugs are
detected, consistency is maintained, and responsibility for errors is dispersed.
Discussion must be respectful and intellectual. Have fun and make jokes, but
always respect the limits of other people.
### Directory Structure
Here we provide an overview of the directory structure:
- `/beacon_chain`: contains logic derived directly from the specification.
E.g., shuffling algorithms, state transition logic and structs, block
validation, BLS crypto, etc.
- `/lighthouse`: contains logic specific to this client implementation. E.g.,
CLI parsing, RPC end-points, databases, etc.
## Contact
This repo is presently authored by Paul Hauner as a
[Sigma Prime](https://github.com/sigp) project.
The best place for discussion is the [sigp/lighthouse gitter](https://gitter.im/sigp/lighthouse).
Ping @paulhauner or @AgeManning to get the quickest response.
The best place for discussion is probably the [ethereum/sharding
gitter](https://gitter.im/ethereum/sharding).
# What is Ethereum 2.0
Ethereum 2.0 refers to a new blockchain system currently under development by
the Ethereum Foundation and the Ethereum community. The Ethereum 2.0 blockchain
consists of 1,025 proof-of-stake blockchains. This includes the "beacon chain"
and 1,024 "shard chains".
## Beacon Chain
The beacon chain differs from existing blockchains, such as
Bitcoin and Ethereum, in that it doesn't process transactions per se. Instead,
it maintains a set of bonded (staked) validators and coordinates these to
provide services to a static set of *sub-blockchains* (i.e., shards). Each of
these shard blockchains processes normal transactions (e.g., "Transfer 5 ETH
from A to B") in parallel whilst deferring consensus mechanisms to the beacon
chain.
Major services provided by the beacon chain to its shards include the following:
- A source of entropy, likely using a [RANDAO + VDF
scheme](https://ethresear.ch/t/minimal-vdf-randomness-beacon/3566).
- Validator management, including:
- Inducting and ejecting validators.
- Assigning randomly-shuffled subsets of validators to particular shards.
- Penalizing and rewarding validators.
- Proof-of-stake consensus for shard chain blocks.
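
To illustrate the validator-shuffling service in the list above, the sketch
below assigns a validator set to shards with a seeded Fisher-Yates shuffle. It
is a toy model: the PRNG, seed source, and committee sizing are placeholders,
not the algorithm specified for the beacon chain.

```
/// Toy xorshift PRNG so the example stays dependency-free. The real beacon
/// chain derives its randomness differently (e.g., RANDAO + VDF).
struct Xorshift64(u64);

impl Xorshift64 {
    fn next(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }
}

/// Shuffle validator indices with Fisher-Yates, then chunk them into one
/// committee per shard. Illustrative only; assumes `shard_count > 0`.
fn assign_to_shards(validator_count: usize, shard_count: usize, seed: u64) -> Vec<Vec<usize>> {
    let mut indices: Vec<usize> = (0..validator_count).collect();
    let mut rng = Xorshift64(seed.max(1)); // xorshift state must be non-zero

    // Fisher-Yates: swap each position with a pseudo-random position at or below it.
    for i in (1..indices.len()).rev() {
        let j = (rng.next() % (i as u64 + 1)) as usize;
        indices.swap(i, j);
    }

    // One committee per shard, filled from the shuffled list.
    let committee_size = (validator_count + shard_count - 1) / shard_count;
    indices
        .chunks(committee_size.max(1))
        .map(|committee| committee.to_vec())
        .collect()
}

fn main() {
    let committees = assign_to_shards(16, 4, 0x5eed);
    for (shard, committee) in committees.iter().enumerate() {
        println!("shard {}: {:?}", shard, committee);
    }
}
```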
## Shard Chains
Shards are analogous to CPU cores - they're a resource where transactions can
execute in series (one-after-another). Presently, Ethereum is single-core and
can only _fully_ process one transaction at a time. Sharding allows processing
of multiple transactions simultaneously, greatly increasing the per-second
transaction capacity of Ethereum.
Each shard uses a proof-of-stake consensus mechanism and shares its validators
(stakers) with other shards. The beacon chain rotates validators
pseudo-randomly between different shards. Shards will likely be the basis of
layer-2 transaction processing schemes; however, that is not within the scope of this
discussion.
## The Proof-of-Work Chain
The present-Ethereum proof-of-work (PoW) chain will host a smart contract that
enables accounts to deposit 32 ETH, a BLS public key, and some [other
parameters](https://github.com/ethereum/eth2.0-specs/blob/master/specs/casper_sharding_v2.1.md#pow-chain-changes),
allowing them to become beacon chain validators. Each beacon chain block will
reference a PoW block hash, allowing PoW clients to use the beacon chain as a
source of [Casper FFG finality](https://arxiv.org/abs/1710.09437), if desired.
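
As a rough sketch of the kind of record such a deposit produces, the struct
below mirrors the fields described above; the field names and the 48-byte
public-key size are illustrative assumptions, not taken from the
deposit-contract specification.

```
/// Illustrative shape of a validator registration created via the PoW-chain
/// deposit contract. Field names and layouts here are placeholders.
struct ValidatorRegistration {
    /// BLS public key supplied with the deposit (assumed 48 bytes).
    pubkey: [u8; 48],
    /// Deposit amount in Gwei; 32 ETH = 32,000,000,000 Gwei.
    deposit_gwei: u64,
    /// Stand-in for the "other parameters" linked above (e.g., withdrawal data).
    withdrawal_credentials: [u8; 32],
}

fn main() {
    let registration = ValidatorRegistration {
        pubkey: [0u8; 48],
        deposit_gwei: 32 * 1_000_000_000, // 32 ETH expressed in Gwei
        withdrawal_credentials: [0u8; 32],
    };
    println!("deposit of {} Gwei registered", registration.deposit_gwei);
}
```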
It is a requirement that ETH can move freely between shard chains, as well as between
Eth 2.0 and present-Ethereum blockchains. The exact mechanics of these transfers remain
an active topic of research and their details are yet to be confirmed.
## Ethereum 2.0 Progress
Ethereum 2.0 is not fully specified and a working implementation does not yet
exist. Some teams have demos available which indicate progress, but do not
constitute a complete product. We look forward to offering user-facing functionality
once we are able to provide a minimum-viable user experience.
The work-in-progress Eth 2.0 specification lives
[here](https://github.com/ethereum/eth2.0-specs/blob/master/specs/casper_sharding_v2.1.md)
in the [ethereum/eth2.0-specs](https://github.com/ethereum/eth2.0-specs)
repository. The spec is still in a draft phase; however, there are several teams
basing their Eth 2.0 implementations upon it while the Ethereum Foundation research
team continues to fill in the gaps. There is active discussion about the specification in the
[ethereum/sharding](https://gitter.im/ethereum/sharding) gitter channel. A
proof-of-concept implementation in Python is available at
[ethereum/beacon_chain](https://github.com/ethereum/beacon_chain).
Presently, the specification focuses almost exclusively on the beacon chain,
as it is the focus of current development efforts. Progress on shard chain
specification will soon follow.

View File

@ -39,11 +39,3 @@ pub type AttesterMap = HashMap<(u64, u16), Vec<usize>>;
/// Maps a slot to a block proposer.
pub type ProposerMap = HashMap<u64, usize>;
#[cfg(test)]
mod tests {
    #[test]
    fn it_works() {
        assert_eq!(2 + 2, 4);
    }
}

View File

@ -1,20 +0,0 @@
extern crate rlp;
extern crate ethereum_types;
extern crate blake2_rfc as blake2;
extern crate bytes;
extern crate ssz;
mod common;
pub mod active_state;
pub mod attestation_record;
pub mod crystallized_state;
pub mod chain_config;
pub mod block;
pub mod crosslink_record;
pub mod shard_and_committee;
pub mod validator_record;
use super::bls;
use super::db;
use super::utils;

33
docs/onboarding.md Normal file
View File

@ -0,0 +1,33 @@
# Learn how to contribute to ETH 2.0!
Lighthouse is an Ethereum 2.0 client built in Rust.
If you are interested in contributing to the Ethereum ecosystem, and you want to learn Rust, Lighthouse is a great project to work on.
Initially, this doc will contain reading material to help get you started in Rust and Ethereum. Eventually, it will have guides specific to Lighthouse.
## Learn Rust
* [The Rust Programming Language](https://doc.rust-lang.org/book/2018-edition/index.html)
## Learn Ethereum
#### General Ethereum Resources
* [What is Ethereum](http://ethdocs.org/en/latest/introduction/what-is-ethereum.html)
* [Ethereum Introduction](https://github.com/ethereum/wiki/wiki/Ethereum-introduction)
#### Ethereum 2.0
* [Ethereum 2.0 Spec - Casper and Sharding](https://github.com/ethereum/eth2.0-specs/blob/master/specs/beacon-chain.md)
#### Sharding
* [How to Scale Ethereum: Sharding Explained](https://medium.com/prysmatic-labs/how-to-scale-ethereum-sharding-explained-ba2e283b7fce)
#### Casper
* [Proof of Stake - Casper FFG](https://www.youtube.com/watch?v=uQ3IqLDf-oo)
* [Beacon Casper Chain](https://www.youtube.com/watch?v=GAywmwGToUI)
### TODO
- Add reading material as we discover it.
- Start developing guides specific to Lighthouse.

View File

@ -1,83 +0,0 @@
use std::sync::Arc;
use std::thread;
use super::db::{ DiskDB };
use super::config::LighthouseConfig;
use super::futures::sync::mpsc::{
unbounded,
};
use super::network_libp2p::service::listen as network_listen;
use super::network_libp2p::state::NetworkState;
use super::slog::Logger;
use super::sync::run_sync_future;
use super::db::ClientDB;
/// Represents the co-ordination of the
/// networking, syncing and RPC (not-yet-implemented) threads.
pub struct Client {
pub db: Arc<ClientDB>,
pub network_thread: thread::JoinHandle<()>,
pub sync_thread: thread::JoinHandle<()>,
}
impl Client {
/// Instantiates a new "Client".
///
/// Presently, this means starting network and sync threads
/// and plumbing them together.
pub fn new(config: &LighthouseConfig,
log: &Logger)
-> Self
{
// Open the local db
let db = {
let db = DiskDB::open(&config.data_dir, None);
Arc::new(db)
};
// Start the network thread
let network_state = NetworkState::new(
&config.data_dir,
config.p2p_listen_port,
&log).expect("Network setup failed");
let (network_thread, network_tx, network_rx) = {
let (message_sender, message_receiver) = unbounded();
let (event_sender, event_receiver) = unbounded();
let network_log = log.new(o!());
let thread = thread::spawn(move || {
network_listen(
network_state,
&event_sender,
message_receiver,
&network_log,
);
});
(thread, message_sender, event_receiver)
};
// Start the sync thread
let (sync_thread, _sync_tx, _sync_rx) = {
let (sync_out_sender, sync_out_receiver) = unbounded();
let (sync_in_sender, sync_in_receiver) = unbounded();
let sync_log = log.new(o!());
let sync_db = db.clone();
let thread = thread::spawn(move || {
run_sync_future(
sync_db,
network_tx.clone(),
network_rx,
&sync_out_sender,
&sync_in_receiver,
sync_log,
);
});
(thread, sync_in_sender, sync_out_receiver)
};
// Return the client struct
Self {
db,
network_thread,
sync_thread,
}
}
}

View File

@ -4,13 +4,10 @@ extern crate slog_term;
extern crate slog_async;
// extern crate ssz;
extern crate clap;
extern crate network_libp2p;
extern crate futures;
extern crate db;
mod client;
mod sync;
mod config;
use std::path::PathBuf;
@ -18,7 +15,6 @@ use std::path::PathBuf;
use slog::Drain;
use clap::{ Arg, App };
use config::LighthouseConfig;
use client::Client;
fn main() {
let decorator = slog_term::TermDecorator::new().build();
@ -64,8 +60,8 @@ fn main() {
"data_dir" => &config.data_dir.to_str(),
"port" => &config.p2p_listen_port);
let client = Client::new(&config, &log);
client.sync_thread.join().unwrap();
error!(log,
"Lighthouse under development and does not provide a user demo.");
info!(log, "Exiting.");
}

View File

@ -1,12 +0,0 @@
extern crate futures;
extern crate slog;
extern crate tokio;
extern crate network_libp2p;
pub mod network;
pub mod sync_future;
pub mod wire_protocol;
pub use self::sync_future::run_sync_future;
use super::db;

View File

@ -1,86 +0,0 @@
use std::sync::Arc;
use super::db::ClientDB;
use slog::Logger;
use super::network_libp2p::message::{
NetworkEvent,
OutgoingMessage,
NetworkEventType,
};
use super::wire_protocol::{
WireMessage,
WireMessageHeader,
};
use super::futures::sync::mpsc::{
UnboundedSender,
};
/// Accept a network event and perform all required processing.
///
/// This function should be called whenever an underlying network
/// (e.g., libp2p) has an event to push up to the sync process.
pub fn handle_network_event(
event: NetworkEvent,
db: &Arc<ClientDB>,
network_tx: &UnboundedSender<OutgoingMessage>,
log: &Logger)
-> Result<(), ()>
{
debug!(&log, "";
"network_event" => format!("{:?}", &event));
match event.event {
NetworkEventType::PeerConnect => Ok(()),
NetworkEventType::PeerDrop => Ok(()),
NetworkEventType::Message => {
if let Some(data) = event.data {
handle_network_message(
&data,
&db,
&network_tx,
&log)
} else {
Ok(())
}
}
}
}
/// Accept a message from the network and perform all required
/// processing.
///
/// This function should be called whenever a peer from a network
/// (e.g., libp2p) has sent a message to us.
fn handle_network_message(
message: &[u8],
db: &Arc<ClientDB>,
_network_tx: &UnboundedSender<OutgoingMessage>,
log: &Logger)
-> Result<(), ()>
{
match WireMessage::decode(&message) {
Ok(msg) => {
match msg.header {
WireMessageHeader::Blocks => {
process_unverified_blocks(
msg.body,
&db,
&log
);
Ok(())
}
}
}
Err(_) => {
Ok(()) // No need to pass the error back
}
}
}
fn process_unverified_blocks(_message: &[u8],
_db: &Arc<ClientDB>,
_log: &Logger)
{
//
}

View File

@ -1,48 +0,0 @@
use super::tokio;
use super::futures::{ Future, Stream };
use super::futures::sync::mpsc::{
UnboundedReceiver,
UnboundedSender,
};
use super::network_libp2p::message::{
NetworkEvent,
OutgoingMessage,
};
use super::network::handle_network_event;
use std::sync::Arc;
use super::db::ClientDB;
use slog::Logger;
type NetworkSender = UnboundedSender<OutgoingMessage>;
type NetworkReceiver = UnboundedReceiver<NetworkEvent>;
type SyncSender = UnboundedSender<Vec<u8>>;
type SyncReceiver = UnboundedReceiver<Vec<u8>>;
/// Start a syncing tokio future.
///
/// Uses green-threading to process messages
/// from the network and the RPC and update
/// the state.
pub fn run_sync_future(
db: Arc<ClientDB>,
network_tx: NetworkSender,
network_rx: NetworkReceiver,
_sync_tx: &SyncSender,
_sync_rx: &SyncReceiver,
log: Logger)
{
let network_future = {
network_rx
.for_each(move |event| {
handle_network_event(
event,
&db.clone(),
&network_tx.clone(),
&log.clone())
})
.map_err(|_| panic!("rx failed"))
};
tokio::run(network_future);
}

View File

@ -1,92 +0,0 @@
pub enum WireMessageDecodeError {
TooShort,
UnknownType,
}
pub enum WireMessageHeader {
Blocks,
/*
// Leave out until used
Status,
NewBlockHashes,
GetBlockHashes,
BlockHashes,
GetBlocks,
NewBlock,
*/
}
pub struct WireMessage<'a> {
pub header: WireMessageHeader,
pub body: &'a [u8],
}
impl<'a> WireMessage<'a> {
pub fn decode(bytes: &'a [u8])
-> Result<Self, WireMessageDecodeError>
{
if let Some((header_byte, body)) = bytes.split_first() {
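// The first byte selects the message type; the rest of the slice is the raw
// body. An empty slice falls through to the `TooShort` error below.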
let header = match header_byte {
0x06 => Some(WireMessageHeader::Blocks),
_ => None
};
match header {
Some(header) => Ok(Self{header, body}),
None => Err(WireMessageDecodeError::UnknownType)
}
} else {
Err(WireMessageDecodeError::TooShort)
}
}
}
/*
pub fn decode_wire_message(bytes: &[u8])
-> Result<WireMessage, WireMessageDecodeError>
{
if let Some((header_byte, body)) = bytes.split_first() {
let header = match header_byte {
0x06 => Some(WireMessageType::Blocks),
_ => None
};
match header {
Some(header) => Ok((header, body)),
None => Err(WireMessageDecodeError::UnknownType)
}
} else {
Err(WireMessageDecodeError::TooShort)
}
}
/// Determines the message type of some given
/// message.
///
/// Does not check the validity of the message data,
/// it just reads the first byte.
pub fn message_type(message: &Vec<u8>)
-> Option<WireMessageType>
{
match message.get(0) {
Some(0x06) => Some(WireMessageType::Blocks),
_ => None
}
}
pub fn identify_wire_protocol_message(message: &Vec<u8>)
-> Result<(WireMessageType, &[u8]), WireMessageDecodeError>
{
fn strip_header(v: &Vec<u8>) -> &[u8] {
match v.get(1..v.len()) {
None => &vec![],
Some(s) => s
}
}
match message.get(0) {
Some(0x06) => Ok((WireMessageType::Blocks, strip_header(message))),
None => Err(WireMessageDecodeError::TooShort),
_ => Err(WireMessageDecodeError::UnknownType),
}
}
*/

View File

@ -1,24 +0,0 @@
[package]
name = "network-libp2p"
version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
[dependencies]
bigint = "4.2"
bytes = ""
eth-secp256k1 = { git = "https://github.com/paritytech/rust-secp256k1" }
futures = "0.1.23"
libp2p-peerstore = { git = "https://github.com/sigp/libp2p-rs", branch = "zksummit" }
libp2p-core = { git = "https://github.com/sigp/libp2p-rs", branch = "zksummit" }
libp2p-mplex = { git = "https://github.com/sigp/libp2p-rs", branch = "zksummit" }
libp2p-tcp-transport = { git = "https://github.com/sigp/libp2p-rs", branch = "zksummit" }
libp2p-floodsub = { git = "https://github.com/sigp/libp2p-rs", branch = "zksummit" }
libp2p-identify = { git = "https://github.com/sigp/libp2p-rs", branch = "zksummit" }
libp2p-kad = { git = "https://github.com/sigp/libp2p-rs", branch = "zksummit" }
pem = "0.5.0"
rand = "0.3"
slog = "^2.2.3"
tokio-core = "0.1"
tokio-io = "0.1"
tokio-stdin = "0.1"
tokio-timer = "0.1"

View File

@ -1,7 +0,0 @@
# libp2p Network
This is a fairly scrappy implementation of libp2p floodsub for the following
reasons:
- There is not presently a gossip-sub implementation for Rust libp2p.
- The networking layer for the beacon_chain is not yet finalized.

View File

@ -1,10 +0,0 @@
extern crate libp2p_core;
extern crate libp2p_peerstore;
extern crate pem;
extern crate secp256k1;
#[macro_use]
extern crate slog;
pub mod message;
pub mod service;
pub mod state;

View File

@ -1,18 +0,0 @@
#[derive(Debug)]
pub enum NetworkEventType {
    PeerConnect,
    PeerDrop,
    Message,
}
#[derive(Debug)]
pub struct NetworkEvent {
    pub event: NetworkEventType,
    pub data: Option<Vec<u8>>,
}
#[derive(Debug)]
pub struct OutgoingMessage {
    pub peer: Option<String>,
    pub data: Vec<u8>,
}

View File

@ -1,315 +0,0 @@
extern crate bigint;
extern crate bytes;
extern crate futures;
extern crate libp2p_peerstore;
extern crate libp2p_floodsub;
extern crate libp2p_identify;
extern crate libp2p_core;
extern crate libp2p_mplex;
extern crate libp2p_tcp_transport;
extern crate libp2p_kad;
extern crate slog;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_timer;
extern crate tokio_stdin;
use super::state::NetworkState;
use super::message::{ NetworkEvent, NetworkEventType, OutgoingMessage };
use self::bigint::U512;
use self::futures::{ Future, Stream, Poll };
use self::futures::sync::mpsc::{
UnboundedSender, UnboundedReceiver
};
use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr,
Transport, ConnectionUpgrade };
use self::libp2p_kad::{ KademliaUpgrade, KademliaProcessingFuture};
use self::libp2p_floodsub::{ FloodSubFuture, FloodSubUpgrade };
use self::libp2p_identify::{ IdentifyInfo, IdentifyTransport, IdentifyOutput };
use self::slog::Logger;
use std::sync::{ Arc, RwLock };
use std::time::{ Duration, Instant };
use std::ops::Deref;
use std::io::Error as IoError;
use self::tokio_io::{ AsyncRead, AsyncWrite };
use self::bytes::Bytes;
pub use self::libp2p_floodsub::Message;
pub fn listen(state: NetworkState,
events_to_app: &UnboundedSender<NetworkEvent>,
raw_rx: UnboundedReceiver<OutgoingMessage>,
log: &Logger)
{
let peer_store = state.peer_store;
let peer_id = state.peer_id;
let listen_multiaddr = state.listen_multiaddr;
let listened_addrs = Arc::new(RwLock::new(vec![]));
let rx = ApplicationReciever{ inner: raw_rx };
// Build a tokio core
let mut core = tokio_core::reactor::Core::new().expect("tokio failure.");
// Build a base TCP libp2p transport
let transport = libp2p_tcp_transport::TcpConfig::new(core.handle())
.with_upgrade(libp2p_core::upgrade::PlainTextConfig)
.with_upgrade(libp2p_mplex::BufferedMultiplexConfig::<[_; 256]>::new())
.into_connection_reuse();
// Build an identify transport to allow identification and negotiation
// of layers running atop the TCP transport (e.g., kad)
let identify_transport = {
let listened_addrs = listened_addrs.clone();
let listen_multiaddr = listen_multiaddr.clone();
IdentifyTransport::new(transport.clone(), peer_store.clone())
// Managed NAT'ed connections - ensuring the external IP
// is stored not the internal addr.
.map(move |out, _, _| {
if let(Some(ref observed), ref listen_multiaddr) =
(out.observed_addr, listen_multiaddr)
{
if let Some(viewed_from_outside) =
transport.nat_traversal(listen_multiaddr, observed)
{
listened_addrs.write().unwrap()
.push(viewed_from_outside);
}
}
out.socket
})
};
// Configure and build a Kademlia upgrade to be applied
// to the base TCP transport.
let kad_config = libp2p_kad::KademliaConfig {
parallelism: 3,
record_store: (),
peer_store,
local_peer_id: peer_id.clone(),
timeout: Duration::from_secs(2)
};
let kad_ctl_proto = libp2p_kad::
KademliaControllerPrototype::new(kad_config);
let kad_upgrade = libp2p_kad::
KademliaUpgrade::from_prototype(&kad_ctl_proto);
// Build a floodsub upgrade to allow pushing topic'ed
// messages across the network.
let (floodsub_upgrade, floodsub_rx) =
FloodSubUpgrade::new(peer_id.clone());
// Combine the Kademlia and Identify upgrades into a single
// upgrader struct.
let upgrade = ConnectionUpgrader {
kad: kad_upgrade.clone(),
floodsub: floodsub_upgrade.clone(),
identify: libp2p_identify::IdentifyProtocolConfig,
};
// Build a Swarm to manage upgrading connections to peers.
let swarm_listened_addrs = listened_addrs.clone();
let swarm_peer_id = peer_id.clone();
let (swarm_ctl, swarm_future) = libp2p_core::swarm(
identify_transport.clone().with_upgrade(upgrade),
move |upgrade, client_addr| match upgrade {
FinalUpgrade::Kad(kad) => Box::new(kad) as Box<_>,
FinalUpgrade::FloodSub(future) => Box::new(future) as Box<_>,
FinalUpgrade::Identify(IdentifyOutput::Sender { sender, .. }) => sender.send(
IdentifyInfo {
public_key: swarm_peer_id.clone().into_bytes(),
agent_version: "lighthouse/1.0.0".to_owned(),
protocol_version: "rust-libp2p/1.0.0".to_owned(),
listen_addrs: swarm_listened_addrs.read().unwrap().to_vec(),
protocols: vec![
"/ipfs/kad/1.0.0".to_owned(),
"/ipfs/id/1.0.0".to_owned(),
"/floodsub/1.0.0".to_owned(),
]
},
&client_addr
),
FinalUpgrade::Identify(IdentifyOutput::RemoteInfo { .. }) => {
unreachable!("Never dial with the identify protocol.")
}
},
);
// Start the Swarm controller listening on the local machine
let actual_addr = swarm_ctl
.listen_on(listen_multiaddr)
.expect("Failed to listen on multiaddr");
info!(log, "libp2p listening"; "listen_addr" => actual_addr.to_string());
// Convert the kad prototype into a controller by providing it the
// newly built swarm.
let (kad_ctl, kad_init) = kad_ctl_proto.start(
swarm_ctl.clone(),
identify_transport.clone().with_upgrade(kad_upgrade.clone()));
// Create a new floodsub controller using a specific topic
let topic = libp2p_floodsub::TopicBuilder::new("beacon_chain").build();
let floodsub_ctl = libp2p_floodsub::FloodSubController::new(&floodsub_upgrade);
floodsub_ctl.subscribe(&topic);
// Generate a tokio timer "wheel" future that sends kad FIND_NODE at
// a routine interval.
let kad_poll_log = log.new(o!());
let kad_poll_event_tx = events_to_app.clone();
let kad_poll = {
let polling_peer_id = peer_id.clone();
tokio_timer::wheel()
.build()
.interval_at(Instant::now(), Duration::from_secs(30))
.map_err(|_| -> IoError { unreachable!() })
.and_then(move |()| kad_ctl.find_node(peer_id.clone()))
.for_each(move |peers| {
let local_hash = U512::from(polling_peer_id.hash());
for peer in peers {
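// Kademlia-style XOR distance: 512 minus the leading zero bits of
// (local_hash ^ peer_hash), i.e. the bit length of the XOR of the two IDs.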
let peer_hash = U512::from(peer.hash());
let distance = 512 - (local_hash ^ peer_hash).leading_zeros();
info!(kad_poll_log, "Discovered peer";
"distance" => distance,
"peer_id" => peer.to_base58());
let peer_addr = AddrComponent::P2P(peer.into_bytes()).into();
let dial_result = swarm_ctl.dial(
peer_addr,
identify_transport.clone().with_upgrade(floodsub_upgrade.clone())
);
if let Err(err) = dial_result {
warn!(kad_poll_log, "Dialling {:?} failed.", err)
};
let event = NetworkEvent {
event: NetworkEventType::PeerConnect,
data: None,
};
kad_poll_event_tx.unbounded_send(event)
.expect("Network unable to contact application.");
};
Ok(())
})
};
// Create a future to handle messages received from the network
let floodsub_rx = floodsub_rx.for_each(|msg| {
debug!(&log, "Network receive"; "msg" => format!("{:?}", msg));
let event = NetworkEvent {
event: NetworkEventType::Message,
data: Some(msg.data),
};
events_to_app.unbounded_send(event)
.expect("Network unable to contact application.");
Ok(())
});
// Create a future to handle messages received from the application
let app_rx = rx.for_each(|msg| {
debug!(&log, "Network publish"; "msg" => format!("{:?}", msg));
floodsub_ctl.publish(&topic, msg.data);
Ok(())
});
// Generate a full future
let final_future = swarm_future
.select(floodsub_rx).map_err(|(err, _)| err).map(|((), _)| ())
.select(app_rx).map_err(|(err, _)| err).map(|((), _)| ())
.select(kad_poll).map_err(|(err, _)| err).map(|((), _)| ())
.select(kad_init).map_err(|(err, _)| err).and_then(|((), n)| n);
core.run(final_future).unwrap();
}
struct ApplicationReciever {
inner: UnboundedReceiver<OutgoingMessage>,
}
impl Stream for ApplicationReciever {
type Item = OutgoingMessage;
type Error = IoError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.inner
.poll()
.map_err(|_| unreachable!())
}
}
#[derive(Clone)]
struct ConnectionUpgrader<P, R> {
kad: KademliaUpgrade<P, R>,
identify: libp2p_identify::IdentifyProtocolConfig,
floodsub: FloodSubUpgrade,
}
impl<C, P, R, Pc> ConnectionUpgrade<C> for ConnectionUpgrader<P, R>
where
C: AsyncRead + AsyncWrite + 'static,
P: Deref<Target = Pc> + Clone + 'static,
for<'r> &'r Pc: libp2p_peerstore::Peerstore,
R: 'static
{
type NamesIter = ::std::vec::IntoIter<(Bytes, usize)>;
type UpgradeIdentifier = usize;
type Output = FinalUpgrade<C>;
type Future = Box<Future<Item = FinalUpgrade<C>, Error = IoError>>;
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
vec![
(Bytes::from("/ipfs/kad/1.0.0"), 0),
(Bytes::from("/ipfs/id/1.0.0"), 1),
(Bytes::from("/floodsub/1.0.0"), 2),
].into_iter()
}
fn upgrade(
self,
socket: C,
id: Self::UpgradeIdentifier,
ty: Endpoint,
remote_addr: &Multiaddr)
-> Self::Future
{
match id {
0 => Box::new(
self.kad
.upgrade(socket, (), ty, remote_addr)
.map(|upg| upg.into())),
1 => Box::new(
self.identify
.upgrade(socket, (), ty, remote_addr)
.map(|upg| upg.into())),
2 => Box::new(
self.floodsub
.upgrade(socket, (), ty, remote_addr)
.map(|upg| upg.into()),
),
_ => unreachable!()
}
}
}
enum FinalUpgrade<C> {
Kad(KademliaProcessingFuture),
Identify(IdentifyOutput<C>),
FloodSub(FloodSubFuture),
}
impl<C> From<libp2p_kad::KademliaProcessingFuture> for FinalUpgrade<C> {
#[inline]
fn from(upgrade: libp2p_kad::KademliaProcessingFuture) -> Self {
FinalUpgrade::Kad(upgrade)
}
}
impl<C> From<IdentifyOutput<C>> for FinalUpgrade<C> {
#[inline]
fn from(upgrade: IdentifyOutput<C>) -> Self {
FinalUpgrade::Identify(upgrade)
}
}
impl<C> From<FloodSubFuture> for FinalUpgrade<C> {
#[inline]
fn from(upgr: FloodSubFuture) -> Self {
FinalUpgrade::FloodSub(upgr)
}
}

View File

@ -1,119 +0,0 @@
extern crate rand;
use std::io::{ Read, Write };
use std::error::Error;
use std::fs::File;
use std::path::{ Path, PathBuf };
use std::sync::Arc;
use std::time::Duration;
use super::libp2p_core::Multiaddr;
use super::libp2p_peerstore::{ Peerstore, PeerAccess, PeerId };
use super::libp2p_peerstore::json_peerstore::JsonPeerstore;
use super::pem;
use super::secp256k1::Secp256k1;
use super::secp256k1::key::{ SecretKey, PublicKey };
use super::slog::Logger;
/// Location of the libp2p peerstore inside the Network base dir.
const PEERS_FILE: &str = "peerstore.json";
/// Location of the libp2p local peer secret key inside the Network base dir.
const LOCAL_PEM_FILE: &str = "local_peer_id.pem";
/// Represents the present state of a libp2p network.
pub struct NetworkState {
pub base_dir: PathBuf,
pub pubkey: PublicKey,
pub seckey: SecretKey,
pub peer_id: PeerId,
pub listen_multiaddr: Multiaddr,
pub peer_store: Arc<JsonPeerstore>,
}
impl NetworkState {
/// Create a new libp2p network state. Used to initialize
/// network service.
pub fn new(
// config: LighthouseConfig,
base_dir: &Path,
listen_port: u16,
log: &Logger)
-> Result <Self, Box<Error>>
{
let curve = Secp256k1::new();
let seckey = match
NetworkState::load_secret_key_from_pem_file(base_dir, &curve)
{
Ok(k) => k,
_ => NetworkState::generate_new_secret_key(base_dir, &curve)?
};
let pubkey = PublicKey::from_secret_key(&curve, &seckey)?;
let peer_id = PeerId::from_public_key(
&pubkey.serialize_vec(&curve, false));
info!(log, "Loaded keys"; "peer_id" => &peer_id.to_base58());
let peer_store = {
let path = base_dir.join(PEERS_FILE);
let base = JsonPeerstore::new(path)?;
Arc::new(base)
};
info!(log, "Loaded peerstore"; "peer_count" => &peer_store.peers().count());
let listen_multiaddr =
NetworkState::multiaddr_on_port(&listen_port.to_string());
Ok(Self {
base_dir: PathBuf::from(base_dir),
seckey,
pubkey,
peer_id,
listen_multiaddr,
peer_store,
})
}
/// Return a TCP multiaddress on 0.0.0.0 for a given port.
pub fn multiaddr_on_port(port: &str) -> Multiaddr {
format!("/ip4/0.0.0.0/tcp/{}", port)
.parse::<Multiaddr>().unwrap()
}
pub fn add_peer(&mut self,
peer_id: &PeerId,
multiaddr: Multiaddr,
duration_secs: u64) {
self.peer_store.peer_or_create(&peer_id)
.add_addr(multiaddr, Duration::from_secs(duration_secs));
}
/// Instantiate a SecretKey from a .pem file on disk.
pub fn load_secret_key_from_pem_file(
base_dir: &Path,
curve: &Secp256k1)
-> Result<SecretKey, Box<Error>>
{
let path = base_dir.join(LOCAL_PEM_FILE);
let mut contents = String::new();
let mut file = File::open(path)?;
file.read_to_string(&mut contents)?;
let pem_key = pem::parse(contents)?;
let key = SecretKey::from_slice(curve, &pem_key.contents)?;
Ok(key)
}
/// Generate a new SecretKey and store it on disk as a .pem file.
pub fn generate_new_secret_key(
base_dir: &Path,
curve: &Secp256k1)
-> Result<SecretKey, Box<Error>>
{
let mut rng = rand::thread_rng();
let sk = SecretKey::new(&curve, &mut rng);
let pem_key = pem::Pem {
tag: String::from("EC PRIVATE KEY"),
contents: sk[..].to_vec()
};
let s_string = pem::encode(&pem_key);
let path = base_dir.join(LOCAL_PEM_FILE);
let mut s_file = File::create(path)?;
s_file.write_all(s_string.as_bytes())?;
Ok(sk)
}
}