Expand sync works

This commit is contained in:
Paul Hauner 2018-09-09 18:28:36 +02:00
parent c33d3689a7
commit 3399b02393
4 changed files with 63 additions and 20 deletions

View File

@ -6,6 +6,7 @@ extern crate network_libp2p;
pub mod messages; pub mod messages;
pub mod network; pub mod network;
pub mod sync_future; pub mod sync_future;
pub mod wire_protocol;
pub use self::sync_future::run_sync_future; pub use self::sync_future::run_sync_future;

View File

@ -8,10 +8,16 @@ use super::network_libp2p::message::{
NetworkEventType, NetworkEventType,
}; };
use super::wire_protocol::{ WireMessageType, message_type };
use super::futures::sync::mpsc::{ use super::futures::sync::mpsc::{
UnboundedSender, UnboundedSender,
}; };
/// Accept a network event and perform all required processing.
///
/// This function should be called whenever an underlying network
/// (e.g., libp2p) has an event to push up to the sync process.
pub fn handle_network_event( pub fn handle_network_event(
event: NetworkEvent, event: NetworkEvent,
db: Arc<RwLock<DB>>, db: Arc<RwLock<DB>>,
@ -19,26 +25,42 @@ pub fn handle_network_event(
log: Logger) log: Logger)
-> Result<(), ()> -> Result<(), ()>
{ {
debug!(&log, "";
"network_event" => format!("{:?}", &event));
match event.event { match event.event {
NetworkEventType::PeerConnect => Ok(()), NetworkEventType::PeerConnect => Ok(()),
NetworkEventType::PeerDrop => Ok(()), NetworkEventType::PeerDrop => Ok(()),
NetworkEventType::Message => handle_network_message( NetworkEventType::Message => {
event.data, if let Some(data) = event.data {
handle_network_message(
data,
db, db,
network_tx, network_tx,
log log)
) } else {
Ok(())
}
}
} }
} }
/// Accept a message from the network and perform all required
/// processing.
///
/// This function should be called whenever a peer from a network
/// (e.g., libp2p) has sent a message to us.
fn handle_network_message( fn handle_network_message(
message: Option<Vec<u8>>, message: Vec<u8>,
_db: Arc<RwLock<DB>>, _db: Arc<RwLock<DB>>,
_network_tx: UnboundedSender<OutgoingMessage>, _network_tx: UnboundedSender<OutgoingMessage>,
log: Logger) _log: Logger)
-> Result<(), ()> -> Result<(), ()>
{ {
debug!(&log, ""; match message_type(&message) {
"network_msg" => format!("{:?}", message)); Some(WireMessageType::Blocks) => {
// Do something with inbound blocks.
Ok(()) Ok(())
} }
_ => Ok(())
}
}

View File

@ -21,17 +21,17 @@ type SyncReceiver = UnboundedReceiver<Vec<u8>>;
/// Start a syncing tokio future. /// Start a syncing tokio future.
/// ///
/// This is effectively a stub function being /// Uses green-threading to process messages
/// used to test network functionality. /// from the network and the RPC and update
/// /// the state.
/// Expect a full re-write.
pub fn run_sync_future( pub fn run_sync_future(
db: Arc<RwLock<DB>>, db: Arc<RwLock<DB>>,
network_tx: NetworkSender, network_tx: NetworkSender,
network_rx: NetworkReceiver, network_rx: NetworkReceiver,
_sync_tx: SyncSender, _sync_tx: SyncSender,
_sync_rx: SyncReceiver, _sync_rx: SyncReceiver,
log: Logger) { log: Logger)
{
let network_future = { let network_future = {
network_rx network_rx
.for_each(move |event| { .for_each(move |event| {
@ -44,9 +44,5 @@ pub fn run_sync_future(
.map_err(|_| panic!("rx failed")) .map_err(|_| panic!("rx failed"))
}; };
/*
* This is an unfinished stub function.
*/
tokio::run(network_future); tokio::run(network_future);
} }

View File

@ -0,0 +1,24 @@
pub enum WireMessageType {
Status,
NewBlockHashes,
GetBlockHashes,
BlockHashes,
GetBlocks,
Blocks,
NewBlock,
}
/// Determines the message type of some given
/// message.
///
/// Does not check the validity of the message data,
/// it just reads the first byte.
pub fn message_type(message: &Vec<u8>)
-> Option<WireMessageType>
{
match message.get(0) {
Some(0x06) => Some(WireMessageType::Blocks),
_ => None
}
}