diff --git a/Cargo.toml b/Cargo.toml index 893189941..00c354309 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ members = [ "eth2/utils/fisher_yates_shuffle", "eth2/utils/test_random_derive", "beacon_node", - "beacon_node/db", + "beacon_node/store", "beacon_node/client", "beacon_node/network", "beacon_node/eth2-libp2p", diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 37d96a497..d78a5b596 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [dependencies] types = { path = "../eth2/types" } +store = { path = "./store" } client = { path = "client" } version = { path = "version" } clap = "2.32.0" diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 34b6e11c6..3a84256a7 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] bls = { path = "../../eth2/utils/bls" } boolean-bitfield = { path = "../../eth2/utils/boolean-bitfield" } -db = { path = "../db" } +store = { path = "../store" } failure = "0.1" failure_derive = "0.1" hashing = { path = "../../eth2/utils/hashing" } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index db5ea1cdb..f2c4b3dbe 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1,16 +1,11 @@ use crate::checkpoint::CheckPoint; use crate::errors::{BeaconChainError as Error, BlockProductionError}; -use db::{ - stores::{BeaconBlockStore, BeaconStateStore}, - ClientDB, DBError, -}; use fork_choice::{ForkChoice, ForkChoiceError}; use log::{debug, trace}; use operation_pool::DepositInsertStatus; use operation_pool::OperationPool; use parking_lot::{RwLock, RwLockReadGuard}; use slot_clock::SlotClock; -use ssz::ssz_encode; use state_processing::per_block_processing::errors::{ AttestationValidationError, AttesterSlashingValidationError, DepositValidationError, ExitValidationError, ProposerSlashingValidationError, TransferValidationError, @@ -20,6 +15,7 @@ use state_processing::{ per_slot_processing, BlockProcessingError, SlotProcessingError, }; use std::sync::Arc; +use store::{Error as DBError, Store}; use types::*; #[derive(Debug, PartialEq)] @@ -83,9 +79,8 @@ impl BlockProcessingOutcome { } } -pub struct BeaconChain { - pub block_store: Arc>, - pub state_store: Arc>, +pub struct BeaconChain { + pub store: Arc, pub slot_clock: U, pub op_pool: OperationPool, canonical_head: RwLock>, @@ -97,15 +92,14 @@ pub struct BeaconChain BeaconChain where - T: ClientDB, + T: Store, U: SlotClock, F: ForkChoice, E: EthSpec, { /// Instantiate a new Beacon Chain, from genesis. pub fn from_genesis( - state_store: Arc>, - block_store: Arc>, + store: Arc, slot_clock: U, mut genesis_state: BeaconState, genesis_block: BeaconBlock, @@ -113,10 +107,10 @@ where fork_choice: F, ) -> Result { let state_root = genesis_state.canonical_root(); - state_store.put(&state_root, &ssz_encode(&genesis_state)[..])?; + store.put(&state_root, &genesis_state)?; let block_root = genesis_block.block_header().canonical_root(); - block_store.put(&block_root, &ssz_encode(&genesis_block)[..])?; + store.put(&block_root, &genesis_block)?; let finalized_head = RwLock::new(CheckPoint::new( genesis_block.clone(), @@ -134,8 +128,7 @@ where genesis_state.build_all_caches(&spec)?; Ok(Self { - block_store, - state_store, + store, slot_clock, op_pool: OperationPool::new(), state: RwLock::new(genesis_state), @@ -235,7 +228,7 @@ where let new_state_root = state.get_state_root(earliest_historic_slot)?; // Break if the DB is unable to load the state. - state = match self.state_store.get_deserialized(&new_state_root) { + state = match self.store.get(&new_state_root) { Ok(Some(state)) => state, _ => break, } @@ -262,7 +255,7 @@ where /// /// May return a database error. pub fn get_block(&self, block_root: &Hash256) -> Result, Error> { - Ok(self.block_store.get_deserialized(block_root)?) + Ok(self.store.get(block_root)?) } /// Update the canonical head to some new values. @@ -588,7 +581,7 @@ where // Load the blocks parent block from the database, returning invalid if that block is not // found. let parent_block_root = block.previous_block_root; - let parent_block = match self.block_store.get_deserialized(&parent_block_root)? { + let parent_block: BeaconBlock = match self.store.get(&parent_block_root)? { Some(previous_block_root) => previous_block_root, None => { return Ok(BlockProcessingOutcome::InvalidBlock( @@ -601,15 +594,15 @@ where // It is an error because if know the parent block we should also know the parent state. let parent_state_root = parent_block.state_root; let parent_state = self - .state_store - .get_deserialized(&parent_state_root)? + .store + .get(&parent_state_root)? .ok_or_else(|| Error::DBInconsistent(format!("Missing state {}", parent_state_root)))?; // TODO: check the block proposer signature BEFORE doing a state transition. This will // significantly lower exposure surface to DoS attacks. // Transition the parent state to the block slot. - let mut state = parent_state; + let mut state: BeaconState = parent_state; for _ in state.slot.as_u64()..block.slot.as_u64() { if let Err(e) = per_slot_processing(&mut state, &self.spec) { return Ok(BlockProcessingOutcome::InvalidBlock( @@ -635,8 +628,8 @@ where } // Store the block and state. - self.block_store.put(&block_root, &ssz_encode(&block)[..])?; - self.state_store.put(&state_root, &ssz_encode(&state)[..])?; + self.store.put(&block_root, &block)?; + self.store.put(&state_root, &state)?; // run the fork_choice add_block logic self.fork_choice @@ -729,15 +722,15 @@ where .find_head(&present_head, &self.spec)?; if new_head != present_head { - let block = self - .block_store - .get_deserialized(&new_head)? + let block: BeaconBlock = self + .store + .get(&new_head)? .ok_or_else(|| Error::MissingBeaconBlock(new_head))?; let block_root = block.canonical_root(); - let state = self - .state_store - .get_deserialized(&block.state_root)? + let state: BeaconState = self + .store + .get(&block.state_root)? .ok_or_else(|| Error::MissingBeaconState(block.state_root))?; let state_root = state.canonical_root(); @@ -752,7 +745,7 @@ where /// Returns `true` if the given block root has not been processed. pub fn is_new_block_root(&self, beacon_block_root: &Hash256) -> Result { - Ok(!self.block_store.exists(beacon_block_root)?) + Ok(!self.store.exists::(beacon_block_root)?) } /// Dumps the entire canonical chain, from the head to genesis to a vector for analysis. @@ -778,19 +771,14 @@ where break; // Genesis has been reached. } - let beacon_block = self - .block_store - .get_deserialized(&beacon_block_root)? - .ok_or_else(|| { + let beacon_block: BeaconBlock = + self.store.get(&beacon_block_root)?.ok_or_else(|| { Error::DBInconsistent(format!("Missing block {}", beacon_block_root)) })?; let beacon_state_root = beacon_block.state_root; - let beacon_state = self - .state_store - .get_deserialized(&beacon_state_root)? - .ok_or_else(|| { - Error::DBInconsistent(format!("Missing state {}", beacon_state_root)) - })?; + let beacon_state = self.store.get(&beacon_state_root)?.ok_or_else(|| { + Error::DBInconsistent(format!("Missing state {}", beacon_state_root)) + })?; let slot = CheckPoint { beacon_block, @@ -811,7 +799,7 @@ where impl From for Error { fn from(e: DBError) -> Error { - Error::DBError(e.message) + Error::DBError(e) } } diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index a84e4b10e..73884916a 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -20,7 +20,7 @@ pub enum BeaconChainError { UnableToReadSlot, BeaconStateError(BeaconStateError), DBInconsistent(String), - DBError(String), + DBError(store::Error), ForkChoiceError(ForkChoiceError), MissingBeaconBlock(Hash256), MissingBeaconState(Hash256), diff --git a/beacon_node/beacon_chain/src/initialise.rs b/beacon_node/beacon_chain/src/initialise.rs index 83b60a4f7..b9d950ed5 100644 --- a/beacon_node/beacon_chain/src/initialise.rs +++ b/beacon_node/beacon_chain/src/initialise.rs @@ -3,12 +3,11 @@ // testnet. These are examples. Also. there is code duplication which can/should be cleaned up. use crate::BeaconChain; -use db::stores::{BeaconBlockStore, BeaconStateStore}; -use db::{DiskDB, MemoryDB}; use fork_choice::BitwiseLMDGhost; use slot_clock::SystemTimeSlotClock; use std::path::PathBuf; use std::sync::Arc; +use store::{DiskStore, MemoryStore}; use tree_hash::TreeHash; use types::test_utils::TestingBeaconStateBuilder; use types::{BeaconBlock, ChainSpec, FewValidatorsEthSpec, FoundationEthSpec, Hash256}; @@ -20,20 +19,15 @@ pub fn initialise_beacon_chain( db_name: Option<&PathBuf>, ) -> Arc< BeaconChain< - DiskDB, + DiskStore, SystemTimeSlotClock, - BitwiseLMDGhost, + BitwiseLMDGhost, FoundationEthSpec, >, > { - // set up the db - let db = Arc::new(DiskDB::open( - db_name.expect("Database directory must be included"), - None, - )); - - let block_store = Arc::new(BeaconBlockStore::new(db.clone())); - let state_store = Arc::new(BeaconStateStore::new(db.clone())); + let path = db_name.expect("db_name cannot be None."); + let store = DiskStore::open(path).expect("Unable to open DB."); + let store = Arc::new(store); let state_builder = TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(8, &spec); let (genesis_state, _keypairs) = state_builder.build(); @@ -49,14 +43,13 @@ pub fn initialise_beacon_chain( ) .expect("Unable to load SystemTimeSlotClock"); // Choose the fork choice - let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone()); + let fork_choice = BitwiseLMDGhost::new(store.clone()); // Genesis chain //TODO: Handle error correctly Arc::new( BeaconChain::from_genesis( - state_store.clone(), - block_store.clone(), + store, slot_clock, genesis_state, genesis_block, @@ -68,20 +61,18 @@ pub fn initialise_beacon_chain( } /// Initialisation of a test beacon chain, uses an in memory db with fixed genesis time. -pub fn initialise_test_beacon_chain( +pub fn initialise_test_beacon_chain_with_memory_db( spec: &ChainSpec, _db_name: Option<&PathBuf>, ) -> Arc< BeaconChain< - MemoryDB, + MemoryStore, SystemTimeSlotClock, - BitwiseLMDGhost, + BitwiseLMDGhost, FewValidatorsEthSpec, >, > { - let db = Arc::new(MemoryDB::open()); - let block_store = Arc::new(BeaconBlockStore::new(db.clone())); - let state_store = Arc::new(BeaconStateStore::new(db.clone())); + let store = Arc::new(MemoryStore::open()); let state_builder = TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(8, spec); let (genesis_state, _keypairs) = state_builder.build(); @@ -97,14 +88,60 @@ pub fn initialise_test_beacon_chain( ) .expect("Unable to load SystemTimeSlotClock"); // Choose the fork choice - let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone()); + let fork_choice = BitwiseLMDGhost::new(store.clone()); // Genesis chain //TODO: Handle error correctly Arc::new( BeaconChain::from_genesis( - state_store.clone(), - block_store.clone(), + store, + slot_clock, + genesis_state, + genesis_block, + spec.clone(), + fork_choice, + ) + .expect("Terminate if beacon chain generation fails"), + ) +} + +/// Initialisation of a test beacon chain, uses an in memory db with fixed genesis time. +pub fn initialise_test_beacon_chain_with_disk_db( + spec: &ChainSpec, + db_name: Option<&PathBuf>, +) -> Arc< + BeaconChain< + DiskStore, + SystemTimeSlotClock, + BitwiseLMDGhost, + FewValidatorsEthSpec, + >, +> { + let path = db_name.expect("db_name cannot be None."); + let store = DiskStore::open(path).expect("Unable to open DB."); + let store = Arc::new(store); + + let state_builder = TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(8, spec); + let (genesis_state, _keypairs) = state_builder.build(); + + let mut genesis_block = BeaconBlock::empty(spec); + genesis_block.state_root = Hash256::from_slice(&genesis_state.tree_hash_root()); + + // Slot clock + let slot_clock = SystemTimeSlotClock::new( + spec.genesis_slot, + genesis_state.genesis_time, + spec.seconds_per_slot, + ) + .expect("Unable to load SystemTimeSlotClock"); + // Choose the fork choice + let fork_choice = BitwiseLMDGhost::new(store.clone()); + + // Genesis chain + //TODO: Handle error correctly + Arc::new( + BeaconChain::from_genesis( + store, slot_clock, genesis_state, genesis_block, diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index d8d85a8a6..6ac01a5d5 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -7,7 +7,6 @@ pub mod test_utils; pub use self::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock, ValidBlock}; pub use self::checkpoint::CheckPoint; pub use self::errors::{BeaconChainError, BlockProductionError}; -pub use db; pub use fork_choice; pub use parking_lot; pub use slot_clock; @@ -15,4 +14,5 @@ pub use state_processing::per_block_processing::errors::{ AttestationValidationError, AttesterSlashingValidationError, DepositValidationError, ExitValidationError, ProposerSlashingValidationError, TransferValidationError, }; +pub use store; pub use types; diff --git a/beacon_node/beacon_chain/src/test_utils/testing_beacon_chain_builder.rs b/beacon_node/beacon_chain/src/test_utils/testing_beacon_chain_builder.rs index f7ff3cdae..b6b1defcc 100644 --- a/beacon_node/beacon_chain/src/test_utils/testing_beacon_chain_builder.rs +++ b/beacon_node/beacon_chain/src/test_utils/testing_beacon_chain_builder.rs @@ -1,17 +1,18 @@ pub use crate::{BeaconChain, BeaconChainError, CheckPoint}; -use db::{ - stores::{BeaconBlockStore, BeaconStateStore}, - MemoryDB, -}; use fork_choice::BitwiseLMDGhost; use slot_clock::TestingSlotClock; use std::sync::Arc; +use store::MemoryStore; use tree_hash::TreeHash; use types::*; use types::{test_utils::TestingBeaconStateBuilder, EthSpec, FewValidatorsEthSpec}; -type TestingBeaconChain = - BeaconChain, E>; +type TestingBeaconChain = BeaconChain< + MemoryStore, + TestingSlotClock, + BitwiseLMDGhost, + E, +>; pub struct TestingBeaconChainBuilder { state_builder: TestingBeaconStateBuilder, @@ -19,11 +20,9 @@ pub struct TestingBeaconChainBuilder { impl TestingBeaconChainBuilder { pub fn build(self, spec: &ChainSpec) -> TestingBeaconChain { - let db = Arc::new(MemoryDB::open()); - let block_store = Arc::new(BeaconBlockStore::new(db.clone())); - let state_store = Arc::new(BeaconStateStore::new(db.clone())); + let store = Arc::new(MemoryStore::open()); let slot_clock = TestingSlotClock::new(spec.genesis_slot.as_u64()); - let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone()); + let fork_choice = BitwiseLMDGhost::new(store.clone()); let (genesis_state, _keypairs) = self.state_builder.build(); @@ -32,8 +31,7 @@ impl TestingBeaconChainBuilder { // Create the Beacon Chain BeaconChain::from_genesis( - state_store.clone(), - block_store.clone(), + store, slot_clock, genesis_state, genesis_block, diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 8956dbb07..4a976eec4 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] beacon_chain = { path = "../beacon_chain" } network = { path = "../network" } -db = { path = "../db" } +store = { path = "../store" } rpc = { path = "../rpc" } fork_choice = { path = "../../eth2/fork_choice" } types = { path = "../../eth2/types" } diff --git a/beacon_node/client/src/client_config.rs b/beacon_node/client/src/client_config.rs index d84b63f4f..8d7176c2c 100644 --- a/beacon_node/client/src/client_config.rs +++ b/beacon_node/client/src/client_config.rs @@ -1,5 +1,4 @@ use clap::ArgMatches; -use db::DBType; use fork_choice::ForkChoiceAlgorithm; use network::NetworkConfig; use slog::error; @@ -12,6 +11,12 @@ use types::multiaddr::ToMultiaddr; use types::Multiaddr; use types::{ChainSpec, EthSpec, LighthouseTestnetEthSpec}; +#[derive(Debug, Clone)] +pub enum DBType { + Memory, + Disk, +} + /// Stores the client configuration for this Lighthouse instance. #[derive(Debug, Clone)] pub struct ClientConfig { @@ -48,7 +53,7 @@ impl Default for ClientConfig { // default to memory db for now db_type: DBType::Memory, // default db name for disk-based dbs - db_name: data_dir.join("chain.db"), + db_name: data_dir.join("chain_db"), rpc_conf: rpc::RPCConfig::default(), } } @@ -131,6 +136,12 @@ impl ClientConfig { } } + match args.value_of("db") { + Some("disk") => config.db_type = DBType::Disk, + Some("memory") => config.db_type = DBType::Memory, + _ => unreachable!(), // clap prevents this. + }; + Ok(config) } } diff --git a/beacon_node/client/src/client_types.rs b/beacon_node/client/src/client_types.rs index 8c9352d7c..4cce42a06 100644 --- a/beacon_node/client/src/client_types.rs +++ b/beacon_node/client/src/client_types.rs @@ -1,15 +1,15 @@ use crate::{ArcBeaconChain, ClientConfig}; use beacon_chain::{ - db::{ClientDB, DiskDB, MemoryDB}, fork_choice::BitwiseLMDGhost, initialise, slot_clock::{SlotClock, SystemTimeSlotClock}, + store::{DiskStore, MemoryStore, Store}, }; use fork_choice::ForkChoice; use types::{EthSpec, FewValidatorsEthSpec, FoundationEthSpec}; pub trait ClientTypes { - type DB: ClientDB + 'static; + type DB: Store + 'static; type SlotClock: SlotClock + 'static; type ForkChoice: ForkChoice + 'static; type EthSpec: EthSpec + 'static; @@ -22,9 +22,9 @@ pub trait ClientTypes { pub struct StandardClientType; impl ClientTypes for StandardClientType { - type DB = DiskDB; + type DB = DiskStore; type SlotClock = SystemTimeSlotClock; - type ForkChoice = BitwiseLMDGhost; + type ForkChoice = BitwiseLMDGhost; type EthSpec = FoundationEthSpec; fn initialise_beacon_chain( @@ -34,17 +34,32 @@ impl ClientTypes for StandardClientType { } } -pub struct TestingClientType; +pub struct MemoryStoreTestingClientType; -impl ClientTypes for TestingClientType { - type DB = MemoryDB; +impl ClientTypes for MemoryStoreTestingClientType { + type DB = MemoryStore; type SlotClock = SystemTimeSlotClock; - type ForkChoice = BitwiseLMDGhost; + type ForkChoice = BitwiseLMDGhost; type EthSpec = FewValidatorsEthSpec; fn initialise_beacon_chain( config: &ClientConfig, ) -> ArcBeaconChain { - initialise::initialise_test_beacon_chain(&config.spec, None) + initialise::initialise_test_beacon_chain_with_memory_db(&config.spec, None) + } +} + +pub struct DiskStoreTestingClientType; + +impl ClientTypes for DiskStoreTestingClientType { + type DB = DiskStore; + type SlotClock = SystemTimeSlotClock; + type ForkChoice = BitwiseLMDGhost; + type EthSpec = FewValidatorsEthSpec; + + fn initialise_beacon_chain( + config: &ClientConfig, + ) -> ArcBeaconChain { + initialise::initialise_test_beacon_chain_with_disk_db(&config.spec, Some(&config.db_name)) } } diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 5d7c221ef..71d4013d3 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -6,9 +6,8 @@ pub mod error; pub mod notifier; use beacon_chain::BeaconChain; -pub use client_config::ClientConfig; +pub use client_config::{ClientConfig, DBType}; pub use client_types::ClientTypes; -use db::ClientDB; use exit_future::Signal; use fork_choice::ForkChoice; use futures::{future::Future, Stream}; @@ -18,6 +17,7 @@ use slot_clock::SlotClock; use std::marker::PhantomData; use std::sync::Arc; use std::time::{Duration, Instant}; +use store::Store; use tokio::runtime::TaskExecutor; use tokio::timer::Interval; use types::EthSpec; @@ -146,7 +146,7 @@ impl Client { fn do_state_catchup(chain: &Arc>, log: &slog::Logger) where - T: ClientDB, + T: Store, U: SlotClock, F: ForkChoice, E: EthSpec, diff --git a/beacon_node/db/Cargo.toml b/beacon_node/db/Cargo.toml deleted file mode 100644 index 122aaa34d..000000000 --- a/beacon_node/db/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "db" -version = "0.1.0" -authors = ["Paul Hauner "] -edition = "2018" - -[dependencies] -blake2-rfc = "0.2.18" -bls = { path = "../../eth2/utils/bls" } -bytes = "0.4.10" -rocksdb = "0.10.1" -ssz = { path = "../../eth2/utils/ssz" } -types = { path = "../../eth2/types" } diff --git a/beacon_node/db/src/lib.rs b/beacon_node/db/src/lib.rs deleted file mode 100644 index 5e710ae9a..000000000 --- a/beacon_node/db/src/lib.rs +++ /dev/null @@ -1,21 +0,0 @@ -extern crate blake2_rfc as blake2; -extern crate bls; -extern crate rocksdb; - -mod disk_db; -mod memory_db; -pub mod stores; -mod traits; - -use self::stores::COLUMNS; - -pub use self::disk_db::DiskDB; -pub use self::memory_db::MemoryDB; -pub use self::traits::{ClientDB, DBError, DBValue}; - -/// Currently available database options -#[derive(Debug, Clone)] -pub enum DBType { - Memory, - RocksDB, -} diff --git a/beacon_node/db/src/memory_db.rs b/beacon_node/db/src/memory_db.rs deleted file mode 100644 index 008e5912f..000000000 --- a/beacon_node/db/src/memory_db.rs +++ /dev/null @@ -1,236 +0,0 @@ -use super::blake2::blake2b::blake2b; -use super::COLUMNS; -use super::{ClientDB, DBError, DBValue}; -use std::collections::{HashMap, HashSet}; -use std::sync::RwLock; - -type DBHashMap = HashMap, Vec>; -type ColumnHashSet = HashSet; - -/// An in-memory database implementing the ClientDB trait. -/// -/// It is not particularily optimized, it exists for ease and speed of testing. It's not expected -/// this DB would be used outside of tests. -pub struct MemoryDB { - db: RwLock, - known_columns: RwLock, -} - -impl MemoryDB { - /// Open the in-memory database. - /// - /// All columns must be supplied initially, you will get an error if you try to access a column - /// that was not declared here. This condition is enforced artificially to simulate RocksDB. - pub fn open() -> Self { - let db: DBHashMap = HashMap::new(); - let mut known_columns: ColumnHashSet = HashSet::new(); - for col in &COLUMNS { - known_columns.insert(col.to_string()); - } - Self { - db: RwLock::new(db), - known_columns: RwLock::new(known_columns), - } - } - - /// Hashes a key and a column name in order to get a unique key for the supplied column. - fn get_key_for_col(col: &str, key: &[u8]) -> Vec { - blake2b(32, col.as_bytes(), key).as_bytes().to_vec() - } -} - -impl ClientDB for MemoryDB { - /// Get the value of some key from the database. Returns `None` if the key does not exist. - fn get(&self, col: &str, key: &[u8]) -> Result, DBError> { - // Panic if the DB locks are poisoned. - let db = self.db.read().unwrap(); - let known_columns = self.known_columns.read().unwrap(); - - if known_columns.contains(&col.to_string()) { - let column_key = MemoryDB::get_key_for_col(col, key); - Ok(db.get(&column_key).and_then(|val| Some(val.clone()))) - } else { - Err(DBError { - message: "Unknown column".to_string(), - }) - } - } - - /// Puts a key in the database. - fn put(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), DBError> { - // Panic if the DB locks are poisoned. - let mut db = self.db.write().unwrap(); - let known_columns = self.known_columns.read().unwrap(); - - if known_columns.contains(&col.to_string()) { - let column_key = MemoryDB::get_key_for_col(col, key); - db.insert(column_key, val.to_vec()); - Ok(()) - } else { - Err(DBError { - message: "Unknown column".to_string(), - }) - } - } - - /// Return true if some key exists in some column. - fn exists(&self, col: &str, key: &[u8]) -> Result { - // Panic if the DB locks are poisoned. - let db = self.db.read().unwrap(); - let known_columns = self.known_columns.read().unwrap(); - - if known_columns.contains(&col.to_string()) { - let column_key = MemoryDB::get_key_for_col(col, key); - Ok(db.contains_key(&column_key)) - } else { - Err(DBError { - message: "Unknown column".to_string(), - }) - } - } - - /// Delete some key from the database. - fn delete(&self, col: &str, key: &[u8]) -> Result<(), DBError> { - // Panic if the DB locks are poisoned. - let mut db = self.db.write().unwrap(); - let known_columns = self.known_columns.read().unwrap(); - - if known_columns.contains(&col.to_string()) { - let column_key = MemoryDB::get_key_for_col(col, key); - db.remove(&column_key); - Ok(()) - } else { - Err(DBError { - message: "Unknown column".to_string(), - }) - } - } -} - -#[cfg(test)] -mod tests { - use super::super::stores::{BLOCKS_DB_COLUMN, VALIDATOR_DB_COLUMN}; - use super::super::ClientDB; - use super::*; - use std::sync::Arc; - use std::thread; - - #[test] - fn test_memorydb_can_delete() { - let col_a: &str = BLOCKS_DB_COLUMN; - - let db = MemoryDB::open(); - - db.put(col_a, "dogs".as_bytes(), "lol".as_bytes()).unwrap(); - - assert_eq!( - db.get(col_a, "dogs".as_bytes()).unwrap().unwrap(), - "lol".as_bytes() - ); - - db.delete(col_a, "dogs".as_bytes()).unwrap(); - - assert_eq!(db.get(col_a, "dogs".as_bytes()).unwrap(), None); - } - - #[test] - fn test_memorydb_column_access() { - let col_a: &str = BLOCKS_DB_COLUMN; - let col_b: &str = VALIDATOR_DB_COLUMN; - - let db = MemoryDB::open(); - - /* - * Testing that if we write to the same key in different columns that - * there is not an overlap. - */ - db.put(col_a, "same".as_bytes(), "cat".as_bytes()).unwrap(); - db.put(col_b, "same".as_bytes(), "dog".as_bytes()).unwrap(); - - assert_eq!( - db.get(col_a, "same".as_bytes()).unwrap().unwrap(), - "cat".as_bytes() - ); - assert_eq!( - db.get(col_b, "same".as_bytes()).unwrap().unwrap(), - "dog".as_bytes() - ); - } - - #[test] - fn test_memorydb_unknown_column_access() { - let col_a: &str = BLOCKS_DB_COLUMN; - let col_x: &str = "ColumnX"; - - let db = MemoryDB::open(); - - /* - * Test that we get errors when using undeclared columns - */ - assert!(db.put(col_a, "cats".as_bytes(), "lol".as_bytes()).is_ok()); - assert!(db.put(col_x, "cats".as_bytes(), "lol".as_bytes()).is_err()); - - assert!(db.get(col_a, "cats".as_bytes()).is_ok()); - assert!(db.get(col_x, "cats".as_bytes()).is_err()); - } - - #[test] - fn test_memorydb_exists() { - let col_a: &str = BLOCKS_DB_COLUMN; - let col_b: &str = VALIDATOR_DB_COLUMN; - - let db = MemoryDB::open(); - - /* - * Testing that if we write to the same key in different columns that - * there is not an overlap. - */ - db.put(col_a, "cats".as_bytes(), "lol".as_bytes()).unwrap(); - - assert_eq!(true, db.exists(col_a, "cats".as_bytes()).unwrap()); - assert_eq!(false, db.exists(col_b, "cats".as_bytes()).unwrap()); - - assert_eq!(false, db.exists(col_a, "dogs".as_bytes()).unwrap()); - assert_eq!(false, db.exists(col_b, "dogs".as_bytes()).unwrap()); - } - - #[test] - fn test_memorydb_threading() { - let col_name: &str = BLOCKS_DB_COLUMN; - - let db = Arc::new(MemoryDB::open()); - - let thread_count = 10; - let write_count = 10; - - // We're execting the product of these numbers to fit in one byte. - assert!(thread_count * write_count <= 255); - - let mut handles = vec![]; - for t in 0..thread_count { - let wc = write_count; - let db = db.clone(); - let col = col_name.clone(); - let handle = thread::spawn(move || { - for w in 0..wc { - let key = (t * w) as u8; - let val = 42; - db.put(&col, &vec![key], &vec![val]).unwrap(); - } - }); - handles.push(handle); - } - - for handle in handles { - handle.join().unwrap(); - } - - for t in 0..thread_count { - for w in 0..write_count { - let key = (t * w) as u8; - let val = db.get(&col_name, &vec![key]).unwrap().unwrap(); - assert_eq!(vec![42], val); - } - } - } -} diff --git a/beacon_node/db/src/stores/beacon_block_store.rs b/beacon_node/db/src/stores/beacon_block_store.rs deleted file mode 100644 index 868caafe2..000000000 --- a/beacon_node/db/src/stores/beacon_block_store.rs +++ /dev/null @@ -1,246 +0,0 @@ -use super::BLOCKS_DB_COLUMN as DB_COLUMN; -use super::{ClientDB, DBError}; -use ssz::Decode; -use std::sync::Arc; -use types::{BeaconBlock, Hash256, Slot}; - -#[derive(Clone, Debug, PartialEq)] -pub enum BeaconBlockAtSlotError { - UnknownBeaconBlock(Hash256), - InvalidBeaconBlock(Hash256), - DBError(String), -} - -pub struct BeaconBlockStore -where - T: ClientDB, -{ - db: Arc, -} - -// Implements `put`, `get`, `exists` and `delete` for the store. -impl_crud_for_store!(BeaconBlockStore, DB_COLUMN); - -impl BeaconBlockStore { - pub fn new(db: Arc) -> Self { - Self { db } - } - - pub fn get_deserialized(&self, hash: &Hash256) -> Result, DBError> { - match self.get(&hash)? { - None => Ok(None), - Some(ssz) => { - let block = BeaconBlock::from_ssz_bytes(&ssz).map_err(|_| DBError { - message: "Bad BeaconBlock SSZ.".to_string(), - })?; - Ok(Some(block)) - } - } - } - - /// Retrieve the block at a slot given a "head_hash" and a slot. - /// - /// A "head_hash" must be a block hash with a slot number greater than or equal to the desired - /// slot. - /// - /// This function will read each block down the chain until it finds a block with the given - /// slot number. If the slot is skipped, the function will return None. - /// - /// If a block is found, a tuple of (block_hash, serialized_block) is returned. - /// - /// Note: this function uses a loop instead of recursion as the compiler is over-strict when it - /// comes to recursion and the `impl Trait` pattern. See: - /// https://stackoverflow.com/questions/54032940/using-impl-trait-in-a-recursive-function - pub fn block_at_slot( - &self, - head_hash: &Hash256, - slot: Slot, - ) -> Result, BeaconBlockAtSlotError> { - let mut current_hash = *head_hash; - - loop { - if let Some(block) = self.get_deserialized(¤t_hash)? { - if block.slot == slot { - break Ok(Some((current_hash, block))); - } else if block.slot < slot { - break Ok(None); - } else { - current_hash = block.previous_block_root; - } - } else { - break Err(BeaconBlockAtSlotError::UnknownBeaconBlock(current_hash)); - } - } - } -} - -impl From for BeaconBlockAtSlotError { - fn from(e: DBError) -> Self { - BeaconBlockAtSlotError::DBError(e.message) - } -} - -#[cfg(test)] -mod tests { - use super::super::super::MemoryDB; - use super::*; - - use std::sync::Arc; - use std::thread; - - use ssz::ssz_encode; - use types::test_utils::{SeedableRng, TestRandom, XorShiftRng}; - use types::BeaconBlock; - use types::Hash256; - - test_crud_for_store!(BeaconBlockStore, DB_COLUMN); - - #[test] - fn head_hash_slot_too_low() { - let db = Arc::new(MemoryDB::open()); - let bs = Arc::new(BeaconBlockStore::new(db.clone())); - let mut rng = XorShiftRng::from_seed([42; 16]); - - let mut block = BeaconBlock::random_for_test(&mut rng); - block.slot = Slot::from(10_u64); - - let block_root = block.canonical_root(); - bs.put(&block_root, &ssz_encode(&block)).unwrap(); - - let result = bs.block_at_slot(&block_root, Slot::from(11_u64)).unwrap(); - assert_eq!(result, None); - } - - #[test] - fn test_invalid_block_at_slot() { - let db = Arc::new(MemoryDB::open()); - let store = BeaconBlockStore::new(db.clone()); - - let ssz = "definitly not a valid block".as_bytes(); - let hash = &Hash256::from([0xAA; 32]); - - db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap(); - assert_eq!( - store.block_at_slot(hash, Slot::from(42_u64)), - Err(BeaconBlockAtSlotError::DBError( - "Bad BeaconBlock SSZ.".into() - )) - ); - } - - #[test] - fn test_unknown_block_at_slot() { - let db = Arc::new(MemoryDB::open()); - let store = BeaconBlockStore::new(db.clone()); - - let ssz = "some bytes".as_bytes(); - let hash = &Hash256::from([0xAA; 32]); - let other_hash = &Hash256::from([0xBB; 32]); - - db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap(); - assert_eq!( - store.block_at_slot(other_hash, Slot::from(42_u64)), - Err(BeaconBlockAtSlotError::UnknownBeaconBlock(*other_hash)) - ); - } - - #[test] - fn test_block_store_on_memory_db() { - let db = Arc::new(MemoryDB::open()); - let bs = Arc::new(BeaconBlockStore::new(db.clone())); - - let thread_count = 10; - let write_count = 10; - - let mut handles = vec![]; - for t in 0..thread_count { - let wc = write_count; - let bs = bs.clone(); - let handle = thread::spawn(move || { - for w in 0..wc { - let key = t * w; - let val = 42; - bs.put(&Hash256::from_low_u64_le(key), &vec![val]).unwrap(); - } - }); - handles.push(handle); - } - - for handle in handles { - handle.join().unwrap(); - } - - for t in 0..thread_count { - for w in 0..write_count { - let key = t * w; - assert!(bs.exists(&Hash256::from_low_u64_le(key)).unwrap()); - let val = bs.get(&Hash256::from_low_u64_le(key)).unwrap().unwrap(); - assert_eq!(vec![42], val); - } - } - } - - #[test] - #[ignore] - fn test_block_at_slot() { - let db = Arc::new(MemoryDB::open()); - let bs = Arc::new(BeaconBlockStore::new(db.clone())); - let mut rng = XorShiftRng::from_seed([42; 16]); - - // Specify test block parameters. - let hashes = [ - Hash256::from([0; 32]), - Hash256::from([1; 32]), - Hash256::from([2; 32]), - Hash256::from([3; 32]), - Hash256::from([4; 32]), - ]; - let parent_hashes = [ - Hash256::from([255; 32]), // Genesis block. - Hash256::from([0; 32]), - Hash256::from([1; 32]), - Hash256::from([2; 32]), - Hash256::from([3; 32]), - ]; - let unknown_hash = Hash256::from([101; 32]); // different from all above - let slots: Vec = vec![0, 1, 3, 4, 5].iter().map(|x| Slot::new(*x)).collect(); - - // Generate a vec of random blocks and store them in the DB. - let block_count = 5; - let mut blocks: Vec = Vec::with_capacity(5); - for i in 0..block_count { - let mut block = BeaconBlock::random_for_test(&mut rng); - - block.previous_block_root = parent_hashes[i]; - block.slot = slots[i]; - - let ssz = ssz_encode(&block); - db.put(DB_COLUMN, hashes[i].as_bytes(), &ssz).unwrap(); - - blocks.push(block); - } - - // Test that certain slots can be reached from certain hashes. - let test_cases = vec![(4, 4), (4, 3), (4, 2), (4, 1), (4, 0)]; - for (hashes_index, slot_index) in test_cases { - let (matched_block_hash, block) = bs - .block_at_slot(&hashes[hashes_index], slots[slot_index]) - .unwrap() - .unwrap(); - assert_eq!(matched_block_hash, hashes[slot_index]); - assert_eq!(block.slot, slots[slot_index]); - } - - let ssz = bs.block_at_slot(&hashes[4], Slot::new(2)).unwrap(); - assert_eq!(ssz, None); - - let ssz = bs.block_at_slot(&hashes[4], Slot::new(6)).unwrap(); - assert_eq!(ssz, None); - - let ssz = bs.block_at_slot(&unknown_hash, Slot::new(2)); - assert_eq!( - ssz, - Err(BeaconBlockAtSlotError::UnknownBeaconBlock(unknown_hash)) - ); - } -} diff --git a/beacon_node/db/src/stores/beacon_state_store.rs b/beacon_node/db/src/stores/beacon_state_store.rs deleted file mode 100644 index 044290592..000000000 --- a/beacon_node/db/src/stores/beacon_state_store.rs +++ /dev/null @@ -1,65 +0,0 @@ -use super::STATES_DB_COLUMN as DB_COLUMN; -use super::{ClientDB, DBError}; -use ssz::Decode; -use std::sync::Arc; -use types::{BeaconState, EthSpec, Hash256}; - -pub struct BeaconStateStore -where - T: ClientDB, -{ - db: Arc, -} - -// Implements `put`, `get`, `exists` and `delete` for the store. -impl_crud_for_store!(BeaconStateStore, DB_COLUMN); - -impl BeaconStateStore { - pub fn new(db: Arc) -> Self { - Self { db } - } - - pub fn get_deserialized( - &self, - hash: &Hash256, - ) -> Result>, DBError> { - match self.get(&hash)? { - None => Ok(None), - Some(ssz) => { - let state = BeaconState::from_ssz_bytes(&ssz).map_err(|_| DBError { - message: "Bad State SSZ.".to_string(), - })?; - Ok(Some(state)) - } - } - } -} - -#[cfg(test)] -mod tests { - use super::super::super::MemoryDB; - use super::*; - - use ssz::ssz_encode; - use std::sync::Arc; - use types::test_utils::{SeedableRng, TestRandom, XorShiftRng}; - use types::{FoundationBeaconState, Hash256}; - - test_crud_for_store!(BeaconStateStore, DB_COLUMN); - - #[test] - fn test_reader() { - let db = Arc::new(MemoryDB::open()); - let store = BeaconStateStore::new(db.clone()); - - let mut rng = XorShiftRng::from_seed([42; 16]); - let state: FoundationBeaconState = BeaconState::random_for_test(&mut rng); - let state_root = state.canonical_root(); - - store.put(&state_root, &ssz_encode(&state)).unwrap(); - - let decoded = store.get_deserialized(&state_root).unwrap().unwrap(); - - assert_eq!(state, decoded); - } -} diff --git a/beacon_node/db/src/stores/macros.rs b/beacon_node/db/src/stores/macros.rs deleted file mode 100644 index 6c53e40ee..000000000 --- a/beacon_node/db/src/stores/macros.rs +++ /dev/null @@ -1,103 +0,0 @@ -macro_rules! impl_crud_for_store { - ($store: ident, $db_column: expr) => { - impl $store { - pub fn put(&self, hash: &Hash256, ssz: &[u8]) -> Result<(), DBError> { - self.db.put($db_column, hash.as_bytes(), ssz) - } - - pub fn get(&self, hash: &Hash256) -> Result>, DBError> { - self.db.get($db_column, hash.as_bytes()) - } - - pub fn exists(&self, hash: &Hash256) -> Result { - self.db.exists($db_column, hash.as_bytes()) - } - - pub fn delete(&self, hash: &Hash256) -> Result<(), DBError> { - self.db.delete($db_column, hash.as_bytes()) - } - } - }; -} - -#[cfg(test)] -macro_rules! test_crud_for_store { - ($store: ident, $db_column: expr) => { - #[test] - fn test_put() { - let db = Arc::new(MemoryDB::open()); - let store = $store::new(db.clone()); - - let ssz = "some bytes".as_bytes(); - let hash = &Hash256::from([0xAA; 32]); - - store.put(hash, ssz).unwrap(); - assert_eq!(db.get(DB_COLUMN, hash.as_bytes()).unwrap().unwrap(), ssz); - } - - #[test] - fn test_get() { - let db = Arc::new(MemoryDB::open()); - let store = $store::new(db.clone()); - - let ssz = "some bytes".as_bytes(); - let hash = &Hash256::from([0xAA; 32]); - - db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap(); - assert_eq!(store.get(hash).unwrap().unwrap(), ssz); - } - - #[test] - fn test_get_unknown() { - let db = Arc::new(MemoryDB::open()); - let store = $store::new(db.clone()); - - let ssz = "some bytes".as_bytes(); - let hash = &Hash256::from([0xAA; 32]); - let other_hash = &Hash256::from([0xBB; 32]); - - db.put(DB_COLUMN, other_hash.as_bytes(), ssz).unwrap(); - assert_eq!(store.get(hash).unwrap(), None); - } - - #[test] - fn test_exists() { - let db = Arc::new(MemoryDB::open()); - let store = $store::new(db.clone()); - - let ssz = "some bytes".as_bytes(); - let hash = &Hash256::from([0xAA; 32]); - - db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap(); - assert!(store.exists(hash).unwrap()); - } - - #[test] - fn test_block_does_not_exist() { - let db = Arc::new(MemoryDB::open()); - let store = $store::new(db.clone()); - - let ssz = "some bytes".as_bytes(); - let hash = &Hash256::from([0xAA; 32]); - let other_hash = &Hash256::from([0xBB; 32]); - - db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap(); - assert!(!store.exists(other_hash).unwrap()); - } - - #[test] - fn test_delete() { - let db = Arc::new(MemoryDB::open()); - let store = $store::new(db.clone()); - - let ssz = "some bytes".as_bytes(); - let hash = &Hash256::from([0xAA; 32]); - - db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap(); - assert!(db.exists(DB_COLUMN, hash.as_bytes()).unwrap()); - - store.delete(hash).unwrap(); - assert!(!db.exists(DB_COLUMN, hash.as_bytes()).unwrap()); - } - }; -} diff --git a/beacon_node/db/src/stores/mod.rs b/beacon_node/db/src/stores/mod.rs deleted file mode 100644 index 44de7eed1..000000000 --- a/beacon_node/db/src/stores/mod.rs +++ /dev/null @@ -1,25 +0,0 @@ -use super::{ClientDB, DBError}; - -#[macro_use] -mod macros; -mod beacon_block_store; -mod beacon_state_store; -mod pow_chain_store; -mod validator_store; - -pub use self::beacon_block_store::{BeaconBlockAtSlotError, BeaconBlockStore}; -pub use self::beacon_state_store::BeaconStateStore; -pub use self::pow_chain_store::PoWChainStore; -pub use self::validator_store::{ValidatorStore, ValidatorStoreError}; - -pub const BLOCKS_DB_COLUMN: &str = "blocks"; -pub const STATES_DB_COLUMN: &str = "states"; -pub const POW_CHAIN_DB_COLUMN: &str = "powchain"; -pub const VALIDATOR_DB_COLUMN: &str = "validator"; - -pub const COLUMNS: [&str; 4] = [ - BLOCKS_DB_COLUMN, - STATES_DB_COLUMN, - POW_CHAIN_DB_COLUMN, - VALIDATOR_DB_COLUMN, -]; diff --git a/beacon_node/db/src/stores/pow_chain_store.rs b/beacon_node/db/src/stores/pow_chain_store.rs deleted file mode 100644 index 5c8b97907..000000000 --- a/beacon_node/db/src/stores/pow_chain_store.rs +++ /dev/null @@ -1,68 +0,0 @@ -use super::POW_CHAIN_DB_COLUMN as DB_COLUMN; -use super::{ClientDB, DBError}; -use std::sync::Arc; - -pub struct PoWChainStore -where - T: ClientDB, -{ - db: Arc, -} - -impl PoWChainStore { - pub fn new(db: Arc) -> Self { - Self { db } - } - - pub fn put_block_hash(&self, hash: &[u8]) -> Result<(), DBError> { - self.db.put(DB_COLUMN, hash, &[0]) - } - - pub fn block_hash_exists(&self, hash: &[u8]) -> Result { - self.db.exists(DB_COLUMN, hash) - } -} - -#[cfg(test)] -mod tests { - extern crate types; - - use super::super::super::MemoryDB; - use super::*; - - use self::types::Hash256; - - #[test] - fn test_put_block_hash() { - let db = Arc::new(MemoryDB::open()); - let store = PoWChainStore::new(db.clone()); - - let hash = &Hash256::from([0xAA; 32]).as_bytes().to_vec(); - store.put_block_hash(hash).unwrap(); - - assert!(db.exists(DB_COLUMN, hash).unwrap()); - } - - #[test] - fn test_block_hash_exists() { - let db = Arc::new(MemoryDB::open()); - let store = PoWChainStore::new(db.clone()); - - let hash = &Hash256::from([0xAA; 32]).as_bytes().to_vec(); - db.put(DB_COLUMN, hash, &[0]).unwrap(); - - assert!(store.block_hash_exists(hash).unwrap()); - } - - #[test] - fn test_block_hash_does_not_exist() { - let db = Arc::new(MemoryDB::open()); - let store = PoWChainStore::new(db.clone()); - - let hash = &Hash256::from([0xAA; 32]).as_bytes().to_vec(); - let other_hash = &Hash256::from([0xBB; 32]).as_bytes().to_vec(); - db.put(DB_COLUMN, hash, &[0]).unwrap(); - - assert!(!store.block_hash_exists(other_hash).unwrap()); - } -} diff --git a/beacon_node/db/src/stores/validator_store.rs b/beacon_node/db/src/stores/validator_store.rs deleted file mode 100644 index f653c9f71..000000000 --- a/beacon_node/db/src/stores/validator_store.rs +++ /dev/null @@ -1,215 +0,0 @@ -extern crate bytes; - -use self::bytes::{BufMut, BytesMut}; -use super::VALIDATOR_DB_COLUMN as DB_COLUMN; -use super::{ClientDB, DBError}; -use bls::PublicKey; -use ssz::{Decode, Encode}; -use std::sync::Arc; - -#[derive(Debug, PartialEq)] -pub enum ValidatorStoreError { - DBError(String), - DecodeError, -} - -impl From for ValidatorStoreError { - fn from(error: DBError) -> Self { - ValidatorStoreError::DBError(error.message) - } -} - -#[derive(Debug, PartialEq)] -enum KeyPrefixes { - PublicKey, -} - -pub struct ValidatorStore -where - T: ClientDB, -{ - db: Arc, -} - -impl ValidatorStore { - pub fn new(db: Arc) -> Self { - Self { db } - } - - fn prefix_bytes(&self, key_prefix: &KeyPrefixes) -> Vec { - match key_prefix { - KeyPrefixes::PublicKey => b"pubkey".to_vec(), - } - } - - fn get_db_key_for_index(&self, key_prefix: &KeyPrefixes, index: usize) -> Vec { - let mut buf = BytesMut::with_capacity(6 + 8); - buf.put(self.prefix_bytes(key_prefix)); - buf.put_u64_be(index as u64); - buf.take().to_vec() - } - - pub fn put_public_key_by_index( - &self, - index: usize, - public_key: &PublicKey, - ) -> Result<(), ValidatorStoreError> { - let key = self.get_db_key_for_index(&KeyPrefixes::PublicKey, index); - let val = public_key.as_ssz_bytes(); - self.db - .put(DB_COLUMN, &key[..], &val[..]) - .map_err(ValidatorStoreError::from) - } - - pub fn get_public_key_by_index( - &self, - index: usize, - ) -> Result, ValidatorStoreError> { - let key = self.get_db_key_for_index(&KeyPrefixes::PublicKey, index); - let val = self.db.get(DB_COLUMN, &key[..])?; - match val { - None => Ok(None), - Some(val) => match PublicKey::from_ssz_bytes(&val) { - Ok(key) => Ok(Some(key)), - Err(_) => Err(ValidatorStoreError::DecodeError), - }, - } - } -} - -#[cfg(test)] -mod tests { - use super::super::super::MemoryDB; - use super::*; - use bls::Keypair; - - #[test] - fn test_prefix_bytes() { - let db = Arc::new(MemoryDB::open()); - let store = ValidatorStore::new(db.clone()); - - assert_eq!( - store.prefix_bytes(&KeyPrefixes::PublicKey), - b"pubkey".to_vec() - ); - } - - #[test] - fn test_get_db_key_for_index() { - let db = Arc::new(MemoryDB::open()); - let store = ValidatorStore::new(db.clone()); - - let mut buf = BytesMut::with_capacity(6 + 8); - buf.put(b"pubkey".to_vec()); - buf.put_u64_be(42); - assert_eq!( - store.get_db_key_for_index(&KeyPrefixes::PublicKey, 42), - buf.take().to_vec() - ) - } - - #[test] - fn test_put_public_key_by_index() { - let db = Arc::new(MemoryDB::open()); - let store = ValidatorStore::new(db.clone()); - - let index = 3; - let public_key = Keypair::random().pk; - - store.put_public_key_by_index(index, &public_key).unwrap(); - let public_key_at_index = db - .get( - DB_COLUMN, - &store.get_db_key_for_index(&KeyPrefixes::PublicKey, index)[..], - ) - .unwrap() - .unwrap(); - - assert_eq!(public_key_at_index, public_key.as_ssz_bytes()); - } - - #[test] - fn test_get_public_key_by_index() { - let db = Arc::new(MemoryDB::open()); - let store = ValidatorStore::new(db.clone()); - - let index = 4; - let public_key = Keypair::random().pk; - - db.put( - DB_COLUMN, - &store.get_db_key_for_index(&KeyPrefixes::PublicKey, index)[..], - &public_key.as_ssz_bytes(), - ) - .unwrap(); - - let public_key_at_index = store.get_public_key_by_index(index).unwrap().unwrap(); - assert_eq!(public_key_at_index, public_key); - } - - #[test] - fn test_get_public_key_by_unknown_index() { - let db = Arc::new(MemoryDB::open()); - let store = ValidatorStore::new(db.clone()); - - let public_key = Keypair::random().pk; - - db.put( - DB_COLUMN, - &store.get_db_key_for_index(&KeyPrefixes::PublicKey, 3)[..], - &public_key.as_ssz_bytes(), - ) - .unwrap(); - - let public_key_at_index = store.get_public_key_by_index(4).unwrap(); - assert_eq!(public_key_at_index, None); - } - - #[test] - fn test_get_invalid_public_key() { - let db = Arc::new(MemoryDB::open()); - let store = ValidatorStore::new(db.clone()); - - let key = store.get_db_key_for_index(&KeyPrefixes::PublicKey, 42); - db.put(DB_COLUMN, &key[..], "cats".as_bytes()).unwrap(); - - assert_eq!( - store.get_public_key_by_index(42), - Err(ValidatorStoreError::DecodeError) - ); - } - - #[test] - fn test_validator_store_put_get() { - let db = Arc::new(MemoryDB::open()); - let store = ValidatorStore::new(db); - - let keys = vec![ - Keypair::random(), - Keypair::random(), - Keypair::random(), - Keypair::random(), - Keypair::random(), - ]; - - for i in 0..keys.len() { - store.put_public_key_by_index(i, &keys[i].pk).unwrap(); - } - - /* - * Check all keys are retrieved correctly. - */ - for i in 0..keys.len() { - let retrieved = store.get_public_key_by_index(i).unwrap().unwrap(); - assert_eq!(retrieved, keys[i].pk); - } - - /* - * Check that an index that wasn't stored returns None. - */ - assert!(store - .get_public_key_by_index(keys.len() + 1) - .unwrap() - .is_none()); - } -} diff --git a/beacon_node/db/src/traits.rs b/beacon_node/db/src/traits.rs deleted file mode 100644 index 41be3e23d..000000000 --- a/beacon_node/db/src/traits.rs +++ /dev/null @@ -1,28 +0,0 @@ -pub type DBValue = Vec; - -#[derive(Debug)] -pub struct DBError { - pub message: String, -} - -impl DBError { - pub fn new(message: String) -> Self { - Self { message } - } -} - -/// A generic database to be used by the "client' (i.e., -/// the lighthouse blockchain client). -/// -/// The purpose of having this generic trait is to allow the -/// program to use a persistent on-disk database during production, -/// but use a transient database during tests. -pub trait ClientDB: Sync + Send { - fn get(&self, col: &str, key: &[u8]) -> Result, DBError>; - - fn put(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), DBError>; - - fn exists(&self, col: &str, key: &[u8]) -> Result; - - fn delete(&self, col: &str, key: &[u8]) -> Result<(), DBError>; -} diff --git a/beacon_node/network/src/beacon_chain.rs b/beacon_node/network/src/beacon_chain.rs index a98aa73de..2a42376f7 100644 --- a/beacon_node/network/src/beacon_chain.rs +++ b/beacon_node/network/src/beacon_chain.rs @@ -1,9 +1,9 @@ use beacon_chain::BeaconChain as RawBeaconChain; use beacon_chain::{ - db::ClientDB, fork_choice::ForkChoice, parking_lot::RwLockReadGuard, slot_clock::SlotClock, + store::Store, types::{BeaconState, ChainSpec}, AttestationValidationError, CheckPoint, }; @@ -66,7 +66,7 @@ pub trait BeaconChain: Send + Sync { impl BeaconChain for RawBeaconChain where - T: ClientDB + Sized, + T: Store, U: SlotClock, F: ForkChoice, E: EthSpec, diff --git a/beacon_node/rpc/Cargo.toml b/beacon_node/rpc/Cargo.toml index 3fc52c6b1..a361c94ab 100644 --- a/beacon_node/rpc/Cargo.toml +++ b/beacon_node/rpc/Cargo.toml @@ -17,7 +17,7 @@ protos = { path = "../../protos" } grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] } protobuf = "2.0.2" clap = "2.32.0" -db = { path = "../db" } +store = { path = "../store" } dirs = "1.0.3" futures = "0.1.23" slog = "^2.2.3" diff --git a/beacon_node/rpc/src/beacon_chain.rs b/beacon_node/rpc/src/beacon_chain.rs index 7e75b32ce..d12baf1d1 100644 --- a/beacon_node/rpc/src/beacon_chain.rs +++ b/beacon_node/rpc/src/beacon_chain.rs @@ -1,9 +1,9 @@ use beacon_chain::BeaconChain as RawBeaconChain; use beacon_chain::{ - db::ClientDB, fork_choice::ForkChoice, parking_lot::{RwLockReadGuard, RwLockWriteGuard}, slot_clock::SlotClock, + store::Store, types::{BeaconState, ChainSpec, Signature}, AttestationValidationError, BlockProductionError, }; @@ -36,7 +36,7 @@ pub trait BeaconChain: Send + Sync { impl BeaconChain for RawBeaconChain where - T: ClientDB + Sized, + T: Store, U: SlotClock, F: ForkChoice, E: EthSpec, diff --git a/beacon_node/src/main.rs b/beacon_node/src/main.rs index 45aafb3ce..ef2121882 100644 --- a/beacon_node/src/main.rs +++ b/beacon_node/src/main.rs @@ -68,6 +68,15 @@ fn main() { .help("Listen port for RPC endpoint.") .takes_value(true), ) + .arg( + Arg::with_name("db") + .long("db") + .value_name("DB") + .help("Type of database to use.") + .takes_value(true) + .possible_values(&["disk", "memory"]) + .default_value("memory"), + ) .get_matches(); // invalid arguments, panic diff --git a/beacon_node/src/run.rs b/beacon_node/src/run.rs index 1d9156124..4cf930060 100644 --- a/beacon_node/src/run.rs +++ b/beacon_node/src/run.rs @@ -1,15 +1,17 @@ -use client::client_types::TestingClientType; -use client::error; -use client::{notifier, Client, ClientConfig}; +use client::client_types::{DiskStoreTestingClientType, MemoryStoreTestingClientType}; +use client::{error, DBType}; +use client::{notifier, Client, ClientConfig, ClientTypes}; use futures::sync::oneshot; use futures::Future; use slog::info; use std::cell::RefCell; use tokio::runtime::Builder; +use tokio::runtime::Runtime; +use tokio::runtime::TaskExecutor; use tokio_timer::clock::Clock; pub fn run_beacon_node(config: ClientConfig, log: &slog::Logger) -> error::Result<()> { - let mut runtime = Builder::new() + let runtime = Builder::new() .name_prefix("main-") .clock(Clock::system()) .build() @@ -20,8 +22,42 @@ pub fn run_beacon_node(config: ClientConfig, log: &slog::Logger) -> error::Resul "data_dir" => &config.data_dir.to_str(), "port" => &config.net_conf.listen_port); + let executor = runtime.executor(); + + match config.db_type { + DBType::Disk => { + info!( + log, + "BeaconNode starting"; + "type" => "DiskStoreTestingClientType" + ); + let client: Client = + Client::new(config, log.clone(), &executor)?; + + run(client, executor, runtime, log) + } + DBType::Memory => { + info!( + log, + "BeaconNode starting"; + "type" => "MemoryStoreTestingClientType" + ); + let client: Client = + Client::new(config, log.clone(), &executor)?; + + run(client, executor, runtime, log) + } + } +} + +pub fn run( + client: Client, + executor: TaskExecutor, + mut runtime: Runtime, + log: &slog::Logger, +) -> error::Result<()> { // run service until ctrl-c - let (ctrlc_send, ctrlc) = oneshot::channel(); + let (ctrlc_send, ctrlc_oneshot) = oneshot::channel(); let ctrlc_send_c = RefCell::new(Some(ctrlc_send)); ctrlc::set_handler(move || { if let Some(ctrlc_send) = ctrlc_send_c.try_borrow_mut().unwrap().take() { @@ -32,14 +68,10 @@ pub fn run_beacon_node(config: ClientConfig, log: &slog::Logger) -> error::Resul let (exit_signal, exit) = exit_future::signal(); - let executor = runtime.executor(); - - // currently testing - using TestingClientType - let client: Client = Client::new(config, log.clone(), &executor)?; notifier::run(&client, executor, exit); runtime - .block_on(ctrlc) + .block_on(ctrlc_oneshot) .map_err(|e| format!("Ctrlc oneshot failed: {:?}", e))?; // perform global shutdown operations. diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml new file mode 100644 index 000000000..a95dafa90 --- /dev/null +++ b/beacon_node/store/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "store" +version = "0.1.0" +authors = ["Paul Hauner "] +edition = "2018" + +[dev-dependencies] +tempfile = "3" + +[dependencies] +blake2-rfc = "0.2.18" +bls = { path = "../../eth2/utils/bls" } +bytes = "0.4.10" +db-key = "0.0.5" +leveldb = "0.8.4" +parking_lot = "0.7" +ssz = { path = "../../eth2/utils/ssz" } +ssz_derive = { path = "../../eth2/utils/ssz_derive" } +tree_hash = { path = "../../eth2/utils/tree_hash" } +types = { path = "../../eth2/types" } diff --git a/beacon_node/store/src/block_at_slot.rs b/beacon_node/store/src/block_at_slot.rs new file mode 100644 index 000000000..4a8abaefd --- /dev/null +++ b/beacon_node/store/src/block_at_slot.rs @@ -0,0 +1,183 @@ +use super::*; +use ssz::{Decode, DecodeError}; + +fn get_block_bytes(store: &T, root: Hash256) -> Result>, Error> { + store.get_bytes(BeaconBlock::db_column().into(), &root[..]) +} + +fn read_slot_from_block_bytes(bytes: &[u8]) -> Result { + let end = std::cmp::min(Slot::ssz_fixed_len(), bytes.len()); + + Slot::from_ssz_bytes(&bytes[0..end]) +} + +fn read_previous_block_root_from_block_bytes(bytes: &[u8]) -> Result { + let previous_bytes = Slot::ssz_fixed_len(); + let slice = bytes + .get(previous_bytes..previous_bytes + Hash256::ssz_fixed_len()) + .ok_or_else(|| DecodeError::BytesInvalid("Not enough bytes.".to_string()))?; + + Hash256::from_ssz_bytes(slice) +} + +pub fn get_block_at_preceeding_slot( + store: &T, + slot: Slot, + start_root: Hash256, +) -> Result, Error> { + let mut root = start_root; + + loop { + if let Some(bytes) = get_block_bytes(store, root)? { + let this_slot = read_slot_from_block_bytes(&bytes)?; + + if this_slot == slot { + let block = BeaconBlock::from_ssz_bytes(&bytes)?; + break Ok(Some((root, block))); + } else if this_slot < slot { + break Ok(None); + } else { + root = read_previous_block_root_from_block_bytes(&bytes)?; + } + } else { + break Ok(None); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ssz::Encode; + use tree_hash::TreeHash; + + #[test] + fn read_slot() { + let spec = FewValidatorsEthSpec::spec(); + + let test_slot = |slot: Slot| { + let mut block = BeaconBlock::empty(&spec); + block.slot = slot; + let bytes = block.as_ssz_bytes(); + assert_eq!(read_slot_from_block_bytes(&bytes).unwrap(), slot); + }; + + test_slot(Slot::new(0)); + test_slot(Slot::new(1)); + test_slot(Slot::new(42)); + test_slot(Slot::new(u64::max_value())); + } + + #[test] + fn bad_slot() { + for i in 0..8 { + assert!(read_slot_from_block_bytes(&vec![0; i]).is_err()); + } + } + + #[test] + fn read_previous_block_root() { + let spec = FewValidatorsEthSpec::spec(); + + let test_root = |root: Hash256| { + let mut block = BeaconBlock::empty(&spec); + block.previous_block_root = root; + let bytes = block.as_ssz_bytes(); + assert_eq!( + read_previous_block_root_from_block_bytes(&bytes).unwrap(), + root + ); + }; + + test_root(Hash256::random()); + test_root(Hash256::random()); + test_root(Hash256::random()); + } + + fn build_chain( + store: &impl Store, + slots: &[usize], + spec: &ChainSpec, + ) -> Vec<(Hash256, BeaconBlock)> { + let mut blocks_and_roots: Vec<(Hash256, BeaconBlock)> = vec![]; + + for (i, slot) in slots.iter().enumerate() { + let mut block = BeaconBlock::empty(spec); + block.slot = Slot::from(*slot); + + if i > 0 { + block.previous_block_root = blocks_and_roots[i - 1].0; + } + + let root = Hash256::from_slice(&block.tree_hash_root()); + + store.put(&root, &block).unwrap(); + blocks_and_roots.push((root, block)); + } + + blocks_and_roots + } + + #[test] + fn chain_without_skips() { + let n: usize = 10; + let store = MemoryStore::open(); + let spec = FewValidatorsEthSpec::spec(); + + let slots: Vec = (0..n).collect(); + let blocks_and_roots = build_chain(&store, &slots, &spec); + + for source in 1..n { + for target in 0..=source { + let (source_root, _source_block) = &blocks_and_roots[source]; + let (target_root, target_block) = &blocks_and_roots[target]; + + let (found_root, found_block) = store + .get_block_at_preceeding_slot(*source_root, target_block.slot) + .unwrap() + .unwrap(); + + assert_eq!(found_root, *target_root); + assert_eq!(found_block, *target_block); + } + } + } + + #[test] + fn chain_with_skips() { + let store = MemoryStore::open(); + let spec = FewValidatorsEthSpec::spec(); + + let slots = vec![0, 1, 2, 5]; + + let blocks_and_roots = build_chain(&store, &slots, &spec); + + // Valid slots + for target in 0..3 { + let (source_root, _source_block) = &blocks_and_roots[3]; + let (target_root, target_block) = &blocks_and_roots[target]; + + let (found_root, found_block) = store + .get_block_at_preceeding_slot(*source_root, target_block.slot) + .unwrap() + .unwrap(); + + assert_eq!(found_root, *target_root); + assert_eq!(found_block, *target_block); + } + + // Slot that doesn't exist + let (source_root, _source_block) = &blocks_and_roots[3]; + assert!(store + .get_block_at_preceeding_slot(*source_root, Slot::new(3)) + .unwrap() + .is_none()); + + // Slot too high + let (source_root, _source_block) = &blocks_and_roots[3]; + assert!(store + .get_block_at_preceeding_slot(*source_root, Slot::new(3)) + .unwrap() + .is_none()); + } +} diff --git a/beacon_node/db/src/disk_db.rs b/beacon_node/store/src/disk_db.rs similarity index 86% rename from beacon_node/db/src/disk_db.rs rename to beacon_node/store/src/disk_db.rs index 2d26315da..eb2b885c6 100644 --- a/beacon_node/db/src/disk_db.rs +++ b/beacon_node/store/src/disk_db.rs @@ -1,19 +1,20 @@ extern crate rocksdb; -use super::rocksdb::Error as RocksError; -use super::rocksdb::{Options, DB}; +// use super::stores::COLUMNS; use super::{ClientDB, DBError, DBValue}; +use rocksdb::Error as RocksError; +use rocksdb::{Options, DB}; use std::fs; use std::path::Path; /// A on-disk database which implements the ClientDB trait. /// /// This implementation uses RocksDB with default options. -pub struct DiskDB { +pub struct DiskStore { db: DB, } -impl DiskDB { +impl DiskStore { /// Open the RocksDB database, optionally supplying columns if required. /// /// The RocksDB database will be contained in a directory titled @@ -23,31 +24,32 @@ impl DiskDB { /// /// Panics if the database is unable to be created. pub fn open(path: &Path, columns: Option<&[&str]>) -> Self { - /* - * Initialise the options - */ + // Rocks options. let mut options = Options::default(); options.create_if_missing(true); - // TODO: ensure that columns are created (and remove - // the dead_code allow) - - /* - * Initialise the path - */ + // Ensure the path exists. fs::create_dir_all(&path).unwrap_or_else(|_| panic!("Unable to create {:?}", &path)); let db_path = path.join("database"); - /* - * Open the database - */ - let db = match columns { - None => DB::open(&options, db_path), - Some(columns) => DB::open_cf(&options, db_path, columns), - } - .expect("Unable to open local database");; + let columns = columns.unwrap_or(&COLUMNS); - Self { db } + if db_path.exists() { + Self { + db: DB::open_cf(&options, db_path, &COLUMNS) + .expect("Unable to open local database"), + } + } else { + let mut db = Self { + db: DB::open(&options, db_path).expect("Unable to open local database"), + }; + + for cf in columns { + db.create_col(cf).unwrap(); + } + + db + } } /// Create a RocksDB column family. Corresponds to the @@ -69,7 +71,7 @@ impl From for DBError { } } -impl ClientDB for DiskDB { +impl ClientDB for DiskStore { /// Get the value for some key on some column. /// /// Corresponds to the `get_cf()` method on the RocksDB API. @@ -97,7 +99,7 @@ impl ClientDB for DiskDB { None => Err(DBError { message: "Unknown column".to_string(), }), - Some(handle) => self.db.put_cf(handle, key, val).map_err(Into::into), + Some(handle) => self.db.put_cf(handle, key, val).map_err(|e| e.into()), } } @@ -152,7 +154,7 @@ mod tests { let col_name: &str = "TestColumn"; let column_families = vec![col_name]; - let mut db = DiskDB::open(&path, None); + let mut db = DiskStore::open(&path, None); for cf in column_families { db.create_col(&cf).unwrap(); diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs new file mode 100644 index 000000000..815b35a8e --- /dev/null +++ b/beacon_node/store/src/errors.rs @@ -0,0 +1,30 @@ +use ssz::DecodeError; + +#[derive(Debug, PartialEq)] +pub enum Error { + SszDecodeError(DecodeError), + DBError { message: String }, +} + +impl From for Error { + fn from(e: DecodeError) -> Error { + Error::SszDecodeError(e) + } +} + +impl From for Error { + fn from(e: DBError) -> Error { + Error::DBError { message: e.message } + } +} + +#[derive(Debug)] +pub struct DBError { + pub message: String, +} + +impl DBError { + pub fn new(message: String) -> Self { + Self { message } + } +} diff --git a/beacon_node/store/src/impls.rs b/beacon_node/store/src/impls.rs new file mode 100644 index 000000000..91f8d52de --- /dev/null +++ b/beacon_node/store/src/impls.rs @@ -0,0 +1,30 @@ +use crate::*; +use ssz::{Decode, Encode}; + +impl StoreItem for BeaconBlock { + fn db_column() -> DBColumn { + DBColumn::BeaconBlock + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &mut [u8]) -> Result { + Self::from_ssz_bytes(bytes).map_err(Into::into) + } +} + +impl StoreItem for BeaconState { + fn db_column() -> DBColumn { + DBColumn::BeaconState + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &mut [u8]) -> Result { + Self::from_ssz_bytes(bytes).map_err(Into::into) + } +} diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs new file mode 100644 index 000000000..09aec46fa --- /dev/null +++ b/beacon_node/store/src/leveldb_store.rs @@ -0,0 +1,100 @@ +use super::*; +use db_key::Key; +use leveldb::database::kv::KV; +use leveldb::database::Database; +use leveldb::error::Error as LevelDBError; +use leveldb::options::{Options, ReadOptions, WriteOptions}; +use std::path::Path; + +/// A wrapped leveldb database. +pub struct LevelDB { + db: Database, +} + +impl LevelDB { + /// Open a database at `path`, creating a new database if one does not already exist. + pub fn open(path: &Path) -> Result { + let mut options = Options::new(); + + options.create_if_missing = true; + + let db = Database::open(path, options)?; + + Ok(Self { db }) + } + + fn read_options(&self) -> ReadOptions { + ReadOptions::new() + } + + fn write_options(&self) -> WriteOptions { + WriteOptions::new() + } + + fn get_key_for_col(col: &str, key: &[u8]) -> BytesKey { + let mut col = col.as_bytes().to_vec(); + col.append(&mut key.to_vec()); + BytesKey { key: col } + } +} + +/// Used for keying leveldb. +pub struct BytesKey { + key: Vec, +} + +impl Key for BytesKey { + fn from_u8(key: &[u8]) -> Self { + Self { key: key.to_vec() } + } + + fn as_slice T>(&self, f: F) -> T { + f(self.key.as_slice()) + } +} + +impl Store for LevelDB { + /// Retrieve some bytes in `column` with `key`. + fn get_bytes(&self, col: &str, key: &[u8]) -> Result>, Error> { + let column_key = Self::get_key_for_col(col, key); + + self.db + .get(self.read_options(), column_key) + .map_err(Into::into) + } + + /// Store some `value` in `column`, indexed with `key`. + fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> { + let column_key = Self::get_key_for_col(col, key); + + self.db + .put(self.write_options(), column_key, val) + .map_err(Into::into) + } + + /// Return `true` if `key` exists in `column`. + fn key_exists(&self, col: &str, key: &[u8]) -> Result { + let column_key = Self::get_key_for_col(col, key); + + self.db + .get(self.read_options(), column_key) + .map_err(Into::into) + .and_then(|val| Ok(val.is_some())) + } + + /// Removes `key` from `column`. + fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> { + let column_key = Self::get_key_for_col(col, key); + self.db + .delete(self.write_options(), column_key) + .map_err(Into::into) + } +} + +impl From for Error { + fn from(e: LevelDBError) -> Error { + Error::DBError { + message: format!("{:?}", e), + } + } +} diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs new file mode 100644 index 000000000..59875601a --- /dev/null +++ b/beacon_node/store/src/lib.rs @@ -0,0 +1,223 @@ +//! Storage functionality for Lighthouse. +//! +//! Provides the following stores: +//! +//! - `DiskStore`: an on-disk store backed by leveldb. Used in production. +//! - `MemoryStore`: an in-memory store backed by a hash-map. Used for testing. +//! +//! Provides a simple API for storing/retrieving all types that sometimes needs type-hints. See +//! tests for implementation examples. + +mod block_at_slot; +mod errors; +mod impls; +mod leveldb_store; +mod memory_store; + +pub use self::leveldb_store::LevelDB as DiskStore; +pub use self::memory_store::MemoryStore; +pub use errors::Error; +pub use types::*; + +/// An object capable of storing and retrieving objects implementing `StoreItem`. +/// +/// A `Store` is fundamentally backed by a key-value database, however it provides support for +/// columns. A simple column implementation might involve prefixing a key with some bytes unique to +/// each column. +pub trait Store: Sync + Send + Sized { + /// Store an item in `Self`. + fn put(&self, key: &Hash256, item: &impl StoreItem) -> Result<(), Error> { + item.db_put(self, key) + } + + /// Retrieve an item from `Self`. + fn get(&self, key: &Hash256) -> Result, Error> { + I::db_get(self, key) + } + + /// Returns `true` if the given key represents an item in `Self`. + fn exists(&self, key: &Hash256) -> Result { + I::db_exists(self, key) + } + + /// Remove an item from `Self`. + fn delete(&self, key: &Hash256) -> Result<(), Error> { + I::db_delete(self, key) + } + + /// Given the root of an existing block in the store (`start_block_root`), return a parent + /// block with the specified `slot`. + /// + /// Returns `None` if no parent block exists at that slot, or if `slot` is greater than the + /// slot of `start_block_root`. + fn get_block_at_preceeding_slot( + &self, + start_block_root: Hash256, + slot: Slot, + ) -> Result, Error> { + block_at_slot::get_block_at_preceeding_slot(self, slot, start_block_root) + } + + /// Retrieve some bytes in `column` with `key`. + fn get_bytes(&self, column: &str, key: &[u8]) -> Result>, Error>; + + /// Store some `value` in `column`, indexed with `key`. + fn put_bytes(&self, column: &str, key: &[u8], value: &[u8]) -> Result<(), Error>; + + /// Return `true` if `key` exists in `column`. + fn key_exists(&self, column: &str, key: &[u8]) -> Result; + + /// Removes `key` from `column`. + fn key_delete(&self, column: &str, key: &[u8]) -> Result<(), Error>; +} + +/// A unique column identifier. +pub enum DBColumn { + BeaconBlock, + BeaconState, + BeaconChain, +} + +impl<'a> Into<&'a str> for DBColumn { + /// Returns a `&str` that can be used for keying a key-value data base. + fn into(self) -> &'a str { + match self { + DBColumn::BeaconBlock => &"blk", + DBColumn::BeaconState => &"ste", + DBColumn::BeaconChain => &"bch", + } + } +} + +/// An item that may be stored in a `Store`. +/// +/// Provides default methods that are suitable for most applications, however when overridden they +/// provide full customizability of `Store` operations. +pub trait StoreItem: Sized { + /// Identifies which column this item should be placed in. + fn db_column() -> DBColumn; + + /// Serialize `self` as bytes. + fn as_store_bytes(&self) -> Vec; + + /// De-serialize `self` from bytes. + fn from_store_bytes(bytes: &mut [u8]) -> Result; + + /// Store `self`. + fn db_put(&self, store: &impl Store, key: &Hash256) -> Result<(), Error> { + let column = Self::db_column().into(); + let key = key.as_bytes(); + + store + .put_bytes(column, key, &self.as_store_bytes()) + .map_err(Into::into) + } + + /// Retrieve an instance of `Self`. + fn db_get(store: &impl Store, key: &Hash256) -> Result, Error> { + let column = Self::db_column().into(); + let key = key.as_bytes(); + + match store.get_bytes(column, key)? { + Some(mut bytes) => Ok(Some(Self::from_store_bytes(&mut bytes[..])?)), + None => Ok(None), + } + } + + /// Return `true` if an instance of `Self` exists in `Store`. + fn db_exists(store: &impl Store, key: &Hash256) -> Result { + let column = Self::db_column().into(); + let key = key.as_bytes(); + + store.key_exists(column, key) + } + + /// Delete `self` from the `Store`. + fn db_delete(store: &impl Store, key: &Hash256) -> Result<(), Error> { + let column = Self::db_column().into(); + let key = key.as_bytes(); + + store.key_delete(column, key) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ssz::{Decode, Encode}; + use ssz_derive::{Decode, Encode}; + use tempfile::tempdir; + + #[derive(PartialEq, Debug, Encode, Decode)] + struct StorableThing { + a: u64, + b: u64, + } + + impl StoreItem for StorableThing { + fn db_column() -> DBColumn { + DBColumn::BeaconBlock + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &mut [u8]) -> Result { + Self::from_ssz_bytes(bytes).map_err(Into::into) + } + } + + fn test_impl(store: impl Store) { + let key = Hash256::random(); + let item = StorableThing { a: 1, b: 42 }; + + assert_eq!(store.exists::(&key), Ok(false)); + + store.put(&key, &item).unwrap(); + + assert_eq!(store.exists::(&key), Ok(true)); + + let retrieved = store.get(&key).unwrap().unwrap(); + assert_eq!(item, retrieved); + + store.delete::(&key).unwrap(); + + assert_eq!(store.exists::(&key), Ok(false)); + + assert_eq!(store.get::(&key), Ok(None)); + } + + #[test] + fn diskdb() { + let dir = tempdir().unwrap(); + let path = dir.path(); + let store = DiskStore::open(&path).unwrap(); + + test_impl(store); + } + + #[test] + fn memorydb() { + let store = MemoryStore::open(); + + test_impl(store); + } + + #[test] + fn exists() { + let store = MemoryStore::open(); + let key = Hash256::random(); + let item = StorableThing { a: 1, b: 42 }; + + assert_eq!(store.exists::(&key).unwrap(), false); + + store.put(&key, &item).unwrap(); + + assert_eq!(store.exists::(&key).unwrap(), true); + + store.delete::(&key).unwrap(); + + assert_eq!(store.exists::(&key).unwrap(), false); + } +} diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs new file mode 100644 index 000000000..086a16c26 --- /dev/null +++ b/beacon_node/store/src/memory_store.rs @@ -0,0 +1,63 @@ +use super::{Error, Store}; +use parking_lot::RwLock; +use std::collections::HashMap; + +type DBHashMap = HashMap, Vec>; + +/// A thread-safe `HashMap` wrapper. +pub struct MemoryStore { + db: RwLock, +} + +impl MemoryStore { + /// Create a new, empty database. + pub fn open() -> Self { + Self { + db: RwLock::new(HashMap::new()), + } + } + + fn get_key_for_col(col: &str, key: &[u8]) -> Vec { + let mut col = col.as_bytes().to_vec(); + col.append(&mut key.to_vec()); + col + } +} + +impl Store for MemoryStore { + /// Get the value of some key from the database. Returns `None` if the key does not exist. + fn get_bytes(&self, col: &str, key: &[u8]) -> Result>, Error> { + let column_key = MemoryStore::get_key_for_col(col, key); + + Ok(self + .db + .read() + .get(&column_key) + .and_then(|val| Some(val.clone()))) + } + + /// Puts a key in the database. + fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> { + let column_key = MemoryStore::get_key_for_col(col, key); + + self.db.write().insert(column_key, val.to_vec()); + + Ok(()) + } + + /// Return true if some key exists in some column. + fn key_exists(&self, col: &str, key: &[u8]) -> Result { + let column_key = MemoryStore::get_key_for_col(col, key); + + Ok(self.db.read().contains_key(&column_key)) + } + + /// Delete some key from the database. + fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> { + let column_key = MemoryStore::get_key_for_col(col, key); + + self.db.write().remove(&column_key); + + Ok(()) + } +} diff --git a/beacon_node/store/src/store.rs b/beacon_node/store/src/store.rs new file mode 100644 index 000000000..5d18c7ba5 --- /dev/null +++ b/beacon_node/store/src/store.rs @@ -0,0 +1,37 @@ +use super::*; + +pub type Vec = Vec; + +pub trait Store: Sync + Send + Sized { + fn put(&self, key: &Hash256, item: &impl StoreItem) -> Result<(), Error> { + item.db_put(self, key) + } + + fn get(&self, key: &Hash256) -> Result, Error> { + I::db_get(self, key) + } + + fn exists(&self, key: &Hash256) -> Result { + I::db_exists(self, key) + } + + fn delete(&self, key: &Hash256) -> Result<(), Error> { + I::db_delete(self, key) + } + + fn get_block_at_preceeding_slot( + &self, + start_block_root: Hash256, + slot: Slot, + ) -> Result, Error> { + block_at_slot::get_block_at_preceeding_slot(self, slot, start_block_root) + } + + fn get_bytes(&self, col: &str, key: &[u8]) -> Result>, Error>; + + fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error>; + + fn key_exists(&self, col: &str, key: &[u8]) -> Result; + + fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error>; +} diff --git a/eth2/fork_choice/Cargo.toml b/eth2/fork_choice/Cargo.toml index 819b84055..f2e6825ed 100644 --- a/eth2/fork_choice/Cargo.toml +++ b/eth2/fork_choice/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Age Manning "] edition = "2018" [dependencies] -db = { path = "../../beacon_node/db" } +store = { path = "../../beacon_node/store" } ssz = { path = "../utils/ssz" } types = { path = "../types" } log = "0.4.6" diff --git a/eth2/fork_choice/src/bitwise_lmd_ghost.rs b/eth2/fork_choice/src/bitwise_lmd_ghost.rs index 0bbac6bb6..0e579c0b9 100644 --- a/eth2/fork_choice/src/bitwise_lmd_ghost.rs +++ b/eth2/fork_choice/src/bitwise_lmd_ghost.rs @@ -1,16 +1,11 @@ //! The optimised bitwise LMD-GHOST fork choice rule. -extern crate bit_vec; - use crate::{ForkChoice, ForkChoiceError}; use bit_vec::BitVec; -use db::{ - stores::{BeaconBlockStore, BeaconStateStore}, - ClientDB, -}; use log::{debug, trace}; use std::collections::HashMap; use std::marker::PhantomData; use std::sync::Arc; +use store::Store; use types::{BeaconBlock, BeaconState, ChainSpec, EthSpec, Hash256, Slot, SlotHeight}; //TODO: Pruning - Children @@ -34,7 +29,7 @@ fn power_of_2_below(x: u64) -> u64 { } /// Stores the necessary data structures to run the optimised bitwise lmd ghost algorithm. -pub struct BitwiseLMDGhost { +pub struct BitwiseLMDGhost { /// A cache of known ancestors at given heights for a specific block. //TODO: Consider FnvHashMap cache: HashMap, Hash256>, @@ -46,30 +41,21 @@ pub struct BitwiseLMDGhost { /// The latest attestation targets as a map of validator index to block hash. //TODO: Could this be a fixed size vec latest_attestation_targets: HashMap, - /// Block storage access. - block_store: Arc>, - /// State storage access. - state_store: Arc>, + /// Block and state storage. + store: Arc, max_known_height: SlotHeight, _phantom: PhantomData, } -impl BitwiseLMDGhost -where - T: ClientDB + Sized, -{ - pub fn new( - block_store: Arc>, - state_store: Arc>, - ) -> Self { +impl BitwiseLMDGhost { + pub fn new(store: Arc) -> Self { BitwiseLMDGhost { cache: HashMap::new(), ancestors: vec![HashMap::new(); 16], latest_attestation_targets: HashMap::new(), children: HashMap::new(), max_known_height: SlotHeight::new(0), - block_store, - state_store, + store, _phantom: PhantomData, } } @@ -89,8 +75,8 @@ where let mut latest_votes: HashMap = HashMap::new(); // gets the current weighted votes let current_state: BeaconState = self - .state_store - .get_deserialized(&state_root)? + .store + .get(&state_root)? .ok_or_else(|| ForkChoiceError::MissingBeaconState(*state_root))?; let active_validator_indices = @@ -121,8 +107,8 @@ where // return None if we can't get the block from the db. let block_height = { let block_slot = self - .block_store - .get_deserialized(&block_hash) + .store + .get::(&block_hash) .ok()? .expect("Should have returned already if None") .slot; @@ -243,7 +229,7 @@ where } } -impl ForkChoice for BitwiseLMDGhost { +impl ForkChoice for BitwiseLMDGhost { fn add_block( &mut self, block: &BeaconBlock, @@ -252,8 +238,8 @@ impl ForkChoice for BitwiseLMDGhost { ) -> Result<(), ForkChoiceError> { // get the height of the parent let parent_height = self - .block_store - .get_deserialized(&block.previous_block_root)? + .store + .get::(&block.previous_block_root)? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(block.previous_block_root))? .slot .height(spec.genesis_slot); @@ -304,16 +290,16 @@ impl ForkChoice for BitwiseLMDGhost { trace!("Old attestation found: {:?}", attestation_target); // get the height of the target block let block_height = self - .block_store - .get_deserialized(&target_block_root)? + .store + .get::(&target_block_root)? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*target_block_root))? .slot .height(spec.genesis_slot); // get the height of the past target block let past_block_height = self - .block_store - .get_deserialized(&attestation_target)? + .store + .get::(&attestation_target)? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*attestation_target))? .slot .height(spec.genesis_slot); @@ -337,8 +323,8 @@ impl ForkChoice for BitwiseLMDGhost { justified_block_start ); let block = self - .block_store - .get_deserialized(&justified_block_start)? + .store + .get::(&justified_block_start)? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*justified_block_start))?; let block_slot = block.slot; @@ -429,8 +415,8 @@ impl ForkChoice for BitwiseLMDGhost { // didn't find head yet, proceed to next iteration // update block height block_height = self - .block_store - .get_deserialized(¤t_head)? + .store + .get::(¤t_head)? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(current_head))? .slot .height(spec.genesis_slot); diff --git a/eth2/fork_choice/src/lib.rs b/eth2/fork_choice/src/lib.rs index 016cd5dea..ffc40e6c6 100644 --- a/eth2/fork_choice/src/lib.rs +++ b/eth2/fork_choice/src/lib.rs @@ -16,17 +16,14 @@ //! [`slow_lmd_ghost`]: struct.SlowLmdGhost.html //! [`bitwise_lmd_ghost`]: struct.OptimisedLmdGhost.html -extern crate db; -extern crate ssz; -extern crate types; - pub mod bitwise_lmd_ghost; pub mod longest_chain; pub mod optimized_lmd_ghost; pub mod slow_lmd_ghost; -use db::stores::BeaconBlockAtSlotError; -use db::DBError; +// use store::stores::BeaconBlockAtSlotError; +// use store::DBError; +use store::Error as DBError; use types::{BeaconBlock, ChainSpec, Hash256}; pub use bitwise_lmd_ghost::BitwiseLMDGhost; @@ -77,10 +74,11 @@ pub enum ForkChoiceError { impl From for ForkChoiceError { fn from(e: DBError) -> ForkChoiceError { - ForkChoiceError::StorageError(e.message) + ForkChoiceError::StorageError(format!("{:?}", e)) } } +/* impl From for ForkChoiceError { fn from(e: BeaconBlockAtSlotError) -> ForkChoiceError { match e { @@ -94,6 +92,7 @@ impl From for ForkChoiceError { } } } +*/ /// Fork choice options that are currently implemented. #[derive(Debug, Clone)] diff --git a/eth2/fork_choice/src/longest_chain.rs b/eth2/fork_choice/src/longest_chain.rs index 423edc567..11453cf49 100644 --- a/eth2/fork_choice/src/longest_chain.rs +++ b/eth2/fork_choice/src/longest_chain.rs @@ -1,31 +1,25 @@ use crate::{ForkChoice, ForkChoiceError}; -use db::{stores::BeaconBlockStore, ClientDB}; use std::sync::Arc; +use store::Store; use types::{BeaconBlock, ChainSpec, Hash256, Slot}; -pub struct LongestChain -where - T: ClientDB + Sized, -{ +pub struct LongestChain { /// List of head block hashes head_block_hashes: Vec, - /// Block storage access. - block_store: Arc>, + /// Block storage. + store: Arc, } -impl LongestChain -where - T: ClientDB + Sized, -{ - pub fn new(block_store: Arc>) -> Self { +impl LongestChain { + pub fn new(store: Arc) -> Self { LongestChain { head_block_hashes: Vec::new(), - block_store, + store, } } } -impl ForkChoice for LongestChain { +impl ForkChoice for LongestChain { fn add_block( &mut self, block: &BeaconBlock, @@ -55,9 +49,9 @@ impl ForkChoice for LongestChain { * Load all the head_block hashes from the DB as SszBeaconBlocks. */ for (index, block_hash) in self.head_block_hashes.iter().enumerate() { - let block = self - .block_store - .get_deserialized(&block_hash)? + let block: BeaconBlock = self + .store + .get(&block_hash)? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*block_hash))?; head_blocks.push((index, block)); } diff --git a/eth2/fork_choice/src/optimized_lmd_ghost.rs b/eth2/fork_choice/src/optimized_lmd_ghost.rs index 3f585e3c1..dba6e60da 100644 --- a/eth2/fork_choice/src/optimized_lmd_ghost.rs +++ b/eth2/fork_choice/src/optimized_lmd_ghost.rs @@ -1,16 +1,11 @@ //! The optimised bitwise LMD-GHOST fork choice rule. -extern crate bit_vec; - use crate::{ForkChoice, ForkChoiceError}; -use db::{ - stores::{BeaconBlockStore, BeaconStateStore}, - ClientDB, -}; use log::{debug, trace}; use std::cmp::Ordering; use std::collections::HashMap; use std::marker::PhantomData; use std::sync::Arc; +use store::Store; use types::{BeaconBlock, BeaconState, ChainSpec, EthSpec, Hash256, Slot, SlotHeight}; //TODO: Pruning - Children @@ -34,7 +29,7 @@ fn power_of_2_below(x: u64) -> u64 { } /// Stores the necessary data structures to run the optimised lmd ghost algorithm. -pub struct OptimizedLMDGhost { +pub struct OptimizedLMDGhost { /// A cache of known ancestors at given heights for a specific block. //TODO: Consider FnvHashMap cache: HashMap, Hash256>, @@ -46,30 +41,21 @@ pub struct OptimizedLMDGhost { /// The latest attestation targets as a map of validator index to block hash. //TODO: Could this be a fixed size vec latest_attestation_targets: HashMap, - /// Block storage access. - block_store: Arc>, - /// State storage access. - state_store: Arc>, + /// Block and state storage. + store: Arc, max_known_height: SlotHeight, _phantom: PhantomData, } -impl OptimizedLMDGhost -where - T: ClientDB + Sized, -{ - pub fn new( - block_store: Arc>, - state_store: Arc>, - ) -> Self { +impl OptimizedLMDGhost { + pub fn new(store: Arc) -> Self { OptimizedLMDGhost { cache: HashMap::new(), ancestors: vec![HashMap::new(); 16], latest_attestation_targets: HashMap::new(), children: HashMap::new(), max_known_height: SlotHeight::new(0), - block_store, - state_store, + store, _phantom: PhantomData, } } @@ -89,8 +75,8 @@ where let mut latest_votes: HashMap = HashMap::new(); // gets the current weighted votes let current_state: BeaconState = self - .state_store - .get_deserialized(&state_root)? + .store + .get(&state_root)? .ok_or_else(|| ForkChoiceError::MissingBeaconState(*state_root))?; let active_validator_indices = @@ -121,8 +107,8 @@ where // return None if we can't get the block from the db. let block_height = { let block_slot = self - .block_store - .get_deserialized(&block_hash) + .store + .get::(&block_hash) .ok()? .expect("Should have returned already if None") .slot; @@ -214,7 +200,7 @@ where } } -impl ForkChoice for OptimizedLMDGhost { +impl ForkChoice for OptimizedLMDGhost { fn add_block( &mut self, block: &BeaconBlock, @@ -223,8 +209,8 @@ impl ForkChoice for OptimizedLMDGhost { ) -> Result<(), ForkChoiceError> { // get the height of the parent let parent_height = self - .block_store - .get_deserialized(&block.previous_block_root)? + .store + .get::(&block.previous_block_root)? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(block.previous_block_root))? .slot .height(spec.genesis_slot); @@ -275,16 +261,16 @@ impl ForkChoice for OptimizedLMDGhost { trace!("Old attestation found: {:?}", attestation_target); // get the height of the target block let block_height = self - .block_store - .get_deserialized(&target_block_root)? + .store + .get::(&target_block_root)? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*target_block_root))? .slot .height(spec.genesis_slot); // get the height of the past target block let past_block_height = self - .block_store - .get_deserialized(&attestation_target)? + .store + .get::(&attestation_target)? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*attestation_target))? .slot .height(spec.genesis_slot); @@ -308,8 +294,8 @@ impl ForkChoice for OptimizedLMDGhost { justified_block_start ); let block = self - .block_store - .get_deserialized(&justified_block_start)? + .store + .get::(&justified_block_start)? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*justified_block_start))?; let block_slot = block.slot; @@ -400,8 +386,8 @@ impl ForkChoice for OptimizedLMDGhost { // didn't find head yet, proceed to next iteration // update block height block_height = self - .block_store - .get_deserialized(¤t_head)? + .store + .get::(¤t_head)? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(current_head))? .slot .height(spec.genesis_slot); diff --git a/eth2/fork_choice/src/slow_lmd_ghost.rs b/eth2/fork_choice/src/slow_lmd_ghost.rs index c9aaa70d1..888356417 100644 --- a/eth2/fork_choice/src/slow_lmd_ghost.rs +++ b/eth2/fork_choice/src/slow_lmd_ghost.rs @@ -1,44 +1,30 @@ -extern crate db; - use crate::{ForkChoice, ForkChoiceError}; -use db::{ - stores::{BeaconBlockStore, BeaconStateStore}, - ClientDB, -}; use log::{debug, trace}; use std::collections::HashMap; use std::marker::PhantomData; use std::sync::Arc; +use store::Store; use types::{BeaconBlock, BeaconState, ChainSpec, EthSpec, Hash256, Slot}; //TODO: Pruning and syncing -pub struct SlowLMDGhost { +pub struct SlowLMDGhost { /// The latest attestation targets as a map of validator index to block hash. //TODO: Could this be a fixed size vec latest_attestation_targets: HashMap, /// Stores the children for any given parent. children: HashMap>, - /// Block storage access. - block_store: Arc>, - /// State storage access. - state_store: Arc>, + /// Block and state storage. + store: Arc, _phantom: PhantomData, } -impl SlowLMDGhost -where - T: ClientDB + Sized, -{ - pub fn new( - block_store: Arc>, - state_store: Arc>, - ) -> Self { +impl SlowLMDGhost { + pub fn new(store: Arc) -> Self { SlowLMDGhost { latest_attestation_targets: HashMap::new(), children: HashMap::new(), - block_store, - state_store, + store, _phantom: PhantomData, } } @@ -58,8 +44,8 @@ where let mut latest_votes: HashMap = HashMap::new(); // gets the current weighted votes let current_state: BeaconState = self - .state_store - .get_deserialized(&state_root)? + .store + .get(state_root)? .ok_or_else(|| ForkChoiceError::MissingBeaconState(*state_root))?; let active_validator_indices = @@ -90,15 +76,15 @@ where ) -> Result { let mut count = 0; let block_slot = self - .block_store - .get_deserialized(&block_root)? + .store + .get::(&block_root)? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*block_root))? .slot; for (vote_hash, votes) in latest_votes.iter() { let (root_at_slot, _) = self - .block_store - .block_at_slot(&vote_hash, block_slot)? + .store + .get_block_at_preceeding_slot(*vote_hash, block_slot)? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*block_root))?; if root_at_slot == *block_root { count += votes; @@ -108,7 +94,7 @@ where } } -impl ForkChoice for SlowLMDGhost { +impl ForkChoice for SlowLMDGhost { /// Process when a block is added fn add_block( &mut self, @@ -150,16 +136,16 @@ impl ForkChoice for SlowLMDGhost { trace!("Old attestation found: {:?}", attestation_target); // get the height of the target block let block_height = self - .block_store - .get_deserialized(&target_block_root)? + .store + .get::(&target_block_root)? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*target_block_root))? .slot .height(spec.genesis_slot); // get the height of the past target block let past_block_height = self - .block_store - .get_deserialized(&attestation_target)? + .store + .get::(&attestation_target)? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*attestation_target))? .slot .height(spec.genesis_slot); @@ -180,8 +166,8 @@ impl ForkChoice for SlowLMDGhost { ) -> Result { debug!("Running LMD Ghost Fork-choice rule"); let start = self - .block_store - .get_deserialized(&justified_block_start)? + .store + .get::(&justified_block_start)? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*justified_block_start))?; let start_state_root = start.state_root; diff --git a/eth2/fork_choice/tests/tests.rs b/eth2/fork_choice/tests/tests.rs index 067d39da4..0327e8cb3 100644 --- a/eth2/fork_choice/tests/tests.rs +++ b/eth2/fork_choice/tests/tests.rs @@ -1,26 +1,14 @@ #![cfg(not(debug_assertions))] // Tests the available fork-choice algorithms -extern crate beacon_chain; -extern crate bls; -extern crate db; -// extern crate env_logger; // for debugging -extern crate fork_choice; -extern crate hex; -extern crate log; -extern crate slot_clock; -extern crate types; -extern crate yaml_rust; - pub use beacon_chain::BeaconChain; use bls::Signature; -use db::stores::{BeaconBlockStore, BeaconStateStore}; -use db::MemoryDB; +use store::MemoryStore; +use store::Store; // use env_logger::{Builder, Env}; use fork_choice::{ BitwiseLMDGhost, ForkChoice, ForkChoiceAlgorithm, LongestChain, OptimizedLMDGhost, SlowLMDGhost, }; -use ssz::ssz_encode; use std::collections::HashMap; use std::sync::Arc; use std::{fs::File, io::prelude::*, path::PathBuf}; @@ -106,7 +94,7 @@ fn test_yaml_vectors( // process the tests for test_case in test_cases { // setup a fresh test - let (mut fork_choice, block_store, state_root) = + let (mut fork_choice, store, state_root) = setup_inital_state(&fork_choice_algo, emulated_validators); // keep a hashmap of block_id's to block_hashes (random hashes to abstract block_id) @@ -149,9 +137,7 @@ fn test_yaml_vectors( }; // Store the block. - block_store - .put(&block_hash, &ssz_encode(&beacon_block)[..]) - .unwrap(); + store.put(&block_hash, &beacon_block).unwrap(); // run add block for fork choice if not genesis if parent_id != block_id { @@ -222,29 +208,26 @@ fn load_test_cases_from_yaml(file_path: &str) -> Vec { fn setup_inital_state( fork_choice_algo: &ForkChoiceAlgorithm, num_validators: usize, -) -> (Box, Arc>, Hash256) { - let db = Arc::new(MemoryDB::open()); - let block_store = Arc::new(BeaconBlockStore::new(db.clone())); - let state_store = Arc::new(BeaconStateStore::new(db.clone())); +) -> (Box, Arc, Hash256) { + let store = Arc::new(MemoryStore::open()); // the fork choice instantiation let fork_choice: Box = match fork_choice_algo { ForkChoiceAlgorithm::OptimizedLMDGhost => { - let f: OptimizedLMDGhost = - OptimizedLMDGhost::new(block_store.clone(), state_store.clone()); + let f: OptimizedLMDGhost = + OptimizedLMDGhost::new(store.clone()); Box::new(f) } ForkChoiceAlgorithm::BitwiseLMDGhost => { - let f: BitwiseLMDGhost = - BitwiseLMDGhost::new(block_store.clone(), state_store.clone()); + let f: BitwiseLMDGhost = + BitwiseLMDGhost::new(store.clone()); Box::new(f) } ForkChoiceAlgorithm::SlowLMDGhost => { - let f: SlowLMDGhost = - SlowLMDGhost::new(block_store.clone(), state_store.clone()); + let f: SlowLMDGhost = SlowLMDGhost::new(store.clone()); Box::new(f) } - ForkChoiceAlgorithm::LongestChain => Box::new(LongestChain::new(block_store.clone())), + ForkChoiceAlgorithm::LongestChain => Box::new(LongestChain::new(store.clone())), }; let spec = FoundationEthSpec::spec(); @@ -255,12 +238,10 @@ fn setup_inital_state( let (state, _keypairs) = state_builder.build(); let state_root = state.canonical_root(); - state_store - .put(&state_root, &ssz_encode(&state)[..]) - .unwrap(); + store.put(&state_root, &state).unwrap(); // return initialised vars - (fork_choice, block_store, state_root) + (fork_choice, store, state_root) } // convert a block_id into a Hash256 -- assume input is hex encoded;