Implement skeleton network/sync framework.

This commit is contained in:
Age Manning 2019-03-04 18:31:01 +11:00
parent 3b8f29a914
commit b68adc1ae3
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
16 changed files with 147 additions and 39 deletions

View File

@ -56,7 +56,7 @@ impl ClientConfig {
// TODO: Handle list of addresses
if let Some(listen_address_str) = args.value_of("listen_address") {
if let Ok(listen_address) = listen_address_str.parse::<IpAddr>() {
config.net_conf.listen_address = Some(Vec::new(listen_address));
config.net_conf.listen_addresses = Some(vec![listen_address]);
} else {
error!(log, "Invalid IP Address"; "Address" => listen_address_str);
return Err("Invalid IP Address");

View File

@ -1,8 +1,14 @@
// generates error types
use network;
use error_chain::{
error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed,
impl_extract_backtrace,
};
error_chain! {}
error_chain! {
links {
Network(network::error::Error, network::error::ErrorKind);
}
}

View File

@ -11,9 +11,10 @@ pub use client_types::ClientTypes;
//use beacon_chain::BeaconChain;
use exit_future::{Exit, Signal};
use network::Service as NetworkService;
use slog::o;
use std::marker::PhantomData;
//use std::sync::Arc;
use network::NetworkService;
use std::sync::Arc;
use tokio::runtime::TaskExecutor;
/// Main beacon node client service. This provides the connection and initialisation of the clients
@ -39,11 +40,11 @@ impl<T: ClientTypes> Client<T> {
// TODO: generate a beacon_chain service.
// start the network service, libp2p and syncing threads
// Start the network service, libp2p and syncing threads
// TODO: Add beacon_chain reference to network parameters
let network_config = config.net_config;
let network_logger = client.log.new(o!("Service" => "Network"));
let (network, network_send) = NetworkService::new(network_config, network_logger);
let network_config = config.net_conf.clone();
let network_logger = log.new(o!("Service" => "Network"));
let (network, network_send) = NetworkService::new(network_config, network_logger)?;
Ok(Client {
config,

View File

@ -4,7 +4,7 @@ use db::ClientDB;
use exit_future::Exit;
use fork_choice::ForkChoice;
use futures::{Future, Stream};
use slog::{debug, info};
use slog::{debug, info, o};
use slot_clock::SlotClock;
use std::sync::Arc;
use std::time::{Duration, Instant};
@ -26,7 +26,7 @@ pub fn run<T: ClientTypes>(client: &Client<T>, executor: TaskExecutor, exit: Exi
};
// map error and spawn
let log = client.logger();
let log = client.log.clone();
let heartbeat_interval = interval
.map_err(move |e| debug!(log, "Timer error {}", e))
.for_each(heartbeat);

View File

@ -5,3 +5,6 @@ authors = ["Age Manning <Age@AgeManning.com>"]
edition = "2018"
[dependencies]
# SigP repository until PR is merged
libp2p = { git = "https://github.com/SigP/rust-libp2p", branch = "gossipsub" }
slog = "2.4.1"

View File

@ -2,10 +2,11 @@
/// all required libp2p functionality.
///
/// This crate builds and manages the libp2p services required by the beacon node.
extern crate libp2p;
mod service;
mod libp2p_service;
pub use libp2p::{
gossipsub::{GossipsubConfig, GossipsubConfigBuilder},
PeerId,
};
pub use libp2p::{GossipsubConfig, PeerId};
pub use libp2p_service::LibP2PService;
pub use service::Service;

View File

@ -0,0 +1,11 @@
use slog::debug;
/// The configuration and state of the libp2p components for the beacon node.
pub struct Service {}
impl Service {
pub fn new(log: slog::Logger) -> Self {
debug!(log, "Service starting");
Service {}
}
}

View File

@ -5,6 +5,11 @@ authors = ["Age Manning <Age@AgeManning.com>"]
edition = "2018"
[dependencies]
libp2p = { git = "../libp2p" }
libp2p = { path = "../libp2p" }
version = { path = "../version" }
types = { path = "../../eth2/types" }
sync = { path = "../sync" }
slog = "2.4.1"
futures = "0.1.25"
error-chain = "0.12.0"
crossbeam-channel = "0.3.8"

View File

@ -1,6 +1,9 @@
/// This crate provides the network server for Lighthouse.
pub mod error;
mod message_handler;
mod messages;
mod network_config;
mod service;
pub use network_config::NetworkConfig;
pub use service::NetworkService;
pub use service::Service;

View File

@ -1,8 +1,14 @@
use crate::node_message::NodeMessage;
use crate::error;
use crate::messages::NodeMessage;
use crossbeam_channel::{unbounded as channel, Sender};
use libp2p::PeerId;
use slog::debug;
use sync::SimpleSync;
use types::Hash256;
/// Handles messages received from the network and client and organises syncing.
pub struct MessageHandler {
sync: Syncer,
sync: SimpleSync,
//TODO: Implement beacon chain
//chain: BeaconChain
}
@ -14,5 +20,25 @@ pub enum HandlerMessage {
/// Peer has disconnected,
PeerDisconnected(PeerId),
/// A Node message has been received.
Message(Peer, NodeMessage),
Message(PeerId, NodeMessage),
}
impl MessageHandler {
/// Initializes and runs the MessageHandler.
pub fn new(log: slog::Logger) -> error::Result<Sender<HandlerMessage>> {
debug!(log, "Service starting");
let (handler_send, handler_recv) = channel();
// Initialise sync and begin processing in thread
//TODO: Load genesis from BeaconChain
let temp_genesis = Hash256::zero();
let sync = SimpleSync::new(temp_genesis);
let handler = MessageHandler { sync };
// spawn handler thread
Ok(handler_send)
}
}

View File

@ -1,27 +1,26 @@
use types::{H256,Slot}
use libp2p::PeerId;
use types::{Hash256, Slot};
/// Messages between nodes across the network.
pub enum NodeMessage {
Status(Status),
BlockRequest,
}
pub struct Status {
/// Current node version.
version: u8
version: u8,
/// Genesis Hash.
genesis_hash: H256
genesis_hash: Hash256,
/// Best known slot number.
best_slot: Slot
best_slot: Slot,
/// Best known slot hash.
best_slot_hash: H256
best_slot_hash: Hash256,
}
/// Types of messages that the network service can receive.
pub enum NetworkMessage {
/// Send a message to libp2p service.
//TODO: Define typing for messages accross the wire
Send(Node, Message),
//TODO: Define typing for messages across the wire
Send(PeerId, NodeMessage),
}

View File

@ -1,4 +1,4 @@
use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder};
use libp2p::{GossipsubConfig, GossipsubConfigBuilder};
use std::net::IpAddr;
use version;

View File

@ -0,0 +1,41 @@
use crate::error;
use crate::message_handler::{HandlerMessage, MessageHandler};
use crate::messages::{NetworkMessage, NodeMessage};
use crate::NetworkConfig;
use crossbeam_channel::{unbounded as channel, Sender};
use futures::sync::oneshot;
use libp2p::Service as LibP2PService;
use slog::{debug, info, o, trace, warn, Logger};
use std::sync::{Arc, Mutex};
/// Service that handles communication between internal services and the libp2p network service.
pub struct Service {
//libp2p_service: Arc<Mutex<LibP2PService>>,
//libp2p_thread: oneshot::Sender<()>,
//message_handler: MessageHandler,
//message_handler_send: Sender<HandlerMessage>,
}
impl Service {
pub fn new(
config: NetworkConfig,
log: slog::Logger,
) -> error::Result<(Arc<Self>, Sender<NetworkMessage>)> {
debug!(log, "Service starting");
let (network_send, network_recv) = channel::<NetworkMessage>();
// launch message handler thread
let message_handler_log = log.new(o!("Service" => "MessageHandler"));
let message_handler_send = MessageHandler::new(message_handler_log);
// launch libp2p service
let libp2p_log = log.new(o!("Service" => "Libp2p"));
let libp2p_service = LibP2PService::new(libp2p_log);
// TODO: Spawn thread to handle libp2p messages and pass to message handler thread.
let network = Service {};
Ok((Arc::new(network), network_send))
}
}

View File

@ -6,4 +6,4 @@ edition = "2018"
[dependencies]
types = { path = "../../eth2/types" }
libp2p = { git = "../libp2p/" }
libp2p = { path = "../libp2p" }

View File

@ -3,8 +3,9 @@
/// Stores the various syncing methods for the beacon chain.
mod simple_sync;
pub use crate::SimpleSync;
pub use simple_sync::SimpleSync;
/// Currently implemented sync methods.
pub enum SyncMethod {
SimpleSync,
}

View File

@ -1,10 +1,11 @@
use libp2p::PeerId;
use std::collections::HashMap;
use types::{Slot, H256};
use types::{Hash256, Slot};
/// Keeps track of syncing information for known connected peers.
pub struct PeerSyncInfo {
best_slot: Slot,
best_slot_hash: H256,
best_slot_hash: Hash256,
}
/// The current syncing state.
@ -16,7 +17,17 @@ pub enum SyncState {
/// Simple Syncing protocol.
pub struct SimpleSync {
genesis_hash: H256,
genesis_hash: Hash256,
known_peers: HashMap<PeerId, PeerSyncInfo>,
state: SyncState,
}
impl SimpleSync {
pub fn new(genesis_hash: Hash256) -> Self {
SimpleSync {
genesis_hash,
known_peers: HashMap::new(),
state: SyncState::Idle,
}
}
}