Merge pull request #42 from sigp/remove-libp2p
Remove all libp2p and syncing code
This commit is contained in:
commit
6dacb1c654
@ -12,7 +12,6 @@ clap = "2.32.0"
|
|||||||
db = { path = "lighthouse/db" }
|
db = { path = "lighthouse/db" }
|
||||||
dirs = "1.0.3"
|
dirs = "1.0.3"
|
||||||
futures = "0.1.23"
|
futures = "0.1.23"
|
||||||
network-libp2p = { path = "network-libp2p" }
|
|
||||||
rand = "0.3"
|
rand = "0.3"
|
||||||
rlp = { git = "https://github.com/paritytech/parity-common" }
|
rlp = { git = "https://github.com/paritytech/parity-common" }
|
||||||
slog = "^2.2.3"
|
slog = "^2.2.3"
|
||||||
|
@ -1,83 +0,0 @@
|
|||||||
use std::sync::Arc;
|
|
||||||
use std::thread;
|
|
||||||
use super::db::{ DiskDB };
|
|
||||||
use super::config::LighthouseConfig;
|
|
||||||
use super::futures::sync::mpsc::{
|
|
||||||
unbounded,
|
|
||||||
};
|
|
||||||
use super::network_libp2p::service::listen as network_listen;
|
|
||||||
use super::network_libp2p::state::NetworkState;
|
|
||||||
use super::slog::Logger;
|
|
||||||
use super::sync::run_sync_future;
|
|
||||||
|
|
||||||
use super::db::ClientDB;
|
|
||||||
|
|
||||||
/// Represents the co-ordination of the
|
|
||||||
/// networking, syncing and RPC (not-yet-implemented) threads.
|
|
||||||
pub struct Client {
|
|
||||||
pub db: Arc<ClientDB>,
|
|
||||||
pub network_thread: thread::JoinHandle<()>,
|
|
||||||
pub sync_thread: thread::JoinHandle<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Client {
|
|
||||||
/// Instantiates a new "Client".
|
|
||||||
///
|
|
||||||
/// Presently, this means starting network and sync threads
|
|
||||||
/// and plumbing them together.
|
|
||||||
pub fn new(config: &LighthouseConfig,
|
|
||||||
log: &Logger)
|
|
||||||
-> Self
|
|
||||||
{
|
|
||||||
// Open the local db
|
|
||||||
let db = {
|
|
||||||
let db = DiskDB::open(&config.data_dir, None);
|
|
||||||
Arc::new(db)
|
|
||||||
};
|
|
||||||
|
|
||||||
// Start the network thread
|
|
||||||
let network_state = NetworkState::new(
|
|
||||||
&config.data_dir,
|
|
||||||
config.p2p_listen_port,
|
|
||||||
&log).expect("Network setup failed"); let (network_thread, network_tx, network_rx) = {
|
|
||||||
let (message_sender, message_receiver) = unbounded();
|
|
||||||
let (event_sender, event_receiver) = unbounded();
|
|
||||||
let network_log = log.new(o!());
|
|
||||||
let thread = thread::spawn(move || {
|
|
||||||
network_listen(
|
|
||||||
network_state,
|
|
||||||
&event_sender,
|
|
||||||
message_receiver,
|
|
||||||
&network_log,
|
|
||||||
);
|
|
||||||
});
|
|
||||||
(thread, message_sender, event_receiver)
|
|
||||||
};
|
|
||||||
|
|
||||||
// Start the sync thread
|
|
||||||
let (sync_thread, _sync_tx, _sync_rx) = {
|
|
||||||
let (sync_out_sender, sync_out_receiver) = unbounded();
|
|
||||||
let (sync_in_sender, sync_in_receiver) = unbounded();
|
|
||||||
let sync_log = log.new(o!());
|
|
||||||
let sync_db = db.clone();
|
|
||||||
let thread = thread::spawn(move || {
|
|
||||||
run_sync_future(
|
|
||||||
sync_db,
|
|
||||||
network_tx.clone(),
|
|
||||||
network_rx,
|
|
||||||
&sync_out_sender,
|
|
||||||
&sync_in_receiver,
|
|
||||||
sync_log,
|
|
||||||
);
|
|
||||||
});
|
|
||||||
(thread, sync_in_sender, sync_out_receiver)
|
|
||||||
};
|
|
||||||
|
|
||||||
// Return the client struct
|
|
||||||
Self {
|
|
||||||
db,
|
|
||||||
network_thread,
|
|
||||||
sync_thread,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -4,13 +4,10 @@ extern crate slog_term;
|
|||||||
extern crate slog_async;
|
extern crate slog_async;
|
||||||
// extern crate ssz;
|
// extern crate ssz;
|
||||||
extern crate clap;
|
extern crate clap;
|
||||||
extern crate network_libp2p;
|
|
||||||
extern crate futures;
|
extern crate futures;
|
||||||
|
|
||||||
extern crate db;
|
extern crate db;
|
||||||
|
|
||||||
mod client;
|
|
||||||
mod sync;
|
|
||||||
mod config;
|
mod config;
|
||||||
|
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
@ -18,7 +15,6 @@ use std::path::PathBuf;
|
|||||||
use slog::Drain;
|
use slog::Drain;
|
||||||
use clap::{ Arg, App };
|
use clap::{ Arg, App };
|
||||||
use config::LighthouseConfig;
|
use config::LighthouseConfig;
|
||||||
use client::Client;
|
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let decorator = slog_term::TermDecorator::new().build();
|
let decorator = slog_term::TermDecorator::new().build();
|
||||||
@ -64,8 +60,8 @@ fn main() {
|
|||||||
"data_dir" => &config.data_dir.to_str(),
|
"data_dir" => &config.data_dir.to_str(),
|
||||||
"port" => &config.p2p_listen_port);
|
"port" => &config.p2p_listen_port);
|
||||||
|
|
||||||
let client = Client::new(&config, &log);
|
error!(log,
|
||||||
client.sync_thread.join().unwrap();
|
"Lighthouse under development and does not provide a user demo.");
|
||||||
|
|
||||||
info!(log, "Exiting.");
|
info!(log, "Exiting.");
|
||||||
}
|
}
|
||||||
|
@ -1,12 +0,0 @@
|
|||||||
extern crate futures;
|
|
||||||
extern crate slog;
|
|
||||||
extern crate tokio;
|
|
||||||
extern crate network_libp2p;
|
|
||||||
|
|
||||||
pub mod network;
|
|
||||||
pub mod sync_future;
|
|
||||||
pub mod wire_protocol;
|
|
||||||
|
|
||||||
pub use self::sync_future::run_sync_future;
|
|
||||||
|
|
||||||
use super::db;
|
|
@ -1,86 +0,0 @@
|
|||||||
use std::sync::Arc;
|
|
||||||
use super::db::ClientDB;
|
|
||||||
use slog::Logger;
|
|
||||||
|
|
||||||
use super::network_libp2p::message::{
|
|
||||||
NetworkEvent,
|
|
||||||
OutgoingMessage,
|
|
||||||
NetworkEventType,
|
|
||||||
};
|
|
||||||
|
|
||||||
use super::wire_protocol::{
|
|
||||||
WireMessage,
|
|
||||||
WireMessageHeader,
|
|
||||||
};
|
|
||||||
|
|
||||||
use super::futures::sync::mpsc::{
|
|
||||||
UnboundedSender,
|
|
||||||
};
|
|
||||||
|
|
||||||
/// Accept a network event and perform all required processing.
|
|
||||||
///
|
|
||||||
/// This function should be called whenever an underlying network
|
|
||||||
/// (e.g., libp2p) has an event to push up to the sync process.
|
|
||||||
pub fn handle_network_event(
|
|
||||||
event: NetworkEvent,
|
|
||||||
db: &Arc<ClientDB>,
|
|
||||||
network_tx: &UnboundedSender<OutgoingMessage>,
|
|
||||||
log: &Logger)
|
|
||||||
-> Result<(), ()>
|
|
||||||
{
|
|
||||||
debug!(&log, "";
|
|
||||||
"network_event" => format!("{:?}", &event));
|
|
||||||
match event.event {
|
|
||||||
NetworkEventType::PeerConnect => Ok(()),
|
|
||||||
NetworkEventType::PeerDrop => Ok(()),
|
|
||||||
NetworkEventType::Message => {
|
|
||||||
if let Some(data) = event.data {
|
|
||||||
handle_network_message(
|
|
||||||
&data,
|
|
||||||
&db,
|
|
||||||
&network_tx,
|
|
||||||
&log)
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Accept a message from the network and perform all required
|
|
||||||
/// processing.
|
|
||||||
///
|
|
||||||
/// This function should be called whenever a peer from a network
|
|
||||||
/// (e.g., libp2p) has sent a message to us.
|
|
||||||
fn handle_network_message(
|
|
||||||
message: &[u8],
|
|
||||||
db: &Arc<ClientDB>,
|
|
||||||
_network_tx: &UnboundedSender<OutgoingMessage>,
|
|
||||||
log: &Logger)
|
|
||||||
-> Result<(), ()>
|
|
||||||
{
|
|
||||||
match WireMessage::decode(&message) {
|
|
||||||
Ok(msg) => {
|
|
||||||
match msg.header {
|
|
||||||
WireMessageHeader::Blocks => {
|
|
||||||
process_unverified_blocks(
|
|
||||||
msg.body,
|
|
||||||
&db,
|
|
||||||
&log
|
|
||||||
);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
Ok(()) // No need to pass the error back
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn process_unverified_blocks(_message: &[u8],
|
|
||||||
_db: &Arc<ClientDB>,
|
|
||||||
_log: &Logger)
|
|
||||||
{
|
|
||||||
//
|
|
||||||
}
|
|
@ -1,48 +0,0 @@
|
|||||||
use super::tokio;
|
|
||||||
use super::futures::{ Future, Stream };
|
|
||||||
use super::futures::sync::mpsc::{
|
|
||||||
UnboundedReceiver,
|
|
||||||
UnboundedSender,
|
|
||||||
};
|
|
||||||
use super::network_libp2p::message::{
|
|
||||||
NetworkEvent,
|
|
||||||
OutgoingMessage,
|
|
||||||
};
|
|
||||||
use super::network::handle_network_event;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use super::db::ClientDB;
|
|
||||||
use slog::Logger;
|
|
||||||
|
|
||||||
type NetworkSender = UnboundedSender<OutgoingMessage>;
|
|
||||||
type NetworkReceiver = UnboundedReceiver<NetworkEvent>;
|
|
||||||
|
|
||||||
type SyncSender = UnboundedSender<Vec<u8>>;
|
|
||||||
type SyncReceiver = UnboundedReceiver<Vec<u8>>;
|
|
||||||
|
|
||||||
/// Start a syncing tokio future.
|
|
||||||
///
|
|
||||||
/// Uses green-threading to process messages
|
|
||||||
/// from the network and the RPC and update
|
|
||||||
/// the state.
|
|
||||||
pub fn run_sync_future(
|
|
||||||
db: Arc<ClientDB>,
|
|
||||||
network_tx: NetworkSender,
|
|
||||||
network_rx: NetworkReceiver,
|
|
||||||
_sync_tx: &SyncSender,
|
|
||||||
_sync_rx: &SyncReceiver,
|
|
||||||
log: Logger)
|
|
||||||
{
|
|
||||||
let network_future = {
|
|
||||||
network_rx
|
|
||||||
.for_each(move |event| {
|
|
||||||
handle_network_event(
|
|
||||||
event,
|
|
||||||
&db.clone(),
|
|
||||||
&network_tx.clone(),
|
|
||||||
&log.clone())
|
|
||||||
})
|
|
||||||
.map_err(|_| panic!("rx failed"))
|
|
||||||
};
|
|
||||||
|
|
||||||
tokio::run(network_future);
|
|
||||||
}
|
|
@ -1,92 +0,0 @@
|
|||||||
pub enum WireMessageDecodeError {
|
|
||||||
TooShort,
|
|
||||||
UnknownType,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub enum WireMessageHeader {
|
|
||||||
Blocks,
|
|
||||||
/*
|
|
||||||
// Leave out until used
|
|
||||||
Status,
|
|
||||||
NewBlockHashes,
|
|
||||||
GetBlockHashes,
|
|
||||||
BlockHashes,
|
|
||||||
GetBlocks,
|
|
||||||
NewBlock,
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct WireMessage<'a> {
|
|
||||||
pub header: WireMessageHeader,
|
|
||||||
pub body: &'a [u8],
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> WireMessage<'a> {
|
|
||||||
pub fn decode(bytes: &'a [u8])
|
|
||||||
-> Result<Self, WireMessageDecodeError>
|
|
||||||
{
|
|
||||||
if let Some((header_byte, body)) = bytes.split_first() {
|
|
||||||
let header = match header_byte {
|
|
||||||
0x06 => Some(WireMessageHeader::Blocks),
|
|
||||||
_ => None
|
|
||||||
};
|
|
||||||
match header {
|
|
||||||
Some(header) => Ok(Self{header, body}),
|
|
||||||
None => Err(WireMessageDecodeError::UnknownType)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Err(WireMessageDecodeError::TooShort)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
pub fn decode_wire_message(bytes: &[u8])
|
|
||||||
-> Result<WireMessage, WireMessageDecodeError>
|
|
||||||
{
|
|
||||||
if let Some((header_byte, body)) = bytes.split_first() {
|
|
||||||
let header = match header_byte {
|
|
||||||
0x06 => Some(WireMessageType::Blocks),
|
|
||||||
_ => None
|
|
||||||
};
|
|
||||||
match header {
|
|
||||||
Some(header) => Ok((header, body)),
|
|
||||||
None => Err(WireMessageDecodeError::UnknownType)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Err(WireMessageDecodeError::TooShort)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// Determines the message type of some given
|
|
||||||
/// message.
|
|
||||||
///
|
|
||||||
/// Does not check the validity of the message data,
|
|
||||||
/// it just reads the first byte.
|
|
||||||
pub fn message_type(message: &Vec<u8>)
|
|
||||||
-> Option<WireMessageType>
|
|
||||||
{
|
|
||||||
match message.get(0) {
|
|
||||||
Some(0x06) => Some(WireMessageType::Blocks),
|
|
||||||
_ => None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn identify_wire_protocol_message(message: &Vec<u8>)
|
|
||||||
-> Result<(WireMessageType, &[u8]), WireMessageDecodeError>
|
|
||||||
{
|
|
||||||
fn strip_header(v: &Vec<u8>) -> &[u8] {
|
|
||||||
match v.get(1..v.len()) {
|
|
||||||
None => &vec![],
|
|
||||||
Some(s) => s
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
match message.get(0) {
|
|
||||||
Some(0x06) => Ok((WireMessageType::Blocks, strip_header(message))),
|
|
||||||
None => Err(WireMessageDecodeError::TooShort),
|
|
||||||
_ => Err(WireMessageDecodeError::UnknownType),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
@ -1,24 +0,0 @@
|
|||||||
[package]
|
|
||||||
name = "network-libp2p"
|
|
||||||
version = "0.1.0"
|
|
||||||
authors = ["Paul Hauner <paul@paulhauner.com>"]
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
bigint = "4.2"
|
|
||||||
bytes = ""
|
|
||||||
eth-secp256k1 = { git = "https://github.com/paritytech/rust-secp256k1" }
|
|
||||||
futures = "0.1.23"
|
|
||||||
libp2p-peerstore = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
|
|
||||||
libp2p-core = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
|
|
||||||
libp2p-mplex = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
|
|
||||||
libp2p-tcp-transport = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
|
|
||||||
libp2p-floodsub = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
|
|
||||||
libp2p-identify = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
|
|
||||||
libp2p-kad = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
|
|
||||||
pem = "0.5.0"
|
|
||||||
rand = "0.3"
|
|
||||||
slog = "^2.2.3"
|
|
||||||
tokio-core = "0.1"
|
|
||||||
tokio-io = "0.1"
|
|
||||||
tokio-stdin = "0.1"
|
|
||||||
tokio-timer = "0.1"
|
|
@ -1,7 +0,0 @@
|
|||||||
# libp2p Network
|
|
||||||
|
|
||||||
This is a fairly scrappy implementation of libp2p floodsub for the following
|
|
||||||
reasons:
|
|
||||||
|
|
||||||
- There is not presently a gossip-sub implementation for Rust libp2p.
|
|
||||||
- The networking layer for the beacon_chain is not yet finalized.
|
|
@ -1,10 +0,0 @@
|
|||||||
extern crate libp2p_core;
|
|
||||||
extern crate libp2p_peerstore;
|
|
||||||
extern crate pem;
|
|
||||||
extern crate secp256k1;
|
|
||||||
#[macro_use]
|
|
||||||
extern crate slog;
|
|
||||||
|
|
||||||
pub mod message;
|
|
||||||
pub mod service;
|
|
||||||
pub mod state;
|
|
@ -1,18 +0,0 @@
|
|||||||
#[derive(Debug)]
|
|
||||||
pub enum NetworkEventType {
|
|
||||||
PeerConnect,
|
|
||||||
PeerDrop,
|
|
||||||
Message,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct NetworkEvent {
|
|
||||||
pub event: NetworkEventType,
|
|
||||||
pub data: Option<Vec<u8>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct OutgoingMessage {
|
|
||||||
pub peer: Option<String>,
|
|
||||||
pub data: Vec<u8>,
|
|
||||||
}
|
|
@ -1,315 +0,0 @@
|
|||||||
extern crate bigint;
|
|
||||||
extern crate bytes;
|
|
||||||
extern crate futures;
|
|
||||||
extern crate libp2p_peerstore;
|
|
||||||
extern crate libp2p_floodsub;
|
|
||||||
extern crate libp2p_identify;
|
|
||||||
extern crate libp2p_core;
|
|
||||||
extern crate libp2p_mplex;
|
|
||||||
extern crate libp2p_tcp_transport;
|
|
||||||
extern crate libp2p_kad;
|
|
||||||
extern crate slog;
|
|
||||||
extern crate tokio_core;
|
|
||||||
extern crate tokio_io;
|
|
||||||
extern crate tokio_timer;
|
|
||||||
extern crate tokio_stdin;
|
|
||||||
|
|
||||||
use super::state::NetworkState;
|
|
||||||
use super::message::{ NetworkEvent, NetworkEventType, OutgoingMessage };
|
|
||||||
use self::bigint::U512;
|
|
||||||
use self::futures::{ Future, Stream, Poll };
|
|
||||||
use self::futures::sync::mpsc::{
|
|
||||||
UnboundedSender, UnboundedReceiver
|
|
||||||
};
|
|
||||||
use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr,
|
|
||||||
Transport, ConnectionUpgrade };
|
|
||||||
use self::libp2p_kad::{ KademliaUpgrade, KademliaProcessingFuture};
|
|
||||||
use self::libp2p_floodsub::{ FloodSubFuture, FloodSubUpgrade };
|
|
||||||
use self::libp2p_identify::{ IdentifyInfo, IdentifyTransport, IdentifyOutput };
|
|
||||||
use self::slog::Logger;
|
|
||||||
use std::sync::{ Arc, RwLock };
|
|
||||||
use std::time::{ Duration, Instant };
|
|
||||||
use std::ops::Deref;
|
|
||||||
use std::io::Error as IoError;
|
|
||||||
use self::tokio_io::{ AsyncRead, AsyncWrite };
|
|
||||||
use self::bytes::Bytes;
|
|
||||||
|
|
||||||
pub use self::libp2p_floodsub::Message;
|
|
||||||
|
|
||||||
pub fn listen(state: NetworkState,
|
|
||||||
events_to_app: &UnboundedSender<NetworkEvent>,
|
|
||||||
raw_rx: UnboundedReceiver<OutgoingMessage>,
|
|
||||||
log: &Logger)
|
|
||||||
{
|
|
||||||
let peer_store = state.peer_store;
|
|
||||||
let peer_id = state.peer_id;
|
|
||||||
let listen_multiaddr = state.listen_multiaddr;
|
|
||||||
let listened_addrs = Arc::new(RwLock::new(vec![]));
|
|
||||||
let rx = ApplicationReciever{ inner: raw_rx };
|
|
||||||
|
|
||||||
// Build a tokio core
|
|
||||||
let mut core = tokio_core::reactor::Core::new().expect("tokio failure.");
|
|
||||||
// Build a base TCP libp2p transport
|
|
||||||
let transport = libp2p_tcp_transport::TcpConfig::new(core.handle())
|
|
||||||
.with_upgrade(libp2p_core::upgrade::PlainTextConfig)
|
|
||||||
.with_upgrade(libp2p_mplex::BufferedMultiplexConfig::<[_; 256]>::new())
|
|
||||||
.into_connection_reuse();
|
|
||||||
|
|
||||||
// Build an identify transport to allow identification and negotiation
|
|
||||||
// of layers running atop the TCP transport (e.g., kad)
|
|
||||||
let identify_transport = {
|
|
||||||
let listened_addrs = listened_addrs.clone();
|
|
||||||
let listen_multiaddr = listen_multiaddr.clone();
|
|
||||||
IdentifyTransport::new(transport.clone(), peer_store.clone())
|
|
||||||
// Managed NAT'ed connections - ensuring the external IP
|
|
||||||
// is stored not the internal addr.
|
|
||||||
.map(move |out, _, _| {
|
|
||||||
if let(Some(ref observed), ref listen_multiaddr) =
|
|
||||||
(out.observed_addr, listen_multiaddr)
|
|
||||||
{
|
|
||||||
if let Some(viewed_from_outside) =
|
|
||||||
transport.nat_traversal(listen_multiaddr, observed)
|
|
||||||
{
|
|
||||||
listened_addrs.write().unwrap()
|
|
||||||
.push(viewed_from_outside);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
out.socket
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
// Configure and build a Kademlia upgrade to be applied
|
|
||||||
// to the base TCP transport.
|
|
||||||
let kad_config = libp2p_kad::KademliaConfig {
|
|
||||||
parallelism: 3,
|
|
||||||
record_store: (),
|
|
||||||
peer_store,
|
|
||||||
local_peer_id: peer_id.clone(),
|
|
||||||
timeout: Duration::from_secs(2)
|
|
||||||
};
|
|
||||||
let kad_ctl_proto = libp2p_kad::
|
|
||||||
KademliaControllerPrototype::new(kad_config);
|
|
||||||
let kad_upgrade = libp2p_kad::
|
|
||||||
KademliaUpgrade::from_prototype(&kad_ctl_proto);
|
|
||||||
|
|
||||||
// Build a floodsub upgrade to allow pushing topic'ed
|
|
||||||
// messages across the network.
|
|
||||||
let (floodsub_upgrade, floodsub_rx) =
|
|
||||||
FloodSubUpgrade::new(peer_id.clone());
|
|
||||||
|
|
||||||
// Combine the Kademlia and Identify upgrades into a single
|
|
||||||
// upgrader struct.
|
|
||||||
let upgrade = ConnectionUpgrader {
|
|
||||||
kad: kad_upgrade.clone(),
|
|
||||||
floodsub: floodsub_upgrade.clone(),
|
|
||||||
identify: libp2p_identify::IdentifyProtocolConfig,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Build a Swarm to manage upgrading connections to peers.
|
|
||||||
let swarm_listened_addrs = listened_addrs.clone();
|
|
||||||
let swarm_peer_id = peer_id.clone();
|
|
||||||
let (swarm_ctl, swarm_future) = libp2p_core::swarm(
|
|
||||||
identify_transport.clone().with_upgrade(upgrade),
|
|
||||||
move |upgrade, client_addr| match upgrade {
|
|
||||||
FinalUpgrade::Kad(kad) => Box::new(kad) as Box<_>,
|
|
||||||
FinalUpgrade::FloodSub(future) => Box::new(future) as Box<_>,
|
|
||||||
FinalUpgrade::Identify(IdentifyOutput::Sender { sender, .. }) => sender.send(
|
|
||||||
IdentifyInfo {
|
|
||||||
public_key: swarm_peer_id.clone().into_bytes(),
|
|
||||||
agent_version: "lighthouse/1.0.0".to_owned(),
|
|
||||||
protocol_version: "rust-libp2p/1.0.0".to_owned(),
|
|
||||||
listen_addrs: swarm_listened_addrs.read().unwrap().to_vec(),
|
|
||||||
protocols: vec![
|
|
||||||
"/ipfs/kad/1.0.0".to_owned(),
|
|
||||||
"/ipfs/id/1.0.0".to_owned(),
|
|
||||||
"/floodsub/1.0.0".to_owned(),
|
|
||||||
]
|
|
||||||
},
|
|
||||||
&client_addr
|
|
||||||
),
|
|
||||||
FinalUpgrade::Identify(IdentifyOutput::RemoteInfo { .. }) => {
|
|
||||||
unreachable!("Never dial with the identify protocol.")
|
|
||||||
}
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
// Start the Swarm controller listening on the local machine
|
|
||||||
let actual_addr = swarm_ctl
|
|
||||||
.listen_on(listen_multiaddr)
|
|
||||||
.expect("Failed to listen on multiaddr");
|
|
||||||
info!(log, "libp2p listening"; "listen_addr" => actual_addr.to_string());
|
|
||||||
|
|
||||||
// Convert the kad prototype into a controller by providing it the
|
|
||||||
// newly built swarm.
|
|
||||||
let (kad_ctl, kad_init) = kad_ctl_proto.start(
|
|
||||||
swarm_ctl.clone(),
|
|
||||||
identify_transport.clone().with_upgrade(kad_upgrade.clone()));
|
|
||||||
|
|
||||||
// Create a new floodsub controller using a specific topic
|
|
||||||
let topic = libp2p_floodsub::TopicBuilder::new("beacon_chain").build();
|
|
||||||
let floodsub_ctl = libp2p_floodsub::FloodSubController::new(&floodsub_upgrade);
|
|
||||||
floodsub_ctl.subscribe(&topic);
|
|
||||||
|
|
||||||
// Generate a tokio timer "wheel" future that sends kad FIND_NODE at
|
|
||||||
// a routine interval.
|
|
||||||
let kad_poll_log = log.new(o!());
|
|
||||||
let kad_poll_event_tx = events_to_app.clone();
|
|
||||||
let kad_poll = {
|
|
||||||
let polling_peer_id = peer_id.clone();
|
|
||||||
tokio_timer::wheel()
|
|
||||||
.build()
|
|
||||||
.interval_at(Instant::now(), Duration::from_secs(30))
|
|
||||||
.map_err(|_| -> IoError { unreachable!() })
|
|
||||||
.and_then(move |()| kad_ctl.find_node(peer_id.clone()))
|
|
||||||
.for_each(move |peers| {
|
|
||||||
let local_hash = U512::from(polling_peer_id.hash());
|
|
||||||
for peer in peers {
|
|
||||||
let peer_hash = U512::from(peer.hash());
|
|
||||||
let distance = 512 - (local_hash ^ peer_hash).leading_zeros();
|
|
||||||
info!(kad_poll_log, "Discovered peer";
|
|
||||||
"distance" => distance,
|
|
||||||
"peer_id" => peer.to_base58());
|
|
||||||
let peer_addr = AddrComponent::P2P(peer.into_bytes()).into();
|
|
||||||
let dial_result = swarm_ctl.dial(
|
|
||||||
peer_addr,
|
|
||||||
identify_transport.clone().with_upgrade(floodsub_upgrade.clone())
|
|
||||||
);
|
|
||||||
if let Err(err) = dial_result {
|
|
||||||
warn!(kad_poll_log, "Dialling {:?} failed.", err)
|
|
||||||
};
|
|
||||||
let event = NetworkEvent {
|
|
||||||
event: NetworkEventType::PeerConnect,
|
|
||||||
data: None,
|
|
||||||
};
|
|
||||||
kad_poll_event_tx.unbounded_send(event)
|
|
||||||
.expect("Network unable to contact application.");
|
|
||||||
};
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
// Create a future to handle message recieved from the network
|
|
||||||
let floodsub_rx = floodsub_rx.for_each(|msg| {
|
|
||||||
debug!(&log, "Network receive"; "msg" => format!("{:?}", msg));
|
|
||||||
let event = NetworkEvent {
|
|
||||||
event: NetworkEventType::Message,
|
|
||||||
data: Some(msg.data),
|
|
||||||
};
|
|
||||||
events_to_app.unbounded_send(event)
|
|
||||||
.expect("Network unable to contact application.");
|
|
||||||
Ok(())
|
|
||||||
});
|
|
||||||
|
|
||||||
// Create a future to handle messages recieved from the application
|
|
||||||
let app_rx = rx.for_each(|msg| {
|
|
||||||
debug!(&log, "Network publish"; "msg" => format!("{:?}", msg));
|
|
||||||
floodsub_ctl.publish(&topic, msg.data);
|
|
||||||
Ok(())
|
|
||||||
});
|
|
||||||
|
|
||||||
// Generate a full future
|
|
||||||
let final_future = swarm_future
|
|
||||||
.select(floodsub_rx).map_err(|(err, _)| err).map(|((), _)| ())
|
|
||||||
.select(app_rx).map_err(|(err, _)| err).map(|((), _)| ())
|
|
||||||
.select(kad_poll).map_err(|(err, _)| err).map(|((), _)| ())
|
|
||||||
.select(kad_init).map_err(|(err, _)| err).and_then(|((), n)| n);
|
|
||||||
|
|
||||||
core.run(final_future).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
struct ApplicationReciever {
|
|
||||||
inner: UnboundedReceiver<OutgoingMessage>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Stream for ApplicationReciever {
|
|
||||||
type Item = OutgoingMessage;
|
|
||||||
type Error = IoError;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
|
||||||
self.inner
|
|
||||||
.poll()
|
|
||||||
.map_err(|_| unreachable!())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct ConnectionUpgrader<P, R> {
|
|
||||||
kad: KademliaUpgrade<P, R>,
|
|
||||||
identify: libp2p_identify::IdentifyProtocolConfig,
|
|
||||||
floodsub: FloodSubUpgrade,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C, P, R, Pc> ConnectionUpgrade<C> for ConnectionUpgrader<P, R>
|
|
||||||
where
|
|
||||||
C: AsyncRead + AsyncWrite + 'static,
|
|
||||||
P: Deref<Target = Pc> + Clone + 'static,
|
|
||||||
for<'r> &'r Pc: libp2p_peerstore::Peerstore,
|
|
||||||
R: 'static
|
|
||||||
{
|
|
||||||
type NamesIter = ::std::vec::IntoIter<(Bytes, usize)>;
|
|
||||||
type UpgradeIdentifier = usize;
|
|
||||||
type Output = FinalUpgrade<C>;
|
|
||||||
type Future = Box<Future<Item = FinalUpgrade<C>, Error = IoError>>;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn protocol_names(&self) -> Self::NamesIter {
|
|
||||||
vec![
|
|
||||||
(Bytes::from("/ipfs/kad/1.0.0"), 0),
|
|
||||||
(Bytes::from("/ipfs/id/1.0.0"), 1),
|
|
||||||
(Bytes::from("/floodsub/1.0.0"), 2),
|
|
||||||
].into_iter()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn upgrade(
|
|
||||||
self,
|
|
||||||
socket: C,
|
|
||||||
id: Self::UpgradeIdentifier,
|
|
||||||
ty: Endpoint,
|
|
||||||
remote_addr: &Multiaddr)
|
|
||||||
-> Self::Future
|
|
||||||
{
|
|
||||||
match id {
|
|
||||||
0 => Box::new(
|
|
||||||
self.kad
|
|
||||||
.upgrade(socket, (), ty, remote_addr)
|
|
||||||
.map(|upg| upg.into())),
|
|
||||||
1 => Box::new(
|
|
||||||
self.identify
|
|
||||||
.upgrade(socket, (), ty, remote_addr)
|
|
||||||
.map(|upg| upg.into())),
|
|
||||||
2 => Box::new(
|
|
||||||
self.floodsub
|
|
||||||
.upgrade(socket, (), ty, remote_addr)
|
|
||||||
.map(|upg| upg.into()),
|
|
||||||
),
|
|
||||||
_ => unreachable!()
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
enum FinalUpgrade<C> {
|
|
||||||
Kad(KademliaProcessingFuture),
|
|
||||||
Identify(IdentifyOutput<C>),
|
|
||||||
FloodSub(FloodSubFuture),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C> From<libp2p_kad::KademliaProcessingFuture> for FinalUpgrade<C> { #[inline]
|
|
||||||
fn from(upgrade: libp2p_kad::KademliaProcessingFuture) -> Self {
|
|
||||||
FinalUpgrade::Kad(upgrade)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C> From<IdentifyOutput<C>> for FinalUpgrade<C> {
|
|
||||||
#[inline]
|
|
||||||
fn from(upgrade: IdentifyOutput<C>) -> Self {
|
|
||||||
FinalUpgrade::Identify(upgrade)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C> From<FloodSubFuture> for FinalUpgrade<C> {
|
|
||||||
#[inline]
|
|
||||||
fn from(upgr: FloodSubFuture) -> Self {
|
|
||||||
FinalUpgrade::FloodSub(upgr)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,119 +0,0 @@
|
|||||||
extern crate rand;
|
|
||||||
|
|
||||||
use std::io::{ Read, Write };
|
|
||||||
use std::error::Error;
|
|
||||||
use std::fs::File;
|
|
||||||
use std::path::{ Path, PathBuf };
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use super::libp2p_core::Multiaddr;
|
|
||||||
use super::libp2p_peerstore::{ Peerstore, PeerAccess, PeerId };
|
|
||||||
use super::libp2p_peerstore::json_peerstore::JsonPeerstore;
|
|
||||||
use super::pem;
|
|
||||||
use super::secp256k1::Secp256k1;
|
|
||||||
use super::secp256k1::key::{ SecretKey, PublicKey };
|
|
||||||
use super::slog::Logger;
|
|
||||||
|
|
||||||
/// Location of the libp2p peerstore inside the Network base dir.
|
|
||||||
const PEERS_FILE: &str = "peerstore.json";
|
|
||||||
/// Location of the libp2p local peer secret key inside the Network base dir.
|
|
||||||
const LOCAL_PEM_FILE: &str = "local_peer_id.pem";
|
|
||||||
|
|
||||||
/// Represents the present state of a libp2p network.
|
|
||||||
pub struct NetworkState {
|
|
||||||
pub base_dir: PathBuf,
|
|
||||||
pub pubkey: PublicKey,
|
|
||||||
pub seckey: SecretKey,
|
|
||||||
pub peer_id: PeerId,
|
|
||||||
pub listen_multiaddr: Multiaddr,
|
|
||||||
pub peer_store: Arc<JsonPeerstore>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl NetworkState {
|
|
||||||
/// Create a new libp2p network state. Used to initialize
|
|
||||||
/// network service.
|
|
||||||
pub fn new(
|
|
||||||
// config: LighthouseConfig,
|
|
||||||
base_dir: &Path,
|
|
||||||
listen_port: u16,
|
|
||||||
log: &Logger)
|
|
||||||
-> Result <Self, Box<Error>>
|
|
||||||
{
|
|
||||||
let curve = Secp256k1::new();
|
|
||||||
let seckey = match
|
|
||||||
NetworkState::load_secret_key_from_pem_file(base_dir, &curve)
|
|
||||||
{
|
|
||||||
Ok(k) => k,
|
|
||||||
_ => NetworkState::generate_new_secret_key(base_dir, &curve)?
|
|
||||||
};
|
|
||||||
let pubkey = PublicKey::from_secret_key(&curve, &seckey)?;
|
|
||||||
let peer_id = PeerId::from_public_key(
|
|
||||||
&pubkey.serialize_vec(&curve, false));
|
|
||||||
info!(log, "Loaded keys"; "peer_id" => &peer_id.to_base58());
|
|
||||||
let peer_store = {
|
|
||||||
let path = base_dir.join(PEERS_FILE);
|
|
||||||
let base = JsonPeerstore::new(path)?;
|
|
||||||
Arc::new(base)
|
|
||||||
};
|
|
||||||
info!(log, "Loaded peerstore"; "peer_count" => &peer_store.peers().count());
|
|
||||||
let listen_multiaddr =
|
|
||||||
NetworkState::multiaddr_on_port(&listen_port.to_string());
|
|
||||||
Ok(Self {
|
|
||||||
base_dir: PathBuf::from(base_dir),
|
|
||||||
seckey,
|
|
||||||
pubkey,
|
|
||||||
peer_id,
|
|
||||||
listen_multiaddr,
|
|
||||||
peer_store,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Return a TCP multiaddress on 0.0.0.0 for a given port.
|
|
||||||
pub fn multiaddr_on_port(port: &str) -> Multiaddr {
|
|
||||||
format!("/ip4/0.0.0.0/tcp/{}", port)
|
|
||||||
.parse::<Multiaddr>().unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn add_peer(&mut self,
|
|
||||||
peer_id: &PeerId,
|
|
||||||
multiaddr: Multiaddr,
|
|
||||||
duration_secs: u64) {
|
|
||||||
self.peer_store.peer_or_create(&peer_id)
|
|
||||||
.add_addr(multiaddr, Duration::from_secs(duration_secs));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Instantiate a SecretKey from a .pem file on disk.
|
|
||||||
pub fn load_secret_key_from_pem_file(
|
|
||||||
base_dir: &Path,
|
|
||||||
curve: &Secp256k1)
|
|
||||||
-> Result<SecretKey, Box<Error>>
|
|
||||||
{
|
|
||||||
let path = base_dir.join(LOCAL_PEM_FILE);
|
|
||||||
let mut contents = String::new();
|
|
||||||
let mut file = File::open(path)?;
|
|
||||||
file.read_to_string(&mut contents)?;
|
|
||||||
let pem_key = pem::parse(contents)?;
|
|
||||||
let key = SecretKey::from_slice(curve, &pem_key.contents)?;
|
|
||||||
Ok(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Generate a new SecretKey and store it on disk as a .pem file.
|
|
||||||
pub fn generate_new_secret_key(
|
|
||||||
base_dir: &Path,
|
|
||||||
curve: &Secp256k1)
|
|
||||||
-> Result<SecretKey, Box<Error>>
|
|
||||||
{
|
|
||||||
let mut rng = rand::thread_rng();
|
|
||||||
let sk = SecretKey::new(&curve, &mut rng);
|
|
||||||
let pem_key = pem::Pem {
|
|
||||||
tag: String::from("EC PRIVATE KEY"),
|
|
||||||
contents: sk[..].to_vec()
|
|
||||||
};
|
|
||||||
let s_string = pem::encode(&pem_key);
|
|
||||||
let path = base_dir.join(LOCAL_PEM_FILE);
|
|
||||||
let mut s_file = File::create(path)?;
|
|
||||||
s_file.write_all(s_string.as_bytes())?;
|
|
||||||
Ok(sk)
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user