Merge branch 'master' into ssz-ints

Paul Hauner 2018-09-22 12:19:35 +10:00
commit 2080368cf2
No known key found for this signature in database
GPG Key ID: 303E4494BB28068C
17 changed files with 570 additions and 119 deletions


@@ -1,6 +1,6 @@
 # Lighthouse: a (future) Ethereum 2.0 client
-[![Build Status](https://travis-ci.org/sigp/lighthouse.svg?branch=master)](https://travis-ci.org/sigp/lighthouse)
+[![Build Status](https://travis-ci.org/sigp/lighthouse.svg?branch=master)](https://travis-ci.org/sigp/lighthouse) [![Gitter](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/sigp/lighthouse?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge)
 A **work-in-progress** implementation of the Ethereum 2.0 Beacon Chain in Rust.


@@ -1,6 +1,6 @@
-use std::sync::{ Arc, RwLock };
+use std::sync::Arc;
 use std::thread;
-use super::db::{ DB, open_db };
+use super::db::{ DiskDB };
 use super::config::LighthouseConfig;
 use super::futures::sync::mpsc::{
     unbounded,
@@ -10,10 +10,12 @@ use super::network_libp2p::state::NetworkState;
 use super::slog::Logger;
 use super::sync::run_sync_future;
+use super::db::ClientDB;
 /// Represents the co-ordination of the
 /// networking, syncing and RPC (not-yet-implemented) threads.
 pub struct Client {
-    pub db: Arc<RwLock<DB>>,
+    pub db: Arc<ClientDB>,
     pub network_thread: thread::JoinHandle<()>,
     pub sync_thread: thread::JoinHandle<()>,
 }
@@ -23,14 +25,14 @@ impl Client {
     ///
     /// Presently, this means starting network and sync threads
     /// and plumbing them together.
-    pub fn new(config: LighthouseConfig,
-               log: Logger)
+    pub fn new(config: &LighthouseConfig,
+               log: &Logger)
         -> Self
     {
         // Open the local db
         let db = {
-            let db = open_db(&config.data_dir);
-            Arc::new(RwLock::new(db))
+            let db = DiskDB::open(&config.data_dir, None);
+            Arc::new(db)
         };
         // Start the network thread
@@ -57,14 +59,14 @@ impl Client {
         let (sync_out_sender, sync_out_receiver) = unbounded();
         let (sync_in_sender, sync_in_receiver) = unbounded();
         let sync_log = log.new(o!());
-        let sync_db = Arc::clone(&db);
+        let sync_db = db.clone();
         let thread = thread::spawn(move || {
             run_sync_future(
                 sync_db,
                 network_tx.clone(),
                 network_rx,
-                sync_out_sender,
-                sync_in_receiver,
+                &sync_out_sender,
+                &sync_in_receiver,
                 sync_log,
             );
         });
@@ -73,7 +75,7 @@ impl Client {
         // Return the client struct
         Self {
-            db: db,
+            db,
             network_thread,
             sync_thread,
         }
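The substantive change above is that `Client.db` becomes a shared `ClientDB` trait object rather than a concrete `DB` behind an `RwLock`, pushing synchronisation into the implementations. A minimal sketch (not part of this commit) of the resulting sharing pattern, using `MemoryDB` in place of `DiskDB`:

fn demo_shared_db() {
    use std::sync::Arc;
    use std::thread;

    // Coerce the concrete store into a shared trait object, as Client::new now does.
    let db: Arc<ClientDB> = Arc::new(MemoryDB::open(Some(&["blocks"])));
    let sync_db = db.clone();
    let handle = thread::spawn(move || {
        // ClientDB: Sync + Send, so no outer RwLock is required.
        sync_db.put("blocks", b"key", b"val").unwrap();
    });
    handle.join().unwrap();
    assert_eq!(db.get("blocks", b"key").unwrap().unwrap(), b"val".to_vec());
}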


@@ -1,4 +1,6 @@
-use std::{ env, fs };
+extern crate dirs;
+use std::fs;
 use std::path::PathBuf;
 /// Stores the core configuration for this Lighthouse instance.
@@ -16,12 +18,12 @@ impl LighthouseConfig {
    /// Build a new lighthouse configuration from defaults.
    pub fn default() -> Self {
        let data_dir = {
-            let home = env::home_dir()
+            let home = dirs::home_dir()
                 .expect("Unable to determine home dir.");
             home.join(DEFAULT_LIGHTHOUSE_DIR)
         };
         fs::create_dir_all(&data_dir)
-            .expect(&format!("Unable to create {:?}", &data_dir));
+            .unwrap_or_else(|_| panic!("Unable to create {:?}", &data_dir));
         let p2p_listen_port = 0;
         Self {
             data_dir,
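`std::env::home_dir` was deprecated in Rust 1.29, released days before this commit, which presumably motivates the switch to the `dirs` crate; the call is a drop-in replacement. A sketch of the resulting lookup:

fn default_data_dir() -> PathBuf {
    // dirs::home_dir() returns Option<PathBuf>, just as env::home_dir() did.
    dirs::home_dir()
        .expect("Unable to determine home dir.")
        .join(DEFAULT_LIGHTHOUSE_DIR)
}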

lighthouse/db/disk_db.rs (new file, 174 lines)

@@ -0,0 +1,174 @@
extern crate rocksdb;

use std::fs;
use std::path::Path;
use super::rocksdb::{
    DB,
    Options,
};
use super::rocksdb::Error as RocksError;
use super::{
    ClientDB,
    DBValue,
    DBError
};

/// An on-disk database which implements the ClientDB trait.
///
/// This implementation uses RocksDB with default options.
pub struct DiskDB {
    db: DB,
}

impl DiskDB {
    /// Open the RocksDB database, optionally supplying columns if required.
    ///
    /// The RocksDB database will be contained in a directory titled
    /// "database" in the supplied path.
    ///
    /// # Panics
    ///
    /// Panics if the database is unable to be created.
    pub fn open(path: &Path, columns: Option<&[&str]>) -> Self {
        /*
         * Initialise the options
         */
        let mut options = Options::default();
        options.create_if_missing(true);
        /*
         * Initialise the path
         */
        fs::create_dir_all(&path)
            .unwrap_or_else(|_| panic!("Unable to create {:?}", &path));
        let db_path = path.join("database");
        /*
         * Open the database
         */
        let db = match columns {
            None => DB::open(&options, db_path),
            Some(columns) => DB::open_cf(&options, db_path, columns)
        }.expect("Unable to open local database");
        Self {
            db,
        }
    }

    /// Create a RocksDB column family. Corresponds to the
    /// `create_cf()` function on the RocksDB API.
    fn create_col(&mut self, col: &str)
        -> Result<(), DBError>
    {
        match self.db.create_cf(col, &Options::default()) {
            Err(e) => Err(e.into()),
            Ok(_) => Ok(())
        }
    }
}

impl From<RocksError> for DBError {
    fn from(e: RocksError) -> Self {
        Self { message: e.to_string() }
    }
}

impl ClientDB for DiskDB {
    /// Get the value for some key on some column.
    ///
    /// Corresponds to the `get_cf()` method on the RocksDB API.
    /// Will attempt to get the `ColumnFamily` and return an Err
    /// if it fails.
    fn get(&self, col: &str, key: &[u8])
        -> Result<Option<DBValue>, DBError>
    {
        match self.db.cf_handle(col) {
            None => Err(DBError { message: "Unknown column".to_string() }),
            Some(handle) => {
                match self.db.get_cf(handle, key)? {
                    None => Ok(None),
                    Some(db_vec) => Ok(Some(DBValue::from(&*db_vec)))
                }
            }
        }
    }

    /// Set some value for some key on some column.
    ///
    /// Corresponds to the `cf_handle()` method on the RocksDB API.
    /// Will attempt to get the `ColumnFamily` and return an Err
    /// if it fails.
    fn put(&self, col: &str, key: &[u8], val: &[u8])
        -> Result<(), DBError>
    {
        match self.db.cf_handle(col) {
            None => Err(DBError { message: "Unknown column".to_string() }),
            Some(handle) => self.db.put_cf(handle, key, val).map_err(|e| e.into())
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use super::super::ClientDB;
    use std::{ env, fs, thread };
    use std::sync::Arc;

    #[test]
    #[ignore]
    fn test_rocksdb_can_use_db() {
        let pwd = env::current_dir().unwrap();
        let path = pwd.join("testdb_please_remove");
        let _ = fs::remove_dir_all(&path);
        fs::create_dir_all(&path).unwrap();

        let col_name: &str = "TestColumn";
        let column_families = vec![col_name];

        let mut db = DiskDB::open(&path, None);
        for cf in column_families {
            db.create_col(&cf).unwrap();
        }
        let db = Arc::new(db);

        let thread_count = 10;
        let write_count = 10;

        // We're expecting the product of these numbers to fit in one byte.
        assert!(thread_count * write_count <= 255);

        let mut handles = vec![];
        for t in 0..thread_count {
            let wc = write_count;
            let db = db.clone();
            let col = col_name.clone();
            let handle = thread::spawn(move || {
                for w in 0..wc {
                    let key = (t * w) as u8;
                    let val = 42;
                    db.put(&col, &vec![key], &vec![val]).unwrap();
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        for t in 0..thread_count {
            for w in 0..write_count {
                let key = (t * w) as u8;
                let val = db.get(&col_name, &vec![key]).unwrap().unwrap();
                assert_eq!(vec![42], val);
            }
        }
        fs::remove_dir_all(&path).unwrap();
    }
}
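Mirroring the test above, typical in-module use of `DiskDB` looks like the hypothetical sketch below (`create_col` has no `pub`, so columns are created from inside the db module):

fn demo_disk_db(path: &Path) {
    // Open (creating if missing) a RocksDB instance under path/database.
    let mut db = DiskDB::open(path, None);
    db.create_col("blocks").expect("column creation failed");
    db.put("blocks", b"genesis", &[42]).unwrap();
    assert_eq!(db.get("blocks", b"genesis").unwrap().unwrap(), vec![42]);
}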

lighthouse/db/memory_db.rs (new file, 177 lines)

@@ -0,0 +1,177 @@
use std::collections::{ HashSet, HashMap };
use std::sync::RwLock;
use super::blake2::blake2b::blake2b;
use super::{
    ClientDB,
    DBValue,
    DBError
};

type DBHashMap = HashMap<Vec<u8>, Vec<u8>>;
type ColumnHashSet = HashSet<String>;

/// An in-memory database implementing the ClientDB trait.
///
/// It is not particularly optimized; it exists for ease and speed of testing. It's not expected
/// this DB would be used outside of tests.
pub struct MemoryDB {
    db: RwLock<DBHashMap>,
    known_columns: RwLock<ColumnHashSet>
}

impl MemoryDB {
    /// Open the in-memory database.
    ///
    /// All columns must be supplied initially; you will get an error if you try to access a column
    /// that was not declared here. This condition is enforced artificially to simulate RocksDB.
    pub fn open(columns: Option<&[&str]>) -> Self {
        let db: DBHashMap = HashMap::new();
        let mut known_columns: ColumnHashSet = HashSet::new();
        if let Some(columns) = columns {
            for col in columns {
                known_columns.insert(col.to_string());
            }
        }
        Self {
            db: RwLock::new(db),
            known_columns: RwLock::new(known_columns),
        }
    }

    /// Hashes a key and a column name in order to get a unique key for the supplied column.
    fn get_key_for_col(col: &str, key: &[u8]) -> Vec<u8> {
        blake2b(32, col.as_bytes(), key).as_bytes().to_vec()
    }
}

impl ClientDB for MemoryDB {
    /// Get the value of some key from the database. Returns `None` if the key does not exist.
    fn get(&self, col: &str, key: &[u8])
        -> Result<Option<DBValue>, DBError>
    {
        // Panic if the DB locks are poisoned.
        let db = self.db.read().unwrap();
        let known_columns = self.known_columns.read().unwrap();

        if known_columns.contains(&col.to_string()) {
            let column_key = MemoryDB::get_key_for_col(col, key);
            Ok(db.get(&column_key).and_then(|val| Some(val.clone())))
        } else {
            Err(DBError { message: "Unknown column".to_string() })
        }
    }

    /// Puts a key in the database.
    fn put(&self, col: &str, key: &[u8], val: &[u8])
        -> Result<(), DBError>
    {
        // Panic if the DB locks are poisoned.
        let mut db = self.db.write().unwrap();
        let known_columns = self.known_columns.read().unwrap();

        if known_columns.contains(&col.to_string()) {
            let column_key = MemoryDB::get_key_for_col(col, key);
            db.insert(column_key, val.to_vec());
            Ok(())
        } else {
            Err(DBError { message: "Unknown column".to_string() })
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use super::super::ClientDB;
    use std::thread;
    use std::sync::Arc;

    #[test]
    fn test_memorydb_column_access() {
        let col_a: &str = "ColumnA";
        let col_b: &str = "ColumnB";

        let column_families = vec![
            col_a,
            col_b,
        ];
        let db = MemoryDB::open(Some(&column_families));

        /*
         * Testing that if we write to the same key in different columns that
         * there is not an overlap.
         */
        db.put(col_a, "same".as_bytes(), "cat".as_bytes()).unwrap();
        db.put(col_b, "same".as_bytes(), "dog".as_bytes()).unwrap();

        assert_eq!(db.get(col_a, "same".as_bytes()).unwrap().unwrap(), "cat".as_bytes());
        assert_eq!(db.get(col_b, "same".as_bytes()).unwrap().unwrap(), "dog".as_bytes());
    }

    #[test]
    fn test_memorydb_unknown_column_access() {
        let col_a: &str = "ColumnA";
        let col_x: &str = "ColumnX";

        let column_families = vec![
            col_a,
            // col_x is excluded on purpose
        ];
        let db = MemoryDB::open(Some(&column_families));

        /*
         * Test that we get errors when using undeclared columns
         */
        assert!(db.put(col_a, "cats".as_bytes(), "lol".as_bytes()).is_ok());
        assert!(db.put(col_x, "cats".as_bytes(), "lol".as_bytes()).is_err());

        assert!(db.get(col_a, "cats".as_bytes()).is_ok());
        assert!(db.get(col_x, "cats".as_bytes()).is_err());
    }

    #[test]
    fn test_memorydb_threading() {
        let col_name: &str = "TestColumn";
        let column_families = vec![col_name];
        let db = Arc::new(MemoryDB::open(Some(&column_families)));

        let thread_count = 10;
        let write_count = 10;

        // We're expecting the product of these numbers to fit in one byte.
        assert!(thread_count * write_count <= 255);

        let mut handles = vec![];
        for t in 0..thread_count {
            let wc = write_count;
            let db = db.clone();
            let col = col_name.clone();
            let handle = thread::spawn(move || {
                for w in 0..wc {
                    let key = (t * w) as u8;
                    let val = 42;
                    db.put(&col, &vec![key], &vec![val]).unwrap();
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        for t in 0..thread_count {
            for w in 0..write_count {
                let key = (t * w) as u8;
                let val = db.get(&col_name, &vec![key]).unwrap().unwrap();
                assert_eq!(vec![42], val);
            }
        }
    }
}
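A single backing `HashMap` can serve every column because `get_key_for_col` domain-separates entries: `blake2b` is keyed with the column name, so equal user keys map to distinct 32-byte map keys per column. A small sketch of that property (not part of this commit), assuming the same `blake2_rfc` crate:

fn demo_column_namespacing() {
    // blake2b(digest_len, key, data): the column name acts as the MAC key.
    let a = blake2b(32, b"ColumnA", b"same");
    let b = blake2b(32, b"ColumnB", b"same");
    assert_ne!(a.as_bytes(), b.as_bytes());
}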


@@ -1,14 +1,14 @@
 extern crate rocksdb;
+extern crate blake2_rfc as blake2;
-use std::fs;
-use std::path::Path;
-pub use self::rocksdb::DB;
-pub fn open_db(path: &Path) -> DB {
-    let db_path = path.join("rocksdb");
-    fs::create_dir_all(&db_path)
-        .expect(&format!("Unable to create {:?}", &db_path));
-    let db = DB::open_default(db_path.join("lighthouse.rdb"))
-        .expect("Unable to open local database.");
-    db
-}
+mod disk_db;
+mod memory_db;
+mod traits;
+pub use self::disk_db::DiskDB;
+pub use self::memory_db::MemoryDB;
+pub use self::traits::{
+    DBError,
+    DBValue,
+    ClientDB,
+};

lighthouse/db/traits.rs (new file, 27 lines)

@@ -0,0 +1,27 @@
pub type DBValue = Vec<u8>;

#[derive(Debug)]
pub struct DBError {
    pub message: String
}

impl DBError {
    pub fn new(message: String) -> Self {
        Self { message }
    }
}

/// A generic database to be used by the "client" (i.e.,
/// the lighthouse blockchain client).
///
/// The purpose of having this generic trait is to allow the
/// program to use a persistent on-disk database during production,
/// but use a transient database during tests.
pub trait ClientDB: Sync + Send {
    fn get(&self, col: &str, key: &[u8])
        -> Result<Option<DBValue>, DBError>;

    fn put(&self, col: &str, key: &[u8], val: &[u8])
        -> Result<(), DBError>;
}
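The payoff of the trait is that client code can be written once and run against either backend. A hypothetical helper, generic over `ClientDB`:

/// Store a block body under its hash, whatever the backing store.
fn store_block<T: ClientDB>(db: &T, hash: &[u8], body: &[u8]) -> Result<(), DBError> {
    db.put("blocks", hash, body)
}

Production code would call this with a `DiskDB`, tests with a `MemoryDB`.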


@@ -64,7 +64,7 @@ fn main() {
         "data_dir" => &config.data_dir.to_str(),
         "port" => &config.p2p_listen_port);
-    let client = Client::new(config, log.new(o!()));
+    let client = Client::new(&config, &log);
     client.sync_thread.join().unwrap();
     info!(log, "Exiting.");


@@ -13,16 +13,16 @@ use super::TransitionError;
 /// See this slide for more information:
 /// https://tinyurl.com/ybzn2spw
 pub fn attestation_parent_hashes(
-    cycle_length: &u8,
-    block_slot: &u64,
-    attestation_slot: &u64,
-    current_hashes: &Vec<Hash256>,
-    oblique_hashes: &Vec<Hash256>)
+    cycle_length: u8,
+    block_slot: u64,
+    attestation_slot: u64,
+    current_hashes: &[Hash256],
+    oblique_hashes: &[Hash256])
     -> Result<Vec<Hash256>, TransitionError>
 {
     // This cast places a limit on cycle_length. If you change it, check math
     // for overflow.
-    let cycle_length: u64 = *cycle_length as u64;
+    let cycle_length: u64 = u64::from(cycle_length);
     if current_hashes.len() as u64 != (cycle_length * 2) {
         return Err(TransitionError::InvalidInput(String::from(
@@ -69,7 +69,7 @@ pub fn attestation_parent_hashes(
     let mut hashes = Vec::new();
     hashes.extend_from_slice(
         &current_hashes[(start as usize)..(end as usize)]);
-    hashes.append(&mut oblique_hashes.clone());
+    hashes.extend_from_slice(oblique_hashes);
     Ok(hashes)
 }
@@ -98,9 +98,9 @@ mod tests {
         let current_hashes = get_range_of_hashes(3, 19);
         let oblique_hashes = get_range_of_hashes(100, 102);
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_ok());
@@ -123,9 +123,9 @@ mod tests {
         let current_hashes = get_range_of_hashes(3, 19);
         let oblique_hashes = get_range_of_hashes(100, 108);
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_ok());
@@ -148,9 +148,9 @@ mod tests {
         let current_hashes = get_range_of_hashes(3, 19);
         let oblique_hashes = vec![];
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_ok());
@@ -171,9 +171,9 @@ mod tests {
         let current_hashes = get_range_of_hashes(0, 16);
         let oblique_hashes = vec![];
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_ok());
@@ -194,9 +194,9 @@ mod tests {
         let current_hashes = get_range_of_hashes(0, 16);
         let oblique_hashes = vec![];
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_err());
@@ -213,9 +213,9 @@ mod tests {
         let current_hashes = get_range_of_hashes(0, 15);
         let oblique_hashes = vec![];
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_err());
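Note the call sites only change for the integer arguments: `&current_hashes` still compiles because a `&Vec<Hash256>` deref-coerces to the new `&[Hash256]` parameter type. The same coercion in miniature (illustrative only):

fn takes_slice(hashes: &[u64]) -> usize {
    hashes.len()
}

fn demo_coercion() {
    let v: Vec<u64> = vec![1, 2, 3];
    assert_eq!(takes_slice(&v), 3); // &Vec<u64> coerces to &[u64]
}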


@@ -2,7 +2,7 @@ use super::blake2_rfc::blake2s::{ Blake2s, Blake2sResult };
 const SEED_SIZE_BYTES: usize = 32;
 const RAND_BYTES: usize = 3; // 24 / 8
-const RAND_MAX: u32 = 16777216; // 2**24
+const RAND_MAX: u32 = 16_777_216; // 2**24
 /// A pseudo-random number generator which given a seed
 /// uses successive blake2s hashing to generate "entropy".
@@ -31,19 +31,16 @@ impl ShuffleRng {
     /// Extracts 3 bytes from the `seed`. Rehashes seed if required.
     fn rand(&mut self) -> u32 {
         self.idx += RAND_BYTES;
-        match self.idx >= SEED_SIZE_BYTES {
-            true => {
-                self.rehash_seed();
-                self.rand()
-            }
-            false => {
-                int_from_byte_slice(
-                    self.seed.as_bytes(),
-                    self.idx - RAND_BYTES,
-                )
-            }
-        }
+        if self.idx >= SEED_SIZE_BYTES {
+            self.rehash_seed();
+            self.rand()
+        } else {
+            int_from_byte_slice(
+                self.seed.as_bytes(),
+                self.idx - RAND_BYTES,
+            )
+        }
     }
@@ -65,9 +62,9 @@ impl ShuffleRng {
 /// Returns that integer.
 fn int_from_byte_slice(source: &[u8], offset: usize) -> u32 {
     (
-        source[offset + 2] as u32) |
-        ((source[offset + 1] as u32) << 8) |
-        ((source[offset ] as u32) << 16
+        u32::from(source[offset + 2])) |
+        (u32::from(source[offset + 1]) << 8) |
+        (u32::from(source[offset ]) << 16
     )
 }
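`int_from_byte_slice` assembles a big-endian `u32` from three bytes, so its maximum output is 2^24 - 1, one below `RAND_MAX`. A hypothetical worked example (not part of this commit):

#[test]
fn int_from_byte_slice_example() {
    // 0x010203 == 1 * 65536 + 2 * 256 + 3 == 66051
    assert_eq!(int_from_byte_slice(&[0x01, 0x02, 0x03], 0), 66_051);
    // The offset selects where the three-byte window starts.
    assert_eq!(int_from_byte_slice(&[0xff, 0x00, 0x00, 0x01], 1), 1);
}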


@@ -1,12 +0,0 @@
-pub enum SyncEventType {
-    Invalid,
-    PeerConnect,
-    PeerDrop,
-    ReceiveBlocks,
-    ReceiveAttestationRecords,
-}
-
-pub struct SyncEvent {
-    event: SyncEventType,
-    data: Option<Vec<u8>>
-}


@@ -3,7 +3,6 @@ extern crate slog;
 extern crate tokio;
 extern crate network_libp2p;
-pub mod messages;
 pub mod network;
 pub mod sync_future;
 pub mod wire_protocol;


@@ -1,5 +1,5 @@
-use std::sync::{ RwLock, Arc };
-use super::db::DB;
+use std::sync::Arc;
+use super::db::ClientDB;
 use slog::Logger;
 use super::network_libp2p::message::{
@@ -8,7 +8,10 @@ use super::network_libp2p::message::{
     NetworkEventType,
 };
-use super::wire_protocol::{ WireMessageType, message_type };
+use super::wire_protocol::{
+    WireMessage,
+    WireMessageHeader,
+};
 use super::futures::sync::mpsc::{
     UnboundedSender,
@@ -20,9 +23,9 @@ use super::futures::sync::mpsc::{
 /// (e.g., libp2p) has an event to push up to the sync process.
 pub fn handle_network_event(
     event: NetworkEvent,
-    db: Arc<RwLock<DB>>,
-    network_tx: UnboundedSender<OutgoingMessage>,
-    log: Logger)
+    db: &Arc<ClientDB>,
+    network_tx: &UnboundedSender<OutgoingMessage>,
+    log: &Logger)
     -> Result<(), ()>
 {
     debug!(&log, "";
@@ -33,10 +36,10 @@ pub fn handle_network_event(
         NetworkEventType::Message => {
             if let Some(data) = event.data {
                 handle_network_message(
-                    data,
-                    db,
-                    network_tx,
-                    log)
+                    &data,
+                    &db,
+                    &network_tx,
+                    &log)
             } else {
                 Ok(())
             }
@@ -50,17 +53,35 @@ pub fn handle_network_event(
 /// This function should be called whenever a peer from a network
 /// (e.g., libp2p) has sent a message to us.
 fn handle_network_message(
-    message: Vec<u8>,
-    _db: Arc<RwLock<DB>>,
-    _network_tx: UnboundedSender<OutgoingMessage>,
-    _log: Logger)
+    message: &[u8],
+    db: &Arc<ClientDB>,
+    _network_tx: &UnboundedSender<OutgoingMessage>,
+    log: &Logger)
     -> Result<(), ()>
 {
-    match message_type(&message) {
-        Some(WireMessageType::Blocks) => {
-            // Do something with inbound blocks.
-            Ok(())
-        }
-        _ => Ok(())
-    }
+    match WireMessage::decode(&message) {
+        Ok(msg) => {
+            match msg.header {
+                WireMessageHeader::Blocks => {
+                    process_unverified_blocks(
+                        msg.body,
+                        &db,
+                        &log
+                    );
+                    Ok(())
+                }
+                _ => Ok(())
+            }
+        }
+        Err(_) => {
+            Ok(()) // No need to pass the error back
+        }
+    }
+}
+
+fn process_unverified_blocks(_message: &[u8],
+                             _db: &Arc<ClientDB>,
+                             _log: &Logger)
+{
+    //
+}


@@ -9,8 +9,8 @@ use super::network_libp2p::message::{
     OutgoingMessage,
 };
 use super::network::handle_network_event;
-use std::sync::{ RwLock, Arc };
-use super::db::DB;
+use std::sync::Arc;
+use super::db::ClientDB;
 use slog::Logger;
 type NetworkSender = UnboundedSender<OutgoingMessage>;
@@ -25,11 +25,11 @@ type SyncReceiver = UnboundedReceiver<Vec<u8>>;
 /// from the network and the RPC and update
 /// the state.
 pub fn run_sync_future(
-    db: Arc<RwLock<DB>>,
+    db: Arc<ClientDB>,
     network_tx: NetworkSender,
     network_rx: NetworkReceiver,
-    _sync_tx: SyncSender,
-    _sync_rx: SyncReceiver,
+    _sync_tx: &SyncSender,
+    _sync_rx: &SyncReceiver,
     log: Logger)
 {
     let network_future = {
@@ -37,9 +37,9 @@ pub fn run_sync_future(
         .for_each(move |event| {
             handle_network_event(
                 event,
-                db.clone(),
-                network_tx.clone(),
-                log.clone())
+                &db.clone(),
+                &network_tx.clone(),
+                &log.clone())
         })
         .map_err(|_| panic!("rx failed"))
     };


@@ -1,4 +1,9 @@
-pub enum WireMessageType {
+pub enum WireMessageDecodeError {
+    TooShort,
+    UnknownType,
+}
+
+pub enum WireMessageHeader {
     Status,
     NewBlockHashes,
     GetBlockHashes,
@@ -8,6 +13,48 @@ pub enum WireMessageType {
     NewBlock,
 }
+
+pub struct WireMessage<'a> {
+    pub header: WireMessageHeader,
+    pub body: &'a [u8],
+}
+
+impl<'a> WireMessage<'a> {
+    pub fn decode(bytes: &'a [u8])
+        -> Result<Self, WireMessageDecodeError>
+    {
+        if let Some((header_byte, body)) = bytes.split_first() {
+            let header = match header_byte {
+                0x06 => Some(WireMessageHeader::Blocks),
+                _ => None
+            };
+            match header {
+                Some(header) => Ok(Self { header, body }),
+                None => Err(WireMessageDecodeError::UnknownType)
+            }
+        } else {
+            Err(WireMessageDecodeError::TooShort)
+        }
+    }
+}
+
+/*
+pub fn decode_wire_message(bytes: &[u8])
+    -> Result<WireMessage, WireMessageDecodeError>
+{
+    if let Some((header_byte, body)) = bytes.split_first() {
+        let header = match header_byte {
+            0x06 => Some(WireMessageType::Blocks),
+            _ => None
+        };
+        match header {
+            Some(header) => Ok((header, body)),
+            None => Err(WireMessageDecodeError::UnknownType)
+        }
+    } else {
+        Err(WireMessageDecodeError::TooShort)
+    }
+}
 /// Determines the message type of some given
 /// message.
@@ -22,3 +69,21 @@ pub fn message_type(message: &Vec<u8>)
         _ => None
     }
 }
+
+pub fn identify_wire_protocol_message(message: &Vec<u8>)
+    -> Result<(WireMessageType, &[u8]), WireMessageDecodeError>
+{
+    fn strip_header(v: &Vec<u8>) -> &[u8] {
+        match v.get(1..v.len()) {
+            None => &vec![],
+            Some(s) => s
+        }
+    }
+    match message.get(0) {
+        Some(0x06) => Ok((WireMessageType::Blocks, strip_header(message))),
+        None => Err(WireMessageDecodeError::TooShort),
+        _ => Err(WireMessageDecodeError::UnknownType),
+    }
+}
+*/
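A quick sketch (not part of the commit) of the new decoder's behaviour: the first byte selects the message type (only 0x06, Blocks, is recognised so far) and the rest of the buffer becomes the zero-copy `body`:

fn demo_decode() {
    let bytes = vec![0x06, 0xde, 0xad, 0xbe, 0xef];
    let msg = WireMessage::decode(&bytes).ok().unwrap();
    // msg.header is WireMessageHeader::Blocks; msg.body borrows &bytes[1..].
    assert_eq!(msg.body, &bytes[1..]);
    assert!(WireMessage::decode(&[]).is_err());     // TooShort
    assert!(WireMessage::decode(&[0xff]).is_err()); // UnknownType
}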


@@ -7,11 +7,10 @@ pub use slog::Logger;
 pub fn test_logger() -> slog::Logger {
     let plain = slog_term::PlainSyncDecorator::new(slog_term::TestStdoutWriter);
-    let logger = Logger::root(
+    Logger::root(
         slog_term::FullFormat::new(plain)
             .build().fuse(), o!()
-    );
-    logger
+    )
 }
 pub fn get_logger() -> slog::Logger {


@@ -8,13 +8,13 @@ bigint = "4.2"
 bytes = ""
 eth-secp256k1 = { git = "https://github.com/paritytech/rust-secp256k1" }
 futures = "0.1.23"
-libp2p-peerstore = { git = "https://github.com/tomaka/libp2p-rs", branch = "zksummit" }
-libp2p-core = { git = "https://github.com/tomaka/libp2p-rs", branch = "zksummit" }
-libp2p-mplex = { git = "https://github.com/tomaka/libp2p-rs", branch = "zksummit" }
-libp2p-tcp-transport = { git = "https://github.com/tomaka/libp2p-rs", branch = "zksummit" }
-libp2p-floodsub = { git = "https://github.com/tomaka/libp2p-rs", branch = "zksummit" }
-libp2p-identify = { git = "https://github.com/tomaka/libp2p-rs", branch = "zksummit" }
-libp2p-kad = { git = "https://github.com/tomaka/libp2p-rs", branch = "zksummit" }
+libp2p-peerstore = { git = "https://github.com/sigp/libp2p-rs", branch = "zksummit" }
+libp2p-core = { git = "https://github.com/sigp/libp2p-rs", branch = "zksummit" }
+libp2p-mplex = { git = "https://github.com/sigp/libp2p-rs", branch = "zksummit" }
+libp2p-tcp-transport = { git = "https://github.com/sigp/libp2p-rs", branch = "zksummit" }
+libp2p-floodsub = { git = "https://github.com/sigp/libp2p-rs", branch = "zksummit" }
+libp2p-identify = { git = "https://github.com/sigp/libp2p-rs", branch = "zksummit" }
+libp2p-kad = { git = "https://github.com/sigp/libp2p-rs", branch = "zksummit" }
 pem = "0.5.0"
 rand = "0.3"
 slog = "^2.2.3"