Merge pull request #383 from sigp/disk-db

Refactor database crate
This commit is contained in:
Paul Hauner 2019-05-22 10:45:45 +10:00 committed by GitHub
commit 389951530a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 1011 additions and 1320 deletions

View File

@ -22,7 +22,7 @@ members = [
"eth2/utils/fisher_yates_shuffle", "eth2/utils/fisher_yates_shuffle",
"eth2/utils/test_random_derive", "eth2/utils/test_random_derive",
"beacon_node", "beacon_node",
"beacon_node/db", "beacon_node/store",
"beacon_node/client", "beacon_node/client",
"beacon_node/network", "beacon_node/network",
"beacon_node/eth2-libp2p", "beacon_node/eth2-libp2p",

View File

@ -6,6 +6,7 @@ edition = "2018"
[dependencies] [dependencies]
types = { path = "../eth2/types" } types = { path = "../eth2/types" }
store = { path = "./store" }
client = { path = "client" } client = { path = "client" }
version = { path = "version" } version = { path = "version" }
clap = "2.32.0" clap = "2.32.0"

View File

@ -7,7 +7,7 @@ edition = "2018"
[dependencies] [dependencies]
bls = { path = "../../eth2/utils/bls" } bls = { path = "../../eth2/utils/bls" }
boolean-bitfield = { path = "../../eth2/utils/boolean-bitfield" } boolean-bitfield = { path = "../../eth2/utils/boolean-bitfield" }
db = { path = "../db" } store = { path = "../store" }
failure = "0.1" failure = "0.1"
failure_derive = "0.1" failure_derive = "0.1"
hashing = { path = "../../eth2/utils/hashing" } hashing = { path = "../../eth2/utils/hashing" }

View File

@ -1,16 +1,11 @@
use crate::checkpoint::CheckPoint; use crate::checkpoint::CheckPoint;
use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::errors::{BeaconChainError as Error, BlockProductionError};
use db::{
stores::{BeaconBlockStore, BeaconStateStore},
ClientDB, DBError,
};
use fork_choice::{ForkChoice, ForkChoiceError}; use fork_choice::{ForkChoice, ForkChoiceError};
use log::{debug, trace}; use log::{debug, trace};
use operation_pool::DepositInsertStatus; use operation_pool::DepositInsertStatus;
use operation_pool::OperationPool; use operation_pool::OperationPool;
use parking_lot::{RwLock, RwLockReadGuard}; use parking_lot::{RwLock, RwLockReadGuard};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use ssz::ssz_encode;
use state_processing::per_block_processing::errors::{ use state_processing::per_block_processing::errors::{
AttestationValidationError, AttesterSlashingValidationError, DepositValidationError, AttestationValidationError, AttesterSlashingValidationError, DepositValidationError,
ExitValidationError, ProposerSlashingValidationError, TransferValidationError, ExitValidationError, ProposerSlashingValidationError, TransferValidationError,
@ -20,6 +15,7 @@ use state_processing::{
per_slot_processing, BlockProcessingError, SlotProcessingError, per_slot_processing, BlockProcessingError, SlotProcessingError,
}; };
use std::sync::Arc; use std::sync::Arc;
use store::{Error as DBError, Store};
use types::*; use types::*;
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
@ -83,9 +79,8 @@ impl BlockProcessingOutcome {
} }
} }
pub struct BeaconChain<T: ClientDB + Sized, U: SlotClock, F: ForkChoice, E: EthSpec> { pub struct BeaconChain<T, U, F, E: EthSpec> {
pub block_store: Arc<BeaconBlockStore<T>>, pub store: Arc<T>,
pub state_store: Arc<BeaconStateStore<T>>,
pub slot_clock: U, pub slot_clock: U,
pub op_pool: OperationPool<E>, pub op_pool: OperationPool<E>,
canonical_head: RwLock<CheckPoint<E>>, canonical_head: RwLock<CheckPoint<E>>,
@ -97,15 +92,14 @@ pub struct BeaconChain<T: ClientDB + Sized, U: SlotClock, F: ForkChoice, E: EthS
impl<T, U, F, E> BeaconChain<T, U, F, E> impl<T, U, F, E> BeaconChain<T, U, F, E>
where where
T: ClientDB, T: Store,
U: SlotClock, U: SlotClock,
F: ForkChoice, F: ForkChoice,
E: EthSpec, E: EthSpec,
{ {
/// Instantiate a new Beacon Chain, from genesis. /// Instantiate a new Beacon Chain, from genesis.
pub fn from_genesis( pub fn from_genesis(
state_store: Arc<BeaconStateStore<T>>, store: Arc<T>,
block_store: Arc<BeaconBlockStore<T>>,
slot_clock: U, slot_clock: U,
mut genesis_state: BeaconState<E>, mut genesis_state: BeaconState<E>,
genesis_block: BeaconBlock, genesis_block: BeaconBlock,
@ -113,10 +107,10 @@ where
fork_choice: F, fork_choice: F,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let state_root = genesis_state.canonical_root(); let state_root = genesis_state.canonical_root();
state_store.put(&state_root, &ssz_encode(&genesis_state)[..])?; store.put(&state_root, &genesis_state)?;
let block_root = genesis_block.block_header().canonical_root(); let block_root = genesis_block.block_header().canonical_root();
block_store.put(&block_root, &ssz_encode(&genesis_block)[..])?; store.put(&block_root, &genesis_block)?;
let finalized_head = RwLock::new(CheckPoint::new( let finalized_head = RwLock::new(CheckPoint::new(
genesis_block.clone(), genesis_block.clone(),
@ -134,8 +128,7 @@ where
genesis_state.build_all_caches(&spec)?; genesis_state.build_all_caches(&spec)?;
Ok(Self { Ok(Self {
block_store, store,
state_store,
slot_clock, slot_clock,
op_pool: OperationPool::new(), op_pool: OperationPool::new(),
state: RwLock::new(genesis_state), state: RwLock::new(genesis_state),
@ -235,7 +228,7 @@ where
let new_state_root = state.get_state_root(earliest_historic_slot)?; let new_state_root = state.get_state_root(earliest_historic_slot)?;
// Break if the DB is unable to load the state. // Break if the DB is unable to load the state.
state = match self.state_store.get_deserialized(&new_state_root) { state = match self.store.get(&new_state_root) {
Ok(Some(state)) => state, Ok(Some(state)) => state,
_ => break, _ => break,
} }
@ -262,7 +255,7 @@ where
/// ///
/// May return a database error. /// May return a database error.
pub fn get_block(&self, block_root: &Hash256) -> Result<Option<BeaconBlock>, Error> { pub fn get_block(&self, block_root: &Hash256) -> Result<Option<BeaconBlock>, Error> {
Ok(self.block_store.get_deserialized(block_root)?) Ok(self.store.get(block_root)?)
} }
/// Update the canonical head to some new values. /// Update the canonical head to some new values.
@ -588,7 +581,7 @@ where
// Load the blocks parent block from the database, returning invalid if that block is not // Load the blocks parent block from the database, returning invalid if that block is not
// found. // found.
let parent_block_root = block.previous_block_root; let parent_block_root = block.previous_block_root;
let parent_block = match self.block_store.get_deserialized(&parent_block_root)? { let parent_block: BeaconBlock = match self.store.get(&parent_block_root)? {
Some(previous_block_root) => previous_block_root, Some(previous_block_root) => previous_block_root,
None => { None => {
return Ok(BlockProcessingOutcome::InvalidBlock( return Ok(BlockProcessingOutcome::InvalidBlock(
@ -601,15 +594,15 @@ where
// It is an error because if know the parent block we should also know the parent state. // It is an error because if know the parent block we should also know the parent state.
let parent_state_root = parent_block.state_root; let parent_state_root = parent_block.state_root;
let parent_state = self let parent_state = self
.state_store .store
.get_deserialized(&parent_state_root)? .get(&parent_state_root)?
.ok_or_else(|| Error::DBInconsistent(format!("Missing state {}", parent_state_root)))?; .ok_or_else(|| Error::DBInconsistent(format!("Missing state {}", parent_state_root)))?;
// TODO: check the block proposer signature BEFORE doing a state transition. This will // TODO: check the block proposer signature BEFORE doing a state transition. This will
// significantly lower exposure surface to DoS attacks. // significantly lower exposure surface to DoS attacks.
// Transition the parent state to the block slot. // Transition the parent state to the block slot.
let mut state = parent_state; let mut state: BeaconState<E> = parent_state;
for _ in state.slot.as_u64()..block.slot.as_u64() { for _ in state.slot.as_u64()..block.slot.as_u64() {
if let Err(e) = per_slot_processing(&mut state, &self.spec) { if let Err(e) = per_slot_processing(&mut state, &self.spec) {
return Ok(BlockProcessingOutcome::InvalidBlock( return Ok(BlockProcessingOutcome::InvalidBlock(
@ -635,8 +628,8 @@ where
} }
// Store the block and state. // Store the block and state.
self.block_store.put(&block_root, &ssz_encode(&block)[..])?; self.store.put(&block_root, &block)?;
self.state_store.put(&state_root, &ssz_encode(&state)[..])?; self.store.put(&state_root, &state)?;
// run the fork_choice add_block logic // run the fork_choice add_block logic
self.fork_choice self.fork_choice
@ -729,15 +722,15 @@ where
.find_head(&present_head, &self.spec)?; .find_head(&present_head, &self.spec)?;
if new_head != present_head { if new_head != present_head {
let block = self let block: BeaconBlock = self
.block_store .store
.get_deserialized(&new_head)? .get(&new_head)?
.ok_or_else(|| Error::MissingBeaconBlock(new_head))?; .ok_or_else(|| Error::MissingBeaconBlock(new_head))?;
let block_root = block.canonical_root(); let block_root = block.canonical_root();
let state = self let state: BeaconState<E> = self
.state_store .store
.get_deserialized(&block.state_root)? .get(&block.state_root)?
.ok_or_else(|| Error::MissingBeaconState(block.state_root))?; .ok_or_else(|| Error::MissingBeaconState(block.state_root))?;
let state_root = state.canonical_root(); let state_root = state.canonical_root();
@ -752,7 +745,7 @@ where
/// Returns `true` if the given block root has not been processed. /// Returns `true` if the given block root has not been processed.
pub fn is_new_block_root(&self, beacon_block_root: &Hash256) -> Result<bool, Error> { pub fn is_new_block_root(&self, beacon_block_root: &Hash256) -> Result<bool, Error> {
Ok(!self.block_store.exists(beacon_block_root)?) Ok(!self.store.exists::<BeaconBlock>(beacon_block_root)?)
} }
/// Dumps the entire canonical chain, from the head to genesis to a vector for analysis. /// Dumps the entire canonical chain, from the head to genesis to a vector for analysis.
@ -778,19 +771,14 @@ where
break; // Genesis has been reached. break; // Genesis has been reached.
} }
let beacon_block = self let beacon_block: BeaconBlock =
.block_store self.store.get(&beacon_block_root)?.ok_or_else(|| {
.get_deserialized(&beacon_block_root)?
.ok_or_else(|| {
Error::DBInconsistent(format!("Missing block {}", beacon_block_root)) Error::DBInconsistent(format!("Missing block {}", beacon_block_root))
})?; })?;
let beacon_state_root = beacon_block.state_root; let beacon_state_root = beacon_block.state_root;
let beacon_state = self let beacon_state = self.store.get(&beacon_state_root)?.ok_or_else(|| {
.state_store Error::DBInconsistent(format!("Missing state {}", beacon_state_root))
.get_deserialized(&beacon_state_root)? })?;
.ok_or_else(|| {
Error::DBInconsistent(format!("Missing state {}", beacon_state_root))
})?;
let slot = CheckPoint { let slot = CheckPoint {
beacon_block, beacon_block,
@ -811,7 +799,7 @@ where
impl From<DBError> for Error { impl From<DBError> for Error {
fn from(e: DBError) -> Error { fn from(e: DBError) -> Error {
Error::DBError(e.message) Error::DBError(e)
} }
} }

View File

@ -20,7 +20,7 @@ pub enum BeaconChainError {
UnableToReadSlot, UnableToReadSlot,
BeaconStateError(BeaconStateError), BeaconStateError(BeaconStateError),
DBInconsistent(String), DBInconsistent(String),
DBError(String), DBError(store::Error),
ForkChoiceError(ForkChoiceError), ForkChoiceError(ForkChoiceError),
MissingBeaconBlock(Hash256), MissingBeaconBlock(Hash256),
MissingBeaconState(Hash256), MissingBeaconState(Hash256),

View File

@ -3,12 +3,11 @@
// testnet. These are examples. Also. there is code duplication which can/should be cleaned up. // testnet. These are examples. Also. there is code duplication which can/should be cleaned up.
use crate::BeaconChain; use crate::BeaconChain;
use db::stores::{BeaconBlockStore, BeaconStateStore};
use db::{DiskDB, MemoryDB};
use fork_choice::BitwiseLMDGhost; use fork_choice::BitwiseLMDGhost;
use slot_clock::SystemTimeSlotClock; use slot_clock::SystemTimeSlotClock;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use store::{DiskStore, MemoryStore};
use tree_hash::TreeHash; use tree_hash::TreeHash;
use types::test_utils::TestingBeaconStateBuilder; use types::test_utils::TestingBeaconStateBuilder;
use types::{BeaconBlock, ChainSpec, FewValidatorsEthSpec, FoundationEthSpec, Hash256}; use types::{BeaconBlock, ChainSpec, FewValidatorsEthSpec, FoundationEthSpec, Hash256};
@ -20,20 +19,15 @@ pub fn initialise_beacon_chain(
db_name: Option<&PathBuf>, db_name: Option<&PathBuf>,
) -> Arc< ) -> Arc<
BeaconChain< BeaconChain<
DiskDB, DiskStore,
SystemTimeSlotClock, SystemTimeSlotClock,
BitwiseLMDGhost<DiskDB, FoundationEthSpec>, BitwiseLMDGhost<DiskStore, FoundationEthSpec>,
FoundationEthSpec, FoundationEthSpec,
>, >,
> { > {
// set up the db let path = db_name.expect("db_name cannot be None.");
let db = Arc::new(DiskDB::open( let store = DiskStore::open(path).expect("Unable to open DB.");
db_name.expect("Database directory must be included"), let store = Arc::new(store);
None,
));
let block_store = Arc::new(BeaconBlockStore::new(db.clone()));
let state_store = Arc::new(BeaconStateStore::new(db.clone()));
let state_builder = TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(8, &spec); let state_builder = TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(8, &spec);
let (genesis_state, _keypairs) = state_builder.build(); let (genesis_state, _keypairs) = state_builder.build();
@ -49,14 +43,13 @@ pub fn initialise_beacon_chain(
) )
.expect("Unable to load SystemTimeSlotClock"); .expect("Unable to load SystemTimeSlotClock");
// Choose the fork choice // Choose the fork choice
let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone()); let fork_choice = BitwiseLMDGhost::new(store.clone());
// Genesis chain // Genesis chain
//TODO: Handle error correctly //TODO: Handle error correctly
Arc::new( Arc::new(
BeaconChain::from_genesis( BeaconChain::from_genesis(
state_store.clone(), store,
block_store.clone(),
slot_clock, slot_clock,
genesis_state, genesis_state,
genesis_block, genesis_block,
@ -68,20 +61,18 @@ pub fn initialise_beacon_chain(
} }
/// Initialisation of a test beacon chain, uses an in memory db with fixed genesis time. /// Initialisation of a test beacon chain, uses an in memory db with fixed genesis time.
pub fn initialise_test_beacon_chain( pub fn initialise_test_beacon_chain_with_memory_db(
spec: &ChainSpec, spec: &ChainSpec,
_db_name: Option<&PathBuf>, _db_name: Option<&PathBuf>,
) -> Arc< ) -> Arc<
BeaconChain< BeaconChain<
MemoryDB, MemoryStore,
SystemTimeSlotClock, SystemTimeSlotClock,
BitwiseLMDGhost<MemoryDB, FewValidatorsEthSpec>, BitwiseLMDGhost<MemoryStore, FewValidatorsEthSpec>,
FewValidatorsEthSpec, FewValidatorsEthSpec,
>, >,
> { > {
let db = Arc::new(MemoryDB::open()); let store = Arc::new(MemoryStore::open());
let block_store = Arc::new(BeaconBlockStore::new(db.clone()));
let state_store = Arc::new(BeaconStateStore::new(db.clone()));
let state_builder = TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(8, spec); let state_builder = TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(8, spec);
let (genesis_state, _keypairs) = state_builder.build(); let (genesis_state, _keypairs) = state_builder.build();
@ -97,14 +88,60 @@ pub fn initialise_test_beacon_chain(
) )
.expect("Unable to load SystemTimeSlotClock"); .expect("Unable to load SystemTimeSlotClock");
// Choose the fork choice // Choose the fork choice
let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone()); let fork_choice = BitwiseLMDGhost::new(store.clone());
// Genesis chain // Genesis chain
//TODO: Handle error correctly //TODO: Handle error correctly
Arc::new( Arc::new(
BeaconChain::from_genesis( BeaconChain::from_genesis(
state_store.clone(), store,
block_store.clone(), slot_clock,
genesis_state,
genesis_block,
spec.clone(),
fork_choice,
)
.expect("Terminate if beacon chain generation fails"),
)
}
/// Initialisation of a test beacon chain, uses an in memory db with fixed genesis time.
pub fn initialise_test_beacon_chain_with_disk_db(
spec: &ChainSpec,
db_name: Option<&PathBuf>,
) -> Arc<
BeaconChain<
DiskStore,
SystemTimeSlotClock,
BitwiseLMDGhost<DiskStore, FewValidatorsEthSpec>,
FewValidatorsEthSpec,
>,
> {
let path = db_name.expect("db_name cannot be None.");
let store = DiskStore::open(path).expect("Unable to open DB.");
let store = Arc::new(store);
let state_builder = TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(8, spec);
let (genesis_state, _keypairs) = state_builder.build();
let mut genesis_block = BeaconBlock::empty(spec);
genesis_block.state_root = Hash256::from_slice(&genesis_state.tree_hash_root());
// Slot clock
let slot_clock = SystemTimeSlotClock::new(
spec.genesis_slot,
genesis_state.genesis_time,
spec.seconds_per_slot,
)
.expect("Unable to load SystemTimeSlotClock");
// Choose the fork choice
let fork_choice = BitwiseLMDGhost::new(store.clone());
// Genesis chain
//TODO: Handle error correctly
Arc::new(
BeaconChain::from_genesis(
store,
slot_clock, slot_clock,
genesis_state, genesis_state,
genesis_block, genesis_block,

View File

@ -7,7 +7,6 @@ pub mod test_utils;
pub use self::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock, ValidBlock}; pub use self::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock, ValidBlock};
pub use self::checkpoint::CheckPoint; pub use self::checkpoint::CheckPoint;
pub use self::errors::{BeaconChainError, BlockProductionError}; pub use self::errors::{BeaconChainError, BlockProductionError};
pub use db;
pub use fork_choice; pub use fork_choice;
pub use parking_lot; pub use parking_lot;
pub use slot_clock; pub use slot_clock;
@ -15,4 +14,5 @@ pub use state_processing::per_block_processing::errors::{
AttestationValidationError, AttesterSlashingValidationError, DepositValidationError, AttestationValidationError, AttesterSlashingValidationError, DepositValidationError,
ExitValidationError, ProposerSlashingValidationError, TransferValidationError, ExitValidationError, ProposerSlashingValidationError, TransferValidationError,
}; };
pub use store;
pub use types; pub use types;

View File

@ -1,17 +1,18 @@
pub use crate::{BeaconChain, BeaconChainError, CheckPoint}; pub use crate::{BeaconChain, BeaconChainError, CheckPoint};
use db::{
stores::{BeaconBlockStore, BeaconStateStore},
MemoryDB,
};
use fork_choice::BitwiseLMDGhost; use fork_choice::BitwiseLMDGhost;
use slot_clock::TestingSlotClock; use slot_clock::TestingSlotClock;
use std::sync::Arc; use std::sync::Arc;
use store::MemoryStore;
use tree_hash::TreeHash; use tree_hash::TreeHash;
use types::*; use types::*;
use types::{test_utils::TestingBeaconStateBuilder, EthSpec, FewValidatorsEthSpec}; use types::{test_utils::TestingBeaconStateBuilder, EthSpec, FewValidatorsEthSpec};
type TestingBeaconChain<E> = type TestingBeaconChain<E> = BeaconChain<
BeaconChain<MemoryDB, TestingSlotClock, BitwiseLMDGhost<MemoryDB, FewValidatorsEthSpec>, E>; MemoryStore,
TestingSlotClock,
BitwiseLMDGhost<MemoryStore, FewValidatorsEthSpec>,
E,
>;
pub struct TestingBeaconChainBuilder<E: EthSpec> { pub struct TestingBeaconChainBuilder<E: EthSpec> {
state_builder: TestingBeaconStateBuilder<E>, state_builder: TestingBeaconStateBuilder<E>,
@ -19,11 +20,9 @@ pub struct TestingBeaconChainBuilder<E: EthSpec> {
impl<E: EthSpec> TestingBeaconChainBuilder<E> { impl<E: EthSpec> TestingBeaconChainBuilder<E> {
pub fn build(self, spec: &ChainSpec) -> TestingBeaconChain<E> { pub fn build(self, spec: &ChainSpec) -> TestingBeaconChain<E> {
let db = Arc::new(MemoryDB::open()); let store = Arc::new(MemoryStore::open());
let block_store = Arc::new(BeaconBlockStore::new(db.clone()));
let state_store = Arc::new(BeaconStateStore::new(db.clone()));
let slot_clock = TestingSlotClock::new(spec.genesis_slot.as_u64()); let slot_clock = TestingSlotClock::new(spec.genesis_slot.as_u64());
let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone()); let fork_choice = BitwiseLMDGhost::new(store.clone());
let (genesis_state, _keypairs) = self.state_builder.build(); let (genesis_state, _keypairs) = self.state_builder.build();
@ -32,8 +31,7 @@ impl<E: EthSpec> TestingBeaconChainBuilder<E> {
// Create the Beacon Chain // Create the Beacon Chain
BeaconChain::from_genesis( BeaconChain::from_genesis(
state_store.clone(), store,
block_store.clone(),
slot_clock, slot_clock,
genesis_state, genesis_state,
genesis_block, genesis_block,

View File

@ -7,7 +7,7 @@ edition = "2018"
[dependencies] [dependencies]
beacon_chain = { path = "../beacon_chain" } beacon_chain = { path = "../beacon_chain" }
network = { path = "../network" } network = { path = "../network" }
db = { path = "../db" } store = { path = "../store" }
rpc = { path = "../rpc" } rpc = { path = "../rpc" }
fork_choice = { path = "../../eth2/fork_choice" } fork_choice = { path = "../../eth2/fork_choice" }
types = { path = "../../eth2/types" } types = { path = "../../eth2/types" }

View File

@ -1,5 +1,4 @@
use clap::ArgMatches; use clap::ArgMatches;
use db::DBType;
use fork_choice::ForkChoiceAlgorithm; use fork_choice::ForkChoiceAlgorithm;
use network::NetworkConfig; use network::NetworkConfig;
use slog::error; use slog::error;
@ -12,6 +11,12 @@ use types::multiaddr::ToMultiaddr;
use types::Multiaddr; use types::Multiaddr;
use types::{ChainSpec, EthSpec, LighthouseTestnetEthSpec}; use types::{ChainSpec, EthSpec, LighthouseTestnetEthSpec};
#[derive(Debug, Clone)]
pub enum DBType {
Memory,
Disk,
}
/// Stores the client configuration for this Lighthouse instance. /// Stores the client configuration for this Lighthouse instance.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ClientConfig { pub struct ClientConfig {
@ -48,7 +53,7 @@ impl Default for ClientConfig {
// default to memory db for now // default to memory db for now
db_type: DBType::Memory, db_type: DBType::Memory,
// default db name for disk-based dbs // default db name for disk-based dbs
db_name: data_dir.join("chain.db"), db_name: data_dir.join("chain_db"),
rpc_conf: rpc::RPCConfig::default(), rpc_conf: rpc::RPCConfig::default(),
} }
} }
@ -131,6 +136,12 @@ impl ClientConfig {
} }
} }
match args.value_of("db") {
Some("disk") => config.db_type = DBType::Disk,
Some("memory") => config.db_type = DBType::Memory,
_ => unreachable!(), // clap prevents this.
};
Ok(config) Ok(config)
} }
} }

View File

@ -1,15 +1,15 @@
use crate::{ArcBeaconChain, ClientConfig}; use crate::{ArcBeaconChain, ClientConfig};
use beacon_chain::{ use beacon_chain::{
db::{ClientDB, DiskDB, MemoryDB},
fork_choice::BitwiseLMDGhost, fork_choice::BitwiseLMDGhost,
initialise, initialise,
slot_clock::{SlotClock, SystemTimeSlotClock}, slot_clock::{SlotClock, SystemTimeSlotClock},
store::{DiskStore, MemoryStore, Store},
}; };
use fork_choice::ForkChoice; use fork_choice::ForkChoice;
use types::{EthSpec, FewValidatorsEthSpec, FoundationEthSpec}; use types::{EthSpec, FewValidatorsEthSpec, FoundationEthSpec};
pub trait ClientTypes { pub trait ClientTypes {
type DB: ClientDB + 'static; type DB: Store + 'static;
type SlotClock: SlotClock + 'static; type SlotClock: SlotClock + 'static;
type ForkChoice: ForkChoice + 'static; type ForkChoice: ForkChoice + 'static;
type EthSpec: EthSpec + 'static; type EthSpec: EthSpec + 'static;
@ -22,9 +22,9 @@ pub trait ClientTypes {
pub struct StandardClientType; pub struct StandardClientType;
impl ClientTypes for StandardClientType { impl ClientTypes for StandardClientType {
type DB = DiskDB; type DB = DiskStore;
type SlotClock = SystemTimeSlotClock; type SlotClock = SystemTimeSlotClock;
type ForkChoice = BitwiseLMDGhost<DiskDB, Self::EthSpec>; type ForkChoice = BitwiseLMDGhost<Self::DB, Self::EthSpec>;
type EthSpec = FoundationEthSpec; type EthSpec = FoundationEthSpec;
fn initialise_beacon_chain( fn initialise_beacon_chain(
@ -34,17 +34,32 @@ impl ClientTypes for StandardClientType {
} }
} }
pub struct TestingClientType; pub struct MemoryStoreTestingClientType;
impl ClientTypes for TestingClientType { impl ClientTypes for MemoryStoreTestingClientType {
type DB = MemoryDB; type DB = MemoryStore;
type SlotClock = SystemTimeSlotClock; type SlotClock = SystemTimeSlotClock;
type ForkChoice = BitwiseLMDGhost<MemoryDB, Self::EthSpec>; type ForkChoice = BitwiseLMDGhost<Self::DB, Self::EthSpec>;
type EthSpec = FewValidatorsEthSpec; type EthSpec = FewValidatorsEthSpec;
fn initialise_beacon_chain( fn initialise_beacon_chain(
config: &ClientConfig, config: &ClientConfig,
) -> ArcBeaconChain<Self::DB, Self::SlotClock, Self::ForkChoice, Self::EthSpec> { ) -> ArcBeaconChain<Self::DB, Self::SlotClock, Self::ForkChoice, Self::EthSpec> {
initialise::initialise_test_beacon_chain(&config.spec, None) initialise::initialise_test_beacon_chain_with_memory_db(&config.spec, None)
}
}
pub struct DiskStoreTestingClientType;
impl ClientTypes for DiskStoreTestingClientType {
type DB = DiskStore;
type SlotClock = SystemTimeSlotClock;
type ForkChoice = BitwiseLMDGhost<Self::DB, Self::EthSpec>;
type EthSpec = FewValidatorsEthSpec;
fn initialise_beacon_chain(
config: &ClientConfig,
) -> ArcBeaconChain<Self::DB, Self::SlotClock, Self::ForkChoice, Self::EthSpec> {
initialise::initialise_test_beacon_chain_with_disk_db(&config.spec, Some(&config.db_name))
} }
} }

View File

@ -6,9 +6,8 @@ pub mod error;
pub mod notifier; pub mod notifier;
use beacon_chain::BeaconChain; use beacon_chain::BeaconChain;
pub use client_config::ClientConfig; pub use client_config::{ClientConfig, DBType};
pub use client_types::ClientTypes; pub use client_types::ClientTypes;
use db::ClientDB;
use exit_future::Signal; use exit_future::Signal;
use fork_choice::ForkChoice; use fork_choice::ForkChoice;
use futures::{future::Future, Stream}; use futures::{future::Future, Stream};
@ -18,6 +17,7 @@ use slot_clock::SlotClock;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use store::Store;
use tokio::runtime::TaskExecutor; use tokio::runtime::TaskExecutor;
use tokio::timer::Interval; use tokio::timer::Interval;
use types::EthSpec; use types::EthSpec;
@ -146,7 +146,7 @@ impl<TClientType: ClientTypes> Client<TClientType> {
fn do_state_catchup<T, U, F, E>(chain: &Arc<BeaconChain<T, U, F, E>>, log: &slog::Logger) fn do_state_catchup<T, U, F, E>(chain: &Arc<BeaconChain<T, U, F, E>>, log: &slog::Logger)
where where
T: ClientDB, T: Store,
U: SlotClock, U: SlotClock,
F: ForkChoice, F: ForkChoice,
E: EthSpec, E: EthSpec,

View File

@ -1,13 +0,0 @@
[package]
name = "db"
version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018"
[dependencies]
blake2-rfc = "0.2.18"
bls = { path = "../../eth2/utils/bls" }
bytes = "0.4.10"
rocksdb = "0.10.1"
ssz = { path = "../../eth2/utils/ssz" }
types = { path = "../../eth2/types" }

View File

@ -1,21 +0,0 @@
extern crate blake2_rfc as blake2;
extern crate bls;
extern crate rocksdb;
mod disk_db;
mod memory_db;
pub mod stores;
mod traits;
use self::stores::COLUMNS;
pub use self::disk_db::DiskDB;
pub use self::memory_db::MemoryDB;
pub use self::traits::{ClientDB, DBError, DBValue};
/// Currently available database options
#[derive(Debug, Clone)]
pub enum DBType {
Memory,
RocksDB,
}

View File

@ -1,236 +0,0 @@
use super::blake2::blake2b::blake2b;
use super::COLUMNS;
use super::{ClientDB, DBError, DBValue};
use std::collections::{HashMap, HashSet};
use std::sync::RwLock;
type DBHashMap = HashMap<Vec<u8>, Vec<u8>>;
type ColumnHashSet = HashSet<String>;
/// An in-memory database implementing the ClientDB trait.
///
/// It is not particularily optimized, it exists for ease and speed of testing. It's not expected
/// this DB would be used outside of tests.
pub struct MemoryDB {
db: RwLock<DBHashMap>,
known_columns: RwLock<ColumnHashSet>,
}
impl MemoryDB {
/// Open the in-memory database.
///
/// All columns must be supplied initially, you will get an error if you try to access a column
/// that was not declared here. This condition is enforced artificially to simulate RocksDB.
pub fn open() -> Self {
let db: DBHashMap = HashMap::new();
let mut known_columns: ColumnHashSet = HashSet::new();
for col in &COLUMNS {
known_columns.insert(col.to_string());
}
Self {
db: RwLock::new(db),
known_columns: RwLock::new(known_columns),
}
}
/// Hashes a key and a column name in order to get a unique key for the supplied column.
fn get_key_for_col(col: &str, key: &[u8]) -> Vec<u8> {
blake2b(32, col.as_bytes(), key).as_bytes().to_vec()
}
}
impl ClientDB for MemoryDB {
/// Get the value of some key from the database. Returns `None` if the key does not exist.
fn get(&self, col: &str, key: &[u8]) -> Result<Option<DBValue>, DBError> {
// Panic if the DB locks are poisoned.
let db = self.db.read().unwrap();
let known_columns = self.known_columns.read().unwrap();
if known_columns.contains(&col.to_string()) {
let column_key = MemoryDB::get_key_for_col(col, key);
Ok(db.get(&column_key).and_then(|val| Some(val.clone())))
} else {
Err(DBError {
message: "Unknown column".to_string(),
})
}
}
/// Puts a key in the database.
fn put(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), DBError> {
// Panic if the DB locks are poisoned.
let mut db = self.db.write().unwrap();
let known_columns = self.known_columns.read().unwrap();
if known_columns.contains(&col.to_string()) {
let column_key = MemoryDB::get_key_for_col(col, key);
db.insert(column_key, val.to_vec());
Ok(())
} else {
Err(DBError {
message: "Unknown column".to_string(),
})
}
}
/// Return true if some key exists in some column.
fn exists(&self, col: &str, key: &[u8]) -> Result<bool, DBError> {
// Panic if the DB locks are poisoned.
let db = self.db.read().unwrap();
let known_columns = self.known_columns.read().unwrap();
if known_columns.contains(&col.to_string()) {
let column_key = MemoryDB::get_key_for_col(col, key);
Ok(db.contains_key(&column_key))
} else {
Err(DBError {
message: "Unknown column".to_string(),
})
}
}
/// Delete some key from the database.
fn delete(&self, col: &str, key: &[u8]) -> Result<(), DBError> {
// Panic if the DB locks are poisoned.
let mut db = self.db.write().unwrap();
let known_columns = self.known_columns.read().unwrap();
if known_columns.contains(&col.to_string()) {
let column_key = MemoryDB::get_key_for_col(col, key);
db.remove(&column_key);
Ok(())
} else {
Err(DBError {
message: "Unknown column".to_string(),
})
}
}
}
#[cfg(test)]
mod tests {
use super::super::stores::{BLOCKS_DB_COLUMN, VALIDATOR_DB_COLUMN};
use super::super::ClientDB;
use super::*;
use std::sync::Arc;
use std::thread;
#[test]
fn test_memorydb_can_delete() {
let col_a: &str = BLOCKS_DB_COLUMN;
let db = MemoryDB::open();
db.put(col_a, "dogs".as_bytes(), "lol".as_bytes()).unwrap();
assert_eq!(
db.get(col_a, "dogs".as_bytes()).unwrap().unwrap(),
"lol".as_bytes()
);
db.delete(col_a, "dogs".as_bytes()).unwrap();
assert_eq!(db.get(col_a, "dogs".as_bytes()).unwrap(), None);
}
#[test]
fn test_memorydb_column_access() {
let col_a: &str = BLOCKS_DB_COLUMN;
let col_b: &str = VALIDATOR_DB_COLUMN;
let db = MemoryDB::open();
/*
* Testing that if we write to the same key in different columns that
* there is not an overlap.
*/
db.put(col_a, "same".as_bytes(), "cat".as_bytes()).unwrap();
db.put(col_b, "same".as_bytes(), "dog".as_bytes()).unwrap();
assert_eq!(
db.get(col_a, "same".as_bytes()).unwrap().unwrap(),
"cat".as_bytes()
);
assert_eq!(
db.get(col_b, "same".as_bytes()).unwrap().unwrap(),
"dog".as_bytes()
);
}
#[test]
fn test_memorydb_unknown_column_access() {
let col_a: &str = BLOCKS_DB_COLUMN;
let col_x: &str = "ColumnX";
let db = MemoryDB::open();
/*
* Test that we get errors when using undeclared columns
*/
assert!(db.put(col_a, "cats".as_bytes(), "lol".as_bytes()).is_ok());
assert!(db.put(col_x, "cats".as_bytes(), "lol".as_bytes()).is_err());
assert!(db.get(col_a, "cats".as_bytes()).is_ok());
assert!(db.get(col_x, "cats".as_bytes()).is_err());
}
#[test]
fn test_memorydb_exists() {
let col_a: &str = BLOCKS_DB_COLUMN;
let col_b: &str = VALIDATOR_DB_COLUMN;
let db = MemoryDB::open();
/*
* Testing that if we write to the same key in different columns that
* there is not an overlap.
*/
db.put(col_a, "cats".as_bytes(), "lol".as_bytes()).unwrap();
assert_eq!(true, db.exists(col_a, "cats".as_bytes()).unwrap());
assert_eq!(false, db.exists(col_b, "cats".as_bytes()).unwrap());
assert_eq!(false, db.exists(col_a, "dogs".as_bytes()).unwrap());
assert_eq!(false, db.exists(col_b, "dogs".as_bytes()).unwrap());
}
#[test]
fn test_memorydb_threading() {
let col_name: &str = BLOCKS_DB_COLUMN;
let db = Arc::new(MemoryDB::open());
let thread_count = 10;
let write_count = 10;
// We're execting the product of these numbers to fit in one byte.
assert!(thread_count * write_count <= 255);
let mut handles = vec![];
for t in 0..thread_count {
let wc = write_count;
let db = db.clone();
let col = col_name.clone();
let handle = thread::spawn(move || {
for w in 0..wc {
let key = (t * w) as u8;
let val = 42;
db.put(&col, &vec![key], &vec![val]).unwrap();
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
for t in 0..thread_count {
for w in 0..write_count {
let key = (t * w) as u8;
let val = db.get(&col_name, &vec![key]).unwrap().unwrap();
assert_eq!(vec![42], val);
}
}
}
}

View File

@ -1,246 +0,0 @@
use super::BLOCKS_DB_COLUMN as DB_COLUMN;
use super::{ClientDB, DBError};
use ssz::Decode;
use std::sync::Arc;
use types::{BeaconBlock, Hash256, Slot};
#[derive(Clone, Debug, PartialEq)]
pub enum BeaconBlockAtSlotError {
UnknownBeaconBlock(Hash256),
InvalidBeaconBlock(Hash256),
DBError(String),
}
pub struct BeaconBlockStore<T>
where
T: ClientDB,
{
db: Arc<T>,
}
// Implements `put`, `get`, `exists` and `delete` for the store.
impl_crud_for_store!(BeaconBlockStore, DB_COLUMN);
impl<T: ClientDB> BeaconBlockStore<T> {
pub fn new(db: Arc<T>) -> Self {
Self { db }
}
pub fn get_deserialized(&self, hash: &Hash256) -> Result<Option<BeaconBlock>, DBError> {
match self.get(&hash)? {
None => Ok(None),
Some(ssz) => {
let block = BeaconBlock::from_ssz_bytes(&ssz).map_err(|_| DBError {
message: "Bad BeaconBlock SSZ.".to_string(),
})?;
Ok(Some(block))
}
}
}
/// Retrieve the block at a slot given a "head_hash" and a slot.
///
/// A "head_hash" must be a block hash with a slot number greater than or equal to the desired
/// slot.
///
/// This function will read each block down the chain until it finds a block with the given
/// slot number. If the slot is skipped, the function will return None.
///
/// If a block is found, a tuple of (block_hash, serialized_block) is returned.
///
/// Note: this function uses a loop instead of recursion as the compiler is over-strict when it
/// comes to recursion and the `impl Trait` pattern. See:
/// https://stackoverflow.com/questions/54032940/using-impl-trait-in-a-recursive-function
pub fn block_at_slot(
&self,
head_hash: &Hash256,
slot: Slot,
) -> Result<Option<(Hash256, BeaconBlock)>, BeaconBlockAtSlotError> {
let mut current_hash = *head_hash;
loop {
if let Some(block) = self.get_deserialized(&current_hash)? {
if block.slot == slot {
break Ok(Some((current_hash, block)));
} else if block.slot < slot {
break Ok(None);
} else {
current_hash = block.previous_block_root;
}
} else {
break Err(BeaconBlockAtSlotError::UnknownBeaconBlock(current_hash));
}
}
}
}
impl From<DBError> for BeaconBlockAtSlotError {
fn from(e: DBError) -> Self {
BeaconBlockAtSlotError::DBError(e.message)
}
}
#[cfg(test)]
mod tests {
use super::super::super::MemoryDB;
use super::*;
use std::sync::Arc;
use std::thread;
use ssz::ssz_encode;
use types::test_utils::{SeedableRng, TestRandom, XorShiftRng};
use types::BeaconBlock;
use types::Hash256;
test_crud_for_store!(BeaconBlockStore, DB_COLUMN);
#[test]
fn head_hash_slot_too_low() {
let db = Arc::new(MemoryDB::open());
let bs = Arc::new(BeaconBlockStore::new(db.clone()));
let mut rng = XorShiftRng::from_seed([42; 16]);
let mut block = BeaconBlock::random_for_test(&mut rng);
block.slot = Slot::from(10_u64);
let block_root = block.canonical_root();
bs.put(&block_root, &ssz_encode(&block)).unwrap();
let result = bs.block_at_slot(&block_root, Slot::from(11_u64)).unwrap();
assert_eq!(result, None);
}
#[test]
fn test_invalid_block_at_slot() {
let db = Arc::new(MemoryDB::open());
let store = BeaconBlockStore::new(db.clone());
let ssz = "definitly not a valid block".as_bytes();
let hash = &Hash256::from([0xAA; 32]);
db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap();
assert_eq!(
store.block_at_slot(hash, Slot::from(42_u64)),
Err(BeaconBlockAtSlotError::DBError(
"Bad BeaconBlock SSZ.".into()
))
);
}
#[test]
fn test_unknown_block_at_slot() {
let db = Arc::new(MemoryDB::open());
let store = BeaconBlockStore::new(db.clone());
let ssz = "some bytes".as_bytes();
let hash = &Hash256::from([0xAA; 32]);
let other_hash = &Hash256::from([0xBB; 32]);
db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap();
assert_eq!(
store.block_at_slot(other_hash, Slot::from(42_u64)),
Err(BeaconBlockAtSlotError::UnknownBeaconBlock(*other_hash))
);
}
#[test]
fn test_block_store_on_memory_db() {
let db = Arc::new(MemoryDB::open());
let bs = Arc::new(BeaconBlockStore::new(db.clone()));
let thread_count = 10;
let write_count = 10;
let mut handles = vec![];
for t in 0..thread_count {
let wc = write_count;
let bs = bs.clone();
let handle = thread::spawn(move || {
for w in 0..wc {
let key = t * w;
let val = 42;
bs.put(&Hash256::from_low_u64_le(key), &vec![val]).unwrap();
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
for t in 0..thread_count {
for w in 0..write_count {
let key = t * w;
assert!(bs.exists(&Hash256::from_low_u64_le(key)).unwrap());
let val = bs.get(&Hash256::from_low_u64_le(key)).unwrap().unwrap();
assert_eq!(vec![42], val);
}
}
}
#[test]
#[ignore]
fn test_block_at_slot() {
let db = Arc::new(MemoryDB::open());
let bs = Arc::new(BeaconBlockStore::new(db.clone()));
let mut rng = XorShiftRng::from_seed([42; 16]);
// Specify test block parameters.
let hashes = [
Hash256::from([0; 32]),
Hash256::from([1; 32]),
Hash256::from([2; 32]),
Hash256::from([3; 32]),
Hash256::from([4; 32]),
];
let parent_hashes = [
Hash256::from([255; 32]), // Genesis block.
Hash256::from([0; 32]),
Hash256::from([1; 32]),
Hash256::from([2; 32]),
Hash256::from([3; 32]),
];
let unknown_hash = Hash256::from([101; 32]); // different from all above
let slots: Vec<Slot> = vec![0, 1, 3, 4, 5].iter().map(|x| Slot::new(*x)).collect();
// Generate a vec of random blocks and store them in the DB.
let block_count = 5;
let mut blocks: Vec<BeaconBlock> = Vec::with_capacity(5);
for i in 0..block_count {
let mut block = BeaconBlock::random_for_test(&mut rng);
block.previous_block_root = parent_hashes[i];
block.slot = slots[i];
let ssz = ssz_encode(&block);
db.put(DB_COLUMN, hashes[i].as_bytes(), &ssz).unwrap();
blocks.push(block);
}
// Test that certain slots can be reached from certain hashes.
let test_cases = vec![(4, 4), (4, 3), (4, 2), (4, 1), (4, 0)];
for (hashes_index, slot_index) in test_cases {
let (matched_block_hash, block) = bs
.block_at_slot(&hashes[hashes_index], slots[slot_index])
.unwrap()
.unwrap();
assert_eq!(matched_block_hash, hashes[slot_index]);
assert_eq!(block.slot, slots[slot_index]);
}
let ssz = bs.block_at_slot(&hashes[4], Slot::new(2)).unwrap();
assert_eq!(ssz, None);
let ssz = bs.block_at_slot(&hashes[4], Slot::new(6)).unwrap();
assert_eq!(ssz, None);
let ssz = bs.block_at_slot(&unknown_hash, Slot::new(2));
assert_eq!(
ssz,
Err(BeaconBlockAtSlotError::UnknownBeaconBlock(unknown_hash))
);
}
}

View File

@ -1,65 +0,0 @@
use super::STATES_DB_COLUMN as DB_COLUMN;
use super::{ClientDB, DBError};
use ssz::Decode;
use std::sync::Arc;
use types::{BeaconState, EthSpec, Hash256};
pub struct BeaconStateStore<T>
where
T: ClientDB,
{
db: Arc<T>,
}
// Implements `put`, `get`, `exists` and `delete` for the store.
impl_crud_for_store!(BeaconStateStore, DB_COLUMN);
impl<T: ClientDB> BeaconStateStore<T> {
pub fn new(db: Arc<T>) -> Self {
Self { db }
}
pub fn get_deserialized<E: EthSpec>(
&self,
hash: &Hash256,
) -> Result<Option<BeaconState<E>>, DBError> {
match self.get(&hash)? {
None => Ok(None),
Some(ssz) => {
let state = BeaconState::from_ssz_bytes(&ssz).map_err(|_| DBError {
message: "Bad State SSZ.".to_string(),
})?;
Ok(Some(state))
}
}
}
}
#[cfg(test)]
mod tests {
use super::super::super::MemoryDB;
use super::*;
use ssz::ssz_encode;
use std::sync::Arc;
use types::test_utils::{SeedableRng, TestRandom, XorShiftRng};
use types::{FoundationBeaconState, Hash256};
test_crud_for_store!(BeaconStateStore, DB_COLUMN);
#[test]
fn test_reader() {
let db = Arc::new(MemoryDB::open());
let store = BeaconStateStore::new(db.clone());
let mut rng = XorShiftRng::from_seed([42; 16]);
let state: FoundationBeaconState = BeaconState::random_for_test(&mut rng);
let state_root = state.canonical_root();
store.put(&state_root, &ssz_encode(&state)).unwrap();
let decoded = store.get_deserialized(&state_root).unwrap().unwrap();
assert_eq!(state, decoded);
}
}

View File

@ -1,103 +0,0 @@
macro_rules! impl_crud_for_store {
($store: ident, $db_column: expr) => {
impl<T: ClientDB> $store<T> {
pub fn put(&self, hash: &Hash256, ssz: &[u8]) -> Result<(), DBError> {
self.db.put($db_column, hash.as_bytes(), ssz)
}
pub fn get(&self, hash: &Hash256) -> Result<Option<Vec<u8>>, DBError> {
self.db.get($db_column, hash.as_bytes())
}
pub fn exists(&self, hash: &Hash256) -> Result<bool, DBError> {
self.db.exists($db_column, hash.as_bytes())
}
pub fn delete(&self, hash: &Hash256) -> Result<(), DBError> {
self.db.delete($db_column, hash.as_bytes())
}
}
};
}
#[cfg(test)]
macro_rules! test_crud_for_store {
($store: ident, $db_column: expr) => {
#[test]
fn test_put() {
let db = Arc::new(MemoryDB::open());
let store = $store::new(db.clone());
let ssz = "some bytes".as_bytes();
let hash = &Hash256::from([0xAA; 32]);
store.put(hash, ssz).unwrap();
assert_eq!(db.get(DB_COLUMN, hash.as_bytes()).unwrap().unwrap(), ssz);
}
#[test]
fn test_get() {
let db = Arc::new(MemoryDB::open());
let store = $store::new(db.clone());
let ssz = "some bytes".as_bytes();
let hash = &Hash256::from([0xAA; 32]);
db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap();
assert_eq!(store.get(hash).unwrap().unwrap(), ssz);
}
#[test]
fn test_get_unknown() {
let db = Arc::new(MemoryDB::open());
let store = $store::new(db.clone());
let ssz = "some bytes".as_bytes();
let hash = &Hash256::from([0xAA; 32]);
let other_hash = &Hash256::from([0xBB; 32]);
db.put(DB_COLUMN, other_hash.as_bytes(), ssz).unwrap();
assert_eq!(store.get(hash).unwrap(), None);
}
#[test]
fn test_exists() {
let db = Arc::new(MemoryDB::open());
let store = $store::new(db.clone());
let ssz = "some bytes".as_bytes();
let hash = &Hash256::from([0xAA; 32]);
db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap();
assert!(store.exists(hash).unwrap());
}
#[test]
fn test_block_does_not_exist() {
let db = Arc::new(MemoryDB::open());
let store = $store::new(db.clone());
let ssz = "some bytes".as_bytes();
let hash = &Hash256::from([0xAA; 32]);
let other_hash = &Hash256::from([0xBB; 32]);
db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap();
assert!(!store.exists(other_hash).unwrap());
}
#[test]
fn test_delete() {
let db = Arc::new(MemoryDB::open());
let store = $store::new(db.clone());
let ssz = "some bytes".as_bytes();
let hash = &Hash256::from([0xAA; 32]);
db.put(DB_COLUMN, hash.as_bytes(), ssz).unwrap();
assert!(db.exists(DB_COLUMN, hash.as_bytes()).unwrap());
store.delete(hash).unwrap();
assert!(!db.exists(DB_COLUMN, hash.as_bytes()).unwrap());
}
};
}

View File

@ -1,25 +0,0 @@
use super::{ClientDB, DBError};
#[macro_use]
mod macros;
mod beacon_block_store;
mod beacon_state_store;
mod pow_chain_store;
mod validator_store;
pub use self::beacon_block_store::{BeaconBlockAtSlotError, BeaconBlockStore};
pub use self::beacon_state_store::BeaconStateStore;
pub use self::pow_chain_store::PoWChainStore;
pub use self::validator_store::{ValidatorStore, ValidatorStoreError};
pub const BLOCKS_DB_COLUMN: &str = "blocks";
pub const STATES_DB_COLUMN: &str = "states";
pub const POW_CHAIN_DB_COLUMN: &str = "powchain";
pub const VALIDATOR_DB_COLUMN: &str = "validator";
pub const COLUMNS: [&str; 4] = [
BLOCKS_DB_COLUMN,
STATES_DB_COLUMN,
POW_CHAIN_DB_COLUMN,
VALIDATOR_DB_COLUMN,
];

View File

@ -1,68 +0,0 @@
use super::POW_CHAIN_DB_COLUMN as DB_COLUMN;
use super::{ClientDB, DBError};
use std::sync::Arc;
pub struct PoWChainStore<T>
where
T: ClientDB,
{
db: Arc<T>,
}
impl<T: ClientDB> PoWChainStore<T> {
pub fn new(db: Arc<T>) -> Self {
Self { db }
}
pub fn put_block_hash(&self, hash: &[u8]) -> Result<(), DBError> {
self.db.put(DB_COLUMN, hash, &[0])
}
pub fn block_hash_exists(&self, hash: &[u8]) -> Result<bool, DBError> {
self.db.exists(DB_COLUMN, hash)
}
}
#[cfg(test)]
mod tests {
extern crate types;
use super::super::super::MemoryDB;
use super::*;
use self::types::Hash256;
#[test]
fn test_put_block_hash() {
let db = Arc::new(MemoryDB::open());
let store = PoWChainStore::new(db.clone());
let hash = &Hash256::from([0xAA; 32]).as_bytes().to_vec();
store.put_block_hash(hash).unwrap();
assert!(db.exists(DB_COLUMN, hash).unwrap());
}
#[test]
fn test_block_hash_exists() {
let db = Arc::new(MemoryDB::open());
let store = PoWChainStore::new(db.clone());
let hash = &Hash256::from([0xAA; 32]).as_bytes().to_vec();
db.put(DB_COLUMN, hash, &[0]).unwrap();
assert!(store.block_hash_exists(hash).unwrap());
}
#[test]
fn test_block_hash_does_not_exist() {
let db = Arc::new(MemoryDB::open());
let store = PoWChainStore::new(db.clone());
let hash = &Hash256::from([0xAA; 32]).as_bytes().to_vec();
let other_hash = &Hash256::from([0xBB; 32]).as_bytes().to_vec();
db.put(DB_COLUMN, hash, &[0]).unwrap();
assert!(!store.block_hash_exists(other_hash).unwrap());
}
}

View File

@ -1,215 +0,0 @@
extern crate bytes;
use self::bytes::{BufMut, BytesMut};
use super::VALIDATOR_DB_COLUMN as DB_COLUMN;
use super::{ClientDB, DBError};
use bls::PublicKey;
use ssz::{Decode, Encode};
use std::sync::Arc;
#[derive(Debug, PartialEq)]
pub enum ValidatorStoreError {
DBError(String),
DecodeError,
}
impl From<DBError> for ValidatorStoreError {
fn from(error: DBError) -> Self {
ValidatorStoreError::DBError(error.message)
}
}
#[derive(Debug, PartialEq)]
enum KeyPrefixes {
PublicKey,
}
pub struct ValidatorStore<T>
where
T: ClientDB,
{
db: Arc<T>,
}
impl<T: ClientDB> ValidatorStore<T> {
pub fn new(db: Arc<T>) -> Self {
Self { db }
}
fn prefix_bytes(&self, key_prefix: &KeyPrefixes) -> Vec<u8> {
match key_prefix {
KeyPrefixes::PublicKey => b"pubkey".to_vec(),
}
}
fn get_db_key_for_index(&self, key_prefix: &KeyPrefixes, index: usize) -> Vec<u8> {
let mut buf = BytesMut::with_capacity(6 + 8);
buf.put(self.prefix_bytes(key_prefix));
buf.put_u64_be(index as u64);
buf.take().to_vec()
}
pub fn put_public_key_by_index(
&self,
index: usize,
public_key: &PublicKey,
) -> Result<(), ValidatorStoreError> {
let key = self.get_db_key_for_index(&KeyPrefixes::PublicKey, index);
let val = public_key.as_ssz_bytes();
self.db
.put(DB_COLUMN, &key[..], &val[..])
.map_err(ValidatorStoreError::from)
}
pub fn get_public_key_by_index(
&self,
index: usize,
) -> Result<Option<PublicKey>, ValidatorStoreError> {
let key = self.get_db_key_for_index(&KeyPrefixes::PublicKey, index);
let val = self.db.get(DB_COLUMN, &key[..])?;
match val {
None => Ok(None),
Some(val) => match PublicKey::from_ssz_bytes(&val) {
Ok(key) => Ok(Some(key)),
Err(_) => Err(ValidatorStoreError::DecodeError),
},
}
}
}
#[cfg(test)]
mod tests {
use super::super::super::MemoryDB;
use super::*;
use bls::Keypair;
#[test]
fn test_prefix_bytes() {
let db = Arc::new(MemoryDB::open());
let store = ValidatorStore::new(db.clone());
assert_eq!(
store.prefix_bytes(&KeyPrefixes::PublicKey),
b"pubkey".to_vec()
);
}
#[test]
fn test_get_db_key_for_index() {
let db = Arc::new(MemoryDB::open());
let store = ValidatorStore::new(db.clone());
let mut buf = BytesMut::with_capacity(6 + 8);
buf.put(b"pubkey".to_vec());
buf.put_u64_be(42);
assert_eq!(
store.get_db_key_for_index(&KeyPrefixes::PublicKey, 42),
buf.take().to_vec()
)
}
#[test]
fn test_put_public_key_by_index() {
let db = Arc::new(MemoryDB::open());
let store = ValidatorStore::new(db.clone());
let index = 3;
let public_key = Keypair::random().pk;
store.put_public_key_by_index(index, &public_key).unwrap();
let public_key_at_index = db
.get(
DB_COLUMN,
&store.get_db_key_for_index(&KeyPrefixes::PublicKey, index)[..],
)
.unwrap()
.unwrap();
assert_eq!(public_key_at_index, public_key.as_ssz_bytes());
}
#[test]
fn test_get_public_key_by_index() {
let db = Arc::new(MemoryDB::open());
let store = ValidatorStore::new(db.clone());
let index = 4;
let public_key = Keypair::random().pk;
db.put(
DB_COLUMN,
&store.get_db_key_for_index(&KeyPrefixes::PublicKey, index)[..],
&public_key.as_ssz_bytes(),
)
.unwrap();
let public_key_at_index = store.get_public_key_by_index(index).unwrap().unwrap();
assert_eq!(public_key_at_index, public_key);
}
#[test]
fn test_get_public_key_by_unknown_index() {
let db = Arc::new(MemoryDB::open());
let store = ValidatorStore::new(db.clone());
let public_key = Keypair::random().pk;
db.put(
DB_COLUMN,
&store.get_db_key_for_index(&KeyPrefixes::PublicKey, 3)[..],
&public_key.as_ssz_bytes(),
)
.unwrap();
let public_key_at_index = store.get_public_key_by_index(4).unwrap();
assert_eq!(public_key_at_index, None);
}
#[test]
fn test_get_invalid_public_key() {
let db = Arc::new(MemoryDB::open());
let store = ValidatorStore::new(db.clone());
let key = store.get_db_key_for_index(&KeyPrefixes::PublicKey, 42);
db.put(DB_COLUMN, &key[..], "cats".as_bytes()).unwrap();
assert_eq!(
store.get_public_key_by_index(42),
Err(ValidatorStoreError::DecodeError)
);
}
#[test]
fn test_validator_store_put_get() {
let db = Arc::new(MemoryDB::open());
let store = ValidatorStore::new(db);
let keys = vec![
Keypair::random(),
Keypair::random(),
Keypair::random(),
Keypair::random(),
Keypair::random(),
];
for i in 0..keys.len() {
store.put_public_key_by_index(i, &keys[i].pk).unwrap();
}
/*
* Check all keys are retrieved correctly.
*/
for i in 0..keys.len() {
let retrieved = store.get_public_key_by_index(i).unwrap().unwrap();
assert_eq!(retrieved, keys[i].pk);
}
/*
* Check that an index that wasn't stored returns None.
*/
assert!(store
.get_public_key_by_index(keys.len() + 1)
.unwrap()
.is_none());
}
}

View File

@ -1,28 +0,0 @@
pub type DBValue = Vec<u8>;
#[derive(Debug)]
pub struct DBError {
pub message: String,
}
impl DBError {
pub fn new(message: String) -> Self {
Self { message }
}
}
/// A generic database to be used by the "client' (i.e.,
/// the lighthouse blockchain client).
///
/// The purpose of having this generic trait is to allow the
/// program to use a persistent on-disk database during production,
/// but use a transient database during tests.
pub trait ClientDB: Sync + Send {
fn get(&self, col: &str, key: &[u8]) -> Result<Option<DBValue>, DBError>;
fn put(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), DBError>;
fn exists(&self, col: &str, key: &[u8]) -> Result<bool, DBError>;
fn delete(&self, col: &str, key: &[u8]) -> Result<(), DBError>;
}

View File

@ -1,9 +1,9 @@
use beacon_chain::BeaconChain as RawBeaconChain; use beacon_chain::BeaconChain as RawBeaconChain;
use beacon_chain::{ use beacon_chain::{
db::ClientDB,
fork_choice::ForkChoice, fork_choice::ForkChoice,
parking_lot::RwLockReadGuard, parking_lot::RwLockReadGuard,
slot_clock::SlotClock, slot_clock::SlotClock,
store::Store,
types::{BeaconState, ChainSpec}, types::{BeaconState, ChainSpec},
AttestationValidationError, CheckPoint, AttestationValidationError, CheckPoint,
}; };
@ -66,7 +66,7 @@ pub trait BeaconChain<E: EthSpec>: Send + Sync {
impl<T, U, F, E> BeaconChain<E> for RawBeaconChain<T, U, F, E> impl<T, U, F, E> BeaconChain<E> for RawBeaconChain<T, U, F, E>
where where
T: ClientDB + Sized, T: Store,
U: SlotClock, U: SlotClock,
F: ForkChoice, F: ForkChoice,
E: EthSpec, E: EthSpec,

View File

@ -17,7 +17,7 @@ protos = { path = "../../protos" }
grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] } grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] }
protobuf = "2.0.2" protobuf = "2.0.2"
clap = "2.32.0" clap = "2.32.0"
db = { path = "../db" } store = { path = "../store" }
dirs = "1.0.3" dirs = "1.0.3"
futures = "0.1.23" futures = "0.1.23"
slog = "^2.2.3" slog = "^2.2.3"

View File

@ -1,9 +1,9 @@
use beacon_chain::BeaconChain as RawBeaconChain; use beacon_chain::BeaconChain as RawBeaconChain;
use beacon_chain::{ use beacon_chain::{
db::ClientDB,
fork_choice::ForkChoice, fork_choice::ForkChoice,
parking_lot::{RwLockReadGuard, RwLockWriteGuard}, parking_lot::{RwLockReadGuard, RwLockWriteGuard},
slot_clock::SlotClock, slot_clock::SlotClock,
store::Store,
types::{BeaconState, ChainSpec, Signature}, types::{BeaconState, ChainSpec, Signature},
AttestationValidationError, BlockProductionError, AttestationValidationError, BlockProductionError,
}; };
@ -36,7 +36,7 @@ pub trait BeaconChain<E: EthSpec>: Send + Sync {
impl<T, U, F, E> BeaconChain<E> for RawBeaconChain<T, U, F, E> impl<T, U, F, E> BeaconChain<E> for RawBeaconChain<T, U, F, E>
where where
T: ClientDB + Sized, T: Store,
U: SlotClock, U: SlotClock,
F: ForkChoice, F: ForkChoice,
E: EthSpec, E: EthSpec,

View File

@ -68,6 +68,15 @@ fn main() {
.help("Listen port for RPC endpoint.") .help("Listen port for RPC endpoint.")
.takes_value(true), .takes_value(true),
) )
.arg(
Arg::with_name("db")
.long("db")
.value_name("DB")
.help("Type of database to use.")
.takes_value(true)
.possible_values(&["disk", "memory"])
.default_value("memory"),
)
.get_matches(); .get_matches();
// invalid arguments, panic // invalid arguments, panic

View File

@ -1,15 +1,17 @@
use client::client_types::TestingClientType; use client::client_types::{DiskStoreTestingClientType, MemoryStoreTestingClientType};
use client::error; use client::{error, DBType};
use client::{notifier, Client, ClientConfig}; use client::{notifier, Client, ClientConfig, ClientTypes};
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::Future; use futures::Future;
use slog::info; use slog::info;
use std::cell::RefCell; use std::cell::RefCell;
use tokio::runtime::Builder; use tokio::runtime::Builder;
use tokio::runtime::Runtime;
use tokio::runtime::TaskExecutor;
use tokio_timer::clock::Clock; use tokio_timer::clock::Clock;
pub fn run_beacon_node(config: ClientConfig, log: &slog::Logger) -> error::Result<()> { pub fn run_beacon_node(config: ClientConfig, log: &slog::Logger) -> error::Result<()> {
let mut runtime = Builder::new() let runtime = Builder::new()
.name_prefix("main-") .name_prefix("main-")
.clock(Clock::system()) .clock(Clock::system())
.build() .build()
@ -20,8 +22,42 @@ pub fn run_beacon_node(config: ClientConfig, log: &slog::Logger) -> error::Resul
"data_dir" => &config.data_dir.to_str(), "data_dir" => &config.data_dir.to_str(),
"port" => &config.net_conf.listen_port); "port" => &config.net_conf.listen_port);
let executor = runtime.executor();
match config.db_type {
DBType::Disk => {
info!(
log,
"BeaconNode starting";
"type" => "DiskStoreTestingClientType"
);
let client: Client<DiskStoreTestingClientType> =
Client::new(config, log.clone(), &executor)?;
run(client, executor, runtime, log)
}
DBType::Memory => {
info!(
log,
"BeaconNode starting";
"type" => "MemoryStoreTestingClientType"
);
let client: Client<MemoryStoreTestingClientType> =
Client::new(config, log.clone(), &executor)?;
run(client, executor, runtime, log)
}
}
}
pub fn run<T: ClientTypes>(
client: Client<T>,
executor: TaskExecutor,
mut runtime: Runtime,
log: &slog::Logger,
) -> error::Result<()> {
// run service until ctrl-c // run service until ctrl-c
let (ctrlc_send, ctrlc) = oneshot::channel(); let (ctrlc_send, ctrlc_oneshot) = oneshot::channel();
let ctrlc_send_c = RefCell::new(Some(ctrlc_send)); let ctrlc_send_c = RefCell::new(Some(ctrlc_send));
ctrlc::set_handler(move || { ctrlc::set_handler(move || {
if let Some(ctrlc_send) = ctrlc_send_c.try_borrow_mut().unwrap().take() { if let Some(ctrlc_send) = ctrlc_send_c.try_borrow_mut().unwrap().take() {
@ -32,14 +68,10 @@ pub fn run_beacon_node(config: ClientConfig, log: &slog::Logger) -> error::Resul
let (exit_signal, exit) = exit_future::signal(); let (exit_signal, exit) = exit_future::signal();
let executor = runtime.executor();
// currently testing - using TestingClientType
let client: Client<TestingClientType> = Client::new(config, log.clone(), &executor)?;
notifier::run(&client, executor, exit); notifier::run(&client, executor, exit);
runtime runtime
.block_on(ctrlc) .block_on(ctrlc_oneshot)
.map_err(|e| format!("Ctrlc oneshot failed: {:?}", e))?; .map_err(|e| format!("Ctrlc oneshot failed: {:?}", e))?;
// perform global shutdown operations. // perform global shutdown operations.

View File

@ -0,0 +1,20 @@
[package]
name = "store"
version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018"
[dev-dependencies]
tempfile = "3"
[dependencies]
blake2-rfc = "0.2.18"
bls = { path = "../../eth2/utils/bls" }
bytes = "0.4.10"
db-key = "0.0.5"
leveldb = "0.8.4"
parking_lot = "0.7"
ssz = { path = "../../eth2/utils/ssz" }
ssz_derive = { path = "../../eth2/utils/ssz_derive" }
tree_hash = { path = "../../eth2/utils/tree_hash" }
types = { path = "../../eth2/types" }

View File

@ -0,0 +1,183 @@
use super::*;
use ssz::{Decode, DecodeError};
fn get_block_bytes<T: Store>(store: &T, root: Hash256) -> Result<Option<Vec<u8>>, Error> {
store.get_bytes(BeaconBlock::db_column().into(), &root[..])
}
fn read_slot_from_block_bytes(bytes: &[u8]) -> Result<Slot, DecodeError> {
let end = std::cmp::min(Slot::ssz_fixed_len(), bytes.len());
Slot::from_ssz_bytes(&bytes[0..end])
}
fn read_previous_block_root_from_block_bytes(bytes: &[u8]) -> Result<Hash256, DecodeError> {
let previous_bytes = Slot::ssz_fixed_len();
let slice = bytes
.get(previous_bytes..previous_bytes + Hash256::ssz_fixed_len())
.ok_or_else(|| DecodeError::BytesInvalid("Not enough bytes.".to_string()))?;
Hash256::from_ssz_bytes(slice)
}
pub fn get_block_at_preceeding_slot<T: Store>(
store: &T,
slot: Slot,
start_root: Hash256,
) -> Result<Option<(Hash256, BeaconBlock)>, Error> {
let mut root = start_root;
loop {
if let Some(bytes) = get_block_bytes(store, root)? {
let this_slot = read_slot_from_block_bytes(&bytes)?;
if this_slot == slot {
let block = BeaconBlock::from_ssz_bytes(&bytes)?;
break Ok(Some((root, block)));
} else if this_slot < slot {
break Ok(None);
} else {
root = read_previous_block_root_from_block_bytes(&bytes)?;
}
} else {
break Ok(None);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use ssz::Encode;
use tree_hash::TreeHash;
#[test]
fn read_slot() {
let spec = FewValidatorsEthSpec::spec();
let test_slot = |slot: Slot| {
let mut block = BeaconBlock::empty(&spec);
block.slot = slot;
let bytes = block.as_ssz_bytes();
assert_eq!(read_slot_from_block_bytes(&bytes).unwrap(), slot);
};
test_slot(Slot::new(0));
test_slot(Slot::new(1));
test_slot(Slot::new(42));
test_slot(Slot::new(u64::max_value()));
}
#[test]
fn bad_slot() {
for i in 0..8 {
assert!(read_slot_from_block_bytes(&vec![0; i]).is_err());
}
}
#[test]
fn read_previous_block_root() {
let spec = FewValidatorsEthSpec::spec();
let test_root = |root: Hash256| {
let mut block = BeaconBlock::empty(&spec);
block.previous_block_root = root;
let bytes = block.as_ssz_bytes();
assert_eq!(
read_previous_block_root_from_block_bytes(&bytes).unwrap(),
root
);
};
test_root(Hash256::random());
test_root(Hash256::random());
test_root(Hash256::random());
}
fn build_chain(
store: &impl Store,
slots: &[usize],
spec: &ChainSpec,
) -> Vec<(Hash256, BeaconBlock)> {
let mut blocks_and_roots: Vec<(Hash256, BeaconBlock)> = vec![];
for (i, slot) in slots.iter().enumerate() {
let mut block = BeaconBlock::empty(spec);
block.slot = Slot::from(*slot);
if i > 0 {
block.previous_block_root = blocks_and_roots[i - 1].0;
}
let root = Hash256::from_slice(&block.tree_hash_root());
store.put(&root, &block).unwrap();
blocks_and_roots.push((root, block));
}
blocks_and_roots
}
#[test]
fn chain_without_skips() {
let n: usize = 10;
let store = MemoryStore::open();
let spec = FewValidatorsEthSpec::spec();
let slots: Vec<usize> = (0..n).collect();
let blocks_and_roots = build_chain(&store, &slots, &spec);
for source in 1..n {
for target in 0..=source {
let (source_root, _source_block) = &blocks_and_roots[source];
let (target_root, target_block) = &blocks_and_roots[target];
let (found_root, found_block) = store
.get_block_at_preceeding_slot(*source_root, target_block.slot)
.unwrap()
.unwrap();
assert_eq!(found_root, *target_root);
assert_eq!(found_block, *target_block);
}
}
}
#[test]
fn chain_with_skips() {
let store = MemoryStore::open();
let spec = FewValidatorsEthSpec::spec();
let slots = vec![0, 1, 2, 5];
let blocks_and_roots = build_chain(&store, &slots, &spec);
// Valid slots
for target in 0..3 {
let (source_root, _source_block) = &blocks_and_roots[3];
let (target_root, target_block) = &blocks_and_roots[target];
let (found_root, found_block) = store
.get_block_at_preceeding_slot(*source_root, target_block.slot)
.unwrap()
.unwrap();
assert_eq!(found_root, *target_root);
assert_eq!(found_block, *target_block);
}
// Slot that doesn't exist
let (source_root, _source_block) = &blocks_and_roots[3];
assert!(store
.get_block_at_preceeding_slot(*source_root, Slot::new(3))
.unwrap()
.is_none());
// Slot too high
let (source_root, _source_block) = &blocks_and_roots[3];
assert!(store
.get_block_at_preceeding_slot(*source_root, Slot::new(3))
.unwrap()
.is_none());
}
}

View File

@ -1,19 +1,20 @@
extern crate rocksdb; extern crate rocksdb;
use super::rocksdb::Error as RocksError; // use super::stores::COLUMNS;
use super::rocksdb::{Options, DB};
use super::{ClientDB, DBError, DBValue}; use super::{ClientDB, DBError, DBValue};
use rocksdb::Error as RocksError;
use rocksdb::{Options, DB};
use std::fs; use std::fs;
use std::path::Path; use std::path::Path;
/// A on-disk database which implements the ClientDB trait. /// A on-disk database which implements the ClientDB trait.
/// ///
/// This implementation uses RocksDB with default options. /// This implementation uses RocksDB with default options.
pub struct DiskDB { pub struct DiskStore {
db: DB, db: DB,
} }
impl DiskDB { impl DiskStore {
/// Open the RocksDB database, optionally supplying columns if required. /// Open the RocksDB database, optionally supplying columns if required.
/// ///
/// The RocksDB database will be contained in a directory titled /// The RocksDB database will be contained in a directory titled
@ -23,31 +24,32 @@ impl DiskDB {
/// ///
/// Panics if the database is unable to be created. /// Panics if the database is unable to be created.
pub fn open(path: &Path, columns: Option<&[&str]>) -> Self { pub fn open(path: &Path, columns: Option<&[&str]>) -> Self {
/* // Rocks options.
* Initialise the options
*/
let mut options = Options::default(); let mut options = Options::default();
options.create_if_missing(true); options.create_if_missing(true);
// TODO: ensure that columns are created (and remove // Ensure the path exists.
// the dead_code allow)
/*
* Initialise the path
*/
fs::create_dir_all(&path).unwrap_or_else(|_| panic!("Unable to create {:?}", &path)); fs::create_dir_all(&path).unwrap_or_else(|_| panic!("Unable to create {:?}", &path));
let db_path = path.join("database"); let db_path = path.join("database");
/* let columns = columns.unwrap_or(&COLUMNS);
* Open the database
*/
let db = match columns {
None => DB::open(&options, db_path),
Some(columns) => DB::open_cf(&options, db_path, columns),
}
.expect("Unable to open local database");;
Self { db } if db_path.exists() {
Self {
db: DB::open_cf(&options, db_path, &COLUMNS)
.expect("Unable to open local database"),
}
} else {
let mut db = Self {
db: DB::open(&options, db_path).expect("Unable to open local database"),
};
for cf in columns {
db.create_col(cf).unwrap();
}
db
}
} }
/// Create a RocksDB column family. Corresponds to the /// Create a RocksDB column family. Corresponds to the
@ -69,7 +71,7 @@ impl From<RocksError> for DBError {
} }
} }
impl ClientDB for DiskDB { impl ClientDB for DiskStore {
/// Get the value for some key on some column. /// Get the value for some key on some column.
/// ///
/// Corresponds to the `get_cf()` method on the RocksDB API. /// Corresponds to the `get_cf()` method on the RocksDB API.
@ -97,7 +99,7 @@ impl ClientDB for DiskDB {
None => Err(DBError { None => Err(DBError {
message: "Unknown column".to_string(), message: "Unknown column".to_string(),
}), }),
Some(handle) => self.db.put_cf(handle, key, val).map_err(Into::into), Some(handle) => self.db.put_cf(handle, key, val).map_err(|e| e.into()),
} }
} }
@ -152,7 +154,7 @@ mod tests {
let col_name: &str = "TestColumn"; let col_name: &str = "TestColumn";
let column_families = vec![col_name]; let column_families = vec![col_name];
let mut db = DiskDB::open(&path, None); let mut db = DiskStore::open(&path, None);
for cf in column_families { for cf in column_families {
db.create_col(&cf).unwrap(); db.create_col(&cf).unwrap();

View File

@ -0,0 +1,30 @@
use ssz::DecodeError;
#[derive(Debug, PartialEq)]
pub enum Error {
SszDecodeError(DecodeError),
DBError { message: String },
}
impl From<DecodeError> for Error {
fn from(e: DecodeError) -> Error {
Error::SszDecodeError(e)
}
}
impl From<DBError> for Error {
fn from(e: DBError) -> Error {
Error::DBError { message: e.message }
}
}
#[derive(Debug)]
pub struct DBError {
pub message: String,
}
impl DBError {
pub fn new(message: String) -> Self {
Self { message }
}
}

View File

@ -0,0 +1,30 @@
use crate::*;
use ssz::{Decode, Encode};
impl StoreItem for BeaconBlock {
fn db_column() -> DBColumn {
DBColumn::BeaconBlock
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &mut [u8]) -> Result<Self, Error> {
Self::from_ssz_bytes(bytes).map_err(Into::into)
}
}
impl<T: EthSpec> StoreItem for BeaconState<T> {
fn db_column() -> DBColumn {
DBColumn::BeaconState
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &mut [u8]) -> Result<Self, Error> {
Self::from_ssz_bytes(bytes).map_err(Into::into)
}
}

View File

@ -0,0 +1,100 @@
use super::*;
use db_key::Key;
use leveldb::database::kv::KV;
use leveldb::database::Database;
use leveldb::error::Error as LevelDBError;
use leveldb::options::{Options, ReadOptions, WriteOptions};
use std::path::Path;
/// A wrapped leveldb database.
pub struct LevelDB {
db: Database<BytesKey>,
}
impl LevelDB {
/// Open a database at `path`, creating a new database if one does not already exist.
pub fn open(path: &Path) -> Result<Self, Error> {
let mut options = Options::new();
options.create_if_missing = true;
let db = Database::open(path, options)?;
Ok(Self { db })
}
fn read_options(&self) -> ReadOptions<BytesKey> {
ReadOptions::new()
}
fn write_options(&self) -> WriteOptions {
WriteOptions::new()
}
fn get_key_for_col(col: &str, key: &[u8]) -> BytesKey {
let mut col = col.as_bytes().to_vec();
col.append(&mut key.to_vec());
BytesKey { key: col }
}
}
/// Used for keying leveldb.
pub struct BytesKey {
key: Vec<u8>,
}
impl Key for BytesKey {
fn from_u8(key: &[u8]) -> Self {
Self { key: key.to_vec() }
}
fn as_slice<T, F: Fn(&[u8]) -> T>(&self, f: F) -> T {
f(self.key.as_slice())
}
}
impl Store for LevelDB {
/// Retrieve some bytes in `column` with `key`.
fn get_bytes(&self, col: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
let column_key = Self::get_key_for_col(col, key);
self.db
.get(self.read_options(), column_key)
.map_err(Into::into)
}
/// Store some `value` in `column`, indexed with `key`.
fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
let column_key = Self::get_key_for_col(col, key);
self.db
.put(self.write_options(), column_key, val)
.map_err(Into::into)
}
/// Return `true` if `key` exists in `column`.
fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error> {
let column_key = Self::get_key_for_col(col, key);
self.db
.get(self.read_options(), column_key)
.map_err(Into::into)
.and_then(|val| Ok(val.is_some()))
}
/// Removes `key` from `column`.
fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> {
let column_key = Self::get_key_for_col(col, key);
self.db
.delete(self.write_options(), column_key)
.map_err(Into::into)
}
}
impl From<LevelDBError> for Error {
fn from(e: LevelDBError) -> Error {
Error::DBError {
message: format!("{:?}", e),
}
}
}

View File

@ -0,0 +1,223 @@
//! Storage functionality for Lighthouse.
//!
//! Provides the following stores:
//!
//! - `DiskStore`: an on-disk store backed by leveldb. Used in production.
//! - `MemoryStore`: an in-memory store backed by a hash-map. Used for testing.
//!
//! Provides a simple API for storing/retrieving all types that sometimes needs type-hints. See
//! tests for implementation examples.
mod block_at_slot;
mod errors;
mod impls;
mod leveldb_store;
mod memory_store;
pub use self::leveldb_store::LevelDB as DiskStore;
pub use self::memory_store::MemoryStore;
pub use errors::Error;
pub use types::*;
/// An object capable of storing and retrieving objects implementing `StoreItem`.
///
/// A `Store` is fundamentally backed by a key-value database, however it provides support for
/// columns. A simple column implementation might involve prefixing a key with some bytes unique to
/// each column.
pub trait Store: Sync + Send + Sized {
/// Store an item in `Self`.
fn put(&self, key: &Hash256, item: &impl StoreItem) -> Result<(), Error> {
item.db_put(self, key)
}
/// Retrieve an item from `Self`.
fn get<I: StoreItem>(&self, key: &Hash256) -> Result<Option<I>, Error> {
I::db_get(self, key)
}
/// Returns `true` if the given key represents an item in `Self`.
fn exists<I: StoreItem>(&self, key: &Hash256) -> Result<bool, Error> {
I::db_exists(self, key)
}
/// Remove an item from `Self`.
fn delete<I: StoreItem>(&self, key: &Hash256) -> Result<(), Error> {
I::db_delete(self, key)
}
/// Given the root of an existing block in the store (`start_block_root`), return a parent
/// block with the specified `slot`.
///
/// Returns `None` if no parent block exists at that slot, or if `slot` is greater than the
/// slot of `start_block_root`.
fn get_block_at_preceeding_slot(
&self,
start_block_root: Hash256,
slot: Slot,
) -> Result<Option<(Hash256, BeaconBlock)>, Error> {
block_at_slot::get_block_at_preceeding_slot(self, slot, start_block_root)
}
/// Retrieve some bytes in `column` with `key`.
fn get_bytes(&self, column: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error>;
/// Store some `value` in `column`, indexed with `key`.
fn put_bytes(&self, column: &str, key: &[u8], value: &[u8]) -> Result<(), Error>;
/// Return `true` if `key` exists in `column`.
fn key_exists(&self, column: &str, key: &[u8]) -> Result<bool, Error>;
/// Removes `key` from `column`.
fn key_delete(&self, column: &str, key: &[u8]) -> Result<(), Error>;
}
/// A unique column identifier.
pub enum DBColumn {
BeaconBlock,
BeaconState,
BeaconChain,
}
impl<'a> Into<&'a str> for DBColumn {
/// Returns a `&str` that can be used for keying a key-value data base.
fn into(self) -> &'a str {
match self {
DBColumn::BeaconBlock => &"blk",
DBColumn::BeaconState => &"ste",
DBColumn::BeaconChain => &"bch",
}
}
}
/// An item that may be stored in a `Store`.
///
/// Provides default methods that are suitable for most applications, however when overridden they
/// provide full customizability of `Store` operations.
pub trait StoreItem: Sized {
/// Identifies which column this item should be placed in.
fn db_column() -> DBColumn;
/// Serialize `self` as bytes.
fn as_store_bytes(&self) -> Vec<u8>;
/// De-serialize `self` from bytes.
fn from_store_bytes(bytes: &mut [u8]) -> Result<Self, Error>;
/// Store `self`.
fn db_put(&self, store: &impl Store, key: &Hash256) -> Result<(), Error> {
let column = Self::db_column().into();
let key = key.as_bytes();
store
.put_bytes(column, key, &self.as_store_bytes())
.map_err(Into::into)
}
/// Retrieve an instance of `Self`.
fn db_get(store: &impl Store, key: &Hash256) -> Result<Option<Self>, Error> {
let column = Self::db_column().into();
let key = key.as_bytes();
match store.get_bytes(column, key)? {
Some(mut bytes) => Ok(Some(Self::from_store_bytes(&mut bytes[..])?)),
None => Ok(None),
}
}
/// Return `true` if an instance of `Self` exists in `Store`.
fn db_exists(store: &impl Store, key: &Hash256) -> Result<bool, Error> {
let column = Self::db_column().into();
let key = key.as_bytes();
store.key_exists(column, key)
}
/// Delete `self` from the `Store`.
fn db_delete(store: &impl Store, key: &Hash256) -> Result<(), Error> {
let column = Self::db_column().into();
let key = key.as_bytes();
store.key_delete(column, key)
}
}
#[cfg(test)]
mod tests {
use super::*;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use tempfile::tempdir;
#[derive(PartialEq, Debug, Encode, Decode)]
struct StorableThing {
a: u64,
b: u64,
}
impl StoreItem for StorableThing {
fn db_column() -> DBColumn {
DBColumn::BeaconBlock
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &mut [u8]) -> Result<Self, Error> {
Self::from_ssz_bytes(bytes).map_err(Into::into)
}
}
fn test_impl(store: impl Store) {
let key = Hash256::random();
let item = StorableThing { a: 1, b: 42 };
assert_eq!(store.exists::<StorableThing>(&key), Ok(false));
store.put(&key, &item).unwrap();
assert_eq!(store.exists::<StorableThing>(&key), Ok(true));
let retrieved = store.get(&key).unwrap().unwrap();
assert_eq!(item, retrieved);
store.delete::<StorableThing>(&key).unwrap();
assert_eq!(store.exists::<StorableThing>(&key), Ok(false));
assert_eq!(store.get::<StorableThing>(&key), Ok(None));
}
#[test]
fn diskdb() {
let dir = tempdir().unwrap();
let path = dir.path();
let store = DiskStore::open(&path).unwrap();
test_impl(store);
}
#[test]
fn memorydb() {
let store = MemoryStore::open();
test_impl(store);
}
#[test]
fn exists() {
let store = MemoryStore::open();
let key = Hash256::random();
let item = StorableThing { a: 1, b: 42 };
assert_eq!(store.exists::<StorableThing>(&key).unwrap(), false);
store.put(&key, &item).unwrap();
assert_eq!(store.exists::<StorableThing>(&key).unwrap(), true);
store.delete::<StorableThing>(&key).unwrap();
assert_eq!(store.exists::<StorableThing>(&key).unwrap(), false);
}
}

View File

@ -0,0 +1,63 @@
use super::{Error, Store};
use parking_lot::RwLock;
use std::collections::HashMap;
type DBHashMap = HashMap<Vec<u8>, Vec<u8>>;
/// A thread-safe `HashMap` wrapper.
pub struct MemoryStore {
db: RwLock<DBHashMap>,
}
impl MemoryStore {
/// Create a new, empty database.
pub fn open() -> Self {
Self {
db: RwLock::new(HashMap::new()),
}
}
fn get_key_for_col(col: &str, key: &[u8]) -> Vec<u8> {
let mut col = col.as_bytes().to_vec();
col.append(&mut key.to_vec());
col
}
}
impl Store for MemoryStore {
/// Get the value of some key from the database. Returns `None` if the key does not exist.
fn get_bytes(&self, col: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
let column_key = MemoryStore::get_key_for_col(col, key);
Ok(self
.db
.read()
.get(&column_key)
.and_then(|val| Some(val.clone())))
}
/// Puts a key in the database.
fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
let column_key = MemoryStore::get_key_for_col(col, key);
self.db.write().insert(column_key, val.to_vec());
Ok(())
}
/// Return true if some key exists in some column.
fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error> {
let column_key = MemoryStore::get_key_for_col(col, key);
Ok(self.db.read().contains_key(&column_key))
}
/// Delete some key from the database.
fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> {
let column_key = MemoryStore::get_key_for_col(col, key);
self.db.write().remove(&column_key);
Ok(())
}
}

View File

@ -0,0 +1,37 @@
use super::*;
pub type Vec<u8> = Vec<u8>;
pub trait Store: Sync + Send + Sized {
fn put(&self, key: &Hash256, item: &impl StoreItem) -> Result<(), Error> {
item.db_put(self, key)
}
fn get<I: StoreItem>(&self, key: &Hash256) -> Result<Option<I>, Error> {
I::db_get(self, key)
}
fn exists<I: StoreItem>(&self, key: &Hash256) -> Result<bool, Error> {
I::db_exists(self, key)
}
fn delete<I: StoreItem>(&self, key: &Hash256) -> Result<(), Error> {
I::db_delete(self, key)
}
fn get_block_at_preceeding_slot(
&self,
start_block_root: Hash256,
slot: Slot,
) -> Result<Option<(Hash256, BeaconBlock)>, Error> {
block_at_slot::get_block_at_preceeding_slot(self, slot, start_block_root)
}
fn get_bytes(&self, col: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error>;
fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error>;
fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error>;
fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error>;
}

View File

@ -5,7 +5,7 @@ authors = ["Age Manning <Age@AgeManning.com>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
db = { path = "../../beacon_node/db" } store = { path = "../../beacon_node/store" }
ssz = { path = "../utils/ssz" } ssz = { path = "../utils/ssz" }
types = { path = "../types" } types = { path = "../types" }
log = "0.4.6" log = "0.4.6"

View File

@ -1,16 +1,11 @@
//! The optimised bitwise LMD-GHOST fork choice rule. //! The optimised bitwise LMD-GHOST fork choice rule.
extern crate bit_vec;
use crate::{ForkChoice, ForkChoiceError}; use crate::{ForkChoice, ForkChoiceError};
use bit_vec::BitVec; use bit_vec::BitVec;
use db::{
stores::{BeaconBlockStore, BeaconStateStore},
ClientDB,
};
use log::{debug, trace}; use log::{debug, trace};
use std::collections::HashMap; use std::collections::HashMap;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use store::Store;
use types::{BeaconBlock, BeaconState, ChainSpec, EthSpec, Hash256, Slot, SlotHeight}; use types::{BeaconBlock, BeaconState, ChainSpec, EthSpec, Hash256, Slot, SlotHeight};
//TODO: Pruning - Children //TODO: Pruning - Children
@ -34,7 +29,7 @@ fn power_of_2_below(x: u64) -> u64 {
} }
/// Stores the necessary data structures to run the optimised bitwise lmd ghost algorithm. /// Stores the necessary data structures to run the optimised bitwise lmd ghost algorithm.
pub struct BitwiseLMDGhost<T: ClientDB + Sized, E> { pub struct BitwiseLMDGhost<T, E> {
/// A cache of known ancestors at given heights for a specific block. /// A cache of known ancestors at given heights for a specific block.
//TODO: Consider FnvHashMap //TODO: Consider FnvHashMap
cache: HashMap<CacheKey<u64>, Hash256>, cache: HashMap<CacheKey<u64>, Hash256>,
@ -46,30 +41,21 @@ pub struct BitwiseLMDGhost<T: ClientDB + Sized, E> {
/// The latest attestation targets as a map of validator index to block hash. /// The latest attestation targets as a map of validator index to block hash.
//TODO: Could this be a fixed size vec //TODO: Could this be a fixed size vec
latest_attestation_targets: HashMap<u64, Hash256>, latest_attestation_targets: HashMap<u64, Hash256>,
/// Block storage access. /// Block and state storage.
block_store: Arc<BeaconBlockStore<T>>, store: Arc<T>,
/// State storage access.
state_store: Arc<BeaconStateStore<T>>,
max_known_height: SlotHeight, max_known_height: SlotHeight,
_phantom: PhantomData<E>, _phantom: PhantomData<E>,
} }
impl<T, E: EthSpec> BitwiseLMDGhost<T, E> impl<T: Store, E: EthSpec> BitwiseLMDGhost<T, E> {
where pub fn new(store: Arc<T>) -> Self {
T: ClientDB + Sized,
{
pub fn new(
block_store: Arc<BeaconBlockStore<T>>,
state_store: Arc<BeaconStateStore<T>>,
) -> Self {
BitwiseLMDGhost { BitwiseLMDGhost {
cache: HashMap::new(), cache: HashMap::new(),
ancestors: vec![HashMap::new(); 16], ancestors: vec![HashMap::new(); 16],
latest_attestation_targets: HashMap::new(), latest_attestation_targets: HashMap::new(),
children: HashMap::new(), children: HashMap::new(),
max_known_height: SlotHeight::new(0), max_known_height: SlotHeight::new(0),
block_store, store,
state_store,
_phantom: PhantomData, _phantom: PhantomData,
} }
} }
@ -89,8 +75,8 @@ where
let mut latest_votes: HashMap<Hash256, u64> = HashMap::new(); let mut latest_votes: HashMap<Hash256, u64> = HashMap::new();
// gets the current weighted votes // gets the current weighted votes
let current_state: BeaconState<E> = self let current_state: BeaconState<E> = self
.state_store .store
.get_deserialized(&state_root)? .get(&state_root)?
.ok_or_else(|| ForkChoiceError::MissingBeaconState(*state_root))?; .ok_or_else(|| ForkChoiceError::MissingBeaconState(*state_root))?;
let active_validator_indices = let active_validator_indices =
@ -121,8 +107,8 @@ where
// return None if we can't get the block from the db. // return None if we can't get the block from the db.
let block_height = { let block_height = {
let block_slot = self let block_slot = self
.block_store .store
.get_deserialized(&block_hash) .get::<BeaconBlock>(&block_hash)
.ok()? .ok()?
.expect("Should have returned already if None") .expect("Should have returned already if None")
.slot; .slot;
@ -243,7 +229,7 @@ where
} }
} }
impl<T: ClientDB + Sized, E: EthSpec> ForkChoice for BitwiseLMDGhost<T, E> { impl<T: Store, E: EthSpec> ForkChoice for BitwiseLMDGhost<T, E> {
fn add_block( fn add_block(
&mut self, &mut self,
block: &BeaconBlock, block: &BeaconBlock,
@ -252,8 +238,8 @@ impl<T: ClientDB + Sized, E: EthSpec> ForkChoice for BitwiseLMDGhost<T, E> {
) -> Result<(), ForkChoiceError> { ) -> Result<(), ForkChoiceError> {
// get the height of the parent // get the height of the parent
let parent_height = self let parent_height = self
.block_store .store
.get_deserialized(&block.previous_block_root)? .get::<BeaconBlock>(&block.previous_block_root)?
.ok_or_else(|| ForkChoiceError::MissingBeaconBlock(block.previous_block_root))? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(block.previous_block_root))?
.slot .slot
.height(spec.genesis_slot); .height(spec.genesis_slot);
@ -304,16 +290,16 @@ impl<T: ClientDB + Sized, E: EthSpec> ForkChoice for BitwiseLMDGhost<T, E> {
trace!("Old attestation found: {:?}", attestation_target); trace!("Old attestation found: {:?}", attestation_target);
// get the height of the target block // get the height of the target block
let block_height = self let block_height = self
.block_store .store
.get_deserialized(&target_block_root)? .get::<BeaconBlock>(&target_block_root)?
.ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*target_block_root))? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*target_block_root))?
.slot .slot
.height(spec.genesis_slot); .height(spec.genesis_slot);
// get the height of the past target block // get the height of the past target block
let past_block_height = self let past_block_height = self
.block_store .store
.get_deserialized(&attestation_target)? .get::<BeaconBlock>(&attestation_target)?
.ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*attestation_target))? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*attestation_target))?
.slot .slot
.height(spec.genesis_slot); .height(spec.genesis_slot);
@ -337,8 +323,8 @@ impl<T: ClientDB + Sized, E: EthSpec> ForkChoice for BitwiseLMDGhost<T, E> {
justified_block_start justified_block_start
); );
let block = self let block = self
.block_store .store
.get_deserialized(&justified_block_start)? .get::<BeaconBlock>(&justified_block_start)?
.ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*justified_block_start))?; .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*justified_block_start))?;
let block_slot = block.slot; let block_slot = block.slot;
@ -429,8 +415,8 @@ impl<T: ClientDB + Sized, E: EthSpec> ForkChoice for BitwiseLMDGhost<T, E> {
// didn't find head yet, proceed to next iteration // didn't find head yet, proceed to next iteration
// update block height // update block height
block_height = self block_height = self
.block_store .store
.get_deserialized(&current_head)? .get::<BeaconBlock>(&current_head)?
.ok_or_else(|| ForkChoiceError::MissingBeaconBlock(current_head))? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(current_head))?
.slot .slot
.height(spec.genesis_slot); .height(spec.genesis_slot);

View File

@ -16,17 +16,14 @@
//! [`slow_lmd_ghost`]: struct.SlowLmdGhost.html //! [`slow_lmd_ghost`]: struct.SlowLmdGhost.html
//! [`bitwise_lmd_ghost`]: struct.OptimisedLmdGhost.html //! [`bitwise_lmd_ghost`]: struct.OptimisedLmdGhost.html
extern crate db;
extern crate ssz;
extern crate types;
pub mod bitwise_lmd_ghost; pub mod bitwise_lmd_ghost;
pub mod longest_chain; pub mod longest_chain;
pub mod optimized_lmd_ghost; pub mod optimized_lmd_ghost;
pub mod slow_lmd_ghost; pub mod slow_lmd_ghost;
use db::stores::BeaconBlockAtSlotError; // use store::stores::BeaconBlockAtSlotError;
use db::DBError; // use store::DBError;
use store::Error as DBError;
use types::{BeaconBlock, ChainSpec, Hash256}; use types::{BeaconBlock, ChainSpec, Hash256};
pub use bitwise_lmd_ghost::BitwiseLMDGhost; pub use bitwise_lmd_ghost::BitwiseLMDGhost;
@ -77,10 +74,11 @@ pub enum ForkChoiceError {
impl From<DBError> for ForkChoiceError { impl From<DBError> for ForkChoiceError {
fn from(e: DBError) -> ForkChoiceError { fn from(e: DBError) -> ForkChoiceError {
ForkChoiceError::StorageError(e.message) ForkChoiceError::StorageError(format!("{:?}", e))
} }
} }
/*
impl From<BeaconBlockAtSlotError> for ForkChoiceError { impl From<BeaconBlockAtSlotError> for ForkChoiceError {
fn from(e: BeaconBlockAtSlotError) -> ForkChoiceError { fn from(e: BeaconBlockAtSlotError) -> ForkChoiceError {
match e { match e {
@ -94,6 +92,7 @@ impl From<BeaconBlockAtSlotError> for ForkChoiceError {
} }
} }
} }
*/
/// Fork choice options that are currently implemented. /// Fork choice options that are currently implemented.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]

View File

@ -1,31 +1,25 @@
use crate::{ForkChoice, ForkChoiceError}; use crate::{ForkChoice, ForkChoiceError};
use db::{stores::BeaconBlockStore, ClientDB};
use std::sync::Arc; use std::sync::Arc;
use store::Store;
use types::{BeaconBlock, ChainSpec, Hash256, Slot}; use types::{BeaconBlock, ChainSpec, Hash256, Slot};
pub struct LongestChain<T> pub struct LongestChain<T> {
where
T: ClientDB + Sized,
{
/// List of head block hashes /// List of head block hashes
head_block_hashes: Vec<Hash256>, head_block_hashes: Vec<Hash256>,
/// Block storage access. /// Block storage.
block_store: Arc<BeaconBlockStore<T>>, store: Arc<T>,
} }
impl<T> LongestChain<T> impl<T: Store> LongestChain<T> {
where pub fn new(store: Arc<T>) -> Self {
T: ClientDB + Sized,
{
pub fn new(block_store: Arc<BeaconBlockStore<T>>) -> Self {
LongestChain { LongestChain {
head_block_hashes: Vec::new(), head_block_hashes: Vec::new(),
block_store, store,
} }
} }
} }
impl<T: ClientDB + Sized> ForkChoice for LongestChain<T> { impl<T: Store> ForkChoice for LongestChain<T> {
fn add_block( fn add_block(
&mut self, &mut self,
block: &BeaconBlock, block: &BeaconBlock,
@ -55,9 +49,9 @@ impl<T: ClientDB + Sized> ForkChoice for LongestChain<T> {
* Load all the head_block hashes from the DB as SszBeaconBlocks. * Load all the head_block hashes from the DB as SszBeaconBlocks.
*/ */
for (index, block_hash) in self.head_block_hashes.iter().enumerate() { for (index, block_hash) in self.head_block_hashes.iter().enumerate() {
let block = self let block: BeaconBlock = self
.block_store .store
.get_deserialized(&block_hash)? .get(&block_hash)?
.ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*block_hash))?; .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*block_hash))?;
head_blocks.push((index, block)); head_blocks.push((index, block));
} }

View File

@ -1,16 +1,11 @@
//! The optimised bitwise LMD-GHOST fork choice rule. //! The optimised bitwise LMD-GHOST fork choice rule.
extern crate bit_vec;
use crate::{ForkChoice, ForkChoiceError}; use crate::{ForkChoice, ForkChoiceError};
use db::{
stores::{BeaconBlockStore, BeaconStateStore},
ClientDB,
};
use log::{debug, trace}; use log::{debug, trace};
use std::cmp::Ordering; use std::cmp::Ordering;
use std::collections::HashMap; use std::collections::HashMap;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use store::Store;
use types::{BeaconBlock, BeaconState, ChainSpec, EthSpec, Hash256, Slot, SlotHeight}; use types::{BeaconBlock, BeaconState, ChainSpec, EthSpec, Hash256, Slot, SlotHeight};
//TODO: Pruning - Children //TODO: Pruning - Children
@ -34,7 +29,7 @@ fn power_of_2_below(x: u64) -> u64 {
} }
/// Stores the necessary data structures to run the optimised lmd ghost algorithm. /// Stores the necessary data structures to run the optimised lmd ghost algorithm.
pub struct OptimizedLMDGhost<T: ClientDB + Sized, E> { pub struct OptimizedLMDGhost<T, E> {
/// A cache of known ancestors at given heights for a specific block. /// A cache of known ancestors at given heights for a specific block.
//TODO: Consider FnvHashMap //TODO: Consider FnvHashMap
cache: HashMap<CacheKey<u64>, Hash256>, cache: HashMap<CacheKey<u64>, Hash256>,
@ -46,30 +41,21 @@ pub struct OptimizedLMDGhost<T: ClientDB + Sized, E> {
/// The latest attestation targets as a map of validator index to block hash. /// The latest attestation targets as a map of validator index to block hash.
//TODO: Could this be a fixed size vec //TODO: Could this be a fixed size vec
latest_attestation_targets: HashMap<u64, Hash256>, latest_attestation_targets: HashMap<u64, Hash256>,
/// Block storage access. /// Block and state storage.
block_store: Arc<BeaconBlockStore<T>>, store: Arc<T>,
/// State storage access.
state_store: Arc<BeaconStateStore<T>>,
max_known_height: SlotHeight, max_known_height: SlotHeight,
_phantom: PhantomData<E>, _phantom: PhantomData<E>,
} }
impl<T, E: EthSpec> OptimizedLMDGhost<T, E> impl<T: Store, E: EthSpec> OptimizedLMDGhost<T, E> {
where pub fn new(store: Arc<T>) -> Self {
T: ClientDB + Sized,
{
pub fn new(
block_store: Arc<BeaconBlockStore<T>>,
state_store: Arc<BeaconStateStore<T>>,
) -> Self {
OptimizedLMDGhost { OptimizedLMDGhost {
cache: HashMap::new(), cache: HashMap::new(),
ancestors: vec![HashMap::new(); 16], ancestors: vec![HashMap::new(); 16],
latest_attestation_targets: HashMap::new(), latest_attestation_targets: HashMap::new(),
children: HashMap::new(), children: HashMap::new(),
max_known_height: SlotHeight::new(0), max_known_height: SlotHeight::new(0),
block_store, store,
state_store,
_phantom: PhantomData, _phantom: PhantomData,
} }
} }
@ -89,8 +75,8 @@ where
let mut latest_votes: HashMap<Hash256, u64> = HashMap::new(); let mut latest_votes: HashMap<Hash256, u64> = HashMap::new();
// gets the current weighted votes // gets the current weighted votes
let current_state: BeaconState<E> = self let current_state: BeaconState<E> = self
.state_store .store
.get_deserialized(&state_root)? .get(&state_root)?
.ok_or_else(|| ForkChoiceError::MissingBeaconState(*state_root))?; .ok_or_else(|| ForkChoiceError::MissingBeaconState(*state_root))?;
let active_validator_indices = let active_validator_indices =
@ -121,8 +107,8 @@ where
// return None if we can't get the block from the db. // return None if we can't get the block from the db.
let block_height = { let block_height = {
let block_slot = self let block_slot = self
.block_store .store
.get_deserialized(&block_hash) .get::<BeaconBlock>(&block_hash)
.ok()? .ok()?
.expect("Should have returned already if None") .expect("Should have returned already if None")
.slot; .slot;
@ -214,7 +200,7 @@ where
} }
} }
impl<T: ClientDB + Sized, E: EthSpec> ForkChoice for OptimizedLMDGhost<T, E> { impl<T: Store, E: EthSpec> ForkChoice for OptimizedLMDGhost<T, E> {
fn add_block( fn add_block(
&mut self, &mut self,
block: &BeaconBlock, block: &BeaconBlock,
@ -223,8 +209,8 @@ impl<T: ClientDB + Sized, E: EthSpec> ForkChoice for OptimizedLMDGhost<T, E> {
) -> Result<(), ForkChoiceError> { ) -> Result<(), ForkChoiceError> {
// get the height of the parent // get the height of the parent
let parent_height = self let parent_height = self
.block_store .store
.get_deserialized(&block.previous_block_root)? .get::<BeaconBlock>(&block.previous_block_root)?
.ok_or_else(|| ForkChoiceError::MissingBeaconBlock(block.previous_block_root))? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(block.previous_block_root))?
.slot .slot
.height(spec.genesis_slot); .height(spec.genesis_slot);
@ -275,16 +261,16 @@ impl<T: ClientDB + Sized, E: EthSpec> ForkChoice for OptimizedLMDGhost<T, E> {
trace!("Old attestation found: {:?}", attestation_target); trace!("Old attestation found: {:?}", attestation_target);
// get the height of the target block // get the height of the target block
let block_height = self let block_height = self
.block_store .store
.get_deserialized(&target_block_root)? .get::<BeaconBlock>(&target_block_root)?
.ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*target_block_root))? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*target_block_root))?
.slot .slot
.height(spec.genesis_slot); .height(spec.genesis_slot);
// get the height of the past target block // get the height of the past target block
let past_block_height = self let past_block_height = self
.block_store .store
.get_deserialized(&attestation_target)? .get::<BeaconBlock>(&attestation_target)?
.ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*attestation_target))? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*attestation_target))?
.slot .slot
.height(spec.genesis_slot); .height(spec.genesis_slot);
@ -308,8 +294,8 @@ impl<T: ClientDB + Sized, E: EthSpec> ForkChoice for OptimizedLMDGhost<T, E> {
justified_block_start justified_block_start
); );
let block = self let block = self
.block_store .store
.get_deserialized(&justified_block_start)? .get::<BeaconBlock>(&justified_block_start)?
.ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*justified_block_start))?; .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*justified_block_start))?;
let block_slot = block.slot; let block_slot = block.slot;
@ -400,8 +386,8 @@ impl<T: ClientDB + Sized, E: EthSpec> ForkChoice for OptimizedLMDGhost<T, E> {
// didn't find head yet, proceed to next iteration // didn't find head yet, proceed to next iteration
// update block height // update block height
block_height = self block_height = self
.block_store .store
.get_deserialized(&current_head)? .get::<BeaconBlock>(&current_head)?
.ok_or_else(|| ForkChoiceError::MissingBeaconBlock(current_head))? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(current_head))?
.slot .slot
.height(spec.genesis_slot); .height(spec.genesis_slot);

View File

@ -1,44 +1,30 @@
extern crate db;
use crate::{ForkChoice, ForkChoiceError}; use crate::{ForkChoice, ForkChoiceError};
use db::{
stores::{BeaconBlockStore, BeaconStateStore},
ClientDB,
};
use log::{debug, trace}; use log::{debug, trace};
use std::collections::HashMap; use std::collections::HashMap;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use store::Store;
use types::{BeaconBlock, BeaconState, ChainSpec, EthSpec, Hash256, Slot}; use types::{BeaconBlock, BeaconState, ChainSpec, EthSpec, Hash256, Slot};
//TODO: Pruning and syncing //TODO: Pruning and syncing
pub struct SlowLMDGhost<T: ClientDB + Sized, E> { pub struct SlowLMDGhost<T, E> {
/// The latest attestation targets as a map of validator index to block hash. /// The latest attestation targets as a map of validator index to block hash.
//TODO: Could this be a fixed size vec //TODO: Could this be a fixed size vec
latest_attestation_targets: HashMap<u64, Hash256>, latest_attestation_targets: HashMap<u64, Hash256>,
/// Stores the children for any given parent. /// Stores the children for any given parent.
children: HashMap<Hash256, Vec<Hash256>>, children: HashMap<Hash256, Vec<Hash256>>,
/// Block storage access. /// Block and state storage.
block_store: Arc<BeaconBlockStore<T>>, store: Arc<T>,
/// State storage access.
state_store: Arc<BeaconStateStore<T>>,
_phantom: PhantomData<E>, _phantom: PhantomData<E>,
} }
impl<T, E: EthSpec> SlowLMDGhost<T, E> impl<T: Store, E: EthSpec> SlowLMDGhost<T, E> {
where pub fn new(store: Arc<T>) -> Self {
T: ClientDB + Sized,
{
pub fn new(
block_store: Arc<BeaconBlockStore<T>>,
state_store: Arc<BeaconStateStore<T>>,
) -> Self {
SlowLMDGhost { SlowLMDGhost {
latest_attestation_targets: HashMap::new(), latest_attestation_targets: HashMap::new(),
children: HashMap::new(), children: HashMap::new(),
block_store, store,
state_store,
_phantom: PhantomData, _phantom: PhantomData,
} }
} }
@ -58,8 +44,8 @@ where
let mut latest_votes: HashMap<Hash256, u64> = HashMap::new(); let mut latest_votes: HashMap<Hash256, u64> = HashMap::new();
// gets the current weighted votes // gets the current weighted votes
let current_state: BeaconState<E> = self let current_state: BeaconState<E> = self
.state_store .store
.get_deserialized(&state_root)? .get(state_root)?
.ok_or_else(|| ForkChoiceError::MissingBeaconState(*state_root))?; .ok_or_else(|| ForkChoiceError::MissingBeaconState(*state_root))?;
let active_validator_indices = let active_validator_indices =
@ -90,15 +76,15 @@ where
) -> Result<u64, ForkChoiceError> { ) -> Result<u64, ForkChoiceError> {
let mut count = 0; let mut count = 0;
let block_slot = self let block_slot = self
.block_store .store
.get_deserialized(&block_root)? .get::<BeaconBlock>(&block_root)?
.ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*block_root))? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*block_root))?
.slot; .slot;
for (vote_hash, votes) in latest_votes.iter() { for (vote_hash, votes) in latest_votes.iter() {
let (root_at_slot, _) = self let (root_at_slot, _) = self
.block_store .store
.block_at_slot(&vote_hash, block_slot)? .get_block_at_preceeding_slot(*vote_hash, block_slot)?
.ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*block_root))?; .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*block_root))?;
if root_at_slot == *block_root { if root_at_slot == *block_root {
count += votes; count += votes;
@ -108,7 +94,7 @@ where
} }
} }
impl<T: ClientDB + Sized, E: EthSpec> ForkChoice for SlowLMDGhost<T, E> { impl<T: Store, E: EthSpec> ForkChoice for SlowLMDGhost<T, E> {
/// Process when a block is added /// Process when a block is added
fn add_block( fn add_block(
&mut self, &mut self,
@ -150,16 +136,16 @@ impl<T: ClientDB + Sized, E: EthSpec> ForkChoice for SlowLMDGhost<T, E> {
trace!("Old attestation found: {:?}", attestation_target); trace!("Old attestation found: {:?}", attestation_target);
// get the height of the target block // get the height of the target block
let block_height = self let block_height = self
.block_store .store
.get_deserialized(&target_block_root)? .get::<BeaconBlock>(&target_block_root)?
.ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*target_block_root))? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*target_block_root))?
.slot .slot
.height(spec.genesis_slot); .height(spec.genesis_slot);
// get the height of the past target block // get the height of the past target block
let past_block_height = self let past_block_height = self
.block_store .store
.get_deserialized(&attestation_target)? .get::<BeaconBlock>(&attestation_target)?
.ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*attestation_target))? .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*attestation_target))?
.slot .slot
.height(spec.genesis_slot); .height(spec.genesis_slot);
@ -180,8 +166,8 @@ impl<T: ClientDB + Sized, E: EthSpec> ForkChoice for SlowLMDGhost<T, E> {
) -> Result<Hash256, ForkChoiceError> { ) -> Result<Hash256, ForkChoiceError> {
debug!("Running LMD Ghost Fork-choice rule"); debug!("Running LMD Ghost Fork-choice rule");
let start = self let start = self
.block_store .store
.get_deserialized(&justified_block_start)? .get::<BeaconBlock>(&justified_block_start)?
.ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*justified_block_start))?; .ok_or_else(|| ForkChoiceError::MissingBeaconBlock(*justified_block_start))?;
let start_state_root = start.state_root; let start_state_root = start.state_root;

View File

@ -1,26 +1,14 @@
#![cfg(not(debug_assertions))] #![cfg(not(debug_assertions))]
// Tests the available fork-choice algorithms // Tests the available fork-choice algorithms
extern crate beacon_chain;
extern crate bls;
extern crate db;
// extern crate env_logger; // for debugging
extern crate fork_choice;
extern crate hex;
extern crate log;
extern crate slot_clock;
extern crate types;
extern crate yaml_rust;
pub use beacon_chain::BeaconChain; pub use beacon_chain::BeaconChain;
use bls::Signature; use bls::Signature;
use db::stores::{BeaconBlockStore, BeaconStateStore}; use store::MemoryStore;
use db::MemoryDB; use store::Store;
// use env_logger::{Builder, Env}; // use env_logger::{Builder, Env};
use fork_choice::{ use fork_choice::{
BitwiseLMDGhost, ForkChoice, ForkChoiceAlgorithm, LongestChain, OptimizedLMDGhost, SlowLMDGhost, BitwiseLMDGhost, ForkChoice, ForkChoiceAlgorithm, LongestChain, OptimizedLMDGhost, SlowLMDGhost,
}; };
use ssz::ssz_encode;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::{fs::File, io::prelude::*, path::PathBuf}; use std::{fs::File, io::prelude::*, path::PathBuf};
@ -106,7 +94,7 @@ fn test_yaml_vectors(
// process the tests // process the tests
for test_case in test_cases { for test_case in test_cases {
// setup a fresh test // setup a fresh test
let (mut fork_choice, block_store, state_root) = let (mut fork_choice, store, state_root) =
setup_inital_state(&fork_choice_algo, emulated_validators); setup_inital_state(&fork_choice_algo, emulated_validators);
// keep a hashmap of block_id's to block_hashes (random hashes to abstract block_id) // keep a hashmap of block_id's to block_hashes (random hashes to abstract block_id)
@ -149,9 +137,7 @@ fn test_yaml_vectors(
}; };
// Store the block. // Store the block.
block_store store.put(&block_hash, &beacon_block).unwrap();
.put(&block_hash, &ssz_encode(&beacon_block)[..])
.unwrap();
// run add block for fork choice if not genesis // run add block for fork choice if not genesis
if parent_id != block_id { if parent_id != block_id {
@ -222,29 +208,26 @@ fn load_test_cases_from_yaml(file_path: &str) -> Vec<yaml_rust::Yaml> {
fn setup_inital_state( fn setup_inital_state(
fork_choice_algo: &ForkChoiceAlgorithm, fork_choice_algo: &ForkChoiceAlgorithm,
num_validators: usize, num_validators: usize,
) -> (Box<ForkChoice>, Arc<BeaconBlockStore<MemoryDB>>, Hash256) { ) -> (Box<ForkChoice>, Arc<MemoryStore>, Hash256) {
let db = Arc::new(MemoryDB::open()); let store = Arc::new(MemoryStore::open());
let block_store = Arc::new(BeaconBlockStore::new(db.clone()));
let state_store = Arc::new(BeaconStateStore::new(db.clone()));
// the fork choice instantiation // the fork choice instantiation
let fork_choice: Box<ForkChoice> = match fork_choice_algo { let fork_choice: Box<ForkChoice> = match fork_choice_algo {
ForkChoiceAlgorithm::OptimizedLMDGhost => { ForkChoiceAlgorithm::OptimizedLMDGhost => {
let f: OptimizedLMDGhost<MemoryDB, FoundationEthSpec> = let f: OptimizedLMDGhost<MemoryStore, FoundationEthSpec> =
OptimizedLMDGhost::new(block_store.clone(), state_store.clone()); OptimizedLMDGhost::new(store.clone());
Box::new(f) Box::new(f)
} }
ForkChoiceAlgorithm::BitwiseLMDGhost => { ForkChoiceAlgorithm::BitwiseLMDGhost => {
let f: BitwiseLMDGhost<MemoryDB, FoundationEthSpec> = let f: BitwiseLMDGhost<MemoryStore, FoundationEthSpec> =
BitwiseLMDGhost::new(block_store.clone(), state_store.clone()); BitwiseLMDGhost::new(store.clone());
Box::new(f) Box::new(f)
} }
ForkChoiceAlgorithm::SlowLMDGhost => { ForkChoiceAlgorithm::SlowLMDGhost => {
let f: SlowLMDGhost<MemoryDB, FoundationEthSpec> = let f: SlowLMDGhost<MemoryStore, FoundationEthSpec> = SlowLMDGhost::new(store.clone());
SlowLMDGhost::new(block_store.clone(), state_store.clone());
Box::new(f) Box::new(f)
} }
ForkChoiceAlgorithm::LongestChain => Box::new(LongestChain::new(block_store.clone())), ForkChoiceAlgorithm::LongestChain => Box::new(LongestChain::new(store.clone())),
}; };
let spec = FoundationEthSpec::spec(); let spec = FoundationEthSpec::spec();
@ -255,12 +238,10 @@ fn setup_inital_state(
let (state, _keypairs) = state_builder.build(); let (state, _keypairs) = state_builder.build();
let state_root = state.canonical_root(); let state_root = state.canonical_root();
state_store store.put(&state_root, &state).unwrap();
.put(&state_root, &ssz_encode(&state)[..])
.unwrap();
// return initialised vars // return initialised vars
(fork_choice, block_store, state_root) (fork_choice, store, state_root)
} }
// convert a block_id into a Hash256 -- assume input is hex encoded; // convert a block_id into a Hash256 -- assume input is hex encoded;