Add more plumbing for block processing

This commit is contained in:
Paul Hauner 2018-09-14 15:52:49 +10:00
parent 16bc6ba82a
commit 513972b75c
7 changed files with 123 additions and 31 deletions

View File

@ -1,4 +1,4 @@
use std::sync::{ Arc, RwLock }; use std::sync::Arc;
use std::thread; use std::thread;
use super::db::{ DB, open_db }; use super::db::{ DB, open_db };
use super::config::LighthouseConfig; use super::config::LighthouseConfig;
@ -13,7 +13,7 @@ use super::sync::run_sync_future;
/// Represents the co-ordination of the /// Represents the co-ordination of the
/// networking, syncing and RPC (not-yet-implemented) threads. /// networking, syncing and RPC (not-yet-implemented) threads.
pub struct Client { pub struct Client {
pub db: Arc<RwLock<DB>>, pub db: Arc<DB>,
pub network_thread: thread::JoinHandle<()>, pub network_thread: thread::JoinHandle<()>,
pub sync_thread: thread::JoinHandle<()>, pub sync_thread: thread::JoinHandle<()>,
} }
@ -30,7 +30,7 @@ impl Client {
// Open the local db // Open the local db
let db = { let db = {
let db = open_db(&config.data_dir); let db = open_db(&config.data_dir);
Arc::new(RwLock::new(db)) Arc::new(db)
}; };
// Start the network thread // Start the network thread
@ -57,7 +57,7 @@ impl Client {
let (sync_out_sender, sync_out_receiver) = unbounded(); let (sync_out_sender, sync_out_receiver) = unbounded();
let (sync_in_sender, sync_in_receiver) = unbounded(); let (sync_in_sender, sync_in_receiver) = unbounded();
let sync_log = log.new(o!()); let sync_log = log.new(o!());
let sync_db = Arc::clone(&db); let sync_db = db.clone();
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
run_sync_future( run_sync_future(
sync_db, sync_db,

23
lighthouse/sync/block.rs Normal file
View File

@ -0,0 +1,23 @@
use super::db::DB;
use slog::Logger;
pub enum BlockStatus {
Valid,
AlreadyKnown,
TooOld,
TimeInvalid,
UnknownPoWHash,
NoAttestations,
InvalidAttestation,
NotProposerSigned,
}
pub fn process_unverified_blocks(
_serialized_block: &[u8],
_db: &DB,
_log: Logger)
{
//
}

View File

@ -1,12 +0,0 @@
pub enum SyncEventType {
Invalid,
PeerConnect,
PeerDrop,
ReceiveBlocks,
ReceiveAttestationRecords,
}
pub struct SyncEvent {
event: SyncEventType,
data: Option<Vec<u8>>
}

View File

@ -3,7 +3,7 @@ extern crate slog;
extern crate tokio; extern crate tokio;
extern crate network_libp2p; extern crate network_libp2p;
pub mod messages; pub mod block;
pub mod network; pub mod network;
pub mod sync_future; pub mod sync_future;
pub mod wire_protocol; pub mod wire_protocol;

View File

@ -1,4 +1,4 @@
use std::sync::{ RwLock, Arc }; use std::sync::Arc;
use super::db::DB; use super::db::DB;
use slog::Logger; use slog::Logger;
@ -8,7 +8,12 @@ use super::network_libp2p::message::{
NetworkEventType, NetworkEventType,
}; };
use super::wire_protocol::{ WireMessageType, message_type }; use super::block::process_unverified_blocks;
use super::wire_protocol::{
WireMessage,
WireMessageHeader,
};
use super::futures::sync::mpsc::{ use super::futures::sync::mpsc::{
UnboundedSender, UnboundedSender,
@ -20,7 +25,7 @@ use super::futures::sync::mpsc::{
/// (e.g., libp2p) has an event to push up to the sync process. /// (e.g., libp2p) has an event to push up to the sync process.
pub fn handle_network_event( pub fn handle_network_event(
event: NetworkEvent, event: NetworkEvent,
db: Arc<RwLock<DB>>, db: Arc<DB>,
network_tx: UnboundedSender<OutgoingMessage>, network_tx: UnboundedSender<OutgoingMessage>,
log: Logger) log: Logger)
-> Result<(), ()> -> Result<(), ()>
@ -34,7 +39,7 @@ pub fn handle_network_event(
if let Some(data) = event.data { if let Some(data) = event.data {
handle_network_message( handle_network_message(
data, data,
db, &db,
network_tx, network_tx,
log) log)
} else { } else {
@ -51,16 +56,27 @@ pub fn handle_network_event(
/// (e.g., libp2p) has sent a message to us. /// (e.g., libp2p) has sent a message to us.
fn handle_network_message( fn handle_network_message(
message: Vec<u8>, message: Vec<u8>,
_db: Arc<RwLock<DB>>, db: &DB,
_network_tx: UnboundedSender<OutgoingMessage>, _network_tx: UnboundedSender<OutgoingMessage>,
_log: Logger) log: Logger)
-> Result<(), ()> -> Result<(), ()>
{ {
match message_type(&message) { match WireMessage::decode(&message) {
Some(WireMessageType::Blocks) => { Ok(msg) => {
// Do something with inbound blocks. match msg.header {
Ok(()) WireMessageHeader::Blocks => {
process_unverified_blocks(
msg.body,
db,
log
);
Ok(())
}
_ => Ok(())
}
}
Err(_) => {
return Ok(()) // No need to pass the error back
} }
_ => Ok(())
} }
} }

View File

@ -9,7 +9,7 @@ use super::network_libp2p::message::{
OutgoingMessage, OutgoingMessage,
}; };
use super::network::handle_network_event; use super::network::handle_network_event;
use std::sync::{ RwLock, Arc }; use std::sync::Arc;
use super::db::DB; use super::db::DB;
use slog::Logger; use slog::Logger;
@ -25,7 +25,7 @@ type SyncReceiver = UnboundedReceiver<Vec<u8>>;
/// from the network and the RPC and update /// from the network and the RPC and update
/// the state. /// the state.
pub fn run_sync_future( pub fn run_sync_future(
db: Arc<RwLock<DB>>, db: Arc<DB>,
network_tx: NetworkSender, network_tx: NetworkSender,
network_rx: NetworkReceiver, network_rx: NetworkReceiver,
_sync_tx: SyncSender, _sync_tx: SyncSender,

View File

@ -1,4 +1,9 @@
pub enum WireMessageType { pub enum WireMessageDecodeError {
TooShort,
UnknownType,
}
pub enum WireMessageHeader {
Status, Status,
NewBlockHashes, NewBlockHashes,
GetBlockHashes, GetBlockHashes,
@ -8,6 +13,48 @@ pub enum WireMessageType {
NewBlock, NewBlock,
} }
pub struct WireMessage<'a> {
pub header: WireMessageHeader,
pub body: &'a [u8],
}
impl<'a> WireMessage<'a> {
pub fn decode(bytes: &'a Vec<u8>)
-> Result<Self, WireMessageDecodeError>
{
if let Some((header_byte, body)) = bytes.split_first() {
let header = match header_byte {
0x06 => Some(WireMessageHeader::Blocks),
_ => None
};
match header {
Some(header) => Ok(Self{header, body}),
None => Err(WireMessageDecodeError::UnknownType)
}
} else {
Err(WireMessageDecodeError::TooShort)
}
}
}
/*
pub fn decode_wire_message(bytes: &[u8])
-> Result<WireMessage, WireMessageDecodeError>
{
if let Some((header_byte, body)) = bytes.split_first() {
let header = match header_byte {
0x06 => Some(WireMessageType::Blocks),
_ => None
};
match header {
Some(header) => Ok((header, body)),
None => Err(WireMessageDecodeError::UnknownType)
}
} else {
Err(WireMessageDecodeError::TooShort)
}
}
/// Determines the message type of some given /// Determines the message type of some given
/// message. /// message.
@ -22,3 +69,21 @@ pub fn message_type(message: &Vec<u8>)
_ => None _ => None
} }
} }
pub fn identify_wire_protocol_message(message: &Vec<u8>)
-> Result<(WireMessageType, &[u8]), WireMessageDecodeError>
{
fn strip_header(v: &Vec<u8>) -> &[u8] {
match v.get(1..v.len()) {
None => &vec![],
Some(s) => s
}
}
match message.get(0) {
Some(0x06) => Ok((WireMessageType::Blocks, strip_header(message))),
None => Err(WireMessageDecodeError::TooShort),
_ => Err(WireMessageDecodeError::UnknownType),
}
}
*/