Merge pull request #64 from sigp/db-delete

Add delete method to ClientDB
This commit is contained in:
Age Manning 2018-10-31 12:49:57 +01:00 committed by GitHub
commit 96c54352f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 116 additions and 85 deletions

View File

@ -1,17 +1,10 @@
extern crate rocksdb; extern crate rocksdb;
use super::rocksdb::Error as RocksError;
use super::rocksdb::{Options, DB};
use super::{ClientDB, DBError, DBValue};
use std::fs; use std::fs;
use std::path::Path; use std::path::Path;
use super::rocksdb::{
DB,
Options,
};
use super::rocksdb::Error as RocksError;
use super::{
ClientDB,
DBValue,
DBError
};
/// A on-disk database which implements the ClientDB trait. /// A on-disk database which implements the ClientDB trait.
/// ///
@ -42,8 +35,7 @@ impl DiskDB {
/* /*
* Initialise the path * Initialise the path
*/ */
fs::create_dir_all(&path) fs::create_dir_all(&path).unwrap_or_else(|_| panic!("Unable to create {:?}", &path));
.unwrap_or_else(|_| panic!("Unable to create {:?}", &path));
let db_path = path.join("database"); let db_path = path.join("database");
/* /*
@ -51,31 +43,28 @@ impl DiskDB {
*/ */
let db = match columns { let db = match columns {
None => DB::open(&options, db_path), None => DB::open(&options, db_path),
Some(columns) => DB::open_cf(&options, db_path, columns) Some(columns) => DB::open_cf(&options, db_path, columns),
}.expect("Unable to open local database");; }.expect("Unable to open local database");;
Self { Self { db }
db,
}
} }
/// Create a RocksDB column family. Corresponds to the /// Create a RocksDB column family. Corresponds to the
/// `create_cf()` function on the RocksDB API. /// `create_cf()` function on the RocksDB API.
#[allow(dead_code)] #[allow(dead_code)]
fn create_col(&mut self, col: &str) fn create_col(&mut self, col: &str) -> Result<(), DBError> {
-> Result<(), DBError>
{
match self.db.create_cf(col, &Options::default()) { match self.db.create_cf(col, &Options::default()) {
Err(e) => Err(e.into()), Err(e) => Err(e.into()),
Ok(_) => Ok(()) Ok(_) => Ok(()),
} }
} }
} }
impl From<RocksError> for DBError { impl From<RocksError> for DBError {
fn from(e: RocksError) -> Self { fn from(e: RocksError) -> Self {
Self { message: e.to_string() } Self {
message: e.to_string(),
}
} }
} }
@ -85,17 +74,15 @@ impl ClientDB for DiskDB {
/// Corresponds to the `get_cf()` method on the RocksDB API. /// Corresponds to the `get_cf()` method on the RocksDB API.
/// Will attempt to get the `ColumnFamily` and return an Err /// Will attempt to get the `ColumnFamily` and return an Err
/// if it fails. /// if it fails.
fn get(&self, col: &str, key: &[u8]) fn get(&self, col: &str, key: &[u8]) -> Result<Option<DBValue>, DBError> {
-> Result<Option<DBValue>, DBError>
{
match self.db.cf_handle(col) { match self.db.cf_handle(col) {
None => Err(DBError{ message: "Unknown column".to_string() }), None => Err(DBError {
Some(handle) => { message: "Unknown column".to_string(),
match self.db.get_cf(handle, key)? { }),
None => Ok(None), Some(handle) => match self.db.get_cf(handle, key)? {
Some(db_vec) => Ok(Some(DBValue::from(&*db_vec))) None => Ok(None),
} Some(db_vec) => Ok(Some(DBValue::from(&*db_vec))),
} },
} }
} }
@ -104,38 +91,54 @@ impl ClientDB for DiskDB {
/// Corresponds to the `cf_handle()` method on the RocksDB API. /// Corresponds to the `cf_handle()` method on the RocksDB API.
/// Will attempt to get the `ColumnFamily` and return an Err /// Will attempt to get the `ColumnFamily` and return an Err
/// if it fails. /// if it fails.
fn put(&self, col: &str, key: &[u8], val: &[u8]) fn put(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), DBError> {
-> Result<(), DBError>
{
match self.db.cf_handle(col) { match self.db.cf_handle(col) {
None => Err(DBError{ message: "Unknown column".to_string() }), None => Err(DBError {
Some(handle) => self.db.put_cf(handle, key, val).map_err(|e| e.into()) message: "Unknown column".to_string(),
}),
Some(handle) => self.db.put_cf(handle, key, val).map_err(|e| e.into()),
} }
} }
/// Return true if some key exists in some column. /// Return true if some key exists in some column.
fn exists(&self, col: &str, key: &[u8]) fn exists(&self, col: &str, key: &[u8]) -> Result<bool, DBError> {
-> Result<bool, DBError>
{
/* /*
* I'm not sure if this is the correct way to read if some * I'm not sure if this is the correct way to read if some
* block exists. Naievely I would expect this to unncessarily * block exists. Naively I would expect this to unncessarily
* copy some data, but I could be wrong. * copy some data, but I could be wrong.
*/ */
match self.db.cf_handle(col) { match self.db.cf_handle(col) {
None => Err(DBError{ message: "Unknown column".to_string() }), None => Err(DBError {
Some(handle) => Ok(self.db.get_cf(handle, key)?.is_some()) message: "Unknown column".to_string(),
}),
Some(handle) => Ok(self.db.get_cf(handle, key)?.is_some()),
}
}
/// Delete the value for some key on some column.
///
/// Corresponds to the `delete_cf()` method on the RocksDB API.
/// Will attempt to get the `ColumnFamily` and return an Err
/// if it fails.
fn delete(&self, col: &str, key: &[u8]) -> Result<(), DBError> {
match self.db.cf_handle(col) {
None => Err(DBError {
message: "Unknown column".to_string(),
}),
Some(handle) => {
self.db.delete_cf(handle, key)?;
Ok(())
}
} }
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*;
use super::super::ClientDB; use super::super::ClientDB;
use std::{ env, fs, thread }; use super::*;
use std::sync::Arc; use std::sync::Arc;
use std::{env, fs, thread};
#[test] #[test]
#[ignore] #[ignore]

View File

@ -1,12 +1,8 @@
use std::collections::{ HashSet, HashMap };
use std::sync::RwLock;
use super::blake2::blake2b::blake2b; use super::blake2::blake2b::blake2b;
use super::COLUMNS; use super::COLUMNS;
use super::{ use super::{ClientDB, DBError, DBValue};
ClientDB, use std::collections::{HashMap, HashSet};
DBValue, use std::sync::RwLock;
DBError
};
type DBHashMap = HashMap<Vec<u8>, Vec<u8>>; type DBHashMap = HashMap<Vec<u8>, Vec<u8>>;
type ColumnHashSet = HashSet<String>; type ColumnHashSet = HashSet<String>;
@ -17,7 +13,7 @@ type ColumnHashSet = HashSet<String>;
/// this DB would be used outside of tests. /// this DB would be used outside of tests.
pub struct MemoryDB { pub struct MemoryDB {
db: RwLock<DBHashMap>, db: RwLock<DBHashMap>,
known_columns: RwLock<ColumnHashSet> known_columns: RwLock<ColumnHashSet>,
} }
impl MemoryDB { impl MemoryDB {
@ -45,9 +41,7 @@ impl MemoryDB {
impl ClientDB for MemoryDB { impl ClientDB for MemoryDB {
/// Get the value of some key from the database. Returns `None` if the key does not exist. /// Get the value of some key from the database. Returns `None` if the key does not exist.
fn get(&self, col: &str, key: &[u8]) fn get(&self, col: &str, key: &[u8]) -> Result<Option<DBValue>, DBError> {
-> Result<Option<DBValue>, DBError>
{
// Panic if the DB locks are poisoned. // Panic if the DB locks are poisoned.
let db = self.db.read().unwrap(); let db = self.db.read().unwrap();
let known_columns = self.known_columns.read().unwrap(); let known_columns = self.known_columns.read().unwrap();
@ -56,14 +50,14 @@ impl ClientDB for MemoryDB {
let column_key = MemoryDB::get_key_for_col(col, key); let column_key = MemoryDB::get_key_for_col(col, key);
Ok(db.get(&column_key).and_then(|val| Some(val.clone()))) Ok(db.get(&column_key).and_then(|val| Some(val.clone())))
} else { } else {
Err(DBError{ message: "Unknown column".to_string() }) Err(DBError {
message: "Unknown column".to_string(),
})
} }
} }
/// Puts a key in the database. /// Puts a key in the database.
fn put(&self, col: &str, key: &[u8], val: &[u8]) fn put(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), DBError> {
-> Result<(), DBError>
{
// Panic if the DB locks are poisoned. // Panic if the DB locks are poisoned.
let mut db = self.db.write().unwrap(); let mut db = self.db.write().unwrap();
let known_columns = self.known_columns.read().unwrap(); let known_columns = self.known_columns.read().unwrap();
@ -73,14 +67,14 @@ impl ClientDB for MemoryDB {
db.insert(column_key, val.to_vec()); db.insert(column_key, val.to_vec());
Ok(()) Ok(())
} else { } else {
Err(DBError{ message: "Unknown column".to_string() }) Err(DBError {
message: "Unknown column".to_string(),
})
} }
} }
/// Return true if some key exists in some column. /// Return true if some key exists in some column.
fn exists(&self, col: &str, key: &[u8]) fn exists(&self, col: &str, key: &[u8]) -> Result<bool, DBError> {
-> Result<bool, DBError>
{
// Panic if the DB locks are poisoned. // Panic if the DB locks are poisoned.
let db = self.db.read().unwrap(); let db = self.db.read().unwrap();
let known_columns = self.known_columns.read().unwrap(); let known_columns = self.known_columns.read().unwrap();
@ -89,23 +83,55 @@ impl ClientDB for MemoryDB {
let column_key = MemoryDB::get_key_for_col(col, key); let column_key = MemoryDB::get_key_for_col(col, key);
Ok(db.contains_key(&column_key)) Ok(db.contains_key(&column_key))
} else { } else {
Err(DBError{ message: "Unknown column".to_string() }) Err(DBError {
message: "Unknown column".to_string(),
})
}
}
/// Delete some key from the database.
fn delete(&self, col: &str, key: &[u8]) -> Result<(), DBError> {
// Panic if the DB locks are poisoned.
let mut db = self.db.write().unwrap();
let known_columns = self.known_columns.read().unwrap();
if known_columns.contains(&col.to_string()) {
let column_key = MemoryDB::get_key_for_col(col, key);
db.remove(&column_key);
Ok(())
} else {
Err(DBError {
message: "Unknown column".to_string(),
})
} }
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::super::stores::{BLOCKS_DB_COLUMN, VALIDATOR_DB_COLUMN};
use super::super::ClientDB; use super::super::ClientDB;
use std::thread; use super::*;
use std::sync::Arc; use std::sync::Arc;
use super::super::stores::{ use std::thread;
BLOCKS_DB_COLUMN,
VALIDATOR_DB_COLUMN, #[test]
}; fn test_memorydb_can_delete() {
let col_a: &str = BLOCKS_DB_COLUMN;
let db = MemoryDB::open();
db.put(col_a, "dogs".as_bytes(), "lol".as_bytes()).unwrap();
assert_eq!(
db.get(col_a, "dogs".as_bytes()).unwrap().unwrap(),
"lol".as_bytes()
);
db.delete(col_a, "dogs".as_bytes()).unwrap();
assert_eq!(db.get(col_a, "dogs".as_bytes()).unwrap(), None);
}
#[test] #[test]
fn test_memorydb_column_access() { fn test_memorydb_column_access() {
@ -121,10 +147,14 @@ mod tests {
db.put(col_a, "same".as_bytes(), "cat".as_bytes()).unwrap(); db.put(col_a, "same".as_bytes(), "cat".as_bytes()).unwrap();
db.put(col_b, "same".as_bytes(), "dog".as_bytes()).unwrap(); db.put(col_b, "same".as_bytes(), "dog".as_bytes()).unwrap();
assert_eq!(db.get(col_a, "same".as_bytes()).unwrap().unwrap(), "cat".as_bytes()); assert_eq!(
assert_eq!(db.get(col_b, "same".as_bytes()).unwrap().unwrap(), "dog".as_bytes()); db.get(col_a, "same".as_bytes()).unwrap().unwrap(),
"cat".as_bytes()
);
assert_eq!(
db.get(col_b, "same".as_bytes()).unwrap().unwrap(),
"dog".as_bytes()
);
} }
#[test] #[test]

View File

@ -2,7 +2,7 @@ pub type DBValue = Vec<u8>;
#[derive(Debug)] #[derive(Debug)]
pub struct DBError { pub struct DBError {
pub message: String pub message: String,
} }
impl DBError { impl DBError {
@ -18,13 +18,11 @@ impl DBError {
/// program to use a persistent on-disk database during production, /// program to use a persistent on-disk database during production,
/// but use a transient database during tests. /// but use a transient database during tests.
pub trait ClientDB: Sync + Send { pub trait ClientDB: Sync + Send {
fn get(&self, col: &str, key: &[u8]) fn get(&self, col: &str, key: &[u8]) -> Result<Option<DBValue>, DBError>;
-> Result<Option<DBValue>, DBError>;
fn put(&self, col: &str, key: &[u8], val: &[u8]) fn put(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), DBError>;
-> Result<(), DBError>;
fn exists(&self, col: &str, key: &[u8]) fn exists(&self, col: &str, key: &[u8]) -> Result<bool, DBError>;
-> Result<bool, DBError>;
fn delete(&self, col: &str, key: &[u8]) -> Result<(), DBError>;
} }