Merge pull request #14 from sigp/db_trait

[WIP] Build abstract database trait
This commit is contained in:
Paul Hauner 2018-09-18 17:50:02 +10:00 committed by GitHub
commit 6ab6c3dc40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 340 additions and 45 deletions

View File

@ -1,6 +1,6 @@
use std::sync::{ Arc, RwLock };
use std::sync::Arc;
use std::thread;
use super::db::{ DB, open_db };
use super::db::{ DiskDB };
use super::config::LighthouseConfig;
use super::futures::sync::mpsc::{
unbounded,
@ -10,10 +10,12 @@ use super::network_libp2p::state::NetworkState;
use super::slog::Logger;
use super::sync::run_sync_future;
use super::db::ClientDB;
/// Represents the co-ordination of the
/// networking, syncing and RPC (not-yet-implemented) threads.
pub struct Client {
pub db: Arc<RwLock<DB>>,
pub db: Arc<ClientDB>,
pub network_thread: thread::JoinHandle<()>,
pub sync_thread: thread::JoinHandle<()>,
}
@ -29,8 +31,8 @@ impl Client {
{
// Open the local db
let db = {
let db = open_db(&config.data_dir);
Arc::new(RwLock::new(db))
let db = DiskDB::open(&config.data_dir, None);
Arc::new(db)
};
// Start the network thread
@ -57,7 +59,7 @@ impl Client {
let (sync_out_sender, sync_out_receiver) = unbounded();
let (sync_in_sender, sync_in_receiver) = unbounded();
let sync_log = log.new(o!());
let sync_db = Arc::clone(&db);
let sync_db = db.clone();
let thread = thread::spawn(move || {
run_sync_future(
sync_db,

173
lighthouse/db/disk_db.rs Normal file
View File

@ -0,0 +1,173 @@
extern crate rocksdb;
use std::fs;
use std::path::Path;
use super::rocksdb::{
DB,
Options,
};
use super::rocksdb::Error as RocksError;
use super::{
ClientDB,
DBValue,
DBError
};
/// A on-disk database which implements the ClientDB trait.
///
/// This implementation uses RocksDB with default options.
pub struct DiskDB {
db: DB,
}
impl DiskDB {
/// Open the RocksDB database, optionally supplying columns if required.
///
/// The RocksDB database will be contained in a directory titled
/// "database" in the supplied path.
///
/// # Panics
///
/// Panics if the database is unable to be created.
pub fn open(path: &Path, columns: Option<&[&str]>) -> Self {
/*
* Initialise the options
*/
let mut options = Options::default();
options.create_if_missing(true);
/*
* Initialise the path
*/
fs::create_dir_all(&path)
.expect(&format!("Unable to create {:?}", &path));
let db_path = path.join("database");
/*
* Open the database
*/
let db = match columns {
None => DB::open(&options, db_path),
Some(columns) => DB::open_cf(&options, db_path, columns)
}.expect("Unable to open local database");;
Self {
db,
}
}
}
impl From<RocksError> for DBError {
fn from(e: RocksError) -> Self {
Self { message: e.to_string() }
}
}
impl ClientDB for DiskDB {
/// Create a RocksDB column family. Corresponds to the
/// `create_cf()` function on the RocksDB API.
fn create_col(&mut self, col: &str)
-> Result<(), DBError>
{
match self.db.create_cf(col, &Options::default()) {
Err(e) => Err(e.into()),
Ok(_) => Ok(())
}
}
/// Get the value for some key on some column.
///
/// Corresponds to the `get_cf()` method on the RocksDB API.
/// Will attempt to get the `ColumnFamily` and return an Err
/// if it fails.
fn get(&self, col: &str, key: &[u8])
-> Result<Option<DBValue>, DBError>
{
match self.db.cf_handle(col) {
None => Err(DBError{ message: "Unknown column".to_string() }),
Some(handle) => {
match self.db.get_cf(handle, key)? {
None => Ok(None),
Some(db_vec) => Ok(Some(DBValue::from(&*db_vec)))
}
}
}
}
/// Set some value for some key on some column.
///
/// Corresponds to the `cf_handle()` method on the RocksDB API.
/// Will attempt to get the `ColumnFamily` and return an Err
/// if it fails.
fn put(&self, col: &str, key: &[u8], val: &[u8])
-> Result<(), DBError>
{
match self.db.cf_handle(col) {
None => Err(DBError{ message: "Unknown column".to_string() }),
Some(handle) => self.db.put_cf(handle, key, val).map_err(|e| e.into())
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use super::super::ClientDB;
use std::{ env, fs, thread };
use std::sync::Arc;
#[test]
#[ignore]
fn test_rocksdb_can_use_db() {
let pwd = env::current_dir().unwrap();
let path = pwd.join("testdb_please_remove");
let _ = fs::remove_dir_all(&path);
fs::create_dir_all(&path).unwrap();
let col_name: &str = "TestColumn";
let column_families = vec![col_name];
let mut db = DiskDB::open(&path, None);
for cf in column_families {
db.create_col(&cf).unwrap();
}
let db = Arc::new(db);
let thread_count = 10;
let write_count = 10;
// We're execting the product of these numbers to fit in one byte.
assert!(thread_count * write_count <= 255);
let mut handles = vec![];
for t in 0..thread_count {
let wc = write_count;
let db = db.clone();
let col = col_name.clone();
let handle = thread::spawn(move || {
for w in 0..wc {
let key = (t * w) as u8;
let val = 42;
db.put(&col, &vec![key], &vec![val]).unwrap();
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
for t in 0..thread_count {
for w in 0..write_count {
let key = (t * w) as u8;
let val = db.get(&col_name, &vec![key]).unwrap().unwrap();
assert_eq!(vec![42], val);
}
}
fs::remove_dir_all(&path).unwrap();
}
}

View File

@ -1,14 +1,11 @@
extern crate rocksdb;
use std::fs;
use std::path::Path;
pub use self::rocksdb::DB;
mod disk_db;
mod traits;
pub fn open_db(path: &Path) -> DB {
let db_path = path.join("rocksdb");
fs::create_dir_all(&db_path)
.expect(&format!("Unable to create {:?}", &db_path));
let db = DB::open_default(db_path.join("lighthouse.rdb"))
.expect("Unable to open local database.");
db
}
pub use self::disk_db::DiskDB;
pub use self::traits::{
DBError,
DBValue,
ClientDB,
};

30
lighthouse/db/traits.rs Normal file
View File

@ -0,0 +1,30 @@
pub type DBValue = Vec<u8>;
#[derive(Debug)]
pub struct DBError {
pub message: String
}
impl DBError {
pub fn new(message: String) -> Self {
Self { message }
}
}
/// A generic database to be used by the "client' (i.e.,
/// the lighthouse blockchain client).
///
/// The purpose of having this generic trait is to allow the
/// program to use a persistent on-disk database during production,
/// but use a transient database during tests.
pub trait ClientDB: Sync + Send {
fn create_col(&mut self, col: &str)
-> Result<(), DBError>;
fn get(&self, col: &str, key: &[u8])
-> Result<Option<DBValue>, DBError>;
fn put(&self, col: &str, key: &[u8], val: &[u8])
-> Result<(), DBError>;
}

24
lighthouse/sync/block.rs Normal file
View File

@ -0,0 +1,24 @@
use std::sync::Arc;
use super::db::ClientDB;
use slog::Logger;
pub enum BlockStatus {
Valid,
AlreadyKnown,
TooOld,
TimeInvalid,
UnknownPoWHash,
NoAttestations,
InvalidAttestation,
NotProposerSigned,
}
pub fn process_unverified_blocks(
_serialized_block: &[u8],
_db: Arc<ClientDB>,
_log: Logger)
{
//
}

View File

@ -1,12 +0,0 @@
pub enum SyncEventType {
Invalid,
PeerConnect,
PeerDrop,
ReceiveBlocks,
ReceiveAttestationRecords,
}
pub struct SyncEvent {
event: SyncEventType,
data: Option<Vec<u8>>
}

View File

@ -3,7 +3,7 @@ extern crate slog;
extern crate tokio;
extern crate network_libp2p;
pub mod messages;
pub mod block;
pub mod network;
pub mod sync_future;
pub mod wire_protocol;

View File

@ -1,5 +1,5 @@
use std::sync::{ RwLock, Arc };
use super::db::DB;
use std::sync::Arc;
use super::db::ClientDB;
use slog::Logger;
use super::network_libp2p::message::{
@ -8,7 +8,12 @@ use super::network_libp2p::message::{
NetworkEventType,
};
use super::wire_protocol::{ WireMessageType, message_type };
use super::block::process_unverified_blocks;
use super::wire_protocol::{
WireMessage,
WireMessageHeader,
};
use super::futures::sync::mpsc::{
UnboundedSender,
@ -20,7 +25,7 @@ use super::futures::sync::mpsc::{
/// (e.g., libp2p) has an event to push up to the sync process.
pub fn handle_network_event(
event: NetworkEvent,
db: Arc<RwLock<DB>>,
db: Arc<ClientDB>,
network_tx: UnboundedSender<OutgoingMessage>,
log: Logger)
-> Result<(), ()>
@ -51,16 +56,27 @@ pub fn handle_network_event(
/// (e.g., libp2p) has sent a message to us.
fn handle_network_message(
message: Vec<u8>,
_db: Arc<RwLock<DB>>,
db: Arc<ClientDB>,
_network_tx: UnboundedSender<OutgoingMessage>,
_log: Logger)
log: Logger)
-> Result<(), ()>
{
match message_type(&message) {
Some(WireMessageType::Blocks) => {
// Do something with inbound blocks.
match WireMessage::decode(&message) {
Ok(msg) => {
match msg.header {
WireMessageHeader::Blocks => {
process_unverified_blocks(
msg.body,
db,
log
);
Ok(())
}
_ => Ok(())
}
}
Err(_) => {
return Ok(()) // No need to pass the error back
}
}
}

View File

@ -9,8 +9,8 @@ use super::network_libp2p::message::{
OutgoingMessage,
};
use super::network::handle_network_event;
use std::sync::{ RwLock, Arc };
use super::db::DB;
use std::sync::Arc;
use super::db::ClientDB;
use slog::Logger;
type NetworkSender = UnboundedSender<OutgoingMessage>;
@ -25,7 +25,7 @@ type SyncReceiver = UnboundedReceiver<Vec<u8>>;
/// from the network and the RPC and update
/// the state.
pub fn run_sync_future(
db: Arc<RwLock<DB>>,
db: Arc<ClientDB>,
network_tx: NetworkSender,
network_rx: NetworkReceiver,
_sync_tx: SyncSender,

View File

@ -1,4 +1,9 @@
pub enum WireMessageType {
pub enum WireMessageDecodeError {
TooShort,
UnknownType,
}
pub enum WireMessageHeader {
Status,
NewBlockHashes,
GetBlockHashes,
@ -8,6 +13,48 @@ pub enum WireMessageType {
NewBlock,
}
pub struct WireMessage<'a> {
pub header: WireMessageHeader,
pub body: &'a [u8],
}
impl<'a> WireMessage<'a> {
pub fn decode(bytes: &'a Vec<u8>)
-> Result<Self, WireMessageDecodeError>
{
if let Some((header_byte, body)) = bytes.split_first() {
let header = match header_byte {
0x06 => Some(WireMessageHeader::Blocks),
_ => None
};
match header {
Some(header) => Ok(Self{header, body}),
None => Err(WireMessageDecodeError::UnknownType)
}
} else {
Err(WireMessageDecodeError::TooShort)
}
}
}
/*
pub fn decode_wire_message(bytes: &[u8])
-> Result<WireMessage, WireMessageDecodeError>
{
if let Some((header_byte, body)) = bytes.split_first() {
let header = match header_byte {
0x06 => Some(WireMessageType::Blocks),
_ => None
};
match header {
Some(header) => Ok((header, body)),
None => Err(WireMessageDecodeError::UnknownType)
}
} else {
Err(WireMessageDecodeError::TooShort)
}
}
/// Determines the message type of some given
/// message.
@ -22,3 +69,21 @@ pub fn message_type(message: &Vec<u8>)
_ => None
}
}
pub fn identify_wire_protocol_message(message: &Vec<u8>)
-> Result<(WireMessageType, &[u8]), WireMessageDecodeError>
{
fn strip_header(v: &Vec<u8>) -> &[u8] {
match v.get(1..v.len()) {
None => &vec![],
Some(s) => s
}
}
match message.get(0) {
Some(0x06) => Ok((WireMessageType::Blocks, strip_header(message))),
None => Err(WireMessageDecodeError::TooShort),
_ => Err(WireMessageDecodeError::UnknownType),
}
}
*/