From a65531ba950e41f5262222f4fd965778da81e321 Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Tue, 9 Oct 2018 13:36:54 +1100
Subject: [PATCH] Remove all libp2p and syncing code

This will all need to be rebuilt in the future. This code will be
available at a "legacy_libp2p" branch.
---
 Cargo.toml                       |   1 -
 lighthouse/client/mod.rs         |  83 --------
 lighthouse/main.rs               |   8 +-
 lighthouse/sync/mod.rs           |  12 --
 lighthouse/sync/network.rs       |  86 ---------
 lighthouse/sync/sync_future.rs   |  48 -----
 lighthouse/sync/wire_protocol.rs |  92 ---------
 network-libp2p/Cargo.toml        |  24 ---
 network-libp2p/README.md         |   7 -
 network-libp2p/src/lib.rs        |  10 -
 network-libp2p/src/message.rs    |  18 --
 network-libp2p/src/service.rs    | 315 ------------------------------
 network-libp2p/src/state.rs      | 119 ------------
 13 files changed, 2 insertions(+), 821 deletions(-)
 delete mode 100644 lighthouse/client/mod.rs
 delete mode 100644 lighthouse/sync/mod.rs
 delete mode 100644 lighthouse/sync/network.rs
 delete mode 100644 lighthouse/sync/sync_future.rs
 delete mode 100644 lighthouse/sync/wire_protocol.rs
 delete mode 100644 network-libp2p/Cargo.toml
 delete mode 100644 network-libp2p/README.md
 delete mode 100644 network-libp2p/src/lib.rs
 delete mode 100644 network-libp2p/src/message.rs
 delete mode 100644 network-libp2p/src/service.rs
 delete mode 100644 network-libp2p/src/state.rs

diff --git a/Cargo.toml b/Cargo.toml
index de69506de..fa6728a29 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,7 +12,6 @@ clap = "2.32.0"
 db = { path = "lighthouse/db" }
 dirs = "1.0.3"
 futures = "0.1.23"
-network-libp2p = { path = "network-libp2p" }
 rand = "0.3"
 rlp = { git = "https://github.com/paritytech/parity-common" }
 slog = "^2.2.3"
diff --git a/lighthouse/client/mod.rs b/lighthouse/client/mod.rs
deleted file mode 100644
index 4f8a8377a..000000000
--- a/lighthouse/client/mod.rs
+++ /dev/null
@@ -1,83 +0,0 @@
-use std::sync::Arc;
-use std::thread;
-use super::db::{ DiskDB };
-use super::config::LighthouseConfig;
-use super::futures::sync::mpsc::{
-    unbounded,
-};
-use super::network_libp2p::service::listen as network_listen;
-use super::network_libp2p::state::NetworkState;
-use super::slog::Logger;
-use super::sync::run_sync_future;
-
-use super::db::ClientDB;
-
-/// Represents the co-ordination of the
-/// networking, syncing and RPC (not-yet-implemented) threads.
-pub struct Client {
-    pub db: Arc<ClientDB>,
-    pub network_thread: thread::JoinHandle<()>,
-    pub sync_thread: thread::JoinHandle<()>,
-}
-
-impl Client {
-    /// Instantiates a new "Client".
-    ///
-    /// Presently, this means starting network and sync threads
-    /// and plumbing them together.
-    pub fn new(config: &LighthouseConfig,
-               log: &Logger)
-        -> Self
-    {
-        // Open the local db
-        let db = {
-            let db = DiskDB::open(&config.data_dir, None);
-            Arc::new(db)
-        };
-
-        // Start the network thread
-        let network_state = NetworkState::new(
-            &config.data_dir,
-            config.p2p_listen_port,
-            &log).expect("Network setup failed");
-        let (network_thread, network_tx, network_rx) = {
-            let (message_sender, message_receiver) = unbounded();
-            let (event_sender, event_receiver) = unbounded();
-            let network_log = log.new(o!());
-            let thread = thread::spawn(move || {
-                network_listen(
-                    network_state,
-                    &event_sender,
-                    message_receiver,
-                    &network_log,
-                );
-            });
-            (thread, message_sender, event_receiver)
-        };
-
-        // Start the sync thread
-        let (sync_thread, _sync_tx, _sync_rx) = {
-            let (sync_out_sender, sync_out_receiver) = unbounded();
-            let (sync_in_sender, sync_in_receiver) = unbounded();
-            let sync_log = log.new(o!());
-            let sync_db = db.clone();
-            let thread = thread::spawn(move || {
-                run_sync_future(
-                    sync_db,
-                    network_tx.clone(),
-                    network_rx,
-                    &sync_out_sender,
-                    &sync_in_receiver,
-                    sync_log,
-                );
-            });
-            (thread, sync_in_sender, sync_out_receiver)
-        };
-
-        // Return the client struct
-        Self {
-            db,
-            network_thread,
-            sync_thread,
-        }
-    }
-}
diff --git a/lighthouse/main.rs b/lighthouse/main.rs
index ee90649f5..9cfbe45e4 100644
--- a/lighthouse/main.rs
+++ b/lighthouse/main.rs
@@ -4,13 +4,10 @@ extern crate slog_term;
 extern crate slog_async;
 // extern crate ssz;
 extern crate clap;
-extern crate network_libp2p;
 extern crate futures;
 
 extern crate db;
 
-mod client;
-mod sync;
 mod config;
 
 use std::path::PathBuf;
@@ -18,7 +15,6 @@ use std::path::PathBuf;
 use slog::Drain;
 use clap::{ Arg, App };
 use config::LighthouseConfig;
-use client::Client;
 
 fn main() {
     let decorator = slog_term::TermDecorator::new().build();
@@ -64,8 +60,8 @@
         "data_dir" => &config.data_dir.to_str(),
         "port" => &config.p2p_listen_port);
 
-    let client = Client::new(&config, &log);
-    client.sync_thread.join().unwrap();
+    error!(log,
+        "Lighthouse is under development and does not provide a user demo.");
 
     info!(log, "Exiting.");
 }
diff --git a/lighthouse/sync/mod.rs b/lighthouse/sync/mod.rs
deleted file mode 100644
index 6e1f0be11..000000000
--- a/lighthouse/sync/mod.rs
+++ /dev/null
@@ -1,12 +0,0 @@
-extern crate futures;
-extern crate slog;
-extern crate tokio;
-extern crate network_libp2p;
-
-pub mod network;
-pub mod sync_future;
-pub mod wire_protocol;
-
-pub use self::sync_future::run_sync_future;
-
-use super::db;
diff --git a/lighthouse/sync/network.rs b/lighthouse/sync/network.rs
deleted file mode 100644
index 45035c84d..000000000
--- a/lighthouse/sync/network.rs
+++ /dev/null
@@ -1,86 +0,0 @@
-use std::sync::Arc;
-use super::db::ClientDB;
-use slog::Logger;
-
-use super::network_libp2p::message::{
-    NetworkEvent,
-    OutgoingMessage,
-    NetworkEventType,
-};
-
-use super::wire_protocol::{
-    WireMessage,
-    WireMessageHeader,
-};
-
-use super::futures::sync::mpsc::{
-    UnboundedSender,
-};
-
-/// Accept a network event and perform all required processing.
-///
-/// This function should be called whenever an underlying network
-/// (e.g., libp2p) has an event to push up to the sync process.
-pub fn handle_network_event(
-    event: NetworkEvent,
-    db: &Arc<ClientDB>,
-    network_tx: &UnboundedSender<OutgoingMessage>,
-    log: &Logger)
-    -> Result<(), ()>
-{
-    debug!(&log, "";
-           "network_event" => format!("{:?}", &event));
-    match event.event {
-        NetworkEventType::PeerConnect => Ok(()),
-        NetworkEventType::PeerDrop => Ok(()),
-        NetworkEventType::Message => {
-            if let Some(data) = event.data {
-                handle_network_message(
-                    &data,
-                    &db,
-                    &network_tx,
-                    &log)
-            } else {
-                Ok(())
-            }
-        }
-    }
-}
-
-/// Accept a message from the network and perform all required
-/// processing.
-///
-/// This function should be called whenever a peer from a network
-/// (e.g., libp2p) has sent a message to us.
-fn handle_network_message(
-    message: &[u8],
-    db: &Arc<ClientDB>,
-    _network_tx: &UnboundedSender<OutgoingMessage>,
-    log: &Logger)
-    -> Result<(), ()>
-{
-    match WireMessage::decode(&message) {
-        Ok(msg) => {
-            match msg.header {
-                WireMessageHeader::Blocks => {
-                    process_unverified_blocks(
-                        msg.body,
-                        &db,
-                        &log
-                    );
-                    Ok(())
-                }
-            }
-        }
-        Err(_) => {
-            Ok(()) // No need to pass the error back
-        }
-    }
-}
-
-fn process_unverified_blocks(_message: &[u8],
-                             _db: &Arc<ClientDB>,
-                             _log: &Logger)
-{
-    //
-}
diff --git a/lighthouse/sync/sync_future.rs b/lighthouse/sync/sync_future.rs
deleted file mode 100644
index 31cc933ca..000000000
--- a/lighthouse/sync/sync_future.rs
+++ /dev/null
@@ -1,48 +0,0 @@
-use super::tokio;
-use super::futures::{ Future, Stream };
-use super::futures::sync::mpsc::{
-    UnboundedReceiver,
-    UnboundedSender,
-};
-use super::network_libp2p::message::{
-    NetworkEvent,
-    OutgoingMessage,
-};
-use super::network::handle_network_event;
-use std::sync::Arc;
-use super::db::ClientDB;
-use slog::Logger;
-
-type NetworkSender = UnboundedSender<OutgoingMessage>;
-type NetworkReceiver = UnboundedReceiver<NetworkEvent>;
-
-type SyncSender = UnboundedSender<Vec<u8>>;
-type SyncReceiver = UnboundedReceiver<Vec<u8>>;
-
-/// Start a syncing tokio future.
-///
-/// Uses green-threading to process messages
-/// from the network and the RPC and update
-/// the state.
-pub fn run_sync_future(
-    db: Arc<ClientDB>,
-    network_tx: NetworkSender,
-    network_rx: NetworkReceiver,
-    _sync_tx: &SyncSender,
-    _sync_rx: &SyncReceiver,
-    log: Logger)
-{
-    let network_future = {
-        network_rx
-            .for_each(move |event| {
-                handle_network_event(
-                    event,
-                    &db.clone(),
-                    &network_tx.clone(),
-                    &log.clone())
-            })
-            .map_err(|_| panic!("rx failed"))
-    };
-
-    tokio::run(network_future);
-}
diff --git a/lighthouse/sync/wire_protocol.rs b/lighthouse/sync/wire_protocol.rs
deleted file mode 100644
index e5dd75d30..000000000
--- a/lighthouse/sync/wire_protocol.rs
+++ /dev/null
@@ -1,92 +0,0 @@
-pub enum WireMessageDecodeError {
-    TooShort,
-    UnknownType,
-}
-
-pub enum WireMessageHeader {
-    Blocks,
-    /*
-    // Leave out until used
-    Status,
-    NewBlockHashes,
-    GetBlockHashes,
-    BlockHashes,
-    GetBlocks,
-    NewBlock,
-    */
-}
-
-pub struct WireMessage<'a> {
-    pub header: WireMessageHeader,
-    pub body: &'a [u8],
-}
-
-impl<'a> WireMessage<'a> {
-    pub fn decode(bytes: &'a [u8])
-        -> Result<Self, WireMessageDecodeError>
-    {
-        if let Some((header_byte, body)) = bytes.split_first() {
-            let header = match header_byte {
-                0x06 => Some(WireMessageHeader::Blocks),
-                _ => None
-            };
-            match header {
-                Some(header) => Ok(Self{header, body}),
-                None => Err(WireMessageDecodeError::UnknownType)
-            }
-        } else {
-            Err(WireMessageDecodeError::TooShort)
-        }
-    }
-}
-
-/*
-pub fn decode_wire_message(bytes: &[u8])
-    -> Result<(WireMessageType, &[u8]), WireMessageDecodeError>
-{
-    if let Some((header_byte, body)) = bytes.split_first() {
-        let header = match header_byte {
-            0x06 => Some(WireMessageType::Blocks),
-            _ => None
-        };
-        match header {
-            Some(header) => Ok((header, body)),
-            None => Err(WireMessageDecodeError::UnknownType)
-        }
-    } else {
-        Err(WireMessageDecodeError::TooShort)
-    }
-}
-
-
-/// Determines the message type of some given
-/// message.
-///
-/// Does not check the validity of the message data,
-/// it just reads the first byte.
-pub fn message_type(message: &Vec<u8>)
-    -> Option<WireMessageType>
-{
-    match message.get(0) {
-        Some(0x06) => Some(WireMessageType::Blocks),
-        _ => None
-    }
-}
-
-pub fn identify_wire_protocol_message(message: &Vec<u8>)
-    -> Result<(WireMessageType, &[u8]), WireMessageDecodeError>
-{
-    fn strip_header(v: &Vec<u8>) -> &[u8] {
-        match v.get(1..v.len()) {
-            None => &vec![],
-            Some(s) => s
-        }
-    }
-
-    match message.get(0) {
-        Some(0x06) => Ok((WireMessageType::Blocks, strip_header(message))),
-        None => Err(WireMessageDecodeError::TooShort),
-        _ => Err(WireMessageDecodeError::UnknownType),
-    }
-}
-*/
diff --git a/network-libp2p/Cargo.toml b/network-libp2p/Cargo.toml
deleted file mode 100644
index de8077ac6..000000000
--- a/network-libp2p/Cargo.toml
+++ /dev/null
@@ -1,24 +0,0 @@
-[package]
-name = "network-libp2p"
-version = "0.1.0"
-authors = ["Paul Hauner "]
-
-[dependencies]
-bigint = "4.2"
-bytes = ""
-eth-secp256k1 = { git = "https://github.com/paritytech/rust-secp256k1" }
-futures = "0.1.23"
-libp2p-peerstore = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
-libp2p-core = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
-libp2p-mplex = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
-libp2p-tcp-transport = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
-libp2p-floodsub = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
-libp2p-identify = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
-libp2p-kad = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
-pem = "0.5.0"
-rand = "0.3"
-slog = "^2.2.3"
-tokio-core = "0.1"
-tokio-io = "0.1"
-tokio-stdin = "0.1"
-tokio-timer = "0.1"
diff --git a/network-libp2p/README.md b/network-libp2p/README.md
deleted file mode 100644
index dd3c68997..000000000
--- a/network-libp2p/README.md
+++ /dev/null
@@ -1,7 +0,0 @@
-# libp2p Network
-
-This is a fairly scrappy implementation of libp2p floodsub for the following
-reasons:
-
- - There is not presently a gossip-sub implementation for Rust libp2p.
 - The networking layer for the beacon_chain is not yet finalized.
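For reference, the deleted lighthouse/sync/wire_protocol.rs above framed every
message as a single header byte (0x06 = Blocks) followed by the raw body. A
minimal, standalone Rust sketch of that decode path, independent of the removed
crates (all names here are illustrative, not part of this patch):

    // One-byte-header framing, mirroring the deleted WireMessage::decode:
    // the first byte selects the message type (0x06 = Blocks), the rest is the body.
    #[derive(Debug, PartialEq)]
    enum WireMessageHeader {
        Blocks,
    }

    fn decode(bytes: &[u8]) -> Result<(WireMessageHeader, &[u8]), &'static str> {
        match bytes.split_first() {
            None => Err("too short"),
            Some((&0x06, body)) => Ok((WireMessageHeader::Blocks, body)),
            Some(_) => Err("unknown message type"),
        }
    }

    fn main() {
        // A "Blocks" message: the 0x06 header byte followed by an arbitrary body.
        let raw = [0x06, 0xde, 0xad, 0xbe, 0xef];
        let (header, body) = decode(&raw).unwrap();
        assert_eq!(header, WireMessageHeader::Blocks);
        assert_eq!(body, &raw[1..]);
    }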
diff --git a/network-libp2p/src/lib.rs b/network-libp2p/src/lib.rs
deleted file mode 100644
index bbec78c08..000000000
--- a/network-libp2p/src/lib.rs
+++ /dev/null
@@ -1,10 +0,0 @@
-extern crate libp2p_core;
-extern crate libp2p_peerstore;
-extern crate pem;
-extern crate secp256k1;
-#[macro_use]
-extern crate slog;
-
-pub mod message;
-pub mod service;
-pub mod state;
diff --git a/network-libp2p/src/message.rs b/network-libp2p/src/message.rs
deleted file mode 100644
index aa3ef54f0..000000000
--- a/network-libp2p/src/message.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-#[derive(Debug)]
-pub enum NetworkEventType {
-    PeerConnect,
-    PeerDrop,
-    Message,
-}
-
-#[derive(Debug)]
-pub struct NetworkEvent {
-    pub event: NetworkEventType,
-    pub data: Option<Vec<u8>>,
-}
-
-#[derive(Debug)]
-pub struct OutgoingMessage {
-    pub peer: Option,
-    pub data: Vec<u8>,
-}
diff --git a/network-libp2p/src/service.rs b/network-libp2p/src/service.rs
deleted file mode 100644
index f9b062f85..000000000
--- a/network-libp2p/src/service.rs
+++ /dev/null
@@ -1,315 +0,0 @@
-extern crate bigint;
-extern crate bytes;
-extern crate futures;
-extern crate libp2p_peerstore;
-extern crate libp2p_floodsub;
-extern crate libp2p_identify;
-extern crate libp2p_core;
-extern crate libp2p_mplex;
-extern crate libp2p_tcp_transport;
-extern crate libp2p_kad;
-extern crate slog;
-extern crate tokio_core;
-extern crate tokio_io;
-extern crate tokio_timer;
-extern crate tokio_stdin;
-
-use super::state::NetworkState;
-use super::message::{ NetworkEvent, NetworkEventType, OutgoingMessage };
-use self::bigint::U512;
-use self::futures::{ Future, Stream, Poll };
-use self::futures::sync::mpsc::{
-    UnboundedSender, UnboundedReceiver
-};
-use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr,
-                         Transport, ConnectionUpgrade };
-use self::libp2p_kad::{ KademliaUpgrade, KademliaProcessingFuture};
-use self::libp2p_floodsub::{ FloodSubFuture, FloodSubUpgrade };
-use self::libp2p_identify::{ IdentifyInfo, IdentifyTransport, IdentifyOutput };
-use self::slog::Logger;
-use std::sync::{ Arc, RwLock };
-use std::time::{ Duration, Instant };
-use std::ops::Deref;
-use std::io::Error as IoError;
-use self::tokio_io::{ AsyncRead, AsyncWrite };
-use self::bytes::Bytes;
-
-pub use self::libp2p_floodsub::Message;
-
-pub fn listen(state: NetworkState,
-              events_to_app: &UnboundedSender<NetworkEvent>,
-              raw_rx: UnboundedReceiver<OutgoingMessage>,
-              log: &Logger)
-{
-    let peer_store = state.peer_store;
-    let peer_id = state.peer_id;
-    let listen_multiaddr = state.listen_multiaddr;
-    let listened_addrs = Arc::new(RwLock::new(vec![]));
-    let rx = ApplicationReciever{ inner: raw_rx };
-
-    // Build a tokio core
-    let mut core = tokio_core::reactor::Core::new().expect("tokio failure.");
-    // Build a base TCP libp2p transport
-    let transport = libp2p_tcp_transport::TcpConfig::new(core.handle())
-        .with_upgrade(libp2p_core::upgrade::PlainTextConfig)
-        .with_upgrade(libp2p_mplex::BufferedMultiplexConfig::<[_; 256]>::new())
-        .into_connection_reuse();
-
-    // Build an identify transport to allow identification and negotiation
-    // of layers running atop the TCP transport (e.g., kad)
-    let identify_transport = {
-        let listened_addrs = listened_addrs.clone();
-        let listen_multiaddr = listen_multiaddr.clone();
-        IdentifyTransport::new(transport.clone(), peer_store.clone())
-            // Managed NAT'ed connections - ensuring the external IP
-            // is stored not the internal addr.
-            .map(move |out, _, _| {
-                if let(Some(ref observed), ref listen_multiaddr) =
-                    (out.observed_addr, listen_multiaddr)
-                {
-                    if let Some(viewed_from_outside) =
-                        transport.nat_traversal(listen_multiaddr, observed)
-                    {
-                        listened_addrs.write().unwrap()
-                            .push(viewed_from_outside);
-                    }
-                }
-                out.socket
-            })
-    };
-
-    // Configure and build a Kademlia upgrade to be applied
-    // to the base TCP transport.
-    let kad_config = libp2p_kad::KademliaConfig {
-        parallelism: 3,
-        record_store: (),
-        peer_store,
-        local_peer_id: peer_id.clone(),
-        timeout: Duration::from_secs(2)
-    };
-    let kad_ctl_proto = libp2p_kad::
-        KademliaControllerPrototype::new(kad_config);
-    let kad_upgrade = libp2p_kad::
-        KademliaUpgrade::from_prototype(&kad_ctl_proto);
-
-    // Build a floodsub upgrade to allow pushing topic'ed
-    // messages across the network.
-    let (floodsub_upgrade, floodsub_rx) =
-        FloodSubUpgrade::new(peer_id.clone());
-
-    // Combine the Kademlia and Identify upgrades into a single
-    // upgrader struct.
-    let upgrade = ConnectionUpgrader {
-        kad: kad_upgrade.clone(),
-        floodsub: floodsub_upgrade.clone(),
-        identify: libp2p_identify::IdentifyProtocolConfig,
-    };
-
-    // Build a Swarm to manage upgrading connections to peers.
-    let swarm_listened_addrs = listened_addrs.clone();
-    let swarm_peer_id = peer_id.clone();
-    let (swarm_ctl, swarm_future) = libp2p_core::swarm(
-        identify_transport.clone().with_upgrade(upgrade),
-        move |upgrade, client_addr| match upgrade {
-            FinalUpgrade::Kad(kad) => Box::new(kad) as Box<_>,
-            FinalUpgrade::FloodSub(future) => Box::new(future) as Box<_>,
-            FinalUpgrade::Identify(IdentifyOutput::Sender { sender, .. }) => sender.send(
-                IdentifyInfo {
-                    public_key: swarm_peer_id.clone().into_bytes(),
-                    agent_version: "lighthouse/1.0.0".to_owned(),
-                    protocol_version: "rust-libp2p/1.0.0".to_owned(),
-                    listen_addrs: swarm_listened_addrs.read().unwrap().to_vec(),
-                    protocols: vec![
-                        "/ipfs/kad/1.0.0".to_owned(),
-                        "/ipfs/id/1.0.0".to_owned(),
-                        "/floodsub/1.0.0".to_owned(),
-                    ]
-                },
-                &client_addr
-            ),
-            FinalUpgrade::Identify(IdentifyOutput::RemoteInfo { .. }) => {
-                unreachable!("Never dial with the identify protocol.")
-            }
-        },
-    );
-
-    // Start the Swarm controller listening on the local machine
-    let actual_addr = swarm_ctl
-        .listen_on(listen_multiaddr)
-        .expect("Failed to listen on multiaddr");
-    info!(log, "libp2p listening"; "listen_addr" => actual_addr.to_string());
-
-    // Convert the kad prototype into a controller by providing it the
-    // newly built swarm.
-    let (kad_ctl, kad_init) = kad_ctl_proto.start(
-        swarm_ctl.clone(),
-        identify_transport.clone().with_upgrade(kad_upgrade.clone()));
-
-    // Create a new floodsub controller using a specific topic
-    let topic = libp2p_floodsub::TopicBuilder::new("beacon_chain").build();
-    let floodsub_ctl = libp2p_floodsub::FloodSubController::new(&floodsub_upgrade);
-    floodsub_ctl.subscribe(&topic);
-
-    // Generate a tokio timer "wheel" future that sends kad FIND_NODE at
-    // a routine interval.
-    let kad_poll_log = log.new(o!());
-    let kad_poll_event_tx = events_to_app.clone();
-    let kad_poll = {
-        let polling_peer_id = peer_id.clone();
-        tokio_timer::wheel()
-            .build()
-            .interval_at(Instant::now(), Duration::from_secs(30))
-            .map_err(|_| -> IoError { unreachable!() })
-            .and_then(move |()| kad_ctl.find_node(peer_id.clone()))
-            .for_each(move |peers| {
-                let local_hash = U512::from(polling_peer_id.hash());
-                for peer in peers {
-                    let peer_hash = U512::from(peer.hash());
-                    let distance = 512 - (local_hash ^ peer_hash).leading_zeros();
-                    info!(kad_poll_log, "Discovered peer";
-                          "distance" => distance,
-                          "peer_id" => peer.to_base58());
-                    let peer_addr = AddrComponent::P2P(peer.into_bytes()).into();
-                    let dial_result = swarm_ctl.dial(
-                        peer_addr,
-                        identify_transport.clone().with_upgrade(floodsub_upgrade.clone())
-                    );
-                    if let Err(err) = dial_result {
-                        warn!(kad_poll_log, "Dialling {:?} failed.", err)
-                    };
-                    let event = NetworkEvent {
-                        event: NetworkEventType::PeerConnect,
-                        data: None,
-                    };
-                    kad_poll_event_tx.unbounded_send(event)
-                        .expect("Network unable to contact application.");
-                };
-                Ok(())
-            })
-    };
-
-    // Create a future to handle message recieved from the network
-    let floodsub_rx = floodsub_rx.for_each(|msg| {
-        debug!(&log, "Network receive"; "msg" => format!("{:?}", msg));
-        let event = NetworkEvent {
-            event: NetworkEventType::Message,
-            data: Some(msg.data),
-        };
-        events_to_app.unbounded_send(event)
-            .expect("Network unable to contact application.");
-        Ok(())
-    });
-
-    // Create a future to handle messages recieved from the application
-    let app_rx = rx.for_each(|msg| {
-        debug!(&log, "Network publish"; "msg" => format!("{:?}", msg));
-        floodsub_ctl.publish(&topic, msg.data);
-        Ok(())
-    });
-
-    // Generate a full future
-    let final_future = swarm_future
-        .select(floodsub_rx).map_err(|(err, _)| err).map(|((), _)| ())
-        .select(app_rx).map_err(|(err, _)| err).map(|((), _)| ())
-        .select(kad_poll).map_err(|(err, _)| err).map(|((), _)| ())
-        .select(kad_init).map_err(|(err, _)| err).and_then(|((), n)| n);
-
-    core.run(final_future).unwrap();
-}
-
-struct ApplicationReciever {
-    inner: UnboundedReceiver<OutgoingMessage>,
-}
-
-impl Stream for ApplicationReciever {
-    type Item = OutgoingMessage;
-    type Error = IoError;
-
-    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
-        self.inner
-            .poll()
-            .map_err(|_| unreachable!())
-    }
-}
-
-#[derive(Clone)]
-struct ConnectionUpgrader<P, R> {
-    kad: KademliaUpgrade<P, R>,
-    identify: libp2p_identify::IdentifyProtocolConfig,
-    floodsub: FloodSubUpgrade,
-}
-
-impl<C, P, R, Pc> ConnectionUpgrade<C> for ConnectionUpgrader<P, R>
-where
-    C: AsyncRead + AsyncWrite + 'static,
-    P: Deref<Target = Pc> + Clone + 'static,
-    for<'r> &'r Pc: libp2p_peerstore::Peerstore,
-    R: 'static
-{
-    type NamesIter = ::std::vec::IntoIter<(Bytes, usize)>;
-    type UpgradeIdentifier = usize;
-    type Output = FinalUpgrade<C>;
-    type Future = Box<Future<Item = FinalUpgrade<C>, Error = IoError>>;
-
-    #[inline]
-    fn protocol_names(&self) -> Self::NamesIter {
-        vec![
-            (Bytes::from("/ipfs/kad/1.0.0"), 0),
-            (Bytes::from("/ipfs/id/1.0.0"), 1),
-            (Bytes::from("/floodsub/1.0.0"), 2),
-        ].into_iter()
-    }
-
-    fn upgrade(
-        self,
-        socket: C,
-        id: Self::UpgradeIdentifier,
-        ty: Endpoint,
-        remote_addr: &Multiaddr)
-        -> Self::Future
-    {
-        match id {
-            0 => Box::new(
-                self.kad
-                    .upgrade(socket, (), ty, remote_addr)
-                    .map(|upg| upg.into())),
-            1 => Box::new(
-                self.identify
-                    .upgrade(socket, (), ty, remote_addr)
-                    .map(|upg| upg.into())),
-            2 => Box::new(
-                self.floodsub
-                    .upgrade(socket, (), ty, remote_addr)
-                    .map(|upg| upg.into()),
-            ),
-            _ => unreachable!()
-        }
-
-    }
-}
-
-enum FinalUpgrade<C> {
-    Kad(KademliaProcessingFuture),
-    Identify(IdentifyOutput<C>),
-    FloodSub(FloodSubFuture),
-}
-
-impl<C> From<KademliaProcessingFuture> for FinalUpgrade<C> {
-    #[inline]
-    fn from(upgrade: libp2p_kad::KademliaProcessingFuture) -> Self {
-        FinalUpgrade::Kad(upgrade)
-    }
-}
-
-impl<C> From<IdentifyOutput<C>> for FinalUpgrade<C> {
-    #[inline]
-    fn from(upgrade: IdentifyOutput<C>) -> Self {
-        FinalUpgrade::Identify(upgrade)
-    }
-}
-
-impl<C> From<FloodSubFuture> for FinalUpgrade<C> {
-    #[inline]
-    fn from(upgr: FloodSubFuture) -> Self {
-        FinalUpgrade::FloodSub(upgr)
-    }
-}
diff --git a/network-libp2p/src/state.rs b/network-libp2p/src/state.rs
deleted file mode 100644
index e45741fe6..000000000
--- a/network-libp2p/src/state.rs
+++ /dev/null
@@ -1,119 +0,0 @@
-extern crate rand;
-
-use std::io::{ Read, Write };
-use std::error::Error;
-use std::fs::File;
-use std::path::{ Path, PathBuf };
-use std::sync::Arc;
-use std::time::Duration;
-
-use super::libp2p_core::Multiaddr;
-use super::libp2p_peerstore::{ Peerstore, PeerAccess, PeerId };
-use super::libp2p_peerstore::json_peerstore::JsonPeerstore;
-use super::pem;
-use super::secp256k1::Secp256k1;
-use super::secp256k1::key::{ SecretKey, PublicKey };
-use super::slog::Logger;
-
-/// Location of the libp2p peerstore inside the Network base dir.
-const PEERS_FILE: &str = "peerstore.json";
-/// Location of the libp2p local peer secret key inside the Network base dir.
-const LOCAL_PEM_FILE: &str = "local_peer_id.pem";
-
-/// Represents the present state of a libp2p network.
-pub struct NetworkState {
-    pub base_dir: PathBuf,
-    pub pubkey: PublicKey,
-    pub seckey: SecretKey,
-    pub peer_id: PeerId,
-    pub listen_multiaddr: Multiaddr,
-    pub peer_store: Arc<JsonPeerstore>,
-}
-
-impl NetworkState {
-    /// Create a new libp2p network state. Used to initialize
-    /// network service.
-    pub fn new(
-        // config: LighthouseConfig,
-        base_dir: &Path,
-        listen_port: u16,
-        log: &Logger)
-        -> Result<Self, Box<Error>>
-    {
-        let curve = Secp256k1::new();
-        let seckey = match
-            NetworkState::load_secret_key_from_pem_file(base_dir, &curve)
-        {
-            Ok(k) => k,
-            _ => NetworkState::generate_new_secret_key(base_dir, &curve)?
-        };
-        let pubkey = PublicKey::from_secret_key(&curve, &seckey)?;
-        let peer_id = PeerId::from_public_key(
-            &pubkey.serialize_vec(&curve, false));
-        info!(log, "Loaded keys"; "peer_id" => &peer_id.to_base58());
-        let peer_store = {
-            let path = base_dir.join(PEERS_FILE);
-            let base = JsonPeerstore::new(path)?;
-            Arc::new(base)
-        };
-        info!(log, "Loaded peerstore"; "peer_count" => &peer_store.peers().count());
-        let listen_multiaddr =
-            NetworkState::multiaddr_on_port(&listen_port.to_string());
-        Ok(Self {
-            base_dir: PathBuf::from(base_dir),
-            seckey,
-            pubkey,
-            peer_id,
-            listen_multiaddr,
-            peer_store,
-        })
-    }
-
-    /// Return a TCP multiaddress on 0.0.0.0 for a given port.
-    pub fn multiaddr_on_port(port: &str) -> Multiaddr {
-        format!("/ip4/0.0.0.0/tcp/{}", port)
-            .parse::<Multiaddr>().unwrap()
-    }
-
-    pub fn add_peer(&mut self,
-                    peer_id: &PeerId,
-                    multiaddr: Multiaddr,
-                    duration_secs: u64) {
-        self.peer_store.peer_or_create(&peer_id)
-            .add_addr(multiaddr, Duration::from_secs(duration_secs));
-    }
-
-    /// Instantiate a SecretKey from a .pem file on disk.
-    pub fn load_secret_key_from_pem_file(
-        base_dir: &Path,
-        curve: &Secp256k1)
-        -> Result<SecretKey, Box<Error>>
-    {
-        let path = base_dir.join(LOCAL_PEM_FILE);
-        let mut contents = String::new();
-        let mut file = File::open(path)?;
-        file.read_to_string(&mut contents)?;
-        let pem_key = pem::parse(contents)?;
-        let key = SecretKey::from_slice(curve, &pem_key.contents)?;
-        Ok(key)
-    }
-
-    /// Generate a new SecretKey and store it on disk as a .pem file.
-    pub fn generate_new_secret_key(
-        base_dir: &Path,
-        curve: &Secp256k1)
-        -> Result<SecretKey, Box<Error>>
-    {
-        let mut rng = rand::thread_rng();
-        let sk = SecretKey::new(&curve, &mut rng);
-        let pem_key = pem::Pem {
-            tag: String::from("EC PRIVATE KEY"),
-            contents: sk[..].to_vec()
-        };
-        let s_string = pem::encode(&pem_key);
-        let path = base_dir.join(LOCAL_PEM_FILE);
-        let mut s_file = File::create(path)?;
-        s_file.write_all(s_string.as_bytes())?;
-        Ok(sk)
-    }
-}
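The deleted lighthouse/client/mod.rs above did little more than spawn a network
thread and a sync thread and connect them with unbounded channels. A simplified,
dependency-free Rust sketch of that plumbing (std threads and std mpsc channels
stand in for the futures-based unbounded channels and the tokio future the
removed code used; all names are illustrative, not part of this patch):

    use std::sync::mpsc::channel;
    use std::thread;

    // Stand-in for the deleted NetworkEvent type.
    #[derive(Debug)]
    struct NetworkEvent {
        data: Option<Vec<u8>>,
    }

    fn main() {
        let (event_tx, event_rx) = channel();

        // "Network" thread: pushes events up to the sync process
        // (the removed code drove this from libp2p floodsub).
        let network_thread = thread::spawn(move || {
            for i in 0..3u8 {
                event_tx
                    .send(NetworkEvent { data: Some(vec![i]) })
                    .expect("sync thread hung up");
            }
        });

        // "Sync" thread: consumes events and would update chain state
        // (the removed code ran a tokio future here).
        let sync_thread = thread::spawn(move || {
            for event in event_rx {
                println!("sync received: {:?}", event);
            }
        });

        network_thread.join().unwrap();
        sync_thread.join().unwrap();
    }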