Trim db2 down to basic new parts

This commit is contained in:
Paul Hauner 2019-05-01 11:42:18 +10:00
parent 05df7702d3
commit 85266f8db0
No known key found for this signature in database
GPG Key ID: D362883A9218FCC6
12 changed files with 113 additions and 1045 deletions

View File

@ -10,6 +10,7 @@ bls = { path = "../../eth2/utils/bls" }
bytes = "0.4.10"
db_encode = { path = "../db_encode" }
db_encode_derive = { path = "../db_encode_derive" }
parking_lot = "0.7"
rocksdb = "0.10.1"
ssz = { path = "../../eth2/utils/ssz" }
ssz_derive = { path = "../../eth2/utils/ssz_derive" }

View File

@ -1,9 +1,9 @@
extern crate rocksdb;
use super::rocksdb::Error as RocksError;
use super::rocksdb::{Options, DB};
use super::stores::COLUMNS;
// use super::stores::COLUMNS;
use super::{ClientDB, DBError, DBValue};
use rocksdb::Error as RocksError;
use rocksdb::{Options, DB};
use std::fs;
use std::path::Path;

View File

@ -0,0 +1,30 @@
use ssz::DecodeError;
#[derive(Debug, PartialEq)]
pub enum Error {
SszDecodeError(DecodeError),
DBError { message: String },
}
impl From<DecodeError> for Error {
fn from(e: DecodeError) -> Error {
Error::SszDecodeError(e)
}
}
impl From<DBError> for Error {
fn from(e: DBError) -> Error {
Error::DBError { message: e.message }
}
}
#[derive(Debug)]
pub struct DBError {
pub message: String,
}
impl DBError {
pub fn new(message: String) -> Self {
Self { message }
}
}

View File

@ -1,38 +1,42 @@
extern crate blake2_rfc as blake2;
extern crate bls;
extern crate rocksdb;
mod disk_db;
// mod disk_db;
mod errors;
mod memory_db;
pub mod stores;
mod traits;
use self::stores::COLUMNS;
use db_encode::{db_encode, DBDecode, DBEncode};
use ssz::DecodeError;
use std::sync::Arc;
pub use self::disk_db::DiskDB;
pub use self::memory_db::MemoryDB;
pub use self::traits::{ClientDB, DBError, DBValue};
pub use errors::Error;
pub use types::*;
pub type DBValue = Vec<u8>;
#[derive(Debug, PartialEq)]
pub enum Error {
SszDecodeError(DecodeError),
DBError { message: String },
pub trait StoreDB: Sync + Send + Sized {
fn put(&self, key: &Hash256, item: &impl DBRecord) -> Result<(), Error> {
item.db_put(self, key)
}
fn get<I: DBRecord>(&self, key: &Hash256) -> Result<Option<I>, Error> {
I::db_get(self, key)
}
fn exists<I: DBRecord>(&self, key: &Hash256) -> Result<bool, Error> {
I::db_exists(self, key)
}
fn delete<I: DBRecord>(&self, key: &Hash256) -> Result<(), Error> {
I::db_delete(self, key)
}
fn get_bytes(&self, col: &str, key: &[u8]) -> Result<Option<DBValue>, Error>;
fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error>;
fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error>;
fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error>;
}
impl From<DecodeError> for Error {
fn from(e: DecodeError) -> Error {
Error::SszDecodeError(e)
}
}
impl From<DBError> for Error {
fn from(e: DBError) -> Error {
Error::DBError { message: e.message }
}
pub trait DBStore {
fn db_column(&self) -> DBColumn;
}
/// Currently available database options
@ -61,76 +65,41 @@ impl<'a> Into<&'a str> for DBColumn {
pub trait DBRecord: DBEncode + DBDecode {
fn db_column() -> DBColumn;
}
pub struct Store<T>
where
T: ClientDB,
{
db: Arc<T>,
}
impl Store<MemoryDB> {
fn new_in_memory() -> Self {
Self {
db: Arc::new(MemoryDB::open()),
}
}
}
impl<T> Store<T>
where
T: ClientDB,
{
/// Put `item` in the store as `key`.
fn put<I>(&self, key: &Hash256, item: &I) -> Result<(), Error>
where
I: DBRecord,
{
let column = I::db_column().into();
let key = key.as_bytes();
let val = db_encode(item);
self.db.put(column, key, &val).map_err(|e| e.into())
}
/// Retrieves an `Ok(Some(item))` from the store if `key` exists, otherwise returns `Ok(None)`.
fn get<I>(&self, key: &Hash256) -> Result<Option<I>, Error>
where
I: DBRecord,
{
let column = I::db_column().into();
fn db_put(&self, store: &impl StoreDB, key: &Hash256) -> Result<(), Error> {
let column = Self::db_column().into();
let key = key.as_bytes();
match self.db.get(column, key)? {
store
.put_bytes(column, key, &db_encode(self))
.map_err(|e| e.into())
}
fn db_get(store: &impl StoreDB, key: &Hash256) -> Result<Option<Self>, Error> {
let column = Self::db_column().into();
let key = key.as_bytes();
match store.get_bytes(column, key)? {
Some(bytes) => {
let (item, _index) = I::db_decode(&bytes, 0)?;
let (item, _index) = Self::db_decode(&bytes, 0)?;
Ok(Some(item))
}
None => Ok(None),
}
}
/// Returns `Ok(true)` `key` exists in the store.
fn exists<I>(&self, key: &Hash256) -> Result<bool, Error>
where
I: DBRecord,
{
let column = I::db_column().into();
fn db_exists(store: &impl StoreDB, key: &Hash256) -> Result<bool, Error> {
let column = Self::db_column().into();
let key = key.as_bytes();
self.db.exists(column, key).map_err(|e| e.into())
store.key_exists(column, key)
}
/// Returns `Ok(())` if `key` was deleted from the database or did not exist.
fn delete<I>(&self, key: &Hash256) -> Result<(), Error>
where
I: DBRecord,
{
let column = I::db_column().into();
fn db_delete(store: &impl StoreDB, key: &Hash256) -> Result<(), Error> {
let column = Self::db_column().into();
let key = key.as_bytes();
self.db.delete(column, key).map_err(|e| e.into())
store.key_delete(column, key)
}
}
@ -155,7 +124,7 @@ mod tests {
#[test]
fn memorydb_can_store_and_retrieve() {
let store = Store::new_in_memory();
let store = MemoryDB::open();
let key = Hash256::random();
let item = StorableThing { a: 1, b: 42 };
@ -169,7 +138,7 @@ mod tests {
#[test]
fn exists() {
let store = Store::new_in_memory();
let store = MemoryDB::open();
let key = Hash256::random();
let item = StorableThing { a: 1, b: 42 };

View File

@ -1,236 +1,61 @@
use super::blake2::blake2b::blake2b;
use super::COLUMNS;
use super::{ClientDB, DBError, DBValue};
use std::collections::{HashMap, HashSet};
use std::sync::RwLock;
use super::{DBValue, Error, StoreDB};
use parking_lot::RwLock;
use std::collections::HashMap;
type DBHashMap = HashMap<Vec<u8>, Vec<u8>>;
type ColumnHashSet = HashSet<String>;
/// An in-memory database implementing the ClientDB trait.
///
/// It is not particularily optimized, it exists for ease and speed of testing. It's not expected
/// this DB would be used outside of tests.
pub struct MemoryDB {
db: RwLock<DBHashMap>,
known_columns: RwLock<ColumnHashSet>,
}
impl MemoryDB {
/// Open the in-memory database.
///
/// All columns must be supplied initially, you will get an error if you try to access a column
/// that was not declared here. This condition is enforced artificially to simulate RocksDB.
pub fn open() -> Self {
let db: DBHashMap = HashMap::new();
let mut known_columns: ColumnHashSet = HashSet::new();
for col in &COLUMNS {
known_columns.insert(col.to_string());
}
Self {
db: RwLock::new(db),
known_columns: RwLock::new(known_columns),
db: RwLock::new(HashMap::new()),
}
}
/// Hashes a key and a column name in order to get a unique key for the supplied column.
fn get_key_for_col(col: &str, key: &[u8]) -> Vec<u8> {
blake2b(32, col.as_bytes(), key).as_bytes().to_vec()
let mut col = col.as_bytes().to_vec();
col.append(&mut key.to_vec());
col
}
}
impl ClientDB for MemoryDB {
impl StoreDB for MemoryDB {
/// Get the value of some key from the database. Returns `None` if the key does not exist.
fn get(&self, col: &str, key: &[u8]) -> Result<Option<DBValue>, DBError> {
// Panic if the DB locks are poisoned.
let db = self.db.read().unwrap();
let known_columns = self.known_columns.read().unwrap();
fn get_bytes(&self, col: &str, key: &[u8]) -> Result<Option<DBValue>, Error> {
let column_key = MemoryDB::get_key_for_col(col, key);
if known_columns.contains(&col.to_string()) {
let column_key = MemoryDB::get_key_for_col(col, key);
Ok(db.get(&column_key).and_then(|val| Some(val.clone())))
} else {
Err(DBError {
message: "Unknown column".to_string(),
})
}
Ok(self
.db
.read()
.get(&column_key)
.and_then(|val| Some(val.clone())))
}
/// Puts a key in the database.
fn put(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), DBError> {
// Panic if the DB locks are poisoned.
let mut db = self.db.write().unwrap();
let known_columns = self.known_columns.read().unwrap();
fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
let column_key = MemoryDB::get_key_for_col(col, key);
if known_columns.contains(&col.to_string()) {
let column_key = MemoryDB::get_key_for_col(col, key);
db.insert(column_key, val.to_vec());
Ok(())
} else {
Err(DBError {
message: "Unknown column".to_string(),
})
}
self.db.write().insert(column_key, val.to_vec());
Ok(())
}
/// Return true if some key exists in some column.
fn exists(&self, col: &str, key: &[u8]) -> Result<bool, DBError> {
// Panic if the DB locks are poisoned.
let db = self.db.read().unwrap();
let known_columns = self.known_columns.read().unwrap();
fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error> {
let column_key = MemoryDB::get_key_for_col(col, key);
if known_columns.contains(&col.to_string()) {
let column_key = MemoryDB::get_key_for_col(col, key);
Ok(db.contains_key(&column_key))
} else {
Err(DBError {
message: "Unknown column".to_string(),
})
}
Ok(self.db.read().contains_key(&column_key))
}
/// Delete some key from the database.
fn delete(&self, col: &str, key: &[u8]) -> Result<(), DBError> {
// Panic if the DB locks are poisoned.
let mut db = self.db.write().unwrap();
let known_columns = self.known_columns.read().unwrap();
fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> {
let column_key = MemoryDB::get_key_for_col(col, key);
if known_columns.contains(&col.to_string()) {
let column_key = MemoryDB::get_key_for_col(col, key);
db.remove(&column_key);
Ok(())
} else {
Err(DBError {
message: "Unknown column".to_string(),
})
}
}
}
#[cfg(test)]
mod tests {
use super::super::stores::{BLOCKS_DB_COLUMN, VALIDATOR_DB_COLUMN};
use super::super::ClientDB;
use super::*;
use std::sync::Arc;
use std::thread;
#[test]
fn test_memorydb_can_delete() {
let col_a: &str = BLOCKS_DB_COLUMN;
let db = MemoryDB::open();
db.put(col_a, "dogs".as_bytes(), "lol".as_bytes()).unwrap();
assert_eq!(
db.get(col_a, "dogs".as_bytes()).unwrap().unwrap(),
"lol".as_bytes()
);
db.delete(col_a, "dogs".as_bytes()).unwrap();
assert_eq!(db.get(col_a, "dogs".as_bytes()).unwrap(), None);
}
#[test]
fn test_memorydb_column_access() {
let col_a: &str = BLOCKS_DB_COLUMN;
let col_b: &str = VALIDATOR_DB_COLUMN;
let db = MemoryDB::open();
/*
* Testing that if we write to the same key in different columns that
* there is not an overlap.
*/
db.put(col_a, "same".as_bytes(), "cat".as_bytes()).unwrap();
db.put(col_b, "same".as_bytes(), "dog".as_bytes()).unwrap();
assert_eq!(
db.get(col_a, "same".as_bytes()).unwrap().unwrap(),
"cat".as_bytes()
);
assert_eq!(
db.get(col_b, "same".as_bytes()).unwrap().unwrap(),
"dog".as_bytes()
);
}
#[test]
fn test_memorydb_unknown_column_access() {
let col_a: &str = BLOCKS_DB_COLUMN;
let col_x: &str = "ColumnX";
let db = MemoryDB::open();
/*
* Test that we get errors when using undeclared columns
*/
assert!(db.put(col_a, "cats".as_bytes(), "lol".as_bytes()).is_ok());
assert!(db.put(col_x, "cats".as_bytes(), "lol".as_bytes()).is_err());
assert!(db.get(col_a, "cats".as_bytes()).is_ok());
assert!(db.get(col_x, "cats".as_bytes()).is_err());
}
#[test]
fn test_memorydb_exists() {
let col_a: &str = BLOCKS_DB_COLUMN;
let col_b: &str = VALIDATOR_DB_COLUMN;
let db = MemoryDB::open();
/*
* Testing that if we write to the same key in different columns that
* there is not an overlap.
*/
db.put(col_a, "cats".as_bytes(), "lol".as_bytes()).unwrap();
assert_eq!(true, db.exists(col_a, "cats".as_bytes()).unwrap());
assert_eq!(false, db.exists(col_b, "cats".as_bytes()).unwrap());
assert_eq!(false, db.exists(col_a, "dogs".as_bytes()).unwrap());
assert_eq!(false, db.exists(col_b, "dogs".as_bytes()).unwrap());
}
#[test]
fn test_memorydb_threading() {
let col_name: &str = BLOCKS_DB_COLUMN;
let db = Arc::new(MemoryDB::open());
let thread_count = 10;
let write_count = 10;
// We're execting the product of these numbers to fit in one byte.
assert!(thread_count * write_count <= 255);
let mut handles = vec![];
for t in 0..thread_count {
let wc = write_count;
let db = db.clone();
let col = col_name.clone();
let handle = thread::spawn(move || {
for w in 0..wc {
let key = (t * w) as u8;
let val = 42;
db.put(&col, &vec![key], &vec![val]).unwrap();
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
for t in 0..thread_count {
for w in 0..write_count {
let key = (t * w) as u8;
let val = db.get(&col_name, &vec![key]).unwrap().unwrap();
assert_eq!(vec![42], val);
}
}
self.db.write().remove(&column_key);
Ok(())
}
}

View File

@ -1,246 +0,0 @@
use super::BLOCKS_DB_COLUMN as DB_COLUMN;
use super::{ClientDB, DBError};
use ssz::Decodable;
use std::sync::Arc;
use types::{BeaconBlock, Hash256, Slot};
#[derive(Clone, Debug, PartialEq)]
pub enum BeaconBlockAtSlotError {
UnknownBeaconBlock(Hash256),
InvalidBeaconBlock(Hash256),
DBError(String),
}
pub struct BeaconBlockStore<T>
where
T: ClientDB,
{
db: Arc<T>,
}
// Implements `put`, `get`, `exists` and `delete` for the store.
impl_crud_for_store!(BeaconBlockStore, DB_COLUMN);
impl<T: ClientDB> BeaconBlockStore<T> {
pub fn new(db: Arc<T>) -> Self {
Self { db }
}
pub fn get_deserialized(&self, hash: &Hash256) -> Result<Option<BeaconBlock>, DBError> {
match self.get(&hash)? {
None => Ok(None),
Some(ssz) => {
let (block, _) = BeaconBlock::ssz_decode(&ssz, 0).map_err(|_| DBError {
message: "Bad BeaconBlock SSZ.".to_string(),
})?;
Ok(Some(block))
}
}
}
/// Retrieve the block at a slot given a "head_hash" and a slot.
///
/// A "head_hash" must be a block hash with a slot number greater than or equal to the desired
/// slot.
///
/// This function will read each block down the chain until it finds a block with the given
/// slot number. If the slot is skipped, the function will return None.
///
/// If a block is found, a tuple of (block_hash, serialized_block) is returned.
///
/// Note: this function uses a loop instead of recursion as the compiler is over-strict when it
/// comes to recursion and the `impl Trait` pattern. See:
/// https://stackoverflow.com/questions/54032940/using-impl-trait-in-a-recursive-function
pub fn block_at_slot(
&self,
head_hash: &Hash256,
slot: Slot,
) -> Result<Option<(Hash256, BeaconBlock)>, BeaconBlockAtSlotError> {
let mut current_hash = *head_hash;
loop {
if let Some(block) = self.get_deserialized(&current_hash)? {
if block.slot == slot {
break Ok(Some((current_hash, block)));
} else if block.slot < slot {
break Ok(None);
} else {
current_hash = block.previous_block_root;
}
} else {
break Err(BeaconBlockAtSlotError::UnknownBeaconBlock(current_hash));
}
}
}
}
impl From<DBError> for BeaconBlockAtSlotError {
fn from(e: DBError) -> Self {
BeaconBlockAtSlotError::DBError(e.message)
}
}
#[cfg(test)]
mod tests {
use super::super::super::MemoryDB;
use super::*;
use std::sync::Arc;
use std::thread;
use ssz::ssz_encode;
use types::test_utils::{SeedableRng, TestRandom, XorShiftRng};
use types::BeaconBlock;
use types::Hash256;
test_crud_for_store!(BeaconBlockStore, DB_COLUMN);
#[test]
fn head_hash_slot_too_low() {
let db = Arc::new(MemoryDB::open());
let bs = Arc::new(BeaconBlockStore::new(db.clone()));
let mut rng = XorShiftRng::from_seed([42; 16]);
let mut block = BeaconBlock::random_for_test(&mut rng);
block.slot = Slot::from(10_u64);
let block_root = block.canonical_root();
bs.put(&block_root, &ssz_encode(&block)).unwrap();
let result = bs.block_at_slot(&block_root, Slot::from(11_u64)).unwrap();
assert_eq!(result, None);
}
#[test]
fn test_invalid_block_at_slot() {
let db = Arc::new(MemoryDB::open());
let store = BeaconBlockStore::new(db.clone());
let ssz = "definitly not a valid block".as_bytes();
let hash = &Hash256::from([0xAA; 32]);
db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap();
assert_eq!(
store.block_at_slot(hash, Slot::from(42_u64)),
Err(BeaconBlockAtSlotError::DBError(
"Bad BeaconBlock SSZ.".into()
))
);
}
#[test]
fn test_unknown_block_at_slot() {
let db = Arc::new(MemoryDB::open());
let store = BeaconBlockStore::new(db.clone());
let ssz = "some bytes".as_bytes();
let hash = &Hash256::from([0xAA; 32]);
let other_hash = &Hash256::from([0xBB; 32]);
db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap();
assert_eq!(
store.block_at_slot(other_hash, Slot::from(42_u64)),
Err(BeaconBlockAtSlotError::UnknownBeaconBlock(*other_hash))
);
}
#[test]
fn test_block_store_on_memory_db() {
let db = Arc::new(MemoryDB::open());
let bs = Arc::new(BeaconBlockStore::new(db.clone()));
let thread_count = 10;
let write_count = 10;
let mut handles = vec![];
for t in 0..thread_count {
let wc = write_count;
let bs = bs.clone();
let handle = thread::spawn(move || {
for w in 0..wc {
let key = t * w;
let val = 42;
bs.put(&Hash256::from_low_u64_le(key), &vec![val]).unwrap();
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
for t in 0..thread_count {
for w in 0..write_count {
let key = t * w;
assert!(bs.exists(&Hash256::from_low_u64_le(key)).unwrap());
let val = bs.get(&Hash256::from_low_u64_le(key)).unwrap().unwrap();
assert_eq!(vec![42], val);
}
}
}
#[test]
#[ignore]
fn test_block_at_slot() {
let db = Arc::new(MemoryDB::open());
let bs = Arc::new(BeaconBlockStore::new(db.clone()));
let mut rng = XorShiftRng::from_seed([42; 16]);
// Specify test block parameters.
let hashes = [
Hash256::from([0; 32]),
Hash256::from([1; 32]),
Hash256::from([2; 32]),
Hash256::from([3; 32]),
Hash256::from([4; 32]),
];
let parent_hashes = [
Hash256::from([255; 32]), // Genesis block.
Hash256::from([0; 32]),
Hash256::from([1; 32]),
Hash256::from([2; 32]),
Hash256::from([3; 32]),
];
let unknown_hash = Hash256::from([101; 32]); // different from all above
let slots: Vec<Slot> = vec![0, 1, 3, 4, 5].iter().map(|x| Slot::new(*x)).collect();
// Generate a vec of random blocks and store them in the DB.
let block_count = 5;
let mut blocks: Vec<BeaconBlock> = Vec::with_capacity(5);
for i in 0..block_count {
let mut block = BeaconBlock::random_for_test(&mut rng);
block.previous_block_root = parent_hashes[i];
block.slot = slots[i];
let ssz = ssz_encode(&block);
db.put(DB_COLUMN, hashes[i].as_bytes(), &ssz).unwrap();
blocks.push(block);
}
// Test that certain slots can be reached from certain hashes.
let test_cases = vec![(4, 4), (4, 3), (4, 2), (4, 1), (4, 0)];
for (hashes_index, slot_index) in test_cases {
let (matched_block_hash, block) = bs
.block_at_slot(&hashes[hashes_index], slots[slot_index])
.unwrap()
.unwrap();
assert_eq!(matched_block_hash, hashes[slot_index]);
assert_eq!(block.slot, slots[slot_index]);
}
let ssz = bs.block_at_slot(&hashes[4], Slot::new(2)).unwrap();
assert_eq!(ssz, None);
let ssz = bs.block_at_slot(&hashes[4], Slot::new(6)).unwrap();
assert_eq!(ssz, None);
let ssz = bs.block_at_slot(&unknown_hash, Slot::new(2));
assert_eq!(
ssz,
Err(BeaconBlockAtSlotError::UnknownBeaconBlock(unknown_hash))
);
}
}

View File

@ -1,62 +0,0 @@
use super::STATES_DB_COLUMN as DB_COLUMN;
use super::{ClientDB, DBError};
use ssz::Decodable;
use std::sync::Arc;
use types::{BeaconState, Hash256};
pub struct BeaconStateStore<T>
where
T: ClientDB,
{
db: Arc<T>,
}
// Implements `put`, `get`, `exists` and `delete` for the store.
impl_crud_for_store!(BeaconStateStore, DB_COLUMN);
impl<T: ClientDB> BeaconStateStore<T> {
pub fn new(db: Arc<T>) -> Self {
Self { db }
}
pub fn get_deserialized(&self, hash: &Hash256) -> Result<Option<BeaconState>, DBError> {
match self.get(&hash)? {
None => Ok(None),
Some(ssz) => {
let (state, _) = BeaconState::ssz_decode(&ssz, 0).map_err(|_| DBError {
message: "Bad State SSZ.".to_string(),
})?;
Ok(Some(state))
}
}
}
}
#[cfg(test)]
mod tests {
use super::super::super::MemoryDB;
use super::*;
use ssz::ssz_encode;
use std::sync::Arc;
use types::test_utils::{SeedableRng, TestRandom, XorShiftRng};
use types::Hash256;
test_crud_for_store!(BeaconStateStore, DB_COLUMN);
#[test]
fn test_reader() {
let db = Arc::new(MemoryDB::open());
let store = BeaconStateStore::new(db.clone());
let mut rng = XorShiftRng::from_seed([42; 16]);
let state = BeaconState::random_for_test(&mut rng);
let state_root = state.canonical_root();
store.put(&state_root, &ssz_encode(&state)).unwrap();
let decoded = store.get_deserialized(&state_root).unwrap().unwrap();
assert_eq!(state, decoded);
}
}

View File

@ -1,103 +0,0 @@
macro_rules! impl_crud_for_store {
($store: ident, $db_column: expr) => {
impl<T: ClientDB> $store<T> {
pub fn put(&self, hash: &Hash256, ssz: &[u8]) -> Result<(), DBError> {
self.db.put($db_column, hash.as_bytes(), ssz)
}
pub fn get(&self, hash: &Hash256) -> Result<Option<Vec<u8>>, DBError> {
self.db.get($db_column, hash.as_bytes())
}
pub fn exists(&self, hash: &Hash256) -> Result<bool, DBError> {
self.db.exists($db_column, hash.as_bytes())
}
pub fn delete(&self, hash: &Hash256) -> Result<(), DBError> {
self.db.delete($db_column, hash.as_bytes())
}
}
};
}
#[cfg(test)]
macro_rules! test_crud_for_store {
($store: ident, $db_column: expr) => {
#[test]
fn test_put() {
let db = Arc::new(MemoryDB::open());
let store = $store::new(db.clone());
let ssz = "some bytes".as_bytes();
let hash = &Hash256::from([0xAA; 32]);
store.put(hash, ssz).unwrap();
assert_eq!(db.get(DB_COLUMN, hash.as_bytes()).unwrap().unwrap(), ssz);
}
#[test]
fn test_get() {
let db = Arc::new(MemoryDB::open());
let store = $store::new(db.clone());
let ssz = "some bytes".as_bytes();
let hash = &Hash256::from([0xAA; 32]);
db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap();
assert_eq!(store.get(hash).unwrap().unwrap(), ssz);
}
#[test]
fn test_get_unknown() {
let db = Arc::new(MemoryDB::open());
let store = $store::new(db.clone());
let ssz = "some bytes".as_bytes();
let hash = &Hash256::from([0xAA; 32]);
let other_hash = &Hash256::from([0xBB; 32]);
db.put(DB_COLUMN, other_hash.as_bytes(), ssz).unwrap();
assert_eq!(store.get(hash).unwrap(), None);
}
#[test]
fn test_exists() {
let db = Arc::new(MemoryDB::open());
let store = $store::new(db.clone());
let ssz = "some bytes".as_bytes();
let hash = &Hash256::from([0xAA; 32]);
db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap();
assert!(store.exists(hash).unwrap());
}
#[test]
fn test_block_does_not_exist() {
let db = Arc::new(MemoryDB::open());
let store = $store::new(db.clone());
let ssz = "some bytes".as_bytes();
let hash = &Hash256::from([0xAA; 32]);
let other_hash = &Hash256::from([0xBB; 32]);
db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap();
assert!(!store.exists(other_hash).unwrap());
}
#[test]
fn test_delete() {
let db = Arc::new(MemoryDB::open());
let store = $store::new(db.clone());
let ssz = "some bytes".as_bytes();
let hash = &Hash256::from([0xAA; 32]);
db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap();
assert!(db.exists(DB_COLUMN, hash.as_bytes()).unwrap());
store.delete(hash).unwrap();
assert!(!db.exists(DB_COLUMN, hash.as_bytes()).unwrap());
}
};
}

View File

@ -1,25 +0,0 @@
use super::{ClientDB, DBError};
#[macro_use]
mod macros;
mod beacon_block_store;
mod beacon_state_store;
mod pow_chain_store;
mod validator_store;
pub use self::beacon_block_store::{BeaconBlockAtSlotError, BeaconBlockStore};
pub use self::beacon_state_store::BeaconStateStore;
pub use self::pow_chain_store::PoWChainStore;
pub use self::validator_store::{ValidatorStore, ValidatorStoreError};
pub const BLOCKS_DB_COLUMN: &str = "blocks";
pub const STATES_DB_COLUMN: &str = "states";
pub const POW_CHAIN_DB_COLUMN: &str = "powchain";
pub const VALIDATOR_DB_COLUMN: &str = "validator";
pub const COLUMNS: [&str; 4] = [
BLOCKS_DB_COLUMN,
STATES_DB_COLUMN,
POW_CHAIN_DB_COLUMN,
VALIDATOR_DB_COLUMN,
];

View File

@ -1,68 +0,0 @@
use super::POW_CHAIN_DB_COLUMN as DB_COLUMN;
use super::{ClientDB, DBError};
use std::sync::Arc;
pub struct PoWChainStore<T>
where
T: ClientDB,
{
db: Arc<T>,
}
impl<T: ClientDB> PoWChainStore<T> {
pub fn new(db: Arc<T>) -> Self {
Self { db }
}
pub fn put_block_hash(&self, hash: &[u8]) -> Result<(), DBError> {
self.db.put(DB_COLUMN, hash, &[0])
}
pub fn block_hash_exists(&self, hash: &[u8]) -> Result<bool, DBError> {
self.db.exists(DB_COLUMN, hash)
}
}
#[cfg(test)]
mod tests {
extern crate types;
use super::super::super::MemoryDB;
use super::*;
use self::types::Hash256;
#[test]
fn test_put_block_hash() {
let db = Arc::new(MemoryDB::open());
let store = PoWChainStore::new(db.clone());
let hash = &Hash256::from([0xAA; 32]).as_bytes().to_vec();
store.put_block_hash(hash).unwrap();
assert!(db.exists(DB_COLUMN, hash).unwrap());
}
#[test]
fn test_block_hash_exists() {
let db = Arc::new(MemoryDB::open());
let store = PoWChainStore::new(db.clone());
let hash = &Hash256::from([0xAA; 32]).as_bytes().to_vec();
db.put(DB_COLUMN, hash, &[0]).unwrap();
assert!(store.block_hash_exists(hash).unwrap());
}
#[test]
fn test_block_hash_does_not_exist() {
let db = Arc::new(MemoryDB::open());
let store = PoWChainStore::new(db.clone());
let hash = &Hash256::from([0xAA; 32]).as_bytes().to_vec();
let other_hash = &Hash256::from([0xBB; 32]).as_bytes().to_vec();
db.put(DB_COLUMN, hash, &[0]).unwrap();
assert!(!store.block_hash_exists(other_hash).unwrap());
}
}

View File

@ -1,215 +0,0 @@
extern crate bytes;
use self::bytes::{BufMut, BytesMut};
use super::VALIDATOR_DB_COLUMN as DB_COLUMN;
use super::{ClientDB, DBError};
use bls::PublicKey;
use ssz::{ssz_encode, Decodable};
use std::sync::Arc;
#[derive(Debug, PartialEq)]
pub enum ValidatorStoreError {
DBError(String),
DecodeError,
}
impl From<DBError> for ValidatorStoreError {
fn from(error: DBError) -> Self {
ValidatorStoreError::DBError(error.message)
}
}
#[derive(Debug, PartialEq)]
enum KeyPrefixes {
PublicKey,
}
pub struct ValidatorStore<T>
where
T: ClientDB,
{
db: Arc<T>,
}
impl<T: ClientDB> ValidatorStore<T> {
pub fn new(db: Arc<T>) -> Self {
Self { db }
}
fn prefix_bytes(&self, key_prefix: &KeyPrefixes) -> Vec<u8> {
match key_prefix {
KeyPrefixes::PublicKey => b"pubkey".to_vec(),
}
}
fn get_db_key_for_index(&self, key_prefix: &KeyPrefixes, index: usize) -> Vec<u8> {
let mut buf = BytesMut::with_capacity(6 + 8);
buf.put(self.prefix_bytes(key_prefix));
buf.put_u64_be(index as u64);
buf.take().to_vec()
}
pub fn put_public_key_by_index(
&self,
index: usize,
public_key: &PublicKey,
) -> Result<(), ValidatorStoreError> {
let key = self.get_db_key_for_index(&KeyPrefixes::PublicKey, index);
let val = ssz_encode(public_key);
self.db
.put(DB_COLUMN, &key[..], &val[..])
.map_err(ValidatorStoreError::from)
}
pub fn get_public_key_by_index(
&self,
index: usize,
) -> Result<Option<PublicKey>, ValidatorStoreError> {
let key = self.get_db_key_for_index(&KeyPrefixes::PublicKey, index);
let val = self.db.get(DB_COLUMN, &key[..])?;
match val {
None => Ok(None),
Some(val) => match PublicKey::ssz_decode(&val, 0) {
Ok((key, _)) => Ok(Some(key)),
Err(_) => Err(ValidatorStoreError::DecodeError),
},
}
}
}
#[cfg(test)]
mod tests {
use super::super::super::MemoryDB;
use super::*;
use bls::Keypair;
#[test]
fn test_prefix_bytes() {
let db = Arc::new(MemoryDB::open());
let store = ValidatorStore::new(db.clone());
assert_eq!(
store.prefix_bytes(&KeyPrefixes::PublicKey),
b"pubkey".to_vec()
);
}
#[test]
fn test_get_db_key_for_index() {
let db = Arc::new(MemoryDB::open());
let store = ValidatorStore::new(db.clone());
let mut buf = BytesMut::with_capacity(6 + 8);
buf.put(b"pubkey".to_vec());
buf.put_u64_be(42);
assert_eq!(
store.get_db_key_for_index(&KeyPrefixes::PublicKey, 42),
buf.take().to_vec()
)
}
#[test]
fn test_put_public_key_by_index() {
let db = Arc::new(MemoryDB::open());
let store = ValidatorStore::new(db.clone());
let index = 3;
let public_key = Keypair::random().pk;
store.put_public_key_by_index(index, &public_key).unwrap();
let public_key_at_index = db
.get(
DB_COLUMN,
&store.get_db_key_for_index(&KeyPrefixes::PublicKey, index)[..],
)
.unwrap()
.unwrap();
assert_eq!(public_key_at_index, ssz_encode(&public_key));
}
#[test]
fn test_get_public_key_by_index() {
let db = Arc::new(MemoryDB::open());
let store = ValidatorStore::new(db.clone());
let index = 4;
let public_key = Keypair::random().pk;
db.put(
DB_COLUMN,
&store.get_db_key_for_index(&KeyPrefixes::PublicKey, index)[..],
&ssz_encode(&public_key)[..],
)
.unwrap();
let public_key_at_index = store.get_public_key_by_index(index).unwrap().unwrap();
assert_eq!(public_key_at_index, public_key);
}
#[test]
fn test_get_public_key_by_unknown_index() {
let db = Arc::new(MemoryDB::open());
let store = ValidatorStore::new(db.clone());
let public_key = Keypair::random().pk;
db.put(
DB_COLUMN,
&store.get_db_key_for_index(&KeyPrefixes::PublicKey, 3)[..],
&ssz_encode(&public_key)[..],
)
.unwrap();
let public_key_at_index = store.get_public_key_by_index(4).unwrap();
assert_eq!(public_key_at_index, None);
}
#[test]
fn test_get_invalid_public_key() {
let db = Arc::new(MemoryDB::open());
let store = ValidatorStore::new(db.clone());
let key = store.get_db_key_for_index(&KeyPrefixes::PublicKey, 42);
db.put(DB_COLUMN, &key[..], "cats".as_bytes()).unwrap();
assert_eq!(
store.get_public_key_by_index(42),
Err(ValidatorStoreError::DecodeError)
);
}
#[test]
fn test_validator_store_put_get() {
let db = Arc::new(MemoryDB::open());
let store = ValidatorStore::new(db);
let keys = vec![
Keypair::random(),
Keypair::random(),
Keypair::random(),
Keypair::random(),
Keypair::random(),
];
for i in 0..keys.len() {
store.put_public_key_by_index(i, &keys[i].pk).unwrap();
}
/*
* Check all keys are retrieved correctly.
*/
for i in 0..keys.len() {
let retrieved = store.get_public_key_by_index(i).unwrap().unwrap();
assert_eq!(retrieved, keys[i].pk);
}
/*
* Check that an index that wasn't stored returns None.
*/
assert!(store
.get_public_key_by_index(keys.len() + 1)
.unwrap()
.is_none());
}
}

View File

@ -1,38 +0,0 @@
pub type DBValue = Vec<u8>;
#[derive(Debug)]
pub struct DBError {
pub message: String,
}
impl DBError {
pub fn new(message: String) -> Self {
Self { message }
}
}
/// A generic database to be used by the "client' (i.e.,
/// the lighthouse blockchain client).
///
/// The purpose of having this generic trait is to allow the
/// program to use a persistent on-disk database during production,
/// but use a transient database during tests.
pub trait ClientDB: Sync + Send {
fn get(&self, col: &str, key: &[u8]) -> Result<Option<DBValue>, DBError>;
fn put(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), DBError>;
fn exists(&self, col: &str, key: &[u8]) -> Result<bool, DBError>;
fn delete(&self, col: &str, key: &[u8]) -> Result<(), DBError>;
}
pub enum DBColumn {
Block,
State,
BeaconChain,
}
pub trait DBStore {
fn db_column(&self) -> DBColumn;
}