Begin sync framework

This commit is contained in:
Paul Hauner 2018-09-09 16:36:00 +02:00
parent b4ca8cbde8
commit c33d3689a7
7 changed files with 130 additions and 60 deletions

View File

@ -8,7 +8,7 @@ use super::futures::sync::mpsc::{
use super::network_libp2p::service::listen as network_listen; use super::network_libp2p::service::listen as network_listen;
use super::network_libp2p::state::NetworkState; use super::network_libp2p::state::NetworkState;
use super::slog::Logger; use super::slog::Logger;
use super::sync::start_sync; use super::sync::run_sync_future;
/// Represents the co-ordination of the /// Represents the co-ordination of the
/// networking, syncing and RPC (not-yet-implemented) threads. /// networking, syncing and RPC (not-yet-implemented) threads.
@ -59,7 +59,7 @@ impl Client {
let sync_log = log.new(o!()); let sync_log = log.new(o!());
let sync_db = Arc::clone(&db); let sync_db = Arc::clone(&db);
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
start_sync( run_sync_future(
sync_db, sync_db,
network_tx.clone(), network_tx.clone(),
network_rx, network_rx,

View File

@ -12,7 +12,7 @@ pub struct Block {
pub pow_chain_ref: Hash256, pub pow_chain_ref: Hash256,
pub active_state_root: Hash256, pub active_state_root: Hash256,
pub crystallized_state_root: Hash256, pub crystallized_state_root: Hash256,
} }
impl Block { impl Block {
pub fn zero() -> Self { pub fn zero() -> Self {

View File

@ -0,0 +1,12 @@
pub enum SyncEventType {
Invalid,
PeerConnect,
PeerDrop,
ReceiveBlocks,
ReceiveAttestationRecords,
}
pub struct SyncEvent {
event: SyncEventType,
data: Option<Vec<u8>>
}

View File

@ -1,50 +1,12 @@
extern crate futures; extern crate futures;
extern crate slog; extern crate slog;
extern crate tokio; extern crate tokio;
extern crate network_libp2p;
use self::futures::sync::mpsc::{ pub mod messages;
UnboundedReceiver, pub mod network;
UnboundedSender, pub mod sync_future;
};
use self::tokio::prelude::*;
use std::sync::{ RwLock, Arc };
use super::network_libp2p::message::{
NetworkEvent,
OutgoingMessage,
};
use super::db::DB;
use slog::Logger;
type NetworkSender = UnboundedSender<OutgoingMessage>; pub use self::sync_future::run_sync_future;
type NetworkReceiver = UnboundedReceiver<NetworkEvent>;
type SyncSender = UnboundedSender<Vec<u8>>; use super::db;
type SyncReceiver = UnboundedReceiver<Vec<u8>>;
/// Start a syncing tokio future.
///
/// This is effectively a stub function being
/// used to test network functionality.
///
/// Expect a full re-write.
pub fn start_sync(
_db: Arc<RwLock<DB>>,
_network_tx: NetworkSender,
network_rx: NetworkReceiver,
_sync_tx: SyncSender,
_sync_rx: SyncReceiver,
log: Logger) {
let rx_future = network_rx
.for_each(move |event| {
debug!(&log, "Sync receive";
"msg" => format!("{:?}", event));
Ok(())
})
.map_err(|_| panic!("rx failed"));
/*
* This is an unfinished stub function.
*/
tokio::run(rx_future);
}

View File

@ -0,0 +1,44 @@
use std::sync::{ RwLock, Arc };
use super::db::DB;
use slog::Logger;
use super::network_libp2p::message::{
NetworkEvent,
OutgoingMessage,
NetworkEventType,
};
use super::futures::sync::mpsc::{
UnboundedSender,
};
pub fn handle_network_event(
event: NetworkEvent,
db: Arc<RwLock<DB>>,
network_tx: UnboundedSender<OutgoingMessage>,
log: Logger)
-> Result<(), ()>
{
match event.event {
NetworkEventType::PeerConnect => Ok(()),
NetworkEventType::PeerDrop => Ok(()),
NetworkEventType::Message => handle_network_message(
event.data,
db,
network_tx,
log
)
}
}
fn handle_network_message(
message: Option<Vec<u8>>,
_db: Arc<RwLock<DB>>,
_network_tx: UnboundedSender<OutgoingMessage>,
log: Logger)
-> Result<(), ()>
{
debug!(&log, "";
"network_msg" => format!("{:?}", message));
Ok(())
}

View File

@ -0,0 +1,52 @@
use super::tokio;
use super::futures::{ Future, Stream };
use super::futures::sync::mpsc::{
UnboundedReceiver,
UnboundedSender,
};
use super::network_libp2p::message::{
NetworkEvent,
OutgoingMessage,
};
use super::network::handle_network_event;
use std::sync::{ RwLock, Arc };
use super::db::DB;
use slog::Logger;
type NetworkSender = UnboundedSender<OutgoingMessage>;
type NetworkReceiver = UnboundedReceiver<NetworkEvent>;
type SyncSender = UnboundedSender<Vec<u8>>;
type SyncReceiver = UnboundedReceiver<Vec<u8>>;
/// Start a syncing tokio future.
///
/// This is effectively a stub function being
/// used to test network functionality.
///
/// Expect a full re-write.
pub fn run_sync_future(
db: Arc<RwLock<DB>>,
network_tx: NetworkSender,
network_rx: NetworkReceiver,
_sync_tx: SyncSender,
_sync_rx: SyncReceiver,
log: Logger) {
let network_future = {
network_rx
.for_each(move |event| {
handle_network_event(
event,
db.clone(),
network_tx.clone(),
log.clone())
})
.map_err(|_| panic!("rx failed"))
};
/*
* This is an unfinished stub function.
*/
tokio::run(network_future);
}

View File

@ -18,7 +18,7 @@ use super::state::NetworkState;
use super::message::{ NetworkEvent, NetworkEventType, OutgoingMessage }; use super::message::{ NetworkEvent, NetworkEventType, OutgoingMessage };
use self::bigint::U512; use self::bigint::U512;
use self::futures::{ Future, Stream, Poll }; use self::futures::{ Future, Stream, Poll };
use self::futures::sync::mpsc::{ use self::futures::sync::mpsc::{
UnboundedSender, UnboundedReceiver UnboundedSender, UnboundedReceiver
}; };
use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr, use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr,
@ -39,14 +39,14 @@ pub use self::libp2p_floodsub::Message;
pub fn listen(state: NetworkState, pub fn listen(state: NetworkState,
events_to_app: UnboundedSender<NetworkEvent>, events_to_app: UnboundedSender<NetworkEvent>,
raw_rx: UnboundedReceiver<OutgoingMessage>, raw_rx: UnboundedReceiver<OutgoingMessage>,
log: Logger) log: Logger)
{ {
let peer_store = state.peer_store; let peer_store = state.peer_store;
let peer_id = state.peer_id; let peer_id = state.peer_id;
let listen_multiaddr = state.listen_multiaddr; let listen_multiaddr = state.listen_multiaddr;
let listened_addrs = Arc::new(RwLock::new(vec![])); let listened_addrs = Arc::new(RwLock::new(vec![]));
let rx = ApplicationReciever{ inner: raw_rx }; let rx = ApplicationReciever{ inner: raw_rx };
// Build a tokio core // Build a tokio core
let mut core = tokio_core::reactor::Core::new().expect("tokio failure."); let mut core = tokio_core::reactor::Core::new().expect("tokio failure.");
// Build a base TCP libp2p transport // Build a base TCP libp2p transport
@ -65,10 +65,10 @@ pub fn listen(state: NetworkState,
// is stored not the internal addr. // is stored not the internal addr.
.map(move |out, _, _| { .map(move |out, _, _| {
if let(Some(ref observed), ref listen_multiaddr) = if let(Some(ref observed), ref listen_multiaddr) =
(out.observed_addr, listen_multiaddr) (out.observed_addr, listen_multiaddr)
{ {
if let Some(viewed_from_outside) = if let Some(viewed_from_outside) =
transport.nat_traversal(listen_multiaddr, observed) transport.nat_traversal(listen_multiaddr, observed)
{ {
listened_addrs.write().unwrap() listened_addrs.write().unwrap()
.push(viewed_from_outside); .push(viewed_from_outside);
@ -79,7 +79,7 @@ pub fn listen(state: NetworkState,
}; };
// Configure and build a Kademlia upgrade to be applied // Configure and build a Kademlia upgrade to be applied
// to the base TCP transport. // to the base TCP transport.
let kad_config = libp2p_kad::KademliaConfig { let kad_config = libp2p_kad::KademliaConfig {
parallelism: 3, parallelism: 3,
record_store: (), record_store: (),
@ -91,10 +91,10 @@ pub fn listen(state: NetworkState,
KademliaControllerPrototype::new(kad_config); KademliaControllerPrototype::new(kad_config);
let kad_upgrade = libp2p_kad:: let kad_upgrade = libp2p_kad::
KademliaUpgrade::from_prototype(&kad_ctl_proto); KademliaUpgrade::from_prototype(&kad_ctl_proto);
// Build a floodsub upgrade to allow pushing topic'ed // Build a floodsub upgrade to allow pushing topic'ed
// messages across the network. // messages across the network.
let (floodsub_upgrade, floodsub_rx) = let (floodsub_upgrade, floodsub_rx) =
FloodSubUpgrade::new(peer_id.clone()); FloodSubUpgrade::new(peer_id.clone());
// Combine the Kademlia and Identify upgrades into a single // Combine the Kademlia and Identify upgrades into a single
@ -104,7 +104,7 @@ pub fn listen(state: NetworkState,
floodsub: floodsub_upgrade.clone(), floodsub: floodsub_upgrade.clone(),
identify: libp2p_identify::IdentifyProtocolConfig, identify: libp2p_identify::IdentifyProtocolConfig,
}; };
// Build a Swarm to manage upgrading connections to peers. // Build a Swarm to manage upgrading connections to peers.
let swarm_listened_addrs = listened_addrs.clone(); let swarm_listened_addrs = listened_addrs.clone();
let swarm_peer_id = peer_id.clone(); let swarm_peer_id = peer_id.clone();
@ -166,7 +166,7 @@ pub fn listen(state: NetworkState,
for peer in peers { for peer in peers {
let peer_hash = U512::from(peer.hash()); let peer_hash = U512::from(peer.hash());
let distance = 512 - (local_hash ^ peer_hash).leading_zeros(); let distance = 512 - (local_hash ^ peer_hash).leading_zeros();
info!(kad_poll_log, "Discovered peer"; info!(kad_poll_log, "Discovered peer";
"distance" => distance, "distance" => distance,
"peer_id" => peer.to_base58()); "peer_id" => peer.to_base58());
let peer_addr = AddrComponent::P2P(peer.into_bytes()).into(); let peer_addr = AddrComponent::P2P(peer.into_bytes()).into();
@ -240,7 +240,7 @@ struct ConnectionUpgrader<P, R> {
} }
impl<C, P, R, Pc> ConnectionUpgrade<C> for ConnectionUpgrader<P, R> impl<C, P, R, Pc> ConnectionUpgrade<C> for ConnectionUpgrader<P, R>
where where
C: AsyncRead + AsyncWrite + 'static, C: AsyncRead + AsyncWrite + 'static,
P: Deref<Target = Pc> + Clone + 'static, P: Deref<Target = Pc> + Clone + 'static,
for<'r> &'r Pc: libp2p_peerstore::Peerstore, for<'r> &'r Pc: libp2p_peerstore::Peerstore,
@ -251,7 +251,7 @@ where
type Output = FinalUpgrade<C>; type Output = FinalUpgrade<C>;
type Future = Box<Future<Item = FinalUpgrade<C>, Error = IoError>>; type Future = Box<Future<Item = FinalUpgrade<C>, Error = IoError>>;
#[inline] #[inline]
fn protocol_names(&self) -> Self::NamesIter { fn protocol_names(&self) -> Self::NamesIter {
vec![ vec![
(Bytes::from("/ipfs/kad/1.0.0"), 0), (Bytes::from("/ipfs/kad/1.0.0"), 0),